深入 CRDTs:如何在不需要加锁的情况下实现多端同时编辑的实时同步?
在当今高度互联的世界中,协同编辑已成为许多应用的核心功能,从文档处理器到代码编辑器,再到设计工具。用户期望能够与同事或朋友实时地在同一份内容上工作,而无需担心数据冲突或丢失。然而,实现这种无缝的实时协作并非易事。传统的并发控制方法,如加锁,在分布式环境中会引入严重的性能瓶颈和可用性问题。如何才能在多端同时编辑的复杂场景下,实现数据的实时同步,并且保证最终一致性,同时又摆脱锁的束缚呢?
这就是无冲突复制数据类型(Conflict-free Replicated Data Types, CRDTs)的用武之地。CRDTs 提供了一种优雅的数学框架,允许在多个副本上独立地执行操作,然后将它们合并,而无需任何中心协调或冲突解决逻辑,最终保证所有副本收敛到相同的、一致的状态。
实时协作的痛点与挑战
想象一下两个用户同时编辑一个在线文档。用户 A 在文档开头插入了一个词,用户 B 则在文档中间删除了一个句子。如果服务器简单地按照接收顺序应用这些操作,很可能会导致以下问题:
- 顺序依赖:操作的最终结果可能取决于它们被应用的顺序。如果用户 A 的插入先应用,再应用用户 B 的删除,结果与用户 B 的删除先应用再应用用户 A 的插入可能完全不同。
- 数据丢失或损坏:不恰当的操作应用可能导致部分编辑丢失,或者文档结构被破坏。
- 并发冲突:当多个用户尝试修改同一部分数据时,如何决定哪个修改“胜出”?
- 性能瓶颈:如果采用传统数据库的锁机制,每次编辑都需要获取锁,这将大大降低并发性能,尤其是在高延迟网络环境下。一个用户在编辑时,其他用户可能需要等待,这完全违背了“实时协作”的初衷。
- 离线编辑与同步:用户可能在离线状态下进行编辑,当重新连接时,如何将离线操作与在线操作合并,并确保一致性?
为了解决这些问题,业界曾提出过多种方案,其中最著名的是操作转换 (Operational Transformation, OT)。
传统协作模型:操作转换 (Operational Transformation, OT)
操作转换 (OT) 是 Google Docs 等早期协作编辑工具所采用的核心技术。其基本思想是:当一个客户端生成一个操作并发送给服务器时,服务器在广播给其他客户端之前,会根据服务器的当前状态和此前已经应用的其他并发操作,对该操作进行“转换”。同样,客户端收到服务器广播的操作时,也会根据客户端的本地状态和其本地尚未发送给服务器的操作,对接收到的操作进行转换,以确保操作在正确的上下文上执行。
OT 的核心概念:
- 操作 (Operation):对文档的修改,例如在位置 5 插入字符 ‘a’,或在位置 10 删除 3 个字符。
- 状态 (State):文档的当前内容。
- 转换函数 (Transformation Function):OT 的核心,它接受两个操作
Op1和Op2,以及Op1作用后的状态,然后返回一个转换后的Op2'。Op2'是在Op1已经发生的情况下,Op2应该如何调整才能得到与Op1和Op2都发生过的最终一致状态。
OT 的复杂性与挑战:
OT 理论上可以解决并发冲突,但它的实现极为复杂。
- 转换函数的编写:针对每种操作类型(插入、删除等)及其组合,都需要编写和维护复杂的转换逻辑。例如,一个插入操作遇到另一个插入操作,一个插入操作遇到一个删除操作,一个删除操作遇到另一个删除操作,都需要不同的转换规则。这些规则必须满足数学上的“一致性保证”,否则会导致状态发散。
- 算法复杂性:随着操作类型和并发场景的增加,转换逻辑会变得极其庞大和难以调试。
- 状态依赖:OT 操作是依赖于特定状态的,这意味着服务器需要维护所有客户端的状态,或者客户端需要维护复杂的版本历史,以便正确地转换操作。
- 中心化协调:通常需要一个中心服务器来协调操作的顺序和转换,这引入了单点故障和性能瓶颈。
- 难以扩展:向 OT 系统添加新的操作类型或数据结构非常困难,因为它需要重新审视所有现有的转换函数。
鉴于 OT 的复杂性,研究人员一直在寻找更简单、更健壮的实时协作方案,CRDTs 应运而生。
CRDTs:无锁同步的曙光
CRDTs(Conflict-free Replicated Data Types),即无冲突复制数据类型,是一种设计用于分布式系统的数据结构,它们可以在多个副本上独立地进行修改,而无需任何协调或加锁机制。当这些副本的状态(或操作)被合并时,CRDTs 保证最终所有副本都会收敛到相同的、一致的状态,无论操作的顺序如何。这种特性被称为“强最终一致性”(Strong Eventual Consistency, SEC)。
CRDTs 的核心三特性:
CRDTs 的魔法来源于其背后严谨的数学理论,具体来说,其合并操作需要满足以下三个属性,以确保收敛性:
- 交换律 (Commutativity):操作的顺序不影响最终结果。对于任意两个操作
A和B,A应用后再应用B,与B应用后再应用A,结果是相同的。
A(B(state)) == B(A(state)) - 结合律 (Associativity):多个操作的组合顺序不影响最终结果。对于任意三个操作
A,B,C,(A(B(state)))C与A((B(state))C)结果相同。
(A * B) * C == A * (B * C)(这里*代表操作的组合) - 幂等律 (Idempotence):同一个操作重复应用多次与只应用一次的效果相同。
A(A(state)) == A(state)
如果一个数据结构的合并操作满足这三个属性,那么它就是一个半格(Semilattice),并且可以保证在任意顺序下合并操作,所有副本最终都会收敛到同一个唯一最大上界(Least Upper Bound)。
CRDTs 的优势:
- 无需中心协调:副本之间可以直接通信并合并状态或操作,无需中心服务器仲裁。
- 操作顺序不敏感:由于交换律,操作的传输顺序不再重要,简化了网络层面的设计。
- 易于实现:与 OT 相比,CRDTs 的实现通常更直接,因为无需复杂的转换函数。
- 容错性:系统中的某个副本故障不会影响其他副本的正常工作。
- 支持离线编辑:用户可以在离线状态下进行编辑,当网络恢复时,直接合并本地状态即可。
CRDTs 的两大类型
CRDTs 主要分为两大类:状态合并型 CRDTs 和操作复制型 CRDTs。
1. 状态合并型 CRDTs (State-based CRDTs / Pn-CRDTs)
状态合并型 CRDTs (Mergeable Replicated Data Types) 的工作原理是:每个副本维护一个完整的数据结构状态。当需要同步时,副本之间直接交换它们的完整状态。收到状态的副本会使用一个预定义的、满足交换律、结合律和幂等律的 merge 函数,将其本地状态与接收到的远程状态合并。
- 优点:实现简单、鲁棒性高。因为传输的是完整的状态,所以即使网络丢包或乱序,只要最终能收到完整的状态,就能正确合并。
- 缺点:传输整个状态可能导致较高的网络带宽消耗,尤其是在状态较大或更新频繁时。
状态合并型 CRDTs 示例:
我们将通过一些经典的 CRDT 类型来理解状态合并型 CRDT 的工作方式。
a. G-Counter (Grow-only Counter / 只增计数器)
G-Counter 只能进行增量操作。每个副本维护一个映射,记录每个 replica_id 的增量。
- 状态:
map<replica_id, count> - 操作:
increment(replica_id): 增加当前replica_id对应的计数。
- 合并:
merge(local_state, remote_state): 对两个map中的所有replica_id,取其对应计数的最大值。
class GCounter:
def __init__(self, replica_id):
self.replica_id = replica_id
self.counts = {replica_id: 0} # {replica_id: count}
def increment(self):
self.counts[self.replica_id] = self.counts.get(self.replica_id, 0) + 1
def value(self):
return sum(self.counts.values())
def merge(self, other_counter):
# 合并逻辑:对每个replica_id,取其计数的最大值
for rid, count in other_counter.counts.items():
self.counts[rid] = max(self.counts.get(rid, 0), count)
def __repr__(self):
return f"GCounter(id={self.replica_id}, counts={self.counts}, value={self.value()})"
# 示例
c1 = GCounter("replica1")
c2 = GCounter("replica2")
c1.increment() # replica1: {replica1: 1}
c1.increment() # replica1: {replica1: 2}
c2.increment() # replica2: {replica2: 1}
print(f"Initial c1: {c1}") # GCounter(id=replica1, counts={'replica1': 2}, value=2)
print(f"Initial c2: {c2}") # GCounter(id=replica2, counts={'replica2': 1}, value=1)
# c1发送状态给c2,c2合并
c2.merge(c1)
print(f"c2 after merge c1: {c2}") # GCounter(id=replica2, counts={'replica1': 2, 'replica2': 1}, value=3)
# c2发送状态给c1,c1合并
c1.merge(c2)
print(f"c1 after merge c2: {c1}") # GCounter(id=replica1, counts={'replica1': 2, 'replica2': 1}, value=3)
# 此时c1和c2状态一致
c1.increment() # replica1: {replica1: 3, replica2: 1}
print(f"c1 after increment: {c1}")
# c2再次合并c1
c2.merge(c1)
print(f"c2 after merge c1 again: {c2}") # GCounter(id=replica2, counts={'replica1': 3, 'replica2': 1}, value=4)
b. PN-Counter (Positive-Negative Counter / 可增可减计数器)
PN-Counter 允许增量和减量操作。它实际上是两个 G-Counter 的组合:一个用于正数增量 (P),一个用于负数减量 (N)。
- 状态:
{'P': map<replica_id, count>, 'N': map<replica_id, count>} - 操作:
increment(replica_id): 增加P计数器中replica_id对应的计数。decrement(replica_id): 增加N计数器中replica_id对应的计数。
- 合并:分别对
P和N两个 G-Counter 进行合并。 - 值:
sum(P_counts.values()) - sum(N_counts.values())
class PNCounter:
def __init__(self, replica_id):
self.replica_id = replica_id
self.inc_counts = {replica_id: 0} # G-Counter for increments
self.dec_counts = {replica_id: 0} # G-Counter for decrements
def increment(self):
self.inc_counts[self.replica_id] = self.inc_counts.get(self.replica_id, 0) + 1
def decrement(self):
self.dec_counts[self.replica_id] = self.dec_counts.get(self.replica_id, 0) + 1
def value(self):
return sum(self.inc_counts.values()) - sum(self.dec_counts.values())
def merge(self, other_counter):
# 合并inc_counts
for rid, count in other_counter.inc_counts.items():
self.inc_counts[rid] = max(self.inc_counts.get(rid, 0), count)
# 合并dec_counts
for rid, count in other_counter.dec_counts.items():
self.dec_counts[rid] = max(self.dec_counts.get(rid, 0), count)
def __repr__(self):
return (f"PNCounter(id={self.replica_id}, inc_counts={self.inc_counts}, "
f"dec_counts={self.dec_counts}, value={self.value()})")
# 示例
pn1 = PNCounter("replicaA")
pn2 = PNCounter("replicaB")
pn1.increment() # A: {A:1} {A:0}
pn1.increment() # A: {A:2} {A:0}
pn2.increment() # B: {B:1} {B:0}
pn2.decrement() # B: {B:1} {B:1}
print(f"Initial pn1: {pn1}") # PNCounter(id=replicaA, inc_counts={'replicaA': 2}, dec_counts={'replicaA': 0}, value=2)
print(f"Initial pn2: {pn2}") # PNCounter(id=replicaB, inc_counts={'replicaB': 1}, dec_counts={'replicaB': 1}, value=0)
pn1.merge(pn2)
print(f"pn1 after merge pn2: {pn1}") # PNCounter(id=replicaA, inc_counts={'replicaA': 2, 'replicaB': 1}, dec_counts={'replicaB': 1, 'replicaA': 0}, value=2)
pn2.merge(pn1) # pn2收到pn1的最新状态
print(f"pn2 after merge pn1: {pn2}") # PNCounter(id=replicaB, inc_counts={'replicaA': 2, 'replicaB': 1}, dec_counts={'replicaB': 1, 'replicaA': 0}, value=2)
pn1.decrement()
print(f"pn1 after decrement: {pn1}") # PNCounter(id=replicaA, inc_counts={'replicaA': 2, 'replicaB': 1}, dec_counts={'replicaB': 1, 'replicaA': 1}, value=1)
c. G-Set (Grow-only Set / 只增集合)
G-Set 只能添加元素,不能删除。
- 状态:
set - 操作:
add(element): 将元素添加到集合中。
- 合并:
union(local_set, remote_set): 取两个集合的并集。
class GSet:
def __init__(self):
self.elements = set()
def add(self, element):
self.elements.add(element)
def value(self):
return self.elements
def merge(self, other_set):
self.elements.update(other_set.elements)
def __repr__(self):
return f"GSet(elements={sorted(list(self.elements))})"
# 示例
s1 = GSet()
s2 = GSet()
s1.add("apple")
s1.add("banana")
s2.add("orange")
s2.add("banana")
print(f"Initial s1: {s1}") # GSet(elements=['apple', 'banana'])
print(f"Initial s2: {s2}") # GSet(elements=['banana', 'orange'])
s1.merge(s2)
print(f"s1 after merge s2: {s1}") # GSet(elements=['apple', 'banana', 'orange'])
s2.merge(s1)
print(f"s2 after merge s1: {s2}") # GSet(elements=['apple', 'banana', 'orange'])
d. 2P-Set (Two-Phase Set / 两阶段集合)
2P-Set 允许添加和删除元素,但删除是永久性的。它使用两个 G-Set:一个用于添加的元素 (adds),一个用于删除的元素 (removes)。
- 状态:
{'adds': GSet, 'removes': GSet} - 操作:
add(element): 将元素添加到adds集合中。remove(element): 将元素添加到removes集合中。一个元素一旦被添加到removes集合,就不能再被重新添加到集合中。
- 合并:分别对
adds和removes两个 G-Set 进行合并。 - 值:
adds.value() - removes.value()(集合差集)。
class TwoPSet:
def __init__(self):
self.adds = GSet()
self.removes = GSet()
def add(self, element):
self.adds.add(element)
def remove(self, element):
# 只有当元素存在于adds集合中时,才能将其添加到removes集合
# 这是为了防止删除一个从未添加过的元素,从而导致merge行为不一致
if element in self.adds.value():
self.removes.add(element)
def value(self):
return self.adds.value() - self.removes.value()
def merge(self, other_set):
self.adds.merge(other_set.adds)
self.removes.merge(other_set.removes)
def __repr__(self):
return f"TwoPSet(adds={sorted(list(self.adds.value()))}, removes={sorted(list(self.removes.value()))}, value={sorted(list(self.value()))})"
# 示例
tps1 = TwoPSet()
tps2 = TwoPSet()
tps1.add("A")
tps1.add("B")
tps2.add("B")
tps2.add("C")
print(f"Initial tps1: {tps1}") # TwoPSet(adds=['A', 'B'], removes=[], value=['A', 'B'])
print(f"Initial tps2: {tps2}") # TwoPSet(adds=['B', 'C'], removes=[], value=['B', 'C'])
# 模拟并发删除
tps1.remove("B") # tps1: adds={'A','B'}, removes={'B'}
tps2.add("D") # tps2: adds={'B','C','D'}, removes={}
print(f"tps1 after remove B: {tps1}") # TwoPSet(adds=['A', 'B'], removes=['B'], value=['A'])
print(f"tps2 after add D: {tps2}") # TwoPSet(adds=['B', 'C', 'D'], removes=[], value=['B', 'C', 'D'])
tps1.merge(tps2)
print(f"tps1 after merge tps2: {tps1}") # adds={'A', 'B', 'C', 'D'}, removes={'B'} -> value={'A', 'C', 'D'}
tps2.merge(tps1)
print(f"tps2 after merge tps1: {tps2}") # adds={'A', 'B', 'C', 'D'}, removes={'B'} -> value={'A', 'C', 'D'}
e. LWW-Register (Last-Write-Wins Register / 最后写入者胜出寄存器)
LWW-Register 用于存储一个单一值,并解决并发写入冲突。它通常通过时间戳(或 Lamport 时钟)来决定哪个写入“胜出”。
- 状态:
(value, timestamp, replica_id) - 操作:
assign(new_value): 更新值,并生成一个新的时间戳。
- 合并:比较两个状态的时间戳。如果时间戳不同,取时间戳最大的那个。如果时间戳相同(并发写入),则通常通过
replica_id进行确定性排序(例如,取replica_id字典序较大的那个)。
import time
class LWWRegister:
def __init__(self, replica_id, initial_value=None):
self.replica_id = replica_id
self.value = initial_value
self.timestamp = time.time_ns() # 使用纳秒时间戳作为版本
self.replica_id_for_tie_break = replica_id # 用于时间戳相同时的决胜
def assign(self, new_value):
self.value = new_value
self.timestamp = time.time_ns() # 更新时间戳
# Note: 在真实分布式系统中,时间戳需要与Lamport Clock或Vector Clock结合使用
# 避免不同机器之间时钟不同步的问题。这里仅作简化演示。
def merge(self, other_register):
if other_register.timestamp > self.timestamp:
self.value = other_register.value
self.timestamp = other_register.timestamp
self.replica_id_for_tie_break = other_register.replica_id_for_tie_break
elif other_register.timestamp == self.timestamp:
# 时间戳相同,使用replica_id作为决胜规则 (例如,字符串比较)
if other_register.replica_id_for_tie_break > self.replica_id_for_tie_break:
self.value = other_register.value
self.timestamp = other_register.timestamp
self.replica_id_for_tie_break = other_register.replica_id_for_tie_break
# else: local timestamp is greater, do nothing
def __repr__(self):
return f"LWWRegister(id={self.replica_id}, value='{self.value}', ts={self.timestamp}, tie_break_id='{self.replica_id_for_tie_break}')"
# 示例
r1 = LWWRegister("replica_A", "Hello")
r2 = LWWRegister("replica_B", "World")
print(f"Initial r1: {r1}")
print(f"Initial r2: {r2}")
# 模拟并发写入
# r1写入
r1.assign("CRDTs are cool")
print(f"r1 after assign: {r1}")
# r2写入 (可能在r1写入之后,也可能几乎同时)
time.sleep(0.001) # 稍微延迟,确保时间戳不同
r2.assign("CRDTs are powerful")
print(f"r2 after assign: {r2}")
# r1合并r2
r1.merge(r2)
print(f"r1 after merge r2: {r1}") # r2的写入时间戳更大,r1会更新为r2的值
# r2合并r1
r2.merge(r1)
print(f"r2 after merge r1: {r2}") # 此时r1和r2的状态已经一致
2. 操作复制型 CRDTs (Operation-based CRDTs / Op-CRDTs)
操作复制型 CRDTs (Commutative Replicated Data Types) 的工作原理是:每个副本仅广播它在本地执行的操作。其他副本收到这些操作后,直接在本地应用它们。为了保证最终一致性,这些操作必须满足一些条件:
-
因果顺序:操作必须以因果顺序(causal order)传递和应用。这意味着如果操作
B是基于操作A的结果产生的(即A是B的前因),那么A必须在B之前被所有副本应用。这通常通过版本向量(Vector Clocks)或 Lamport 时钟来实现。 -
幂等性:操作必须是幂等的,即使重复应用也只会产生一次效果。
-
交换性:非因果相关的操作必须是可交换的。
-
优点:带宽效率高,因为只传输小的操作消息而不是整个状态。
-
缺点:需要额外的机制来保证操作的因果顺序,例如使用消息队列、版本向量或 Lamport 时钟。这增加了系统的复杂性。
操作复制型 CRDTs 示例:
一个简单的 Op-CRDT 计数器可以定义为 increment() 和 decrement() 操作。每个操作携带 replica_id。当一个副本收到一个操作时,它直接将其应用到本地计数器上。由于 increment 和 decrement 是可交换的(它们只是对总和的加减),所以即使乱序到达,最终结果也是一致的。
状态合并型与操作复制型 CRDTs 对比
| 特性 | 状态合并型 CRDTs (Pn-CRDTs) | 操作复制型 CRDTs (Op-CRDTs) |
|---|---|---|
| 同步方式 | 传输完整状态 | 传输操作 (deltas) |
| 合并机制 | merge(state1, state2) 函数 |
apply(operation) 函数 |
| 网络要求 | 最终传输到所有副本即可,无需保证顺序 | 必须保证操作的因果顺序 |
| 带宽 | 可能较高 (传输大状态) | 较低 (只传输操作) |
| 实现复杂性 | 相对简单,只需实现合并函数 | 较复杂,需要因果顺序机制 (版本向量等) |
| 鲁棒性 | 高,对网络丢包/乱序不敏感 | 需因果顺序机制保证,否则可能不一致 |
核心挑战:如何实现文本序列的 CRDT
对于计数器、集合和寄存器这类简单数据结构,CRDT 的设计相对直观。然而,对于文本编辑器这种需要处理字符序列(列表)的场景,CRDT 的实现变得更加复杂。核心挑战在于:
- 字符位置的稳定性:当在序列中插入或删除字符时,后续字符的位置会发生变化。如何保证不同副本插入的字符能够稳定地出现在预期的相对位置?
- 并发插入/删除冲突:两个用户同时在同一个位置插入字符,或者一个删除一个插入,如何解决冲突并保持最终一致的顺序?
为了解决这些问题,文本 CRDTs (如 RGA, LSEQ, WOOT) 引入了复杂的字符标识符和墓碑机制。
字符标识符 (Char Identifier) 的设计
核心思想是为序列中的每个字符分配一个全局唯一的、能够编码其相对顺序的标识符。这样,即使字符被删除,它的标识符也依然存在,可以被引用。
一个典型的字符标识符包含以下部分:
PositionIdentifier:这是一个关键部分,它不是一个简单的整数索引,而是一个可以表示任意插入点的“分数索引”或“多位数字”。例如,一个PositionIdentifier可以是一个整数列表,如[1, 5, 3]。这种结构允许在任何两个现有标识符之间插入新的标识符,而无需改变现有标识符。ReplicaId:创建该字符的副本的唯一标识符,用于在PositionIdentifier相同或无法区分时进行决胜。Timestamp(或 Lamport Clock):创建该字符的时间戳,也用于冲突决胜(例如,后写入者胜出)。
Char 结构体定义
import uuid
import time
class PositionIdentifier:
"""
用于文本CRDT的字符位置标识符。
它是一个整数列表,允许在任意两个现有位置之间插入新位置。
例如:
A: [1]
B: [2]
插入X在A和B之间: [1, 5]
插入Y在A和X之间: [1, 2]
则顺序为: A ([1]), Y ([1, 2]), X ([1, 5]), B ([2])
"""
def __init__(self, digits, replica_id):
self.digits = tuple(digits) # 使用元组保证不可变性
self.replica_id = replica_id # 用于在digits完全相同时的决胜
def __lt__(self, other):
# 比较两个PositionIdentifier,实现排序
for i in range(max(len(self.digits), len(other.digits))):
d1 = self.digits[i] if i < len(self.digits) else -1 # 较短的序列视为有更小的“下一位”
d2 = other.digits[i] if i < len(other.digits) else -1
if d1 < d2:
return True
if d1 > d2:
return False
# 如果所有数字都相同,则根据replica_id决胜
return self.replica_id < other.replica_id
def __eq__(self, other):
return self.digits == other.digits and self.replica_id == other.replica_id
def __hash__(self):
return hash((self.digits, self.replica_id))
def __repr__(self):
return f"PosID({list(self.digits)}, {self.replica_id})"
class Char:
"""
表示文本中的一个字符。
包含其值、唯一的PositionIdentifier和删除状态。
"""
def __init__(self, value, position_id, timestamp, replica_id):
self.value = value
self.position_id = position_id # PositionIdentifier对象
self.timestamp = timestamp # 用于LWW冲突解决
self.replica_id = replica_id # 用于LWW冲突解决
self.is_deleted = False
def __lt__(self, other):
# 字符的排序主要依据PositionIdentifier
return self.position_id < other.position_id
def __eq__(self, other):
# 两个Char对象相等,当它们的PositionIdentifier相等时 (因为PositionIdentifier已经是全局唯一的)
return self.position_id == other.position_id
def __hash__(self):
return hash(self.position_id)
def __repr__(self):
return (f"Char(value='{self.value}', pos={self.position_id}, "
f"ts={self.timestamp}, deleted={self.is_deleted})")
TextCRDT 核心数据结构
文本 CRDT 通常将文本表示为一个有序的 Char 对象列表。这个列表是“逻辑”上的,它包含了所有已插入和已删除的字符,通过 is_deleted 标志来区分。
class TextCRDT:
def __init__(self, replica_id):
self.replica_id = replica_id
self.chars = [] # 存储所有Char对象,保持有序
self.lamport_clock = 0 # 用于生成时间戳,保证因果顺序
# 初始的“边界”字符,用于简化PositionIdentifier的生成
# left_boundary 和 right_boundary 不会在实际文本中显示
self.left_boundary = Char(
'', PositionIdentifier([-1], 'system_start'), 0, 'system_start'
)
self.right_boundary = Char(
'', PositionIdentifier([float('inf')], 'system_end'), 0, 'system_end'
)
self.chars.append(self.left_boundary)
self.chars.append(self.right_boundary)
def _generate_lamport_timestamp(self):
self.lamport_clock += 1
return self.lamport_clock
def _get_char_at_index(self, index):
# 找到第N个非删除状态的字符
current_idx = -1
for i, char in enumerate(self.chars):
if char == self.left_boundary or char == self.right_boundary:
continue
if not char.is_deleted:
current_idx += 1
if current_idx == index:
return char
return None
def _get_insertion_context(self, index):
"""
根据插入位置,找到前一个和后一个字符,以便生成新的PositionIdentifier
"""
if index < 0:
index = 0
visible_chars = [char for char in self.chars if char != self.left_boundary and char != self.right_boundary and not char.is_deleted]
if not visible_chars or index >= len(visible_chars):
# 插入到末尾
prev_char = visible_chars[-1] if visible_chars else self.left_boundary
next_char = self.right_boundary
elif index == 0:
# 插入到开头
prev_char = self.left_boundary
next_char = visible_chars[0]
else:
prev_char = visible_chars[index - 1]
next_char = visible_chars[index]
return prev_char, next_char
def _generate_position_id_between(self, prev_pos_id, next_pos_id):
"""
生成一个介于 prev_pos_id 和 next_pos_id 之间的新 PositionIdentifier
这是文本CRDT中最核心和最复杂的逻辑之一。
这里使用一种简化的LSEQ风格的生成方式。
"""
digits1 = list(prev_pos_id.digits)
digits2 = list(next_pos_id.digits)
new_digits = []
i = 0
while True:
d1 = digits1[i] if i < len(digits1) else -1
d2 = digits2[i] if i < len(digits2) else float('inf') # next_pos_id的末尾视为无穷大
if d2 - d1 > 1:
# 存在空隙,可以在中间插入一个数字
new_digits.append(d1 + 1)
break
elif d2 - d1 == 1:
# 只有一位的空隙,必须深入下一层
new_digits.append(d1)
i += 1
else: # d1 == d2, 此时需要继续深入
new_digits.append(d1)
i += 1
if i >= len(digits1) and i >= len(digits2):
# 如果到达了两个PositionIdentifier的末尾,且它们完全相同,
# 理论上不应该发生,除非是初始边界情况。
# 在实际LSEQ中,会确保通过增加一个新数字来创建唯一的ID
new_digits.append(0) # 插入一个0作为新的层级
break
return PositionIdentifier(new_digits, self.replica_id)
def insert(self, index, char_value):
"""
在指定索引处插入一个字符。
index是基于当前可见文本的索引。
"""
prev_char, next_char = self._get_insertion_context(index)
new_pos_id = self._generate_position_id_between(
prev_char.position_id, next_char.position_id
)
self.lamport_clock = max(self.lamport_clock, prev_char.timestamp, next_char.timestamp) + 1
new_char = Char(char_value, new_pos_id, self.lamport_clock, self.replica_id)
# 将新字符插入到chars列表中并保持有序
# 这是一个O(N)操作,在大型文档中需要优化,例如使用平衡二叉搜索树
# 这里使用简单的列表插入,以便理解逻辑
inserted = False
for i in range(len(self.chars)):
if new_char < self.chars[i]:
self.chars.insert(i, new_char)
inserted = True
break
if not inserted:
self.chars.append(new_char) # 理论上不应该发生,除非是边界情况
return new_char # 返回新字符,以便远程传输
def delete(self, index):
"""
删除指定索引处的字符。
不是真正从列表中移除,而是标记为已删除 (tombstone)。
"""
char_to_delete = self._get_char_at_index(index)
if char_to_delete:
self.lamport_clock = max(self.lamport_clock, char_to_delete.timestamp) + 1
char_to_delete.is_deleted = True
char_to_delete.timestamp = self.lamport_clock # 更新删除操作的时间戳
return char_to_delete # 返回被删除字符,以便远程传输
return None
def receive_char_operation(self, remote_char):
"""
接收并应用来自远程副本的字符操作 (插入或删除)。
"""
self.lamport_clock = max(self.lamport_clock, remote_char.timestamp) + 1
# 检查是否已存在该 PositionIdentifier 的字符
existing_char_index = -1
for i, char in enumerate(self.chars):
if char.position_id == remote_char.position_id:
existing_char_index = i
break
if existing_char_index != -1:
# 字符已存在,处理冲突或更新状态
existing_char = self.chars[existing_char_index]
# 冲突解决:如果远程操作的时间戳更新,则应用远程操作
# 对于删除操作,删除总是胜出( tombstone wins over live char with older timestamp)
# 如果时间戳相同,则根据replica_id决胜 (LWW)
if remote_char.timestamp > existing_char.timestamp:
existing_char.value = remote_char.value
existing_char.is_deleted = remote_char.is_deleted
existing_char.timestamp = remote_char.timestamp
existing_char.replica_id = remote_char.replica_id
elif remote_char.timestamp == existing_char.timestamp:
if remote_char.replica_id > existing_char.replica_id: # 决定性决胜
existing_char.value = remote_char.value
existing_char.is_deleted = remote_char.is_deleted
existing_char.timestamp = remote_char.timestamp
existing_char.replica_id = remote_char.replica_id
elif remote_char.is_deleted and not existing_char.is_deleted:
# 删除操作的优先级高于同一时间戳的插入/修改
existing_char.is_deleted = True
existing_char.timestamp = remote_char.timestamp
existing_char.replica_id = remote_char.replica_id
else:
# 字符不存在,直接插入 (保持有序)
inserted = False
for i in range(len(self.chars)):
if remote_char < self.chars[i]:
self.chars.insert(i, remote_char)
inserted = True
break
if not inserted:
self.chars.append(remote_char)
def get_current_text(self):
"""
获取当前可见的文本内容 (不包含已删除字符和边界字符)。
"""
return "".join([char.value for char in self.chars if not char.is_deleted and char.value != ''])
def __repr__(self):
visible_chars = [char for char in self.chars if char != self.left_boundary and char != self.right_boundary]
return f"TextCRDT(id={self.replica_id}, text='{self.get_current_text()}', chars={visible_chars})"
# 辅助函数,用于在测试中模拟网络传输
def simulate_network_transfer(crdt_instance, char_op):
# 模拟深拷贝,防止直接修改
import copy
return copy.deepcopy(char_op)
# 示例
editor1 = TextCRDT("editor1")
editor2 = TextCRDT("editor2")
print("--- 初始状态 ---")
print(f"Editor1: {editor1.get_current_text()}")
print(f"Editor2: {editor2.get_current_text()}")
# Editor1 插入 "H"
op1 = editor1.insert(0, "H")
editor2.receive_char_operation(simulate_network_transfer(editor2, op1))
print("n--- Editor1 插入 'H' ---")
print(f"Editor1: {editor1.get_current_text()}")
print(f"Editor2: {editor2.get_current_text()}")
# Editor2 插入 "i"
op2 = editor2.insert(1, "i") # 插入到 H 后面
editor1.receive_char_operation(simulate_network_transfer(editor1, op2))
print("n--- Editor2 插入 'i' ---")
print(f"Editor1: {editor1.get_current_text()}")
print(f"Editor2: {editor2.get_current_text()}")
# Editor1 在开头插入 "W" (并发)
op3_e1 = editor1.insert(0, "W")
# Editor2 在开头插入 "e" (并发)
op4_e2 = editor2.insert(0, "e")
# 模拟网络乱序,先处理 editor2 的操作,再处理 editor1 的操作
editor1.receive_char_operation(simulate_network_transfer(editor1, op4_e2))
editor2.receive_char_operation(simulate_network_transfer(editor2, op3_e1))
print("n--- 并发插入 'W' 和 'e' ---")
print(f"Editor1: {editor1.get_current_text()}") # 最终可能是 "WeHi" 或 "eWHi",取决于PositionIdentifier和replica_id的决胜规则
print(f"Editor2: {editor2.get_current_text()}")
# 检查内部状态
print(f"Editor1 internal chars: {editor1.chars}")
print(f"Editor2 internal chars: {editor2.chars}")
# 确保最终文本一致
assert editor1.get_current_text() == editor2.get_current_text()
# Editor1 删除 "W"
op5_e1 = editor1.delete(0) # 删除 "W" (假设 "W" 在 "e" 之前)
editor2.receive_char_operation(simulate_network_transfer(editor2, op5_e1))
print("n--- Editor1 删除 'W' ---")
print(f"Editor1: {editor1.get_current_text()}")
print(f"Editor2: {editor2.get_current_text()}")
# Editor2 删除 "H"
op6_e2 = editor2.delete(1) # 删除 "H" (此时文本为 "eHi" 或 "eWHI", 假设是"eHi"的H)
editor1.receive_char_operation(simulate_network_transfer(editor1, op6_e2))
print("n--- Editor2 删除 'H' ---")
print(f"Editor1: {editor1.get_current_text()}")
print(f"Editor2: {editor2.get_current_text()}")
# 再次检查最终文本一致
assert editor1.get_current_text() == editor2.get_current_text()
print(f"n最终文本: {editor1.get_current_text()}")
代码解释:
PositionIdentifier:这是关键。它是一个digits列表,例如[1],[1, 5],[2]。这种结构允许我们通过在digits列表中添加新的数字来在任意两个现有标识符之间“创建”一个新标识符,而无需修改其他标识符。__lt__方法定义了这些标识符的排序规则。replica_id在digits完全相同的情况下作为最终的决胜者。Char:每个字符都包含其值、一个唯一的PositionIdentifier、一个timestamp(Lamport Clock) 和replica_id。is_deleted标志用于标记字符是否已被逻辑删除(墓碑)。TextCRDT:chars列表:存储所有Char对象,并始终保持排序状态。left_boundary和right_boundary:两个特殊的字符,它们的PositionIdentifier分别是最小和最大的,用于简化在文档开头和末尾插入字符时的PositionIdentifier生成。_generate_lamport_timestamp():生成一个递增的时间戳,用于确保因果顺序和冲突解决。_get_insertion_context():根据用户输入的逻辑索引(例如,第 3 个字符),找到其对应的实际Char对象(考虑已删除的字符)以及它前后的Char对象,以便为新插入的字符生成PositionIdentifier。_generate_position_id_between():这是最复杂的逻辑,它根据前一个和后一个字符的PositionIdentifier,生成一个新的PositionIdentifier。它通过比较digits列表,寻找可以插入新数字的位置。如果d2 - d1 > 1,则直接插入d1 + 1。如果d2 - d1 == 1或d1 == d2,则需要深入到digits列表的下一层。insert(index, char_value):创建一个新的Char对象,并将其插入到self.chars列表中,同时保持其有序性。delete(index):找到对应的Char对象,并将其is_deleted标志设为True。这是一个“软删除”,被删除的字符仍然存在于chars列表中,以确保其PositionIdentifier仍然有效,并且可以被其他副本正确引用。receive_char_operation(remote_char):这是合并逻辑的核心。当接收到远程的字符操作时,它会检查本地是否已存在相同PositionIdentifier的字符。- 如果不存在,则直接按序插入。
- 如果存在,则根据
timestamp和replica_id(LWW 规则)以及删除优先规则来解决冲突。时间戳更新的操作会覆盖旧操作。如果时间戳相同,则replica_id较大的获胜。删除操作通常具有更高的优先级,即使时间戳相同,删除也往往会“胜出”。
get_current_text():遍历chars列表,过滤掉已删除的字符和边界字符,然后拼接成最终的字符串。
CRDTs 的工程实践与考量
虽然 CRDTs 提供了强大的理论保证,但在实际工程中仍需考虑以下因素:
- 性能与内存:
- 墓碑 (Tombstones):删除操作不会真正移除数据,而是标记为已删除。这会导致数据结构随着时间的推移不断增长,占用更多内存。需要定期进行垃圾回收 (Garbage Collection, GC) 或快照机制,将已删除的字符从历史中清除,同时确保不破坏其他副本的因果关系。
- 排序列表:文本 CRDT 的
chars列表需要始终保持有序。使用 Python 列表的insert和del操作是 O(N) 的,对于大型文档会非常慢。在生产环境中,通常会使用更高效的数据结构,如平衡二叉搜索树(例如红黑树)或跳表,将插入和删除操作的复杂度降至 O(log N)。
- 网络带宽:
- 状态合并型:传输整个状态会消耗大量带宽。对于大型数据结构,这可能不是一个可行的方案。
- 操作复制型:虽然只传输操作,但为了保证因果顺序,通常需要传输版本向量或 Lamport 时钟信息,这会增加消息的开销。
- Delta CRDTs:一种优化方案,只传输状态的“增量”(delta),而不是整个状态,兼顾了状态合并型的鲁棒性和操作复制型的带宽效率。
- 调试与复杂性:尽管 CRDTs 理论上比 OT 简单,但针对复杂数据类型(如富文本、嵌套结构)设计和实现 CRDTs 仍然需要深入理解其数学原理和冲突解决机制。调试分布式系统中 CRDTs 的最终一致性问题也可能具有挑战性。
- 与现有系统的集成:将 CRDTs 集成到现有的数据库或应用架构中可能需要进行结构性调整。
无锁协作的未来
CRDTs 代表了分布式系统中无锁、实时协作的未来方向。它们通过数学上的严谨性,为复杂的并发问题提供了一种优雅而健壮的解决方案。从简单的计数器到复杂的文本编辑器,CRDTs 的应用范围正在不断扩大。随着分布式系统和边缘计算的普及,CRDTs 的价值将愈发凸显,它们将使开发者能够构建出更具弹性、更易于扩展、用户体验更佳的协作应用。虽然在工程实现上仍面临一些挑战,但 CRDTs 的核心思想——通过设计无冲突的数据结构来规避冲突,而非解决冲突——无疑是分布式系统设计中的一个重要范式转变。