创意电子

标题: 百度 UidGenerator 源码剖析 [打印本页]

作者: 小盒子的技术分享    时间: 2021-11-5 16:00
标题: 百度 UidGenerator 源码剖析
简介

先来看一下官方介绍:

雪花算法

[size=1.882em]
雪花算法(Snowflake)是一种生成分布式全局唯一 ID 的算法,生成的 ID 称为 Snowflake IDs 或 snowflakes。这种算法由 Twitter 创建,并用于推文的 ID。Discord 和 Instagram 等其他公司采用了修改后的版本。一个 Snowflake ID 有 64 位。前 41 位是时间戳,表示了自选定的时期以来的毫秒数。接下来的 10 位代表计算机 ID,防止冲突。其余 12 位代表每台呆板上生成 ID 的序列号,这允许在同一毫秒内创建多个 Snowflake ID。SnowflakeID 基于时间生成,故可以按时间排序。别的,一个 ID 的生成时间可以由其自身推断出来,反之亦然。该特性可以用于按时间筛选 ID,以及与之联系的对象。

[size=1.882em]

                               
登录/注册后可看大图

第 1 位

该位不用主要是为了保持 ID 的自增特性,若利用了最高位,int64 会表示为负数。在 Java 中由于 long 范例的最高位是符号位,正数是 0,负数是 1,一般生成的 ID 为正整数,所以最高位为 0

41 位时间戳

毫秒级的时间,一般实现上不会存储当前的时间戳,而是时间戳的差值(当前时间减去固定的开始时间),这样可以使产生的 ID 从更小值开始;

41 bit 可以表示的数字多达 2^41 - 1,也就是可以标识 2 ^ 41 - 1 个毫秒值,换算成年就是表示 69 年的时间。


<span style="letter-spacing: 1px;">(1L                      bitsAllocator.getMaxDeltaSeconds()) {        throw new UidGenerateException("Timestamp bits is exhausted. Refusing UID generate. Now: " + currentSecond);    }    return currentSecond;}
disposableWorkerIdAssigner

Worker ID 分配器,用于为每个工作呆板分配一个唯一的 ID,现在来说是用完即弃,在初始化 Bean 的时间会自动向 MySQL 中插入一条关于该服务的启动信息,待 MySQL 返回其自增 ID 之后,利用该 ID 作为工作呆板 ID 并柔和到 UID 的生成当中。

@Transactionalpublic long assignWorkerId() {    // build worker node entity    WorkerNodeEntity workerNodeEntity = buildWorkerNode();    // add worker node for new (ignore the same IP + PORT)    workerNodeDAO.addWorkerNode(workerNodeEntity);    LOGGER.info("Add worker node:" + workerNodeEntity);    return workerNodeEntity.getId();}
buildWorkerNode() 为获取该启动服务的信息,兼容 Docker 服务。但要注意,无论是 docker 还是用 k8s,需要添加相关的情况变量 env 在设置文件中以便程序可以或许获取到。

/** Environment param keys 主要是端口和 host */    private static final String ENV_KEY_HOST = "JPAAS_HOST";    private static final String ENV_KEY_PORT = "JPAAS_HTTP_PORT";
核心方法

介绍完上面这些,我们来看下 [size=0.882em]defaultUidGenerator 生成 ID 的核心方法(注意这个方法是[size=0.882em]同步方法)

protected synchronized long nextId() {    long currentSecond = getCurrentSecond();    // 时钟向后移动,拒绝生成 id (解决时钟回拨题目)    if (currentSecond < lastSecond) {        long refusedSeconds = lastSecond - currentSecond;        throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);    }    // 如果是在同一秒内,那么增加 sequence    if (currentSecond == lastSecond) {        sequence = (sequence + 1) & bitsAllocator.getMaxSequence();        // 如果凌驾了最大值,那么需要比及下一秒再举行生成        if (sequence == 0) {            currentSecond = getNextSecond(lastSecond);        }    // 不同秒的情况下,sequence 重新从 0 开始计数    } else {        sequence = 0L;    }    lastSecond = currentSecond;    // Allocate bits for UID    return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);}
可以看到大部门代码用来处理异常情况,比如时钟回拨题目,这里的做法比较简单,就是直接抛出异常。

末了一行才是根据传入的或计算好的参数举行 ID 的真正分配,通过二进制的移位和或运算得到最终的 long ID 值。

public long allocate(long deltaSeconds, long workerId, long sequence) {    return (deltaSeconds  0 && paddingFactor < 100, "RingBuffer size must be positive");        this.bufferSize = bufferSize;        this.indexMask = bufferSize - 1;        this.slots = new long[bufferSize];        this.flags = initFlags(bufferSize);                this.paddingThreshold = bufferSize * paddingFactor / 100;    }
bufferSize 的默认值 ,如果 sequence 是 13 位,那么默认最大值是 8192,且是支持扩容的。

触发添补缓冲区的阈值也是支持设置的,

RingBuffer 的添补和获取

RingBuffer 的添补和获取操作是线程安全的,但是添补和获取操作的性能会受到 RingBuffer 的大小的影响,先来看下 [size=0.882em]put 操作:

public synchronized boolean put(long uid) {        long currentTail = tail.get();        long currentCursor = cursor.get();        // 当 tail 追上了 cursor 时,表示 RingBuffer 满了,不能再放了        // Tail 不能凌驾 Cursor,即生产者不能覆盖未消费的 slot。当 Tail 已赶上 curosr,此时可通过 rejectedPutBufferHandler 指定 PutRejectPolicy        long distance = currentTail - (currentCursor == START_POINT ? 0 : currentCursor);        if (distance == bufferSize - 1) {            rejectedPutHandler.rejectPutBuffer(this, uid);            return false;        }        // 1. 预检查 flag 是否为 CAN_PUT_FLAG,初次 put 时,currentTail 为-1        int nextTailIndex = calSlotIndex(currentTail + 1);        if (flags[nextTailIndex].get() != CAN_PUT_FLAG) {            rejectedPutHandler.rejectPutBuffer(this, uid);            return false;        }        // 2. put UID in the next slot        slots[nextTailIndex] = uid;        // 3. update next slot&#39; flag to CAN_TAKE_FLAG        flags[nextTailIndex].set(CAN_TAKE_FLAG);        // 4. publish tail with sequence increase by one 移动 tail        tail.incrementAndGet();                // 上述操作的原子性由“synchronized”包管。换句话说        // take 操作不能消费我们刚刚放的 UID,直到 tail 移动 (tail.incrementAndGet()) 才可以        return true;    }
注意 put 方法是 synchronized。再来看下 [size=0.882em]take 方法:

UID 的读取是一个无锁的操作。在获取 UID 之前,还要检查是否到达了 padding 阈值,在另一个线程中会触发 padding buffer 操作,如果没有更多可用的 UID 可以获取,则应用指定的 RejectedTakeBufferHandler

public long take() {    // spin get next available cursor    long currentCursor = cursor.get();    // cursor 初始化为-1,现在 cursor 即是 tail,所以初始化时 nextCursor 为-1    long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);    // check for safety consideration, it never occurs    // 初始化或者全部 UID 耗尽时 nextCursor == currentCursor    Assert.isTrue(nextCursor >= currentCursor, "Curosr can&#39;t move back");    // 如果到达阈值,则以异步模式触发添补    long currentTail = tail.get();    if (currentTail - nextCursor < paddingThreshold) {        LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,                nextCursor, currentTail - nextCursor);        bufferPaddingExecutor.asyncPadding();    }    // cursor 追上 tail ,意味着没有更多可用的 UID 可以获取    if (nextCursor == currentCursor) {        rejectedTakeHandler.rejectTakeBuffer(this);    }    // 1. check next slot flag is CAN_TAKE_FLAG    int nextCursorIndex = calSlotIndex(nextCursor);    // 这个位置必须要是可以 TAKE    Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");    // 2. get UID from next slot    // 3. set next slot flag as CAN_PUT_FLAG.    long uid = slots[nextCursorIndex];    // 告知 flags 数组这个位置是可以被重用了    flags[nextCursorIndex].set(CAN_PUT_FLAG);    // 注意:步骤 2,3 不能交换。如果我们在获取 slot 的值之前设置 flag,生产者可能会用新的 UID 覆盖 slot,这可能会导致消费者在一个 ring 中  获取 UID 两次    return uid; }
BufferPaddingExecutor

默认情况下,slots 被消费大于 50%的时间举行异步添补,这个添补由 BufferPaddingExecutor 所执行的,下面我们马上看看这个执行者的主要代码。

/**    * Padding buffer fill the slots until to catch the cursor    * 该方法被即时添补和定期添补所调用    */public void paddingBuffer() {    LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);    // is still running    // 这个是代表添补 executor 在执行,不是 RingBuffer 在执行。避免多个线程同时扩容。    if (!running.compareAndSet(false, true)) {        LOGGER.info("Padding buffer is still running. {}", ringBuffer);        return;    }    // fill the rest slots until to catch the cursor    boolean isFullRingBuffer = false;    while (!isFullRingBuffer) {        // 添补完指定 SECOND 内里的所有 UID,直至填满        List uidList = uidProvider.provide(lastSecond.incrementAndGet());        for (Long uid : uidList) {            isFullRingBuffer = !ringBuffer.put(uid);            if (isFullRingBuffer) {                break;            }        }    }    // not running now    running.compareAndSet(true, false);    LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);}
利用 RingBuffer 的 UID 生成器

末了我们看一下利用 CachedUidGenerator 生成 UID 的代码,CachedUidGenerator 继续了 DefaultUidGenerator,实现了 UidGenerator 接口。

该类在应用中作为 Spring Bean 注入到各个组件中,主要作用是初始化 RingBuffer 和 BufferPaddingExecutor。最重要的方法为 BufferedUidProvider 的提供者,即 lambda 表达式中的 nextIdsForOneSecond(long) 方法。

/**     * Get the UIDs in the same specified second under the max sequence     *      * @param currentSecond     * @return UID list, size of {@link BitsAllocator#getMaxSequence()} + 1     */    protected List nextIdsForOneSecond(long currentSecond) {        // Initialize result list size of (max sequence + 1)        int listSize = (int) bitsAllocator.getMaxSequence() + 1;        List uidList = new ArrayList(listSize);        // Allocate the first sequence of the second, the others can be calculated with the offset        long firstSeqUid = bitsAllocator.allocate(currentSecond - epochSeconds, workerId, 0L);        for (int offset = 0; offset < listSize; offset++) {            uidList.add(firstSeqUid + offset);        }        return uidList;    }
获取 ID 是通过委托 RingBuffer 的 take() 方法达成的

    @Override    public long getUID() {        try {            return ringBuffer.take();        } catch (Exception e) {            LOGGER.error("Generate unique id exception. ", e);            throw new UidGenerateException(e);        }    }
这里通过时序图再串一下 获取 id 的流程。


                               
登录/注册后可看大图

几段位运算代码

判定是不是 2 的幂

利用 bitCount 函数,原理是计算参数通报的 int 值的二进制值有多少个 1,如果只有 1 个,则说明是 2 的幂,否则不是。

Integer.bitCount(bufferSize) == 1根据位数取最大值

// initialize max valuethis.maxDeltaSeconds = ~(-1L >> (workerIdBits + sequenceBits);参考


作者: 七大姑八大姨二大爷    时间: 2021-11-5 18:38
转发了




欢迎光临 创意电子 (https://wxcydz.cc/) Powered by Discuz! X3.4