Java中的并发工具类

CountDownLatch

它允许一个或多个线程等待其他线程完成操作,相当于join()的功能,但比join()的功能更多。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.concurrent.CountDownLatch;

/**
* @author 南风
* @date 2020/2/6-12:53
*/
public class CountDownLatchTest {

/**
* 1、构造函数接收一个int类型的参数作为计数器,想等待N个点完成,就传入N
* 2、这里的N可以是N个线程,也可以是一个线程里面的N个执行步骤
* 3、如果用在多个线程,只需要把这个CountDownLatch引用传递到线程里即可
* 4、N必须大于0,如果等于0,则调用await()方法不会阻塞当前线程
*/

static CountDownLatch downLatch = new CountDownLatch(2);

public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(1);
//调用countDown()方法时,N就会减1
downLatch.countDown();
System.out.println(2);
downLatch.countDown();
}
}).start();
//调用await()方法就会阻塞当前线程,直到N变成0
//可以使用await(long time, TimeUnit unit)方法来等待指定时间后就不再等待
downLatch.await();
System.out.println(3);
}
}

执行结果

1
2
3
1
2
3

同步屏障CyclicBarrier

让一组线程达到一个屏障(也叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续执行。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
* @author 南风
* @date 2020/2/6-13:07
*/
public class CyclicBarrierTest {

/**
* 1、默认参数是int类型的,表示屏障拦截的线程数量
* 2、还有一个构造函数 CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction线程
* 3、如果这里参数为3,则主线程和子线程会永远等待,因为没有第三个线程执行await()方法
*/
static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
//调用此方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞
//还有个方法await(long timeout, TimeUnit unit),等待指定时间后不再等待
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(1);
}
}).start();
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(2);
}
}

运行结果

1
2
3
4
5
1
2

2
1

应用场景

用于多线程计算数据,最后合并计算结果的场景。

CyclicBarrier和CountDownLatch的区别

  1. CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置
  2. CyclicBarrier还提供其他有用方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量;isBroken()方法用来了解阻塞的线程是否被中断。

控制线程并发线程数的Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用公共资源。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
 import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
* @author 南风
* @date 2020/2/6-13:28
*/
public class SemaphoreTest {

/**
* 创建容量为30的线程池
*/
static ExecutorService service = Executors.newFixedThreadPool(30);

/**
* 创建允许同时执行10个线程的信号量,接收整型参数,表示可用的许可证数量
*/
static Semaphore s = new Semaphore(10);

public static void main(String[] args) {
//创建30个线程
for(int i=0; i<30; i++){
service.execute(new Runnable() {
@Override
public void run() {
try {
//使用该方法获取一个许可证
//可以使用acquire(int permits)方法获取指定个许可证
//可以使用tryAcquire()方法尝试获取许可证
s.acquire();
System.out.println("save data");
//使用该方法归还许可证
//可以使用release(int permits)方法归还指定个许可证
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
//关闭线程池
service.shutdown();
}
}

应用场景: 用作流量控制,比如数据库连接

其他方法

  • int availablePermits():返回此信号量中当前可用的许可证数
  • int getQueueLength():返回正在等待获取许可证的线程数
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证

线程间交换数据的Exchanger

进行线程间的数据交换,提供一个同步点,两个线程可以在同步点交换彼此的数据。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author 南风
* @date 2020/2/6-13:47
*/
public class ExchangerTest {

/**
* 创建交换数据为String类型的Exchanger
*/
static final Exchanger<String> ex = new Exchanger<String>();

static ExecutorService service = Executors.newFixedThreadPool(2);

public static void main(String[] args) {
service.execute(new Runnable() {
@Override
public void run() {
String A = "银行流水A";
try {
// 第一个线程执行exchange(String str)方法,它会一直等待第二个线程也执行exchange(String str)方法
// 当第二个线程执行交换数据的方法后,将从第二个线程交换的数据返回,同时将自己的数据交换给第二个线程
// 参数表示要交换的值
// 可以使用exchange(V x, long timeout, TimeUnit unit)方法,设置最大等待时长
String str = ex.exchange(A);
System.out.println(Thread.currentThread().getName()+":"+"从第二个线程交换回来的值:"+str);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

service.execute(new Runnable() {
@Override
public void run() {
String B = "银行流水B";
try {
//第二个线程执行完exchange方法后,即到达同步点时,这两个线程交换数据,将本线程的数据传递给对方
String str = ex.exchange(B);
System.out.println(Thread.currentThread().getName()+":"+"从第一个线程交换回来的值:"+str);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//关闭线程池
service.shutdown();
}
}

运行结果

1
2
pool-1-thread-1:从第二个线程交换回来的值:银行流水B
pool-1-thread-2:从第一个线程交换回来的值:银行流水A

应用场景

  • 遗传算法,使用交叉规则进行遗传
  • 校对工作,比如银行的AB岗电子银行流水录入
-------- ♥感谢阅读♥ --------