基本使用

CountDownLatch典型用法:
某一线程在开始运行前等待n个任务线程数执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程数执行完毕,就将计数器减1 countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行

img

  • 自定义任务类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package org.example;

import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class Task implements Runnable{
private final static Random random = new Random();
private Integer id;
private CountDownLatch latch;
public Task(Integer id, CountDownLatch latch) {
this.id = id;
this.latch = latch;
}
@Override
public void run() {
System.out.println("开始寻找" + id + "号龙珠");
int seconds = random.nextInt(10);
try {
Thread.sleep(seconds * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("花费" + seconds + "s, 找到了" + id + "号龙珠");
latch.countDown();
}
}
  • 测试类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package org.example;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class Main {
public static void main(String[] args) {
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
CountDownLatch latch = new CountDownLatch(list.size());
for (Integer id: list) {
Thread thread = new Thread(new Task(id, latch));
thread.start();
}
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("over");
}
}

底层源码实现

基本架构

img

img

img

由上图中的等待队列唤醒变化可以发现,独占模式唤醒阻塞队列的头节点,共享模式唤醒阻塞队列所有的头节点

这里是特殊的,一般情况下await调用后Node是会进入condition单向链表,但是countdownLatch这里是直接进入AQS同步队列

在countdownlatch中sync实现的AQS采用的是共享模式

底层源码

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

protected int tryAcquireShared(int acquires) {
/*
* 如果state为0(锁被完全释放)则返回1,否则返回-1
* state为0,表示锁空闲。
* tryAcquireShared返回值是1,此时没有子任务持有锁,直接跳出等待,主线程不会被阻塞
* 为什么不返回0?
* 主任务不只有一个,假如有两个主任务都在等待两个子任务的完成。
* 一旦子任务全部完成,两个主任务都需要被唤醒。
*/
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
/*
* 每次通过一个CAS操作将AQS内部的state自减1
* 若不需要释放锁,或未完全释放锁,则返回false
* 若锁完全释放,返回true
*/
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

private final Sync sync;

public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
sync.releaseShared(1);
}

public long getCount() {
return sync.getCount();
}

public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}

/*
* public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
*/

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
/*
* 如果线程中断则抛出异常
*/
if (Thread.interrupted())
throw new InterruptedException();
/*
*
*/
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}


/*
* 如果返回的值是负数,则获取锁失败
* 如果返回的是0,获取锁成功,但不唤醒后续节点
* 如果返回正数,获取锁成功,唤醒后续节点
*/
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}


private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
/*
* tryAcquireShared返回值(传播值)大于等于0,锁已经是空闲的
* 说明子任务已经全部执行完,当前节点不必在等待,可以出队
*/
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private void setHeadAndPropagate(Node node, int propagate) {
/*
* 将当前节点设置为head,表示退出(head实际上是一个虚节点
* 也意味着当前节点封装的线程已经完成了同步,可以继续自由执行
*/
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
/*
* 如果传播值是正数,就需要找到当前节点的后置节点,如果该节点也是共享模式需要将其唤醒
* 唤醒等待队列头部的线程,也就是主任务所在的线程
*/
doReleaseShared();
}
}

/*
* public void countDown() {
sync.releaseShared(1);
}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}


protected boolean tryReleaseShared(int releases) {
/*
* 通过自旋的方式保证了state的值的自减
* 一旦state减到0,就返回true,锁已经释放完
*/
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

private void doReleaseShared() {
/*
* 对等待队列的节点从后往前搜索(AQS),将head的后置节点唤醒
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

/*
* doReleaseShared();
* 上面已经提及
*/

一些问题

state是0的时候,会唤醒阻塞队列的Node,这里唤醒是唤醒Head下的第一个,还是都唤醒?

CountDownLatch基于aqs共享模式,会唤醒sync queue的所有node,详细源码看doReleaseShared

img

其他线程被唤醒后,会先执行哪里的代码呢

主线程会在这里被阻塞

img

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

可知,调用了park方法,主线程被阻塞

所以for循环那里的目的何在?

被唤醒后,需要看一下r值,如果r值不符合逻辑,那么会接着阻塞

与CyclicBarrier的对比

  • 初始化
1
2
3
4
5
6
7
// 初始化值为5的栅栏
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
// 每个线程调用 await()
cyclicBarrier.await();
// 等到有 5 个线程都执行了 await() 之后,继续执行。
// 并且 栅栏的 计数器会自动重置为 5 ,可以接着用

  • 测试类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CyclicBarrierTest {
private final static ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(5);
private final static CyclicBarrier BARRIER = new CyclicBarrier(10);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
final String name = "玩家" + i;
EXECUTOR_SERVICE.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println(name + "已准备,等待其他玩家准备...");
BARRIER.await();
Thread.sleep(1000);
System.out.println(name + "已加入游戏");
} catch (InterruptedException e) {
System.out.println(name + "离开游戏");
} catch (BrokenBarrierException e) {
System.out.println(name + "离开游戏");
}
}
});
}
EXECUTOR_SERVICE.shutdown();
}
}
  • CountDownLatch 操作的是事件,阻塞足够多的次数即可,不管几个线程;而 CyclicBarrier 侧重点是线程,强调多个线程间互相等待,同时结束
  • CountDownLatch:面向任务数、不可重用、不指定线程
  • CyclicBarrier: 面向线程、可重用、指定线程
  • CountDownLatch调用countDown计数器-1,CyclicBarrier调用await计数器-1
  • 都是计数器为0的时候,唤醒同步队列的所有线程,让所有的被阻塞的线程一起运行
  • 不管什么线程调用await都是让谁暂停,只不过CountDownLatch是主线程暂停等待任务都完成,CyclicBarrier是所有任务线程都暂停后同时就绪,测试并行性