源码修炼笔记之AQS(AbstractQueuedSynchronizer)源码剖析
AbstractQueuedSynchronizer被称为队列同步器,简称为各人熟知的AQS,这个类可以称作concurrent包的底子,该类提供了同步的根本功能。该类包罗如下几个核心要素:[*]AQS内部维护一个volatile修饰的state变量,state用于标记锁的状态;
[*]AQS通过内部类Node记录当前是哪个线程持有锁;
[*]AQS通过LockSupport的park和unPark方法来阻塞和唤醒线程;
[*]AQS通过node来维护一个队列,用于生存所有阻塞的线程。
下面通太过析源码来看看AQS是如何工作的。
AQS概要
AQS通过内部类Node记录当前是哪个线程持有锁,Node中有一个前驱节点和一个后继节点,形成一个双向链表,这个链表是一种CLH队列,此中waitStatus表示当前线程的状态,其可能的取值包罗以下几种:
[*]SIGNAL(-1),表示后继线程已经大概即将被阻塞,当前线程释放锁大概获取锁失败后需要唤醒后继线程;
[*]CANCELLED(1),表示当前线程由于超时大概中断被取消,这个状态不可以被修改;
[*]CONDITION(-2),当前线程为条件等待,其状态设置0之后才能去竞争锁;
[*]PROPAGATE(-3),表示共享锁释放之后需要通报给后继节点,只有头结点的才会有该状态;
[*]0,该状态为初始值,不属于上面恣意一种状态。
Node对象中还有一个nextWaiter变量,指向下一个条件等待节点,相当于在CLH队列的底子上维护了一个简朴的单链表来关联条件等待的节点。
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED =1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } ... 构造方法 ... }复制代码Node提供了两种入队列的方法,即enq和addWaiter,enq方法如下所示,当尾节点tail为null时,表明阻塞队列还没有被初始化,通过CAS操作来设置头结点,头结点为new Node(),实际上头结点中没有阻塞的线程,算得上是一个空的节点(注意空节点和null是不一样的),然后举行tail=head操作,这也说明当head=tail的时间,队列中实际上是不存在阻塞线程的,然后将需要入队列的node放入队列尾部,将tail指向node。
private Node enq(final Node node) { for (;;) { Node t = tail; //如果tail为空,说明CLH队列没有被初始化, if (t == null) { //初始化CLH队列,将head和tail指向一个new Node(), //此时虽然CLH有一个节点,但是并没有真正意义的阻塞线程 if (compareAndSetHead(new Node())) tail = head; } else { //将node放入队列尾部,并通过cas将tail指向node node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }复制代码addWaiter通常表示添加一个条件等待的节点入队列,该方法首先尝试通过CAS操作快速入队列,如果失败则通过调用enq来入队列。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //尝试快速入队列 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //快速入队列失败则接纳enq方入队列 enq(node); return node; }复制代码Node还提供了唤醒后继节点线程的功能,主要是通过LockSupport来实现的,源码如下所示,
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus = 0) { setHeadAndPropagate(node, r); p.next = null; // help GC //获取锁成功,补偿中断 if (interrupted) selfInterrupt(); failed = false; return; } } //通过interrupted记录中断信息 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }复制代码doAcquireShared方法没有返回值,与acquireQueued不同的是:
[*]doAcquireShared没有返回值,该方法的中断补偿是在方法内完成的,获取锁成功之后,会判断中断信息interrupted的状态,如果为true则调用selfInterrupt()方法中断当前线程;
[*]获取锁成功之后不是简朴的设置head,而是通过setHeadAndPropagate方法来设置头结点和并且判断后继节点的信息,对后继节点中的线程举行唤醒操作等,setHeadAndPropagate方法源码如下所示:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; //设置新的头结点 setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //如果后继节点为空大概为SHARED类型的节点,实行doReleaseShared方法 if (s == null || s.isShared()) doReleaseShared(); } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //状态为SIGNAL,则唤醒后继节点中的线程 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } //若状态为0,则设置状态为PROPAGATE else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }复制代码锁的释放
锁的释放也分为释放排他锁和释放共享锁,分别为release方法和releaseShared方法,源码如下所示,
//释放排他锁 public final boolean release(int arg) { //释放锁,然后唤醒后继节点的线程 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } //释放共享锁 public final boolean releaseShared(int arg) { //释放锁,然后调用doReleaseShared方法 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }复制代码release方法和releaseShared方法分别调用模板方法tryRelease和tryReleaseShared来释放锁,release方法中直接通过调用unparkSuccessor唤醒后继线程,而releaseShared的唤醒操作在doReleaseShared方法中举行。
取消获取锁
当获取锁失败时,需要举行一些状态清理和变革,cancelAcquire方法就是用来实现这些功能的,其源码如下所示,
private void cancelAcquire(Node node) { if (node == null) return; //节点线程置为null node.thread = null; //从CLH队列中清除已经取消的节点(CANCELLED) Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; //判断如果node是尾部节点,则设置尾部节点 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; //若不是头节点则直接从CLH队列中清除当前节点 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws
页:
[1]