深夜敲代码 发表于 2021-9-4 20:41:04

并发编程之:AQS源码解析

在Java并发编程中,经常会用到锁,除了Synchronized这个JDK关键字以外,还有Lock接口下面的各种锁实现,如重入锁ReentrantLock,还有读写锁ReadWriteLock等,他们在实现锁的过程中都是依赖与AQS来完成核心的加解锁逻辑的。那么AQS具体是什么呢?
提供一个框架,用于实现依赖先辈先出(FIFO)等候队列的阻塞锁和相关同步器(信号量,事件等)。 该类被设计为大多数范例的同步器的有用依据,这些同步器依赖于单个原子int值来表示状态。 子类必须定义改变此状态的受保护方法,以及根据该对象被获取或释放来定义该状态的寄义。 给定这些,这个类中的其他方法实行所有排队和阻塞机制。 子类可以保持其他状态字段,但只以原子方式更新int使用方法操纵值getState() , setState(int)和compareAndSetState(int, int)被跟踪相对于同步。
上述内容来自JDK官方文档。
简单来说,AQS是一个先辈先出(FIFO)的等候队列,主要用在一些线程同步场景,需要通过一个int范例的值来表示同步状态。提供了排队和阻塞机制。
类图结构

https://p6.toutiaoimg.com/large/pgc-image/c5a9ea17981c40db9850ebabf43fd666
从类图可以看出,在ReentrantLock中定义了AQS的子类Sync,可以通过Sync实现对于可重入锁的加锁,解锁。
AQS通过int范例的状态state来表示同步状态。
AQS中主要提供的方法:
acquire(int) 独占方式获取锁
acquireShared(int) 共享方式获取锁
release(int) 独占方式释放锁
releaseShared(int) 共享方式释放锁
独占锁和共享锁
关于独占锁和共享锁先给大家普及一下这个概念。
独占锁指该锁只能同时被一个线程持有;
共享锁指该锁可以被多个线程同时持有。
举个生活中的例子,比如我们使用打车软件打车,独占锁就好比我们打快车或者专车,一辆车只能让一个客户打到,不能两个客户同时打到一辆车;共享锁就好比打拼车,可以有多个客户一起打到同一辆车。
AQS内部结构

我们简单通过一张图片来了解下AQS的内部结构。实在就是有一个队列,这个队列的头结点head代表当前正在持有锁的线程,后续的其他节点代表当前正在等候的线程。
https://p5.toutiaoimg.com/large/pgc-image/58c60418c2384cf3ba2904b0eab20409
<hr>接下来我们通过源码来看看AQS的加锁和解锁过程。先来看看独占锁是怎样进行加解锁的。
独占锁加锁过程

ReentrantLock lock = new ReentrantLock();lock.lock();public void lock() {    // 调用sync的lock方法    sync.lock();}可以看到在ReentrantLock的lock方法中,直接调用了sync这个AQS子类的lock方法。
final void lock() {    // 获取锁    acquire(1);}public final void acquire(int arg) {    // 1.先尝试获取,如果获取成功,则直接返回,代表加锁成功    if (!tryAcquire(arg) &&      // 2.如果获取失败,则调用addWaiter在等候队列中增加一个节点      // 3. 调用acquireQueued告诉前一个节点,在解锁之后唤醒自己,然后线程进入等候状态      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))      // 如果在等候过程中被中断,则当火线程中断      selfInterrupt();}在获取锁时,基本可以分为3步:

[*]尝试获取,如果成功则返回,如果失败,实行下一步;
[*]将当火线程放入等候队列尾部;
[*]标记前面等候的线程实行完之后唤醒当火线程。
/** * 尝试获取锁(公平锁实现) */protected final boolean tryAcquire(int acquires) {    final Thread current = Thread.currentThread();        // 获取state,初始值为0,每次加锁成功会+1,解锁成功-1    int c = getState();    // 当前没有线程占用    if (c == 0) {         // 判断是否有其他线程排队在本线程之前      if (!hasQueuedPredecessors() &&            // 如果没有,通过CAS进行加锁            compareAndSetState(0, acquires)) {            // 将当火线程设置为AQS的独占线程            setExclusiveOwnerThread(current);            return true;      }    }    // 如果当火线程是正在独占的线程(已持有锁,重入)    else if (current == getExclusiveOwnerThread()) {      int nextc = c + acquires;          if (nextc < 0)            throw new Error("Maximum lock count exceeded");      // state+1      setState(nextc);      return true;    }    return false;}private Node addWaiter(Node mode) {    // 创建一个当火线程的Node节点    Node node = new Node(Thread.currentThread(), mode);    // Try the fast path of enq; backup to full enq on failure    Node pred = tail;    // 如果等候队列的尾节点!=null    if (pred != null) {      // 将本线程对应节点的前置节点设置为原来的尾节点      node.prev = pred;      // 通过CAS将本线程节点设置为尾节点      if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;      }    }    //尾节点为空,或者在CAS时失败,则通过enq方法重新参加到尾部。(本方法内部接纳自旋)    enq(node);    return node;}private Node enq(final Node node) {    for (;;) {      Node t = tail;      // 尾节点为空,代表等候队列还没有被初始化过      if (t == null) {             // 创建一个空的Node对象,通过CAS赋值给Head节点,如果失败,则重新自旋一次,如果成功,将Head节点赋值给尾节点            if (compareAndSetHead(new Node()))                tail = head;         } else {            // 尾节点不为空的情况,说明等候队列已经被初始化过,将当前节点的前置节点指向尾节点            node.prev = t;            // 将当前节点CAS赋值给尾节点            if (compareAndSetTail(t, node)) {                t.next = node;                return t;            }      }    }}final boolean acquireQueued(final Node node, int arg) {    // 标识是否加锁失败    boolean failed = true;    try {      // 是否被中断      boolean interrupted = false;      for (;;) {            // 取出来当前节点的前一个节点            final Node p = node.predecessor();            // 如果前一个节点是head节点,那么自己就是老二,这个时候再尝试获取一次锁            if (p == head && tryAcquire(arg)) {                // 如果获取成功,把当前节点设置为head节点                setHead(node);                p.next = null; // help GC                failed = false; // 标识加锁成功                return interrupted;            }            // shouldParkAfterFailedAcquire 检查并更新前置节点p的状态,如果node节点应该阻塞就返回true            // 如果返回false,则自旋一次。            if (shouldParkAfterFailedAcquire(p, node) &&                // 当火线程阻塞,在阻塞被唤醒时,判断是否被中断                parkAndCheckInterrupt())                interrupted = true;      }    } finally {      if (failed) // 如果加锁成功,则取消获取锁            cancelAcquire(node);    }}private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {    int ws = pred.waitStatus;    if (ws == Node.SIGNAL) // ws == -1      /*               * 这个节点已经设置了请求释放的状态,所以它可以在这里安全park.         */      return true;    if (ws > 0) {      /*         * 前置节点被取消了,跳过前置节点重试         */      do {            node.prev = pred = pred.prev;      } while (pred.waitStatus > 0);      pred.next = node;    } else {      /*         * 将前置节点的状态设置为请求释放         */      compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    return false;}在整个加锁过程可以通过下图更清晰的理解。
https://p9.toutiaoimg.com/large/pgc-image/0e60ad9f363546f5ae3a4096ba63a8b2
独占锁解锁过程

public void unlock() {    sync.release(1);}同样解锁时也是直接调用AQS子类sync的release方法。
public final boolean release(int arg) {    // 尝试解锁    if (tryRelease(arg)) {      Node h = head;      // 解锁成功,如果head!=null并且head.ws不等0,代表有其他线程排队      if (h != null && h.waitStatus != 0)            // 唤醒后续等候的节点            unparkSuccessor(h);      return true;    }    return false;}解锁过程如下:

[*]先尝试解锁,解锁失败则直接返回false。(理论上不会解锁失败,因为正在实行解锁的线程一定是持有锁的线程)
[*]解锁成功之后,如果有head节点并且状态不是0,代表有线程被阻塞等候,则唤醒下一个等候的线程。
protected final boolean tryRelease(int releases) {    // state - 1    int c = getState() - releases;    // 如果当火线程不是独占AQS的线程,但是这时候又来解锁,这种情况肯定是非法的。    if (Thread.currentThread() != getExclusiveOwnerThread())      throw new IllegalMonitorStateException();    boolean free = false;    if (c == 0) { // 如果状态归零,代表锁释放了,将独占线程设置为null      free = true;      setExclusiveOwnerThread(null);    }        // 将减1之后的状态设置为state    setState(c);    return free;}private void unparkSuccessor(Node node) {    /*   * 如果节点的ws小于0,将ws设置为0   */    int ws = node.waitStatus;    if (ws < 0)      compareAndSetWaitStatus(node, ws, 0);    /*   * 从等候队列的尾部往前找,直到第二个节点,ws 0) {      s = null;      for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus = 0) {                  // 获取成功,把当前节点设置为头节点。并且判断是否需要唤醒后面的等候节点。                  // 如果条件允许,就会唤醒后面的节点                  setHeadAndPropagate(node, r);                  p.next = null; // help GC                  if (interrupted)                        selfInterrupt();                  failed = false;                  return;                }            }            // 如果前置节点不是头结点,说明当前节点线程需要阻塞等候,并告知前一个节点唤醒            // 检查并更新前置节点p的状态,如果node节点应该阻塞就返回true            // 当火线程被唤醒之后,会从parkAndCheckInterrupt()实行            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())                interrupted = true;      }    } finally {      if (failed)             cancelAcquire(node);    }}共享锁释放过程

public void unlock() {    sync.releaseShared(1);}public final boolean releaseShared(int arg) {    //tryReleaseShared()尝试释放允许,这个方法在AQS中默认抛出一个异常,需要在子类中实现    if (tryReleaseShared(arg)) {      // 唤醒线程,设置流传状态 WS      doReleaseShared();      return true;    }    return false;}AQS是许多并发场景下同步控制的基石,其中的实现相对要复杂许多,还需要多看多琢磨才能完全理解。本文也是和大家做一个初探,给大家展示了核心的代码逻辑,渴望能有所帮助。

通往大数据的地铁 发表于 2021-9-5 00:41:08

转发了

Bony886 发表于 2021-9-4 23:30:02

转发了

某用户1310762158 发表于 2021-9-4 22:32:22

转发了
页: [1]
查看完整版本: 并发编程之:AQS源码解析