Java多线程Semaphore和CountDownLatch

最近忙着毕设,要做前端,所以看更多的是React的知识,小后端🐶还是要继续学习总结,不然就要没(tou)时(lan)间继续写了。

Semaphore

中文含义是信号量,它是synchronized的升级版。

synchronized 关键字,代表这个方法加锁,相当于不管哪一个线程(例如线程A),运行到这个方法时,都要检查有没有其它线程B(或者C、 D等)正在用这个方法(或者该类的其他同步方法),有的话要等正在使用synchronized方法的线程B(或者C 、D)运行完这个方法后再运行此线程A,没有的话,锁定调用者,然后直接运行。它包括两种用法:synchronized 方法和 synchronized 块。(度娘解释)

Semaphore主要的作用是控制线程的并发数,如果单纯的使用synchronized是不能实现的。

简单看

1
2
3
4
5
6
7
8
9
10
11
12
13
Semaphore.java
/**
* A counting semaphore. Conceptually, a semaphore maintains a set of
* permits. Each {@link #acquire} blocks if necessary until a permit is
* available, and then takes it. Each {@link #release} adds a permit,
* potentially releasing a blocking acquirer.
* However, no actual permit objects are used; the {@code Semaphore} just
* keeps a count of the number available and acts accordingly.
*
简单的来讲,信号量维持了一组许可证(permits),每次调用的时候,需要获取permit才能进行进行操作。
每个线程调用semaphore.acquire()获取一个许可证后,就会减少许可证数量。
当没有许可证的时候,线程会阻塞,直到之前获得许可证的线程操作完释放才能进行。

Semaphore结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Creates a {@code Semaphore} with the given number of
* permits and nonfair fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* Creates a {@code Semaphore} with the given number of
* permits and the given fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
* @param fair {@code true} if this semaphore will guarantee
* first-in first-out granting of permits under contention,
* else {@code false}
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

两个构造方法:
参数permit是许可的意思,代表在同一时间内,最多允许多少个线程同时执行acquire()和release()之间的代码,Semaphore发放许可的操作是减法操作。

参数fair,表示内部使用的是FairSync(公平锁)或者NonfairSync(非公平锁),表示每次线程获取锁的机会是否是公平的,具体的可以看底层实现,继承自AbstractQueuedSynchronizer,通过state判断是否是加锁状态,state 为0,表示锁未被获取,不为0,表示已被获取。

如何用

第一种,同步的执行一个任务(与Synchronized相似)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//多个线程里,保持一份信号量
private Semaphore semaphore = new Semaphore(1);
//线程中调用
public void testMethod() {
try {
//获取锁
semaphore.acquire();
System.out.println(Thread.currentThread().getName() +
"begin Time" + System.currentTimeMillis());
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() +
"end Time" + System.currentTimeMillis());
} catch (Exception ex) {
ex.printStackTrace();
} finally {
//释放锁
semaphore.release();
}
}

这种比较简单,就不深入展示了,要看的是在多个线程下如何控制并发数量。

第二种,控制线程并发数量

首先是执行类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class TestService {
//许可证数量为2
private Semaphore semaphore = new Semaphore(2);
public void testMethod() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() +
"begin Time" + System.currentTimeMillis());
//停顿5秒
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() +
"end Time" + System.currentTimeMillis());
} catch (Exception ex) {
ex.printStackTrace();
} finally {
semaphore.release();
}
}
}

创建一个线程池,分配任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws IOException {
//创建线程池,自己新建一个ThreadFactory,定义线程名字
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(@NotNull Runnable r) {
return new Thread(r, "当前线程哈希值是:" + r.hashCode());
}
});
TestService testService = new TestService();
//分派任务
for (int i = 0; i < 10; i++){
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
testService.testMethod();
}
});
poolExecutor.submit(thread);
}
//关闭线程池,等待池中的线程任务执行完毕
poolExecutor.shutdown();
}

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Connected to the target VM, address: '127.0.0.1:60937', transport: 'socket'
当前线程哈希值是:55909012begin Time1521281771135
当前线程哈希值是:922151033begin Time1521281771135
当前线程哈希值是:55909012end Time1521281776136
当前线程哈希值是:922151033end Time1521281776136
当前线程哈希值是:1915058446begin Time1521281776136
当前线程哈希值是:1387228415begin Time1521281776136
当前线程哈希值是:1915058446end Time1521281781140
当前线程哈希值是:1387228415end Time1521281781140
当前线程哈希值是:748658608begin Time1521281781140
当前线程哈希值是:167185492begin Time1521281781140
当前线程哈希值是:167185492end Time1521281786145
当前线程哈希值是:748658608end Time1521281786145
当前线程哈希值是:1937348256begin Time1521281786145
当前线程哈希值是:1358444045begin Time1521281786145
当前线程哈希值是:1937348256end Time1521281791148
当前线程哈希值是:1358444045end Time1521281791148
当前线程哈希值是:331844619begin Time1521281791148
当前线程哈希值是:64830413begin Time1521281791148
Disconnected from the target VM, address: '127.0.0.1:60937', transport: 'socket'
当前线程哈希值是:331844619end Time1521281796152
当前线程哈希值是:64830413end Time1521281796152
Process finished with exit code 0

可以看到,在设定的线程睡眠5秒内,只有两个线程同时执行acquire()和release()之间的逻辑,通过Semaphore控制了线程的并发数量。

其它方法

  • acquire(int) : 一次获取多个许可
  • acquireUninterruptibly() : 使等待进入acquire()方法,不允许被终止
  • tryAcquire() : 尝试地获得一个许可,如果获取不到就返回false,通常与if判断使用,具有无阻塞的特点
  • tryAcquire(long timeout, TimeUnit unit) : 多少时间内获取不到许可就放弃

还有很多方法,诸如availablePermits()/drainPermits()/hasQueuedThreads()/getQueueLength()等,感兴趣的话请怒开IDE查看具体实现吧。

CountDownLatch

CountDownLatch也是一个工具类,可以使线程同步的处理上更加灵活,CountDownLatch也是减法操作

简单介绍

1
2
3
4
5
6
/* A synchronization aid that allows one or more threads to wait until
* a set of operations being performed in other threads completes.
一个同步辅助类,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
使用效果是,给定一个计数,当使用这个CountDownLatch类的线程判断计数不为0的时候,线程处于wait状态,如果为0,就继续进行。

举个🌰:
来了一辆小汽车,要等满5个人才开车,来了1个,不开;再来1个,还是不开,最后5个人到齐了,开车开车。

类结构&构造方法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
//这货也是继承AbstractQueuedSynchronizer
private final Sync sync;
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

参数count表示要等待的数量

方法示范

执行类,等待5秒后执行countDown

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class TestService {
private Semaphore semaphore = new Semaphore(1);
private CountDownLatch countDownLatch;
public TestService(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
public void testMethod() {
try {
semaphore.acquire();
System.out.println("当前线程是: " + Thread.currentThread().getName() +
" 时间是: " + System.currentTimeMillis());
//等待5秒
Thread.sleep(1000);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
countDownLatch.countDown();
semaphore.release();
}
}
}

运行类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) throws IOException, InterruptedException {
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(@NotNull Runnable r) {
return new Thread(r, "当前线程哈希值是:" + r.hashCode());
}
});
//设定十个限制
CountDownLatch countDownLatch = new CountDownLatch(10);
TestService testService = new TestService(countDownLatch);
for (int i = 0; i < 10; i++){
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
testService.testMethod();
}
});
thread.setName("" + i);
poolExecutor.submit(thread);
}
//关闭线程池,等待池中的线程任务执行完毕
poolExecutor.shutdown();
System.out.println("poolExecutor分发任务结束: " + System.currentTimeMillis());
countDownLatch.await();
System.out.println("CountDown方法结束: " + System.currentTimeMillis());
}

执行日志:

1
2
3
4
5
6
7
8
9
10
11
12
当前线程是: 当前线程哈希值是:922151033 时间是: 1521287832750
poolExecutor分发任务结束: 1521287832752
当前线程是: 当前线程哈希值是:55909012 时间是: 1521287833755
当前线程是: 当前线程哈希值是:1387228415 时间是: 1521287834759
当前线程是: 当前线程哈希值是:748658608 时间是: 1521287835763
当前线程是: 当前线程哈希值是:167185492 时间是: 1521287836765
当前线程是: 当前线程哈希值是:1937348256 时间是: 1521287837769
当前线程是: 当前线程哈希值是:1358444045 时间是: 1521287838770
当前线程是: 当前线程哈希值是:331844619 时间是: 1521287839774
当前线程是: 当前线程哈希值是:64830413 时间是: 1521287840776
当前线程是: 当前线程哈希值是:653687670 时间是: 1521287841779
CountDown方法结束: 1521287842784

可以看到,在线程池控制1个并发线程,poolExecutor提交任务之后打印日志,但是countDownLatch.await()方法之后的代码,因为count没有减到0,不能执行。

在TestService方法中,每隔一秒执行countDownLatch.countDown()方法,最后十个线程跑完,count减到0,countDownLatch.await()方法之后的代码才可以执行。

方法

  • await() : 等待
  • countDown() : 计数减一
  • await(long timeout, TimeUnit unit) : 在限定时间内进行等待,超过时间返回false
  • getCount() : 获取计数count

小结

Semaphore作为信号量,用来控制线程的并发数量,CountDownLatch用来控制线程执行任务的时机也挺不错的。它们两个的理解和使用都比较简单,好了,又填了一个坑,下次继续挖坑和填坑hhh