Java并发编程——AQS原理

AQS 全称是 Abstract Queued Synchronizer,翻译为抽象排队同步器。它是一套实现多线程同步功能的框架,由大名鼎鼎的 Doug Lea 操刀设计并开发实现的。AQS 在源码中被广泛使用,尤其是在 JUC包中,比如 ReentrantLockSemaphoreCountDownLatchThreadPoolExecutor

理解 AQS 对我们理解 JUC 中其他组件至关重要,并且在实际开发中也可以通过自定义 AQS 来实现各种需求场景。

框架概览

首先,我们来纵览一下整个AQS框架:

AQS框架概览(图源美团技术团队)

其中:

  • 上图中有颜色的为Method,无颜色的为Attribution。
  • 总的来说,AQS框架共分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据。
  • 当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。

下面我们会从整体到细节,从流程到方法逐一剖析AQS框架。

CLH同步队列

AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。

这个机制就是CLH同步队列,想要搞懂AQS原理我们需要了解一下这个数据结构。

CLH同步队列是一个FIFO双向队列,其名字源于Craig、Landin 和 Hagersten三位大佬的名字首字母,AQS依赖它来完成同步状态的管理。

当前线程如果获取同步状态失败时,AQS则会将当前线程和等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

这里说的节点,在源码中为Node类,其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
static final class Node {
/** 共享 */
static final Node SHARED = new Node();

/** 独占 */
static final Node EXCLUSIVE = null;

/**
* 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;
*/
static final int CANCELLED = 1;

/**
* 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
*/
static final int SIGNAL = -1;

/**
* 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
*/
static final int CONDITION = -2;

/**
* 表示下一次共享式同步状态获取将会无条件地传播下去
*/
static final int PROPAGATE = -3;

/** 等待状态 */
volatile int waitStatus;

/** 前驱节点 */
volatile Node prev;

/** 后继节点 */
volatile Node next;

/** 获取同步状态的线程 */
volatile Thread thread;

Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() {
}

Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}

几个重要的方法和属性:

方法和属性值 含义
SHARED 表示线程以共享的模式等待锁
EXCLUSIVE 表示线程正在以独占的方式等待锁
waitStatus 当前节点在队列中的状态
thread 表示处于该节点的线程
prev 前驱指针
predecessor 返回前驱节点,没有的话抛出npe
nextWaiter 指向下一个处于CONDITION状态的节点(由于本篇文章不讲述Condition Queue队列,这个指针不多介绍)
next 后继指针

其中SHAREDEXCLUSIVE表示锁的两种模式。

waitStatus表示节点在队列中的等待状态,它有几个枚举值:

  • 0:当一个Node被初始化的时候的默认值,或者当前Node对应线程刚刚被唤醒
  • 1(CANCELLED):由于超时或者中断,线程获取的锁的请求已经取消了,节点一旦被取消了就不会再改变状态,取消节点的线程不会再阻塞。
  • -1(SIGNAL):表示此节点的后继节点已经处于阻塞状态,当此节点在释放或者取消时,需要唤醒后继节点。
  • -2(CONDITION):表示节点在条件等待队列中,标记为CONDITION的节点会被移动到一个特殊的条件等待队列,直到条件时才会被重新移动到同步等待队列 。
  • -3(PROPAGATE):当前线程处在SHARED情况下,该字段才会使用,表示应将releaseShared传播到其他节点。

入队

看过上面Node结点我们知道CLH是基于链表的实现,那么入队其实只需要尾节点即可,不过过程中由于涉及到多线程所以需要配合CAS操作使用。

具体的实现在addWaiter方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 快速设置
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 自旋尝试
enq(node);
return node;
}

addWaiter方法首先尝试使用CAS将AQS中保存的tail,即队尾替换为当前需要入队的Node,如果成功则Node成功入队,如果失败说明其他线程也在调用addWaiter进行入队,则自旋不断尝试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// tail为null,此时队列未初始化
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 尝试更新tail
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

对于tail为null的情况,说明此时CLH队列还未初始化,那么将会创建一个head来初始化队列。这里有个关键的地方在于,队列的Head是通过Node的无参构造创建的,这说明CLH中队头CLH是一个亚节点(Dummy Head),不具备其他意义。

对于tail不为null的情况,使用CAS替换tail引用来更新尾节点。

过程图如下:

入队

虚线表示此次入队所做操作,包括:

  1. 将current的prev指针指向Node2,即tail
  2. 将Node2的next指针指向current
  3. 将AQS中保存的tail指针指向current

出队

CLH同步队列遵循FIFO,首节点对应的线程释放同步状态后,将会唤醒它的后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点,这个过程非常简单,head执行该节点并断开原首节点的next和当前节点的prev即可,注意在这个过程是不需要使用CAS来保证的,因为只有一个线程能够成功获取到同步状态。

首先来看如何唤醒后继节点,在方法unparkSuccessor中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 注意这里node为队头head
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

// head的next指针为真正的队头
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从队尾开始寻找队列中head的合适的后继节点
for (Node t = tail; t != null && t != node; t = t.prev)
// waitStatus大于0即取消状态(1)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒后继节点对应线程
LockSupport.unpark(s.thread);
}

从代码中可知所谓后继节点并不一定就是head的next指针指向的节点,如果next指针指向的节点的waitStatus大于0,则会跳过。根据前面的小节,我们知道大于0其实就是CANCELLED状态,此状态表示该节点对应线程已经取消了此次锁的申请,对应的我们可以直接跳过此节点使其出队。

后继节点被唤醒后,需要更改指向将自己设置为首节点,对应代码在acquireQueued方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取node的前驱节点
final Node p = node.predecessor();
// 前驱节点为头节点且获取同步状态成功
if (p == head && tryAcquire(arg)) {
// 更新Node为头节点,即出队
// 同时会断开prev指针
setHead(node);
// 断开原头节点的nex指针
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

过程图如下:

出队

同步状态的获取与释放

在了解数据结构后,接下来了解一下AQS的同步状态——State。AQS中维护了一个名为state的字段,意为同步状态,是由Volatile修饰的,用于展示当前临界资源的获锁情况。

AQS中提供了几个方法给子类来访问这个字段:

方法名 描述
protected final int getState() 获取State的值
protected final void setState(int newState) 设置State的值
protected final boolean compareAndSetState(int expect, int update) 使用CAS方式更新State

仅仅只需要上面三个方法,我们就可以实现共享资源的两套模式:独占模式(EXCLUSIVE)共享模式(SHARED)

如果熟悉操作系统的话,可以将state类比为Linux中的信号量,信号量的值表示当前空闲的共享资源数量,而state的值则表示当前持有同步状态的线程数量。

那么对于独占模式,即互斥模式来说,state只会有两个可能的值:0和1。

其中0表示当前没有线程持有同步状态,即没有线程持有锁,这时如果线程A尝试获取锁,只需要使用CAS替换state的状态为1,如果成功则表示它拿到了这个互斥锁,失败表示存在锁竞争。

1表示有线程持有同步状态,即当前有线程持有这个互斥锁,这时如果未持有锁的线程尝试获取锁发现state为1,将直接进行入队。

下面我们就来看看独占模式和共享模式分别是如何实现的。

独占模式

独占模式表示同一时刻有且仅有一个线程能够持有同步状态,也就是我们常说的互斥模式。

独占式同步状态获取

AQS提供了acquire(int arg)模版方法为独占式获取同步状态。

该方法对中断不敏感,也就是说由于线程获取同步状态失败加入到CLH同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除。代码如下:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

此方法只有三行(准确的来说只有两行),但涉及到了三个方法,定义如下:

  • tryAcquire:去尝试获取锁,获取成功则设置锁状态并返回true,否则返回false。此方法涉及到具体实现,因此由子类来完成,注意实现时必须要保证获取同步状态操作是线程安全的。
  • addWaiter:此方法上一节介绍过,如果tryAcquire返回false,表示获取同步状态失败,则调用该方法将当前线程加入到CLH同步队列尾部。
  • acquireQueued:此方法上一节介绍过,当前线程在获取锁失败后会进行阻塞,不过不是立马阻塞,而是先设置前一个节点为SIGNAL,再进行阻塞,知道前一个节点将其唤醒。
  • selfInterrupt:产生一个中断。

其中逻辑主要在addWaiteracquireQueued中,其中addWaiter在上一节已经详细介绍了,acquireQueued只介绍了替换头部的这部分,这里我们再来看下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果前驱节点不为head或者获取锁时失败了
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

在调用addWaiter之后执行此方法,假设此时Node的前驱节点不是Head,或者Node获取同步状态失败了,那么会调用shouldParkAfterFailedAcquire方法和parkAndCheckInterrupt方法。

shouldParkAfterFailedAcquire
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 前驱结点已经设置了SIGNAL,当前节点可以直接阻塞等待被唤醒
return true;
if (ws > 0) {
// 如果前驱节点已经处于CANCELLED状态时
// 跳过这些无效的前驱节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 前驱节点的waitStatus为0或者PROPAGATE
// 这种情况当前节点线程不能直接阻塞,因为前驱结点还未设置SIGNAL
// CAS操作设置前驱结点waitStatus为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

此方法主要做了以下工作:

  • 如果前驱节点的状态为SIGNAL,说明闹钟标志已设好,返回true表示当前节点可以阻塞了。

  • 如果前驱节点的状态为CANCELLED,说明前驱节点本身不再等待了,需要跨越这些节点,然后找到一个有效节点,再把node和这个有效节点的前驱后继连接好。

  • 如果是其他情况,那么CAS尝试设置前驱节点为SIGNAL

可以看到此方法主要是要保证前驱节点设置好了SIGNAL状态,只要前驱节点已经为SIGNAL状态就会返回true,表示当前节点可以被阻塞了。

shouldParkAfterFailedAcquire在死循环中调用,因此对于前驱不为head的情况,可以保证最终一定能设置前驱为SIGNAL成功。

shouldParkAfterFailedAcquire返回了true时,调用parkAndCheckInterrupt阻塞线程。

parkAndCheckInterrupt

此方法将会阻塞当前线程,并且在唤醒后检测中断状态:

1
2
3
4
5
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// 注意返回值是是否响应了中断而唤醒
return Thread.interrupted();
}

到这里整个acquire方法执行流程就分析完了,如下图所示:

acquire执行流程

注意:对于独占式同步状态的获取,AQS支持响应中断与超时检测,对应acquireInterruptiblydoAcquireInterruptiblytryAcquireNanosdoAcquireNanos方法,这些方法与上文介绍的流程大同小异,因此这里不再赘述。

独占式同步状态释放

当线程获取同步状态后,执行完相应逻辑后就需要释放同步状态。AQS提供了了release(int arg)方法释放同步状态:

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

同样的,类似于tryAcquirerelease方法中首先通过tryRelease方法来判断是否释放成功,tryRelease也是一个抽象方法需要子类实现。

释放成功后,会调用unparkSuccessor方法唤醒后继节点。unparkSuccessor方法在出队这一小节已经详细介绍过了,不再赘述。

共享模式

共享模式表示允许多个线程同时持有同步状态,我们常说的读写锁中的读锁就是这种模式。

共享式同步状态获取

AQS提供acquireShared方法共享式获取同步状态:

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

从上面程序可以看出,方法首先是调用tryAcquireShared方法尝试获取同步状态,如果获取失败则调用doAcquireShared自旋方式获取同步状态,共享式获取同步状态的标志是返回 >= 0 的值表示获取成功。

tryAcquireShared同样是一个未实现的方法,需要由子类实现,不过对于这个方法的返回值需要重点注意下:

  • 返回值小于0:表示获取共享同步状态失败
  • 返回值等于0:表示获取共享同步状态成功,但没有剩余同步状态可以获取
  • 返回值大于0:表示获取共享同步状态成功,并且还有”返回值“个同步状态资源可以获取,需要传播到队列的后继节点中

如何利用这个返回值呢,来看看doAcquireShared

doAcquireShared
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static final Node SHARED = new Node();

private void doAcquireShared(int arg) {
// 向队列中添加一个节点,其nextWaiter指向SHARED
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 前驱节点
final Node p = node.predecessor();
if (p == head) {
// 如果p是头节点,尝试获取共享同步状态
int r = tryAcquireShared(arg);
// 获取共享同步状态成功
if (r >= 0) {
// 设置头节点并且传播共享锁
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

此方法与上节独占式同步获取中acquireQueued十分类似,区别在于这里使用tryAcquireShared方法来获取共享同步状态,并且调用了setHeadAndPropagate方法来设置头节点。

setHeadAndPropagate由名字可知,其除了设置头节点外,还需要传播共享锁状态,来看看源码。

setHeadAndPropagate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void setHeadAndPropagate(Node node, int propagate) {
// 保存旧头节点
Node h = head;
// 设置新头节点,执行完后head为当前node
setHead(node);

// 满足条件的情况下释放共享锁并唤醒后继节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 释放共享同步状态
doReleaseShared();
}
}

先来看两个入参:node代表的线程一定是当前线程,propagate则表示剩余可获取的共享同步状态

方法中首先保存了旧头节点,同时设置了node为新的头节点。

第一个if条件判断逻辑较为复杂,我们来重点梳理一下:

  • propagate > 0:传入的propagate可能是0也可能是正整数,当大于0时表示需要传播共享锁,因此可以直接短路。
  • h == null || h.waitStatus < 0:由于方法一开始就将head赋值给了h,因此这里h == null是不可能成立的。而h的waitStatus可能有两种情况:
    • h.waitStatus为0:这个情况表示某个线程释放了锁,或者在获取共享锁时传播给后继节点共享锁状态,将h的h.waitStatus更新为了0,具体实现在doReleaseShared方法中,下一节会介绍。
    • h.waitStatus为PROPAGATE:同样是doReleaseShared中唤醒后继节点时将head的waitStatus更新为0,但又有其他线程执行了doReleaseShared,将waitStatus更新为了PROPAGATE。
  • (h = head) == null || h.waitStatus < 0):判断新head(下称:node)是否为null或者h.waitStatus小于0。一般node不会为null,而node的waitStatus可能有多种情况:
    • node.waitStatus为0:后继节点刚好入队列,还没有走到shouldParkAfterFailedAcquire中的修改前驱节点waitStatus的代码。
    • node.waitStatus为PROPAGATE:上一个共享节点被唤醒后,成为新head,后继节点刚入队列,又有其他线程释放锁调用doReleaseSharednode.waitStatus从0改为-3。
    • node.waitStatus为SIGNAL:已经调用了shouldParkAfterFailedAcquirenode.waitStatus从0或者PROPAGATE改为SIGNAL,可能阻塞,可能未阻塞。

可以看到由于共享锁的特殊性,相比于独占锁,这里更多的是要考虑其他线程可能会修改新/旧头节点的情况,逻辑较为复杂,了解即可。

上面多个条件只需满足一个即可if代码块中,当后继节点为共享状态时,调用doReleaseShared将共享状态传播给后继节点。

共享式同步状态释放

共享式同步状态释放逻辑在doReleaseShared方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

为了保证线程安全,共享锁的释放操作同样需要自旋配合CAS来完成。

释放时同样需要判断当前head的waitStatus来确保成功:

  • 当waitStatus为SIGNAL时:尝试更新head的waitStatus为0,成功则调用unparkSuccessor唤醒后继节点,并且unparkSuccessor中会更新waitStatus为SIGNAL,失败表示有其他线程更改了waitStatus,重新循环
  • 当waitStatus为0时:此状态说明有线程执行了doReleaseShared并将waitStatus成功更新为0,但还未执行unparkSuccessor,或在unparkSuccessor更新waiStatus失败,此时当前线程刚好执行到doReleaseShared就会发现waitStatus为0。这种情况尝试更新waitStatus为PROPAGATE,失败将会继续循环。

当上面两个操作中有一个执行成功时,将会继续执行判断:

1
2
if (h == head)                   // loop if head changed
break;

当前head如果发生了改变,继续循环,没有发生改变则说明此次释放操作成功。

总结

本篇文章从数据结构到独占和共享锁的获取与释放过程简单分析了AQS框架的一些原理,当然这些只是冰山一角,实际上整个框架的设计非常巧妙,建议读者自行阅读一遍。

在了解了AQS的原理之后,我们就可以理解AQS为什么叫抽象排队同步器了,实际上这个名字起的非常巧妙,仅仅使用三个词就完美的诠释了它的特点:

  • 抽象(Abstract):它是一套抽象实现的框架,需要子类继承来实现整个锁机制
  • 排队(Queued):AQS中的核心数据结构为CLH队列,获取同步状态失败时,线程将会以队列中的节点的方式进行排队
  • 同步器(Synchronizer):整个框架用于实现线程间同步机制

下一篇文章将会从ReentrantLock的实现来看AQS的应用。

参考

  1. 《Java并发编程实战》
  2. 《Java并发编程的艺术》
  3. 深入理解 AQS 和 CAS 原理
  4. J.U.C之AQS:CLH同步队列
  5. 从ReentrantLock的实现看AQS的原理及应用
  6. LockSupport的park和unpark的基本使用,以及对线程中断的响应性
  7. AQS深入理解 shouldParkAfterFailedAcquire源码分析 状态为0或PROPAGATE的情况分析