Snowflake算法时钟回拨重复发号?逻辑时钟偏移检测与Leaf-segment号段缓存

Snowflake算法时钟回拨重复发号?逻辑时钟偏移检测与Leaf-segment号段缓存

大家好,今天我们来深入探讨分布式ID生成器,特别是Snowflake算法在时钟回拨场景下的问题,以及如何通过逻辑时钟偏移检测和Leaf-segment号段缓存来解决这些问题。

Snowflake算法回顾

Snowflake算法是一种经典的分布式ID生成算法,它生成的ID通常是一个64位的长整型,由以下几部分组成:

  • Sign Bit (1 bit): 符号位,始终为0。
  • Timestamp (41 bits): 时间戳,记录自某个起始时间(epoch)以来的毫秒数。
  • Worker ID (10 bits): 工作节点ID,用于区分不同的部署节点。
  • Sequence Number (12 bits): 序列号,用于在同一毫秒内区分不同的ID。

这种结构使得Snowflake算法能够保证在分布式环境下生成全局唯一、趋势递增的ID。 然而,它对时间有着严格的要求,依赖于系统时钟的准确性。

时钟回拨问题

Snowflake算法最常见的问题之一就是时钟回拨。 如果服务器的时钟发生了回拨(例如,由于NTP同步或者人为调整),那么就有可能生成重复的ID。 这是因为时间戳是ID生成的重要组成部分,如果时间戳小于之前生成ID的时间戳,那么就可能出现重复ID。

例如:

  1. 系统在 T1 时刻生成了一个ID,时间戳为 T1。
  2. 系统时钟发生回拨,回退到 T0 时刻,T0 < T1。
  3. 系统在 T0 时刻又生成一个ID,时间戳为 T0。 由于Worker ID和Sequence Number的组合可能在T1时刻已经使用过,因此可能会生成重复的ID。

如何模拟时钟回拨

以下代码模拟了时钟回拨的场景:

import java.util.concurrent.TimeUnit;

public class Snowflake {

    private final long epoch;
    private final long workerId;
    private final long dataCenterId;
    private final long workerIdBits = 5L;
    private final long dataCenterIdBits = 5L;
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
    private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
    private final long sequenceBits = 12L;
    private final long workerIdShift = sequenceBits;
    private final long dataCenterIdShift = sequenceBits + workerIdBits;
    private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);

    private long sequence = 0L;
    private long lastTimestamp = -1L;

    public Snowflake(long workerId, long dataCenterId, long epoch) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
            throw new IllegalArgumentException(String.format("dataCenter Id can't be greater than %d or less than 0", maxDataCenterId));
        }
        this.workerId = workerId;
        this.dataCenterId = dataCenterId;
        this.epoch = epoch;
    }

    public synchronized long nextId() {
        long timestamp = timeGen();

        if (timestamp < lastTimestamp) {
            // 时钟回拨
            long offset = lastTimestamp - timestamp;
            if (offset <= 5) {
                try {
                    wait(offset << 1); // 等待两倍的回拨时间
                    timestamp = timeGen();
                    if (timestamp < lastTimestamp) {
                       throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", offset));
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            } else {
                throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", offset));
            }
        }

        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            if (sequence == 0) {
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            sequence = 0L;
        }

        lastTimestamp = timestamp;

        return ((timestamp - epoch) << timestampLeftShift) |
                (dataCenterId << dataCenterIdShift) |
                (workerId << workerIdShift) |
                sequence;
    }

    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    protected long timeGen() {
        return System.currentTimeMillis();
    }

    public static void main(String[] args) throws InterruptedException {
        Snowflake snowflake = new Snowflake(1, 1, 1609459200000L); // 2021-01-01 00:00:00
        long id1 = snowflake.nextId();
        System.out.println("Generated ID 1: " + id1);

        // 模拟时钟回拨
        TimeUnit.MILLISECONDS.sleep(10); // 模拟一些时间过去
        snowflake.lastTimestamp = System.currentTimeMillis() - 5; // 回拨5毫秒
        long id2 = snowflake.nextId();
        System.out.println("Generated ID 2: " + id2);
    }
}

上述代码展示了Snowflake算法的基本实现,并在 main 函数中模拟了时钟回拨。 可以看到在时钟回拨的情况下,代码会尝试等待一段时间,如果等待后时间仍然小于上一次生成的时间戳,则会抛出异常。

解决时钟回拨的常见策略

  1. 拒绝服务: 如果检测到时钟回拨,直接抛出异常,拒绝生成ID。 这是最简单粗暴的方式,但会影响服务的可用性。
  2. 等待: 当检测到时钟回拨时,等待一段时间,直到时钟追上之前的状态。 这种方式可能会导致ID生成延迟。
  3. 使用备用时钟源: 使用多个时钟源,例如NTP服务器,并在检测到时钟偏差时切换到其他时钟源。
  4. 修改时间戳: 将时间戳设置为一个大于之前生成ID的时间戳的值。 这种方式可能会导致ID不是严格递增,但可以保证唯一性。

逻辑时钟偏移检测

除了简单地比较时间戳,还可以使用逻辑时钟偏移检测来更灵活地处理时钟回拨。 逻辑时钟偏移检测的核心思想是:维护一个逻辑时钟,并根据一定的规则来更新它,而不是直接依赖于系统时钟。

实现逻辑时钟

import java.util.concurrent.atomic.AtomicLong;

public class LogicalClock {

    private final AtomicLong logicalClock;

    public LogicalClock(long initialTime) {
        this.logicalClock = new AtomicLong(initialTime);
    }

    public long now() {
        long currentTime = System.currentTimeMillis();
        long currentLogicalTime = logicalClock.get();
        long newLogicalTime = Math.max(currentTime, currentLogicalTime + 1);

        //如果currentTime比logicalClock小,说明发生了回拨
        if(currentTime < currentLogicalTime){
            System.out.println("Detected clock drift, current time is " + currentTime + ", logical clock is " + currentLogicalTime);
        }

        logicalClock.compareAndSet(currentLogicalTime, newLogicalTime);
        return newLogicalTime;
    }

    public static void main(String[] args) throws InterruptedException {
        LogicalClock logicalClock = new LogicalClock(System.currentTimeMillis());

        System.out.println("First logical time: " + logicalClock.now());
        Thread.sleep(10);
        System.out.println("Second logical time: " + logicalClock.now());

        // 模拟时钟回拨
        long currentTime = System.currentTimeMillis();
        Thread.sleep(5);
        System.currentTimeMillis();
        //手动设置logicalClock的值比当前时间大,模拟时间回拨。
        logicalClock.logicalClock.set(currentTime + 50);

        System.out.println("Third logical time after clock drift: " + logicalClock.now());
    }
}

在上述代码中,LogicalClock 类维护了一个 logicalClock 变量,它是一个原子长整型。 now() 方法首先获取当前的系统时间,然后将其与 logicalClock 的值进行比较。 如果系统时间大于 logicalClock 的值,则将 logicalClock 更新为系统时间加1。 否则,将 logicalClock 的值加1。 这样可以保证逻辑时钟始终是递增的,即使系统时钟发生了回拨。

将逻辑时钟应用于Snowflake

可以将逻辑时钟应用于Snowflake算法,以解决时钟回拨问题。 具体做法是,将Snowflake算法中的时间戳替换为逻辑时钟的值。

public class SnowflakeWithLogicalClock {

    private final long epoch;
    private final long workerId;
    private final long dataCenterId;
    private final long workerIdBits = 5L;
    private final long dataCenterIdBits = 5L;
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
    private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
    private final long sequenceBits = 12L;
    private final long workerIdShift = sequenceBits;
    private final long dataCenterIdShift = sequenceBits + workerIdBits;
    private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);

    private long sequence = 0L;
    //private long lastTimestamp = -1L;
    private final LogicalClock logicalClock;

    public SnowflakeWithLogicalClock(long workerId, long dataCenterId, long epoch) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
            throw new IllegalArgumentException(String.format("dataCenter Id can't be greater than %d or less than 0", maxDataCenterId));
        }
        this.workerId = workerId;
        this.dataCenterId = dataCenterId;
        this.epoch = epoch;
        this.logicalClock = new LogicalClock(System.currentTimeMillis());
    }

    public synchronized long nextId() {
        long timestamp = logicalClock.now(); // 使用逻辑时钟
        //long timestamp = timeGen();

        /*if (timestamp < lastTimestamp) {
            // 时钟回拨
            long offset = lastTimestamp - timestamp;
            if (offset <= 5) {
                try {
                    wait(offset << 1); // 等待两倍的回拨时间
                    timestamp = timeGen();
                    if (timestamp < lastTimestamp) {
                       throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", offset));
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            } else {
                throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", offset));
            }
        }*/

       // if (lastTimestamp == timestamp) {
        long lastTimestamp = timestamp;
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            if (sequence == 0) {
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            sequence = 0L;
        }

        //lastTimestamp = timestamp;

        return ((timestamp - epoch) << timestampLeftShift) |
                (dataCenterId << dataCenterIdShift) |
                (workerId << workerIdShift) |
                sequence;
    }

    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = logicalClock.now();
        while (timestamp <= lastTimestamp) {
            timestamp = logicalClock.now();
        }
        return timestamp;
    }

    protected long timeGen() {
        return System.currentTimeMillis();
    }

    public static void main(String[] args) throws InterruptedException {
        SnowflakeWithLogicalClock snowflake = new SnowflakeWithLogicalClock(1, 1, 1609459200000L); // 2021-01-01 00:00:00
        long id1 = snowflake.nextId();
        System.out.println("Generated ID 1: " + id1);

        // 模拟时钟回拨
        TimeUnit.MILLISECONDS.sleep(10); // 模拟一些时间过去
        //snowflake.lastTimestamp = System.currentTimeMillis() - 5; // 回拨5毫秒
        long id2 = snowflake.nextId();
        System.out.println("Generated ID 2: " + id2);
    }
}

在这个修改后的版本中,nextId() 方法不再直接使用 System.currentTimeMillis(),而是使用 logicalClock.now() 方法来获取时间戳。 这样可以保证即使系统时钟发生了回拨,生成的ID仍然是唯一的。

Leaf-segment号段缓存

Leaf-segment是一种号段模式的ID生成方案,它将ID生成过程分为两步:

  1. 获取号段: 从数据库或缓存中获取一个ID号段,例如 [1000, 2000]。
  2. 生成ID: 在号段内生成ID,例如 1001, 1002, …, 1999。

Leaf-segment可以显著提高ID生成效率,因为它减少了对数据库或缓存的访问次数。 同时,它也可以一定程度上缓解时钟回拨的影响。

实现Leaf-segment

import java.util.concurrent.atomic.AtomicLong;

public class LeafSegment {

    private final String bizTag; // 业务标识
    private final SegmentBuffer buffer; // 双buffer
    private final int step; // 每次获取的号段长度

    public LeafSegment(String bizTag, int step) {
        this.bizTag = bizTag;
        this.buffer = new SegmentBuffer();
        this.step = step;
    }

    public long nextId() {
        Segment segment = buffer.current();
        if (!buffer.isInitOk()) {
            synchronized (buffer) {
                if (!buffer.isInitOk()) {
                    updateNextSegment();
                    buffer.setInitOk(true);
                }
            }
            segment = buffer.current();
        }

        long value = segment.getValue().getAndIncrement();
        if (value < segment.getMax()) {
            return value;
        }

        if (buffer.nextReady()) {
            synchronized (buffer) {
                segment = buffer.current();
                if (value < segment.getMax()) {
                    return value;
                }

                buffer.switchNext();
                updateNextSegment();
                segment = buffer.current();
                return segment.getValue().getAndIncrement();
            }
        } else {
            synchronized (buffer) {
                if (!buffer.nextReady()) {
                    updateNextSegment();
                    buffer.setNextReady(true);
                }
            }
            return nextId();
        }
    }

    private void updateNextSegment() {
        // 从数据库或者缓存中获取下一个号段
        Segment nextSegment = getNextSegmentFromSource(bizTag, step);
        buffer.getSegments()[buffer.nextPos()].setValue(new AtomicLong(nextSegment.getMin()));
        buffer.getSegments()[buffer.nextPos()].setMax(nextSegment.getMax());
        buffer.setUpdateTimestamp(System.currentTimeMillis());
    }

    // 模拟从数据库或缓存中获取下一个号段
    private Segment getNextSegmentFromSource(String bizTag, int step) {
        // 实际场景中,这里应该从数据库或者缓存中获取下一个号段
        long currentMaxId = getCurrentMaxIdFromDatabase(bizTag); // 模拟从数据库获取当前最大ID
        long nextMinId = currentMaxId + 1;
        long nextMaxId = nextMinId + step;

        //这里可以模拟数据库操作,例如更新biz_tag的max_id字段
        updateMaxIdInDatabase(bizTag, nextMaxId);

        return new Segment(nextMinId, nextMaxId);
    }

    // 模拟从数据库获取当前最大ID
    private long getCurrentMaxIdFromDatabase(String bizTag) {
        // 实际场景中,这里应该从数据库中查询当前最大ID
        //这里模拟第一次获取的时候返回0
        return currentMaxIdMap.getOrDefault(bizTag, 0L);
    }

    // 模拟更新数据库中的最大ID
    private void updateMaxIdInDatabase(String bizTag, long nextMaxId) {
        // 实际场景中,这里应该更新数据库中的最大ID
        currentMaxIdMap.put(bizTag, nextMaxId);
    }

    private static class Segment {
        private long min;
        private long max;
        private AtomicLong value;

        public Segment(long min, long max) {
            this.min = min;
            this.max = max;
            this.value = new AtomicLong(min);
        }

        public long getMin() {
            return min;
        }

        public long getMax() {
            return max;
        }

        public AtomicLong getValue() {
            return value;
        }

        public void setValue(AtomicLong value) {
            this.value = value;
        }
    }

    private static class SegmentBuffer {
        private final Segment[] segments;
        private volatile int currentPos;
        private volatile boolean nextReady;
        private volatile boolean initOk;
        private volatile long updateTimestamp;

        public SegmentBuffer() {
            this.segments = new Segment[2];
            segments[0] = new Segment(0, 0);
            segments[1] = new Segment(0, 0);
            this.currentPos = 0;
            this.nextReady = false;
            this.initOk = false;
            this.updateTimestamp = 0;
        }

        public Segment current() {
            return segments[currentPos];
        }

        public void switchNext() {
            currentPos = nextPos();
        }

        public int nextPos() {
            return (currentPos + 1) % 2;
        }

        public boolean nextReady() {
            return nextReady;
        }

        public void setNextReady(boolean nextReady) {
            this.nextReady = nextReady;
        }

        public boolean isInitOk() {
            return initOk;
        }

        public void setInitOk(boolean initOk) {
            this.initOk = initOk;
        }

        public long getUpdateTimestamp() {
            return updateTimestamp;
        }

        public void setUpdateTimestamp(long updateTimestamp) {
            this.updateTimestamp = updateTimestamp;
        }

        public Segment[] getSegments() {
            return segments;
        }
    }

    // 模拟数据库中存储的当前最大ID
    private static final java.util.concurrent.ConcurrentHashMap<String, Long> currentMaxIdMap = new java.util.concurrent.ConcurrentHashMap<>();

    public static void main(String[] args) {
        LeafSegment leafSegment = new LeafSegment("test_biz", 1000);
        for (int i = 0; i < 2000; i++) {
            long id = leafSegment.nextId();
            System.out.println("Generated ID: " + id);
        }
    }
}

在上述代码中,LeafSegment 类维护了一个 SegmentBuffer,它包含两个 Segment 对象。 每个 Segment 对象表示一个ID号段。 nextId() 方法首先从当前的 Segment 对象中获取一个ID,如果当前的 Segment 对象中的ID已经用完,则切换到下一个 Segment 对象,并从数据库或缓存中获取一个新的号段。

Leaf-segment与时钟回拨

Leaf-segment 能够缓解时钟回拨的影响,但不能完全解决它。 如果时钟回拨的时间超过了号段的有效期,仍然可能生成重复的ID。

例如:

  1. 系统获取了一个号段 [1000, 2000],并开始生成ID。
  2. 系统时钟发生回拨,回退到号段生成之前的时间。
  3. 系统重新获取一个号段 [1000, 2000],并开始生成ID。
  4. 由于两个号段的范围相同,因此可能会生成重复的ID。

为了解决这个问题,可以结合逻辑时钟偏移检测和Leaf-segment。 具体做法是,在获取号段时,使用逻辑时钟的值作为号段的起始ID。 这样可以保证即使时钟发生了回拨,新获取的号段的起始ID仍然大于之前生成的ID。

总结

Snowflake算法是一种简单高效的分布式ID生成算法,但它对时钟的准确性有严格的要求。 时钟回拨会导致ID重复。 可以通过逻辑时钟偏移检测和Leaf-segment号段缓存来解决这个问题。

  • 逻辑时钟偏移检测: 维护一个逻辑时钟,并根据一定的规则来更新它,而不是直接依赖于系统时钟。
  • Leaf-segment号段缓存: 预先获取一批ID号段,减少对数据库或缓存的访问次数。

通过结合这两种技术,可以构建一个高可用、高性能、高可靠的分布式ID生成器。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注