一、前言 {#一、前言}
在高并发访问的场景下,为了保证项目不被大流量请求的压力影响性能导致项目运行崩溃,常用的解决方案就是限流 和服务降级。
本篇介绍 Semaphore , 直译就是信号量,是基于 AQS 扩展的一种多线程并发控制的工具,也就是我们常说的限流工具之一。
二、工作原理 {#二、工作原理}
Semaphore 通过 permit 来判断线程是否可通行。
Semaphore 需要设置一定数量的 permit ,当一个线程执行任务遇到 Semaphore 时会被拦截,该线程需要向 Semaphore 申请一个或多个 permit:
- 如果 permit 数量充足,将 permit 发给该线程然后将其放行。当线程执行完任务后需要将 permit 如数奉还。
- 如果 permit 数量不足,该线程会安排到 CLH 队列中等待挂起,当其他线程归还 permit 后,等待的线程会被唤醒再申请 permit。
前言说到,Semaphore 是基于 AQS 扩展的,线程竞争资源的状态,线程的等待,线程的唤醒都是靠 AQS 实现和维护的,因此我们通过观察其模型图来讲解其工作原理:
图中 permit 的数量值由 state 保存维护,等待的线程则被封装成 Node 节点放在 CLH 队列中挂起等待。
三、源码解析 {#三、源码解析}
我们先通过案例了解 Semaphore 基本使用。
- 案例
我们把 Semaphore 当作停车位,Thread 当作找车位的车子:
|------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class SemaphoreTest { public static void main(String[] args) { // (1) Semaphore semaphore = new Semaphore(2); for (int i = 1; i <= 5; i++) { Thread thread = new Thread(() -> { try { // (2) semaphore.acquire(); System.out.println(LocalDateTime.now() + " -> " + Thread.currentThread().getName() + " 号车获取到停车位"); int randomSec = (int) (Math.random() * 3) + 1; TimeUnit.SECONDS.sleep(randomSec); System.out.println(LocalDateTime.now() + " -> " + Thread.currentThread().getName() + " 号车停车 " + randomSec + " 秒,现在离开"); // (3) semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }, "t" + i); thread.start(); } } }
|
执行结果:
|------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10
| 2023-03-06T10:56:24.299 -> t2 号车获取到停车位 2023-03-06T10:56:24.299 -> t1 号车获取到停车位 2023-03-06T10:56:25.300 -> t2 号车停车 1 秒,现在离开 2023-03-06T10:56:25.300 -> t3 号车获取到停车位 2023-03-06T10:56:26.301 -> t3 号车停车 1 秒,现在离开 2023-03-06T10:56:26.301 -> t4 号车获取到停车位 2023-03-06T10:56:27.301 -> t1 号车停车 3 秒,现在离开 2023-03-06T10:56:27.301 -> t5 号车获取到停车位 2023-03-06T10:56:29.302 -> t4 号车停车 3 秒,现在离开 2023-03-06T10:56:30.301 -> t5 号车停车 3 秒,现在离开
|
从结果不难看出,在 5 辆车子(thread)中每次最多只有 2 辆车子能够申请到停车位(permit)停车。
- 源码分析
我们按照例子中的代码执行顺序讲解。
我们先看 (1) 处的代码,即 Semaphore semaphore = new Semaphore(2)
,进入源码:
|---------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class Semaphore implements java.io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits) { // (4) setState(permits); } // (5) final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // ... 省略 ... } static final class NonfairSync extends Sync { NonfairSync(int permits) { super(permits); } // (6) protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } public Semaphore(int permits) { sync = new NonfairSync(permits); } // ... 省略 ... }
|
调用 Semaphore 构造方法时,它底层是创建一个名为 Sync 的抽象的静态内部类的实例,Semaphore 所有的操作都是通过 Sync 实例来完成。
Sync 类继承了 AbstractQueuedSynchronizer 类,因此它拥有了 AQS 的能力。此处 Sync 的设计和 ReentrantLock 类相似,Sync 都被声明为抽象类,再通过子类实现具体的方法。Sync 拥有 FairSync 和 NonfairSync 两个子类。
我们案例中,创建的 Semaphore 实例,底层使用的是 NonfairSync 实例,调用构造方法时传入的 permits 值为 2。最终值会被传入到 (4) 处,即 setState(permits)
,该方法来自 AQS , 将 AQS 中的 state 值设置成 2。
此时,AQS 的模型图如下:
创建实例成功后,线程来到 (2) 处,即 semaphore.acquire()
申请 permit,查看源码:
|-------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8
| public class Semaphore implements java.io.Serializable { // ... 省略 ... public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } }
|
此方法底层调用 sync.acquireSharedInterruptibly(1)
,该方法来自 AQS,我们接着点进去:
|------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // ... 省略 ... public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // (7) if (tryAcquireShared(arg) < 0) // (8) doAcquireSharedInterruptibly(arg); } }
|
该方法中:
- 先判断当前线程的中断状态,如果为 true 则抛出 InterruptedException 异常。
- 否则来到 (7) 处,即
tryAcquireShared(arg)
。该方法是一个抽象方法,由子类 NonfairSync 实现,来到 (6) 处,而 (6) 处方法的底层调用父类的方法实现,即 (5) 处代码:
|------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14
| abstract static class Sync extends AbstractQueuedSynchronizer { // ... 省略 ... // (5) final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
|
该方法开启一个无限循环来尝试获取 permit :
- 先获取 AQS 的 state 值(permits 可用数量)。
- 通过 permits 可用数量减去要申请的 permit 数量,计算出剩余的 permits 数量。
- 如果 permits 剩余数量
< 0
直接返回该值或者如果 permits 剩余数量>= 0
则通过 CAS 方式重新设置 state 值,最后再返回该值。
此处开启无限循环是因为多个线程在执行 compareAndSetState
操作前可能会发生线程切换,但只会有一个线程能执行成功,切换回来的线程执行 CAS 操作必定会失败,因此需要循环重新计算 permits 可用值,确保 AQS 的 state 值的准确性。
案例中,t1 和 t2 线程率先拿到 permit 处理业务,此时 AQS 模型图如下:
而 t3 线程以及其他线程未能拿到 permit (tryAcquireShared
返回值小于 0),因此它们来到 (8) 处,即 doAcquireSharedInterruptibly(arg)
,该方法来自 AQS:
|---------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // ... 省略 ... private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // (9) final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // (10) int r = tryAcquireShared(arg); if (r >= 0) { // (11) setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // (12): shouldParkAfterFailedAcquire // (13): parkAndCheckInterrupt if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } }
|
竞争资源失败的线程进入到这个方法中,案例中 t3 线程执行代码流程如下:
- 先执行
addWaiter(Node.SHARED)
方法,将当前线程封装到 Node 节点(node1)。因为 CLH 的数据为空,当 node1 进到 CLH 前,会为其创建一个虚拟头节点(dummy)。 - 进入到一个无限循环中,判断 node1 的前驱节点是否为头结点,判断成功,进入到 (10) 处
tryAcquireShared(arg)
尝试获取 permit,该方法上文已讲解,不再赘述。 - 显然在 t1 和 t2 线程未释放 permit 之前,其他线程无法获取到 permit ,返回值必然小于 0,来到第 (12) 处
shouldParkAfterFailedAcquire(p, node)
。 shouldParkAfterFailedAcquire(p, node)
方法将 node1 节点的前驱节点(dummy)的 waitStatus 状态改为 -1。最后来到第 (13) 处,parkAndCheckInterrupt()
。parkAndCheckInterrupt()
方法底层调用LockSupport.park(this)
,这样 t3 线程就被挂起等待。
此时,AQS 的模型图为:
同样的,其他未获取到 permit 都会被封装成 Node 节点进到 CLH 队列被挂起等待。
我们回到获取到 permit 的线程视角,如 t2 线程业务执行结束后,执行 semaphore.release()
,进到源码中:
|---------------------------|-----------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9
| public class Semaphore implements java.io.Serializable { // ... 省略 ... public void release() { sync.releaseShared(1); } }
|
方法底层调用 Sync 实例的 releaseShared(1)
方法,该方法来自 AQS:
|------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // ... 省略 ... public final boolean releaseShared(int arg) { // (14) if (tryReleaseShared(arg)) { // (15) doReleaseShared(); return true; } return false; } }
|
t2 线程来到 (14) 处,执行 tryReleaseShared(arg)
,该方法为抽象方法,由 AQS 的子类 Sync 类实现:
|------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class Semaphore implements java.io.Serializable { // ... 省略 ... abstract static class Sync extends AbstractQueuedSynchronizer { // ... 省略 ... protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } } }
|
该方法用于归还 permit,同样开启一个无限循环:
- 先获取 AQS 的 state 值(permits 剩余数量),当前值为 0。
- 通过 permits 剩余数量加上要归还的 permit 数量,计算出 permits 的新值,当前值为 1。
- 检验数值的合法性
- 通过 CAS 方式将 state 值修改为 permits 新值,返回 true。
此时,AQS 的模型图为:
tryReleaseShared(arg)
方法返回 true,t2 线程进到 (15) 处,即 doReleaseShared()
方法,该方法来自 AQS:
|---------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // ... 省略 ... private void doReleaseShared() { for (;;) { Node h = head; // (16) if (h != null && h != tail) { int ws = h.waitStatus; // (17) if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // (18) unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } // (19) if (h == head) break; } } }
|
该方法用于修改头结点的 waitStatus 值以及唤醒头结点的后继节点中的线程。 开启一个无限循环:
- 获取 CLH 的头结点
- 判断头结点(dummy)是否为空,同时头结点是否与尾节点相同。由 AQS 模型图可知,(16) 处的判断是成立的,随后 t2 线程进到 if 方法体中。
- 判断头结点(dummy)的 waitStatus 状态,当前状态值为 -1,(17) 处判断成立,将头结点的 waitStatus 通过 CAS 方式还原为 0。
- 修改成功后执行 (18) 处代码,即
unparkSuccessor(h)
,该方法用于查询头结点的后继节点,并通过LockSupport.unpark(thread)
唤醒节点中的线程(t3 线程)。由于该方法在 《AQS 源码详解》 已讲解,此处不多赘述。 - t2 线程最后来到 (19) 处,判断成立退出无限循环。
这样 t2 线程释放锁完毕,结束线程,我们转到被唤醒的 t3 线程视角:
|---------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // ... 省略 ... private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // (9) final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // (10) int r = tryAcquireShared(arg); if (r >= 0) { // (11) setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // (12): shouldParkAfterFailedAcquire // (13): parkAndCheckInterrupt if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } }
|
t3 线程在执行 (13) 处代码被挂起等待的,当被唤醒后,又进入下一个循环:
- 获取当前节点(node1)的前驱节点(dummy),判断是否为头结点,从 AQS 模型图可知,判断成立。
- 执行
tryAcquireShared(arg)
尝试获取 permit ,由于 AQS 的 state 值此时为 1,t3 线程能够成功获取到 permit,方法返回 0。 - 进行
if (r >= 0)
判断也成立,执行 (11) 处方法。
|------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // ... 省略 ... private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } }
|
该方法将 t3 线程的 node1 节点设置为头结点,同时将节点中的 prev 和 thread 设置为 null。之后做了一系列判断执行 doReleaseShared()
方法,此方法就是 (15) 处的方法,具体流程此处不多赘述(在执行 doReleaseShared()
期间,node1 节点的下一个节点 node2 的线程被唤醒过,于是申请 permit 失败又被挂起等待)。
t3 线程执行完 (11) 处代码,再执行 p.next = null
后,获取 permit 的工作基本完成。
此时,AQS 的流程图如下:
剩余的被挂起等待的 t4 和 t5 线程后续工作同上。