解析 “Transactional Graph Execution”:如何确保跨越 10 个节点的复杂操作在失败时能“一键回滚”?

各位同仁,下午好!

今天,我们将深入探讨一个在现代数据管理,尤其是在图数据库领域中,既核心又极具挑战性的主题:事务性图执行(Transactional Graph Execution)。具体来说,我们将聚焦于一个关键问题:如何确保一个横跨十个乃至更多节点的复杂操作,在任何环节遭遇失败时,都能够实现彻底的“一键回滚”,仿佛从未发生过一样?这不仅仅是对系统健壮性的考验,更是对数据完整性与业务逻辑准确性的终极保障。

想象一下,在一个庞大的社交网络中,用户A删除其账户,这可能意味着需要:

  1. 删除用户A的节点。
  2. 删除所有与用户A相关的“朋友”关系。
  3. 删除所有用户A发布的“帖子”节点。
  4. 更新所有被用户A点赞过的“帖子”的点赞计数。
  5. 从所有用户A参与过的“群组”中移除其成员关系。
  6. 甚至可能触发通知给其朋友,或存档其数据。

这是一个典型的多节点、多关系操作。如果在处理第4步时系统崩溃,或者网络中断,我们绝不希望出现用户账户被部分删除、部分数据残留的混乱局面。我们期望的是,要么所有操作都成功,要么所有操作都回滚到操作之前的状态。这正是我们今天讲座的核心——如何构建这样的原子性操作。


第一章:理解事务的本质与图的挑战

在深入技术细节之前,我们必须对“事务”这个概念有一个清晰而深刻的理解。它不仅仅是一个数据库操作的封装,更是数据一致性的基石。

1.1 ACID 特性:事务的四大支柱

事务性系统之所以能够提供可靠的数据管理,是因为它们通常遵循 ACID 原则:

  • 原子性 (Atomicity):这是我们今天的主题。事务是不可分割的工作单元,要么全部完成,要么全部不完成(回滚)。“一键回滚”正是原子性的直观体现。
  • 一致性 (Consistency):事务将数据库从一个合法状态转换到另一个合法状态。它确保数据在事务提交后满足所有预定义的规则和约束(如类型约束、关系完整性、唯一性等)。
  • 隔离性 (Isolation):并发执行的事务之间互不影响。一个事务在完成之前,其所做的修改对其他事务是不可见的。常见的隔离级别有读未提交、读已提交、可重复读、串行化。
  • 持久性 (Durability):一旦事务提交,其所做的修改就是永久性的,即使系统发生故障也不会丢失。

在图数据库中,原子性和一致性尤为重要,因为图的互联性使得一个看似简单的操作可能牵一发而动全身。

1.2 图数据模型的特殊性与事务的挑战

图数据库以节点(Nodes)和关系(Relationships)为基本构建块,这种模型天生就适合表示复杂、互联的数据。然而,这种互联性也给事务带来了独特的挑战:

  • 高扇出效应 (High Fan-out Effect):一个节点可能连接着成千上万个其他节点。删除一个节点可能需要遍历并删除其所有关系,这可能是一个巨大的操作集。
  • 路径依赖 (Path Dependencies):某些业务逻辑可能依赖于特定的图路径。事务需要确保这些路径在操作过程中保持有效或在回滚后恢复。
  • 分布式挑战 (Distributed Challenges):当图数据量庞大到需要分布到多个服务器上时,跨服务器的事务管理变得异常复杂。我们的“10个节点”很可能就意味着数据分布在不同的物理存储单元上。

1.3 核心问题:多节点操作的原子性保障

当一个操作涉及修改多个节点(以及它们之间的关系)时,我们面临的核心问题是:如何追踪这些分散的修改,并在失败时将它们全部撤销?

举个例子:在一个电商平台的图数据库中,一个“订单支付”操作可能涉及:

  1. 将 Order 节点的 status 属性从 PENDING 更新为 PAID。
  2. 创建 Payment 节点,并将其与 Order 节点关联。
  3. 更新 User 节点的 total_spent 属性。
  4. 更新 Product 节点的 stock 属性。
  5. 创建 Notification 节点,通知用户订单已支付。

这五个步骤可能分散在不同的存储区域,甚至不同的物理机器上。如果在第4步更新 Product 库存时发生错误(例如,库存不足导致事务逻辑回滚),那么前三步已经完成的修改必须全部撤销。这就是我们所说的“一键回滚”——一个单一的失败点,触发所有相关操作的统一撤销。
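
在进入具体机制之前,先用一段极简的示意代码感受“同一事务边界”意味着什么。下面的 transaction 上下文管理器与 _MockTx 类都是为演示而假设的玩具实现(并非任何真实图数据库的 API):只要任何一步抛出异常,之前记录的逆操作就会被逆序执行。

import contextlib

class _MockTx:
    """极简的内存事务示意:记录每一步的逆操作,失败时逆序执行。"""
    def __init__(self, store):
        self.store = store
        self.undo_ops = []

    def set_prop(self, node_id, key, value):
        old = self.store.get(node_id, {}).get(key)
        self.store.setdefault(node_id, {})[key] = value
        # 记录逆操作:把属性恢复为旧值(用默认参数固定当前值,避免闭包晚绑定)
        self.undo_ops.append(lambda n=node_id, k=key, o=old: self.store[n].__setitem__(k, o))

    def rollback(self):
        for undo in reversed(self.undo_ops):
            undo()

@contextlib.contextmanager
def transaction(store):
    tx = _MockTx(store)
    try:
        yield tx
    except Exception:
        tx.rollback()  # 任一步骤失败 => 撤销之前的所有修改
        raise

# 模拟“订单支付”:第 4 步库存不足,前面的修改被整体回滚
store = {
    "Order1": {"status": "PENDING"},
    "UserA": {"total_spent": 0},
    "ProductX": {"stock": 0},
}
try:
    with transaction(store) as tx:
        tx.set_prop("Order1", "status", "PAID")      # 步骤 1
        tx.set_prop("UserA", "total_spent", 100)     # 步骤 3(简化)
        if store["ProductX"]["stock"] < 1:           # 步骤 4:库存不足
            raise ValueError("Stock is low!")
except ValueError as e:
    print(f"payment failed: {e}")

print(store)  # Order1.status 仍为 PENDING,total_spent 仍为 0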


第二章:单节点事务的基础:WAL 与 Undo/Redo Logs

在探讨分布式场景之前,我们首先要理解单节点数据库是如何实现原子性与持久性的。这通常依赖于写前日志 (Write-Ahead Logging, WAL) 与 Undo/Redo 日志机制。

2.1 写前日志 (WAL) 原理

WAL 的核心思想是:在实际修改数据页之前,所有的修改操作都必须先记录到持久化的日志中。这样一来,即使系统在日志落盘之后、数据页尚未写入(或事务尚未提交)时崩溃,我们也可以依据日志进行重放或回滚,恢复到一致的状态。

  • 原子性保障:如果事务在提交前失败,日志中未标记为提交的记录会被忽略,数据库回滚到事务开始前的状态。
  • 持久性保障:如果事务提交,日志中会记录提交信息。即使数据页尚未写入磁盘,系统崩溃后也可以通过日志进行重放 (Redo),确保已提交的数据不会丢失。
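
在进入 Undo/Redo 的细节之前,下面用一小段示意代码强调“先写日志、再改数据”这一顺序约束(wal_update、wal_demo.log 等命名均为演示用的假设,并非某个存储引擎的真实接口):

import json
import os

def wal_update(log_path, data, node_id, new_props):
    """示意:任何数据修改之前,先把修改记录追加到日志并强制落盘。"""
    record = {"node_id": node_id, "old": dict(data.get(node_id, {})), "new": new_props}
    with open(log_path, "a") as log:
        log.write(json.dumps(record) + "\n")
        log.flush()
        os.fsync(log.fileno())  # 日志先于数据持久化,这是 WAL 的核心约束
    # 只有在日志确定落盘之后,才允许修改内存/数据页中的数据
    data.setdefault(node_id, {}).update(new_props)

data = {"UserA": {"name": "Alice"}}
wal_update("wal_demo.log", data, "UserA", {"age": 30})
print(data)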

2.2 Undo Log 与 Redo Log

WAL 通常结合 Undo Log 和 Redo Log 来实现原子性和持久性。

  • Undo Log (回滚日志):记录了数据修改前的状态(即“旧值”)。当事务需要回滚时,系统会使用 Undo Log 中的信息将数据恢复到修改前的状态。它是实现原子性的关键。
  • Redo Log (重做日志):记录了数据修改后的状态(即“新值”),或者更准确地说,记录了如何重新应用一个操作以达到修改后的状态。当系统崩溃后,Redo Log 用于恢复已提交但尚未写入磁盘的数据。它是实现持久性的关键。

让我们通过一个简化的Python代码示例来模拟一个带有WAL和Undo/Redo日志的单节点图操作。

import time
import json
import os

# 模拟图数据库的存储
class Node:
    def __init__(self, node_id, properties=None):
        self.node_id = node_id
        self.properties = properties if properties is not None else {}
        self.relationships = {} # {rel_type: {target_node_id: rel_properties}}

    def __repr__(self):
        return f"Node({self.node_id}, {self.properties})"

class GraphStorage:
    def __init__(self, data_file="graph_data.json"):
        self.nodes = {}  # node_id -> Node object
        self.data_file = data_file
        self._load_from_disk()

    def _load_from_disk(self):
        if os.path.exists(self.data_file):
            try:
                with open(self.data_file, 'r') as f:
                    data = json.load(f)
                    for node_id, node_data in data.items():
                        node = Node(node_id, node_data['properties'])
                        node.relationships = node_data['relationships']
                        self.nodes[node_id] = node
            except json.JSONDecodeError:
                print(f"Warning: Could not decode {self.data_file}. Starting fresh.")
                self.nodes = {}
        else:
            self.nodes = {}

    def _save_to_disk(self):
        data = {}
        for node_id, node in self.nodes.items():
            data[node_id] = {
                'properties': node.properties,
                'relationships': node.relationships
            }
        with open(self.data_file, 'w') as f:
            json.dump(data, f, indent=2)

    def get_node(self, node_id):
        return self.nodes.get(node_id)

    def add_node(self, node_id, properties=None):
        if node_id in self.nodes:
            raise ValueError(f"Node {node_id} already exists.")
        node = Node(node_id, properties)
        self.nodes[node_id] = node
        return node

    def update_node_properties(self, node_id, new_properties):
        node = self.nodes.get(node_id)
        if node:
            node.properties.update(new_properties)
            return True
        return False

    def delete_node(self, node_id):
        if node_id in self.nodes:
            # First, remove all relationships pointing TO this node
            for other_node_id, other_node in self.nodes.items():
                if other_node_id == node_id:
                    continue
                for rel_type in list(other_node.relationships.keys()):
                    if node_id in other_node.relationships[rel_type]:
                        del other_node.relationships[rel_type][node_id]
                        if not other_node.relationships[rel_type]:
                            del other_node.relationships[rel_type]

            # Then, delete the node itself
            del self.nodes[node_id]
            return True
        return False

    def add_relationship(self, from_node_id, to_node_id, rel_type, properties=None):
        from_node = self.nodes.get(from_node_id)
        to_node = self.nodes.get(to_node_id)
        if from_node and to_node:
            if rel_type not in from_node.relationships:
                from_node.relationships[rel_type] = {}
            from_node.relationships[rel_type][to_node_id] = properties if properties is not None else {}
            return True
        return False

    def delete_relationship(self, from_node_id, to_node_id, rel_type):
        from_node = self.nodes.get(from_node_id)
        if from_node and rel_type in from_node.relationships and to_node_id in from_node.relationships[rel_type]:
            del from_node.relationships[rel_type][to_node_id]
            if not from_node.relationships[rel_type]:
                del from_node.relationships[rel_type]
            return True
        return False

# 事务日志记录
class TransactionLog:
    def __init__(self, log_file="transaction.log"):
        self.log_file = log_file
        self.current_tx_id = 0
        self._load_last_tx_id()

    def _load_last_tx_id(self):
        if os.path.exists(self.log_file):
            with open(self.log_file, 'r') as f:
                for line in f:
                    try:
                        entry = json.loads(line)
                        if entry['type'] == 'COMMIT':
                            self.current_tx_id = max(self.current_tx_id, entry['tx_id'])
                    except json.JSONDecodeError:
                        continue # Ignore malformed lines
        self.current_tx_id += 1 # Next transaction ID

    def _write_log(self, entry):
        with open(self.log_file, 'a') as f:
            f.write(json.dumps(entry) + '\n')

    def begin_transaction(self):
        tx_id = self.current_tx_id
        self.current_tx_id += 1
        self._write_log({'type': 'BEGIN', 'tx_id': tx_id, 'timestamp': time.time()})
        return tx_id

    def log_undo_redo_entry(self, tx_id, operation, old_value, new_value=None):
        """
        Log an operation with its undo/redo information.
        operation: 'ADD_NODE', 'UPDATE_NODE_PROP', 'DELETE_NODE', 'ADD_REL', 'DELETE_REL'
        old_value: data to restore for undo
        new_value: data to apply for redo (optional, for some ops old_value is enough)
        """
        entry = {
            'type': 'DATA_CHANGE',
            'tx_id': tx_id,
            'operation': operation,
            'old_value': old_value,
            'new_value': new_value,
            'timestamp': time.time()
        }
        self._write_log(entry)

    def commit_transaction(self, tx_id):
        self._write_log({'type': 'COMMIT', 'tx_id': tx_id, 'timestamp': time.time()})

    def rollback_transaction(self, tx_id):
        self._write_log({'type': 'ROLLBACK', 'tx_id': tx_id, 'timestamp': time.time()})

    def get_transaction_log(self):
        logs = []
        if os.path.exists(self.log_file):
            with open(self.log_file, 'r') as f:
                for line in f:
                    try:
                        logs.append(json.loads(line))
                    except json.JSONDecodeError:
                        continue
        return logs

# 事务管理器
class TransactionManager:
    def __init__(self, graph_storage, transaction_log):
        self.graph_storage = graph_storage
        self.transaction_log = transaction_log
        self.active_transactions = {} # tx_id -> list of pending changes for undo

    def begin(self):
        tx_id = self.transaction_log.begin_transaction()
        self.active_transactions[tx_id] = []
        print(f"Transaction {tx_id} started.")
        return tx_id

    def _record_change(self, tx_id, operation, old_value, new_value=None):
        """Records change for potential undo."""
        self.active_transactions[tx_id].append({
            'operation': operation,
            'old_value': old_value,
            'new_value': new_value # For redo, or just context
        })
        self.transaction_log.log_undo_redo_entry(tx_id, operation, old_value, new_value)

    def add_node(self, tx_id, node_id, properties=None):
        if self.graph_storage.get_node(node_id):
            raise ValueError(f"Node {node_id} already exists.")
        self._record_change(tx_id, 'ADD_NODE', {'node_id': node_id}) # To undo, we delete it
        return self.graph_storage.add_node(node_id, properties)

    def update_node_properties(self, tx_id, node_id, new_properties):
        node = self.graph_storage.get_node(node_id)
        if not node:
            raise ValueError(f"Node {node_id} does not exist.")
        old_properties = {k: node.properties.get(k) for k in new_properties} # Only record changed keys
        self._record_change(tx_id, 'UPDATE_NODE_PROP', {'node_id': node_id, 'old_props': old_properties}, new_properties)
        return self.graph_storage.update_node_properties(node_id, new_properties)

    def delete_node(self, tx_id, node_id):
        node = self.graph_storage.get_node(node_id)
        if not node:
            raise ValueError(f"Node {node_id} does not exist.")
        # For simplicity, store full node state to recreate
        old_node_state = {
            'node_id': node.node_id,
            'properties': dict(node.properties),
            'relationships': {rt: dict(targets) for rt, targets in node.relationships.items()}
        }
        # Also, relationships pointing TO this node must be recorded for undo
        incoming_rels = []
        for other_node_id, other_node in self.graph_storage.nodes.items():
            if other_node_id == node_id: continue
            for rel_type, targets in other_node.relationships.items():
                if node_id in targets:
                    incoming_rels.append({
                        'from_node_id': other_node_id,
                        'to_node_id': node_id,
                        'rel_type': rel_type,
                        'properties': dict(targets[node_id])
                    })

        self._record_change(tx_id, 'DELETE_NODE', {'node_id': node_id, 'old_state': old_node_state, 'incoming_rels': incoming_rels})
        return self.graph_storage.delete_node(node_id)

    def add_relationship(self, tx_id, from_node_id, to_node_id, rel_type, properties=None):
        from_node = self.graph_storage.get_node(from_node_id)
        to_node = self.graph_storage.get_node(to_node_id)
        if not (from_node and to_node):
            raise ValueError("One or both nodes for relationship do not exist.")

        # To undo, we delete this relationship
        self._record_change(tx_id, 'ADD_REL', {'from': from_node_id, 'to': to_node_id, 'type': rel_type})
        return self.graph_storage.add_relationship(from_node_id, to_node_id, rel_type, properties)

    def delete_relationship(self, tx_id, from_node_id, to_node_id, rel_type):
        from_node = self.graph_storage.get_node(from_node_id)
        if not (from_node and rel_type in from_node.relationships and to_node_id in from_node.relationships[rel_type]):
             raise ValueError("Relationship does not exist.")

        # To undo, we add this relationship back
        old_rel_props = from_node.relationships[rel_type][to_node_id]
        self._record_change(tx_id, 'DELETE_REL', {'from': from_node_id, 'to': to_node_id, 'type': rel_type, 'properties': old_rel_props})
        return self.graph_storage.delete_relationship(from_node_id, to_node_id, rel_type)

    def commit(self, tx_id):
        if tx_id not in self.active_transactions:
            raise ValueError(f"Transaction {tx_id} is not active.")

        self.transaction_log.commit_transaction(tx_id)
        self.graph_storage._save_to_disk() # Durable storage after commit
        del self.active_transactions[tx_id]
        print(f"Transaction {tx_id} committed.")

    def rollback(self, tx_id):
        if tx_id not in self.active_transactions:
            print(f"Transaction {tx_id} not active, nothing to rollback.")
            return

        print(f"Rolling back transaction {tx_id}...")
        # Undo changes in reverse order
        for change in reversed(self.active_transactions[tx_id]):
            operation = change['operation']
            old_value = change['old_value']

            try:
                if operation == 'ADD_NODE':
                    # To undo ADD_NODE, we delete the node
                    self.graph_storage.delete_node(old_value['node_id'])
                    print(f"  Undo: Deleted node {old_value['node_id']}")
                elif operation == 'UPDATE_NODE_PROP':
                    # To undo UPDATE_NODE_PROP, we revert properties
                    self.graph_storage.update_node_properties(old_value['node_id'], old_value['old_props'])
                    print(f"  Undo: Reverted properties for node {old_value['node_id']}")
                elif operation == 'DELETE_NODE':
                    # To undo DELETE_NODE, we add the node back and its incoming relationships
                    node_state = old_value['old_state']
                    self.graph_storage.add_node(node_state['node_id'], node_state['properties'])
                    for rel_type, targets in node_state['relationships'].items():
                        for target_id, props in targets.items():
                            self.graph_storage.add_relationship(node_state['node_id'], target_id, rel_type, props)
                    for incoming_rel in old_value['incoming_rels']:
                        self.graph_storage.add_relationship(
                            incoming_rel['from_node_id'], 
                            incoming_rel['to_node_id'], 
                            incoming_rel['rel_type'], 
                            incoming_rel['properties']
                        )
                    print(f"  Undo: Re-added node {node_state['node_id']} and its relationships")
                elif operation == 'ADD_REL':
                    # To undo ADD_REL, we delete the relationship
                    self.graph_storage.delete_relationship(old_value['from'], old_value['to'], old_value['type'])
                    print(f"  Undo: Deleted relationship {old_value['from']}-{old_value['type']}->{old_value['to']}")
                elif operation == 'DELETE_REL':
                    # To undo DELETE_REL, we add the relationship back
                    self.graph_storage.add_relationship(old_value['from'], old_value['to'], old_value['type'], old_value['properties'])
                    print(f"  Undo: Re-added relationship {old_value['from']}-{old_value['type']}->{old_value['to']}")
            except Exception as e:
                print(f"  ERROR during undo for change {change}: {e}")
                # In a real system, this would be critical and require manual intervention or more robust recovery

        self.transaction_log.rollback_transaction(tx_id)
        self.graph_storage._save_to_disk() # Persist the rolled-back state
        del self.active_transactions[tx_id]
        print(f"Transaction {tx_id} rolled back successfully.")

    def recover(self):
        """
        Recovers the graph state by replaying committed transactions and rolling back uncommitted ones.
        This is a simplified recovery mechanism.
        """
        print("Starting recovery process...")
        committed_tx_ids = set()
        uncommitted_tx_data = {} # tx_id -> list of DATA_CHANGE entries

        # First pass: identify committed transactions and group data changes
        for entry in self.transaction_log.get_transaction_log():
            tx_id = entry['tx_id']
            if entry['type'] == 'COMMIT':
                committed_tx_ids.add(tx_id)
            elif entry['type'] == 'ROLLBACK':
                if tx_id in uncommitted_tx_data:
                    del uncommitted_tx_data[tx_id] # Mark as rolled back
            elif entry['type'] == 'DATA_CHANGE':
                if tx_id not in uncommitted_tx_data:
                    uncommitted_tx_data[tx_id] = []
                uncommitted_tx_data[tx_id].append(entry)

        # Second pass: apply redo for committed, undo for uncommitted
        # For simplicity, we assume current graph_storage is "empty" or represents a prior valid state
        # A more robust recovery would involve diffing current state with log or checkpointing.
        # Here, we simulate by re-applying *only* committed changes if the current in-memory state is incorrect.

        # This recovery is more about ensuring the log reflects the correct final state on disk.
        # The `_save_to_disk` on commit/rollback handles the actual data persistence.

        print(f"Recovery complete. Total committed: {len(committed_tx_ids)}")
        # In a real system, we'd iterate through the log and apply redo for committed and undo for uncommitted
        # based on the last checkpoint. For this example, `_save_to_disk` during commit/rollback is the primary persistence.

# --- Usage Example ---
if __name__ == "__main__":
    # Clean up previous runs
    if os.path.exists("graph_data.json"):
        os.remove("graph_data.json")
    if os.path.exists("transaction.log"):
        os.remove("transaction.log")

    graph_storage = GraphStorage()
    transaction_log = TransactionLog()
    tx_manager = TransactionManager(graph_storage, transaction_log)

    print("--- Scenario 1: Successful Transaction ---")
    tx1 = tx_manager.begin()
    try:
        tx_manager.add_node(tx1, "UserA", {"name": "Alice"})
        tx_manager.add_node(tx1, "Post1", {"title": "Hello Graph"})
        tx_manager.add_relationship(tx1, "UserA", "Post1", "AUTHORED")
        tx_manager.update_node_properties(tx1, "UserA", {"age": 30})
        tx_manager.add_node(tx1, "UserB", {"name": "Bob"})
        tx_manager.add_relationship(tx1, "UserA", "UserB", "FRIENDS_WITH")
        tx_manager.commit(tx1)
    except Exception as e:
        print(f"Transaction 1 failed: {e}. Rolling back.")
        tx_manager.rollback(tx1)

    print("n--- Current Graph State after TX1 ---")
    print(graph_storage.nodes)
    print(graph_storage.get_node("UserA").relationships if graph_storage.get_node("UserA") else "UserA not found")

    print("n--- Scenario 2: Transaction with Rollback ---")
    tx2 = tx_manager.begin()
    try:
        tx_manager.add_node(tx2, "UserC", {"name": "Charlie"})
        tx_manager.add_node(tx2, "Post2", {"title": "Another Post"})
        tx_manager.add_relationship(tx2, "UserC", "Post2", "AUTHORED")

        print("Simulating a failure during transaction 2...")
        # Simulate an error that causes rollback
        if True: # Force an error
            raise ValueError("Artificial error: Stock is low!")

        tx_manager.update_node_properties(tx2, "UserC", {"status": "active"}) # This line will not be reached
        tx_manager.commit(tx2)
    except ValueError as e:
        print(f"Transaction 2 failed: {e}. Initiating rollback.")
        tx_manager.rollback(tx2)
    except Exception as e:
        print(f"An unexpected error occurred: {e}. Initiating rollback.")
        tx_manager.rollback(tx2)

    print("n--- Current Graph State after TX2 (should not contain UserC) ---")
    print(graph_storage.nodes)
    print(graph_storage.get_node("UserC") if graph_storage.get_node("UserC") else "UserC not found (correctly rolled back)")

    print("n--- Scenario 3: Complex Rollback (Deleting a node with relationships) ---")
    # First, setup some data
    tx3_setup = tx_manager.begin()
    tx_manager.add_node(tx3_setup, "GroupX", {"name": "Developers"})
    tx_manager.add_relationship(tx3_setup, "UserA", "GroupX", "MEMBER_OF")
    tx_manager.add_node(tx3_setup, "UserD", {"name": "David"})
    tx_manager.add_relationship(tx3_setup, "UserD", "GroupX", "MEMBER_OF")
    tx_manager.commit(tx3_setup)

    print("n--- Graph State Before Complex TX ---")
    print(graph_storage.get_node("UserA"))
    print(graph_storage.get_node("GroupX"))
    print(graph_storage.get_node("UserD"))

    tx3_complex = tx_manager.begin()
    try:
        print(f"Attempting to delete UserA in TX {tx3_complex}")
        tx_manager.delete_node(tx3_complex, "UserA") # This should also remove UserA's relationships

        # Simulate another operation that fails
        tx_manager.add_node(tx3_complex, "InvalidNode", {"bad": True}) # This will be rolled back
        raise ValueError("Simulated failure after deleting UserA.")

        tx_manager.commit(tx3_complex)
    except ValueError as e:
        print(f"Complex transaction {tx3_complex} failed: {e}. Rolling back.")
        tx_manager.rollback(tx3_complex)

    print("n--- Current Graph State after Complex TX Rollback (UserA should be back) ---")
    print(graph_storage.get_node("UserA") if graph_storage.get_node("UserA") else "UserA not found (ERROR: should be back)")
    print(graph_storage.get_node("UserA").relationships if graph_storage.get_node("UserA") else "UserA not found")
    print(graph_storage.get_node("GroupX") if graph_storage.get_node("GroupX") else "GroupX not found")
    # Verify relationships are restored
    if graph_storage.get_node("UserA"):
        print(f"UserA relationships: {graph_storage.get_node('UserA').relationships}")
    if graph_storage.get_node("GroupX"):
        print(f"GroupX relationships: {graph_storage.get_node('GroupX').relationships}")

    print("n--- Transaction Log Content ---")
    # transaction_log.recover() # This is a conceptual call, `_save_to_disk` is the actual persistence
    for entry in transaction_log.get_transaction_log():
        print(entry)

代码解析:

  1. GraphStorage: 模拟了图数据的内存存储和磁盘持久化。它包含添加、更新、删除节点和关系的方法。
  2. TransactionLog: 负责记录所有事务事件(BEGIN, DATA_CHANGE, COMMIT, ROLLBACK)。DATA_CHANGE 记录了操作类型、旧值(用于 Undo)和新值(用于 Redo)。
  3. TransactionManager: 这是核心组件。
    • begin(): 开启新事务,生成 tx_id,并在 active_transactions 中为该事务初始化一个空的变更列表。
    • _record_change(): 每次对 GraphStorage 进行操作前,都会调用此方法,将操作的“逆操作”信息(即 Undo 信息)记录到 active_transactions[tx_id] 列表中,同时也写入 TransactionLog
    • commit(): 将事务标记为提交,并触发 GraphStorage 将当前状态持久化到磁盘。
    • rollback(): 这是原子性的关键。它会逆序遍历 active_transactions[tx_id] 中记录的所有变更,并执行对应的逆操作,将图数据恢复到事务开始前的状态。例如,如果事务中 ADD_NODE 了一个节点,回滚时就会 DELETE_NODE 该节点。
    • recover(): 这是一个简化的恢复机制,旨在演示在系统崩溃后如何通过日志识别已提交或未提交的事务。在实际生产系统中,这会涉及读取 WAL,从最近的检查点开始重放 Redo Log,并依据 Undo Log 逆序撤销未提交事务的修改。

这个单节点模型为我们理解更复杂的分布式事务奠定了基础。但请注意,这里的回滚是在单个内存空间内完成的,且持久化机制相对简单。当“10个节点”分布在不同机器时,情况会变得复杂得多。
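
顺着 recover() 的思路,下面给出一个示意性的恢复草图:按日志对已提交事务重放、对未提交事务逆序撤销。其中 apply_redo / apply_undo 是假设由调用方提供的回调,且假设 DATA_CHANGE 记录中的 new_value 足以用于重放;真实系统还需要结合检查点、严格的日志顺序(LSN)与幂等重放。

def recover_from_log(log_entries, apply_redo, apply_undo):
    """示意:崩溃恢复 = 对已提交事务重放 (Redo),对未提交事务逆序撤销 (Undo)。

    log_entries: TransactionLog.get_transaction_log() 返回的记录列表
    apply_redo(entry) / apply_undo(entry): 假设由调用方提供的回调
    """
    committed, changes = set(), {}
    for entry in log_entries:
        if entry['type'] == 'COMMIT':
            committed.add(entry['tx_id'])
        elif entry['type'] == 'ROLLBACK':
            changes.pop(entry['tx_id'], None)          # 已显式回滚的事务无需再处理
        elif entry['type'] == 'DATA_CHANGE':
            changes.setdefault(entry['tx_id'], []).append(entry)

    # Redo:重放已提交事务的修改(示意:此处按事务分组,真实系统需严格按日志顺序)
    for tx_id, entries in changes.items():
        if tx_id in committed:
            for entry in entries:
                apply_redo(entry)

    # Undo:对未提交(且未显式回滚)的事务,逆序撤销其修改
    for tx_id, entries in changes.items():
        if tx_id not in committed:
            for entry in reversed(entries):
                apply_undo(entry)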


第三章:分布式事务:跨越多个节点的原子性挑战

当我们的“10个节点”不仅仅是逻辑上的独立实体,而是物理上分布在多台服务器上时,实现原子性就成了一个典型的分布式事务问题。此时,单机 WAL 的机制不再适用,我们需要更强大的协调机制。

3.1 分布式事务的复杂性

分布式事务面临的挑战包括:

  • 网络延迟与分区容忍性:不同节点间的通信不可靠,可能出现消息丢失、延迟,甚至网络分区。
  • 独立故障模式:一个节点可能失败,而其他节点正常运行。
  • 两阶段提交 (Two-Phase Commit, 2PC) 的局限性:虽然广泛使用,但存在性能瓶颈和单点故障问题。
  • 数据一致性:在并发和分布环境下,维护 ACID 属性变得极其困难。

3.2 两阶段提交 (Two-Phase Commit, 2PC)

2PC 是实现分布式事务原子性的经典协议。它确保所有参与者要么全部提交事务,要么全部回滚。

角色:

  • 协调者 (Coordinator):发起事务,并协调所有参与者。
  • 参与者 (Participants):执行事务的各个部分,并响应协调者的指令。

阶段:

  1. 准备阶段 (Prepare Phase)

    • 协调者向所有参与者发送 prepare 请求。
    • 每个参与者执行事务中属于自己的那部分操作,并将其记录在本地日志中(例如,写入 Undo/Redo Log),但暂不提交。
    • 参与者将操作结果持久化,并锁定资源。
    • 如果参与者能够成功完成事务,它会向协调者发送 vote_commit 消息;否则,发送 vote_abort
  2. 提交阶段 (Commit Phase)

    • 如果所有参与者都投 vote_commit:协调者向所有参与者发送 commit 请求。参与者提交事务,释放资源,并向协调者发送 ack (acknowledgement)。协调者收到所有 ack 后,事务完成。
    • 如果有任何一个参与者投 vote_abort,或协调者超时未收到所有回复:协调者向所有参与者发送 abort 请求。参与者回滚事务,释放资源,并向协调者发送 ack。协调者收到所有 ack 后,事务完成。

2PC 状态机:

协调者状态 | 参与者状态 | 描述
INIT | INIT | 事务开始
PREPARING | PREPARED | 协调者发送 prepare,参与者执行操作并记录、持久化日志、锁定资源,等待投票
COMMITTING | COMMITTED | 协调者收到所有 vote_commit,发送 commit,参与者提交并释放资源
ABORTING | ABORTED | 协调者收到任何 vote_abort 或超时,发送 abort,参与者回滚并释放资源

2PC 的优缺点:

  • 优点
    • 保证分布式事务的原子性。
    • 相对简单,易于理解和实现(相较于更复杂的分布式一致性算法)。
  • 缺点
    • 同步阻塞:参与者在准备阶段会锁定资源,直到收到协调者的提交或回滚指令,这可能导致长时间的阻塞,影响系统吞吐量。
    • 单点故障:协调者是单点。如果协调者在提交阶段前崩溃,参与者将永远处于 PREPARED 状态,资源无法释放(“悬挂事务”)。
    • 性能开销:多轮网络通信增加了延迟。
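
缓解协调者单点故障的一个常见做法是:协调者在进入第二阶段之前,先把全局决定(commit 或 abort)持久化到自己的日志中;协调者重启后即可据此继续向仍处于 PREPARED 状态的参与者下发指令,避免悬挂事务。下面是一个极简的示意(文件名与函数名均为假设):

import json
import os

DECISION_LOG = "coordinator_decisions.log"

def record_decision(tx_id, decision):
    """在发送任何第二阶段消息之前,先把全局决定强制落盘。"""
    with open(DECISION_LOG, "a") as f:
        f.write(json.dumps({"tx_id": tx_id, "decision": decision}) + "\n")
        f.flush()
        os.fsync(f.fileno())

def recover_decisions():
    """协调者重启后,读回全部已做出的决定,据此向参与者重发 commit/abort。"""
    decisions = {}
    if os.path.exists(DECISION_LOG):
        with open(DECISION_LOG) as f:
            for line in f:
                entry = json.loads(line)
                decisions[entry["tx_id"]] = entry["decision"]
    return decisions   # 例如 {3: "COMMIT", 4: "ABORT"}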

下面是一个简化的 2PC 模拟,演示如何协调多个“节点”(这里是 GraphParticipant 实例)执行一个原子性操作。

import time
import random
import threading
from collections import defaultdict

# Re-using GraphStorage, TransactionLog, TransactionManager from previous example
# For 2PC, each participant will have its own GraphStorage and TransactionManager
# We'll simplify the actual graph operations within participants for focus on 2PC logic.

# A simple mock for a participant's local graph storage and transaction manager
class MockGraphParticipant:
    def __init__(self, participant_id):
        self.participant_id = participant_id
        # In a real system, each participant would have its own GraphStorage and TransactionManager
        # For this simulation, we'll just track a "state" for simplicity.
        self.state = {} # Simulating nodes/properties managed by this participant
        self.pending_changes = {} # tx_id -> list of changes
        self.locked_resources = {} # tx_id -> list of locked resources
        self.last_tx_decision = {} # tx_id -> 'COMMIT' or 'ABORT'
        print(f"Participant {self.participant_id} initialized.")

    def prepare(self, tx_id, operations):
        """
        Simulates the prepare phase for a transaction.
        operations: A list of dicts, e.g., [{'type': 'ADD_NODE', 'node_id': 'User1'}]
        """
        print(f"Participant {self.participant_id}: Received prepare for TX {tx_id}")
        self.pending_changes[tx_id] = operations

        # Simulate local validation and resource locking
        can_commit = random.choice([True, True, True, False]) # Simulate occasional failure
        if can_commit:
            # Simulate locking resources (e.g., specific nodes/relationships)
            self.locked_resources[tx_id] = [op.get('node_id') for op in operations if 'node_id' in op]
            print(f"Participant {self.participant_id}: TX {tx_id} PREPARED. Locked: {self.locked_resources[tx_id]}")
            return True # Vote commit
        else:
            print(f"Participant {self.participant_id}: TX {tx_id} ABORTING (simulated failure during prepare).")
            # Clear pending changes if unable to prepare
            if tx_id in self.pending_changes:
                del self.pending_changes[tx_id]
            return False # Vote abort

    def commit(self, tx_id):
        """Simulates the commit phase."""
        print(f"Participant {self.participant_id}: Received commit for TX {tx_id}")
        if tx_id not in self.pending_changes:
            # This can happen if prepare failed, or already committed/aborted
            print(f"Participant {self.participant_id}: TX {tx_id} not in pending state. Already handled or prepare failed.")
            return True

        # Apply changes to local state
        for op in self.pending_changes[tx_id]:
            if op['type'] == 'ADD_NODE':
                self.state[op['node_id']] = op.get('properties', {})
                print(f"Participant {self.participant_id}: Added node {op['node_id']}")
            elif op['type'] == 'UPDATE_NODE':
                node_id = op['node_id']
                if node_id not in self.state:
                    self.state[node_id] = {} # Initialize if not exists (should be added by another op or pre-exist)
                self.state[node_id].update(op['properties'])
                print(f"Participant {self.participant_id}: Updated node {op['node_id']}")
            # Add other operations (delete, add_rel, etc.) here

        # Release resources and clear pending changes
        if tx_id in self.locked_resources:
            del self.locked_resources[tx_id]
        del self.pending_changes[tx_id]
        self.last_tx_decision[tx_id] = 'COMMIT'
        print(f"Participant {self.participant_id}: TX {tx_id} COMMITTED. State: {self.state}")
        return True

    def abort(self, tx_id):
        """Simulates the abort/rollback phase."""
        print(f"Participant {self.participant_id}: Received abort for TX {tx_id}")
        # Discard pending changes (no need to undo if not committed)
        if tx_id in self.pending_changes:
            del self.pending_changes[tx_id]
        # Release resources
        if tx_id in self.locked_resources:
            del self.locked_resources[tx_id]
        self.last_tx_decision[tx_id] = 'ABORT'
        print(f"Participant {self.participant_id}: TX {tx_id} ABORTED. State: {self.state}")
        return True

    def get_state(self):
        return self.state

# 2PC 协调者
class TwoPhaseCommitCoordinator:
    def __init__(self, participants):
        self.participants = participants # List of MockGraphParticipant instances
        self.transactions = {} # tx_id -> {'status': 'INIT', 'participants': [], 'operations': {}}
        self.next_tx_id = 1

    def begin_transaction(self, tx_operations_map):
        """
        Starts a new 2PC transaction.
        tx_operations_map: dict where key is participant_id, value is list of operations for that participant.
        e.g., {'P1': [{'type': 'ADD_NODE', 'node_id': 'U1'}], 'P2': [{'type': 'UPDATE_NODE', 'node_id': 'P1', 'properties': {'views': 10}}]}
        """
        tx_id = self.next_tx_id
        self.next_tx_id += 1

        participant_ids = list(tx_operations_map.keys())
        self.transactions[tx_id] = {
            'status': 'INIT',
            'participants': participant_ids,
            'operations': tx_operations_map
        }
        print(f"nCoordinator: Starting TX {tx_id} with participants {participant_ids}")
        return tx_id

    def execute_transaction(self, tx_id):
        if tx_id not in self.transactions:
            print(f"Coordinator: TX {tx_id} not found.")
            return False

        tx_info = self.transactions[tx_id]
        participants_to_involve = [p for p in self.participants if p.participant_id in tx_info['participants']]

        # Phase 1: Prepare
        tx_info['status'] = 'PREPARING'
        print(f"Coordinator: TX {tx_id} - Sending prepare requests.")

        votes = []
        for p in participants_to_involve:
            ops_for_participant = tx_info['operations'].get(p.participant_id, [])
            try:
                # Simulate network delay and potential participant crash
                time.sleep(random.uniform(0.01, 0.1))
                if random.random() < 0.05: # 5% chance a participant crashes during prepare
                    print(f"Coordinator: Participant {p.participant_id} CRASHED during prepare for TX {tx_id}!")
                    votes.append(False)
                    continue

                vote = p.prepare(tx_id, ops_for_participant)
                votes.append(vote)
            except Exception as e:
                print(f"Coordinator: Error during prepare for participant {p.participant_id}: {e}")
                votes.append(False) # Treat any error as a vote_abort

        all_voted_commit = all(votes)

        # Phase 2: Commit or Abort
        if all_voted_commit:
            tx_info['status'] = 'COMMITTING'
            print(f"Coordinator: TX {tx_id} - All participants voted COMMIT. Sending commit requests.")
            for p in participants_to_involve:
                try:
                    time.sleep(random.uniform(0.01, 0.05))
                    if random.random() < 0.02: # 2% chance a participant crashes during commit
                        print(f"Coordinator: Participant {p.participant_id} CRASHED during commit for TX {tx_id}!")
                        # In a real system, this would require recovery logic (e.g., retries, durable coordinator state)
                        continue 
                    p.commit(tx_id)
                except Exception as e:
                    print(f"Coordinator: Error during commit for participant {p.participant_id}: {e}")
            tx_info['status'] = 'COMMITTED'
            print(f"Coordinator: TX {tx_id} COMMITTED.")
            return True
        else:
            tx_info['status'] = 'ABORTING'
            print(f"Coordinator: TX {tx_id} - Some participants voted ABORT. Sending abort requests.")
            for p in participants_to_involve:
                try:
                    time.sleep(random.uniform(0.01, 0.05))
                    p.abort(tx_id)
                except Exception as e:
                    print(f"Coordinator: Error during abort for participant {p.participant_id}: {e}")
            tx_info['status'] = 'ABORTED'
            print(f"Coordinator: TX {tx_id} ABORTED.")
            return False

# --- 2PC Usage Example ---
if __name__ == "__main__":
    # Setup participants (each representing a logical 'node' or partition of the graph)
    participant_p1 = MockGraphParticipant("P1")
    participant_p2 = MockGraphParticipant("P2")
    participant_p3 = MockGraphParticipant("P3")

    coordinator = TwoPhaseCommitCoordinator([participant_p1, participant_p2, participant_p3])

    print("n--- 2PC Scenario 1: Successful Transaction Across 3 Nodes ---")
    tx1_ops = {
        "P1": [{'type': 'ADD_NODE', 'node_id': 'UserA', 'properties': {'name': 'Alice', 'location': 'Node1'}},
               {'type': 'UPDATE_NODE', 'node_id': 'ServiceX', 'properties': {'status': 'active'}}],
        "P2": [{'type': 'ADD_NODE', 'node_id': 'Post1', 'properties': {'title': 'Graph Query', 'author': 'UserA'}},
               {'type': 'UPDATE_NODE', 'node_id': 'SystemMetrics', 'properties': {'cpu_load': 0.7}}],
        "P3": [{'type': 'ADD_NODE', 'node_id': 'LogEntry1', 'properties': {'message': 'UserA created', 'timestamp': time.time()}}]
    }
    tx_id1 = coordinator.begin_transaction(tx1_ops)
    coordinator.execute_transaction(tx_id1)

    print("n--- States after TX1 ---")
    print(f"P1 State: {participant_p1.get_state()}")
    print(f"P2 State: {participant_p2.get_state()}")
    print(f"P3 State: {participant_p3.get_state()}")

    print("n--- 2PC Scenario 2: Transaction with Rollback (simulated failure in P2) ---")
    tx2_ops = {
        "P1": [{'type': 'ADD_NODE', 'node_id': 'UserB', 'properties': {'name': 'Bob', 'location': 'Node1'}}],
        "P2": [{'type': 'ADD_NODE', 'node_id': 'Post2', 'properties': {'title': 'New Feature', 'author': 'UserB'}}],
        "P3": [{'type': 'UPDATE_NODE', 'node_id': 'LogEntry1', 'properties': {'status': 'processed'}}]
    }
    # Force P2 to vote abort during prepare, so the rollback path (and the asserts below) is deterministic
    original_prepare_p2 = participant_p2.prepare
    participant_p2.prepare = lambda tx_id, ops: False
    tx_id2 = coordinator.begin_transaction(tx2_ops)
    coordinator.execute_transaction(tx_id2) # P2 votes abort during prepare, so the whole TX aborts
    participant_p2.prepare = original_prepare_p2

    print("n--- States after TX2 (UserB and Post2 should NOT be committed) ---")
    print(f"P1 State: {participant_p1.get_state()}")
    print(f"P2 State: {participant_p2.get_state()}")
    print(f"P3 State: {participant_p3.get_state()}")

    # Verify rollback
    assert 'UserB' not in participant_p1.get_state()
    assert 'Post2' not in participant_p2.get_state()
    assert participant_p3.get_state().get('LogEntry1', {}).get('status') != 'processed'

    print("n--- 2PC Scenario 3: Another Successful Transaction (ensure previous rollback didn't interfere) ---")
    tx3_ops = {
        "P1": [{'type': 'ADD_NODE', 'node_id': 'UserC', 'properties': {'name': 'Charlie', 'location': 'Node1'}}],
        "P3": [{'type': 'ADD_NODE', 'node_id': 'Notification1', 'properties': {'type': 'welcome', 'to': 'UserC'}}]
    }
    tx_id3 = coordinator.begin_transaction(tx3_ops)
    coordinator.execute_transaction(tx_id3)

    print("n--- States after TX3 ---")
    print(f"P1 State: {participant_p1.get_state()}")
    print(f"P2 State: {participant_p2.get_state()}")
    print(f"P3 State: {participant_p3.get_state()}")

代码解析:

  1. MockGraphParticipant: 模拟了图数据库的一个分区或实例。每个参与者有自己的状态 (self.state) 和记录待处理变更 (self.pending_changes) 的能力。
    • prepare(): 模拟了验证操作、记录待处理变更、锁定资源,并随机决定是否投票提交。
    • commit(): 将 pending_changes 应用到 self.state 并释放资源。
    • abort(): 简单地丢弃 pending_changes 并释放资源。
  2. TwoPhaseCommitCoordinator: 负责协调事务。
    • begin_transaction(): 初始化事务信息,包括所有参与者及其各自的操作。
    • execute_transaction():
      • 准备阶段:向所有参与者发送 prepare 请求。如果任何参与者返回 False(表示无法提交),或者模拟的崩溃发生,协调者就知道需要回滚。
      • 提交/回滚阶段:根据所有参与者的投票,向他们发送 commitabort 指令。

这个模拟清楚地展示了 2PC 如何通过两阶段的协调,确保一个分布式操作要么在所有参与者上都成功,要么都在所有参与者上回滚,从而实现原子性。然而,其同步阻塞和协调者单点故障的缺点,促使人们寻求其他解决方案。


第四章:应对 2PC 局限性的模式:Saga 与 OCC

为了克服 2PC 的一些缺点,特别是在长事务、高可用性和高性能要求场景下,出现了其他的事务模式。

4.1 Saga 模式

Saga 模式是一种处理长事务的模式,它将一个分布式事务分解为一系列本地事务。每个本地事务都有一个对应的补偿事务 (Compensating Transaction),用于撤销前一个本地事务的影响。如果任何本地事务失败,则会按逆序执行已经成功的所有本地事务的补偿事务。

Saga 的类型:

  • 编排 (Orchestration):有一个中央协调器(Saga Orchestrator)负责管理 Saga 的流程,决定执行哪个本地事务,以及在失败时触发哪个补偿事务。
  • 协同 (Choreography):没有中央协调器。每个本地事务完成时会发布一个事件,触发下一个本地事务的执行。如果一个本地事务失败,它会发布一个事件来触发补偿事务链。
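
本章稍后的完整示例采用的是编排式;这里先用一小段示意代码说明协同式的事件驱动流程——每个本地事务完成后发布事件驱动下一步,失败事件则触发补偿(EventBus 与事件名均为演示用的假设):

from collections import defaultdict

class EventBus:
    """极简的进程内事件总线(仅为示意)。"""
    def __init__(self):
        self.handlers = defaultdict(list)

    def subscribe(self, event, handler):
        self.handlers[event].append(handler)

    def publish(self, event, payload):
        print(f"[event] {event}: {payload}")
        for handler in self.handlers[event]:
            handler(payload)

bus = EventBus()
users, profiles = {}, {}

# 正向流程:UserCreated 事件触发下一步(创建档案)
def create_user(payload):
    users[payload["user_id"]] = {"name": payload["username"]}
    bus.publish("UserCreated", payload)

def create_profile(payload):
    if payload.get("fail_profile"):                  # 模拟本地事务失败
        bus.publish("ProfileCreationFailed", payload)
        return
    profiles[payload["user_id"]] = {"bio": "new user"}
    bus.publish("ProfileCreated", payload)

# 补偿流程:失败事件触发前序步骤的补偿事务
def compensate_user(payload):
    users.pop(payload["user_id"], None)
    print(f"[compensate] user {payload['user_id']} removed")

bus.subscribe("UserCreated", create_profile)
bus.subscribe("ProfileCreationFailed", compensate_user)

create_user({"user_id": "U9", "username": "Eve", "fail_profile": True})
print(users, profiles)   # U9 已被补偿删除,profiles 为空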

Saga 的优缺点:

  • 优点
    • 非阻塞:本地事务可以独立提交,不会长时间锁定资源。
    • 高可用:没有单点协调者,部分节点失败不会导致整个系统停滞。
    • 性能好:减少了同步通信和等待时间。
  • 缺点
    • 最终一致性:在 Saga 运行过程中,系统可能处于中间不一致状态,直到所有本地事务完成或补偿事务全部执行完毕。这与 2PC 提供的强一致性不同。
    • 复杂性高:设计补偿事务、管理 Saga 状态和错误恢复逻辑比 2PC 复杂得多。
    • 隔离性弱:由于本地事务会提交,其他事务可能会看到中间状态。

Saga 与图操作:
在图数据库中,Saga 模式可以用于处理非常复杂的、跨多个微服务或子图的长时间运行操作。例如,用户注册并创建个人资料、发布第一篇文章、加入多个群组等一系列相互关联的图操作。

下面是一个使用编排式 Saga 模式的简化示例,模拟一个用户在分布式图系统中的“账户创建与初始化”流程。

import time
import random
from enum import Enum
from collections import defaultdict

# 定义事务步骤及其补偿步骤
class SagaStep:
    def __init__(self, name, execute_func, compensate_func):
        self.name = name
        self.execute = execute_func
        self.compensate = compensate_func

    def __repr__(self):
        return f"SagaStep({self.name})"

class SagaStatus(Enum):
    PENDING = 1
    COMPLETED = 2
    FAILED = 3
    COMPENSATING = 4
    COMPENSATED = 5

class SagaOrchestrator:
    def __init__(self, saga_name, steps):
        self.saga_name = saga_name
        self.steps = steps # List of SagaStep objects
        self.current_saga_id = 1
        self.sagas = {} # saga_id -> {'status': SagaStatus, 'executed_steps': [], 'failure_reason': None}

    def start_saga(self, context):
        saga_id = self.current_saga_id
        self.current_saga_id += 1
        self.sagas[saga_id] = {
            'status': SagaStatus.PENDING,
            'executed_steps': [],
            'failure_reason': None,
            'context': context # Data passed between steps
        }
        print(f"nOrchestrator: Starting Saga '{self.saga_name}' with ID {saga_id} for context: {context}")

        for i, step in enumerate(self.steps):
            try:
                print(f"  Saga {saga_id}: Executing step {i+1}/{len(self.steps)} - {step.name}")
                # Simulate a local transaction within the step
                success = step.execute(saga_id, self.sagas[saga_id]['context'])
                if not success:
                    raise Exception(f"Step '{step.name}' failed during execution.")

                self.sagas[saga_id]['executed_steps'].append(step)
                print(f"  Saga {saga_id}: Step '{step.name}' completed.")
                time.sleep(0.05) # Simulate work

            except Exception as e:
                self.sagas[saga_id]['status'] = SagaStatus.FAILED
                self.sagas[saga_id]['failure_reason'] = str(e)
                print(f"  Saga {saga_id}: Step '{step.name}' FAILED: {e}. Initiating compensation.")
                self._compensate(saga_id)
                return False # Saga failed

        self.sagas[saga_id]['status'] = SagaStatus.COMPLETED
        print(f"Orchestrator: Saga {saga_id} '{self.saga_name}' COMPLETED successfully.")
        return True

    def _compensate(self, saga_id):
        self.sagas[saga_id]['status'] = SagaStatus.COMPENSATING
        print(f"Orchestrator: Saga {saga_id} - Starting compensation for failed saga.")

        # Compensate in reverse order of execution
        for step in reversed(self.sagas[saga_id]['executed_steps']):
            try:
                print(f"  Saga {saga_id}: Compensating step '{step.name}'")
                step.compensate(saga_id, self.sagas[saga_id]['context'])
                print(f"  Saga {saga_id}: Compensation for '{step.name}' completed.")
                time.sleep(0.05)
            except Exception as e:
                print(f"  Saga {saga_id}: CRITICAL ERROR during compensation for '{step.name}': {e}")
                # In a real system, this would require human intervention or a more robust retry mechanism

        self.sagas[saga_id]['status'] = SagaStatus.COMPENSATED
        print(f"Orchestrator: Saga {saga_id} COMPENSATED.")

# --- Mock Services for Graph Operations ---
class UserService:
    def __init__(self):
        self.users = {} # user_id -> {'name': str, 'email': str}

    def create_user_node(self, saga_id, context):
        user_id = context['user_id']
        if random.random() < 0.1: # Simulate 10% chance of failure
            print(f"  UserService: FAILED to create user {user_id}")
            return False
        self.users[user_id] = {'name': context['username'], 'email': f"{context['username']}@example.com"}
        print(f"  UserService: User {user_id} created: {self.users[user_id]}")
        return True

    def delete_user_node(self, saga_id, context):
        user_id = context['user_id']
        if user_id in self.users:
            del self.users[user_id]
            print(f"  UserService: User {user_id} deleted (compensation).")
        return True

class ProfileService:
    def __init__(self):
        self.profiles = {} # user_id -> {'bio': str, 'avatar_url': str}

    def create_profile_node(self, saga_id, context):
        user_id = context['user_id']
        if random.random() < 0.1: # Simulate 10% chance of failure
            print(f"  ProfileService: FAILED to create profile for {user_id}")
            return False
        self.profiles[user_id] = {'bio': 'New user bio', 'avatar_url': 'default.png'}
        print(f"  ProfileService: Profile for {user_id} created.")
        return True

    def delete_profile_node(self, saga_id, context):
        user_id = context['user_id']
        if user_id in self.profiles:
            del self.profiles[user_id]
            print(f"  ProfileService: Profile for {user_id} deleted (compensation).")
        return True

class RelationshipService:
    def __init__(self):
        self.relationships = defaultdict(list) # user_id -> list of rels

    def add_default_relationships(self, saga_id, context):
        user_id = context['user_id']
        if random.random() < 0.1: # Simulate 10% chance of failure
            print(f"  RelationshipService: FAILED to add default relationships for {user_id}")
            return False
        self.relationships[user_id].append('FOLLOWS:AdminUser')
        self.relationships[user_id].append('MEMBER_OF:WelcomeGroup')
        print(f"  RelationshipService: Default relationships added for {user_id}.")
        return True

    def remove_default_relationships(self, saga_id, context):
        user_id = context['user_id']
        if user_id in self.relationships:
            del self.relationships[user_id]
            print(f"  RelationshipService: Default relationships removed for {user_id} (compensation).")
        return True

# --- Saga Usage Example ---
if __name__ == "__main__":
    user_service = UserService()
    profile_service = ProfileService()
    relationship_service = RelationshipService()

    # Define Saga steps for "User Onboarding"
    # Each step involves a local transaction on a specific service (representing a "node" or sub-graph partition)
    user_onboarding_steps = [
        SagaStep(
            name="CreateUserNode",
            execute_func=user_service.create_user_node,
            compensate_func=user_service.delete_user_node
        ),
        SagaStep(
            name="CreateProfileNode",
            execute_func=profile_service.create_profile_node,
            compensate_func=profile_service.delete_profile_node
        ),
        SagaStep(
            name="AddDefaultRelationships",
            execute_func=relationship_service.add_default_relationships,
            compensate_func=relationship_service.remove_default_relationships
        )
    ]

    onboarding_orchestrator = SagaOrchestrator("UserOnboardingSaga", user_onboarding_steps)

    print("--- Saga Scenario 1: Successful User Onboarding ---")
    user_context1 = {'user_id': 'U1', 'username': 'Alice'}
    onboarding_orchestrator.start_saga(user_context1)

    print("n--- Service States after Saga 1 ---")
    print(f"UserService: {user_service.users}")
    print(f"ProfileService: {profile_service.profiles}")
    print(f"RelationshipService: {relationship_service.relationships}")

    print("n--- Saga Scenario 2: User Onboarding Fails at CreateProfileNode, then Compensates ---")
    # To force failure, replace the step's execute function directly.
    # (Patching profile_service.create_profile_node would have no effect here,
    #  because SagaStep captured the bound method when the steps were defined.)
    original_create_profile = user_onboarding_steps[1].execute
    user_onboarding_steps[1].execute = lambda s_id, ctx: False # Force failure

    user_context2 = {'user_id': 'U2', 'username': 'Bob'}
    onboarding_orchestrator.start_saga(user_context2)

    # Restore original step function
    user_onboarding_steps[1].execute = original_create_profile

    print("n--- Service States after Saga 2 (U2 should be fully compensated) ---")
    print(f"UserService: {user_service.users}")
    print(f"ProfileService: {profile_service.profiles}")
    print(f"RelationshipService: {relationship_service.relationships}")

    # Verify U2 was compensated
    assert 'U2' not in user_service.users
    assert 'U2' not in profile_service.profiles
    assert 'U2' not in relationship_service.relationships

    print("n--- Saga Scenario 3: User Onboarding Fails at AddDefaultRelationships, then Compensates ---")
    original_add_relationships = user_onboarding_steps[2].execute
    user_onboarding_steps[2].execute = lambda s_id, ctx: False # Force failure

    user_context3 = {'user_id': 'U3', 'username': 'Charlie'}
    onboarding_orchestrator.start_saga(user_context3)

    user_onboarding_steps[2].execute = original_add_relationships

    print("n--- Service States after Saga 3 (U3 should be fully compensated) ---")
    print(f"UserService: {user_service.users}")
    print(f"ProfileService: {profile_service.profiles}")
    print(f"RelationshipService: {relationship_service.relationships}")

    # Verify U3 was compensated
    assert 'U3' not in user_service.users
    assert 'U3' not in profile_service.profiles
    assert 'U3' not in relationship_service.relationships

代码解析:

  1. SagaStep: 定义了 Saga 中的一个原子操作,包括其执行逻辑 (execute_func) 和补偿逻辑 (compensate_func)。
  2. SagaOrchestrator: 这是 Saga 模式的核心,负责协调整个 Saga 的执行。
    • start_saga(): 顺序执行每个 SagaStepexecute 方法。
    • _compensate(): 如果任何一步 execute 失败,它会逆序遍历已成功执行的步骤,并调用它们的 compensate 方法,以撤销之前的操作。
  3. UserService, ProfileService, RelationshipService: 模拟了不同的微服务,每个服务负责管理图中的一部分数据(例如,用户节点、用户档案节点、用户关系)。它们各自实现了本地的创建和删除(补偿)操作。

Saga 模式的“一键回滚”是通过一系列精心设计的补偿操作来实现的。它不是瞬间的,而是一个过程,并且在回滚过程中,系统会短暂地处于中间状态,直到所有补偿操作完成。

4.2 乐观并发控制 (Optimistic Concurrency Control, OCC) 与版本控制

在某些图数据库中,尤其是在高并发读、低冲突的场景下,乐观并发控制 (OCC) 是一种有效的事务管理策略。它不使用传统的锁机制,而是假设事务之间不会发生冲突。

OCC 的基本原理:

  1. 读阶段 (Read Phase):事务读取数据并进行操作,不加任何锁。它会记录所读取数据的版本号或时间戳。
  2. 验证阶段 (Validation Phase):在事务提交前,系统检查事务读取和修改的数据是否已被其他并发事务修改。这通常通过比较版本号来实现。
  3. 写阶段 (Write Phase):如果验证成功,事务提交,其修改生效。如果验证失败(发现冲突),事务回滚并通常会重试。

OCC 的回滚:
在 OCC 中,回滚相对简单。由于事务在验证阶段之前并没有真正修改持久化数据(通常是在内存或临时存储中操作),所以回滚仅仅意味着丢弃这些未提交的修改。

版本控制 (Versioning) 在图数据库中与 OCC 紧密结合。每个节点和关系都可以有一个版本号或时间戳。

  • 当读取一个节点时,事务会记录其版本号。
  • 当尝试修改一个节点时,事务会检查该节点的当前版本号是否与读取时记录的版本号一致。
  • 如果不一致,说明在当前事务操作期间,该节点已被其他事务修改并提交,此时当前事务会失败并回滚。
  • 如果一致,事务将新版本号写入节点,并提交。

OCC 与图操作:
在一个复杂的图操作中,OCC 可以确保在修改多个节点时,这些节点在事务的整个生命周期内没有被其他事务干扰。

例如,在一个事务中,我们读取了 UserAPost1 的信息,并打算创建一个 LIKES 关系,并更新 Post1likes_count。在提交时,系统会检查 UserAPost1 的版本号是否与读取时一致。如果 Post1likes_count 在此期间被其他事务更新了,则当前事务会回滚。


import time
import random

class VersionedNode:
    def __init__(self, node_id, properties=None, version=0):
        self.node_id = node_id
        self.properties = properties if properties is not None else {}
        self.relationships = {} # {rel_type: {target_node_id: rel_properties}}
        self.version = version # Version counter

    def __repr__(self):
        return f"VersionedNode({self.node_id}, v={self.version}, props={self.properties})"

class VersionedGraphStorage:
    def __init__(self):
        self.nodes = {} # node_id -> VersionedNode object
        self.next_tx_id = 1

    def get_node(self, node_id):
        return self.nodes.get(node_id)

    def add_node(self, node_id, properties=None):
        if node_id in self.nodes:
            raise ValueError(f"Node {node_id} already exists.")
        node = VersionedNode(node_id, properties, version=0)
        self.nodes[node_id] = node
        return node

    def update_node_properties(self, node_id, new_properties):
        node = self.nodes.get(node_id)
        if node:
            node.properties.update(new_properties)
            node.version += 1 # Increment version on change
            return True
        return False

    def add_relationship(self, from_node_id, to_node_id, rel_type, properties=None):
        from_node = self.nodes.get(from_node_id)
        to_node = self.nodes.get(to_node_id)
        if from_node and to_node:
            if rel_type not in from_node.relationships:
                from_node.relationships[rel_type] = {}
            from_node.relationships[rel_type][to_node_id] = properties if properties is not None else {}
            from_node.version += 1 # Relationship changes also update source node's version
            return True
        return False

    # Simplified delete for this example, focusing on update/add versioning

# Optimistic Concurrency Control Transaction Manager
class OCCTransactionManager:
    def __init__(self, graph_storage):
        self.graph_storage = graph_storage
        self.active_transactions = {} # tx_id -> {'read_versions': {node_id: version}, 'writes': {node_id: new_node_state}}
        self.next_tx_id = 1

    def begin(self):
        tx_id = self.next_tx_id
        self.next_tx_id += 1
        self.active_transactions[tx_id] = {
            'read_versions': {}, # Node ID -> Version when read
            'writes': {},        # Node ID -> New properties/relationships to apply
            'new_nodes': {},     # Node ID -> New node object for nodes added in this TX
            'new_rels': []       # (from_id, to_id, rel_type, props) for new relationships
        }
        print(f"OCC Transaction {tx_id} started.")
        return tx_id

    def read_node(self, tx_id, node_id):
        if tx_id not in self.active_transactions:
            raise ValueError("Invalid transaction ID.")

        node = self.graph_storage.get_node(node_id)
        if node:
            # Record the version of the node when read
            self.active_transactions[tx_id]['read_versions'][node_id] = node.version
            # Return a copy to avoid direct modification of graph_storage during read phase
            return VersionedNode(node.node_id, dict(node.properties), node.version)
        return None

    def add_node(self, tx_id, node_id, properties=None):
        if tx_id not in self.active_transactions:
            raise ValueError("Invalid transaction ID.")

        if self.graph_storage.get_node(node_id) or node_id in self.active_transactions[tx_id]['new_nodes']:
            raise ValueError(f"Node {node_id} already exists or pending creation.")

        # Store new node temporarily
        new_node = VersionedNode(node_id, properties, version=0) # New nodes start at version 0 locally
        self.active_transactions[tx_id]['new_nodes'][node_id] = new_node
        print(f"  TX {tx_id}: Staged ADD_NODE {node_id}")
        return new_node

    def update_node_properties(self, tx_id, node_id, new_properties):
        if tx_id not in self.active_transactions:
            raise ValueError("Invalid transaction ID.")

        # Priority: check newly added nodes first
        target_node = self.active_transactions[tx_id]['new_nodes'].get(node_id)
        if target_node:
            target_node.properties.update(new_properties)
            print(f"  TX {tx_id}: Staged UPDATE_NODE_PROP for new node {node_id}")
            return True

        # If existing node, record current state to apply later
        node_in_storage = self.graph_storage.get_node(node_id)
        if not node_in_storage:
            raise ValueError(f"Node {node_id} does not exist.")

        # Ensure node was read, or record its current version for validation
        if node_id not in self.active_transactions[tx_id]['read_versions']:
             self.active_transactions[tx_id]['read_versions'][node_id] = node_in_storage.version

        # Store the update for commit phase
        if node_id not in self.active_transactions[tx_id]['writes']:
            self.active_transactions[tx_id]['writes'][node_id] = dict(node_in_storage.properties)
        self.active_transactions[tx_id]['writes'][node_id].update(new_properties)
        print(f"  TX {tx_id}: Staged UPDATE_NODE_PROP for existing node {node_id}")
        return True

    def add_relationship(self, tx_id, from_node_id, to_node_id, rel_type, properties=None):
        if tx_id not in self.active_transactions:
            raise ValueError("Invalid transaction ID.")

        # Ensure involved nodes exist either in storage or are newly added in this TX
        from_node_exists = self.graph_storage.get_node(from_node_id) or from_node_id in self.active_transactions[tx_id]['new_nodes']
        to_node_exists = self.graph_storage.get_node(to_node_id) or to_node_id in self.active_transactions[tx_id]['new_nodes']

        if not (from_node_exists and to_node_exists):
            raise ValueError("One or both nodes for relationship do not exist or are not part of this transaction.")

        # Record versions of nodes involved in relationship for validation
        if self.graph_storage.get_node(from_node_id) and from_node_id not in self.active_transactions[tx_id]['read_versions']:
            self.active_transactions[tx_id]['read_versions'][from_node_id] = self.graph_storage.get_node(from_node_id).version
        if self.graph_storage.get_node(to_node_id) and to_node_id not in self.active_transactions[tx_id]['read_versions']:
            self.active_transactions[tx_id]['read_versions'][to_node_id] = self.graph_storage.get_node(to_node_id).version

        self.active_transactions[tx_id]['new_rels'].append((from_node_id, to_node_id, rel_type, properties))
        print(f"  TX {tx_id}: Staged ADD_REL {from_node_id}-{rel_type}->{to_node_id}")
        return True

    def commit(self, tx_id):
        if tx_id not in self.active_transactions:
            raise ValueError(f"Transaction {tx_id} is not active.")

        tx_data = self.active_transactions[tx_id]

        print(f"OCC Transaction {tx_id}: Starting validation and commit.")

        # Validation Phase
        for node_id, read_version in tx_data['read_versions'].items():
            current_node = self.graph_storage.get_node(node_id)
            if not current_node or current_node.version != read_version:
                print(f"  TX {tx_id} CONFLICT: Node {node_id} version mismatch (read {read_version}, current {current_node.version if current_node else 'deleted'}).")
                self.rollback(tx_id)
                raise RuntimeError(f"Conflict detected for node {node_id}. Transaction {tx_id} aborted.")

        # Write Phase (apply changes to actual storage)
        # 1. Add new nodes
        for node_id, node_obj in tx_data['new_nodes'].items():
            self.graph_storage.add_node(node_id, node_obj.properties)
            # Newly added nodes get their initial version (e.g., 0 or 1)
            self.graph_storage.get_node(node_id).version = 0 
            print(f"  TX {tx_id}: Committed ADD_NODE {node_id}")

        # 2. Update existing nodes
        for node_id, new_props in tx_data['writes'].items():
            self.graph_storage.update_node_properties(node_id, new_props) # This increments version
            print(f"  TX {tx_id}: Committed UPDATE_NODE_PROP for {
