51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

听说你想学Java并发编程?先把这个学了(1) --20210602

大家好,我是指北君。

最近在学习Java并发编程,但学了很久,总觉得差点意思,因为只会使用相关工具类,却不知实现原理,有时候写出了bug也不知道啥原因,所以指北君一怒之下,决定死磕java.util .concurrent工具包下的源码!!经过一个月的熬灯夜读,指北君总于小有所成,现在决定输出这一个月的所有收获!

JUC包下的类这么多,我们先从哪个开始呢?这个答案是唯一的,那就是AQS!


前言 {#前言}

别急,在指北君谈AQS之前,想先问问大家知不知道并发需要关注的两个核心问题?不知道?那这个必须知道!第一个问题是互斥,即同一时间只允许一个线程访问共享资源,第二个是同步,即线程如何进行通信和协作。

那我们又是怎么解决这两个问题的呢?那就是我们大名鼎鼎的管程和信号量了!但我们今天先说说管程,为啥大家后面就知道了。

那什么是管程呢?所谓管程,就是管理共享变量以及对共享变量操作的过程,其有三种模型,分别为 Hasen 模型、Hoare 模型和 MESA 模型。目前应用最广泛的是MESA模型,而JAVA采用的也是这种MESA模型(其模型图如下图所示):

MESA Monitor

可能这个图大家现在还看不太明白,没关系,暂时留个印象,当看完指北君AQS系列文章以后,你再回过头来看这个图,肯定秒懂!

Java中的synchronized关键字就是其管程的具体实现,当然,今天所要聊的AQS同样也是。AQS是Java并发编程的基础,只要掌握它,java.util .concurrent工具包下的大部分工具类源码你都能在10分钟内看懂,连源码都懂了,还怕不知道怎么用吗?!

所以,针对AQS,指北君将用三篇文章来讲解:

第一篇即本篇,我会将AQS做个整体的介绍,并将其基础总结成了三把斧头,有了这三把斧头,你就能快速斩获AQS

第二篇我们则讲AQS如何通过锁机制解决互斥问题

第三篇则是AQS如何通过条件变量来解决线程通信协作问题

好了,现在随着指北君开始第一篇的学习吧


AQS是什么 {#aqs是什么}

好了,本文的正篇正式开始。前言说了半天AQS,AQS到底是什么呢?AQS全称为AbstractQueuedSynchronizer,翻译成中文即:抽象队列同步器,它是java.util .concurrent工具包下的抽象类,也是一个模板类(设计模式中的模板模式可以了解一波),你也可以理解为为开发人员提供的一种同步框架,它已经帮我们实现了大部分公共通用逻辑,如线程入队、出队,阻塞、唤醒等,我们只需要根据我们自己的需求,实现一些特定的方法,这些方法也叫钩子方法,比如下面几个方法:

  1. tryAcquire:独占模式下获取同步状态

  2. tryRelease:独占模式下释放同步状态

  3. tryAcquireShared:共享模式下获取同步状态

  4. tryReleaseShared:共享模式下释放同步状态

  5. isHeldExclusively:独占模式下,查看当前线程是否获取同步状态

AQS针对互斥,提供了两种模式,即独占模式和共享模式。独占模式只允许一个线程拿到锁去操作共享资源,而共享锁则有多把锁,允许多个线程同时操作共享资源,这个第二篇会详细讲解,在这之前我们需要先了解AQS的三板斧,这三个是了解AQS的基础:

  1. 状态:AQS中的所有逻辑都是依据状态state来进行的,所以它是整个类的和兴。它是被关键字volatile修饰的,保证其可见性和部分有序性

  2. 队列:一共有两种队列,同步队列和条件队列。当线程的请求在短时间内得不到满足时,线程会被包装成某种类型的数据结构放入队列中,当条件满足时则会拿出队列。

  3. CAS:由Unsafe工具类来实现的,其操作具有原子性,AQS通过CAS和volatile来保证状态的线程安全


第一板斧:状态 {#第一板斧状态}

|-----------|-----------------------------| | 1 | private volatile int state; |

状态是被volatile修饰的int类型变量,它的值代表着锁还剩多少。在独占锁模式下,只有一把锁,则state只有0和1两个值,1代表锁没被其他线程占有,目前可获取;0则代表锁已经被其他线程占有了。在共享锁模式下,state最大值就是锁的数量。

后面我们对state的修改都是通过CAS操作,所以是线程安全的。


第二板斧:队列 {#第二板斧队列}

AQS中存在两种队列,一种叫同步队列,它是双向链表,其获取锁失败的线程会被包装成Node放入同步队列中;另一种叫条件队列,它是单向链表,线程可以调用等待方法来把自己放入条件队列,或者调用唤醒方法把条件队列的其他被包装成Node的线程移到同步队列中。这个大家如果此时看不太懂没关系,第三篇文章会详细介绍这个等待唤醒机制。我们来看看线程包装成Node的Node类:

|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | static final class Node { // nextWaiter属性的具体值 static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; // 锁的状态 static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; // 线程所处的等待锁的状态:CANCELLED,SIGNAL,CONDITION,PROPAGATE,初始化时,该值为0 volatile int waitStatus; // 双向链表前指针和后指针 volatile Node prev; volatile Node next; // 表示当前Node所代表的Thread volatile Thread thread; // 如果此属性为EXCLUSIVE,则为独占模式;为SHARED,则为共享模式 Node nextWaiter; // 判断是否是共享模式 final boolean isShared() { return nextWaiter == SHARED; } // 获取链表的头个节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } // 构造函数,一般用在创建head头结点时使用 Node() { } // 构造函数,在addWaiter方法时使用 Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } // 构造函数,在Condition中使用Node(Threadthread,intwaitStatus){this.waitStatus=waitStatus;this.thread=thread;}} |

指北君已经在源码上做了详细的注释,但这几个关键的属性上还是提出来单独看看。

下面四个属性是和节点相关的:

  1. prev:双向链表的前驱节点
  2. next:双向链表的后继节点
  3. thread:节点所代表的线程
  4. waitStatus:该节点线程所处的状态,即等待锁的状态

下面四个属性是waitStatus的具体状态:

  1. CANCELLED:此节点的线程被取消了
  2. SIGNAL:此节点的后继节点线程被挂起,需要被唤醒
  3. CONDITION:此节点的线程在等待信号,也表明当前节点不在同步队列中,而在条件队列中
  4. PROPAGATE:此节点下一个acquireShared应该无条件传播

关于waitStatus有几个点需要注意下:

  1. waitStatus除了上面四个状态,还有一个隐式的状态为0,即在Node初始化的时候
  2. 在独占锁模式下,只会有到状态CANCELLED和SIGNAL。需要特别注意的是,SIGNAL它代表的不是自己线程的状态,而是它后继节点的状态,当一个节点waitStatus为SIGNAL时,意味着此节点的后继节点被挂起,当此节点释放锁或者被取消拿锁,应该要唤醒后继节点
  3. 在共享锁模式下,只会用到状态CANCELLED和PROPAGATE

第三板斧:CAS {#第三板斧cas}

CAS又叫比较交换操作,它是Unsafe类中compareAndSwapXXX方法,我通过下面的例子做个简单的介绍:

|-----------|----------------------------------------------------------------| | 1 | unsafe.compareAndSwapObject(this, tailOffset, expect, update); |

CAS执行时,会拿地址tailOffset上的值与expect做比较,如果相同,则会将地址上的值更新为update,并返回true,否则直接返回false。

了解了CAS的基本例子后,我们看下AQS中CAS相关的代码:

|------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | private static final Unsafe unsafe = Unsafe.getUnsafe(); // state、head、tail,waitStatus、next的偏移量 private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; // 静态代码块,初始化五个偏移量 static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } // 下面5个方法都是CAS操作了 // cas操作 state protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } // cas操作 head private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } // cas操作 tail private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } // cas操作 waitStatus private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } // cas操作 nextOffsetprivatestaticfinalbooleancompareAndSetNext(Nodenode,Nodeexpect,Nodeupdate){returnunsafe.compareAndSwapObject(node,nextOffset,expect,update);} |

我们说第二板斧的时候说过,其五个属性state、head、tail,waitStatus、next都是被volatile修饰的,所以CAS对其操作能保证其线程安全,因此也可以猜到这些属性肯定是多线程争着修改的目标。静态块里则是对这五个属性偏移量进行初始化。



总结 {#总结}

AQS的三板斧就介绍完啦,我们再来简单回顾下:

AQS(AbstractQueuedSynchronizer)是java.util .concurrent工具包下的抽象类,它通过实现MESA管程来解决并发领域中的同步与互斥问题。AQS实现中最重要的三点就是状态、CAS和队列,我们也称之为AQS的三板斧。AQS的一切操作都是依据状态state来的,它是被volatile修饰的全局变量,因此我们通过CAS操作使其线程安全。队列是维护阻塞等待线程的容器,所有未获得锁或被要求等待的线程都会被包装成Node放入队列中。

AQS第一篇就到这里了,这篇是AQS的基础,而后面指北君会通过另外两篇文章具体介绍AQS是如何解决并发领域中的互斥和同步问题的,敬请期待~

赞(1)
未经允许不得转载:工具盒子 » 听说你想学Java并发编程?先把这个学了(1) --20210602