Java老油条 潜水
  • 3发帖数
  • 3主题数
  • 0关注数
  • 0粉丝
开启左侧

源码修炼笔记之AQS(AbstractQueuedSynchronizer)源码剖析

[复制链接]
Java老油条 发表于 2021-9-24 15:18:43 来自手机 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
AbstractQueuedSynchronizer被称为队列同步器,简称为各人熟知的AQS,这个类可以称作concurrent包的底子,该类提供了同步的根本功能。该类包罗如下几个核心要素:

  • AQS内部维护一个volatile修饰的state变量,state用于标记锁的状态;
  • AQS通过内部类Node记录当前是哪个线程持有锁;
  • AQS通过LockSupport的park和unPark方法来阻塞和唤醒线程;
  • AQS通过node来维护一个队列,用于生存所有阻塞的线程。
下面通太过析源码来看看AQS是如何工作的。
AQS概要
AQS通过内部类Node记录当前是哪个线程持有锁,Node中有一个前驱节点和一个后继节点,形成一个双向链表,这个链表是一种CLH队列,此中waitStatus表示当前线程的状态,其可能的取值包罗以下几种:

  • SIGNAL(-1),表示后继线程已经大概即将被阻塞,当前线程释放锁大概获取锁失败后需要唤醒后继线程;
  • CANCELLED(1),表示当前线程由于超时大概中断被取消,这个状态不可以被修改;
  • CONDITION(-2),当前线程为条件等待,其状态设置0之后才能去竞争锁;
  • PROPAGATE(-3),表示共享锁释放之后需要通报给后继节点,只有头结点的才会有该状态;
  • 0,该状态为初始值,不属于上面恣意一种状态。
Node对象中还有一个nextWaiter变量,指向下一个条件等待节点,相当于在CLH队列的底子上维护了一个简朴的单链表来关联条件等待的节点。
        static final class Node {        static final Node SHARED = new Node();        static final Node EXCLUSIVE = null;        static final int CANCELLED =  1;        static final int SIGNAL    = -1;        static final int CONDITION = -2;          static final int PROPAGATE = -3;        volatile int waitStatus;        volatile Node prev;        volatile Node next;        volatile Thread thread;        Node nextWaiter;        final boolean isShared() {            return nextWaiter == SHARED;        }        final Node predecessor() throws NullPointerException {            Node p = prev;            if (p == null)                throw new NullPointerException();            else                return p;        }        ...        构造方法        ...    }复制代码Node提供了两种入队列的方法,即enq和addWaiter,enq方法如下所示,当尾节点tail为null时,表明阻塞队列还没有被初始化,通过CAS操作来设置头结点,头结点为new Node(),实际上头结点中没有阻塞的线程,算得上是一个空的节点(注意空节点和null是不一样的),然后举行tail=head操作,这也说明当head=tail的时间,队列中实际上是不存在阻塞线程的,然后将需要入队列的node放入队列尾部,将tail指向node。
    private Node enq(final Node node) {        for (;;) {            Node t = tail;                //如果tail为空,说明CLH队列没有被初始化,            if (t == null) {                    //初始化CLH队列,将head和tail指向一个new Node(),                    //此时虽然CLH有一个节点,但是并没有真正意义的阻塞线程                if (compareAndSetHead(new Node()))                    tail = head;            } else {                    //将node放入队列尾部,并通过cas将tail指向node                node.prev = t;                if (compareAndSetTail(t, node)) {                    t.next = node;                    return t;                }            }        }    }复制代码addWaiter通常表示添加一个条件等待的节点入队列,该方法首先尝试通过CAS操作快速入队列,如果失败则通过调用enq来入队列。
    private Node addWaiter(Node mode) {        Node node = new Node(Thread.currentThread(), mode);        //尝试快速入队列        Node pred = tail;        if (pred != null) {            node.prev = pred;            if (compareAndSetTail(pred, node)) {                pred.next = node;                return node;            }        }        //快速入队列失败则接纳enq方入队列        enq(node);        return node;    }复制代码Node还提供了唤醒后继节点线程的功能,主要是通过LockSupport来实现的,源码如下所示,
    private void unparkSuccessor(Node node) {        int ws = node.waitStatus;        if (ws < 0)            compareAndSetWaitStatus(node, ws, 0);        Node s = node.next;        if (s == null || s.waitStatus > 0) {            s = null;            for (Node t = tail; t != null && t != node; t = t.prev)                if (t.waitStatus = 0) {                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        //获取锁成功,补偿中断                        if (interrupted)                            selfInterrupt();                        failed = false;                        return;                    }                }                //通过interrupted记录中断信息                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {            if (failed)                cancelAcquire(node);        }    }复制代码doAcquireShared方法没有返回值,与acquireQueued不同的是:

  • doAcquireShared没有返回值,该方法的中断补偿是在方法内完成的,获取锁成功之后,会判断中断信息interrupted的状态,如果为true则调用selfInterrupt()方法中断当前线程;
  • 获取锁成功之后不是简朴的设置head,而是通过setHeadAndPropagate方法来设置头结点和并且判断后继节点的信息,对后继节点中的线程举行唤醒操作等,setHeadAndPropagate方法源码如下所示:
    private void setHeadAndPropagate(Node node, int propagate) {        Node h = head;         //设置新的头结点        setHead(node);        if (propagate > 0 || h == null || h.waitStatus < 0 ||            (h = head) == null || h.waitStatus < 0) {            Node s = node.next;            //如果后继节点为空大概为SHARED类型的节点,实行doReleaseShared方法            if (s == null || s.isShared())                doReleaseShared();        }    }    private void doReleaseShared() {        for (;;) {            Node h = head;            if (h != null && h != tail) {                int ws = h.waitStatus;                if (ws == Node.SIGNAL) {                        //状态为SIGNAL,则唤醒后继节点中的线程                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                        continue;                                unparkSuccessor(h);                }                //若状态为0,则设置状态为PROPAGATE                else if (ws == 0 &&                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                    continue;                            }            if (h == head)                                  break;        }    }复制代码锁的释放
锁的释放也分为释放排他锁和释放共享锁,分别为release方法和releaseShared方法,源码如下所示,
        //释放排他锁    public final boolean release(int arg) {            //释放锁,然后唤醒后继节点的线程        if (tryRelease(arg)) {            Node h = head;            if (h != null && h.waitStatus != 0)                unparkSuccessor(h);            return true;        }        return false;    }    //释放共享锁    public final boolean releaseShared(int arg) {            //释放锁,然后调用doReleaseShared方法        if (tryReleaseShared(arg)) {            doReleaseShared();            return true;        }        return false;    }复制代码release方法和releaseShared方法分别调用模板方法tryRelease和tryReleaseShared来释放锁,release方法中直接通过调用unparkSuccessor唤醒后继线程,而releaseShared的唤醒操作在doReleaseShared方法中举行。
取消获取锁
当获取锁失败时,需要举行一些状态清理和变革,cancelAcquire方法就是用来实现这些功能的,其源码如下所示,
    private void cancelAcquire(Node node) {               if (node == null)            return;        //节点线程置为null        node.thread = null;        //从CLH队列中清除已经取消的节点(CANCELLED)        Node pred = node.prev;        while (pred.waitStatus > 0)            node.prev = pred = pred.prev;        Node predNext = pred.next;        node.waitStatus = Node.CANCELLED;        //判断如果node是尾部节点,则设置尾部节点        if (node == tail && compareAndSetTail(node, pred)) {            compareAndSetNext(pred, predNext, null);        } else {            int ws;                   //若不是头节点则直接从CLH队列中清除当前节点            if (pred != head &&                ((ws = pred.waitStatus) == Node.SIGNAL ||                 (ws
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

猜你喜欢
在线客服邮箱
wxcy#wkgb.net

邮箱地址#换为@

Powered by 创意电子 ©2018-现在 专注资源实战分享源码下载站联盟商城