基本使用
CountDownLatch典型用法:
某一线程在开始运行前等待n个任务线程数执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程数执行完毕,就将计数器减1 countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package org.example; import java.util.Random; import java.util.concurrent.CountDownLatch; public class Task implements Runnable{ private final static Random random = new Random(); private Integer id; private CountDownLatch latch; public Task(Integer id, CountDownLatch latch) { this.id = id; this.latch = latch; } @Override public void run() { System.out.println("开始寻找" + id + "号龙珠"); int seconds = random.nextInt(10); try { Thread.sleep(seconds * 1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("花费" + seconds + "s, 找到了" + id + "号龙珠"); latch.countDown(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package org.example; import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; public class Main { public static void main(String[] args) { List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7); CountDownLatch latch = new CountDownLatch(list.size()); for (Integer id: list) { Thread thread = new Thread(new Task(id, latch)); thread.start(); } try { latch.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("over"); } }
|
底层源码实现
基本架构



由上图中的等待队列唤醒变化可以发现,独占模式唤醒阻塞队列的头节点,共享模式唤醒阻塞队列所有的头节点
这里是特殊的,一般情况下await调用后Node是会进入condition单向链表,但是countdownLatch这里是直接进入AQS同步队列
在countdownlatch中sync实现的AQS采用的是共享模式
底层源码

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206
| public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) {
for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); } public long getCount() { return sync.getCount(); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg);
if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared())
doReleaseShared(); } }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int releases) {
for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } private void doReleaseShared() {
for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
|
一些问题
state是0的时候,会唤醒阻塞队列的Node,这里唤醒是唤醒Head下的第一个,还是都唤醒?
CountDownLatch基于aqs共享模式,会唤醒sync queue的所有node,详细源码看doReleaseShared

其他线程被唤醒后,会先执行哪里的代码呢
主线程会在这里被阻塞

1 2 3 4
| private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
|
可知,调用了park方法,主线程被阻塞
所以for循环那里的目的何在?
被唤醒后,需要看一下r值,如果r值不符合逻辑,那么会接着阻塞
与CyclicBarrier的对比
1 2 3 4 5 6 7
| CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
cyclicBarrier.await();
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class CyclicBarrierTest { private final static ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(5); private final static CyclicBarrier BARRIER = new CyclicBarrier(10); public static void main(String[] args) { for (int i = 0; i < 10; i++) { final String name = "玩家" + i; EXECUTOR_SERVICE.execute(new Runnable() { @Override public void run() { try { Thread.sleep(2000); System.out.println(name + "已准备,等待其他玩家准备..."); BARRIER.await(); Thread.sleep(1000); System.out.println(name + "已加入游戏"); } catch (InterruptedException e) { System.out.println(name + "离开游戏"); } catch (BrokenBarrierException e) { System.out.println(name + "离开游戏"); } } }); } EXECUTOR_SERVICE.shutdown(); } }
|
- CountDownLatch 操作的是事件,阻塞足够多的次数即可,不管几个线程;而 CyclicBarrier 侧重点是线程,强调多个线程间互相等待,同时结束
- CountDownLatch:面向任务数、不可重用、不指定线程
- CyclicBarrier: 面向线程、可重用、指定线程
- CountDownLatch调用countDown计数器-1,CyclicBarrier调用await计数器-1
- 都是计数器为0的时候,唤醒同步队列的所有线程,让所有的被阻塞的线程一起运行
- 不管什么线程调用await都是让谁暂停,只不过CountDownLatch是主线程暂停等待任务都完成,CyclicBarrier是所有任务线程都暂停后同时就绪,测试并行性