Semaphore 信号量源码分析
概述Semaphore 信号量, 信号量维护了一组许可。如果有必要每个采集模块都回阻塞,直到有许可可用。然后获取许可证。每次发布都会添加一个许可证,可能会释放一个阻塞资源。但是,没有使用实际的许可对象;信号量可用数量的计数,并且举行操作。 信号量通常可以用于限制访问某些(物理或者逻辑)资源的线程数。比方下面是一个使用信号量控制对线程池访问。
class Pool { private static final int MAX_AVAILABLE = 100; private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); public Object getItem() throws InterruptedException { available.acquire(); return getNextAvailableItem(); } public void putItem(Object x) { if (markAsUnused(x)) available.release(); } // Not a particularly efficient data structure; just for demo protected Object[] items = ... whatever kinds of items being managed protected boolean[] used = new boolean; protected synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used) { used = true; return items; } } return null; // not reached } protected synchronized boolean markAsUnused(Object item) { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items) { if (used) { used = false; return true; } else return false; } } return false; } }在获取项目之前,每个线程必须从信号量获取一个许可证,以确保项目可用。当线程处理完该项后,它将返回到池中,并向信号量返回一个许可证,允许另一个线程获取该项。请注意,在调用acquire时不会保持同步锁,因为这会阻止项目返回池。信号量封装了限制对池的访问所需的同步,与维护池本身同等性所需的任何同步分开。
初始化为1的信号量,其使用方式是最多只有一个可用的许可证,可以用作互斥锁。这通常被称为二进制信号量,因为它只有两个状态:一个许可证可用,或者零个许可证可用。以这种方式使用时,二进制信号量的属性(与许多java.util.concurrent.locks.Lock实现差异)是“锁”可以由所有者以外的线程释放(因为信号量没有所有权的概念)。这在某些特定的上下文中非常有用,比方死锁恢复。
此类的构造函数可以选择接受公平性参数。当设置为false时,此类不包管线程获取许可的顺序。特别是,允许bargging,也就是说,调用acquire的线程可以在一直在等待的线程之前分配一个许可证-从逻辑上讲,新线程将自己置于等待线程队列的头部。当公平性设置为true时,信号量包管选择调用任何acquire方法的线程,以按照其调用这些方法的处理顺序(先辈先出;先辈先出)。请注意,FIFO排序必然实用于这些方法中的特定内部执行点。因此,一个线程可以在另一个线程之前调用acquire,但在另一个线程之后到达排序点,雷同地,从方法返回时也是如此。还请注意,untimed tryAcquire方法不支持公平性设置,但将接受任何可用的许可。
通常,用于控制资源访问的信号量应该初始化为公平,以确保没有线程因访问资源而耗尽。当将信号量用于其他类型的同步控制时,非公平排序的吞吐量上风每每凌驾公平性考虑。
此类还提供了方便的方法,可以一次获取和发布多个许可证。当使用这些方法时,如果没有将公平设置为真,则要小心无限期耽误的风险增长。
内存同等性影响:在调用“release”方法(如release())之前的线程中的操作发生在另一个线程中成功的“acquire”方法(如acquire()之后的操作)之前。
原理分析
Semaphore 信号量,是控制并发的有用手段。它底层通过 AQS 实现。入下图所示:
https://p3.toutiaoimg.com/large/pgc-image/64523507ad474ba7bf31ee235461a44a
构造方法
Semaphore 构造方法有两个 Semaphore(int permits) 和 Semaphore(int permits, boolean fair) 后者有两个参数:第一个参数是许可数量初始化,第二个参数界说信号量是否公平锁同步(默认为非公平)。
public Semaphore(int permits) { sync = new NonfairSync(permits);}public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits);}acquire 方法
acquire 方法可以为理解获取许可,如果存在剩余许可那么就可以进入后续代码块,如果没有获取线程进入阻塞。 在共享模式下获取,如果停止将中止。通过首先查抄停止状态,然后调用至少一次tryAcquireShared,并在成功时返返来实现。否则线程将排队,可能会重复阻塞和取消阻塞,调用tryAcquireShared,直到成功或线程停止。
release 方法
acquire 方法可以为理表明放许可,其他等待许可的线程进入资源竞争阶段。然后去查找等待队列队头有用的等待节点举行叫醒。
整体流程
https://p6.toutiaoimg.com/large/pgc-image/f5b9324ecea149d78de942927e6a2e5a
举个例子
场景描述
对于控制流量,或者控制并发我们可以使用 Semaphore 信号量来完成。 例子: 有100 个人必要过桥,但是桥上最多同时可以或许承受 5 个人的重量。如果我们必要有序的过桥那么就可以采取信号量的方式来控制。
[*]初始化 5 个许可。
[*]上桥之前先去获取 许可,如果有剩余许可就上桥。
[*]如果没有 许可,就等待许可。
https://p9.toutiaoimg.com/large/pgc-image/f0874fe04565445fbc57603b425866d0
模拟代码
[*]首先界说桥对象,入下所示:
public class Bridge { private String name; private String address; private Integer max; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } public Integer getMax() { return max; } public void setMax(Integer max) { this.max = max; }}
[*]然后界说迁徙者对象,就是过桥的人,然后他有个动作就是过桥。代码如下所示。
public class Migrator { private String name; public void gapBridge() { System.out.println("Migrator: " + this.name + ", time:" + System.currentTimeMillis()); } public String getName() { return name; } public void setName(String name) { this.name = name; }}
[*]调用代码如下:
public class MainTest { public static void main(String[] args) { Bridge bridge = new Bridge(); bridge.setAddress("云南"); bridge.setName("XX 桥"); bridge.setMax(5); Semaphore semaphore = new Semaphore(bridge.getMax()); for (int i=0; i< 100; i++) { int idx = i; new Thread(()-> { try { Migrator migrator = new Migrator(); migrator.setName("name-" + idx); semaphore.acquire(); TimeUnit.SECONDS.sleep(1); migrator.gapBridge(); System.out.println("name " + migrator.getName() + " 通过"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }).start(); } }}
[*]输出日志如下:我们可以看到刚开始的时候有 5 个线程获取到 "许可" 几乎同时过桥,后面逐渐就是释放一个许可,另外一个线程继续执行。
Migrator: name-7, time:1630495912011name name-7 通过Migrator: name-2, time:1630495912011name name-2 通过Migrator: name-4, time:1630495912011Migrator: name-8, time:1630495912011Migrator: name-3, time:1630495912011name name-3 通过name name-8 通过name name-4 通过Migrator: name-5, time:1630495913012name name-5 通过Migrator: name-0, time:1630495913012name name-0 通过Migrator: name-6, time:1630495913013
页:
[1]