JavaDriver JavaDriver
首页
  • 基础
  • 并发
  • JVM
  • 设计模式
  • 计算机网络
  • 操作系统
  • 数据结构
  • 算法
  • MYSQL
  • REDIS
  • Netty
  • Kafka
系统设计
非技术
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

YoungAnn

西二旗Java老司机一枚 致力于社会主义添砖Java
首页
  • 基础
  • 并发
  • JVM
  • 设计模式
  • 计算机网络
  • 操作系统
  • 数据结构
  • 算法
  • MYSQL
  • REDIS
  • Netty
  • Kafka
系统设计
非技术
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 基础

  • 并发

    • 线程池是如何实现的?
    • 简述 CAS 原理,什么是 ABA 问题,怎么解决?
    • 简述 Synchronized,Volatile,可重入锁的不同使用场景及优缺点
    • Synchronized 与 Lock 相比优缺点分别是什么?
    • 重入锁是如何实现的?
    • volatile 关键字解决了什么问题,它的实现原理是什么?
    • 简述 Java 锁升级的机制
    • 简述 Java AQS 的原理以及使用场景
      • 简述 Java AQS 的原理以及使用场景
      • AbstractQueuedSynchronizer 是什么
      • AbstractQueuedSynchronizer 是如何实现的
      • 如何用 AQS 实现一个排他锁
      • AQS原理 - 排它锁-加锁
        • 尝试获取锁 tryAcquire(arg)
        • 加入sync队列 addWaiter(Node.EXCLUSIVE)
        • 排队 acquireQueued(Node,arg)
        • AQS原理 - 获取排它锁流程图
      • AQS原理 - 排它锁-解锁
      • AQS原理 - 共享锁-加锁
      • AQS原理 - 共享锁-解锁
    • 什么是公平锁?什么是非公平锁?
    • Java 的线程有哪些状态,转换关系是怎么样的?
    • Java 是如何实现线程安全的,哪些数据结构是线程安全的?
    • 手写死锁
    • 为什么我们不能直接调用 run() 方法?
    • Java 线程有哪些常用方法?
    • 手写生产者消费者模型
    • ThreadLocal 实现原理是什么?为什么要使用弱引用?
  • JVM

  • 设计模式

  • Java相关
  • 并发
YoungAnn
2022-04-04
目录

简述 Java AQS 的原理以及使用场景

# 简述 Java AQS 的原理以及使用场景

# AbstractQueuedSynchronizer 是什么

AbstractQueuedSynchronizer(AQS) 提供一个框架,用于实现依赖先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量,事件等)。

该类被设计为大多数类型的同步器的依据,这些同步器依赖于单个原子int值来表示状态。

AQS的子类通过继承并实现它的抽象方法来实现管理同步状态,子类可以通过 AQS 提供的是三个方法来 修改状态值

  • getState()
  • setState(int)
  • compareAndSetState(int, int)

因为这三个方法能保证状态的修改是安全的。

AQS采用模板方法,内部实现了获取锁失败后加入等待队列的机制,大大降低了四线一个自定义同步组件的门槛。

CountDownLatch、Semaphore、ReentrantLock等等常见的工具类都是由AQS来实现的。所以不管是面试也好,还是自己研究底层实现也好,AQS类都是必须要重点关注的。

# AbstractQueuedSynchronizer 是如何实现的

同步器的开始提到了其实现依赖于一个FIFO队列,那么队列中的元素Node就是保存着线程引用和线程状态的容器,每个线程对同步器的访问,都可以看做是队列中的一个节点。Node的主要包含以下成员变量:

Node {
//表示节点的状态。
    int waitStatus;
    //前驱节点,比如当前节点被取消,那就需要前驱节点和后继节点来完成连接。
    Node prev;
    //后继节点。
    Node next;
    //存储condition队列中的后继节点。
    Node nextWaiter;
    //入队列时的当前线程。
    Thread thread;
}
1
2
3
4
5
6
7
8
9
10
11
12

waitStatus取值有5个枚举:

  • CANCELLED,值为1,表示当前的线程被取消;
  • SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
  • CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
  • PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
  • 值为0,表示当前节点在sync队列中,等待着获取锁。

AQS(同步器) 和 其持有的FIFO队列的关系如上图,AQS 持有队列的头结点和尾结点,竞争锁失败的线程会被放到队列的后面,排队获取锁。

# 如何用 AQS 实现一个排他锁

排他锁的实现,顾名思义,一次只能一个线程获取到锁。 伪代码的获取:

while(获取锁) {
  if (获取到) {
    退出while循环
  } else {
    if(当前线程没有入队列) {
      那么入队列
    }
    阻塞当前线程
  }
}
1
2
3
4
5
6
7
8
9
10

伪代码的释放:

if (释放成功) {
  删除头结点
  激活原头结点的后继节点
}
1
2
3
4

通过AQS 实现:

class Mutex implements Lock, java.io.Serializable {
   // 内部类,自定义同步器
   private static class Sync extends AbstractQueuedSynchronizer {
     // 是否处于占用状态
     protected boolean isHeldExclusively() {
       return getState() == 1;
     }
     // 当状态为0的时候获取锁
     public boolean tryAcquire(int acquires) {
       assert acquires == 1; // Otherwise unused
       if (compareAndSetState(0, 1)) {
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
     }
     // 释放锁,将状态设置为0
     protected boolean tryRelease(int releases) {
       assert releases == 1; // Otherwise unused
       if (getState() == 0) throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }
     // 返回一个Condition,每个condition都包含了一个condition队列
     Condition newCondition() { return new ConditionObject(); }
   }
   // 仅需要将操作代理到Sync上即可
   private final Sync sync = new Sync();
   public void lock()                { sync.acquire(1); }
   public boolean tryLock()          { return sync.tryAcquire(1); }
   public void unlock()              { sync.release(1); }
   public Condition newCondition()   { return sync.newCondition(); }
   public boolean isLocked()         { return sync.isHeldExclusively(); }
   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
   public void lockInterruptibly() throws InterruptedException {
     sync.acquireInterruptibly(1);
   }
   public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
   }
 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

可以看到Mutex将Lock接口均代理给了同步器的实现。

# AQS原理 - 排它锁-加锁

AQS 中的 public final void acquire(int arg)以非阻塞的方式获取排它锁,实现 synchronized 语义,可以说是 AQS 中最重要的方法。

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}
1
2
3
4
5

这段代码逻辑做了三件事情:

  1. 尝试获取锁 ;tryAcquire(arg)
  2. 加入sync队列。如果获取不到,将当前线程构造成节点Node并加入sync队列;addWaiter(Node.EXCLUSIVE)
  3. 排队。再次尝试获取,如果没有获取到那么将当前线程从线程调度器上摘下,进入等待状态 ;acquireQueued(Node,arg)

下面我们逐步来看下这三个过程

# 尝试获取锁 tryAcquire(arg)

整个方法需要子类实现,比如公平锁、非公平锁就是在这个方法中做的逻辑。 我们来看下公平锁的实现:

protected final boolean tryAcquire(int acquires) {
  // 当前线程
    final Thread current = Thread.currentThread();
    // 获取state状态,0表示未锁定,大于1表示重入
    int c = getState();
    // 0表示没有线程获取锁
    if (c == 0) {
    // 没有比当前线程等待更久的线程了,通过CAS的方式修改state
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // 成功之后,设置当前拥有独占访问权的线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 独占访问权的线程就是当前线程,重入,此处就是【可重入性】的实现
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // 直接修改state
        setState(nextc);
        return true;
    }
    return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 加入sync队列 addWaiter(Node.EXCLUSIVE)

private Node addWaiter(Node mode) {
  Node node = new Node(Thread.currentThread(), mode);
  Node pred = tail;
    // 如果队列不为空 快速尝试在尾部添加 那么node节点的前继节点是tail
  if (pred != null) {
    node.prev = pred;
        //CAS操作 将node设置为尾结点 (多线程竞争的情况下,这里可能会失败)
    if (compareAndSetTail(pred, node)) {
      pred.next = node;
            //快速尝试在尾部添加成功 返回
      return node;
    }
  }
    //队列为空 或者 快速尝试在尾部添加失败 程序会走到这里
  enq(node);
  return node;
}

//在循环中 CAS入队列
private Node enq(final Node node) {
  for (;;) {
    Node t = tail;
    if (t == null) { // 如果队列为空 先初始化
      if (compareAndSetHead(new Node()))
        tail = head;
    } else {// 如果队列不为空 CAS入队列
      node.prev = t;
      if (compareAndSetTail(t, node)) {
      t.next = node;
      return t;
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

上述逻辑主要包括:

  1. 如果队列不为空 快速尝试在尾部添加,一次CAS
  2. 队列为空 或者 快速尝试在尾部添加失败 程序进入循环判断3、4步
  3. 如果队列为空 先初始化
  4. 如果队列不为空 CAS入队列

# 排队 acquireQueued(Node,arg)

线程挂起之前 循环尝试获取锁;

final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      final Node p = node.predecessor();
            //如果本线程已经是头结点了 tryAcquire(arg)下 尝试获取锁
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
                //获取到了 退出循环
        return interrupted;
      }
            //没获取到 判断要不要挂起
      if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
        interrupted = true;
                }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

上述逻辑主要包括:

  1. 如果本线程已经是头结点了 tryAcquire(arg)下 尝试获取锁,获取到了 退出循环,没获取到 判断要不要挂起
  2. 挂起前,循环执行第一步

如何判断要不要挂起呢,我们再来看下shouldParkAfterFailedAcquire(p, node):

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取pred前置节点的等待状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
/* 前置节点状态是signal,那当前节点可以安全阻塞,因为前置节点承诺执行完之后会通知唤醒当前
* 节点
*/
            return true;
        if (ws > 0) {
 
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
// 前置节点如果已经被取消了,则一直往前遍历直到前置节点不是取消状态,与此同时会修改链表关系
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
// 前置节点是0或者propagate状态,这里通过CAS把前置节点状态改成signal
// 这里不返回true让当前节点阻塞,而是返回false,目的是让调用者再check一下当前线程是否能
// 成功获取锁,失败的话再阻塞,这里说实话我也不是特别理解这么做的原因
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

这段代码用来检测是否挂起当先线程,分三种情况,

  1. 第一种情况是前驱节点的 ws = singal,表示前驱节点释放同步状态的时候会唤醒当前节点,可以安全挂起当前线程;
  2. 第二种情况是前驱节点被取消,那就从前驱节点继续往前遍历,直到往前找到第一个ws <= 0 的节点;
  3. 第三种是前驱节点的 ws = 0,表示前驱节点获取到同步状态,当前线程不能挂起,应该尝试去获取同步状态,前驱节点的同步状态的释放正好可以让当前节点进行获取,所以使用CAS把前驱节点的ws设为singal,另外如果 ws =PROPAGATE,说明正以共享模式进行传播,也需要使用CAS把ws设为singal.

如何挂起线程呢?我们来看下parkAndCheckInterrupt():

private final boolean parkAndCheckInterrupt() {
        // 阻塞当前线程,监事是当前sync对象
        LockSupport.park(this);
        // 阻塞返回后,返回当前线程是否被中断
        return Thread.interrupted();
}
1
2
3
4
5
6

# AQS原理 - 获取排它锁流程图

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
1
2
3
4
5

首先执行tryAcquire方法用于尝试获取锁,成功后就直接返回,失败后就通过addWaiter方法把当前线程封装成一个Node,加到队列的尾部,再通过acquireQueued方法尝试获取同步锁,成功获取锁的线程的Node节点会被移出队列。

如果以上条件都满足,会执行selfInterrupt方法中断当前线程。

最后 我们再用一张流程图来回顾下这整个流程

# AQS原理 - 排它锁-解锁

我们已经知道了sync是AQS的实现,所以直接查看AQS中的release方法

public final boolean release(int arg) {
    if (tryRelease(arg)) {
      // 尝试释放锁
        Node h = head;
        if (h != null && h.waitStatus != 0)
          // 头节点已经释放,唤醒后继节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}
1
2
3
4
5
6
7
8
9
10
11

tryRelease 同样是模板方法:

protected final boolean tryRelease(int releases) {
  // 计算剩余的重入次数
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 是否完全的释放了锁(针对可重入性)
    boolean free = false;
    if (c == 0) {
      // 表示完全释放了锁
        free = true;
        // 设置独占锁的持有者为null
        setExclusiveOwnerThread(null);
    }
    // 设置AQS的state
    setState(c);
    return free;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# AQS原理 - 共享锁-加锁

protected int tryAcquireShared(int acquires) {
    for (;;) {
      // 自旋
        if (hasQueuedPredecessors())
          // 如果有线程排在自己的前面(公平锁排队),直接返回
            return -1;
        // 获取同步状态的值
        int available = getState();
        // 可用的(许可)减去申请的,等于剩余的
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            // 如果剩余的小于0,或者设置状态成功,就返回,如果设置失败,则进入下一次循环
            // 如果剩余小于0,返回负数,表示失败
            // 如果设置状态成功,表示申请许可成功,返回正数
            return remaining;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# AQS原理 - 共享锁-解锁

releaseShared()

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
1
2
3
4
5
6
7

tryReleaseShared()

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
      // 自旋
      // 获取同步状态的值
        int current = getState();
        // 可用的(许可)加上释放的,等于剩余的
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
          // CAS的方式设置同步状态
            return true;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

doReleaseShared()

/**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
      // 自旋
      // 记录头节点
        Node h = head;
        if (h != null && h != tail) {
          // 头节点不为null,且不等于尾结点,说明队列中还有节点
          // 获取头节点等待状态
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
              // 头节点等待状态是SIGNAL
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                  // 如果修改节点等待状态失败,进入下一次循环
                    continue;            // loop to recheck cases
                // 修改成功后,唤醒后继节点,unparkSuccessor前文讲过
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
编辑 (opens new window)
上次更新: 2022/05/19, 21:26:01
简述 Java 锁升级的机制
什么是公平锁?什么是非公平锁?

← 简述 Java 锁升级的机制 什么是公平锁?什么是非公平锁?→

最近更新
01
电商-商品系统设计
12-17
02
关于如何写OKR
12-09
03
对事不对人 vs 对人不对事
12-09
更多文章>
Theme by Vdoing | Copyright © 2022-2023 YoungAnnn | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式