死牛胖子的技术随笔 潜水
  • 1发帖数
  • 1主题数
  • 0关注数
  • 0粉丝
开启左侧

JUC源码体系之AQS同步器源码解析

[复制链接]
死牛胖子的技术随笔 发表于 2020-12-22 07:06:00 来自手机 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题

                               
登录/注册后可看大图

目次


  • 同步器简介
  • 方法简介
  • 同步器实现原理
  • 自己动手实现独占锁
  • 同步器源码分析

                               
登录/注册后可看大图

同步器简介

AQS 可以以为是一个模板方法,是 JDK 提供的实现 JVM 锁的模板,AQS 封装了同步机制,通过实现 AQS 可以让开辟者比较简单就可以实现 JVM 锁,而不用去考虑底层的同步机制。降低了锁的实现难度及实现代价。
AQS 同步器提供了两套同步方案:独占式、共享式。也就是说要实现独占锁或者共享锁都可以通过继承 AQS 实现。
方法简介


                               
登录/注册后可看大图

可以看出 AQS 同步器为两套方案分别提供了两套方法,不带 Shared 的方法是独占式的,相应带 Shared 的方法就是共享式的。AQS 同步器提供了三种不同的加锁方式,用于实用不同的业务场景。
实现原理

同步状态

AQS 提供了一个同步状态属性 state,通过线程同步地设置同步状态实现加锁逻辑,让一个属性实现线程同步很简单,两个步骤:

  • 利用 volatile 修饰,保证线程可见性
  • 利用 Unsafe 类的 cas 方法修改属性值
volatile + cas 可以保证线程安全,当有多个线程需要修改 state 属性值时,只会有一个线程会操作成功,这就可以在JVM层面实现加锁操作。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {    // 同步状态    private volatile int state;    // 获取同步状态    protected final int getState() {        return state;    }    // 设置同步状态    protected final void setState(int newState) {        state = newState;    }    // cas 设置同步状态    protected final boolean compareAndSetState(int expect, int update) {        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);    }}同步状态 state 是个 int 类型的数值,通过对数值的控制可以实现不同类型的锁。好比 ReentrantLock 就是将同步状态从 0 修改为 1 表现加锁来实现独占锁,而 Semaphore 是为同步状态设置一个初始值,同步状态大于 0 就可以加锁成功,然后同步状态减 1,这样实现多个线程同时加锁的共享锁。
多个线程同时加锁,但是只有一个线程会成功,那么加锁失败的线程怎么处置惩罚?AQS 提供了一套完备的基于 CLH 队列的解决方案,加锁失败后线程会进入 CLH 队列进行阻塞等待,AQS 的加锁流程如下图所示
开始尝试加锁尝试成功?结束进入CLH队列yesno
CLH 锁

CLH(Craig, Landin, and Hagersten locks): 是一种基于链表的可扩展、高性能、公平的自旋锁,能确保无饥饿性,提供先来先服务的公平性。
通过 CLH 的定义可以捕获到几个重要的点

  • 基于链表
  • 自旋
  • 先辈先服务
AQS 利用的是一个 FIFO 的双向链表来实现 CLH 队列,可以看到 AQS 中定义了一个 Node 类作为 CLH 队列中的节点,Node 类中定义了 prev 指向前一个节点,next 指向后一个节点,AQS 中定义了 headtail 分别表现 CLH 的头节点和尾节点,通过这个结构就可以从头节点或者尾节点遍历整个 CLH 队列。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {    // 头节点    private transient volatile Node head;    // 尾节点    private transient volatile Node tail;    // 节点定义        static final class Node {        static final Node SHARED = new Node();        static final Node EXCLUSIVE = null;        static final int CANCELLED =  1;        static final int SIGNAL    = -1;        static final int CONDITION = -2;        static final int PROPAGATE = -3;        // 等待状态        volatile int waitStatus;        // 前驱节点        volatile Node prev;        // 后继节点        volatile Node next;        // 线程        volatile Thread thread;        // 标记用        Node nextWaiter;    }}CLH 队列处置惩罚流程
源码中提供了一个浅易的图形表现,线程加锁失败后,进入 CLH 队列,成为队列的尾节点,优先头节点获取锁,然后按链表顺序依次获取锁。
          +------+    prev  +-----+           +-----+ head  |          |    0);        pred.next = node;    } else { // 0、`CONDITION` 或者 `PROPAGATE` 状态,设置为 SIGNAL        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    return false;}parkAndCheckInterrupt 方法用于阻塞当前线程
private final boolean parkAndCheckInterrupt() {    // 阻塞当前线程,进入等待状态,需要调用 LockSupport.unpark 方法或者 interrupt 中断进行唤醒    LockSupport.park(this);    // 被唤醒之后返回当前线程是否在中断状态,并打扫中断记号    return Thread.interrupted();}至此,整个加锁过程就已经完成。
末了看一下解锁方法 tryRelease,代码如下
public final boolean release(int arg) {    // 执行开释锁,需要实现类自行实现逻辑    if (tryRelease(arg)) {        Node h = head;        // 唤醒 CLH 队列的头节点        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}unparkSuccessor 方法用于唤醒 CLH 头节点的后继节点(前文中一直说唤醒头节点,其实真正唤醒的是头节点的后继节点),传入的 node 就是头节点。如果后继节点状态为已取消,则向后查找最近的一个有效节点进行唤醒。
private void unparkSuccessor(Node node) {    int ws = node.waitStatus;    // 将头节点的状态重置为 0    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);    // 如果后续节点为已取消,则从队列尾部开始向前查找,找到离当前节点最近的一个有效节点        Node s = node.next;    if (s == null || s.waitStatus > 0) {        s = null;        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus = 0) { // 加锁成功                    setHeadAndPropagate(node, r);// 设置当前节点为头节点                    p.next = null; // help GC                    if (interrupted)                        selfInterrupt(); // 设置中断标记                    failed = false;                    return;                }            }            // 加锁失败,阻塞线程,这里与独占锁的实现一模一样            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}可以对比一下独占锁的代码实现,可以发现实现逻辑几乎一样。这里只阐明一下不一样的地方。

  • 创建节点,调用 addWaiter(Node.SHARED) 创建共享节点,而独占锁调用 addWaiter(Node.EXCLUSIVE) 创建独占节点。
  • 加锁,调用 tryAcquireShared,而独占锁调用 tryAcquire 方法,这两个方法都需要实现类自行实现。
  • 加锁成功,调用 setHeadAndPropagate(node, r) 方法设置当前节点为头节点,而独占锁调用 setHead(node)
共享锁的加锁与独占锁不一样,虽然都是利用同步状态做文章,共享锁可以理解为资源,每一次加锁,就是向 AQS 申请资源,而开释锁就是向 AQS 归还资源,所以只要 AQS 还有资源就可以允许加锁。所以 tryAcquireShared 方法的返回值,可以理解为加锁后的剩余资源,所以只要大于等于 0,就是加锁成功。
这里重点关注一下 setHeadAndPropagate 这个方法,通过方法名可以猜测,setHeadAndPropagate 包含了 setHead 的逻辑,同时还要实现 propagate (通报),所谓通报就是指,只要 AQS 还有剩余资源,就继承唤醒后继节点,进行加锁操作。唤醒操作调用 doReleaseShared() 方法实现,这个方法也是开释同步锁时会调用的方法,在下文还会详细先容,这里只要理解为继承唤醒后继节点即可。
// propagate 就是 tryAcquireShared 加锁后返回的剩余资源private void setHeadAndPropagate(Node node, int propagate) {    Node h = head;    setHead(node); // 将当前节点设置为头节点    // propagate > 0 表现只要资源还有,则需要继承唤醒后续节点    if (propagate > 0 || h == null || h.waitStatus < 0 ||        (h = head) == null || h.waitStatus < 0) {        Node s = node.next;        // 后继节点应该也是共享节点        if (s == null || s.isShared())            doReleaseShared(); // 开释共享锁时调用的方法    }}接着看一下共享锁开释方法 releaseShared,首先调用 tryReleaseShared 进行资源归还,这个方法需要实现类自行实现,然后调用 doReleaseShared 方法唤醒 CLH 队列中的节点。
public final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) {        doReleaseShared();        return true;    }    return false;}doReleaseShared 这个方法对比独占锁开释时的操作就要复杂很多,因为独占锁只有一个线程可以获取锁,所以开释的时候不存在线程安全的题目,但是这个方法在队列中加锁成功与恣意线程开释锁都会调用,就有可能出现判定过程中出现头节点被替换的题目。
private void doReleaseShared() {    for (;;) {        Node h = head;        if (h != null && h != tail) {            int ws = h.waitStatus;            // 如果节点状态为 -1,则将状态设置为 0,唤醒后继节点            if (ws == Node.SIGNAL) {                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                    continue;            // loop to recheck cases                unparkSuccessor(h);            // 如果节点状态为 0,则将状态设置为 -3            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                continue;                // loop on failed CAS        }        if (h == head)                   // loop if head changed            break;    }}末了理一下共享模式下 CLH 节点的状态变革过程,如下图所示。

                               
登录/注册后可看大图
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

猜你喜欢
在线客服邮箱
wxcy#wkgb.net

邮箱地址#换为@

Powered by 创意电子 ©2018-现在 专注资源实战分享源码下载站联盟商城