简介
先来看一下官方介绍:
雪花算法
[size=1.882em]“
雪花算法(Snowflake)是一种生成分布式全局唯一 ID 的算法,生成的 ID 称为 Snowflake IDs 或 snowflakes。这种算法由 Twitter 创建,并用于推文的 ID。Discord 和 Instagram 等其他公司采用了修改后的版本。一个 Snowflake ID 有 64 位。前 41 位是时间戳,表示了自选定的时期以来的毫秒数。接下来的 10 位代表计算机 ID,防止冲突。其余 12 位代表每台呆板上生成 ID 的序列号,这允许在同一毫秒内创建多个 Snowflake ID。SnowflakeID 基于时间生成,故可以按时间排序。别的,一个 ID 的生成时间可以由其自身推断出来,反之亦然。该特性可以用于按时间筛选 ID,以及与之联系的对象。
[size=1.882em]”
第 1 位
该位不用主要是为了保持 ID 的自增特性,若利用了最高位,int64 会表示为负数。在 Java 中由于 long 范例的最高位是符号位,正数是 0,负数是 1,一般生成的 ID 为正整数,所以最高位为 0
41 位时间戳
毫秒级的时间,一般实现上不会存储当前的时间戳,而是时间戳的差值(当前时间减去固定的开始时间),这样可以使产生的 ID 从更小值开始;
41 bit 可以表示的数字多达 2^41 - 1,也就是可以标识 2 ^ 41 - 1 个毫秒值,换算成年就是表示 69 年的时间。
<span style="letter-spacing: 1px;">(1L bitsAllocator.getMaxDeltaSeconds()) { throw new UidGenerateException("Timestamp bits is exhausted. Refusing UID generate. Now: " + currentSecond); } return currentSecond;}disposableWorkerIdAssigner
Worker ID 分配器,用于为每个工作呆板分配一个唯一的 ID,现在来说是用完即弃,在初始化 Bean 的时间会自动向 MySQL 中插入一条关于该服务的启动信息,待 MySQL 返回其自增 ID 之后,利用该 ID 作为工作呆板 ID 并柔和到 UID 的生成当中。
@Transactionalpublic long assignWorkerId() { // build worker node entity WorkerNodeEntity workerNodeEntity = buildWorkerNode(); // add worker node for new (ignore the same IP + PORT) workerNodeDAO.addWorkerNode(workerNodeEntity); LOGGER.info("Add worker node:" + workerNodeEntity); return workerNodeEntity.getId();}buildWorkerNode() 为获取该启动服务的信息,兼容 Docker 服务。但要注意,无论是 docker 还是用 k8s,需要添加相关的情况变量 env 在设置文件中以便程序可以或许获取到。
/** Environment param keys 主要是端口和 host */ private static final String ENV_KEY_HOST = "JPAAS_HOST"; private static final String ENV_KEY_PORT = "JPAAS_HTTP_PORT";核心方法
介绍完上面这些,我们来看下 [size=0.882em]defaultUidGenerator 生成 ID 的核心方法(注意这个方法是[size=0.882em]同步方法)
protected synchronized long nextId() { long currentSecond = getCurrentSecond(); // 时钟向后移动,拒绝生成 id (解决时钟回拨题目) if (currentSecond < lastSecond) { long refusedSeconds = lastSecond - currentSecond; throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds); } // 如果是在同一秒内,那么增加 sequence if (currentSecond == lastSecond) { sequence = (sequence + 1) & bitsAllocator.getMaxSequence(); // 如果凌驾了最大值,那么需要比及下一秒再举行生成 if (sequence == 0) { currentSecond = getNextSecond(lastSecond); } // 不同秒的情况下,sequence 重新从 0 开始计数 } else { sequence = 0L; } lastSecond = currentSecond; // Allocate bits for UID return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);}可以看到大部门代码用来处理异常情况,比如时钟回拨题目,这里的做法比较简单,就是直接抛出异常。
末了一行才是根据传入的或计算好的参数举行 ID 的真正分配,通过二进制的移位和或运算得到最终的 long ID 值。
public long allocate(long deltaSeconds, long workerId, long sequence) { return (deltaSeconds 0 && paddingFactor < 100, "RingBuffer size must be positive"); this.bufferSize = bufferSize; this.indexMask = bufferSize - 1; this.slots = new long[bufferSize]; this.flags = initFlags(bufferSize); this.paddingThreshold = bufferSize * paddingFactor / 100; }bufferSize 的默认值 ,如果 sequence 是 13 位,那么默认最大值是 8192,且是支持扩容的。
触发添补缓冲区的阈值也是支持设置的,
RingBuffer 的添补和获取
RingBuffer 的添补和获取操作是线程安全的,但是添补和获取操作的性能会受到 RingBuffer 的大小的影响,先来看下 [size=0.882em]put 操作:
public synchronized boolean put(long uid) { long currentTail = tail.get(); long currentCursor = cursor.get(); // 当 tail 追上了 cursor 时,表示 RingBuffer 满了,不能再放了 // Tail 不能凌驾 Cursor,即生产者不能覆盖未消费的 slot。当 Tail 已赶上 curosr,此时可通过 rejectedPutBufferHandler 指定 PutRejectPolicy long distance = currentTail - (currentCursor == START_POINT ? 0 : currentCursor); if (distance == bufferSize - 1) { rejectedPutHandler.rejectPutBuffer(this, uid); return false; } // 1. 预检查 flag 是否为 CAN_PUT_FLAG,初次 put 时,currentTail 为-1 int nextTailIndex = calSlotIndex(currentTail + 1); if (flags[nextTailIndex].get() != CAN_PUT_FLAG) { rejectedPutHandler.rejectPutBuffer(this, uid); return false; } // 2. put UID in the next slot slots[nextTailIndex] = uid; // 3. update next slot' flag to CAN_TAKE_FLAG flags[nextTailIndex].set(CAN_TAKE_FLAG); // 4. publish tail with sequence increase by one 移动 tail tail.incrementAndGet(); // 上述操作的原子性由“synchronized”包管。换句话说 // take 操作不能消费我们刚刚放的 UID,直到 tail 移动 (tail.incrementAndGet()) 才可以 return true; }注意 put 方法是 synchronized。再来看下 [size=0.882em]take 方法:
UID 的读取是一个无锁的操作。在获取 UID 之前,还要检查是否到达了 padding 阈值,在另一个线程中会触发 padding buffer 操作,如果没有更多可用的 UID 可以获取,则应用指定的 RejectedTakeBufferHandler
public long take() { // spin get next available cursor long currentCursor = cursor.get(); // cursor 初始化为-1,现在 cursor 即是 tail,所以初始化时 nextCursor 为-1 long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1); // check for safety consideration, it never occurs // 初始化或者全部 UID 耗尽时 nextCursor == currentCursor Assert.isTrue(nextCursor >= currentCursor, "Curosr can't move back"); // 如果到达阈值,则以异步模式触发添补 long currentTail = tail.get(); if (currentTail - nextCursor < paddingThreshold) { LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail, nextCursor, currentTail - nextCursor); bufferPaddingExecutor.asyncPadding(); } // cursor 追上 tail ,意味着没有更多可用的 UID 可以获取 if (nextCursor == currentCursor) { rejectedTakeHandler.rejectTakeBuffer(this); } // 1. check next slot flag is CAN_TAKE_FLAG int nextCursorIndex = calSlotIndex(nextCursor); // 这个位置必须要是可以 TAKE Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status"); // 2. get UID from next slot // 3. set next slot flag as CAN_PUT_FLAG. long uid = slots[nextCursorIndex]; // 告知 flags 数组这个位置是可以被重用了 flags[nextCursorIndex].set(CAN_PUT_FLAG); // 注意:步骤 2,3 不能交换。如果我们在获取 slot 的值之前设置 flag,生产者可能会用新的 UID 覆盖 slot,这可能会导致消费者在一个 ring 中 获取 UID 两次 return uid; }BufferPaddingExecutor
默认情况下,slots 被消费大于 50%的时间举行异步添补,这个添补由 BufferPaddingExecutor 所执行的,下面我们马上看看这个执行者的主要代码。
/** * Padding buffer fill the slots until to catch the cursor * 该方法被即时添补和定期添补所调用 */public void paddingBuffer() { LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer); // is still running // 这个是代表添补 executor 在执行,不是 RingBuffer 在执行。避免多个线程同时扩容。 if (!running.compareAndSet(false, true)) { LOGGER.info("Padding buffer is still running. {}", ringBuffer); return; } // fill the rest slots until to catch the cursor boolean isFullRingBuffer = false; while (!isFullRingBuffer) { // 添补完指定 SECOND 内里的所有 UID,直至填满 List uidList = uidProvider.provide(lastSecond.incrementAndGet()); for (Long uid : uidList) { isFullRingBuffer = !ringBuffer.put(uid); if (isFullRingBuffer) { break; } } } // not running now running.compareAndSet(true, false); LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);}
- 当线程池分发多条线程来执行添补任务的时间,乐成抢夺运行状态的线程会真正执行对 RingBuffer 添补,直至全部填满,其他抢夺失败的线程将会直接返回。
- 该类还提供定时添补功能,如果有设置开关则会生效,默认不会启用周期性添补
- RIngBuffer 的添补机遇有 3 个:CachedUidGenerator 时对 RIngBuffer 初始化、RIngBuffer#take() 时检测到达阈值和周期性添补(如果有打开)
利用 RingBuffer 的 UID 生成器
末了我们看一下利用 CachedUidGenerator 生成 UID 的代码,CachedUidGenerator 继续了 DefaultUidGenerator,实现了 UidGenerator 接口。
该类在应用中作为 Spring Bean 注入到各个组件中,主要作用是初始化 RingBuffer 和 BufferPaddingExecutor。最重要的方法为 BufferedUidProvider 的提供者,即 lambda 表达式中的 nextIdsForOneSecond(long) 方法。
/** * Get the UIDs in the same specified second under the max sequence * * @param currentSecond * @return UID list, size of {@link BitsAllocator#getMaxSequence()} + 1 */ protected List nextIdsForOneSecond(long currentSecond) { // Initialize result list size of (max sequence + 1) int listSize = (int) bitsAllocator.getMaxSequence() + 1; List uidList = new ArrayList(listSize); // Allocate the first sequence of the second, the others can be calculated with the offset long firstSeqUid = bitsAllocator.allocate(currentSecond - epochSeconds, workerId, 0L); for (int offset = 0; offset < listSize; offset++) { uidList.add(firstSeqUid + offset); } return uidList; }获取 ID 是通过委托 RingBuffer 的 take() 方法达成的
@Override public long getUID() { try { return ringBuffer.take(); } catch (Exception e) { LOGGER.error("Generate unique id exception. ", e); throw new UidGenerateException(e); } }这里通过时序图再串一下 获取 id 的流程。
几段位运算代码
判定是不是 2 的幂
利用 bitCount 函数,原理是计算参数通报的 int 值的二进制值有多少个 1,如果只有 1 个,则说明是 2 的幂,否则不是。
Integer.bitCount(bufferSize) == 1根据位数取最大值
// initialize max valuethis.maxDeltaSeconds = ~(-1L >> (workerIdBits + sequenceBits);参考
- https://www.cnblogs.com/cyfonly/p/5800758.html
- http://www.semlinker.com/uuid-snowflake/
- https://programmer.group/snowflake-algorithm-improved-version-snowflake.html
- https://github.com/baidu/uid-generator/blob/master/README.zh_cn.md
- http://blog.chriscs.com/2017/08/02/baidu-uid-generator/
|