目次
- 同步器简介
- 方法简介
- 同步器实现原理
- 自己动手实现独占锁
- 同步器源码分析
同步器简介
AQS 可以以为是一个模板方法,是 JDK 提供的实现 JVM 锁的模板,AQS 封装了同步机制,通过实现 AQS 可以让开辟者比较简单就可以实现 JVM 锁,而不用去考虑底层的同步机制。降低了锁的实现难度及实现代价。
AQS 同步器提供了两套同步方案:独占式、共享式。也就是说要实现独占锁或者共享锁都可以通过继承 AQS 实现。
方法简介
可以看出 AQS 同步器为两套方案分别提供了两套方法,不带 Shared 的方法是独占式的,相应带 Shared 的方法就是共享式的。AQS 同步器提供了三种不同的加锁方式,用于实用不同的业务场景。
实现原理
同步状态
AQS 提供了一个同步状态属性 state,通过线程同步地设置同步状态实现加锁逻辑,让一个属性实现线程同步很简单,两个步骤:
- 利用 volatile 修饰,保证线程可见性
- 利用 Unsafe 类的 cas 方法修改属性值
volatile + cas 可以保证线程安全,当有多个线程需要修改 state 属性值时,只会有一个线程会操作成功,这就可以在JVM层面实现加锁操作。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // 同步状态 private volatile int state; // 获取同步状态 protected final int getState() { return state; } // 设置同步状态 protected final void setState(int newState) { state = newState; } // cas 设置同步状态 protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }}同步状态 state 是个 int 类型的数值,通过对数值的控制可以实现不同类型的锁。好比 ReentrantLock 就是将同步状态从 0 修改为 1 表现加锁来实现独占锁,而 Semaphore 是为同步状态设置一个初始值,同步状态大于 0 就可以加锁成功,然后同步状态减 1,这样实现多个线程同时加锁的共享锁。
多个线程同时加锁,但是只有一个线程会成功,那么加锁失败的线程怎么处置惩罚?AQS 提供了一套完备的基于 CLH 队列的解决方案,加锁失败后线程会进入 CLH 队列进行阻塞等待,AQS 的加锁流程如下图所示
开始尝试加锁尝试成功?结束进入CLH队列yesno
CLH 锁
CLH(Craig, Landin, and Hagersten locks): 是一种基于链表的可扩展、高性能、公平的自旋锁,能确保无饥饿性,提供先来先服务的公平性。
通过 CLH 的定义可以捕获到几个重要的点
AQS 利用的是一个 FIFO 的双向链表来实现 CLH 队列,可以看到 AQS 中定义了一个 Node 类作为 CLH 队列中的节点,Node 类中定义了 prev 指向前一个节点,next 指向后一个节点,AQS 中定义了 head 及 tail 分别表现 CLH 的头节点和尾节点,通过这个结构就可以从头节点或者尾节点遍历整个 CLH 队列。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // 头节点 private transient volatile Node head; // 尾节点 private transient volatile Node tail; // 节点定义 static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; // 等待状态 volatile int waitStatus; // 前驱节点 volatile Node prev; // 后继节点 volatile Node next; // 线程 volatile Thread thread; // 标记用 Node nextWaiter; }}CLH 队列处置惩罚流程
源码中提供了一个浅易的图形表现,线程加锁失败后,进入 CLH 队列,成为队列的尾节点,优先头节点获取锁,然后按链表顺序依次获取锁。
+------+ prev +-----+ +-----+ head | | 0); pred.next = node; } else { // 0、`CONDITION` 或者 `PROPAGATE` 状态,设置为 SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}parkAndCheckInterrupt 方法用于阻塞当前线程
private final boolean parkAndCheckInterrupt() { // 阻塞当前线程,进入等待状态,需要调用 LockSupport.unpark 方法或者 interrupt 中断进行唤醒 LockSupport.park(this); // 被唤醒之后返回当前线程是否在中断状态,并打扫中断记号 return Thread.interrupted();}至此,整个加锁过程就已经完成。
末了看一下解锁方法 tryRelease,代码如下
public final boolean release(int arg) { // 执行开释锁,需要实现类自行实现逻辑 if (tryRelease(arg)) { Node h = head; // 唤醒 CLH 队列的头节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;}unparkSuccessor 方法用于唤醒 CLH 头节点的后继节点(前文中一直说唤醒头节点,其实真正唤醒的是头节点的后继节点),传入的 node 就是头节点。如果后继节点状态为已取消,则向后查找最近的一个有效节点进行唤醒。
private void unparkSuccessor(Node node) { int ws = node.waitStatus; // 将头节点的状态重置为 0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 如果后续节点为已取消,则从队列尾部开始向前查找,找到离当前节点最近的一个有效节点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus = 0) { // 加锁成功 setHeadAndPropagate(node, r);// 设置当前节点为头节点 p.next = null; // help GC if (interrupted) selfInterrupt(); // 设置中断标记 failed = false; return; } } // 加锁失败,阻塞线程,这里与独占锁的实现一模一样 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}可以对比一下独占锁的代码实现,可以发现实现逻辑几乎一样。这里只阐明一下不一样的地方。
- 创建节点,调用 addWaiter(Node.SHARED) 创建共享节点,而独占锁调用 addWaiter(Node.EXCLUSIVE) 创建独占节点。
- 加锁,调用 tryAcquireShared,而独占锁调用 tryAcquire 方法,这两个方法都需要实现类自行实现。
- 加锁成功,调用 setHeadAndPropagate(node, r) 方法设置当前节点为头节点,而独占锁调用 setHead(node)
共享锁的加锁与独占锁不一样,虽然都是利用同步状态做文章,共享锁可以理解为资源,每一次加锁,就是向 AQS 申请资源,而开释锁就是向 AQS 归还资源,所以只要 AQS 还有资源就可以允许加锁。所以 tryAcquireShared 方法的返回值,可以理解为加锁后的剩余资源,所以只要大于等于 0,就是加锁成功。
这里重点关注一下 setHeadAndPropagate 这个方法,通过方法名可以猜测,setHeadAndPropagate 包含了 setHead 的逻辑,同时还要实现 propagate (通报),所谓通报就是指,只要 AQS 还有剩余资源,就继承唤醒后继节点,进行加锁操作。唤醒操作调用 doReleaseShared() 方法实现,这个方法也是开释同步锁时会调用的方法,在下文还会详细先容,这里只要理解为继承唤醒后继节点即可。
// propagate 就是 tryAcquireShared 加锁后返回的剩余资源private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); // 将当前节点设置为头节点 // propagate > 0 表现只要资源还有,则需要继承唤醒后续节点 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // 后继节点应该也是共享节点 if (s == null || s.isShared()) doReleaseShared(); // 开释共享锁时调用的方法 }}接着看一下共享锁开释方法 releaseShared,首先调用 tryReleaseShared 进行资源归还,这个方法需要实现类自行实现,然后调用 doReleaseShared 方法唤醒 CLH 队列中的节点。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false;}doReleaseShared 这个方法对比独占锁开释时的操作就要复杂很多,因为独占锁只有一个线程可以获取锁,所以开释的时候不存在线程安全的题目,但是这个方法在队列中加锁成功与恣意线程开释锁都会调用,就有可能出现判定过程中出现头节点被替换的题目。
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 如果节点状态为 -1,则将状态设置为 0,唤醒后继节点 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); // 如果节点状态为 0,则将状态设置为 -3 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; }}末了理一下共享模式下 CLH 节点的状态变革过程,如下图所示。
|