本日为你带来的是 ReentrantLock 公平锁与非公平锁的分析,它是 Java 并发包下的一个实现类,实现了 Lock 接口和 Serializable 接口。
初识
ReentrantLock 类有两个构造函数,一个是默认的不带参数的构造函数,创建一个默认的非公平锁的实现,一个是带参数的构造函数,根据参数 fair 创建一个公平照旧非公平的锁。
public ReentrantLock() { sync = new NonfairSync();}public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync();}这里简单的说一下公平锁和非公平锁的界说:
- 公平锁:线程在同队伍列中通过先辈先出(FIFO)的方式获取锁,每个线程最终都能获取锁。
- 非公平锁:线程可以通过插队的方式抢占锁,抢不到锁就进入同队伍列列队。
NonfairSync 类和 FairSync 类继承了 Sync 类,它们三个都是 ReentrantLock 的内部类。
AbstractQueuedSynchronizer,简称 AQS,拥有三个核心组件:
- state:volatile 修饰,线程是否可以获取锁。
- Node:内部队列,双向链表形式,没有抢到锁的对象就进入这个队列。 主要字段有:pre 前一个节点,next 下一个节点,thread 线程,waitStatus 线程的状态。
- exclusiveOwnerThread:当前抢到锁的线程。
如下图,简单的相识一下 AQS。
//继承了 AQSabstract static class Sync extends AbstractQueuedSynchronizer//继承了 Sync 类,界说一个非公平锁的实现static final class NonfairSync extends Sync//继承了 Sync 类,界说了一个公平锁的实现static final class FairSync extends Sync公平锁
在分析公平锁之前,先先容一下 Sync 类,它是 ReentrantLock 的唯一的属性,在构造函数中被初始化,决定了用公平锁照旧非公平锁的方式获取锁。
private final Sync sync;用以下构造方法创建一个公平锁。
Lock lock = new ReentrantLock(true);沿着 lock.lock() 调用环境一路往下分析。
//FairSync.lock()final void lock() { // 将 1 作为参数,调用 AQS 的 acquire 方法获取锁 acquire(1);}acquire() 方法主要是干了 3 件事情
- tryAcquire() 实行获取锁。
- 获取锁失败后,调用 addWaiter() 方法将线程封装成 Node,参加同队伍列。
- acquireQueued() 将队列中的节点按自旋的方式实行获取锁。
//AbstractQueuedSynchronizer.acquire()public final void acquire(int arg) { //实行获取锁,true:直接返回,false:调用 addWaiter() // addWaiter() 将当前线程封装成节点 // acquireQueued() 将同队伍列中的节点循环实行获取锁 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}tryAcquire() 实行获取锁,如果线程本身持有锁,则将这个线程重入锁。
//FairSync.tryAcquire()protected final boolean tryAcquire(int acquires) { //当前线程 final Thread current = Thread.currentThread(); //AQS 中的状态值 int c = getState(); //无线程占用锁 if (c == 0) { //当前线程 用 cas 的方式设置状态为 1 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //当前线程成功设置 state 为 1,将当前线程放入 AbstractOwnableSynchronizer 的 exclusiveOwnerThread 变量中 setExclusiveOwnerThread(current); return true; } } //当前线程原来就持有锁,进入重入逻辑,返回 true else if (current == getExclusiveOwnerThread()) { //将 state + 1 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 设置 state 变量,当前线程持有锁,不需要用 CAS 设置 state setState(nextc); return true; } //当前线程获取锁失败 return false;}hasQueuedPredecessors() 这个方法比较有名流风度,在 tryAcquire() 方法中被第一个调用,它忍让比自己列队长的线程。
//AbstractQueuedSynchronizer.hasQueuedPredecessors()public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; // 如果首节点获取到了锁,第二个节点不是当前线程,返回 true,否则返回 false return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());}addWaiter() 方法就是将获取锁失败的线程参加到同队伍列尾部。
//AbstractOwnableSynchronizer.addWaiter()private Node addWaiter(Node mode) { //将当前线程封装成一个节点 Node node = new Node(Thread.currentThread(), mode); //将新节点参加到同队伍列中 Node pred = tail; if (pred != null) { node.prev = pred; // CAS 设置尾节点是新节点 if (compareAndSetTail(pred, node)) { pred.next = node; //返回新的节点 return node; } } //没有将尾节点设置为新节点 enq(node); return node;}//AbstractQueuedSynchronizer.enq()private Node enq(final Node node) { //自旋 for (;;) { //尾节点为 null,创建新的同队伍列 Node t = tail; if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { //尾节点不为 null,CAS方式将新节点的前一个节点设置为尾节点 node.prev = t; if (compareAndSetTail(t, node)) { //旧的尾节点的下一个节点为新节点 t.next = node; return t; } } }}acquireQueued() 方法当节点为首节点的时候,再次调用 tryAcquire() 获取锁,否则就壅闭线程,等待被叫醒。
//AbstractQueuedSynchronizer.acquireQueued()final boolean acquireQueued(final Node node, int arg) { //失败标识 boolean failed = true; try { //中断标识 boolean interrupted = false; //自旋 for (;;) { //获取前一个节点 final Node p = node.predecessor(); //如果前一个节点是首节点,轮到当前线程获取锁 //tryAcquire() 实行获取锁 if (p == head && tryAcquire(arg)) { //获取锁成功,将当前节点设置为首节点 setHead(node); //将前一个节点的 Next 设置为 null p.next = null; //获取锁成功 failed = false; return interrupted; } //是否需要壅闭 if (shouldParkAfterFailedAcquire(p, node) && //壅闭的方法 parkAndCheckInterrupt()) //中断标识设为 true interrupted = true; } } finally { if (failed) cancelAcquire(node); }}shouldParkAfterFailedAcquire() 线程是否需要被壅闭,更改线程的 waitStatus 为 SIGNAL。parkAndCheckInterrupt() 实现真正的壅闭线程。
//AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire()private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //前一个节点的 waitStatus 状态,默认状态为 0,第一次进入一定走 else int ws = pred.waitStatus; // 第二次,直接返回 true if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //将waitStatus 状态设置为 SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}//AbstractQueuedSynchronizer.parkAndCheckInterrupt()private final boolean parkAndCheckInterrupt() { //壅闭当前线程 LockSupport.park(this); return Thread.interrupted();}以上就是公平锁获取锁的全部过程,总结一下公平锁获取锁的过程:
- 当前线程调用 tryAcquire() 获取锁,成功则返回。
- 调用 addWaiter(),将线程封装成 Node 节点参加同队伍列。
- acquireQueued() 自旋实行获取锁,成功则返回。
- shouldParkAfterFailedAcquire() 将线程设置为等待叫醒状态,壅闭当前线程。
- 如果线程被叫醒,实行获取锁,成功则返回,失败则继续壅闭。
非公平锁
用默认的构造方式创建一个非公平锁。lock() 方法上来就实行抢占锁,失败则调用 acquire() 方法。
//NonfairSync.lock()final void lock() { //CAS 设置 state 为 1 if (compareAndSetState(0, 1)) //将当前线程放入 AbstractOwnableSynchronizer 的 exclusiveOwnerThread 变量中 setExclusiveOwnerThread(Thread.currentThread()); else //设置失败,参考上一节公平锁的 acquire() acquire(1);}nonfairTryAcquire() 就没有名流风度了,没有了公平锁 hasQueuedPredecessors() 方法。
//NonfairSync.tryAcquire()protected final boolean tryAcquire(int acquires) { //调用 ReentrantLock 的 nonfairTryAcquire() return nonfairTryAcquire(acquires);}//ReentrantLock.nonfairTryAcquire()final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //如果 state 变量为 0,用 CAS 设置 state 的值 if (c == 0) { if (compareAndSetState(0, acquires)) { //将当前线程放入 AbstractOwnableSynchronizer 的 exclusiveOwnerThread 变量中 setExclusiveOwnerThread(current); return true; } } //当前线程原来就持有锁,进入重入逻辑,返回 true else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); //设置 state 变量 setState(nextc); return true; } return false;}以上就是非公平锁获取锁,总结一下非公平锁获取锁的过程:
- lock() 第一次实行获取锁,成功则返回。
- nonfairTryAcquire() 再次实行获取锁。
- 失败则调用 addWaiter() 封装线程为 Node 节点参加同队伍列。
- acquireQueued() 自旋实行获取锁,成功则返回。
- shouldParkAfterFailedAcquire() 将线程设置为等待叫醒状态,壅闭当前线程。
- 如果线程被叫醒,实行获取锁,成功则返回,失败则继续壅闭。
公平锁和非公平锁对比
在下图源码中可以看出,公平锁多了一个 !hasQueuedPredecessors() 用来判断是否有其他线程比当前线程在同队伍列中列队时间更长。除此之外,非公平锁在初始时就有 2 次获取锁的机会,然后再到同队伍列中列队。
unlock() 开释锁
获取锁之后必须得开释,同一个线程不管重入了几次锁,必须得开释几次锁,不然 state 变量将不会变成 0,锁被永久占用,其他线程将永远也获取不到锁。
//ReentrantLock.unlock()public void unlock() { sync.release(1);}//AbstractQueuedSynchronizer.release()public final boolean release(int arg) { //调用 Sync 的 tryRelease() if (tryRelease(arg)) { Node h = head; //首节点不是 null,首节点的 waitStatus 是 SIGNAL if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;}//Sync.tryRelease()protected final boolean tryRelease(int releases) { //state 变量减去 1 int c = getState() - releases; //当前线程不是占有锁的线程,非常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //state 变量成了 0 if (c == 0) { free = true; //将当前占有的线程设置为 null setExclusiveOwnerThread(null); } //设置 state 变量 setState(c); return free;}//AbstractQueuedSynchronizer.unparkSuccessor()private void unparkSuccessor(Node node) { //节点的 waitStatus int ws = node.waitStatus; //为 SIGNAL 的时候,CAS 的方式更新为初始值 0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //获取下一个节点 Node s = node.next; //下一个节点为空,或者 waitStatus 状态为 CANCELLED if (s == null || s.waitStatus > 0) { s = null; //从最后一个节点开始找出 waitStatus 不是 CANCELLED 的节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus |