简述 Java AQS 的原理以及使用场景
# 简述 Java AQS 的原理以及使用场景
# AbstractQueuedSynchronizer
是什么
AbstractQueuedSynchronizer(AQS)
提供一个框架,用于实现依赖先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量,事件等)。
该类被设计为大多数类型的同步器的依据,这些同步器依赖于单个原子int值来表示状态。
AQS的子类通过继承并实现它的抽象方法来实现管理同步状态,子类可以通过 AQS 提供的是三个方法来 修改状态值
getState()
setState(int)
compareAndSetState(int, int)
因为这三个方法能保证状态的修改是安全的。
AQS采用模板方法,内部实现了获取锁失败后加入等待队列的机制,大大降低了四线一个自定义同步组件的门槛。
CountDownLatch、Semaphore、ReentrantLock等等常见的工具类都是由AQS来实现的。所以不管是面试也好,还是自己研究底层实现也好,AQS类都是必须要重点关注的。
# AbstractQueuedSynchronizer
是如何实现的
同步器的开始提到了其实现依赖于一个FIFO队列,那么队列中的元素Node就是保存着线程引用和线程状态的容器,每个线程对同步器的访问,都可以看做是队列中的一个节点。Node的主要包含以下成员变量:
Node {
//表示节点的状态。
int waitStatus;
//前驱节点,比如当前节点被取消,那就需要前驱节点和后继节点来完成连接。
Node prev;
//后继节点。
Node next;
//存储condition队列中的后继节点。
Node nextWaiter;
//入队列时的当前线程。
Thread thread;
}
2
3
4
5
6
7
8
9
10
11
12
waitStatus取值有5个枚举:
- CANCELLED,值为1,表示当前的线程被取消;
- SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
- CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
- PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
- 值为0,表示当前节点在sync队列中,等待着获取锁。
AQS(同步器) 和 其持有的FIFO队列的关系如上图,AQS 持有队列的头结点和尾结点,竞争锁失败的线程会被放到队列的后面,排队获取锁。
# 如何用 AQS 实现一个排他锁
排他锁的实现,顾名思义,一次只能一个线程获取到锁。 伪代码的获取:
while(获取锁) {
if (获取到) {
退出while循环
} else {
if(当前线程没有入队列) {
那么入队列
}
阻塞当前线程
}
}
2
3
4
5
6
7
8
9
10
伪代码的释放:
if (释放成功) {
删除头结点
激活原头结点的后继节点
}
2
3
4
通过AQS 实现:
class Mutex implements Lock, java.io.Serializable {
// 内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 是否处于占用状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 当状态为0的时候获取锁
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁,将状态设置为0
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 返回一个Condition,每个condition都包含了一个condition队列
Condition newCondition() { return new ConditionObject(); }
}
// 仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
可以看到Mutex将Lock接口均代理给了同步器的实现。
# AQS原理 - 排它锁-加锁
AQS 中的 public final void acquire(int arg)
以非阻塞的方式获取排它锁,实现 synchronized
语义,可以说是 AQS 中最重要的方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2
3
4
5
这段代码逻辑做了三件事情:
- 尝试获取锁 ;
tryAcquire(arg)
- 加入sync队列。如果获取不到,将当前线程构造成节点Node并加入sync队列;
addWaiter(Node.EXCLUSIVE)
- 排队。再次尝试获取,如果没有获取到那么将当前线程从线程调度器上摘下,进入等待状态 ;
acquireQueued(Node,arg)
下面我们逐步来看下这三个过程
# 尝试获取锁 tryAcquire(arg)
整个方法需要子类实现,比如公平锁、非公平锁就是在这个方法中做的逻辑。 我们来看下公平锁的实现:
protected final boolean tryAcquire(int acquires) {
// 当前线程
final Thread current = Thread.currentThread();
// 获取state状态,0表示未锁定,大于1表示重入
int c = getState();
// 0表示没有线程获取锁
if (c == 0) {
// 没有比当前线程等待更久的线程了,通过CAS的方式修改state
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 成功之后,设置当前拥有独占访问权的线程
setExclusiveOwnerThread(current);
return true;
}
}
// 独占访问权的线程就是当前线程,重入,此处就是【可重入性】的实现
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 直接修改state
setState(nextc);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 加入sync队列 addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 如果队列不为空 快速尝试在尾部添加 那么node节点的前继节点是tail
if (pred != null) {
node.prev = pred;
//CAS操作 将node设置为尾结点 (多线程竞争的情况下,这里可能会失败)
if (compareAndSetTail(pred, node)) {
pred.next = node;
//快速尝试在尾部添加成功 返回
return node;
}
}
//队列为空 或者 快速尝试在尾部添加失败 程序会走到这里
enq(node);
return node;
}
//在循环中 CAS入队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 如果队列为空 先初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {// 如果队列不为空 CAS入队列
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
上述逻辑主要包括:
- 如果队列不为空 快速尝试在尾部添加,一次CAS
- 队列为空 或者 快速尝试在尾部添加失败 程序进入循环判断3、4步
- 如果队列为空 先初始化
- 如果队列不为空 CAS入队列
# 排队 acquireQueued(Node,arg)
线程挂起之前 循环尝试获取锁;
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果本线程已经是头结点了 tryAcquire(arg)下 尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
//获取到了 退出循环
return interrupted;
}
//没获取到 判断要不要挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
上述逻辑主要包括:
- 如果本线程已经是头结点了 tryAcquire(arg)下 尝试获取锁,获取到了 退出循环,没获取到 判断要不要挂起
- 挂起前,循环执行第一步
如何判断要不要挂起呢,我们再来看下shouldParkAfterFailedAcquire(p, node)
:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取pred前置节点的等待状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
/* 前置节点状态是signal,那当前节点可以安全阻塞,因为前置节点承诺执行完之后会通知唤醒当前
* 节点
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 前置节点如果已经被取消了,则一直往前遍历直到前置节点不是取消状态,与此同时会修改链表关系
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 前置节点是0或者propagate状态,这里通过CAS把前置节点状态改成signal
// 这里不返回true让当前节点阻塞,而是返回false,目的是让调用者再check一下当前线程是否能
// 成功获取锁,失败的话再阻塞,这里说实话我也不是特别理解这么做的原因
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
这段代码用来检测是否挂起当先线程,分三种情况,
- 第一种情况是前驱节点的 ws = singal,表示前驱节点释放同步状态的时候会唤醒当前节点,可以安全挂起当前线程;
- 第二种情况是前驱节点被取消,那就从前驱节点继续往前遍历,直到往前找到第一个ws <= 0 的节点;
- 第三种是前驱节点的 ws = 0,表示前驱节点获取到同步状态,当前线程不能挂起,应该尝试去获取同步状态,前驱节点的同步状态的释放正好可以让当前节点进行获取,所以使用CAS把前驱节点的ws设为singal,另外如果 ws =PROPAGATE,说明正以共享模式进行传播,也需要使用CAS把ws设为singal.
如何挂起线程呢?我们来看下parkAndCheckInterrupt()
:
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程,监事是当前sync对象
LockSupport.park(this);
// 阻塞返回后,返回当前线程是否被中断
return Thread.interrupted();
}
2
3
4
5
6
# AQS原理 - 获取排它锁流程图
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2
3
4
5
首先执行tryAcquire方法用于尝试获取锁,成功后就直接返回,失败后就通过addWaiter方法把当前线程封装成一个Node,加到队列的尾部,再通过acquireQueued方法尝试获取同步锁,成功获取锁的线程的Node节点会被移出队列。
如果以上条件都满足,会执行selfInterrupt方法中断当前线程。
最后 我们再用一张流程图来回顾下这整个流程
# AQS原理 - 排它锁-解锁
我们已经知道了sync是AQS的实现,所以直接查看AQS中的release方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 尝试释放锁
Node h = head;
if (h != null && h.waitStatus != 0)
// 头节点已经释放,唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
tryRelease
同样是模板方法:
protected final boolean tryRelease(int releases) {
// 计算剩余的重入次数
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否完全的释放了锁(针对可重入性)
boolean free = false;
if (c == 0) {
// 表示完全释放了锁
free = true;
// 设置独占锁的持有者为null
setExclusiveOwnerThread(null);
}
// 设置AQS的state
setState(c);
return free;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# AQS原理 - 共享锁-加锁
protected int tryAcquireShared(int acquires) {
for (;;) {
// 自旋
if (hasQueuedPredecessors())
// 如果有线程排在自己的前面(公平锁排队),直接返回
return -1;
// 获取同步状态的值
int available = getState();
// 可用的(许可)减去申请的,等于剩余的
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
// 如果剩余的小于0,或者设置状态成功,就返回,如果设置失败,则进入下一次循环
// 如果剩余小于0,返回负数,表示失败
// 如果设置状态成功,表示申请许可成功,返回正数
return remaining;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# AQS原理 - 共享锁-解锁
releaseShared()
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
2
3
4
5
6
7
tryReleaseShared()
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 自旋
// 获取同步状态的值
int current = getState();
// 可用的(许可)加上释放的,等于剩余的
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
// CAS的方式设置同步状态
return true;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
doReleaseShared()
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
// 自旋
// 记录头节点
Node h = head;
if (h != null && h != tail) {
// 头节点不为null,且不等于尾结点,说明队列中还有节点
// 获取头节点等待状态
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 头节点等待状态是SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
// 如果修改节点等待状态失败,进入下一次循环
continue; // loop to recheck cases
// 修改成功后,唤醒后继节点,unparkSuccessor前文讲过
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41