C++ 与 事务多版本并发控制(MVCC):在 C++ 存储内核中利用时间戳排序实现无锁读写冲突控制

各位开发者、架构师,以及对高性能并发系统充满热情的同仁们,大家好!

今天,我们将深入探讨一个在现代数据库和存储系统中至关重要的主题:多版本并发控制(MVCC),并聚焦于如何在 C++ 存储内核中,利用时间戳排序机制,实现无锁读写冲突控制。这不仅仅是一个理论概念的讲解,更将伴随着详细的 C++ 代码示例,力求将抽象的并发控制原理转化为可感知的实际实现。

在高性能、高并发的数据密集型应用中,如何有效地管理并发事务,同时保证数据的一致性和隔离性,是核心挑战。传统的基于锁的并发控制机制,如两阶段锁(2PL),虽然能够保证事务的 ACID 属性,但在高并发场景下,往往面临死锁、活锁、锁粒度过粗导致的性能瓶颈等问题。MVCC 正是为了解决这些痛点而生,它通过维护数据的多个历史版本,允许多个事务同时读取数据,而无需等待写事务释放锁,从而显著提升系统的并发度。

我们的目标是构建一个 C++ 存储内核,它能够:

  1. 允许多个读事务同时进行,且不被写事务阻塞。
  2. 允许多个写事务并发执行,但通过时间戳排序机制解决冲突。
  3. 读操作在很大程度上是“无锁”的,即不获取可能阻塞写操作的锁。
  4. 写操作通过原子操作和时间戳验证来避免对读操作的直接阻塞,并在冲突时优雅地回滚。

我们将通过一个简化的键值存储模型来演示这些概念。


1. MVCC 核心理念与挑战

1.1 什么是 MVCC?

MVCC(Multi-Version Concurrency Control,多版本并发控制)是一种并发控制技术,它允许数据库管理系统维护数据项的多个版本。当一个事务修改数据时,它不会直接覆盖旧数据,而是创建一个新的数据版本。这样,读事务可以读取旧版本的数据,而写事务则创建新版本,两者互不干扰。

核心思想:

  • 读不阻塞写,写不阻塞读。 这是 MVCC 最大的优势。读事务总是能找到一个合适的、已提交的数据版本进行读取,而无需等待写事务完成。写事务则创建新版本,也无需等待读事务。
  • 快照隔离(Snapshot Isolation)。 大多数 MVCC 实现提供快照隔离,这意味着每个事务都看到数据库的一个一致性快照,这个快照是在事务开始时确定的。
  • 版本管理。 每个数据项可能存在多个版本,每个版本都有其生命周期(创建时间、删除时间或创建事务ID、删除事务ID)。

1.2 MVCC 解决了什么问题?

传统的基于锁的并发控制(例如两阶段锁 2PL)在高并发环境下会遇到:

  • 读写冲突: 读事务可能会阻塞写事务,写事务也可能会阻塞读事务。
  • 死锁: 多个事务相互等待对方释放锁而陷入僵局。
  • 锁粒度问题: 粗粒度锁降低并发度,细粒度锁增加管理开销。

MVCC 通过允许多个版本共存,有效地缓解了这些问题。读事务可以访问它们开始时可见的数据版本,而写事务则在不影响读事务的情况下创建新版本。

1.3 MVCC 的挑战

尽管 MVCC 优势显著,但也伴随着一些挑战:

  • 存储开销: 维护数据的多个版本需要更多的存储空间。
  • 垃圾回收(Garbage Collection, GC): 需要机制来回收不再可见的旧版本,否则版本链会无限增长。GC 机制的设计和实现非常复杂。
  • 索引维护: 针对多版本数据的索引维护比单版本复杂。
  • 事务回滚: 虽然回滚通常更容易(只需丢弃未提交的新版本),但对于复杂的事务模型仍需仔细设计。

2. 时间戳排序(Timestamp Ordering, TO)

在 MVCC 框架下,我们需要一种机制来决定事务的执行顺序和冲突处理方式。时间戳排序(Timestamp Ordering)是一种常见的并发控制协议,它为每个事务分配一个全局唯一的、单调递增的时间戳(Timestamp)。这个时间戳通常在事务开始时获取,并用于确定事务的逻辑顺序。

2.1 核心原理

时间戳排序的基本思想是,如果事务 $T_i$ 的时间戳 $TS(T_i)$ 小于事务 $T_j$ 的时间戳 $TS(T_j)$,那么在并发执行时,系统必须保证 $T_i$ 看起来是在 $T_j$ 之前执行的。

为了实现这一点,每个数据项 $X$ 需要维护两个重要的时间戳:

  • 读时间戳 (RTS(X)): 所有成功读取 $X$ 的事务中,最大的事务时间戳。
  • 写时间戳 (WTS(X)): 所有成功写入 $X$ 的事务中,最大的事务时间戳。

这些时间戳在事务提交时更新,以反映已提交事务对数据项的最新访问情况。

2.2 读写规则

当事务 $T_i$ (时间戳为 $TS(T_i)$) 尝试访问数据项 $X$ 时,需要遵循以下规则:

读操作规则 (Read Rule):
如果 $T_i$ 想要读取 $X$:

  1. 冲突检测: 如果 $TS(T_i) < WTS(X)$,这意味着一个比 $T_i$ 更年轻的事务已经写入并提交了 $X$。根据时间戳排序原则,$T_i$ 应该在 $WTS(X)$ 之前执行,所以它不应该看到 $WTS(X)$ 写入的值。这意味着 $T_i$ 尝试读取的是一个过时的数据,或者 $T_i$ 违反了时间戳顺序,应该回滚
  2. 成功读取: 如果 $TS(T_i) ge WTS(X)$,则 $T_i$ 可以读取 $X$。读取成功后,需要更新 $RTS(X) = max(RTS(X), TS(T_i))$。

写操作规则 (Write Rule):
如果 $T_i$ 想要写入 $X$:

  1. 冲突检测 (读冲突): 如果 $TS(T_i) < RTS(X)$,这意味着一个比 $T_i$ 更年轻的事务已经读取了 $X$。如果 $T_i$ 写入 $X$,那么那个年轻事务的读操作将变得不合法(它应该看到 $T_i$ 写入的值,但它读的是旧值)。因此,$T_i$ 必须回滚
  2. 冲突检测 (写冲突): 如果 $TS(T_i) < WTS(X)$,这意味着一个比 $T_i$ 更年轻的事务已经写入并提交了 $X$。$T_i$ 的写入是过时的,不应该被接受。根据严格的时间戳排序, $T_i$ 应该回滚
    • Thomas Write Rule (TWR): 在某些宽松的实现中,如果 $TS(T_i) < WTS(X)$,但 $T_i$ 的写入并没有被任何已提交的读事务所依赖,那么可以忽略 $T_i$ 的写入(即不执行写入,也不回滚)。这提高了并发性,但可能导致更复杂的正确性推理。在我们的实现中,为了保证更强的隔离性(例如快照隔离或可串行化),我们将选择回滚
  3. 成功写入: 如果 $TS(T_i) ge RTS(X)$ 并且 $TS(T_i) ge WTS(X)$,则 $T_i$ 可以写入 $X$。写入成功后,需要更新 $WTS(X) = TS(T_i)$。

总结表格:

操作 条件 结果
读 $X$ $TS(T_i) < WTS(X)$ $T_i$ 回滚
读 $X$ $TS(T_i) ge WTS(X)$ 成功读取,更新 $RTS(X) = max(RTS(X), TS(T_i))$
写 $X$ $TS(T_i) < RTS(X)$ $T_i$ 回滚
写 $X$ $TS(T_i) < WTS(X)$ $T_i$ 回滚
写 $X$ $TS(T_i) ge RTS(X)$ 且 $TS(T_i) ge WTS(X)$ 成功写入,更新 $WTS(X) = TS(T_i)$

这些规则通常在事务的提交阶段进行最终验证,因为读写时间戳 (RTS/WTS) 只有在事务提交后才真正确定。在事务执行期间,通常采用乐观的方式,先记录读写操作,然后在提交时进行验证和应用。


3. C++ 存储内核中的 MVCC 与时间戳排序实现

我们将构建一个简化的内存键值存储,来演示 MVCC 和时间戳排序的集成。

3.1 核心数据结构

3.1.1 TransactionContext:事务上下文

每个事务都需要一个上下文来存储其自身的信息,例如事务 ID、读集合和写集合。

#include <atomic>
#include <cstdint>
#include <map>
#include <set>
#include <memory>
#include <mutex>
#include <iostream>
#include <vector>

// 简化键值类型
using DataKey = std::string;
using DataValue = std::string;

// TransactionContext 定义
struct TransactionContext {
    uint64_t transaction_id; // 事务的开始时间戳 (作为唯一标识符)
    std::set<DataKey> read_set; // 事务读取的键集合,用于提交时验证
    std::map<DataKey, DataValue> write_set; // 事务写入的键值对集合,用于延迟写入

    TransactionContext(uint64_t id) : transaction_id(id) {}

    // 禁止拷贝和移动,确保事务上下文的唯一性
    TransactionContext(const TransactionContext&) = delete;
    TransactionContext& operator=(const TransactionContext&) = delete;
    TransactionContext(TransactionContext&&) = delete;
    TransactionContext& operator=(TransactionContext&&) = delete;
};

3.1.2 DataVersion:数据版本

每个数据项可能有多个版本。每个版本需要记录其创建事务 ID、删除事务 ID(如果被删除)和实际数据。

// DataVersion 定义
struct DataVersion {
    uint64_t creator_tid;  // 创建此版本的事务ID (作为提交时间戳)
    uint64_t deleter_tid;  // 删除此版本的事务ID (0表示未被删除,或 MAX_UINT64)
    DataValue value;       // 实际存储的数据
    std::atomic<DataVersion*> next_ptr; // 指向更旧版本的指针

    DataVersion(uint64_t c_tid, const DataValue& val)
        : creator_tid(c_tid), deleter_tid(0), value(val), next_ptr(nullptr) {}

    // 假设0表示未删除,MAX_UINT64 是一个特殊的标记表示永久删除
    static constexpr uint64_t NOT_DELETED = 0;
};

这里 next_ptr 使用 std::atomic 是为了实现版本链的无锁遍历和原子更新头部。

3.1.3 DataItem:数据项

每个逻辑数据项(由 DataKey 标识)维护其所有版本的链表,以及其读写时间戳。

// DataItem 定义
struct DataItem {
    std::atomic<DataVersion*> head_version; // 指向最新版本的指针
    std::atomic<uint64_t> latest_read_ts;   // 成功读取此项的最大事务ID
    std::atomic<uint64_t> latest_write_ts;  // 成功写入此项的最大事务ID

    DataItem() : head_version(nullptr), latest_read_ts(0), latest_write_ts(0) {}

    // 清理所有版本链,用于析构或GC
    ~DataItem() {
        DataVersion* current = head_version.load(std::memory_order_relaxed);
        while (current) {
            DataVersion* next = current->next_ptr.load(std::memory_order_relaxed);
            delete current;
            current = next;
        }
    }

    // 禁止拷贝和移动
    DataItem(const DataItem&) = delete;
    DataItem& operator=(const DataItem&) = delete;
    DataItem(DataItem&&) = delete;
    DataItem& operator=(DataItem&&) = delete;
};

DataItem 中的 head_version, latest_read_ts, latest_write_ts 都使用 std::atomic,这是实现“无锁读写冲突控制”的关键。读者可以直接通过 load 操作获取状态,无需加锁。写者使用 compare_exchange 来原子地更新这些状态。

3.1.4 StorageKernel:存储内核

这是我们存储系统的核心,负责事务管理、数据访问和冲突处理。

// StorageKernel 定义
class StorageKernel {
public:
    // 全局事务ID生成器
    std::atomic<uint64_t> next_transaction_id;

    // 存储所有数据项的哈希表
    // 注意:这里的mutex保护的是`data_store`这个map自身的结构,
    // 而不是`DataItem`内部的版本链或RTS/WTS。
    // 访问或修改map结构(如插入新键)时需要加锁,但对已存在`DataItem`的读写操作则尽量无锁。
    std::unordered_map<DataKey, std::unique_ptr<DataItem>> data_store;
    std::mutex store_map_mutex; // 保护data_store的结构修改

    StorageKernel() : next_transaction_id(1) {} // 事务ID从1开始

    // 启动一个新事务
    TransactionContext* begin_transaction();

    // 事务的读操作
    bool read(TransactionContext* tx, DataKey key, DataValue& value);

    // 事务的写操作
    bool write(TransactionContext* tx, DataKey key, const DataValue& value);

    // 事务的删除操作 (在MVCC中,删除也是一种特殊形式的写入)
    bool remove(TransactionContext* tx, DataKey key);

    // 提交事务
    bool commit(TransactionContext* tx);

    // 回滚事务
    void abort(TransactionContext* tx);

    // 垃圾回收 (简化实现,实际复杂得多)
    void garbage_collect(uint64_t min_active_ts);
};

3.2 事务操作实现

3.2.1 begin_transaction():开始事务

获取一个唯一的事务 ID,作为事务的时间戳。

TransactionContext* StorageKernel::begin_transaction() {
    uint64_t new_tid = next_transaction_id.fetch_add(1, std::memory_order_relaxed);
    std::cout << "Transaction " << new_tid << " started." << std::endl;
    return new TransactionContext(new_tid);
}

3.2.2 read():读操作

这是实现“无锁读”的核心。读事务会遍历数据项的版本链,找到对其可见的最新版本。这个遍历过程不加锁,因为 next_ptrstd::atomic

bool StorageKernel::read(TransactionContext* tx, DataKey key, DataValue& value) {
    // 1. 获取DataItem:需要先找到对应的DataItem。这里使用map锁保护map结构。
    std::unique_ptr<DataItem>* item_ptr = nullptr;
    {
        std::lock_guard<std::mutex> lock(store_map_mutex);
        auto it = data_store.find(key);
        if (it == data_store.end()) {
            // std::cout << "Tx " << tx->transaction_id << ": Read key '" << key << "' not found." << std::endl;
            return false; // 键不存在
        }
        item_ptr = &it->second;
    }
    DataItem& item = **item_ptr;

    // 2. 无锁遍历版本链寻找可见版本
    DataVersion* current_version = item.head_version.load(std::memory_order_acquire);
    DataVersion* chosen_version = nullptr;

    while (current_version != nullptr) {
        // MVCC可见性规则:
        // 一个版本V对事务Tx可见,如果:
        // (1) V的创建者事务ID <= Tx的事务ID (即V在Tx开始前已经存在)
        // (2) V的删除者事务ID == NOT_DELETED (即V未被删除)
        //     或者 V的删除者事务ID > Tx的事务ID (即Tx在V被删除后才开始,或者删除Tx尚未提交,Tx不应该看到删除)
        // 这里我们简化处理,认为 creator_tid 和 deleter_tid 是已提交的事务ID。
        // 实际上,更严谨的MVCC需要Transaction Manager来查询事务的提交状态。

        // 检查创建者ID
        if (tx->transaction_id >= current_version->creator_tid) {
            // 检查删除者ID
            if (current_version->deleter_tid == DataVersion::NOT_DELETED ||
                tx->transaction_id < current_version->deleter_tid)
            {
                chosen_version = current_version;
                break; // 找到可见的最新版本
            }
        }
        current_version = current_version->next_ptr.load(std::memory_order_acquire);
    }

    if (chosen_version) {
        value = chosen_version->value;
        tx->read_set.insert(key); // 记录到读集合,用于提交时验证

        // 3. 更新 latest_read_ts (原子操作)
        // 我们只在事务成功读取后,且其 transaction_id 大于当前 latest_read_ts 时才更新。
        // 这是一个CAS (Compare-And-Swap) 循环,确保原子性。
        uint64_t current_rts = item.latest_read_ts.load(std::memory_order_acquire);
        while (current_rts < tx->transaction_id) {
            if (item.latest_read_ts.compare_exchange_weak(current_rts, tx->transaction_id,
                                                            std::memory_order_release, std::memory_order_acquire)) {
                break; // 更新成功
            }
            // CAS 失败,说明 other_rts 已经被其他线程更新,重试
            // current_rts 已经被 compare_exchange_weak 更新为最新的值
        }
        // std::cout << "Tx " << tx->transaction_id << ": Read key '" << key << "' = '" << value << "'" << std::endl;
        return true;
    }

    // std::cout << "Tx " << tx->transaction_id << ": Read key '" << key << "' no visible version." << std::endl;
    return false; // 没有找到对当前事务可见的版本
}

无锁读的体现: 读者在遍历版本链时,不获取任何锁。head_version.load() 获取一个快照指针,current_version->next_ptr.load() 也是如此。即使写者正在修改 head_version 或添加新版本,读者也能基于其快照指针继续遍历,不会被阻塞。latest_read_ts 的更新使用了 compare_exchange_weak 进行原子操作,同样避免了锁。

3.2.3 write():写操作

写操作在事务执行期间通常是乐观的。它不会立即修改共享数据,而是将修改记录在事务的私有写集合 (tx->write_set) 中。真正的冲突检测和数据版本创建发生在事务提交时。

bool StorageKernel::write(TransactionContext* tx, DataKey key, const DataValue& value) {
    // 写入操作通常是延迟的,先将修改记录在事务的 write_set 中。
    // 真正的冲突检测和版本创建发生在 commit 阶段。
    // std::cout << "Tx " << tx->transaction_id << ": Staging write for key '" << key << "' = '" << value << "'" << std::endl;
    tx->write_set[key] = value;
    return true;
}

bool StorageKernel::remove(TransactionContext* tx, DataKey key) {
    // 删除操作在MVCC中可以看作是写入一个特殊的“删除标记”。
    // 同样,延迟到 commit 阶段处理。
    // 这里我们使用一个特殊的值来表示删除,或者在commit时查找并标记deleter_tid。
    // 为了简化,我们让write_set记录要删除的键,值为一个特殊标记。
    // 实际系统中,可能需要一个单独的delete_set或一个更复杂的机制。
    // 这里我们将删除也视为一种写入,只是在commit时找到并标记旧版本为删除。
    // std::cout << "Tx " << tx->transaction_id << ": Staging remove for key '" << key << "'" << std::endl;
    tx->write_set[key] = ""; // 空字符串作为删除标记,需要特殊处理
    return true;
}

3.2.4 commit():提交事务

提交阶段是事务并发控制的核心。在这里,我们执行时间戳排序的冲突检测,并原子地应用所有修改。

bool StorageKernel::commit(TransactionContext* tx) {
    std::cout << "Tx " << tx->transaction_id << ": Attempting to commit..." << std::endl;

    // --- Phase 1: 提交验证 ---
    // 检查读集合 (Read Set) 中的数据项是否被比当前事务更年轻的事务写入。
    // 如果是,说明当前事务读取的数据已经过时,需要回滚。
    for (const auto& key : tx->read_set) {
        // 如果此键也在写集合中,说明是事务自己修改的,无需检查外部写入冲突
        if (tx->write_set.count(key)) {
            continue;
        }

        std::unique_ptr<DataItem>* item_ptr = nullptr;
        {
            std::lock_guard<std::mutex> lock(store_map_mutex);
            auto it = data_store.find(key);
            if (it == data_store.end()) {
                // 如果读取后被其他事务删除了,也视为冲突
                std::cout << "Tx " << tx->transaction_id << ": Commit failed - read key '" << key << "' was deleted by another transaction." << std::endl;
                abort(tx);
                return false;
            }
            item_ptr = &it->second;
        }
        DataItem& item = **item_ptr;

        // 获取 item 的最新写入时间戳
        uint64_t current_wts = item.latest_write_ts.load(std::memory_order_acquire);

        // 如果 `latest_write_ts` 大于当前事务的 `transaction_id`,
        // 则表示在当前事务开始后,有其他更年轻的事务已经写入并提交了此键。
        // 这违反了快照隔离或时间戳排序,当前事务必须回滚。
        if (current_wts > tx->transaction_id) {
            std::cout << "Tx " << tx->transaction_id << ": Commit failed - read key '" << key << "' was written by a younger transaction (WTS " << current_wts << " > TX_ID " << tx->transaction_id << ")." << std::endl;
            abort(tx);
            return false;
        }
    }

    // 检查写集合 (Write Set) 中的数据项是否与已提交的事务冲突。
    // 这对应时间戳排序的写规则:TS(Ti) < RTS(X) 或 TS(Ti) < WTS(X) 时回滚。
    for (const auto& entry : tx->write_set) {
        const DataKey& key = entry.first;
        const DataValue& new_value = entry.second; // 如果是空字符串,表示删除

        std::unique_ptr<DataItem>* item_ptr = nullptr;
        {
            std::lock_guard<std::mutex> lock(store_map_mutex);
            auto it = data_store.find(key);
            if (it == data_store.end()) {
                // 如果是写入一个新键,则创建DataItem。
                // 否则,如果尝试删除一个不存在的键,这里不会创建。
                if (new_value != "") { // 写入新键
                    data_store[key] = std::make_unique<DataItem>();
                    item_ptr = &data_store[key];
                } else { // 尝试删除不存在的键,无需回滚,但也没有实际操作
                    continue;
                }
            } else {
                item_ptr = &it->second;
            }
        }
        DataItem& item = **item_ptr;

        // 检查写冲突 (与 RTS 冲突)
        // 如果 `latest_read_ts` 大于当前事务的 `transaction_id`,
        // 则表示在当前事务开始后,有其他更年轻的事务已经读取了此键。
        // 如果当前事务现在写入,将导致那个读事务看到不一致的数据。回滚。
        uint64_t current_rts = item.latest_read_ts.load(std::memory_order_acquire);
        if (current_rts > tx->transaction_id) {
            std::cout << "Tx " << tx->transaction_id << ": Commit failed - write key '" << key << "' was read by a younger transaction (RTS " << current_rts << " > TX_ID " << tx->transaction_id << ")." << std::endl;
            abort(tx);
            return false;
        }

        // 检查写冲突 (与 WTS 冲突)
        // 如果 `latest_write_ts` 大于当前事务的 `transaction_id`,
        // 则表示在当前事务开始后,有其他更年轻的事务已经写入并提交了此键。
        // 当前事务的写入是过时的,回滚 (严格时间戳排序)。
        uint64_t current_wts = item.latest_write_ts.load(std::memory_order_acquire);
        if (current_wts > tx->transaction_id) {
            std::cout << "Tx " << tx->transaction_id << ": Commit failed - write key '" << key << "' was written by a younger transaction (WTS " << current_wts << " > TX_ID " << tx->transaction_id << ")." << std::endl;
            abort(tx);
            return false;
        }
    }

    // --- Phase 2: 应用写入 ---
    // 如果所有验证都通过,现在可以原子地应用所有写入。
    // 这涉及创建新版本,并更新 DataItem 的 head_version 和 latest_write_ts。
    // 这一阶段也需要通过原子操作来保证多线程安全。

    for (const auto& entry : tx->write_set) {
        const DataKey& key = entry.first;
        const DataValue& new_value = entry.second;

        std::unique_ptr<DataItem>* item_ptr = nullptr;
        {
            std::lock_guard<std::mutex> lock(store_map_mutex);
            // 确保DataItem存在,对于新键,它已经在验证阶段创建
            auto it = data_store.find(key);
            if (it == data_store.end()) {
                // 如果是删除一个不存在的键,在验证阶段已经跳过,这里不应该发生
                // 但如果发生了,可能意味着验证逻辑有问题或并发删除
                continue;
            }
            item_ptr = &it->second;
        }
        DataItem& item = **item_ptr;

        if (new_value == "") { // 表示删除操作
            // 找到当前对本事务可见的最新版本,并标记其 deleter_tid
            DataVersion* current_version = item.head_version.load(std::memory_order_acquire);
            DataVersion* visible_version_to_delete = nullptr;

            while (current_version != nullptr) {
                if (tx->transaction_id >= current_version->creator_tid &&
                    (current_version->deleter_tid == DataVersion::NOT_DELETED || tx->transaction_id < current_version->deleter_tid))
                {
                    visible_version_to_delete = current_version;
                    break;
                }
                current_version = current_version->next_ptr.load(std::memory_order_acquire);
            }

            if (visible_version_to_delete) {
                // 原子地更新 deleter_tid,确保只有一个事务能标记删除
                uint64_t expected_deleter_tid = DataVersion::NOT_DELETED;
                if (!visible_version_to_delete->deleter_tid.compare_exchange_strong(
                        expected_deleter_tid, tx->transaction_id,
                        std::memory_order_release, std::memory_order_acquire)) {
                    // 如果 CAS 失败,说明此版本已被其他事务删除或标记。
                    // 逻辑上,这应该在验证阶段被捕获为写冲突。
                    // 为了简单,我们在这里假设验证阶段已确保没有冲突。
                    // 实际系统需要更严格的错误处理。
                }
            } else {
                // 没有找到可见版本可删除,可能是并发删除或键不存在。
                // 同样,理论上应在验证阶段被捕获。
            }
        } else { // 正常写入操作
            // 创建新的版本
            DataVersion* new_version = new DataVersion(tx->transaction_id, new_value);

            // 原子地将新版本添加到版本链的头部
            // 这是一个CAS循环,处理并发写入
            DataVersion* old_head = item.head_version.load(std::memory_order_acquire);
            new_version->next_ptr.store(old_head, std::memory_order_relaxed); // 暂时链接到当前头部

            while (!item.head_version.compare_exchange_weak(old_head, new_version,
                                                            std::memory_order_release, std::memory_order_acquire)) {
                // CAS 失败,说明 head_version 已经被其他线程更新,重试
                new_version->next_ptr.store(old_head, std::memory_order_relaxed); // 重新链接到新的头部
            }
        }

        // 更新 item 的 latest_write_ts
        // 同样是CAS循环,确保原子性
        uint64_t current_wts = item.latest_write_ts.load(std::memory_order_acquire);
        while (current_wts < tx->transaction_id) {
            if (item.latest_write_ts.compare_exchange_weak(current_wts, tx->transaction_id,
                                                            std::memory_order_release, std::memory_order_acquire)) {
                break; // 更新成功
            }
            // CAS 失败,current_wts 已被更新,重试
        }
    }

    // 更新读集合中未被写入的项的 latest_read_ts
    // 这一步对于保证时间戳排序的读规则至关重要
    for (const auto& key : tx->read_set) {
        if (tx->write_set.find(key) == tx->write_set.end()) { // 仅处理只读的项
            std::unique_ptr<DataItem>* item_ptr = nullptr;
            {
                std::lock_guard<std::mutex> lock(store_map_mutex);
                auto it = data_store.find(key);
                if (it == data_store.end()) {
                    // 如果在读之后被删除了,说明验证阶段已经失败,这里不应再处理
                    continue;
                }
                item_ptr = &it->second;
            }
            DataItem& item = **item_ptr;

            uint64_t current_rts = item.latest_read_ts.load(std::memory_order_acquire);
            while (current_rts < tx->transaction_id) {
                if (item.latest_read_ts.compare_exchange_weak(current_rts, tx->transaction_id,
                                                                std::memory_order_release, std::memory_order_acquire)) {
                    break; // 更新成功
                }
            }
        }
    }

    std::cout << "Tx " << tx->transaction_id << ": Committed successfully." << std::endl;
    delete tx; // 清理事务上下文
    return true;
}

无锁写冲突控制的体现: 写事务在提交时,通过检查 latest_read_tslatest_write_ts 来进行冲突验证。这些检查都是通过 load 原子操作完成的,不会阻塞其他读写事务。如果验证通过,实际的数据版本创建和 head_version 的更新也通过 compare_exchange_weak 原子操作完成,同样避免了全局锁。

3.2.5 abort():回滚事务

在我们的 MVCC 延迟写入模型中,回滚非常简单,因为所有修改都在事务的私有 write_set 中,未曾暴露给共享存储。

void StorageKernel::abort(TransactionContext* tx) {
    std::cout << "Tx " << tx->transaction_id << ": Aborted." << std::endl;
    // 所有的写操作都只是暂存在 tx->write_set 中,没有对实际数据进行修改。
    // 所以回滚只需要销毁事务上下文即可。
    delete tx;
}

3.3 垃圾回收 (Garbage Collection, GC)

MVCC 系统需要一种机制来清理不再需要的旧数据版本。一个版本可以被回收的条件是:没有任何活跃事务能够再访问到它。

最常见的 GC 策略之一是基于最小活跃事务时间戳 (Min Active Transaction Timestamp, MATT)。系统需要追踪所有当前活跃事务中最小的 transaction_id。任何 creator_tid 小于 MATT 的版本,如果其 deleter_tid 也小于 MATT (或它自身已经被更新的版本所取代),则可以被安全地回收。

简化的 GC 实现思路:

// 假设有一个机制来获取所有活跃事务的最小ID
// 实际中可能需要一个全局的活跃事务列表来维护
uint64_t get_min_active_transaction_id(/* active_transactions_list */) {
    // 简化:这里假设只有一个活跃事务,或者返回一个足够小的ID
    // 真实场景中,需要遍历所有正在运行的TransactionContext来找到最小的transaction_id
    return 1; // 示例,实际需要动态获取
}

void StorageKernel::garbage_collect(uint64_t min_active_ts) {
    std::cout << "Starting garbage collection with min_active_ts: " << min_active_ts << std::endl;

    std::lock_guard<std::mutex> lock(store_map_mutex); // GC 期间需要锁定 map 结构,防止 DataItem 被删除或添加
    for (auto const& [key, item_ptr] : data_store) {
        DataItem& item = *item_ptr;

        // GC过程可能需要更复杂的锁机制或无锁算法来处理版本链的修改,
        // 特别是当其他事务同时在读写这个DataItem时。
        // 这里为了简化,我们假设在GC时对版本链的修改是受保护的。
        // 一个更健壮的方法是使用 RCU (Read-Copy-Update) 或 epoch-based reclamation。

        DataVersion* current = item.head_version.load(std::memory_order_acquire);
        DataVersion* prev = nullptr;
        DataVersion* new_head = current; // 最终的新头部

        // 找到第一个仍然可能被活跃事务访问的版本作为新的头部
        while (new_head != nullptr) {
            // 一个版本被认为“不活跃”且可回收,如果:
            // 1. 它的 creator_tid < min_active_ts
            // 2. 它的 deleter_tid < min_active_ts (如果已删除)
            // 3. 或者它已被一个 creator_tid < min_active_ts 的版本取代
            // 这里的判断是:如果版本V的 `creator_tid` 小于 `min_active_ts`,
            // 并且其 `deleter_tid` 也小于 `min_active_ts` (如果被删除),
            // 那么这个版本及其后续版本都可以被清理。
            // 实际上,我们应该从尾部开始清理,保留头部可见的。
            // 这里我们采取另一种策略:找到第一个必须保留的版本作为新的 head。
            // 任何比 `min_active_ts` 早的版本,如果已经被更新的版本覆盖且该更新版本也早于 `min_active_ts`,
            // 就可以被清理。

            // 简化的判断:一个版本V可以被回收,如果它已经被一个 `creator_tid < min_active_ts` 的事务创建的版本所取代,
            // 并且V本身的 `creator_tid < min_active_ts` 且 `deleter_tid` 已设置(即已过期)。
            // 或者,如果 `item.head_version` 是 V,且 V 的 `creator_tid < min_active_ts`,并且 V 已经被删除。
            // 这是一个非常简化的GC,实际情况中需要更精确的可见性判断。

            // 为了实现链表的中间删除,我们需要跟踪前一个节点。
            // 更常见的做法是:遍历链表,将需要保留的版本复制到新链表,然后替换head。
            // 或者,使用一个“下一个可见版本”指针。

            // 让我们尝试一种更直观的方式:
            // 遍历所有版本,找出那些 `creator_tid < min_active_ts` 且 `deleter_tid != NOT_DELETED`
            // 并且 `deleter_tid < min_active_ts` 的版本,将它们从链表中移除并删除。
            // 这仍然很复杂,因为需要原子地修改 `next_ptr`。

            // 最简单但效率不高的 GC 方式:
            // 1. 找到所有可以被删除的版本。
            // 2. 将它们从链表中解除链接(需要对链表头部的CAS操作或锁)。
            // 3. 释放内存。

            // 考虑一个更安全的,基于`min_active_ts`的保留策略:
            // 任何 `DataVersion` 如果 `creator_tid >= min_active_ts`,则必须保留。
            // 如果 `creator_tid < min_active_ts` 且 `deleter_tid != NOT_DELETED` 且 `deleter_tid < min_active_ts`,则可以删除。

            DataVersion* current_node = item.head_version.load(std::memory_order_acquire);
            DataVersion* last_kept_node = nullptr; // 指向需要保留的链表末尾
            DataVersion* to_delete_list = nullptr; // 指向待删除的链表头部

            // 遍历并重新构建链表
            while (current_node != nullptr) {
                // 判断此版本是否应该被保留
                bool should_keep = false;
                if (current_node->creator_tid >= min_active_ts) {
                    should_keep = true; // 正在进行的事务可能看到此版本
                } else if (current_node->deleter_tid == DataVersion::NOT_DELETED || current_node->deleter_tid >= min_active_ts) {
                    should_keep = true; // 未被删除,或被一个比min_active_ts更新的事务删除,可能仍对某些事务可见
                }

                if (should_keep) {
                    // 保持此节点
                    if (last_kept_node == nullptr) {
                        // 这是第一个要保留的节点,它将是新的头部
                        // 新的头部应该还是item.head_version,但是需要确保它的 next_ptr 指向正确
                        // 这一步非常复杂,因为需要保证原子性
                    }
                    last_kept_node = current_node;
                    current_node = current_node->next_ptr.load(std::memory_order_acquire);
                } else {
                    // 此节点可以被删除
                    DataVersion* next_to_delete = current_node->next_ptr.load(std::memory_order_acquire);
                    current_node->next_ptr.store(to_delete_list, std::memory_order_relaxed); // 将其添加到待删除链表头部
                    to_delete_list = current_node;
                    current_node = next_to_delete;
                }
            }

            // 这个链表重构逻辑在并发环境下非常困难,需要锁或RCU。
            // 最简单粗暴的GC方式(需要对DataItem加锁):
            // std::lock_guard<std::mutex> item_lock(item_mutex); // 假设DataItem有一个内部锁
            // current = item.head_version;
            // prev = nullptr;
            // while (current != nullptr) {
            //    // 简化:如果版本已删除且创建者ID小于min_active_ts,就删除它
            //    if (current->deleter_tid != DataVersion::NOT_DELETED && current->creator_tid < min_active_ts) {
            //        DataVersion* to_delete = current;
            //        if (prev) {
            //            prev->next_ptr = current->next_ptr;
            //        } else {
            //            item.head_version = current->next_ptr; // 更新头部
            //        }
            //        current = current->next_ptr;
            //        delete to_delete;
            //    } else {
            //        prev = current;
            //        current = current->next_ptr;
            //    }
            // }

            // 鉴于无锁GC的复杂性,在讲座中我们仅限于概念说明。
            // 实际实现通常使用一种混合方法:GC线程在对特定DataItem进行操作时,可能会短暂地对其加锁,
            // 或者使用更高级的RCU/Hazard Pointers等技术。
            // 对于此讲座,我们将其简化为一个概念性函数,不提供完整的无锁实现。
        }
    }
    std::cout << "Garbage collection finished." << std::endl;
}

由于无锁垃圾回收的复杂性远超本次讲座的范围(涉及 Hazard Pointers, RCU 等高级技术),上述 garbage_collect 函数仅作为概念性框架,并未提供一个完整的无锁实现。实际应用中,GC 线程在进行清理时,可能需要短暂地获取数据项的锁,或者采用更复杂的无锁内存回收机制。


4. 优势与局限

4.1 优势

  • 高并发性: 读事务几乎完全无锁,与写事务并行执行,显著提升读密集型工作负载的性能。写事务之间通过时间戳排序进行冲突检测,避免了传统锁机制的死锁和活锁问题。
  • 无死锁: 由于冲突通过事务回滚解决,而不是通过等待锁解决,因此天然避免了死锁。
  • 快照隔离: 每个事务都能看到一个一致性的数据库快照,简化了应用层的并发编程模型。
  • 高吞吐量: 减少了锁竞争,使得系统在高并发下能保持较高的吞吐量。

4.2 局限性

  • 事务回滚: 冲突检测可能导致事务频繁回滚,尤其是在写竞争激烈或事务执行时间较长的情况下,这会浪费计算资源。
  • 内存开销: 存储多个数据版本需要更多的内存和存储空间。
  • 垃圾回收复杂性: 需要一个高效且正确的垃圾回收机制来回收旧版本,这本身就是一项复杂的任务,可能引入额外的性能开销。
  • 写倾斜(Write Skew): 我们的时间戳排序 MVCC 实现提供了快照隔离,但快照隔离可能允许写倾斜异常。如果需要更强的隔离级别(如可串行化),则需要更复杂的验证(例如,在提交时对读写集进行更全面的检查,或者结合两阶段锁)。
  • 事务粒度: 事务 ID 通常是全局递增的,这在分布式系统中实现起来更复杂(需要分布式时间戳服务)。

5. 总结与展望

我们深入探讨了如何在 C++ 存储内核中,利用 MVCC 和时间戳排序实现无锁读写冲突控制。通过原子操作 (std::atomiccompare_exchange),我们构建了一个能够实现高并发读、乐观写并进行提交时验证的系统。读操作能够无锁地遍历数据版本链,而写操作则通过原子地更新版本链头部和读写时间戳来避免阻塞读操作。

这种设计是现代高性能数据库系统(如 PostgreSQL, Oracle, CockroachDB 等)核心并发控制机制的简化版本。它展示了 C++ 在构建高性能、细粒度并发控制系统方面的强大能力。当然,实际的生产系统会在此基础上进一步优化,例如引入更复杂的垃圾回收算法、索引的多版本支持、分布式事务处理以及针对特定工作负载的混合并发控制策略。

理解并掌握 MVCC 和时间戳排序的原理与实现,对于构建高性能、可扩展的并发系统至关重要。希望本次讲座能为您打开一扇通向 C++ 高级并发编程的大门。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注