AQS源码阅读
简介AQS 全程为 AbstractQueuedSynchronizer , 在 java.util.concurrent.locks包下的一个抽象类。
类的具体作用以及计划在开始类描述信息里面就有很好的表达
Provides a framework for implementing blocking locks and relatedsynchronizers (semaphores, events, etc) that rely onfirst-in-first-out (FIFO) wait queues.This class is designed tobe a useful basis for most kinds of synchronizers that rely on asingle atomic {@code int} value to represent state. Subclassesmust define the protected methods that change this state, and whichdefine what that state means in terms of this object being acquiredor released.Given these, the other methods in this class carryout all queuing and blocking mechanics. Subclasses can maintainother state fields, but only the atomically updated {@code int}value manipulated using methods {@link #getState}, {@link#setState} and {@link #compareAndSetState} is tracked with respectto synchronization. 具体翻译为:提供了实现阻塞锁和相关的框架,依赖于的同步器(信号量、变乱等)。先进先出 (FIFO) 等待队列。 这个类被计划为对于大多数依赖于单个原子 {@code int} 值来表现状态。 子类必须定义更改此状态的受掩护方法,以及根据正在获取的对象定义该状态的寄义或开释。 鉴于这些,此类中的其他方法带有排除所有列队和阻塞机制。 子类可以维护其他状态字段,但只有原子更新的 {@code int}使用方法 {@link #getState}、{@link 操作的值#setState} 和 {@link #compareAndSetState} 被地跟踪、同步。简单描述为:通过原子性的int值来标记同步状态,实现阻塞锁机制,基于FIFO队列来实现列队机制
AQS实现了两种模式,独占模式和共享模式,实现共享锁和排他锁
以独占锁为例,FIFO队列头节点获取到锁之后,其他节点会进入等待状态,开释锁之后,头节点的下一个节点会尝试获取锁,但是不一定成功(公平、非公平)。
LockSupport工具类
LockSupport 官方是如许描述的 Basic thread blocking primitives for creating locks and other , 翻译为 “基本线程阻塞原语,用于创建锁和其他”,可以明白为使用计算机原语来创建锁,底层实现为unsafe类,主要方法为pack(线程阻塞)与unpack(唤醒线程),源码如下:
// LockSupport设置阻塞分为两组, // 一组是不设置blocker的方法,另一组是设置blocker方法 // JDK推荐为Thread设置blocker方便调试 public static void park(Object blocker) { // 获取当前线程 Thread t = Thread.currentThread(); // blocker位线程内存当前偏移量,设置线程内存偏移量, setBlocker(t, blocker); // Unsafe包下的方法可以直接操作内存,设置线程阻塞 U.park(false, 0L); // 线程唤醒后 blockers设置为空 setBlocker(t, null); } public static void park() { // 线程阻塞,单纯唤醒,不会记载偏移量 U.park(false, 0L); } // 与park基本相同,添加阻塞变乱,主动唤醒 public static void parkNanos(Object blocker, long nanos); public static void parkUntil(Object blocker, long deadline);// unpark方法为unsafe.unpark方法,为C++实现,基于内存进行操作 public static void unpark(Thread thread) { if (thread != null) U.unpark(thread); }AQS内部结构
AQS类图
AQS的实现类为NofairSync(非公平)、FairSync(公平)两种。
内部类Node
static final class Node { // 共享 static final Node SHARED = new Node(); // 独占 static final Node EXCLUSIVE = null; // 由于超时或者停止,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态; static final int CANCELLED =1; // 后继节点的线程状态处于等待状态,而当前节点的线程如果开释了同步状态或者被取消,将会通知后继节点,使后继节点线程继续运行 static final int SIGNAL = -1; // 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中 static final int CONDITION = -2; // 表现下一次共享式同步状态获取将会无条件地传播下去 static final int PROPAGATE = -3; // 等待状态 volatile int waitStatus; // 前驱结点 volatile Node prev; // 后继节点 volatile Node next; // Node封装当前线程 volatile Thread thread; // 指向ConditionObject的下一个节点 Node nextWaiter; }节点状态
[*]CANCELLED (1):当前线程由于超时或者停止被取消。这是一个闭幕态,也就是状态到此为止
[*]SIGNAL (-1):当前线程的后继线程被阻塞或者即将被阻塞,当前线程开释锁或者取消后需要唤醒后继线程。这个状态一样平常都是后继线程来设置前驱节点的
[*]CONDITION (-2):当前线程在condition队列中
[*]PROPAGATE (-3):用于将唤醒后继线程传递下去,这个状态的引入是为了完善和加强共享锁的唤醒机制。在一个节点成为头节点之前,是不会跃迁为此状态的
主要属性
/** * 等待队列的头节点, 赖加载 * 除了初始化之外, 只能通过 setHead 方法来改变其值 * 如果 head 不为 null, waitStatus 值就一定不会是 CANCELLED */private transient volatile Node head; /** * 等待队列的尾结点, 懒加载 * 只能通过 enq 方法添加新节点时才会去改变尾结点 */private transient volatile Node tail; /** * 同步器的状态 * 以 ReentrantLock 为例, 0 表现可以获取到锁, 其他的正整数表现无法获取到锁 */private volatile int state;AQS属性结构计划如下
具体方法
开放实现接口API,用于场景自定义:
方法
作用
boolean tryAcquire(int arg)
试获取独占锁
boolean tryRelease(int arg)
试开释独占锁
int tryAcquireShared(int arg)
试获取共享锁
boolean tryReleaseShared(int arg)
试开释共享锁
boolean isHeldExclusively()
当前线程是否获得了独占锁
AQS本身将同步状态的管理用模板方法模式都封装好了,以下列举了AQS中的一些模板方法
方法
描述
void acquire(int arg)
获取独占锁。会调用 tryAcquire 方法,如果未获取成功,则会进入同步队列等待
void acquireInterruptibly(int arg)
响应停止版本的 acquire
boolean tryAcquireNanos(int arg,long nanos)
响应停止+带超时版本的 acquire
void acquireShared(int arg)
获取共享锁。会调用 tryAcquireShared 方法
void acquireSharedInterruptibly(int arg)
响应停止版本的 acquireShared
boolean tryAcquireSharedNanos(int arg,long nanos)
响应停止+带超时版本的 acquireShared
boolean release(int arg)
开释独占锁
boolean releaseShared(int arg)
开释共享锁
Collection getQueuedThreads()
获取同步队列上的线程集合
源码层面上会对acquire、release、acquireShared 、releaseShared 进行详解
独占锁
// 获取独占锁 public final void acquire(int arg) { if (!tryAcquire(arg) && // 试获取独占锁,获取同步状态 // addWaiter,将线程封装成Node节点,添加到同步队列的尾部,acquireQueued,自旋等待,如果获取状态失败,挂起线程 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); // 线程停止}private Node addWaiter(Node mode) { //创建Node节点 Node node = new Node(mode); for (;;) { // 当前队尾进行标记 Node oldTail = tail; if (oldTail != null) { node.setPrevRelaxed(oldTail); // CAS操作放到队尾 if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSyncQueue(); } }}// 将该线程加入等待队列的尾部,并标记为独占模式final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { // 自旋,不断尝试当前线程获取锁,知道成功或者线程被打断 for (;;) { // 获取前驱节点 final Node p = node.predecessor(); // 尝试获取锁 if (p == head && tryAcquire(arg)) { // 成功之后将节点设置为新的头部节点 setHead(node); p.next = null; // help GC return interrupted; } // 获取锁失败 // 判断线程是否需要停止, 判断标准是前节点状态 if (shouldParkAfterFailedAcquire(p, node)) //挂起当前线程 interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; }}// 如果线程获取同步状态失败就要查抄它的节点status,要包管prev = node.prevprivate static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前驱节点状态 int ws = pred.waitStatus; // singal 表现前驱节点开释后会唤起当前线程,会继续等待,那么线程可以停止等待 if (ws == Node.SIGNAL) return true; // 前驱节点不是Head节点,而且状态为CANCELLED,那么就需要剔除前置节点,向前遍历,直到找到合适的前置 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 这种情况表现前驱节点的 ws = 0 或者 ws = PROPAGATE,那么设置前驱节点为singal,之后重新循环获取锁 pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } return false;}acquire方法为获取独占锁
[*]tryAcquire() 尝试直接去获取资源,如果成功则直接返回true,AQS中空方法,由实现类来实现,具体分析会在ReentrantLock中进行详细代码分析。
[*]addWaiter() 将该线程加入等待队列的尾部,并标记为独占模式
[*]acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被停止过,则返回true,否则返回false。如果线程在等待过程中被停止过,它是不响应的。只是获取资源后才再进行自我停止selfInterrupt(),将停止补上
// 独占锁开释节点public final boolean release(int arg) { // tryRelease由实现类具体实现 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 唤醒下一个节点 unparkSuccessor(h); return true; } return false;}release方法通过tryRelease()实现了扩展性,来进行锁的重入计划,unparkSuccessor则自身实现,完成了下一个Node节点的唤醒
共享锁
public final void acquireShared(int arg) { // tryAcquireShared为抽象方法,由子类实现 if (tryAcquireShared(arg) < 0) doAcquireShared(arg);} private void doAcquireShared(int arg) { // 不断尝试从队列中获取共享锁,当成功获取到锁或者线程被打断时会成功退出循环,竞争锁失败的线程会被 park 直到被唤醒,唤醒之后会再次进入循环尝试去获取锁,不断的重复整个过程 final Node node = addWaiter(Node.SHARED); boolean interrupted = false; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC return; } } // 上面由详细解读 if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); throw t; } finally { if (interrupted) selfInterrupt(); }}// 开释共享锁public final boolean releaseShared(int arg) { // 子类实现 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false;}//当前节点调用private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 如果头节点状态为SIGNAL, 表现可以去开释锁 if (ws == Node.SIGNAL) { // 通过 cas 将 waitStatus 设为 0 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // loop to recheck cases // 线程唤醒 unparkSuccessor(h); } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; }}关于独占锁和共享锁的异同
相同
1)获取锁前都会判断是否有权限,只有满足条件才可能获取到锁
2)未获取到锁的线程会创建新节点放入队列尾部
不同
1)独占锁只会开释头部后节点的线程,而共享锁会依次开释所有线程
2)独占锁存在非公平锁的情况,新的线程可能抢占队列中线程的锁,共享锁则不存在这种情况
扩展
AQS使用了模板方法的计划模式,以固定的方法进行调用,子类对实在现,方便扩展,后续会对具体实现类继续读源码。
原文 https://www.cnblogs.com/Tonyzczc/p/16179428.html
转发了 转发了 转发了 转发了
页:
[1]