嗨,你好呀,我是猿java
CountDownLatch
是 Java 中的一个用于管理并发控制的同步辅助类,作用是允许一个或多个线程等待其他线程完成操作。顾名思义,它的工作机制类似于"倒计时闩锁",线程会阻塞等待,直到闩锁的计数器减少到 0,然后才能继续执行。这篇文章,我们将深度剖析其原理。
- 什么是 CountDownLatch? {#1-什么是-CountDownLatch?} ===============================================
CountDownLatch
是java.util.concurrent
包的一部分,用于同步一个或多个线程以等待特定条件的满足。它在创建时初始化一个给定的计数,表示必须发生的事件数量,才能使线程继续执行。这个计数通过调用 countDown() 方法来递减,等待该条件的线程调用 await() 方法来阻塞,直到计数达到零。
CountDownLatch
的关键组件包含:
- 计数 :
CountDownLatch
的核心概念是计数。它从创建锁存器时指定的初始值开始,只能递减,不能重置。 - await() :线程使用此方法等待计数达到零。如果当前计数大于零,这些线程将被置于等待状态。
- countDown() :调用此方法以递减计数。当计数达到零时,所有等待的线程将被释放。
- 线程安全 :
CountDownLatch
是线程安全的,它使用内部的 AQS(AbstractQueuedSynchronizer)来管理状态,确保计数的可见性和原子性。
- 工作原理 {#2-工作原理} =================
CountDownLatch
本质上是一种简化的信号量(Semaphore
)。它的核心思想是设定一个计数器,当计数器值为 0 时,其他被阻塞的线程才会开始运行,线程的释放建立在调用 countDown
方法去减少计数器次数的基础上。
CountDownLatch 的典型功能包括:
- 使多个线程等待一系列事件发生。
- 让一个线程等待完成多个步协作操作的线程。
- 在某个条件达到之前阻塞线程。
它包含了两个核心方法:
countDown()
: 当前线程执行完任务后,调用该方法时,计数器 -1;当计数器为 1,调用该方法可以使计数器变为 0。await()
: 当前线程调用后,会阻塞,进入等待状态,直到计数器为 0。
通过这两种操作,我们就可以构建出各种灵活的并发控制逻辑。
2.1 简单实现 {#2-1-简单实现}
|------------------------------------------------------------------------------------------------------------------------------------||
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { // 创建一个计数器,设置初始的计数值为 3 CountDownLatch latch = new CountDownLatch(3); // 创建三个工作线程 new Thread(() -> { try { Thread.sleep(1000); // 模拟任务耗时 System.out.println("Thread 1 finished"); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); // 每个线程任务完成后,使计数器减 1 } }).start(); new Thread(() -> { try { Thread.sleep(2000); // 模拟任务耗时 System.out.println("Thread 2 finished"); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); // 每个线程任务完成后,使计数器减 1 } }).start(); new Thread(() -> { try { Thread.sleep(3000); // 模拟任务耗时 System.out.println("Thread 3 finished"); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); // 每个线程任务完成后,使计数器减 1 } }).start(); // 主线程会在此阻塞,直到计数器减为 0 latch.await(); System.out.println("All tasks finished. Main thread proceeding."); } }
|
输出结果如下:
|-----------------|-----------------------------------------------------------------------------------------------------------|
| 1 2 3 4
| Thread 1 finished Thread 2 finished Thread 3 finished All tasks finished. Main thread proceeding.
|
在上述代码中,主线程调用 latch.await();
进入阻塞状态,等待三个工作线程完成任务后,计数器将变为 0,然后解除阻塞并进入后续逻辑。
2.2 底层工作原理 {#2-2-底层工作原理}
从底层工作原理来看,CountDownLatch
内部维护了一个 Sync
类,这实际上是一个基于 AQS(AbstractQueuedSynchronizer, 抽象队列同步器)的同步工具。
State
的初始值为计数器值,也就是通过构造函数传递的参数(n)。await
方法的实现就是通过验证state
的值是否为 0,若不为 0,则会阻塞当前线程并加入AQS等待队列中,否则继续向下执行。countDown
方法会将state
的值减 1,当state==0
时,会唤醒所有在 AQS 阻塞队列中的线程。
内部实现机制对线程的阻塞、唤醒、队列管理等是通过 AQS 实现的,AQS
的设计模式使得它能高效、安全地管理同步状态。
2.3 核心代码片段 {#2-3-核心代码片段}
|---------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
|
- 使用场景分析 {#3-使用场景分析} =====================
CountDownLatch
的应用场景比较广泛,尤其是在处理并发问题时,这里列举了几个:
3.1 批量任务协调 {#3-1-批量任务协调}
有时候,不同子线程可能会同时执行各自的任务,然而主线程会等待所有子线程的执行完毕后,才继续执行后续操作。
比如 Web 应用中多个 API 的响应聚合:假设有多个远程服务需要调用,主线程希望在所有调用都返回结果后,再执行后续处理,可以使用 CountDownLatch
来等待响应的到来。示例代码如下:
|------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| // 类似一些应用需要同时从多个微服务中拉数据,再一起处理 CountDownLatch latch = new CountDownLatch(3); ExecutorService executor = Executors.newFixedThreadPool(3); for (int i = 0; i < 3; i++) { executor.submit(() -> { try { // 模拟拉取和处理数据 } catch (Exception e) { // 异常处理 } finally { latch.countDown(); // 每个任务结束后调用 } }); } latch.await(); // 等待所有子任务执行结束 System.out.println("汇总所有数据.");
|
3.2 并行计算 {#3-2-并行计算}
假如有这样一个情景:计算任务很耗时,但是可以分成多个部分并行处理,然后将结果进行合并。
实现方式:我们先将任务分解成 n 个子任务,全部执行完毕后,将子任务的结果进行汇总分析。示例代码如下:
|------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14
| CountDownLatch latch = new CountDownLatch(n); List<Integer> results = new CopyOnWriteArrayList<>(); for (int i = 0; i < n; i++) { new Thread(() -> { try { int result = // 处理部分任务 results.add(result); } finally { latch.countDown(); // 完成后计数减 1 } }).start(); } latch.await(); // 等到结果全部处理完 int finalResult = results.stream().mapToInt(Integer::intValue).sum();
|
3.3 服务启动检查 {#3-3-服务启动检查}
CountDownLatch
还可以为应用服务做"健康检查"。例如,系统在完全启动之前,需要依赖多个外部服务,那么我们可以通过异步方式检测各个服务的健康状态,只有当所有服务都正常启动时,才允许继续执行下一步。
简单的示例代码如下:
|------------------------------------------------------------------------------------------------||
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class ServiceStartChecker { private final CountDownLatch latch; public ServiceStartChecker(int serviceCount) { latch = new CountDownLatch(serviceCount); } public void checkServices() throws InterruptedException { // 启动多个异步线程去检查服务是否就绪 for (int i = 0; i < 3; i++) { new Thread(() -> { try { // 模拟服务检查 Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " is ready"); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); // 完成后调用,减少计数器 } }).start(); } latch.await(); // 等待所有服务就绪 System.out.println("All services are up. System is ready."); } public static void main(String[] args) throws InterruptedException { ServiceStartChecker checker = new ServiceStartChecker(3); checker.checkServices(); } }
|
- 与其他并发工具对比 {#4-与其他并发工具对比} ===========================
CountDownLatch
只是 Java 并发工具包中的一个工具,其功能与一些其他工具如 CyclicBarrier
、Semaphore
等具有一定的共性和不同点。
4.1 CountDownLatch vs CyclicBarrier {#4-1-CountDownLatch-vs-CyclicBarrier}
- 用途 :
CountDownLatch
适用于一组线程完成某一工作后进入"继续工作"的状态,且无法进行reset
重新使用。而CyclicBarrier
更适合复用,在一组线程都到达某一屏障时统一放行,之后可以通过reset
重复使用。 - 结束条件 :
CyclicBarrier
需要每个等待的线程都到达某个同步点才能继续;而CountDownLatch
则更灵活,它并不关心是哪个线程调用了countDown
,只关注countDown
是否次数到了。
4.2 CountDownLatch vs Semaphore {#4-2-CountDownLatch-vs-Semaphore}
- 任务控制力度的差异 :
Semaphore
更倾向于对信号量的数量进行限流。简单来说,Semaphore 可以限制某个操作的并发次数,比如最多只允许 5 个线程同时执行某个任务。而CountDownLatch
只是简单的减少计数,不去限流,只是关注完成情况。
- 实际项目中的使用 {#5-实际项目中的使用} =========================
在多线程爬虫、分布式系统、并行数据处理等具体项目中,CountDownLatch
都能找到合适的应用场景。
5.1 分布式系统的启动控制 {#5-1-分布式系统的启动控制}
假设我们在一个分布式服务系统中,每个微服务间可能有复杂的依赖关系,借助 CountDownLatch,我可以构建出一个依赖的启动顺序。
5.2 性能测试 {#5-2-性能测试}
在进行性能测试时,可能需要多个线程同时工作,例如使用 CountDownLatch
控制开始时间,以模拟高并发访问场景。
|---------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| CountDownLatch ready = new CountDownLatch(1); CountDownLatch done = new CountDownLatch(N); for (int i = 0; i < N; i++) { new Thread(() -> { try { ready.await(); // 等待所有线程就绪 // 执行模拟请求 } finally { done.countDown(); } }).start(); } // 开始测试 ready.countDown(); done.await(); // 等到所有线程结束
|
通过这样的实践,我们可以轻松模拟高并发性能测试和压力测试场景。
- 总结 {#6-总结} =============
本文我们深度剖析了CountDownLatch
,CountDownLatch
虽然是一个简单的并发工具,对其整体总结如下:
-
核心工作原理 :
CountDownLatch
基于AQS机制,用于管理一个线程集合的执行流程控制。 -
适用场景: 主要用于在处理并行操作时控制线程的执行顺序。
-
与其他工具的对比 :与
CyclicBarrier
和信号量Semaphore
分别有不同的贡献场景。 -
多应用场景使用:包括但不限于服务启动依赖、并行计算结果收集、并发控制等。
-
交流学习 {#7-交流学习} =================
最后,把猿哥的座右铭送给你:投资自己才是最大的财富。 如果你觉得文章有帮助,请帮忙转发给更多的好友,或关注公众号:猿java,持续输出硬核文章。