AbstractQueuedSynchronizer抽象类是核心,需要重点掌握。它提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架。
AQS框架借助于两个类:Unsafe(提供CAS操作)和LockSupport(提供park/unpark操作)
带着BAT大厂的面试问题去理解
什么是AQS? 为什么它是核心?
AQS的核心思想是什么? 它是怎么实现的? 底层数据结构等
AQS有哪些核心的方法?
AQS定义什么样的资源获取方式? AQS定义了两种资源获取方式:
独占 (只有一个线程能访问执行,又根据是否按队列的顺序分为公平锁和非公平锁,如ReentrantLock)
共享 (多个线程可同时访问执行,如Semaphore、CountDownLatch、 CyclicBarrier )。
ReentrantReadWriteLock可以看成是组合式,允许多个线程同时对某一资源进行读。
AQS底层使用了什么样的设计模式? 模板模式
AQS的应用示例?
AbstractQueuedSynchronizer简介 AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock、Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。 当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。
AQS核心思想 AQS核心思想是:如果被请求的共享资源空闲 ,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态;如果被请求的共享资源被占用 ,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁 实现的,即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。
AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。 状态信息通过procted类型的getState,setState,compareAndSetState进行操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private volatile int state;protected final int getState () { return state; } protected final void setState (int newState) { state = newState; } protected final boolean compareAndSetState (int expect, int update) { return unsafe.compareAndSwapInt(this , stateOffset, expect, update); }
AQS对资源的共享方式
独占 Exclusive 只有一个线程能执行,如ReentrantLock,又可分为公平锁和非公平锁:
公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
共享 Share 多个线程可同时执行,如Semaphore、CountDownLatch、CyclicBarrier、ReadWriteLock ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。
AQS的设计模式 - 模板模式 同步器的设计是基于模板方法模式的。
使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放) 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
自定义同步器时需要重写下面几个AQS提供的模板方法:
1 2 3 4 5 isHeldExclusively() tryAcquire(int ) tryRelease(int ) tryAcquireShared(int ) tryReleaseShared(int )
AbstractQueuedSynchronizer数据结构
AQS有两个队列,双向的同步队列 sync queue 和单向的条件队列 condition queue。
AbstractQueuedSynchronizer类底层的数据结构是使用CLH(Craig,Landin,and Hagersten)队列
的变体。
AQS是将每个请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。 其中,Sync queue,即同步队列,是虚拟的双向链表,包括head结点和tail结点,head结点主要用作后续的调度。 而Condition queue不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue。
CLH队列 同步队列 - Sync queue 为什么同步队列是双向的? 方便以O(1)的时间复杂度获取到prev前驱结点。 按照同步队列的设计,头结点head是空的,表示已经获取到锁的线程,只有头结点的后继结点next有必要尝试获取锁 ,否则会造成羊群效应,其它后面的大量结点在阻塞之前尝试去竞争锁会带来比较大的性能开销。 所以acquireQueued
时,会判断前驱结点是不是头结点,如果不是头结点就没有必要去tryAcquire
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
条件队列 - Condition queue
AQS中的条件队列,是一个单向链表,通过Node节点的nextWaiter来连接 。
Condition#await 调用Condition#await
方法时必须持有锁,所以当前线程位于同步队列(sync queue)的头节点。 此时会将当前线程移动到条件队列(condition queue)的尾节点——addConditionWaiter()
,释放线程当前持有的锁,阻塞当前线程。
Condition#signal 调用Condition#signal
方法会将条件队列(condition queue)的首节点移动到同步队列(sync queue)尾部,然后唤醒因调用Condition#await
方法而阻塞的线程(唤醒之后这个线程就可以去竞争锁了)。 所以调用Condition#signal
方法的时候必须持有锁,持有锁的线程唤醒被因调用Condition#await
方法而阻塞的线程。
AbstractQueuedSynchronizer源码分析 类的继承关系 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io .Serializable {} public abstract class AbstractOwnableSynchronizer implements java .io .Serializable { private static final long serialVersionUID = 3737899427754241961L ; protected AbstractOwnableSynchronizer () { } private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread (Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread () { return exclusiveOwnerThread; } }
AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer抽象类,并且实现了Serializable接口,可以进行序列化。
AbstractOwnableSynchronizer抽象类中,可以设置独占资源线程和获取独占资源线程。分别为setExclusiveOwnerThread与getExclusiveOwnerThread方法,这两个方法会被子类调用
内部类 - Node类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter == SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } }
每个被阻塞的线程都会被封装成一个Node结点,放入队列。 每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下:
CANCELLED
,值为1,表示当前的线程被取消。
SIGNAL
,值为-1,表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作。
CONDITION
,值为-2,表示当前节点在等待condition,也就是在condition queue中。
PROPAGATE
,值为-3,表示当前场景下后续的acquireShared能够得以执行。
值为0,表示当前节点在sync queue中,等待着获取锁。
内部类 - ConditionObject类 ConditionObject类实现了Condition接口,Condition接口定义了条件操作规范,具体如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public interface Condition { void await () throws InterruptedException ; void awaitUninterruptibly () ; long awaitNanos (long nanosTimeout) throws InterruptedException ; boolean await (long time, TimeUnit unit) throws InterruptedException ; boolean awaitUntil (Date deadline) throws InterruptedException ; void signal () ; void signalAll () ; }
类的属性 属性中包含了头节点head、尾结点tail、状态state、自旋时间spinForTimeoutThreshold,还有AbstractQueuedSynchronizer抽象的属性在内存中的偏移地址,通过该偏移地址,可以获取和设置该属性的值,同时还包括一个静态初始化块,用于加载内存偏移地址。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io .Serializable { private static final long serialVersionUID = 7373984972572414691L ; private transient volatile Node head; private transient volatile Node tail; private volatile int state; static final long spinForTimeoutThreshold = 1000L ; private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state" )); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head" )); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail" )); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus" )); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next" )); } catch (Exception ex) { throw new Error(ex); } } }
类的构造方法 此类构造方法为从抽象构造方法,供子类调用。
1 protected AbstractQueuedSynchronizer () { }
核心方法 - acquire方法 该方法以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。
源码如下:
1 2 3 4 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
由上述源码可以知道,当一个线程调用acquire时,调用方法流程如下:
首先调用tryAcquire 方法。 在AbstractQueuedSynchronizer源码中默认会抛出一个异常,即需要子类去重写此方法完成自己的逻辑。
若tryAcquire失败,则调用addWaiter 方法,addWaiter方法完成的功能是将调用此方法的线程封装成为一个结点并放入同步队列Sync queue 。
调用acquireQueued 方法,此方法完成的功能是Sync queue中的结点不断尝试获取资源 ,若成功,则返回true,否则,返回false。
addWaiter(Node mode) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
addWaiter方法使用快速添加的方式往sync queue尾部添加结点,如果sync queue队列还没有初始化,则会使用enq插入队列中。 enq方法会使用无限循环来确保节点的成功插入。
acquireQueued(final Node node, int arg) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; } private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
核心方法 - release方法 以独占模式释放对象。
1 2 3 4 5 6 7 8 9 10 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
总结 对于AbstractQueuedSynchronizer的分析,最核心的就是sync queue的分析。
每一个结点都是由前一个结点唤醒
当结点发现前驱结点是head并且尝试获取成功,则会轮到该线程运行。
condition queue中的结点向sync queue中转移是通过signal操作完成的。
当结点的状态为SIGNAL时,表示后面的结点需要运行。