一、前言 {#一、前言}
AQS 是抽象的队列同步器,是用来构建锁或其他同步组件的重量级基础框架及整个 JUC 体系的基石。
二、相关组件 {#二、相关组件}
下边的组件都是基于 AQS 框架扩展实现的:
- ReentrantLock:可重入锁,避免多线程竞争资源的安全问题
- Semaphore:信号量,限制多线程的访问数量
- CountDownLatch:计数器,用于线程之间的等待场景(如线程A等待其他多个线程完成任务后,线程A才能执行自己的任务)
- CyclicBarrier:回环栅栏,用于线程之间的等待场景(如在一组线程中,如果线程A执行到代码段S点就会停下等待,等到组内其他线程都执行到S点时它们才会立刻一起执行剩余的任务)
虽然这些组件在多线程场景下有不同的作用,但代码中也有相似之处,如都需要管理锁状态,维护阻塞线程,维护唤醒线程。而 AQS 的作用就是将这些相似的、公共的代码封装在一起。
三、运行原理 {#三、运行原理}
AQS 使用一个 volatile 的 int 类型的 state 变量来表示锁竞争状态,将每条要去抢占资源的线程封装成一个 Node 节点放入到内置的 CLH 同步队列(FIFO 双向队列)来维护排队工作,通过 CAS 对 state 值进行修改。
我们常说的 AQS 指的是 java.util.concurrent.locks.AbstractQueuedSynchronizer 类,其源码的核心如下:
|------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { static final class Node { // 等待状态 volatile int waitStatus; // 前驱节点 volatile Node prev; // 后继节点 volatile Node next; // 工作线程 volatile Thread thread; ... } // 头结点 private transient volatile Node head; // 尾节点 private transient volatile Node tail; // 同步状态,默认值 0,说明 0:资源未抢占 1:资源已抢占 private volatile int state; ... } |
注意:源码中有个状态:一个是 state,针对资源抢占的状态;另一个是 waitStatus,针对 node 节点的状态。
将上文的源码转成图形,可便于我们理解,加深记忆,其运行模型图如下:
四、源码分析 {#四、源码分析}
文中涉及到 CAS 和 LockSupport 相关内容,不清楚的读者可以先跳至末尾,浏览相关的参考资料。
由于 AQS 并非单独使用,为了完整的讲解 AQS 的源码,本篇章以 ReentrantLock 组件为例,一步一步揭开 AQS 如何作为基石使用的。
- 演示案例
|---------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | public class LockTest { public static void main(String[] args) { Lock lock = new ReentrantLock(); Thread t1 = new Thread(() -> { lock.lock(); // 业务代码 System.out.println(Thread.currentThread().getName() + " 开始工作"); lock.unlock(); }, "t1"); Thread t2 = new Thread(() -> { lock.lock(); // 业务代码 System.out.println(Thread.currentThread().getName() + " 开始工作"); lock.unlock(); }, "t2"); Thread t3 = new Thread(() -> { lock.lock(); // 业务代码 System.out.println(Thread.currentThread().getName() + " 开始工作"); lock.unlock(); }, "t3"); t1.start(); t2.start(); t3.start(); } } |
创建一把非公平锁,3 个线程通过抢占锁来执行任务。
此时,AQS 的模型如下:
其中,state 表示锁的抢占状态,ownerThread 表示抢占锁的线程。
进入 lock() 方法,来到 ReentrantLock 源码中:
|---------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | public class ReentrantLock implements Lock, java.io.Serializable { ... public void lock() { sync.lock(); } abstract static class Sync extends AbstractQueuedSynchronizer { // 锁定 abstract void lock(); // 尝试获取非公平锁 final boolean nonfairTryAcquire(int acquires) { ... } // 尝试释放锁 protected final boolean tryRelease(int releases) { ... } ... } ... } |
可以看出 lock() 方法是通过 Sync 类来实现的。而 Sync 是一个抽象的静态内部类,它继承了 AbstractQueuedSynchronizer 类,因此它具备了 AQS 的"能力"。
Q1: Sync 定义了抽象的 lock() 方法,需要通过其子类来实现具体的锁方式(模板方法模式)。为何要这要设计的? 当然是为了方便扩展。
我们都知道通过 ReentrantLock 类能创建公平锁和非公平锁,其原因是 Sync 有两个实现类: FairSync 和 NonfairSync ,它们都实现了 lock() 的具体细节。案例中,我们创建出的 lock 实例底层使用到的就是 NonfairSync 的实例(多态特性),即非公平锁。假设哪天 ReentrantLock 需要新增第三种锁,只需新增个子类继承 Sync ,实现 lock() 方法即可。
演示案例中我们创建的是非公平锁,我们来看看 Sync 对应非公平锁的子类 NonfairSync ,它同样是定义在 ReentrantLock 的静态内部类:
|---------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | public class ReentrantLock implements Lock, java.io.Serializable { ... // 非公平锁实现 static final class NonfairSync extends Sync { final void lock() { // (1) if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // (2) acquire(1); } // (3) protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } ... } |
我们开始沿着演示案例讲解:
假设 t1 线程率先启动获取到 CPU 资源调用了 lock() 方法,执行到 (1) 处,即 compareAndSetState(0, 1),该方法来自 AQS 。通过 CAS 方式将 AQS 中的 state 值变成 1,执行成功返回 true。
由于 t1 是第一条执行的线程,结果肯定返回 true,然后将 ownerThread 设置为当前线程 t1。最终整个 lock() 方法也成功返回,获取锁成功,执行自己的业务代码。
此时,AQS 的模型如下:
这时 t2 和 t3 线程也启动,"轮流" 执行到了 (1) 处,判断肯定失败,然后执行 (2),即 acquire(1) 方法,该方法来自 AQS。
我们进到 acquire(1) 方法中:
|------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... public final void acquire(int arg) { // (4) if (!tryAcquire(arg) && // (5):addWaiter // (6): acquireQueued acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // (7) selfInterrupt(); } protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } ... } |
假设当前线程为 t2,执行到 (4) 处,即 tryAcquire(arg) 方法,用于尝试获取锁。它是个抽象方法,由子类 NonfairSync 实现,NonfairSync 的 tryAcquire 方法最终调用其父类 Sync 的 nonfairTryAcquire() 方法来实现,可以返回至 (3) 处查看。
|------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | abstract static class Sync extends AbstractQueuedSynchronizer { final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // (8) int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } |
t2 线程进入到 nonfairTryAcquire() 方法,先获取当前线程(t2 线程),然后执行到 (8) 处,即 getState(),此方法来自 AQS ,用于返回 AQS 的 state 状态。
由于 t1 线程没有释放锁,因此 state = 1 、OwnerThread = t1 ,下边的 if 判断都不成立,最终方法返回 false。
回到 acquire() 方法中,由于返回 false,!tryAcquire(arg) 判断成立,t2 线程会继续执行到 (5) 处,即 addWaiter(Node.EXCLUSIVE),该方法用于将线程封装到 Node 节点中。
|------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // (9) if (pred != null) { node.prev = pred; // (10) if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // (11) enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // (12) if (compareAndSetHead(new Node())) tail = head; } else { // (13) node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } ... } |
t2 线程进入到 addWaiter 方法中,先被封装到 node1 节点中,由于是首个线程被封装成 Node,因此 tail 和 pred 必定为 null,(9) 处的判断不成立,执行到 (11) 处,即 enq() 方法。该方法中开启无限循环,通过 CAS 方式设置 CLH 队列的头结点/尾节点。
第一次循环,由于 tail 为空,因此线程执行到 (12) 处,创建一个傀儡节点(无数据,用于占位)设置为头结点和尾节点。
此时,AQS 的模型图如下:
由于没有遇到循环终止的指令,将执行下一次循环。
在第二次循环中,尾节点不为空,因此进入(13) 处,将 node1 节点设置成尾节点,同时前后两个 node 节点建立关系,最终返回 node1 节点。
此时,AQS 的模型图如下:
返回的 node1 节点后,t2 线程开始执行 (6) ,node1 节点被当作参数传入到 acquireQueued() 方法中,该方法用于将节点放入到 CLH 队列中,将其线程挂起等待。
|---------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取前驱节点 final Node p = node.predecessor(); // (14) if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // (15): shouldParkAfterFailedAcquire // (16): parkAndCheckInterrupt if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } ... } |
acquireQueued() 方法中也开启无限循环。在循环中,node1 节点先获取它的前驱节点(傀儡节点),然后判断是否为头结点,是则调用 tryAcquire()尝试获取锁,该方法在 (4) 处出现过一次,此处不再赘述。
由于 t1 线程还没有释放锁,(14) 处的最终判断肯定为 false。t2 执行到 (15) 处,即 shouldParkAfterFailedAcquire(),该方法用于修改 node1 节点的前驱节点的 waitStatus 状态。
Node 节点的 waitStatus 有以下 5 种状态:
| 状态 | 说明 | |:---|:-------------------------| | 0 | 初始状态 | | 1 | 线程已取消 | | -1 | 当前节点封装的线程释放锁后,会唤醒后继节点的线程 | | -2 | 线程处在等待状态,在等待队列中 | | -3 | 表示下一次的共享状态会被无条件的传播下去 |
|------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前驱节点的 waitStatus 值 int ws = pred.waitStatus; // (17) 状态为 -1 if (ws == Node.SIGNAL) return true; if (ws > 0) { // (18) 状态值 > 0,即线程被取消,从后往前遍历节点,删除已取消状态的线程的节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // (19) 将前驱节点的 waitStatus 值改成 -1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } ... } |
t2 线程进入 shouldParkAfterFailedAcquire() 方法中, 获取 node1 前驱节点的 waitStatus 状态,值为 0,直接跳到 (19) 处,通过 CAS 方式将 waitStatus 值改成 -1,最终的方法是返回 false 的。
此时,AQS 的模型图如下:
Q2: 为何在创建 node 节点封装线程时,不直接将 waitStatus 的值设置成 -1,而是专门定义这个方法进行修改? 答案我们留在下文解答。
shouldParkAfterFailedAcquire() 方法返回 false,t2 线程回到 acquireQueued() 方法中,由于之前是在无限循环中进行的,没有遇到终止指定,因此 t2 线程将执行第二次循环的操作。
毫无疑问,t2 线程又会再次执行 shouldParkAfterFailedAcquire() 方法,此时 node1 的前驱节点的 waitStatus = -1 ,最终方法返回 true。随后 t2 开始执行 (16),即 parkAndCheckInterrupt() 方法,用于线程等待。
|---------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // (20) return Thread.interrupted(); } ... } |
方法里边调用了 LockSupport.park(this),t2 线程立即被挂起,变成 WAIT 状态(此处的状态是线程的状态,与 AQS 中的 state 状态,Node 节点的 waitStatus 无关)。
假设 t1 线程仍未释放锁,轮到 t3 线程运行,不出意外其运行的最终结果也是被封装到 node2 节点,放到 CLH 队列中被挂起等待。
此时, AQS 的模型图如下:
现在只有 t1 线程处在运行状态,当它运行完业务代码,随后执行 unlock() 方法:
|---------------------------------|--------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 | public class ReentrantLock implements Lock, java.io.Serializable { ... public void unlock() { sync.release(1); } ... } |
方法通过调用 Sync 的 release() 方法来实现,而 release() 方法来自 AQS:
|---------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... public final boolean release(int arg) { // (21) if (tryRelease(arg)) { Node h = head; // (22) if (h != null && h.waitStatus != 0) // (23) unparkSuccessor(h); return true; } return false; } ... } |
当 t1 线程执行到 (21) 处,即 tryRelease(arg) 方法,该方法都抽象方法,由 Sync 子类实现:
|------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | public class ReentrantLock implements Lock, java.io.Serializable { ... abstract static class Sync extends AbstractQueuedSynchronizer { ... // 尝试释放锁 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } } ... } |
进入该方法:
- 先获取 AQS 状态 state = 1,减去入参(值为 1),结果 c = 0。
- 判断当前线程(t1) 是否不等于 OwnerThread 的线程(t1),显然是相等的。
- 判断 c 是否为 0,判断成立设置返回值 free 为 true, 将 OwnerThread 线程设置为 null。
- 修改 AQS 状态为 c 的值,即 AQS 的 state = 0。
- 最终返回 free。
经过上述的操作,此时 AQS 模型图为:
方法执行完回到 (21),条件判断成立,进入 if 方法体中:
- 获取头结点(值为 dummy 节点)
- 头节点进行非空判断 和 waitStatus 的非零判断(值为 -1),判断成立进入到 (23) 处,即
unparkSuccessor()方法,用于唤醒下个节点的线程。
|------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) // 还原 waitStatus 状态为 0 compareAndSetWaitStatus(node, ws, 0); // (24) Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 从后往前找 waitStatus <= 0 的节点(剔除取消状态的线程节点) for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // (25) if (s != null) // 唤醒后继节点封装的线程 LockSupport.unpark(s.thread); } ... } |
头结点被当作参数传入到 unparkSuccessor() 方法中,该方法执行 3 个操作:
- 将头结点的 waitStatus 状态恢复成 0。
- 获取头结点的后继节点,如果后继节点为空或 waitStatus 状态为已取消,则从后往前遍历 CLH 队列,获取非取消状态的节点。
- 唤醒后继节点中封装的线程。
回到案例中,线程执行到 (24) 处,获取的后继节点为 node1(封装 t2 线程)。来到 (25) 处,唤醒 node1 中的 t2 线程。
讲解到此处,大家是否还记得 Q2:为何在创建 Node 节点封装线程时,不直接将 waitStatus 的值设置成 -1。
我们以创建的 Node 节点时, waitStatus 值设置为 -1 为前提,进行案例推演:
- 依然是 3 个线程在运行,t1 线程先拿到锁执行业务代码。
- CPU 切换到 t2 线程,执行到 (5) 处,即
addWaiter(),被封装到 node1 节点中(waitStatus = -1),此时 node1 节点已经和头结点建立关系。 - 在 t2 线程执行 (16) 处,即在执行
parkAndCheckInterrupt()方法,t2 线程要被挂起之前,CPU 又切换至 t1 线程。 - t1 线程执行完业务代码,要释放锁时,会执行 (25) 处代码
LockSupport.unpark(s.thread);,唤醒 t2 。 - 但实际上 t2 线程并没有被挂起等待,如果某个线程被提前
unpark(thread),那么当该 thread 线程调用park()时是不会被挂起等待的,这样锁的机制就乱套了(线程未获取到锁,但又不挂起等待)。
因此,Node 节点的 waitStatus 值不能一开始被设置的成 -1。
回到正常案例中,t1 线程唤醒 t2 线程结束任务。t2 线程被唤醒并拿到 CPU 资源,执行到 (20) 处,即 Thread.interrupted() 方法,检测当前线程是否被中断,t2 线程并未中断,因此 parkAndCheckInterrupt() 方法返回 false。
|---------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... final boolean (final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取前驱节点 final Node p = node.predecessor(); // (14) if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // (15): shouldParkAfterFailedAcquire // (16): parkAndCheckInterrupt if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } ... } |
t2 线程将在 acquireQueued() 方法中进行第三次循环操作:
- 获取前驱节点 oldHead (dummy)
- 前驱节点是否为头结点,是则尝试获取锁。因为 AQS 的 state 已恢复成 0,因此 t2 可以成功获取到锁(修改 state = 1 和 ownerThread = t2)。
- 将 node1 节点设置为头结点(将前驱节点设置为 null,封装的线程设置为 null)
- 将 oldHead 节点的后继节点设置为 null。
此时,AQS 的模型图如下:
t2 线程获取到锁执行任务,之后释放锁。。。
t3 线程之后的执行流程与 t2 类似,此处也不在赘述。
五、执行流程 {#五、执行流程}
最后附加代码执行流程图:
流程图并未充分展示执行的细节,最终还得需要读者自行阅读源码加深理解。
51工具盒子