小盒子的技术分享 发表于 2021-11-5 16:00:23

百度 UidGenerator 源码剖析

简介

先来看一下官方介绍:
雪花算法


雪花算法(Snowflake)是一种生成分布式全局唯一 ID 的算法,生成的 ID 称为 Snowflake IDs 或 snowflakes。这种算法由 Twitter 创建,并用于推文的 ID。Discord 和 Instagram 等其他公司采用了修改后的版本。一个 Snowflake ID 有 64 位。前 41 位是时间戳,表示了自选定的时期以来的毫秒数。接下来的 10 位代表计算机 ID,防止冲突。其余 12 位代表每台呆板上生成 ID 的序列号,这允许在同一毫秒内创建多个 Snowflake ID。SnowflakeID 基于时间生成,故可以按时间排序。别的,一个 ID 的生成时间可以由其自身推断出来,反之亦然。该特性可以用于按时间筛选 ID,以及与之联系的对象。

https://p5.toutiaoimg.com/large/pgc-image/6f4abef4c9e54207ac82ffecac47724b
第 1 位
该位不用主要是为了保持 ID 的自增特性,若利用了最高位,int64 会表示为负数。在 Java 中由于 long 范例的最高位是符号位,正数是 0,负数是 1,一般生成的 ID 为正整数,所以最高位为 0
41 位时间戳
毫秒级的时间,一般实现上不会存储当前的时间戳,而是时间戳的差值(当前时间减去固定的开始时间),这样可以使产生的 ID 从更小值开始;
41 bit 可以表示的数字多达 2^41 - 1,也就是可以标识 2 ^ 41 - 1 个毫秒值,换算成年就是表示 69 年的时间。

<span style="letter-spacing: 1px;">(1L                      bitsAllocator.getMaxDeltaSeconds()) {      throw new UidGenerateException("Timestamp bits is exhausted. Refusing UID generate. Now: " + currentSecond);    }    return currentSecond;}disposableWorkerIdAssigner
Worker ID 分配器,用于为每个工作呆板分配一个唯一的 ID,现在来说是用完即弃,在初始化 Bean 的时间会自动向 MySQL 中插入一条关于该服务的启动信息,待 MySQL 返回其自增 ID 之后,利用该 ID 作为工作呆板 ID 并柔和到 UID 的生成当中。
@Transactionalpublic long assignWorkerId() {    // build worker node entity    WorkerNodeEntity workerNodeEntity = buildWorkerNode();    // add worker node for new (ignore the same IP + PORT)    workerNodeDAO.addWorkerNode(workerNodeEntity);    LOGGER.info("Add worker node:" + workerNodeEntity);    return workerNodeEntity.getId();}buildWorkerNode() 为获取该启动服务的信息,兼容 Docker 服务。但要注意,无论是 docker 还是用 k8s,需要添加相关的情况变量 env 在设置文件中以便程序可以或许获取到。
/** Environment param keys 主要是端口和 host */    private static final String ENV_KEY_HOST = "JPAAS_HOST";    private static final String ENV_KEY_PORT = "JPAAS_HTTP_PORT";核心方法
介绍完上面这些,我们来看下 defaultUidGenerator 生成 ID 的核心方法(注意这个方法是同步方法)
protected synchronized long nextId() {    long currentSecond = getCurrentSecond();    // 时钟向后移动,拒绝生成 id (解决时钟回拨题目)    if (currentSecond < lastSecond) {      long refusedSeconds = lastSecond - currentSecond;      throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);    }    // 如果是在同一秒内,那么增加 sequence    if (currentSecond == lastSecond) {      sequence = (sequence + 1) & bitsAllocator.getMaxSequence();      // 如果凌驾了最大值,那么需要比及下一秒再举行生成      if (sequence == 0) {            currentSecond = getNextSecond(lastSecond);      }    // 不同秒的情况下,sequence 重新从 0 开始计数    } else {      sequence = 0L;    }    lastSecond = currentSecond;    // Allocate bits for UID    return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);}可以看到大部门代码用来处理异常情况,比如时钟回拨题目,这里的做法比较简单,就是直接抛出异常。
末了一行才是根据传入的或计算好的参数举行 ID 的真正分配,通过二进制的移位和或运算得到最终的 long ID 值。
public long allocate(long deltaSeconds, long workerId, long sequence) {    return (deltaSeconds0 && paddingFactor < 100, "RingBuffer size must be positive");      this.bufferSize = bufferSize;      this.indexMask = bufferSize - 1;      this.slots = new long;      this.flags = initFlags(bufferSize);                this.paddingThreshold = bufferSize * paddingFactor / 100;    }bufferSize 的默认值 ,如果 sequence 是 13 位,那么默认最大值是 8192,且是支持扩容的。
触发添补缓冲区的阈值也是支持设置的,
RingBuffer 的添补和获取
RingBuffer 的添补和获取操作是线程安全的,但是添补和获取操作的性能会受到 RingBuffer 的大小的影响,先来看下 put 操作:
public synchronized boolean put(long uid) {      long currentTail = tail.get();      long currentCursor = cursor.get();      // 当 tail 追上了 cursor 时,表示 RingBuffer 满了,不能再放了      // Tail 不能凌驾 Cursor,即生产者不能覆盖未消费的 slot。当 Tail 已赶上 curosr,此时可通过 rejectedPutBufferHandler 指定 PutRejectPolicy      long distance = currentTail - (currentCursor == START_POINT ? 0 : currentCursor);      if (distance == bufferSize - 1) {            rejectedPutHandler.rejectPutBuffer(this, uid);            return false;      }      // 1. 预检查 flag 是否为 CAN_PUT_FLAG,初次 put 时,currentTail 为-1      int nextTailIndex = calSlotIndex(currentTail + 1);      if (flags.get() != CAN_PUT_FLAG) {            rejectedPutHandler.rejectPutBuffer(this, uid);            return false;      }      // 2. put UID in the next slot      slots = uid;      // 3. update next slot&#39; flag to CAN_TAKE_FLAG      flags.set(CAN_TAKE_FLAG);      // 4. publish tail with sequence increase by one 移动 tail      tail.incrementAndGet();                // 上述操作的原子性由“synchronized”包管。换句话说      // take 操作不能消费我们刚刚放的 UID,直到 tail 移动 (tail.incrementAndGet()) 才可以      return true;    }注意 put 方法是 synchronized。再来看下 take 方法:
UID 的读取是一个无锁的操作。在获取 UID 之前,还要检查是否到达了 padding 阈值,在另一个线程中会触发 padding buffer 操作,如果没有更多可用的 UID 可以获取,则应用指定的 RejectedTakeBufferHandler
public long take() {    // spin get next available cursor    long currentCursor = cursor.get();    // cursor 初始化为-1,现在 cursor 即是 tail,所以初始化时 nextCursor 为-1    long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);    // check for safety consideration, it never occurs    // 初始化或者全部 UID 耗尽时 nextCursor == currentCursor    Assert.isTrue(nextCursor >= currentCursor, "Curosr can&#39;t move back");    // 如果到达阈值,则以异步模式触发添补    long currentTail = tail.get();    if (currentTail - nextCursor < paddingThreshold) {      LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,                nextCursor, currentTail - nextCursor);      bufferPaddingExecutor.asyncPadding();    }    // cursor 追上 tail ,意味着没有更多可用的 UID 可以获取    if (nextCursor == currentCursor) {      rejectedTakeHandler.rejectTakeBuffer(this);    }    // 1. check next slot flag is CAN_TAKE_FLAG    int nextCursorIndex = calSlotIndex(nextCursor);    // 这个位置必须要是可以 TAKE    Assert.isTrue(flags.get() == CAN_TAKE_FLAG, "Curosr not in can take status");    // 2. get UID from next slot    // 3. set next slot flag as CAN_PUT_FLAG.    long uid = slots;    // 告知 flags 数组这个位置是可以被重用了    flags.set(CAN_PUT_FLAG);    // 注意:步骤 2,3 不能交换。如果我们在获取 slot 的值之前设置 flag,生产者可能会用新的 UID 覆盖 slot,这可能会导致消费者在一个 ring 中获取 UID 两次    return uid; }BufferPaddingExecutor
默认情况下,slots 被消费大于 50%的时间举行异步添补,这个添补由 BufferPaddingExecutor 所执行的,下面我们马上看看这个执行者的主要代码。
/**    * Padding buffer fill the slots until to catch the cursor    * 该方法被即时添补和定期添补所调用    */public void paddingBuffer() {    LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);    // is still running    // 这个是代表添补 executor 在执行,不是 RingBuffer 在执行。避免多个线程同时扩容。    if (!running.compareAndSet(false, true)) {      LOGGER.info("Padding buffer is still running. {}", ringBuffer);      return;    }    // fill the rest slots until to catch the cursor    boolean isFullRingBuffer = false;    while (!isFullRingBuffer) {      // 添补完指定 SECOND 内里的所有 UID,直至填满      List uidList = uidProvider.provide(lastSecond.incrementAndGet());      for (Long uid : uidList) {            isFullRingBuffer = !ringBuffer.put(uid);            if (isFullRingBuffer) {                break;            }      }    }    // not running now    running.compareAndSet(true, false);    LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);}

[*]当线程池分发多条线程来执行添补任务的时间,乐成抢夺运行状态的线程会真正执行对 RingBuffer 添补,直至全部填满,其他抢夺失败的线程将会直接返回。
[*]该类还提供定时添补功能,如果有设置开关则会生效,默认不会启用周期性添补
[*]RIngBuffer 的添补机遇有 3 个:CachedUidGenerator 时对 RIngBuffer 初始化、RIngBuffer#take() 时检测到达阈值和周期性添补(如果有打开)
利用 RingBuffer 的 UID 生成器
末了我们看一下利用 CachedUidGenerator 生成 UID 的代码,CachedUidGenerator 继续了 DefaultUidGenerator,实现了 UidGenerator 接口。
该类在应用中作为 Spring Bean 注入到各个组件中,主要作用是初始化 RingBuffer 和 BufferPaddingExecutor。最重要的方法为 BufferedUidProvider 的提供者,即 lambda 表达式中的 nextIdsForOneSecond(long) 方法。
/**   * Get the UIDs in the same specified second under the max sequence   *      * @param currentSecond   * @return UID list, size of {@link BitsAllocator#getMaxSequence()} + 1   */    protected List nextIdsForOneSecond(long currentSecond) {      // Initialize result list size of (max sequence + 1)      int listSize = (int) bitsAllocator.getMaxSequence() + 1;      List uidList = new ArrayList(listSize);      // Allocate the first sequence of the second, the others can be calculated with the offset      long firstSeqUid = bitsAllocator.allocate(currentSecond - epochSeconds, workerId, 0L);      for (int offset = 0; offset < listSize; offset++) {            uidList.add(firstSeqUid + offset);      }      return uidList;    }获取 ID 是通过委托 RingBuffer 的 take() 方法达成的
    @Override    public long getUID() {      try {            return ringBuffer.take();      } catch (Exception e) {            LOGGER.error("Generate unique id exception. ", e);            throw new UidGenerateException(e);      }    }这里通过时序图再串一下 获取 id 的流程。
https://p26.toutiaoimg.com/large/pgc-image/e16ac65c8b824c3c84b33e51401710b8
几段位运算代码

判定是不是 2 的幂

利用 bitCount 函数,原理是计算参数通报的 int 值的二进制值有多少个 1,如果只有 1 个,则说明是 2 的幂,否则不是。
Integer.bitCount(bufferSize) == 1根据位数取最大值

// initialize max valuethis.maxDeltaSeconds = ~(-1L >> (workerIdBits + sequenceBits);参考


[*]https://www.cnblogs.com/cyfonly/p/5800758.html
[*]http://www.semlinker.com/uuid-snowflake/
[*]https://programmer.group/snowflake-algorithm-improved-version-snowflake.html
[*]https://github.com/baidu/uid-generator/blob/master/README.zh_cn.md
[*]http://blog.chriscs.com/2017/08/02/baidu-uid-generator/

七大姑八大姨二大爷 发表于 2021-11-5 18:38:02

转发了
页: [1]
查看完整版本: 百度 UidGenerator 源码剖析