深入 CRDTs (无冲突复制数据类型):如何在不需要加锁的情况下实现多端同时编辑的实时同步?

深入 CRDTs:如何在不需要加锁的情况下实现多端同时编辑的实时同步?

在当今高度互联的世界中,协同编辑已成为许多应用的核心功能,从文档处理器到代码编辑器,再到设计工具。用户期望能够与同事或朋友实时地在同一份内容上工作,而无需担心数据冲突或丢失。然而,实现这种无缝的实时协作并非易事。传统的并发控制方法,如加锁,在分布式环境中会引入严重的性能瓶颈和可用性问题。如何才能在多端同时编辑的复杂场景下,实现数据的实时同步,并且保证最终一致性,同时又摆脱锁的束缚呢?

这就是无冲突复制数据类型(Conflict-free Replicated Data Types, CRDTs)的用武之地。CRDTs 提供了一种优雅的数学框架,允许在多个副本上独立地执行操作,然后将它们合并,而无需任何中心协调或冲突解决逻辑,最终保证所有副本收敛到相同的、一致的状态。

实时协作的痛点与挑战

想象一下两个用户同时编辑一个在线文档。用户 A 在文档开头插入了一个词,用户 B 则在文档中间删除了一个句子。如果服务器简单地按照接收顺序应用这些操作,很可能会导致以下问题:

  1. 顺序依赖:操作的最终结果可能取决于它们被应用的顺序。如果用户 A 的插入先应用,再应用用户 B 的删除,结果与用户 B 的删除先应用再应用用户 A 的插入可能完全不同。
  2. 数据丢失或损坏:不恰当的操作应用可能导致部分编辑丢失,或者文档结构被破坏。
  3. 并发冲突:当多个用户尝试修改同一部分数据时,如何决定哪个修改“胜出”?
  4. 性能瓶颈:如果采用传统数据库的锁机制,每次编辑都需要获取锁,这将大大降低并发性能,尤其是在高延迟网络环境下。一个用户在编辑时,其他用户可能需要等待,这完全违背了“实时协作”的初衷。
  5. 离线编辑与同步:用户可能在离线状态下进行编辑,当重新连接时,如何将离线操作与在线操作合并,并确保一致性?

为了解决这些问题,业界曾提出过多种方案,其中最著名的是操作转换 (Operational Transformation, OT)。

传统协作模型:操作转换 (Operational Transformation, OT)

操作转换 (OT) 是 Google Docs 等早期协作编辑工具所采用的核心技术。其基本思想是:当一个客户端生成一个操作并发送给服务器时,服务器在广播给其他客户端之前,会根据服务器的当前状态和此前已经应用的其他并发操作,对该操作进行“转换”。同样,客户端收到服务器广播的操作时,也会根据客户端的本地状态和其本地尚未发送给服务器的操作,对接收到的操作进行转换,以确保操作在正确的上下文上执行。

OT 的核心概念:

  • 操作 (Operation):对文档的修改,例如在位置 5 插入字符 ‘a’,或在位置 10 删除 3 个字符。
  • 状态 (State):文档的当前内容。
  • 转换函数 (Transformation Function):OT 的核心,它接受两个操作 Op1Op2,以及 Op1 作用后的状态,然后返回一个转换后的 Op2'Op2' 是在 Op1 已经发生的情况下,Op2 应该如何调整才能得到与 Op1Op2 都发生过的最终一致状态。

OT 的复杂性与挑战:

OT 理论上可以解决并发冲突,但它的实现极为复杂。

  1. 转换函数的编写:针对每种操作类型(插入、删除等)及其组合,都需要编写和维护复杂的转换逻辑。例如,一个插入操作遇到另一个插入操作,一个插入操作遇到一个删除操作,一个删除操作遇到另一个删除操作,都需要不同的转换规则。这些规则必须满足数学上的“一致性保证”,否则会导致状态发散。
  2. 算法复杂性:随着操作类型和并发场景的增加,转换逻辑会变得极其庞大和难以调试。
  3. 状态依赖:OT 操作是依赖于特定状态的,这意味着服务器需要维护所有客户端的状态,或者客户端需要维护复杂的版本历史,以便正确地转换操作。
  4. 中心化协调:通常需要一个中心服务器来协调操作的顺序和转换,这引入了单点故障和性能瓶颈。
  5. 难以扩展:向 OT 系统添加新的操作类型或数据结构非常困难,因为它需要重新审视所有现有的转换函数。

鉴于 OT 的复杂性,研究人员一直在寻找更简单、更健壮的实时协作方案,CRDTs 应运而生。

CRDTs:无锁同步的曙光

CRDTs(Conflict-free Replicated Data Types),即无冲突复制数据类型,是一种设计用于分布式系统的数据结构,它们可以在多个副本上独立地进行修改,而无需任何协调或加锁机制。当这些副本的状态(或操作)被合并时,CRDTs 保证最终所有副本都会收敛到相同的、一致的状态,无论操作的顺序如何。这种特性被称为“强最终一致性”(Strong Eventual Consistency, SEC)。

CRDTs 的核心三特性:

CRDTs 的魔法来源于其背后严谨的数学理论,具体来说,其合并操作需要满足以下三个属性,以确保收敛性:

  1. 交换律 (Commutativity):操作的顺序不影响最终结果。对于任意两个操作 ABA 应用后再应用 B,与 B 应用后再应用 A,结果是相同的。
    A(B(state)) == B(A(state))
  2. 结合律 (Associativity):多个操作的组合顺序不影响最终结果。对于任意三个操作 A, B, C(A(B(state)))CA((B(state))C) 结果相同。
    (A * B) * C == A * (B * C)(这里 * 代表操作的组合)
  3. 幂等律 (Idempotence):同一个操作重复应用多次与只应用一次的效果相同。
    A(A(state)) == A(state)

如果一个数据结构的合并操作满足这三个属性,那么它就是一个半格(Semilattice),并且可以保证在任意顺序下合并操作,所有副本最终都会收敛到同一个唯一最大上界(Least Upper Bound)。

CRDTs 的优势:

  • 无需中心协调:副本之间可以直接通信并合并状态或操作,无需中心服务器仲裁。
  • 操作顺序不敏感:由于交换律,操作的传输顺序不再重要,简化了网络层面的设计。
  • 易于实现:与 OT 相比,CRDTs 的实现通常更直接,因为无需复杂的转换函数。
  • 容错性:系统中的某个副本故障不会影响其他副本的正常工作。
  • 支持离线编辑:用户可以在离线状态下进行编辑,当网络恢复时,直接合并本地状态即可。

CRDTs 的两大类型

CRDTs 主要分为两大类:状态合并型 CRDTs 和操作复制型 CRDTs。

1. 状态合并型 CRDTs (State-based CRDTs / Pn-CRDTs)

状态合并型 CRDTs (Mergeable Replicated Data Types) 的工作原理是:每个副本维护一个完整的数据结构状态。当需要同步时,副本之间直接交换它们的完整状态。收到状态的副本会使用一个预定义的、满足交换律、结合律和幂等律的 merge 函数,将其本地状态与接收到的远程状态合并。

  • 优点:实现简单、鲁棒性高。因为传输的是完整的状态,所以即使网络丢包或乱序,只要最终能收到完整的状态,就能正确合并。
  • 缺点:传输整个状态可能导致较高的网络带宽消耗,尤其是在状态较大或更新频繁时。

状态合并型 CRDTs 示例:

我们将通过一些经典的 CRDT 类型来理解状态合并型 CRDT 的工作方式。

a. G-Counter (Grow-only Counter / 只增计数器)

G-Counter 只能进行增量操作。每个副本维护一个映射,记录每个 replica_id 的增量。

  • 状态map<replica_id, count>
  • 操作
    • increment(replica_id): 增加当前 replica_id 对应的计数。
  • 合并merge(local_state, remote_state): 对两个 map 中的所有 replica_id,取其对应计数的最大值。
class GCounter:
    def __init__(self, replica_id):
        self.replica_id = replica_id
        self.counts = {replica_id: 0} # {replica_id: count}

    def increment(self):
        self.counts[self.replica_id] = self.counts.get(self.replica_id, 0) + 1

    def value(self):
        return sum(self.counts.values())

    def merge(self, other_counter):
        # 合并逻辑:对每个replica_id,取其计数的最大值
        for rid, count in other_counter.counts.items():
            self.counts[rid] = max(self.counts.get(rid, 0), count)

    def __repr__(self):
        return f"GCounter(id={self.replica_id}, counts={self.counts}, value={self.value()})"

# 示例
c1 = GCounter("replica1")
c2 = GCounter("replica2")

c1.increment() # replica1: {replica1: 1}
c1.increment() # replica1: {replica1: 2}

c2.increment() # replica2: {replica2: 1}

print(f"Initial c1: {c1}") # GCounter(id=replica1, counts={'replica1': 2}, value=2)
print(f"Initial c2: {c2}") # GCounter(id=replica2, counts={'replica2': 1}, value=1)

# c1发送状态给c2,c2合并
c2.merge(c1)
print(f"c2 after merge c1: {c2}") # GCounter(id=replica2, counts={'replica1': 2, 'replica2': 1}, value=3)

# c2发送状态给c1,c1合并
c1.merge(c2)
print(f"c1 after merge c2: {c1}") # GCounter(id=replica1, counts={'replica1': 2, 'replica2': 1}, value=3)

# 此时c1和c2状态一致
c1.increment() # replica1: {replica1: 3, replica2: 1}
print(f"c1 after increment: {c1}")

# c2再次合并c1
c2.merge(c1)
print(f"c2 after merge c1 again: {c2}") # GCounter(id=replica2, counts={'replica1': 3, 'replica2': 1}, value=4)

b. PN-Counter (Positive-Negative Counter / 可增可减计数器)

PN-Counter 允许增量和减量操作。它实际上是两个 G-Counter 的组合:一个用于正数增量 (P),一个用于负数减量 (N)。

  • 状态{'P': map<replica_id, count>, 'N': map<replica_id, count>}
  • 操作
    • increment(replica_id): 增加 P 计数器中 replica_id 对应的计数。
    • decrement(replica_id): 增加 N 计数器中 replica_id 对应的计数。
  • 合并:分别对 PN 两个 G-Counter 进行合并。
  • sum(P_counts.values()) - sum(N_counts.values())
class PNCounter:
    def __init__(self, replica_id):
        self.replica_id = replica_id
        self.inc_counts = {replica_id: 0} # G-Counter for increments
        self.dec_counts = {replica_id: 0} # G-Counter for decrements

    def increment(self):
        self.inc_counts[self.replica_id] = self.inc_counts.get(self.replica_id, 0) + 1

    def decrement(self):
        self.dec_counts[self.replica_id] = self.dec_counts.get(self.replica_id, 0) + 1

    def value(self):
        return sum(self.inc_counts.values()) - sum(self.dec_counts.values())

    def merge(self, other_counter):
        # 合并inc_counts
        for rid, count in other_counter.inc_counts.items():
            self.inc_counts[rid] = max(self.inc_counts.get(rid, 0), count)
        # 合并dec_counts
        for rid, count in other_counter.dec_counts.items():
            self.dec_counts[rid] = max(self.dec_counts.get(rid, 0), count)

    def __repr__(self):
        return (f"PNCounter(id={self.replica_id}, inc_counts={self.inc_counts}, "
                f"dec_counts={self.dec_counts}, value={self.value()})")

# 示例
pn1 = PNCounter("replicaA")
pn2 = PNCounter("replicaB")

pn1.increment() # A: {A:1} {A:0}
pn1.increment() # A: {A:2} {A:0}
pn2.increment() # B: {B:1} {B:0}
pn2.decrement() # B: {B:1} {B:1}

print(f"Initial pn1: {pn1}") # PNCounter(id=replicaA, inc_counts={'replicaA': 2}, dec_counts={'replicaA': 0}, value=2)
print(f"Initial pn2: {pn2}") # PNCounter(id=replicaB, inc_counts={'replicaB': 1}, dec_counts={'replicaB': 1}, value=0)

pn1.merge(pn2)
print(f"pn1 after merge pn2: {pn1}") # PNCounter(id=replicaA, inc_counts={'replicaA': 2, 'replicaB': 1}, dec_counts={'replicaB': 1, 'replicaA': 0}, value=2)
pn2.merge(pn1) # pn2收到pn1的最新状态
print(f"pn2 after merge pn1: {pn2}") # PNCounter(id=replicaB, inc_counts={'replicaA': 2, 'replicaB': 1}, dec_counts={'replicaB': 1, 'replicaA': 0}, value=2)

pn1.decrement()
print(f"pn1 after decrement: {pn1}") # PNCounter(id=replicaA, inc_counts={'replicaA': 2, 'replicaB': 1}, dec_counts={'replicaB': 1, 'replicaA': 1}, value=1)

c. G-Set (Grow-only Set / 只增集合)

G-Set 只能添加元素,不能删除。

  • 状态set
  • 操作
    • add(element): 将元素添加到集合中。
  • 合并union(local_set, remote_set): 取两个集合的并集。
class GSet:
    def __init__(self):
        self.elements = set()

    def add(self, element):
        self.elements.add(element)

    def value(self):
        return self.elements

    def merge(self, other_set):
        self.elements.update(other_set.elements)

    def __repr__(self):
        return f"GSet(elements={sorted(list(self.elements))})"

# 示例
s1 = GSet()
s2 = GSet()

s1.add("apple")
s1.add("banana")
s2.add("orange")
s2.add("banana")

print(f"Initial s1: {s1}") # GSet(elements=['apple', 'banana'])
print(f"Initial s2: {s2}") # GSet(elements=['banana', 'orange'])

s1.merge(s2)
print(f"s1 after merge s2: {s1}") # GSet(elements=['apple', 'banana', 'orange'])

s2.merge(s1)
print(f"s2 after merge s1: {s2}") # GSet(elements=['apple', 'banana', 'orange'])

d. 2P-Set (Two-Phase Set / 两阶段集合)

2P-Set 允许添加和删除元素,但删除是永久性的。它使用两个 G-Set:一个用于添加的元素 (adds),一个用于删除的元素 (removes)。

  • 状态{'adds': GSet, 'removes': GSet}
  • 操作
    • add(element): 将元素添加到 adds 集合中。
    • remove(element): 将元素添加到 removes 集合中。一个元素一旦被添加到 removes 集合,就不能再被重新添加到集合中。
  • 合并:分别对 addsremoves 两个 G-Set 进行合并。
  • adds.value() - removes.value() (集合差集)。
class TwoPSet:
    def __init__(self):
        self.adds = GSet()
        self.removes = GSet()

    def add(self, element):
        self.adds.add(element)

    def remove(self, element):
        # 只有当元素存在于adds集合中时,才能将其添加到removes集合
        # 这是为了防止删除一个从未添加过的元素,从而导致merge行为不一致
        if element in self.adds.value():
            self.removes.add(element)

    def value(self):
        return self.adds.value() - self.removes.value()

    def merge(self, other_set):
        self.adds.merge(other_set.adds)
        self.removes.merge(other_set.removes)

    def __repr__(self):
        return f"TwoPSet(adds={sorted(list(self.adds.value()))}, removes={sorted(list(self.removes.value()))}, value={sorted(list(self.value()))})"

# 示例
tps1 = TwoPSet()
tps2 = TwoPSet()

tps1.add("A")
tps1.add("B")
tps2.add("B")
tps2.add("C")

print(f"Initial tps1: {tps1}") # TwoPSet(adds=['A', 'B'], removes=[], value=['A', 'B'])
print(f"Initial tps2: {tps2}") # TwoPSet(adds=['B', 'C'], removes=[], value=['B', 'C'])

# 模拟并发删除
tps1.remove("B") # tps1: adds={'A','B'}, removes={'B'}
tps2.add("D")    # tps2: adds={'B','C','D'}, removes={}

print(f"tps1 after remove B: {tps1}") # TwoPSet(adds=['A', 'B'], removes=['B'], value=['A'])
print(f"tps2 after add D: {tps2}")   # TwoPSet(adds=['B', 'C', 'D'], removes=[], value=['B', 'C', 'D'])

tps1.merge(tps2)
print(f"tps1 after merge tps2: {tps1}") # adds={'A', 'B', 'C', 'D'}, removes={'B'} -> value={'A', 'C', 'D'}

tps2.merge(tps1)
print(f"tps2 after merge tps1: {tps2}") # adds={'A', 'B', 'C', 'D'}, removes={'B'} -> value={'A', 'C', 'D'}

e. LWW-Register (Last-Write-Wins Register / 最后写入者胜出寄存器)

LWW-Register 用于存储一个单一值,并解决并发写入冲突。它通常通过时间戳(或 Lamport 时钟)来决定哪个写入“胜出”。

  • 状态(value, timestamp, replica_id)
  • 操作
    • assign(new_value): 更新值,并生成一个新的时间戳。
  • 合并:比较两个状态的时间戳。如果时间戳不同,取时间戳最大的那个。如果时间戳相同(并发写入),则通常通过 replica_id 进行确定性排序(例如,取 replica_id 字典序较大的那个)。
import time

class LWWRegister:
    def __init__(self, replica_id, initial_value=None):
        self.replica_id = replica_id
        self.value = initial_value
        self.timestamp = time.time_ns() # 使用纳秒时间戳作为版本
        self.replica_id_for_tie_break = replica_id # 用于时间戳相同时的决胜

    def assign(self, new_value):
        self.value = new_value
        self.timestamp = time.time_ns() # 更新时间戳
        # Note: 在真实分布式系统中,时间戳需要与Lamport Clock或Vector Clock结合使用
        # 避免不同机器之间时钟不同步的问题。这里仅作简化演示。

    def merge(self, other_register):
        if other_register.timestamp > self.timestamp:
            self.value = other_register.value
            self.timestamp = other_register.timestamp
            self.replica_id_for_tie_break = other_register.replica_id_for_tie_break
        elif other_register.timestamp == self.timestamp:
            # 时间戳相同,使用replica_id作为决胜规则 (例如,字符串比较)
            if other_register.replica_id_for_tie_break > self.replica_id_for_tie_break:
                self.value = other_register.value
                self.timestamp = other_register.timestamp
                self.replica_id_for_tie_break = other_register.replica_id_for_tie_break
        # else: local timestamp is greater, do nothing

    def __repr__(self):
        return f"LWWRegister(id={self.replica_id}, value='{self.value}', ts={self.timestamp}, tie_break_id='{self.replica_id_for_tie_break}')"

# 示例
r1 = LWWRegister("replica_A", "Hello")
r2 = LWWRegister("replica_B", "World")

print(f"Initial r1: {r1}")
print(f"Initial r2: {r2}")

# 模拟并发写入
# r1写入
r1.assign("CRDTs are cool")
print(f"r1 after assign: {r1}")

# r2写入 (可能在r1写入之后,也可能几乎同时)
time.sleep(0.001) # 稍微延迟,确保时间戳不同
r2.assign("CRDTs are powerful")
print(f"r2 after assign: {r2}")

# r1合并r2
r1.merge(r2)
print(f"r1 after merge r2: {r1}") # r2的写入时间戳更大,r1会更新为r2的值

# r2合并r1
r2.merge(r1)
print(f"r2 after merge r1: {r2}") # 此时r1和r2的状态已经一致

2. 操作复制型 CRDTs (Operation-based CRDTs / Op-CRDTs)

操作复制型 CRDTs (Commutative Replicated Data Types) 的工作原理是:每个副本仅广播它在本地执行的操作。其他副本收到这些操作后,直接在本地应用它们。为了保证最终一致性,这些操作必须满足一些条件:

  • 因果顺序:操作必须以因果顺序(causal order)传递和应用。这意味着如果操作 B 是基于操作 A 的结果产生的(即 AB 的前因),那么 A 必须在 B 之前被所有副本应用。这通常通过版本向量(Vector Clocks)或 Lamport 时钟来实现。

  • 幂等性:操作必须是幂等的,即使重复应用也只会产生一次效果。

  • 交换性:非因果相关的操作必须是可交换的。

  • 优点:带宽效率高,因为只传输小的操作消息而不是整个状态。

  • 缺点:需要额外的机制来保证操作的因果顺序,例如使用消息队列、版本向量或 Lamport 时钟。这增加了系统的复杂性。

操作复制型 CRDTs 示例:

一个简单的 Op-CRDT 计数器可以定义为 increment()decrement() 操作。每个操作携带 replica_id。当一个副本收到一个操作时,它直接将其应用到本地计数器上。由于 incrementdecrement 是可交换的(它们只是对总和的加减),所以即使乱序到达,最终结果也是一致的。

状态合并型与操作复制型 CRDTs 对比

特性 状态合并型 CRDTs (Pn-CRDTs) 操作复制型 CRDTs (Op-CRDTs)
同步方式 传输完整状态 传输操作 (deltas)
合并机制 merge(state1, state2) 函数 apply(operation) 函数
网络要求 最终传输到所有副本即可,无需保证顺序 必须保证操作的因果顺序
带宽 可能较高 (传输大状态) 较低 (只传输操作)
实现复杂性 相对简单,只需实现合并函数 较复杂,需要因果顺序机制 (版本向量等)
鲁棒性 高,对网络丢包/乱序不敏感 需因果顺序机制保证,否则可能不一致

核心挑战:如何实现文本序列的 CRDT

对于计数器、集合和寄存器这类简单数据结构,CRDT 的设计相对直观。然而,对于文本编辑器这种需要处理字符序列(列表)的场景,CRDT 的实现变得更加复杂。核心挑战在于:

  1. 字符位置的稳定性:当在序列中插入或删除字符时,后续字符的位置会发生变化。如何保证不同副本插入的字符能够稳定地出现在预期的相对位置?
  2. 并发插入/删除冲突:两个用户同时在同一个位置插入字符,或者一个删除一个插入,如何解决冲突并保持最终一致的顺序?

为了解决这些问题,文本 CRDTs (如 RGA, LSEQ, WOOT) 引入了复杂的字符标识符和墓碑机制。

字符标识符 (Char Identifier) 的设计

核心思想是为序列中的每个字符分配一个全局唯一的、能够编码其相对顺序的标识符。这样,即使字符被删除,它的标识符也依然存在,可以被引用。

一个典型的字符标识符包含以下部分:

  • PositionIdentifier:这是一个关键部分,它不是一个简单的整数索引,而是一个可以表示任意插入点的“分数索引”或“多位数字”。例如,一个 PositionIdentifier 可以是一个整数列表,如 [1, 5, 3]。这种结构允许在任何两个现有标识符之间插入新的标识符,而无需改变现有标识符。
  • ReplicaId:创建该字符的副本的唯一标识符,用于在 PositionIdentifier 相同或无法区分时进行决胜。
  • Timestamp (或 Lamport Clock):创建该字符的时间戳,也用于冲突决胜(例如,后写入者胜出)。

Char 结构体定义

import uuid
import time

class PositionIdentifier:
    """
    用于文本CRDT的字符位置标识符。
    它是一个整数列表,允许在任意两个现有位置之间插入新位置。
    例如:
    A: [1]
    B: [2]
    插入X在A和B之间: [1, 5]
    插入Y在A和X之间: [1, 2]
    则顺序为: A ([1]), Y ([1, 2]), X ([1, 5]), B ([2])
    """
    def __init__(self, digits, replica_id):
        self.digits = tuple(digits) # 使用元组保证不可变性
        self.replica_id = replica_id # 用于在digits完全相同时的决胜

    def __lt__(self, other):
        # 比较两个PositionIdentifier,实现排序
        for i in range(max(len(self.digits), len(other.digits))):
            d1 = self.digits[i] if i < len(self.digits) else -1 # 较短的序列视为有更小的“下一位”
            d2 = other.digits[i] if i < len(other.digits) else -1

            if d1 < d2:
                return True
            if d1 > d2:
                return False
        # 如果所有数字都相同,则根据replica_id决胜
        return self.replica_id < other.replica_id

    def __eq__(self, other):
        return self.digits == other.digits and self.replica_id == other.replica_id

    def __hash__(self):
        return hash((self.digits, self.replica_id))

    def __repr__(self):
        return f"PosID({list(self.digits)}, {self.replica_id})"

class Char:
    """
    表示文本中的一个字符。
    包含其值、唯一的PositionIdentifier和删除状态。
    """
    def __init__(self, value, position_id, timestamp, replica_id):
        self.value = value
        self.position_id = position_id # PositionIdentifier对象
        self.timestamp = timestamp    # 用于LWW冲突解决
        self.replica_id = replica_id  # 用于LWW冲突解决
        self.is_deleted = False

    def __lt__(self, other):
        # 字符的排序主要依据PositionIdentifier
        return self.position_id < other.position_id

    def __eq__(self, other):
        # 两个Char对象相等,当它们的PositionIdentifier相等时 (因为PositionIdentifier已经是全局唯一的)
        return self.position_id == other.position_id

    def __hash__(self):
        return hash(self.position_id)

    def __repr__(self):
        return (f"Char(value='{self.value}', pos={self.position_id}, "
                f"ts={self.timestamp}, deleted={self.is_deleted})")

TextCRDT 核心数据结构

文本 CRDT 通常将文本表示为一个有序的 Char 对象列表。这个列表是“逻辑”上的,它包含了所有已插入和已删除的字符,通过 is_deleted 标志来区分。

class TextCRDT:
    def __init__(self, replica_id):
        self.replica_id = replica_id
        self.chars = [] # 存储所有Char对象,保持有序
        self.lamport_clock = 0 # 用于生成时间戳,保证因果顺序

        # 初始的“边界”字符,用于简化PositionIdentifier的生成
        # left_boundary 和 right_boundary 不会在实际文本中显示
        self.left_boundary = Char(
            '', PositionIdentifier([-1], 'system_start'), 0, 'system_start'
        )
        self.right_boundary = Char(
            '', PositionIdentifier([float('inf')], 'system_end'), 0, 'system_end'
        )
        self.chars.append(self.left_boundary)
        self.chars.append(self.right_boundary)

    def _generate_lamport_timestamp(self):
        self.lamport_clock += 1
        return self.lamport_clock

    def _get_char_at_index(self, index):
        # 找到第N个非删除状态的字符
        current_idx = -1
        for i, char in enumerate(self.chars):
            if char == self.left_boundary or char == self.right_boundary:
                continue
            if not char.is_deleted:
                current_idx += 1
            if current_idx == index:
                return char
        return None

    def _get_insertion_context(self, index):
        """
        根据插入位置,找到前一个和后一个字符,以便生成新的PositionIdentifier
        """
        if index < 0:
            index = 0

        visible_chars = [char for char in self.chars if char != self.left_boundary and char != self.right_boundary and not char.is_deleted]

        if not visible_chars or index >= len(visible_chars):
            # 插入到末尾
            prev_char = visible_chars[-1] if visible_chars else self.left_boundary
            next_char = self.right_boundary
        elif index == 0:
            # 插入到开头
            prev_char = self.left_boundary
            next_char = visible_chars[0]
        else:
            prev_char = visible_chars[index - 1]
            next_char = visible_chars[index]

        return prev_char, next_char

    def _generate_position_id_between(self, prev_pos_id, next_pos_id):
        """
        生成一个介于 prev_pos_id 和 next_pos_id 之间的新 PositionIdentifier
        这是文本CRDT中最核心和最复杂的逻辑之一。
        这里使用一种简化的LSEQ风格的生成方式。
        """
        digits1 = list(prev_pos_id.digits)
        digits2 = list(next_pos_id.digits)

        new_digits = []
        i = 0
        while True:
            d1 = digits1[i] if i < len(digits1) else -1
            d2 = digits2[i] if i < len(digits2) else float('inf') # next_pos_id的末尾视为无穷大

            if d2 - d1 > 1:
                # 存在空隙,可以在中间插入一个数字
                new_digits.append(d1 + 1)
                break
            elif d2 - d1 == 1:
                # 只有一位的空隙,必须深入下一层
                new_digits.append(d1)
                i += 1
            else: # d1 == d2, 此时需要继续深入
                new_digits.append(d1)
                i += 1
                if i >= len(digits1) and i >= len(digits2):
                    # 如果到达了两个PositionIdentifier的末尾,且它们完全相同,
                    # 理论上不应该发生,除非是初始边界情况。
                    # 在实际LSEQ中,会确保通过增加一个新数字来创建唯一的ID
                    new_digits.append(0) # 插入一个0作为新的层级
                    break
        return PositionIdentifier(new_digits, self.replica_id)

    def insert(self, index, char_value):
        """
        在指定索引处插入一个字符。
        index是基于当前可见文本的索引。
        """
        prev_char, next_char = self._get_insertion_context(index)
        new_pos_id = self._generate_position_id_between(
            prev_char.position_id, next_char.position_id
        )
        self.lamport_clock = max(self.lamport_clock, prev_char.timestamp, next_char.timestamp) + 1
        new_char = Char(char_value, new_pos_id, self.lamport_clock, self.replica_id)

        # 将新字符插入到chars列表中并保持有序
        # 这是一个O(N)操作,在大型文档中需要优化,例如使用平衡二叉搜索树
        # 这里使用简单的列表插入,以便理解逻辑
        inserted = False
        for i in range(len(self.chars)):
            if new_char < self.chars[i]:
                self.chars.insert(i, new_char)
                inserted = True
                break
        if not inserted:
            self.chars.append(new_char) # 理论上不应该发生,除非是边界情况

        return new_char # 返回新字符,以便远程传输

    def delete(self, index):
        """
        删除指定索引处的字符。
        不是真正从列表中移除,而是标记为已删除 (tombstone)。
        """
        char_to_delete = self._get_char_at_index(index)
        if char_to_delete:
            self.lamport_clock = max(self.lamport_clock, char_to_delete.timestamp) + 1
            char_to_delete.is_deleted = True
            char_to_delete.timestamp = self.lamport_clock # 更新删除操作的时间戳
            return char_to_delete # 返回被删除字符,以便远程传输
        return None

    def receive_char_operation(self, remote_char):
        """
        接收并应用来自远程副本的字符操作 (插入或删除)。
        """
        self.lamport_clock = max(self.lamport_clock, remote_char.timestamp) + 1

        # 检查是否已存在该 PositionIdentifier 的字符
        existing_char_index = -1
        for i, char in enumerate(self.chars):
            if char.position_id == remote_char.position_id:
                existing_char_index = i
                break

        if existing_char_index != -1:
            # 字符已存在,处理冲突或更新状态
            existing_char = self.chars[existing_char_index]

            # 冲突解决:如果远程操作的时间戳更新,则应用远程操作
            # 对于删除操作,删除总是胜出( tombstone wins over live char with older timestamp)
            # 如果时间戳相同,则根据replica_id决胜 (LWW)
            if remote_char.timestamp > existing_char.timestamp:
                existing_char.value = remote_char.value
                existing_char.is_deleted = remote_char.is_deleted
                existing_char.timestamp = remote_char.timestamp
                existing_char.replica_id = remote_char.replica_id
            elif remote_char.timestamp == existing_char.timestamp:
                if remote_char.replica_id > existing_char.replica_id: # 决定性决胜
                    existing_char.value = remote_char.value
                    existing_char.is_deleted = remote_char.is_deleted
                    existing_char.timestamp = remote_char.timestamp
                    existing_char.replica_id = remote_char.replica_id
                elif remote_char.is_deleted and not existing_char.is_deleted:
                    # 删除操作的优先级高于同一时间戳的插入/修改
                    existing_char.is_deleted = True
                    existing_char.timestamp = remote_char.timestamp
                    existing_char.replica_id = remote_char.replica_id
        else:
            # 字符不存在,直接插入 (保持有序)
            inserted = False
            for i in range(len(self.chars)):
                if remote_char < self.chars[i]:
                    self.chars.insert(i, remote_char)
                    inserted = True
                    break
            if not inserted:
                self.chars.append(remote_char)

    def get_current_text(self):
        """
        获取当前可见的文本内容 (不包含已删除字符和边界字符)。
        """
        return "".join([char.value for char in self.chars if not char.is_deleted and char.value != ''])

    def __repr__(self):
        visible_chars = [char for char in self.chars if char != self.left_boundary and char != self.right_boundary]
        return f"TextCRDT(id={self.replica_id}, text='{self.get_current_text()}', chars={visible_chars})"

# 辅助函数,用于在测试中模拟网络传输
def simulate_network_transfer(crdt_instance, char_op):
    # 模拟深拷贝,防止直接修改
    import copy
    return copy.deepcopy(char_op)

# 示例
editor1 = TextCRDT("editor1")
editor2 = TextCRDT("editor2")

print("--- 初始状态 ---")
print(f"Editor1: {editor1.get_current_text()}")
print(f"Editor2: {editor2.get_current_text()}")

# Editor1 插入 "H"
op1 = editor1.insert(0, "H")
editor2.receive_char_operation(simulate_network_transfer(editor2, op1))
print("n--- Editor1 插入 'H' ---")
print(f"Editor1: {editor1.get_current_text()}")
print(f"Editor2: {editor2.get_current_text()}")

# Editor2 插入 "i"
op2 = editor2.insert(1, "i") # 插入到 H 后面
editor1.receive_char_operation(simulate_network_transfer(editor1, op2))
print("n--- Editor2 插入 'i' ---")
print(f"Editor1: {editor1.get_current_text()}")
print(f"Editor2: {editor2.get_current_text()}")

# Editor1 在开头插入 "W" (并发)
op3_e1 = editor1.insert(0, "W")
# Editor2 在开头插入 "e" (并发)
op4_e2 = editor2.insert(0, "e")

# 模拟网络乱序,先处理 editor2 的操作,再处理 editor1 的操作
editor1.receive_char_operation(simulate_network_transfer(editor1, op4_e2))
editor2.receive_char_operation(simulate_network_transfer(editor2, op3_e1))

print("n--- 并发插入 'W' 和 'e' ---")
print(f"Editor1: {editor1.get_current_text()}") # 最终可能是 "WeHi" 或 "eWHi",取决于PositionIdentifier和replica_id的决胜规则
print(f"Editor2: {editor2.get_current_text()}")
# 检查内部状态
print(f"Editor1 internal chars: {editor1.chars}")
print(f"Editor2 internal chars: {editor2.chars}")

# 确保最终文本一致
assert editor1.get_current_text() == editor2.get_current_text()

# Editor1 删除 "W"
op5_e1 = editor1.delete(0) # 删除 "W" (假设 "W" 在 "e" 之前)
editor2.receive_char_operation(simulate_network_transfer(editor2, op5_e1))
print("n--- Editor1 删除 'W' ---")
print(f"Editor1: {editor1.get_current_text()}")
print(f"Editor2: {editor2.get_current_text()}")

# Editor2 删除 "H"
op6_e2 = editor2.delete(1) # 删除 "H" (此时文本为 "eHi" 或 "eWHI", 假设是"eHi"的H)
editor1.receive_char_operation(simulate_network_transfer(editor1, op6_e2))
print("n--- Editor2 删除 'H' ---")
print(f"Editor1: {editor1.get_current_text()}")
print(f"Editor2: {editor2.get_current_text()}")

# 再次检查最终文本一致
assert editor1.get_current_text() == editor2.get_current_text()

print(f"n最终文本: {editor1.get_current_text()}")

代码解释:

  1. PositionIdentifier:这是关键。它是一个 digits 列表,例如 [1], [1, 5], [2]。这种结构允许我们通过在 digits 列表中添加新的数字来在任意两个现有标识符之间“创建”一个新标识符,而无需修改其他标识符。__lt__ 方法定义了这些标识符的排序规则。replica_iddigits 完全相同的情况下作为最终的决胜者。
  2. Char:每个字符都包含其值、一个唯一的 PositionIdentifier、一个 timestamp (Lamport Clock) 和 replica_idis_deleted 标志用于标记字符是否已被逻辑删除(墓碑)。
  3. TextCRDT
    • chars 列表:存储所有 Char 对象,并始终保持排序状态。
    • left_boundaryright_boundary:两个特殊的字符,它们的 PositionIdentifier 分别是最小和最大的,用于简化在文档开头和末尾插入字符时的 PositionIdentifier 生成。
    • _generate_lamport_timestamp():生成一个递增的时间戳,用于确保因果顺序和冲突解决。
    • _get_insertion_context():根据用户输入的逻辑索引(例如,第 3 个字符),找到其对应的实际 Char 对象(考虑已删除的字符)以及它前后的 Char 对象,以便为新插入的字符生成 PositionIdentifier
    • _generate_position_id_between():这是最复杂的逻辑,它根据前一个和后一个字符的 PositionIdentifier,生成一个新的 PositionIdentifier。它通过比较 digits 列表,寻找可以插入新数字的位置。如果 d2 - d1 > 1,则直接插入 d1 + 1。如果 d2 - d1 == 1d1 == d2,则需要深入到 digits 列表的下一层。
    • insert(index, char_value):创建一个新的 Char 对象,并将其插入到 self.chars 列表中,同时保持其有序性。
    • delete(index):找到对应的 Char 对象,并将其 is_deleted 标志设为 True。这是一个“软删除”,被删除的字符仍然存在于 chars 列表中,以确保其 PositionIdentifier 仍然有效,并且可以被其他副本正确引用。
    • receive_char_operation(remote_char):这是合并逻辑的核心。当接收到远程的字符操作时,它会检查本地是否已存在相同 PositionIdentifier 的字符。
      • 如果不存在,则直接按序插入。
      • 如果存在,则根据 timestampreplica_id(LWW 规则)以及删除优先规则来解决冲突。时间戳更新的操作会覆盖旧操作。如果时间戳相同,则 replica_id 较大的获胜。删除操作通常具有更高的优先级,即使时间戳相同,删除也往往会“胜出”。
    • get_current_text():遍历 chars 列表,过滤掉已删除的字符和边界字符,然后拼接成最终的字符串。

CRDTs 的工程实践与考量

虽然 CRDTs 提供了强大的理论保证,但在实际工程中仍需考虑以下因素:

  1. 性能与内存
    • 墓碑 (Tombstones):删除操作不会真正移除数据,而是标记为已删除。这会导致数据结构随着时间的推移不断增长,占用更多内存。需要定期进行垃圾回收 (Garbage Collection, GC) 或快照机制,将已删除的字符从历史中清除,同时确保不破坏其他副本的因果关系。
    • 排序列表:文本 CRDT 的 chars 列表需要始终保持有序。使用 Python 列表的 insertdel 操作是 O(N) 的,对于大型文档会非常慢。在生产环境中,通常会使用更高效的数据结构,如平衡二叉搜索树(例如红黑树)或跳表,将插入和删除操作的复杂度降至 O(log N)。
  2. 网络带宽
    • 状态合并型:传输整个状态会消耗大量带宽。对于大型数据结构,这可能不是一个可行的方案。
    • 操作复制型:虽然只传输操作,但为了保证因果顺序,通常需要传输版本向量或 Lamport 时钟信息,这会增加消息的开销。
    • Delta CRDTs:一种优化方案,只传输状态的“增量”(delta),而不是整个状态,兼顾了状态合并型的鲁棒性和操作复制型的带宽效率。
  3. 调试与复杂性:尽管 CRDTs 理论上比 OT 简单,但针对复杂数据类型(如富文本、嵌套结构)设计和实现 CRDTs 仍然需要深入理解其数学原理和冲突解决机制。调试分布式系统中 CRDTs 的最终一致性问题也可能具有挑战性。
  4. 与现有系统的集成:将 CRDTs 集成到现有的数据库或应用架构中可能需要进行结构性调整。

无锁协作的未来

CRDTs 代表了分布式系统中无锁、实时协作的未来方向。它们通过数学上的严谨性,为复杂的并发问题提供了一种优雅而健壮的解决方案。从简单的计数器到复杂的文本编辑器,CRDTs 的应用范围正在不断扩大。随着分布式系统和边缘计算的普及,CRDTs 的价值将愈发凸显,它们将使开发者能够构建出更具弹性、更易于扩展、用户体验更佳的协作应用。虽然在工程实现上仍面临一些挑战,但 CRDTs 的核心思想——通过设计无冲突的数据结构来规避冲突,而非解决冲突——无疑是分布式系统设计中的一个重要范式转变。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注