Snowflake算法时钟回拨重复发号?逻辑时钟偏移检测与Leaf-segment号段缓存
大家好,今天我们来深入探讨分布式ID生成器,特别是Snowflake算法在时钟回拨场景下的问题,以及如何通过逻辑时钟偏移检测和Leaf-segment号段缓存来解决这些问题。
Snowflake算法回顾
Snowflake算法是一种经典的分布式ID生成算法,它生成的ID通常是一个64位的长整型,由以下几部分组成:
- Sign Bit (1 bit): 符号位,始终为0。
- Timestamp (41 bits): 时间戳,记录自某个起始时间(epoch)以来的毫秒数。
- Worker ID (10 bits): 工作节点ID,用于区分不同的部署节点。
- Sequence Number (12 bits): 序列号,用于在同一毫秒内区分不同的ID。
这种结构使得Snowflake算法能够保证在分布式环境下生成全局唯一、趋势递增的ID。 然而,它对时间有着严格的要求,依赖于系统时钟的准确性。
时钟回拨问题
Snowflake算法最常见的问题之一就是时钟回拨。 如果服务器的时钟发生了回拨(例如,由于NTP同步或者人为调整),那么就有可能生成重复的ID。 这是因为时间戳是ID生成的重要组成部分,如果时间戳小于之前生成ID的时间戳,那么就可能出现重复ID。
例如:
- 系统在 T1 时刻生成了一个ID,时间戳为 T1。
- 系统时钟发生回拨,回退到 T0 时刻,T0 < T1。
- 系统在 T0 时刻又生成一个ID,时间戳为 T0。 由于Worker ID和Sequence Number的组合可能在T1时刻已经使用过,因此可能会生成重复的ID。
如何模拟时钟回拨
以下代码模拟了时钟回拨的场景:
import java.util.concurrent.TimeUnit;
public class Snowflake {
private final long epoch;
private final long workerId;
private final long dataCenterId;
private final long workerIdBits = 5L;
private final long dataCenterIdBits = 5L;
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
private final long sequenceBits = 12L;
private final long workerIdShift = sequenceBits;
private final long dataCenterIdShift = sequenceBits + workerIdBits;
private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
private long sequence = 0L;
private long lastTimestamp = -1L;
public Snowflake(long workerId, long dataCenterId, long epoch) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
throw new IllegalArgumentException(String.format("dataCenter Id can't be greater than %d or less than 0", maxDataCenterId));
}
this.workerId = workerId;
this.dataCenterId = dataCenterId;
this.epoch = epoch;
}
public synchronized long nextId() {
long timestamp = timeGen();
if (timestamp < lastTimestamp) {
// 时钟回拨
long offset = lastTimestamp - timestamp;
if (offset <= 5) {
try {
wait(offset << 1); // 等待两倍的回拨时间
timestamp = timeGen();
if (timestamp < lastTimestamp) {
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", offset));
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else {
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", offset));
}
}
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
timestamp = tilNextMillis(lastTimestamp);
}
} else {
sequence = 0L;
}
lastTimestamp = timestamp;
return ((timestamp - epoch) << timestampLeftShift) |
(dataCenterId << dataCenterIdShift) |
(workerId << workerIdShift) |
sequence;
}
protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
protected long timeGen() {
return System.currentTimeMillis();
}
public static void main(String[] args) throws InterruptedException {
Snowflake snowflake = new Snowflake(1, 1, 1609459200000L); // 2021-01-01 00:00:00
long id1 = snowflake.nextId();
System.out.println("Generated ID 1: " + id1);
// 模拟时钟回拨
TimeUnit.MILLISECONDS.sleep(10); // 模拟一些时间过去
snowflake.lastTimestamp = System.currentTimeMillis() - 5; // 回拨5毫秒
long id2 = snowflake.nextId();
System.out.println("Generated ID 2: " + id2);
}
}
上述代码展示了Snowflake算法的基本实现,并在 main 函数中模拟了时钟回拨。 可以看到在时钟回拨的情况下,代码会尝试等待一段时间,如果等待后时间仍然小于上一次生成的时间戳,则会抛出异常。
解决时钟回拨的常见策略
- 拒绝服务: 如果检测到时钟回拨,直接抛出异常,拒绝生成ID。 这是最简单粗暴的方式,但会影响服务的可用性。
- 等待: 当检测到时钟回拨时,等待一段时间,直到时钟追上之前的状态。 这种方式可能会导致ID生成延迟。
- 使用备用时钟源: 使用多个时钟源,例如NTP服务器,并在检测到时钟偏差时切换到其他时钟源。
- 修改时间戳: 将时间戳设置为一个大于之前生成ID的时间戳的值。 这种方式可能会导致ID不是严格递增,但可以保证唯一性。
逻辑时钟偏移检测
除了简单地比较时间戳,还可以使用逻辑时钟偏移检测来更灵活地处理时钟回拨。 逻辑时钟偏移检测的核心思想是:维护一个逻辑时钟,并根据一定的规则来更新它,而不是直接依赖于系统时钟。
实现逻辑时钟
import java.util.concurrent.atomic.AtomicLong;
public class LogicalClock {
private final AtomicLong logicalClock;
public LogicalClock(long initialTime) {
this.logicalClock = new AtomicLong(initialTime);
}
public long now() {
long currentTime = System.currentTimeMillis();
long currentLogicalTime = logicalClock.get();
long newLogicalTime = Math.max(currentTime, currentLogicalTime + 1);
//如果currentTime比logicalClock小,说明发生了回拨
if(currentTime < currentLogicalTime){
System.out.println("Detected clock drift, current time is " + currentTime + ", logical clock is " + currentLogicalTime);
}
logicalClock.compareAndSet(currentLogicalTime, newLogicalTime);
return newLogicalTime;
}
public static void main(String[] args) throws InterruptedException {
LogicalClock logicalClock = new LogicalClock(System.currentTimeMillis());
System.out.println("First logical time: " + logicalClock.now());
Thread.sleep(10);
System.out.println("Second logical time: " + logicalClock.now());
// 模拟时钟回拨
long currentTime = System.currentTimeMillis();
Thread.sleep(5);
System.currentTimeMillis();
//手动设置logicalClock的值比当前时间大,模拟时间回拨。
logicalClock.logicalClock.set(currentTime + 50);
System.out.println("Third logical time after clock drift: " + logicalClock.now());
}
}
在上述代码中,LogicalClock 类维护了一个 logicalClock 变量,它是一个原子长整型。 now() 方法首先获取当前的系统时间,然后将其与 logicalClock 的值进行比较。 如果系统时间大于 logicalClock 的值,则将 logicalClock 更新为系统时间加1。 否则,将 logicalClock 的值加1。 这样可以保证逻辑时钟始终是递增的,即使系统时钟发生了回拨。
将逻辑时钟应用于Snowflake
可以将逻辑时钟应用于Snowflake算法,以解决时钟回拨问题。 具体做法是,将Snowflake算法中的时间戳替换为逻辑时钟的值。
public class SnowflakeWithLogicalClock {
private final long epoch;
private final long workerId;
private final long dataCenterId;
private final long workerIdBits = 5L;
private final long dataCenterIdBits = 5L;
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
private final long sequenceBits = 12L;
private final long workerIdShift = sequenceBits;
private final long dataCenterIdShift = sequenceBits + workerIdBits;
private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
private long sequence = 0L;
//private long lastTimestamp = -1L;
private final LogicalClock logicalClock;
public SnowflakeWithLogicalClock(long workerId, long dataCenterId, long epoch) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
throw new IllegalArgumentException(String.format("dataCenter Id can't be greater than %d or less than 0", maxDataCenterId));
}
this.workerId = workerId;
this.dataCenterId = dataCenterId;
this.epoch = epoch;
this.logicalClock = new LogicalClock(System.currentTimeMillis());
}
public synchronized long nextId() {
long timestamp = logicalClock.now(); // 使用逻辑时钟
//long timestamp = timeGen();
/*if (timestamp < lastTimestamp) {
// 时钟回拨
long offset = lastTimestamp - timestamp;
if (offset <= 5) {
try {
wait(offset << 1); // 等待两倍的回拨时间
timestamp = timeGen();
if (timestamp < lastTimestamp) {
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", offset));
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else {
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", offset));
}
}*/
// if (lastTimestamp == timestamp) {
long lastTimestamp = timestamp;
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
timestamp = tilNextMillis(lastTimestamp);
}
} else {
sequence = 0L;
}
//lastTimestamp = timestamp;
return ((timestamp - epoch) << timestampLeftShift) |
(dataCenterId << dataCenterIdShift) |
(workerId << workerIdShift) |
sequence;
}
protected long tilNextMillis(long lastTimestamp) {
long timestamp = logicalClock.now();
while (timestamp <= lastTimestamp) {
timestamp = logicalClock.now();
}
return timestamp;
}
protected long timeGen() {
return System.currentTimeMillis();
}
public static void main(String[] args) throws InterruptedException {
SnowflakeWithLogicalClock snowflake = new SnowflakeWithLogicalClock(1, 1, 1609459200000L); // 2021-01-01 00:00:00
long id1 = snowflake.nextId();
System.out.println("Generated ID 1: " + id1);
// 模拟时钟回拨
TimeUnit.MILLISECONDS.sleep(10); // 模拟一些时间过去
//snowflake.lastTimestamp = System.currentTimeMillis() - 5; // 回拨5毫秒
long id2 = snowflake.nextId();
System.out.println("Generated ID 2: " + id2);
}
}
在这个修改后的版本中,nextId() 方法不再直接使用 System.currentTimeMillis(),而是使用 logicalClock.now() 方法来获取时间戳。 这样可以保证即使系统时钟发生了回拨,生成的ID仍然是唯一的。
Leaf-segment号段缓存
Leaf-segment是一种号段模式的ID生成方案,它将ID生成过程分为两步:
- 获取号段: 从数据库或缓存中获取一个ID号段,例如 [1000, 2000]。
- 生成ID: 在号段内生成ID,例如 1001, 1002, …, 1999。
Leaf-segment可以显著提高ID生成效率,因为它减少了对数据库或缓存的访问次数。 同时,它也可以一定程度上缓解时钟回拨的影响。
实现Leaf-segment
import java.util.concurrent.atomic.AtomicLong;
public class LeafSegment {
private final String bizTag; // 业务标识
private final SegmentBuffer buffer; // 双buffer
private final int step; // 每次获取的号段长度
public LeafSegment(String bizTag, int step) {
this.bizTag = bizTag;
this.buffer = new SegmentBuffer();
this.step = step;
}
public long nextId() {
Segment segment = buffer.current();
if (!buffer.isInitOk()) {
synchronized (buffer) {
if (!buffer.isInitOk()) {
updateNextSegment();
buffer.setInitOk(true);
}
}
segment = buffer.current();
}
long value = segment.getValue().getAndIncrement();
if (value < segment.getMax()) {
return value;
}
if (buffer.nextReady()) {
synchronized (buffer) {
segment = buffer.current();
if (value < segment.getMax()) {
return value;
}
buffer.switchNext();
updateNextSegment();
segment = buffer.current();
return segment.getValue().getAndIncrement();
}
} else {
synchronized (buffer) {
if (!buffer.nextReady()) {
updateNextSegment();
buffer.setNextReady(true);
}
}
return nextId();
}
}
private void updateNextSegment() {
// 从数据库或者缓存中获取下一个号段
Segment nextSegment = getNextSegmentFromSource(bizTag, step);
buffer.getSegments()[buffer.nextPos()].setValue(new AtomicLong(nextSegment.getMin()));
buffer.getSegments()[buffer.nextPos()].setMax(nextSegment.getMax());
buffer.setUpdateTimestamp(System.currentTimeMillis());
}
// 模拟从数据库或缓存中获取下一个号段
private Segment getNextSegmentFromSource(String bizTag, int step) {
// 实际场景中,这里应该从数据库或者缓存中获取下一个号段
long currentMaxId = getCurrentMaxIdFromDatabase(bizTag); // 模拟从数据库获取当前最大ID
long nextMinId = currentMaxId + 1;
long nextMaxId = nextMinId + step;
//这里可以模拟数据库操作,例如更新biz_tag的max_id字段
updateMaxIdInDatabase(bizTag, nextMaxId);
return new Segment(nextMinId, nextMaxId);
}
// 模拟从数据库获取当前最大ID
private long getCurrentMaxIdFromDatabase(String bizTag) {
// 实际场景中,这里应该从数据库中查询当前最大ID
//这里模拟第一次获取的时候返回0
return currentMaxIdMap.getOrDefault(bizTag, 0L);
}
// 模拟更新数据库中的最大ID
private void updateMaxIdInDatabase(String bizTag, long nextMaxId) {
// 实际场景中,这里应该更新数据库中的最大ID
currentMaxIdMap.put(bizTag, nextMaxId);
}
private static class Segment {
private long min;
private long max;
private AtomicLong value;
public Segment(long min, long max) {
this.min = min;
this.max = max;
this.value = new AtomicLong(min);
}
public long getMin() {
return min;
}
public long getMax() {
return max;
}
public AtomicLong getValue() {
return value;
}
public void setValue(AtomicLong value) {
this.value = value;
}
}
private static class SegmentBuffer {
private final Segment[] segments;
private volatile int currentPos;
private volatile boolean nextReady;
private volatile boolean initOk;
private volatile long updateTimestamp;
public SegmentBuffer() {
this.segments = new Segment[2];
segments[0] = new Segment(0, 0);
segments[1] = new Segment(0, 0);
this.currentPos = 0;
this.nextReady = false;
this.initOk = false;
this.updateTimestamp = 0;
}
public Segment current() {
return segments[currentPos];
}
public void switchNext() {
currentPos = nextPos();
}
public int nextPos() {
return (currentPos + 1) % 2;
}
public boolean nextReady() {
return nextReady;
}
public void setNextReady(boolean nextReady) {
this.nextReady = nextReady;
}
public boolean isInitOk() {
return initOk;
}
public void setInitOk(boolean initOk) {
this.initOk = initOk;
}
public long getUpdateTimestamp() {
return updateTimestamp;
}
public void setUpdateTimestamp(long updateTimestamp) {
this.updateTimestamp = updateTimestamp;
}
public Segment[] getSegments() {
return segments;
}
}
// 模拟数据库中存储的当前最大ID
private static final java.util.concurrent.ConcurrentHashMap<String, Long> currentMaxIdMap = new java.util.concurrent.ConcurrentHashMap<>();
public static void main(String[] args) {
LeafSegment leafSegment = new LeafSegment("test_biz", 1000);
for (int i = 0; i < 2000; i++) {
long id = leafSegment.nextId();
System.out.println("Generated ID: " + id);
}
}
}
在上述代码中,LeafSegment 类维护了一个 SegmentBuffer,它包含两个 Segment 对象。 每个 Segment 对象表示一个ID号段。 nextId() 方法首先从当前的 Segment 对象中获取一个ID,如果当前的 Segment 对象中的ID已经用完,则切换到下一个 Segment 对象,并从数据库或缓存中获取一个新的号段。
Leaf-segment与时钟回拨
Leaf-segment 能够缓解时钟回拨的影响,但不能完全解决它。 如果时钟回拨的时间超过了号段的有效期,仍然可能生成重复的ID。
例如:
- 系统获取了一个号段 [1000, 2000],并开始生成ID。
- 系统时钟发生回拨,回退到号段生成之前的时间。
- 系统重新获取一个号段 [1000, 2000],并开始生成ID。
- 由于两个号段的范围相同,因此可能会生成重复的ID。
为了解决这个问题,可以结合逻辑时钟偏移检测和Leaf-segment。 具体做法是,在获取号段时,使用逻辑时钟的值作为号段的起始ID。 这样可以保证即使时钟发生了回拨,新获取的号段的起始ID仍然大于之前生成的ID。
总结
Snowflake算法是一种简单高效的分布式ID生成算法,但它对时钟的准确性有严格的要求。 时钟回拨会导致ID重复。 可以通过逻辑时钟偏移检测和Leaf-segment号段缓存来解决这个问题。
- 逻辑时钟偏移检测: 维护一个逻辑时钟,并根据一定的规则来更新它,而不是直接依赖于系统时钟。
- Leaf-segment号段缓存: 预先获取一批ID号段,减少对数据库或缓存的访问次数。
通过结合这两种技术,可以构建一个高可用、高性能、高可靠的分布式ID生成器。