Java老油条 发表于 2021-9-24 15:18:43

源码修炼笔记之AQS(AbstractQueuedSynchronizer)源码剖析

AbstractQueuedSynchronizer被称为队列同步器,简称为各人熟知的AQS,这个类可以称作concurrent包的底子,该类提供了同步的根本功能。该类包罗如下几个核心要素:

[*]AQS内部维护一个volatile修饰的state变量,state用于标记锁的状态;
[*]AQS通过内部类Node记录当前是哪个线程持有锁;
[*]AQS通过LockSupport的park和unPark方法来阻塞和唤醒线程;
[*]AQS通过node来维护一个队列,用于生存所有阻塞的线程。
下面通太过析源码来看看AQS是如何工作的。
AQS概要
AQS通过内部类Node记录当前是哪个线程持有锁,Node中有一个前驱节点和一个后继节点,形成一个双向链表,这个链表是一种CLH队列,此中waitStatus表示当前线程的状态,其可能的取值包罗以下几种:

[*]SIGNAL(-1),表示后继线程已经大概即将被阻塞,当前线程释放锁大概获取锁失败后需要唤醒后继线程;
[*]CANCELLED(1),表示当前线程由于超时大概中断被取消,这个状态不可以被修改;
[*]CONDITION(-2),当前线程为条件等待,其状态设置0之后才能去竞争锁;
[*]PROPAGATE(-3),表示共享锁释放之后需要通报给后继节点,只有头结点的才会有该状态;
[*]0,该状态为初始值,不属于上面恣意一种状态。
Node对象中还有一个nextWaiter变量,指向下一个条件等待节点,相当于在CLH队列的底子上维护了一个简朴的单链表来关联条件等待的节点。
        static final class Node {      static final Node SHARED = new Node();      static final Node EXCLUSIVE = null;      static final int CANCELLED =1;      static final int SIGNAL    = -1;      static final int CONDITION = -2;          static final int PROPAGATE = -3;      volatile int waitStatus;      volatile Node prev;      volatile Node next;      volatile Thread thread;      Node nextWaiter;      final boolean isShared() {            return nextWaiter == SHARED;      }      final Node predecessor() throws NullPointerException {            Node p = prev;            if (p == null)                throw new NullPointerException();            else                return p;      }      ...      构造方法      ...    }复制代码Node提供了两种入队列的方法,即enq和addWaiter,enq方法如下所示,当尾节点tail为null时,表明阻塞队列还没有被初始化,通过CAS操作来设置头结点,头结点为new Node(),实际上头结点中没有阻塞的线程,算得上是一个空的节点(注意空节点和null是不一样的),然后举行tail=head操作,这也说明当head=tail的时间,队列中实际上是不存在阻塞线程的,然后将需要入队列的node放入队列尾部,将tail指向node。
    private Node enq(final Node node) {      for (;;) {            Node t = tail;              //如果tail为空,说明CLH队列没有被初始化,            if (t == null) {                    //初始化CLH队列,将head和tail指向一个new Node(),                    //此时虽然CLH有一个节点,但是并没有真正意义的阻塞线程                if (compareAndSetHead(new Node()))                  tail = head;            } else {                    //将node放入队列尾部,并通过cas将tail指向node                node.prev = t;                if (compareAndSetTail(t, node)) {                  t.next = node;                  return t;                }            }      }    }复制代码addWaiter通常表示添加一个条件等待的节点入队列,该方法首先尝试通过CAS操作快速入队列,如果失败则通过调用enq来入队列。
    private Node addWaiter(Node mode) {      Node node = new Node(Thread.currentThread(), mode);      //尝试快速入队列      Node pred = tail;      if (pred != null) {            node.prev = pred;            if (compareAndSetTail(pred, node)) {                pred.next = node;                return node;            }      }      //快速入队列失败则接纳enq方入队列      enq(node);      return node;    }复制代码Node还提供了唤醒后继节点线程的功能,主要是通过LockSupport来实现的,源码如下所示,
    private void unparkSuccessor(Node node) {      int ws = node.waitStatus;      if (ws < 0)            compareAndSetWaitStatus(node, ws, 0);      Node s = node.next;      if (s == null || s.waitStatus > 0) {            s = null;            for (Node t = tail; t != null && t != node; t = t.prev)                if (t.waitStatus = 0) {                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        //获取锁成功,补偿中断                        if (interrupted)                            selfInterrupt();                        failed = false;                        return;                  }                }                //通过interrupted记录中断信息                if (shouldParkAfterFailedAcquire(p, node) &&                  parkAndCheckInterrupt())                  interrupted = true;            }      } finally {            if (failed)                cancelAcquire(node);      }    }复制代码doAcquireShared方法没有返回值,与acquireQueued不同的是:

[*]doAcquireShared没有返回值,该方法的中断补偿是在方法内完成的,获取锁成功之后,会判断中断信息interrupted的状态,如果为true则调用selfInterrupt()方法中断当前线程;
[*]获取锁成功之后不是简朴的设置head,而是通过setHeadAndPropagate方法来设置头结点和并且判断后继节点的信息,对后继节点中的线程举行唤醒操作等,setHeadAndPropagate方法源码如下所示:
    private void setHeadAndPropagate(Node node, int propagate) {      Node h = head;         //设置新的头结点      setHead(node);      if (propagate > 0 || h == null || h.waitStatus < 0 ||            (h = head) == null || h.waitStatus < 0) {            Node s = node.next;            //如果后继节点为空大概为SHARED类型的节点,实行doReleaseShared方法            if (s == null || s.isShared())                doReleaseShared();      }    }    private void doReleaseShared() {      for (;;) {            Node h = head;            if (h != null && h != tail) {                int ws = h.waitStatus;                if (ws == Node.SIGNAL) {                        //状态为SIGNAL,则唤醒后继节点中的线程                  if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                        continue;                              unparkSuccessor(h);                }                //若状态为0,则设置状态为PROPAGATE                else if (ws == 0 &&                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                  continue;                            }            if (h == head)                                  break;      }    }复制代码锁的释放
锁的释放也分为释放排他锁和释放共享锁,分别为release方法和releaseShared方法,源码如下所示,
        //释放排他锁    public final boolean release(int arg) {            //释放锁,然后唤醒后继节点的线程      if (tryRelease(arg)) {            Node h = head;            if (h != null && h.waitStatus != 0)                unparkSuccessor(h);            return true;      }      return false;    }    //释放共享锁    public final boolean releaseShared(int arg) {            //释放锁,然后调用doReleaseShared方法      if (tryReleaseShared(arg)) {            doReleaseShared();            return true;      }      return false;    }复制代码release方法和releaseShared方法分别调用模板方法tryRelease和tryReleaseShared来释放锁,release方法中直接通过调用unparkSuccessor唤醒后继线程,而releaseShared的唤醒操作在doReleaseShared方法中举行。
取消获取锁
当获取锁失败时,需要举行一些状态清理和变革,cancelAcquire方法就是用来实现这些功能的,其源码如下所示,
    private void cancelAcquire(Node node) {               if (node == null)            return;      //节点线程置为null      node.thread = null;      //从CLH队列中清除已经取消的节点(CANCELLED)      Node pred = node.prev;      while (pred.waitStatus > 0)            node.prev = pred = pred.prev;      Node predNext = pred.next;      node.waitStatus = Node.CANCELLED;      //判断如果node是尾部节点,则设置尾部节点      if (node == tail && compareAndSetTail(node, pred)) {            compareAndSetNext(pred, predNext, null);      } else {            int ws;                 //若不是头节点则直接从CLH队列中清除当前节点            if (pred != head &&                ((ws = pred.waitStatus) == Node.SIGNAL ||               (ws
页: [1]
查看完整版本: 源码修炼笔记之AQS(AbstractQueuedSynchronizer)源码剖析