51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

AQS 源码详解

一、前言 {#一、前言}

AQS 是抽象的队列同步器,是用来构建锁或其他同步组件的重量级基础框架及整个 JUC 体系的基石。

二、相关组件 {#二、相关组件}

下边的组件都是基于 AQS 框架扩展实现的:

  • ReentrantLock:可重入锁,避免多线程竞争资源的安全问题
  • Semaphore:信号量,限制多线程的访问数量
  • CountDownLatch:计数器,用于线程之间的等待场景(如线程A等待其他多个线程完成任务后,线程A才能执行自己的任务)
  • CyclicBarrier:回环栅栏,用于线程之间的等待场景(如在一组线程中,如果线程A执行到代码段S点就会停下等待,等到组内其他线程都执行到S点时它们才会立刻一起执行剩余的任务)

虽然这些组件在多线程场景下有不同的作用,但代码中也有相似之处,如都需要管理锁状态,维护阻塞线程,维护唤醒线程。而 AQS 的作用就是将这些相似的、公共的代码封装在一起。

三、运行原理 {#三、运行原理}

AQS 使用一个 volatile 的 int 类型的 state 变量来表示锁竞争状态,将每条要去抢占资源的线程封装成一个 Node 节点放入到内置的 CLH 同步队列(FIFO 双向队列)来维护排队工作,通过 CASstate 值进行修改。

我们常说的 AQS 指的是 java.util.concurrent.locks.AbstractQueuedSynchronizer 类,其源码的核心如下:

|------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { static final class Node { // 等待状态 volatile int waitStatus; // 前驱节点 volatile Node prev; // 后继节点 volatile Node next; // 工作线程 volatile Thread thread; ... } // 头结点 private transient volatile Node head; // 尾节点 private transient volatile Node tail; // 同步状态,默认值 0,说明 0:资源未抢占 1:资源已抢占 private volatile int state; ... } |

注意:源码中有个状态:一个是 state,针对资源抢占的状态;另一个是 waitStatus,针对 node 节点的状态。

将上文的源码转成图形,可便于我们理解,加深记忆,其运行模型图如下:

四、源码分析 {#四、源码分析}

文中涉及到 CAS 和 LockSupport 相关内容,不清楚的读者可以先跳至末尾,浏览相关的参考资料。

由于 AQS 并非单独使用,为了完整的讲解 AQS 的源码,本篇章以 ReentrantLock 组件为例,一步一步揭开 AQS 如何作为基石使用的。

  • 演示案例

|---------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | public class LockTest { public static void main(String[] args) { Lock lock = new ReentrantLock(); Thread t1 = new Thread(() -> { lock.lock(); // 业务代码 System.out.println(Thread.currentThread().getName() + " 开始工作"); lock.unlock(); }, "t1"); Thread t2 = new Thread(() -> { lock.lock(); // 业务代码 System.out.println(Thread.currentThread().getName() + " 开始工作"); lock.unlock(); }, "t2"); Thread t3 = new Thread(() -> { lock.lock(); // 业务代码 System.out.println(Thread.currentThread().getName() + " 开始工作"); lock.unlock(); }, "t3"); t1.start(); t2.start(); t3.start(); } } |

创建一把非公平锁,3 个线程通过抢占锁来执行任务。

此时,AQS 的模型如下:

其中,state 表示锁的抢占状态,ownerThread 表示抢占锁的线程。

进入 lock() 方法,来到 ReentrantLock 源码中:

|---------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | public class ReentrantLock implements Lock, java.io.Serializable { ... public void lock() { sync.lock(); } abstract static class Sync extends AbstractQueuedSynchronizer { // 锁定 abstract void lock(); // 尝试获取非公平锁 final boolean nonfairTryAcquire(int acquires) { ... } // 尝试释放锁 protected final boolean tryRelease(int releases) { ... } ... } ... } |

可以看出 lock() 方法是通过 Sync 类来实现的。而 Sync 是一个抽象的静态内部类,它继承了 AbstractQueuedSynchronizer 类,因此它具备了 AQS 的"能力"。

Q1: Sync 定义了抽象的 lock() 方法,需要通过其子类来实现具体的锁方式(模板方法模式)。为何要这要设计的? 当然是为了方便扩展。

我们都知道通过 ReentrantLock 类能创建公平锁和非公平锁,其原因是 Sync 有两个实现类: FairSyncNonfairSync ,它们都实现了 lock() 的具体细节。案例中,我们创建出的 lock 实例底层使用到的就是 NonfairSync 的实例(多态特性),即非公平锁。假设哪天 ReentrantLock 需要新增第三种锁,只需新增个子类继承 Sync ,实现 lock() 方法即可。

演示案例中我们创建的是非公平锁,我们来看看 Sync 对应非公平锁的子类 NonfairSync ,它同样是定义在 ReentrantLock 的静态内部类:

|---------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | public class ReentrantLock implements Lock, java.io.Serializable { ... // 非公平锁实现 static final class NonfairSync extends Sync { final void lock() { // (1) if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // (2) acquire(1); } // (3) protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } ... } |

我们开始沿着演示案例讲解:

假设 t1 线程率先启动获取到 CPU 资源调用了 lock() 方法,执行到 (1) 处,即 compareAndSetState(0, 1),该方法来自 AQS 。通过 CAS 方式将 AQS 中的 state 值变成 1,执行成功返回 true。

由于 t1 是第一条执行的线程,结果肯定返回 true,然后将 ownerThread 设置为当前线程 t1。最终整个 lock() 方法也成功返回,获取锁成功,执行自己的业务代码。

此时,AQS 的模型如下:

这时 t2 和 t3 线程也启动,"轮流" 执行到了 (1) 处,判断肯定失败,然后执行 (2),即 acquire(1) 方法,该方法来自 AQS

我们进到 acquire(1) 方法中:

|------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... public final void acquire(int arg) { // (4) if (!tryAcquire(arg) && // (5):addWaiter // (6): acquireQueued acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // (7) selfInterrupt(); } protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } ... } |

假设当前线程为 t2,执行到 (4) 处,即 tryAcquire(arg) 方法,用于尝试获取锁。它是个抽象方法,由子类 NonfairSync 实现,NonfairSynctryAcquire 方法最终调用其父类 SyncnonfairTryAcquire() 方法来实现,可以返回至 (3) 处查看。

|------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | abstract static class Sync extends AbstractQueuedSynchronizer { final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // (8) int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } |

t2 线程进入到 nonfairTryAcquire() 方法,先获取当前线程(t2 线程),然后执行到 (8) 处,即 getState(),此方法来自 AQS ,用于返回 AQSstate 状态。

由于 t1 线程没有释放锁,因此 state = 1OwnerThread = t1 ,下边的 if 判断都不成立,最终方法返回 false。

回到 acquire() 方法中,由于返回 false,!tryAcquire(arg) 判断成立,t2 线程会继续执行到 (5) 处,即 addWaiter(Node.EXCLUSIVE),该方法用于将线程封装到 Node 节点中。

|------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // (9) if (pred != null) { node.prev = pred; // (10) if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // (11) enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // (12) if (compareAndSetHead(new Node())) tail = head; } else { // (13) node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } ... } |

t2 线程进入到 addWaiter 方法中,先被封装到 node1 节点中,由于是首个线程被封装成 Node,因此 tail 和 pred 必定为 null,(9) 处的判断不成立,执行到 (11) 处,即 enq() 方法。该方法中开启无限循环,通过 CAS 方式设置 CLH 队列的头结点/尾节点。

第一次循环,由于 tail 为空,因此线程执行到 (12) 处,创建一个傀儡节点(无数据,用于占位)设置为头结点和尾节点。

此时,AQS 的模型图如下:

由于没有遇到循环终止的指令,将执行下一次循环。

在第二次循环中,尾节点不为空,因此进入(13) 处,将 node1 节点设置成尾节点,同时前后两个 node 节点建立关系,最终返回 node1 节点。

此时,AQS 的模型图如下:

返回的 node1 节点后,t2 线程开始执行 (6) ,node1 节点被当作参数传入到 acquireQueued() 方法中,该方法用于将节点放入到 CLH 队列中,将其线程挂起等待。

|---------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取前驱节点 final Node p = node.predecessor(); // (14) if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // (15): shouldParkAfterFailedAcquire // (16): parkAndCheckInterrupt if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } ... } |

acquireQueued() 方法中也开启无限循环。在循环中,node1 节点先获取它的前驱节点(傀儡节点),然后判断是否为头结点,是则调用 tryAcquire()尝试获取锁,该方法在 (4) 处出现过一次,此处不再赘述。

由于 t1 线程还没有释放锁,(14) 处的最终判断肯定为 false。t2 执行到 (15) 处,即 shouldParkAfterFailedAcquire(),该方法用于修改 node1 节点的前驱节点的 waitStatus 状态。

Node 节点的 waitStatus 有以下 5 种状态:

| 状态 | 说明 | |:---|:-------------------------| | 0 | 初始状态 | | 1 | 线程已取消 | | -1 | 当前节点封装的线程释放锁后,会唤醒后继节点的线程 | | -2 | 线程处在等待状态,在等待队列中 | | -3 | 表示下一次的共享状态会被无条件的传播下去 |

|------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前驱节点的 waitStatus 值 int ws = pred.waitStatus; // (17) 状态为 -1 if (ws == Node.SIGNAL) return true; if (ws > 0) { // (18) 状态值 > 0,即线程被取消,从后往前遍历节点,删除已取消状态的线程的节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // (19) 将前驱节点的 waitStatus 值改成 -1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } ... } |

t2 线程进入 shouldParkAfterFailedAcquire() 方法中, 获取 node1 前驱节点的 waitStatus 状态,值为 0,直接跳到 (19) 处,通过 CAS 方式将 waitStatus 值改成 -1,最终的方法是返回 false 的。

此时,AQS 的模型图如下:

Q2: 为何在创建 node 节点封装线程时,不直接将 waitStatus 的值设置成 -1,而是专门定义这个方法进行修改? 答案我们留在下文解答。

shouldParkAfterFailedAcquire() 方法返回 false,t2 线程回到 acquireQueued() 方法中,由于之前是在无限循环中进行的,没有遇到终止指定,因此 t2 线程将执行第二次循环的操作。

毫无疑问,t2 线程又会再次执行 shouldParkAfterFailedAcquire() 方法,此时 node1 的前驱节点的 waitStatus = -1 ,最终方法返回 true。随后 t2 开始执行 (16),即 parkAndCheckInterrupt() 方法,用于线程等待。

|---------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // (20) return Thread.interrupted(); } ... } |

方法里边调用了 LockSupport.park(this),t2 线程立即被挂起,变成 WAIT 状态(此处的状态是线程的状态,与 AQS 中的 state 状态,Node 节点的 waitStatus 无关)。

假设 t1 线程仍未释放锁,轮到 t3 线程运行,不出意外其运行的最终结果也是被封装到 node2 节点,放到 CLH 队列中被挂起等待。

此时, AQS 的模型图如下:

现在只有 t1 线程处在运行状态,当它运行完业务代码,随后执行 unlock() 方法:

|---------------------------------|--------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 | public class ReentrantLock implements Lock, java.io.Serializable { ... public void unlock() { sync.release(1); } ... } |

方法通过调用 Syncrelease() 方法来实现,而 release() 方法来自 AQS

|---------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... public final boolean release(int arg) { // (21) if (tryRelease(arg)) { Node h = head; // (22) if (h != null && h.waitStatus != 0) // (23) unparkSuccessor(h); return true; } return false; } ... } |

当 t1 线程执行到 (21) 处,即 tryRelease(arg) 方法,该方法都抽象方法,由 Sync 子类实现:

|------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | public class ReentrantLock implements Lock, java.io.Serializable { ... abstract static class Sync extends AbstractQueuedSynchronizer { ... // 尝试释放锁 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } } ... } |

进入该方法:

  1. 先获取 AQS 状态 state = 1,减去入参(值为 1),结果 c = 0。
  2. 判断当前线程(t1) 是否不等于 OwnerThread 的线程(t1),显然是相等的。
  3. 判断 c 是否为 0,判断成立设置返回值 free 为 true, 将 OwnerThread 线程设置为 null。
  4. 修改 AQS 状态为 c 的值,即 AQSstate = 0
  5. 最终返回 free。

经过上述的操作,此时 AQS 模型图为:

方法执行完回到 (21),条件判断成立,进入 if 方法体中:

  1. 获取头结点(值为 dummy 节点)
  2. 头节点进行非空判断 和 waitStatus 的非零判断(值为 -1),判断成立进入到 (23) 处,即 unparkSuccessor() 方法,用于唤醒下个节点的线程。

|------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) // 还原 waitStatus 状态为 0 compareAndSetWaitStatus(node, ws, 0); // (24) Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 从后往前找 waitStatus <= 0 的节点(剔除取消状态的线程节点) for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // (25) if (s != null) // 唤醒后继节点封装的线程 LockSupport.unpark(s.thread); } ... } |

头结点被当作参数传入到 unparkSuccessor() 方法中,该方法执行 3 个操作:

  1. 将头结点的 waitStatus 状态恢复成 0。
  2. 获取头结点的后继节点,如果后继节点为空或 waitStatus 状态为已取消,则从后往前遍历 CLH 队列,获取非取消状态的节点。
  3. 唤醒后继节点中封装的线程。

回到案例中,线程执行到 (24) 处,获取的后继节点为 node1(封装 t2 线程)。来到 (25) 处,唤醒 node1 中的 t2 线程。

讲解到此处,大家是否还记得 Q2:为何在创建 Node 节点封装线程时,不直接将 waitStatus 的值设置成 -1。

我们以创建的 Node 节点时, waitStatus 值设置为 -1 为前提,进行案例推演:

  1. 依然是 3 个线程在运行,t1 线程先拿到锁执行业务代码。
  2. CPU 切换到 t2 线程,执行到 (5) 处,即 addWaiter() ,被封装到 node1 节点中(waitStatus = -1),此时 node1 节点已经和头结点建立关系。
  3. 在 t2 线程执行 (16) 处,即在执行 parkAndCheckInterrupt() 方法,t2 线程要被挂起之前,CPU 又切换至 t1 线程。
  4. t1 线程执行完业务代码,要释放锁时,会执行 (25) 处代码 LockSupport.unpark(s.thread); ,唤醒 t2 。
  5. 但实际上 t2 线程并没有被挂起等待,如果某个线程被提前 unpark(thread),那么当该 thread 线程调用 park() 时是不会被挂起等待的,这样锁的机制就乱套了(线程未获取到锁,但又不挂起等待)。

因此,Node 节点的 waitStatus 值不能一开始被设置的成 -1。

回到正常案例中,t1 线程唤醒 t2 线程结束任务。t2 线程被唤醒并拿到 CPU 资源,执行到 (20) 处,即 Thread.interrupted() 方法,检测当前线程是否被中断,t2 线程并未中断,因此 parkAndCheckInterrupt() 方法返回 false。

|---------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... final boolean (final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取前驱节点 final Node p = node.predecessor(); // (14) if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // (15): shouldParkAfterFailedAcquire // (16): parkAndCheckInterrupt if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } ... } |

t2 线程将在 acquireQueued() 方法中进行第三次循环操作:

  1. 获取前驱节点 oldHead (dummy)
  2. 前驱节点是否为头结点,是则尝试获取锁。因为 AQSstate 已恢复成 0,因此 t2 可以成功获取到锁(修改 state = 1ownerThread = t2)。
  3. 将 node1 节点设置为头结点(将前驱节点设置为 null,封装的线程设置为 null)
  4. 将 oldHead 节点的后继节点设置为 null。

此时,AQS 的模型图如下:

t2 线程获取到锁执行任务,之后释放锁。。。

t3 线程之后的执行流程与 t2 类似,此处也不在赘述。

五、执行流程 {#五、执行流程}

最后附加代码执行流程图:

流程图并未充分展示执行的细节,最终还得需要读者自行阅读源码加深理解。

六、参考资料 {#六、参考资料}

CAS 原理新讲

LockSupport 工具介绍

赞(0)
未经允许不得转载:工具盒子 » AQS 源码详解