建议:本篇文章结合JUC AbstractQueuedSynchronizer(AQS)源码-加解锁分析效果更佳!

应用场景

CountDownLatch是一个比较有用的线程并发工具类,他的应用场景有很多,一般都是用在监听某些初始化操作上,等待初始化操作完成后然后通知某些主线程继续执行;在生活中例如接力赛吧,一个队员必须要接到另一个队员的接力棒才能跑;再例如我玩游戏的时候必须要先加载一些基础数据,基础数据加载完成之后才能开始游戏。这样的例子有很多;CountDownLatch的应用场景也有很多;在之前我们做电子合同的时候有个场景是要把用户上传的合同文档(一般都是.doc或者.docx),我们需要把这个.doc或者.docx文档转换成pdf;原因就是电子签章必须要签署到pdf上;一个文档页数有很多,我们就开启几个线程分页的去转换,我们必须要到等到所有线程都转换完成才能继续下一步的操作,就是这个场景我们就用到了CountDownLatch这个并发工具类,

原理分析

它的内部提供一个计数器,在构造方法中必须要指定计数器的初始值,且计数器的初始值必须要大于0

CountDownLatch countDownLatch = new CountDownLatch(2);

另外它提供了一个countDown()方法来操作计数器的值,每调用一次countDown()方法计数器的值就会减1,直到计数器的值减为0,代表初始化操作已经完成,所有调用await方法而阻塞的线程都会被唤醒;可以进行下一步的操作!

实例

public class UseCountDownLatch {

public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(2);
Thread t1 = new Thread(() -> {
try {
System.out.println("进入线程t1" + "等待其他线程处理完成...");
countDownLatch.await();
System.out.println("t1线程继续执行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1");
Thread t2 = new Thread(() -> {
try {
System.out.println("t2线程进行初始化操作...");
TimeUnit.SECONDS.sleep(3);
System.out.println("t2线程初始化完毕,通知t1线程继续");
countDownLatch.countDown(); //类似通知
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2");
Thread t3 = new Thread(() -> {
try {
System.out.println("t3线程进行初始化操作...");
TimeUnit.SECONDS.sleep(4);
System.out.println("t3线程初始化完毕,通知t1线程继续");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t3");
t1.start();
t2.start();
t3.start();
}
}

在初始化CountDownLatch时构造方法中传入了数值2,t1会等待t2和t3都调用countDownLatch.countDown();之后继续执行;执行结果如下:

进入线程t1等待其他线程处理完成...
t2线程进行初始化操作...
t3线程进行初始化操作...
t2线程初始化完毕,通知t1线程继续
t3线程初始化完毕,通知t1线程继续
t1线程继续执行...

API介绍

  • await(): 调用该方法的线程必须等到构造方法传入的值减到0的时候才能继续往下执行;
  • await(long timeout,TimeUnit unit): 与上面的await方法功能一致,只不过这里有了时间限制,调用了该方法的线程等到指定的timeout时间后,不管构造方法传入的值是否减为0,都会继续往下执行;
  • countDown(): 使CountDownLatch初始值(构造方法中传入的值)减1;
  • long getCount(): 获取当前CountDownLatch维护的值;

源码分析

下面贴一下CountDownLatch的构造方法:

public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

CountDownLatch只有一个带参的构造方法,从这里可以看出count必须要大于0,不然会报错!在构造方法中只是new 了一个Sync对象;并赋值给了成员变量sync; 了解AQS的同学可以知道,CountDownLatch的实现依赖于AQS,它是AQS共享模式下的一个应用;

image-20210507172847358.png

关于AQS我们后面有篇幅单独去讲解这个;上图可以看到不光CountDownLatch依赖于AQS,像ReentrantLock,ReentrantReadWriteLock,Semaphore都是依赖于AQS;可以看到AQS的重要性;同时也是面试的重点;

回归原题:CountDownLatch实现了一个内部类Sync并用它去继承AQS,这样就能使用AQS提供的大部分模板方法了;

我们看一下Sync内部类的代码:

public class CountDownLatch {
//Sync继承了AQS
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
//Sync构造方法
Sync(int count) {
setState(count);
}
//获取当前同步状态
int getCount() {
return getState();
}
//获取锁方法
protected int tryAcquireShared(int acquires) {
//在构造方法中state这个值必须要传入大于0的值,所以这里一直都是获取锁成功的;
//直到每调用一次countDown()方法将state减1;
return (getState() == 0) ? 1 : -1;
}
//释放锁方法
protected boolean tryReleaseShared(int releases) {
for (;;) {
//获取锁状态
int c = getState();
//如果等于0,就不能再释放锁了
if (c == 0)
return false;
//否则将同步器减1
int nextc = c-1;
//使用CAS更新锁状态
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
image.png

在获取锁方法tryAcquireShared()方法中:返回的是一个int类型的数值,分别为-1,0,1;他们分别表示:

  • 负值:表示获取锁失败
  • 零值:表示当前节点获取成功,但是后继节点不能再获取了
  • 正值:表示当前节点获取成功,并且后继节点同样可以获取

CountDownLatch获取锁和释放锁的过程比较简单,我们在使用CountDownLatch的时候会调用await()方法来加锁,countDown()方法来解锁;下面我们先来看一下调用awit()方法的流程:

public void await() throws InterruptedException {
//以响应线程中断方式获取
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
//1 判断线程是否中断
if (Thread.interrupted())
//2以抛出异常的方式响应线程中断
throw new InterruptedException();
//3 尝试获取锁;在上面对正数,负数,0,3种取值已经进行了说明
if (tryAcquireShared(arg) < 0)
//4 获取锁失败
doAcquireSharedInterruptibly(arg);
}

关于是否响应线程中断后面文章会有介绍,感兴趣的可以关注我后面会有更新;

这里我们先关注3,4处的代码,首先调用了tryAcquireShared(arg)方法进行获取锁;这个方法就是上面我们贴出来的tryAcquireShared(int acquires)方法,看state是否等于0.如果等于0就返回1;表示加锁成功,否则返回-1表示不能获取锁。如果此方法返回1线程不必等待继续向下执行,如果此方法返回-1则进行doAcquireSharedInterruptibly(arg)方法:

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

JUC AbstractQueuedSynchronizer(AQS)源码-加解锁分析这篇文章中我们讲述doAcquireShared()这个方法:

image-20210524111021488

基本上大致是一样的,唯一的区别就是在是否响应中断上有些区别;由于代码基本是一样的,这里就不过多的诉述了,建议看一下上一篇文章;

下面大致的说一下countDown()方法的过程:首先我们在CountDownLatch的构造方法中传入了一个数值count,这个数值赋给你内部类Sync,在Sync的构造方法中将count设置给了State同步状态,当每次调用countDown()方法的时候就会调用内部类Sync的sync.releaseShared(1);方法然后调用tryReleaseShared()方法实现自己释放锁的逻辑将State的值减1,每调用一次countDown()方法state就会减1,直到state减为0

public void countDown() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

关于调用countDown()方法用一个生活中的例子解释更恰当;记得小时候家里的门都是使用门闩(shuan栓)去锁门;如果不知道门闩是什么,我在bai度搜索了图片:

image-20210523140427228

在保证更安全更牢固的情况下,可能一个门上会有多个门闩;当每调用一次countDown()就相当于打开一个门闩;直到每个门闩都会打开,这个门才能打开;

我们大致对countDown()方法有了了解,我们再去看源码,在源码中调用了tryReleaseShared(arg)方法去释放锁,tryReleaseShared方法如果返回true表示释放成功,返回false表示释放失败,只有当将同步状态减1后该同步状态恰好为0时才会返回true,其他情况都是返回false。

//释放锁方法
protected boolean tryReleaseShared(int releases) {
for (;;) {
//获取锁状态
int c = getState();
//如果等于0,就不能再释放锁了
if (c == 0)
return false;
//否则将同步器减1
int nextc = c-1;
//使用CAS更新锁状态
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

这里假设在构造CountDownLatch时count传入的是2,执行到这里getState();等于2,然后执行c-1,最后进行CAS将c-1的结果赋值给state,这时return的肯定是false;然后整个releaseShared()方法返回false;也就是说必须执行2次countDown()方法才会返回true;当下一次执行countDown()完毕之后,结果返回true,随后继续执行doReleaseShared();方法去唤醒同步队列的所有线程!

doReleaseShared();调用的是AQS类中的方法,在JUC AbstractQueuedSynchronizer(AQS)源码-加解锁分析这篇文章中我们已经讲述过,这个不在阐述了;