实战:利用 Policy-based Design 构建一个可高度定制化的数据库存储引擎

各位同仁,各位技术爱好者,大家好!

在当今数据驱动的世界里,数据库扮演着核心角色。而数据库的心脏,无疑就是其存储引擎。一个高性能、高可靠、易于扩展的存储引擎是构建任何现代数据管理系统的基石。然而,数据库应用场景千变万化,从事务处理到分析查询,从内存数据库到分布式存储,每种场景对存储引擎的性能、并发、恢复、数据布局等方面都有独特的需求。传统的存储引擎往往采用相对固定的架构,虽然提供了某些配置选项,但在面对根本性的设计选择时,如更换页管理策略、调整索引类型、切换并发控制模型等,其定制能力往往捉襟见肘。

今天,我将向大家介绍一种强大的设计范式——Policy-based Design(策略模式设计),并深入探讨如何利用它来构建一个具备高度定制化能力的数据库存储引擎。我们的目标是,通过这种设计,让存储引擎的各个核心组件能够像乐高积木一样自由组合,从而为特定的应用场景提供最优的解决方案,同时又不牺牲性能或可维护性。

一、理解策略模式设计(Policy-based Design)

在深入存储引擎的实践之前,我们首先需要清晰地理解策略模式设计(PBD)的本质。

1.1 什么是策略模式设计?

策略模式设计是一种泛型编程(Generic Programming)范式,它通过将算法或行为的各个方面(策略)抽象为独立的类模板参数,并将这些策略在编译时绑定到宿主类(Host Class)上。简单来说,它将“做什么”(宿主类的功能)与“如何做”(策略类的具体实现)彻底解耦。

想象一下,你正在设计一个容器。这个容器需要管理内存、处理错误、以及支持不同的迭代方式。如果这些功能都被硬编码在容器类内部,那么每次需求变化,你都不得不修改容器类本身。策略模式设计的目标就是避免这种情况。

1.2 PBD 的核心构成

PBD 通常由以下两部分构成:

  • 宿主类 (Host Class):这是一个类模板,它定义了核心功能框架,但将具体的行为实现委托给一个或多个策略类。宿主类通过模板参数接收策略类型。
  • 策略类 (Policy Class):这些是普通的类或结构体,它们实现了宿主类所期望的特定行为。每个策略类都提供了一组接口(方法和/或类型定义),宿主类会通过这些接口来调用策略的功能。策略类通常是无状态的,或者只包含少量状态,并且它们之间的依赖性应该尽量小。

1.3 PBD 的运作机制

PBD 主要依赖于 C++ 的模板元编程(Template Metaprogramming)编译时多态(Compile-time Polymorphism)

当宿主类被实例化时,其模板参数被具体化为特定的策略类。编译器在编译阶段就确定了宿主类将使用哪个策略的实现。这意味着:

  • 零运行时开销:与运行时多态(如虚函数)不同,PBD 不涉及虚函数表查找,所有调用都是直接的。这对于性能敏感的系统(如数据库存储引擎)至关重要。
  • 编译时错误检查:如果策略类未能提供宿主类期望的接口,编译器会在编译时报错,而不是在运行时才发现问题。这有助于提高代码的健壮性。
  • 极高的灵活性和可定制性:通过简单地更换模板参数,就可以在编译时生成一个功能完全不同的系统实例。

1.4 PBD 与运行时多态的对比

特性 策略模式设计 (PBD) 运行时多态 (虚函数)
绑定时机 编译时 运行时
性能开销 零开销(直接调用),可能导致代码膨胀 少量开销(虚函数表查找),较小的代码膨胀
灵活性 编译时定制,高度灵活,但无法在程序运行中动态改变策略 运行时定制,可在程序运行中动态改变行为
错误检查 编译时发现接口不匹配问题 运行时可能出现空指针或类型转换错误
适用场景 核心算法、数据结构等需要极致性能和编译时确定的定制 UI 控件、插件系统等需要运行时动态行为、接口统一的场景

对于数据库存储引擎这种性能敏感、且核心组件一旦确定就不常在运行时切换的系统,PBD 的优势尤为明显。

二、数据库存储引擎的解剖:识别定制点

要应用 PBD,我们首先需要理解数据库存储引擎的核心组件,并识别出哪些功能是“策略”,即可以有多种实现方式的定制点。

2.1 存储引擎的高层架构

一个典型的数据库存储引擎通常包含以下核心组件:

  1. 数据文件管理 (File Manager):负责在磁盘上分配和管理原始文件空间,提供页面(Page)的读写接口。
  2. 页管理 (Page Manager):在数据文件之上,管理逻辑页的分配、回收和存储。页是磁盘 I/O 的最小单位。
  3. 缓冲池管理 (Buffer Manager / Buffer Pool):在内存中缓存磁盘页,减少磁盘 I/O。这是性能优化的关键。
  4. 记录管理 (Record Manager):在页内部组织和管理数据记录,包括记录的插入、删除、更新和查找。
  5. 索引管理 (Index Manager):构建和维护数据记录的索引,加速数据检索。常见的有 B-tree、Hash Index 等。
  6. 并发控制 (Concurrency Control):确保多事务并发执行时的隔离性和一致性,避免数据冲突。
  7. 恢复管理 (Recovery Manager):通过日志(WAL – Write-Ahead Logging)和检查点(Checkpoint)机制,确保数据库在系统崩溃后能够恢复到一致状态。
  8. 事务管理 (Transaction Manager):协调上述组件,提供原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability)的事务保证。

2.2 存储引擎中的关键定制点(策略)

基于上述架构,我们可以识别出多个可以被策略化的关键点:

定制点(策略类别) 说明 常见策略实现
数据布局与序列化 记录如何在页内部存储和序列化/反序列化。 固定长度记录、可变长度记录、压缩记录、行存/列存。
页内空间管理 页内部空闲空间的管理方式。 位图(Bitmap)、空闲槽位列表(Slot Array)。
缓冲池页替换 缓冲池满时,选择哪个页进行淘汰。 LRU (Least Recently Used)、LFU (Least Frequently Used)、Clock、MRU 等。
索引结构 用于数据快速查找的索引类型和其内部实现细节。 B-Tree (B+Tree, B*Tree)、LSM-Tree、Hash Index、Trie 等。
并发控制 事务并发执行时,如何保证数据正确性。 2PL (Two-Phase Locking)、MVCC (Multi-Version Concurrency Control)、乐观并发控制。
恢复机制 如何记录操作日志,以及如何进行检查点。 ARIES 算法、简单的 undo/redo 日志、不同粒度的检查点。
内存分配器 存储引擎内部数据结构(如缓冲池帧、索引节点)的内存分配。 std::allocator、自定义池化分配器、竞技场分配器。
文件 I/O 接口 操作系统文件系统的抽象层。 mmappread/pwrite、异步 I/O。

通过将这些定制点抽象为策略,我们就可以构建一个高度灵活的存储引擎框架。

三、利用 PBD 构建定制化存储引擎的核心骨架

现在,让我们通过具体的 C++ 代码示例来展示如何应用 PBD。我们将逐步构建一个通用的 StorageEngine 宿主类,并为其定义一些关键的策略接口及具体实现。

为了保持代码的清晰和专注,我们将省略错误处理、日志记录等非核心 PBD 逻辑,并使用简化的数据结构。

3.1 基础类型定义

首先,定义一些在存储引擎中常用的基础类型。

#include <iostream>
#include <vector>
#include <string>
#include <map>
#include <list>
#include <mutex>
#include <shared_mutex>
#include <chrono>
#include <algorithm>
#include <memory>
#include <stdexcept>

// 基础类型别名
using PageID = uint32_t;
using RecordID = uint64_t; // PageID + SlotID
using SlotID = uint16_t;
using TransactionID = uint64_t;
using byte = unsigned char;

// 简单的 Record 结构体,用于演示
struct UserRecord {
    int id;
    char name[32];
    double balance;

    UserRecord() : id(0), balance(0.0) {
        std::fill(name, name + 32, '');
    }

    UserRecord(int _id, const std::string& _name, double _balance)
        : id(_id), balance(_balance) {
        std::fill(name, name + 32, '');
        std::strncpy(name, _name.c_str(), 31);
        name[31] = ''; // Ensure null termination
    }

    bool operator==(const UserRecord& other) const {
        return id == other.id &&
               std::string(name) == std::string(other.name) &&
               balance == other.balance;
    }

    void print() const {
        std::cout << "ID: " << id << ", Name: " << name << ", Balance: " << balance << std::endl;
    }
};

// 假设的 Page 结构,包含实际数据
// 在实际中,Page 是一个固定大小的字节数组,内部有复杂的布局
struct Page {
    PageID id;
    bool is_dirty;
    int pin_count;
    std::vector<byte> data; // Simplified: in reality, this is fixed-size array

    Page(PageID page_id, size_t page_size = 4096)
        : id(page_id), is_dirty(false), pin_count(0), data(page_size, 0) {}

    // 假设的读写接口
    byte* get_data() { return data.data(); }
    const byte* get_data() const { return data.data(); }
    size_t size() const { return data.size(); }
};

// 抽象文件接口 (简化)
class DiskManager {
public:
    virtual ~DiskManager() = default;
    virtual void read_page(PageID page_id, Page& page) = 0;
    virtual void write_page(PageID page_id, const Page& page) = 0;
    virtual PageID allocate_page() = 0;
    virtual void deallocate_page(PageID page_id) = 0;
};

// 简单的内存 DiskManager
class InMemoryDiskManager : public DiskManager {
    std::map<PageID, Page> pages_;
    PageID next_page_id_ = 0;
    std::mutex mtx_;
public:
    void read_page(PageID page_id, Page& page) override {
        std::lock_guard<std::mutex> lock(mtx_);
        if (pages_.count(page_id)) {
            page = pages_.at(page_id); // Copy content
            page.id = page_id; // Ensure ID is correct
            page.is_dirty = false; // Freshly read from "disk"
            page.pin_count = 0;
        } else {
            // Simulate reading an empty page if not exists
            page = Page(page_id);
            // std::cerr << "Warning: Reading non-existent page " << page_id << std::endl;
        }
    }

    void write_page(PageID page_id, const Page& page) override {
        std::lock_guard<std::mutex> lock(mtx_);
        pages_[page_id] = page;
        pages_[page_id].is_dirty = false; // Written to "disk"
        // std::cout << "Page " << page_id << " written to disk." << std::endl;
    }

    PageID allocate_page() override {
        std::lock_guard<std::mutex> lock(mtx_);
        PageID new_id = next_page_id_++;
        pages_[new_id] = Page(new_id); // Allocate an empty page
        // std::cout << "Allocated new page: " << new_id << std::endl;
        return new_id;
    }

    void deallocate_page(PageID page_id) override {
        std::lock_guard<std::mutex> lock(mtx_);
        pages_.erase(page_id);
        // std::cout << "Deallocated page: " << page_id << std::endl;
    }
};

// 抽象日志管理器 (简化)
class LogManager {
public:
    virtual ~LogManager() = default;
    virtual void write_log(TransactionID tid, const std::string& message) = 0;
    // ... more complex log records ...
};

class SimpleLogManager : public LogManager {
    std::vector<std::string> logs_;
    std::mutex mtx_;
public:
    void write_log(TransactionID tid, const std::string& message) override {
        std::lock_guard<std::mutex> lock(mtx_);
        logs_.push_back("[TID " + std::to_string(tid) + "] " + message);
        // std::cout << logs_.back() << std::endl; // For debugging
    }
};

3.2 宿主类:StorageEngine 的骨架

我们的 StorageEngine 宿主类将作为一个模板,接受各种策略作为其模板参数。它将协调这些策略来完成数据库操作。

template <
    typename RecordType, // 存储的数据类型
    typename RecordLayoutPolicy,
    typename PageManagementPolicy,
    typename BufferManagementPolicy,
    typename ConcurrencyControlPolicy,
    typename IndexingPolicy
>
class StorageEngine {
public:
    // 策略成员
    RecordLayoutPolicy recordLayout;
    PageManagementPolicy pageManager;
    BufferManagementPolicy bufferManager;
    ConcurrencyControlPolicy concurrencyControl;
    IndexingPolicy indexManager;

    // 辅助组件
    std::unique_ptr<DiskManager> diskManager;
    std::unique_ptr<LogManager> logManager;

    // 构造函数:初始化所有策略和辅助组件
    StorageEngine()
        : recordLayout(),
          pageManager(),
          bufferManager(),
          concurrencyControl(),
          indexManager(),
          diskManager(std::make_unique<InMemoryDiskManager>()), // 默认使用内存磁盘
          logManager(std::make_unique<SimpleLogManager>()) {   // 默认使用简单日志
        // 策略可能需要引用其他组件,在实际中,这些会通过构造函数或初始化方法传递
        bufferManager.init(diskManager.get()); // BufferManager 需要 DiskManager
        indexManager.init(bufferManager.get()); // IndexManager 需要 BufferManager
        // PageManager might also need DiskManager if it manages free pages on disk
        // concurrencyControl might need LogManager for recovery
    }

    // 插入记录
    RecordID insertRecord(TransactionID tid, const RecordType& record) {
        logManager->write_log(tid, "INSERT_START");

        // 1. 序列化记录
        std::vector<byte> serialized_data;
        recordLayout.serialize(record, serialized_data);

        // 2. 找到一个有足够空间的页(通过 PageManagementPolicy)
        // 这里简化为:总是从 BufferManager 获取一个新页,或者一个现有页
        // 实际中 PageManagementPolicy 会提供查找/分配页的方法
        PageID target_page_id = pageManager.get_available_page(serialized_data.size());

        // 3. 申请页的读写锁 (通过 ConcurrencyControlPolicy)
        // 简化:这里只为页加锁,实际中可能为记录加锁
        if (!concurrencyControl.acquireLock(tid, target_page_id, LockMode::X)) {
            // 处理锁等待或死锁
            logManager->write_log(tid, "INSERT_ABORT_LOCK_FAIL");
            throw std::runtime_error("Failed to acquire lock for insert.");
        }

        // 4. 从缓冲池获取页(通过 BufferManagementPolicy)
        Page* page = bufferManager.pin_page(target_page_id);
        if (!page) {
            logManager->write_log(tid, "INSERT_ABORT_PAGE_PIN_FAIL");
            concurrencyControl.releaseLock(tid, target_page_id);
            throw std::runtime_error("Failed to pin page for insert.");
        }

        // 5. 将记录写入页(通过 RecordLayoutPolicy 和 PageManagementPolicy 协作)
        SlotID slot_id = pageManager.insert_record_into_page(*page, serialized_data.data(), serialized_data.size());
        page->is_dirty = true; // 标记页为脏

        // 6. 更新索引(通过 IndexingPolicy)
        RecordID rid = (static_cast<RecordID>(target_page_id) << 32) | slot_id;
        indexManager.insert_entry(recordLayout.get_key(record), rid);

        // 7. 解除页的锁定并解除固定 (unpin)
        bufferManager.unpin_page(target_page_id, true); // 标记为脏页
        concurrencyControl.releaseLock(tid, target_page_id);

        logManager->write_log(tid, "INSERT_END RecordID: " + std::to_string(rid));
        return rid;
    }

    // 查询记录
    std::vector<RecordType> lookupRecord(TransactionID tid, const std::string& key) {
        logManager->write_log(tid, "LOOKUP_START Key: " + key);
        std::vector<RecordType> results;

        // 1. 通过索引查找 RecordID (通过 IndexingPolicy)
        std::vector<RecordID> rids = indexManager.lookup_entry(key);
        if (rids.empty()) {
            logManager->write_log(tid, "LOOKUP_END No records found.");
            return results;
        }

        for (RecordID rid : rids) {
            PageID page_id = static_cast<PageID>(rid >> 32);
            SlotID slot_id = static_cast<SlotID>(rid & 0xFFFFFFFF);

            // 2. 申请页的读锁 (通过 ConcurrencyControlPolicy)
            if (!concurrencyControl.acquireLock(tid, page_id, LockMode::S)) {
                logManager->write_log(tid, "LOOKUP_ABORT_LOCK_FAIL");
                // 实际中可能需要处理部分结果或回滚
                continue; // Skip this record
            }

            // 3. 从缓冲池获取页 (通过 BufferManagementPolicy)
            Page* page = bufferManager.pin_page(page_id);
            if (!page) {
                logManager->write_log(tid, "LOOKUP_ABORT_PAGE_PIN_FAIL");
                concurrencyControl.releaseLock(tid, page_id);
                continue; // Skip this record
            }

            // 4. 从页中读取记录 (通过 PageManagementPolicy 和 RecordLayoutPolicy 协作)
            std::vector<byte> serialized_data;
            if (pageManager.get_record_from_page(*page, slot_id, serialized_data)) {
                RecordType record;
                recordLayout.deserialize(serialized_data, record);
                results.push_back(record);
            }

            // 5. 解除页的锁定并解除固定
            bufferManager.unpin_page(page_id, false); // 不是脏页
            concurrencyControl.releaseLock(tid, page_id);
        }

        logManager->write_log(tid, "LOOKUP_END Found " + std::to_string(results.size()) + " records.");
        return results;
    }

    // 事务提交/中止 (简化)
    void commitTransaction(TransactionID tid) {
        logManager->write_log(tid, "TRANSACTION_COMMIT");
        concurrencyControl.releaseAllLocks(tid); // 释放所有锁
        // 实际中还要处理日志写入磁盘,缓冲池脏页刷新等
    }

    void abortTransaction(TransactionID tid) {
        logManager->write_log(tid, "TRANSACTION_ABORT");
        concurrencyControl.releaseAllLocks(tid); // 释放所有锁
        // 实际中还要处理 undo 操作,回滚数据等
    }
};

宿主类 StorageEngine 的说明:

  • 它是一个接受 RecordType 和五种策略类型作为模板参数的类模板。
  • 它包含五个策略类的成员变量,这些策略将在构造时被默认构造。
  • 它还包含 DiskManagerLogManager 的智能指针,这些是所有策略可能需要协作的底层服务。
  • insertRecordlookupRecord 方法展示了如何协调多个策略来完成一个复杂操作。例如,插入操作需要 recordLayout 进行序列化,pageManager 查找空间,concurrencyControl 加锁,bufferManager 固定页,最后 indexManager 更新索引。
  • 这种设计使得每个策略专注于自己的职责,而 StorageEngine 负责流程编排。

3.3 策略实现示例 1:记录布局与序列化策略 (Record Layout Policy)

这个策略负责将结构化的 RecordType 对象转换为字节数组,并从字节数组中反序列化回来。

策略接口要求:

  • serialize(const RecordType& record, std::vector<byte>& buffer) const: 将记录序列化到 buffer 中。
  • deserialize(const std::vector<byte>& buffer, RecordType& record) const: 从 buffer 反序列化记录。
  • get_key(const RecordType& record) const: 从记录中提取用于索引的键。
  • get_record_size(const RecordType& record) const: 获取记录的实际存储大小。
  • get_max_record_size() const: 获取记录的最大可能存储大小(主要用于固定长度记录)。

3.3.1 FixedLengthRecordPolicy:固定长度记录

最简单的策略,直接将结构体内存拷贝。

template <typename RecordType, size_t FixedSize>
struct FixedLengthRecordPolicy {
    // 确保 RecordType 可以被直接 memcpy
    static_assert(std::is_trivially_copyable<RecordType>::value,
                  "RecordType must be trivially copyable for FixedLengthRecordPolicy");
    static_assert(sizeof(RecordType) <= FixedSize,
                  "RecordType size exceeds FixedSize in FixedLengthRecordPolicy");

    void serialize(const RecordType& record, std::vector<byte>& buffer) const {
        buffer.assign(reinterpret_cast<const byte*>(&record),
                      reinterpret_cast<const byte*>(&record) + sizeof(RecordType));
        // Pad to FixedSize if needed, though for FixedLength, usually sizeof(RecordType) == FixedSize
        if (buffer.size() < FixedSize) {
            buffer.resize(FixedSize, 0); // Pad with zeros
        }
    }

    void deserialize(const std::vector<byte>& buffer, RecordType& record) const {
        if (buffer.size() < sizeof(RecordType)) {
            throw std::runtime_error("Buffer too small for deserialization.");
        }
        std::memcpy(&record, buffer.data(), sizeof(RecordType));
    }

    // 假设 UserRecord 的 key 是其 id
    std::string get_key(const RecordType& record) const {
        // This assumes RecordType has an 'id' member or similar to form a key
        if constexpr (std::is_same_v<RecordType, UserRecord>) {
            return std::to_string(record.id);
        }
        return ""; // Fallback or compile-time error for other types
    }

    size_t get_record_size(const RecordType& record) const {
        return FixedSize;
    }

    size_t get_max_record_size() const {
        return FixedSize;
    }
};

3.3.2 VariableLengthRecordPolicy:可变长度记录

在数据前加上长度前缀,以支持可变长度字段或压缩。

template <typename RecordType>
struct VariableLengthRecordPolicy {
    // 假设 RecordType 具有 to_bytes 和 from_bytes 方法来处理序列化
    // 或者我们在这里实现一个简单的通用序列化
    void serialize(const RecordType& record, std::vector<byte>& buffer) const {
        // 这是一个简化的通用序列化,实际中需要更复杂的自定义逻辑
        // 对于 UserRecord, 我们将 id, name_len, name, balance 依次写入
        if constexpr (std::is_same_v<RecordType, UserRecord>) {
            size_t current_pos = 0;
            buffer.resize(sizeof(int) + 1 + 32 + sizeof(double)); // Max possible size for UserRecord

            // Write ID
            std::memcpy(buffer.data() + current_pos, &record.id, sizeof(int));
            current_pos += sizeof(int);

            // Write name length and name
            uint8_t name_len = static_cast<uint8_t>(std::strlen(record.name));
            std::memcpy(buffer.data() + current_pos, &name_len, sizeof(uint8_t));
            current_pos += sizeof(uint8_t);
            std::memcpy(buffer.data() + current_pos, record.name, name_len);
            current_pos += name_len;

            // Write balance
            std::memcpy(buffer.data() + current_pos, &record.balance, sizeof(double));
            current_pos += sizeof(double);

            buffer.resize(current_pos); // Trim buffer to actual size
        } else {
            // Placeholder for other RecordTypes, or use a generic serializer
            throw std::logic_error("Generic VariableLengthRecordPolicy requires RecordType to define custom serialization.");
        }
    }

    void deserialize(const std::vector<byte>& buffer, RecordType& record) const {
        if constexpr (std::is_same_v<RecordType, UserRecord>) {
            size_t current_pos = 0;

            // Read ID
            if (buffer.size() < current_pos + sizeof(int)) throw std::runtime_error("Buffer too small for ID.");
            std::memcpy(&record.id, buffer.data() + current_pos, sizeof(int));
            current_pos += sizeof(int);

            // Read name length and name
            if (buffer.size() < current_pos + sizeof(uint8_t)) throw std::runtime_error("Buffer too small for name length.");
            uint8_t name_len;
            std::memcpy(&name_len, buffer.data() + current_pos, sizeof(uint8_t));
            current_pos += sizeof(uint8_t);

            if (buffer.size() < current_pos + name_len) throw std::runtime_error("Buffer too small for name.");
            std::memcpy(record.name, buffer.data() + current_pos, name_len);
            record.name[name_len] = ''; // Ensure null termination
            current_pos += name_len;

            // Read balance
            if (buffer.size() < current_pos + sizeof(double)) throw std::runtime_error("Buffer too small for balance.");
            std::memcpy(&record.balance, buffer.data() + current_pos, sizeof(double));
            current_pos += sizeof(double);
        } else {
            throw std::logic_error("Generic VariableLengthRecordPolicy requires RecordType to define custom deserialization.");
        }
    }

    std::string get_key(const RecordType& record) const {
        if constexpr (std::is_same_v<RecordType, UserRecord>) {
            return std::to_string(record.id);
        }
        return "";
    }

    size_t get_record_size(const RecordType& record) const {
        if constexpr (std::is_same_v<RecordType, UserRecord>) {
            return sizeof(int) + sizeof(uint8_t) + std::strlen(record.name) + sizeof(double);
        }
        return 0; // Unknown size for generic type
    }

    size_t get_max_record_size() const {
        // 对于 UserRecord,我们可以计算一个最大值
        if constexpr (std::is_same_v<RecordType, UserRecord>) {
            return sizeof(int) + sizeof(uint8_t) + 32 + sizeof(double);
        }
        return 0; // Truly variable or needs a compile-time max
    }
};

3.4 策略实现示例 2:页内空间管理策略 (Page Management Policy)

这个策略负责在一个页内部管理记录的存储和空闲空间。

策略接口要求:

  • get_available_page(size_t record_size) const: 在存储引擎层面,请求一个能容纳 record_size 字节记录的页 ID。
  • insert_record_into_page(Page& page, const byte* data, size_t size): 将记录数据插入到给定页中,并返回槽位 ID。
  • get_record_from_page(const Page& page, SlotID slot_id, std::vector<byte>& out_data) const: 从给定页和槽位中读取记录数据。

3.4.1 SimplePageManagementPolicy:简单页管理

假设页内部有足够的空间,直接追加记录,并使用一个简单的槽位数组来记录偏移量。

struct SimplePageManagementPolicy {
    // 简单地返回一个新的页 ID,实际中会有一个空闲页管理器
    PageID get_available_page(size_t record_size) const {
        // In a real system, this would query a FreePageManager
        // For this demo, let's just assume we always get a new page ID for simplicity,
        // or a pre-allocated page from disk manager.
        // This policy itself doesn't manage disk pages, it delegates to DiskManager.
        // We'll return 0 as a placeholder, assuming the caller (StorageEngine)
        // will get a real page from BufferManager/DiskManager.
        // A more realistic scenario would involve passing DiskManager/BufferManager to policy.
        // For now, let's assume the StorageEngine handles page allocation logic.
        return 0; // Placeholder: StorageEngine needs to allocate/find page
    }

    // 插入记录到页中,返回 SlotID
    SlotID insert_record_into_page(Page& page, const byte* data, size_t size) {
        // 简化:假设页内部有一个简单的槽位数组和数据区
        // 真实情况会更复杂,例如使用 PageHeader 存储元数据
        // 假设 page.data 是实际的页内容
        // 这里只是一个非常简化的实现,不考虑实际的页内部结构(如头信息、空闲空间链表等)

        // 为了演示,我们假定页的开头有一个简单的计数器和偏移量数组
        // [num_records (uint16_t)] [offset0 (uint16_t)] [offset1] ... [record0] [record1] ...
        const size_t PAGE_HEADER_SIZE = sizeof(uint16_t); // for num_records
        const size_t SLOT_SIZE = sizeof(uint16_t); // for each record's offset

        uint16_t num_records = 0;
        if (page.data.size() >= PAGE_HEADER_SIZE) {
            std::memcpy(&num_records, page.data.data(), PAGE_HEADER_SIZE);
        }

        size_t current_data_offset = PAGE_HEADER_SIZE + num_records * SLOT_SIZE;
        // 检查是否有足够的空间来存储新的记录和新的槽位
        if (current_data_offset + SLOT_SIZE + size > page.size()) {
            throw std::runtime_error("Not enough space in page for new record.");
        }

        // 将记录数据追加到页的末尾 (或从后往前增长)
        size_t record_start_offset = page.size() - size; // 简化:从页尾部开始存储
        // 实际中可能需要一个更复杂的空闲空间查找算法
        // 这里为了演示,我们假定记录是从页的某个已知位置开始写入的
        // 更常见的是,记录从页的低地址开始,空闲槽位从高地址开始,中间相遇

        // 演示:我们将记录直接放在一个简化数组中,并返回其索引
        // 实际中,page.data 是字节数组,记录会被写入特定偏移量
        // 为了模拟,我们假定 page.data 足够大,并且我们可以在末尾添加
        if (page.data.capacity() < page.data.size() + size) {
            // Reallocation is not typical for fixed-size Page.data in real DB
            // This indicates a conceptual simplification for demo.
            page.data.resize(page.data.size() + size);
        }
        std::memcpy(page.data.data() + page.data.size() - size, data, size); // Simplistic storage

        // 实际中,这里会更新页头部的元数据,例如空闲空间指针、槽位数组等
        // 返回一个虚拟的 SlotID
        return num_records; // 只是一个索引,不是实际的偏移量
    }

    // 从页中读取记录,返回数据
    bool get_record_from_page(const Page& page, SlotID slot_id, std::vector<byte>& out_data) const {
        // 简化:如果 SimplePageManagementPolicy 只是将记录追加,那么需要知道原始记录的长度
        // 对于这个演示,我们假设我们知道记录的长度 (例如通过 FixedLengthRecordPolicy)
        // 这是一个设计缺陷,实际中 PageManagementPolicy 需要与 RecordLayoutPolicy 协作
        // 或者 RecordLayoutPolicy 能够从字节流中推断长度
        // 为了演示,我们假定这里能获取到数据,但实际情况需要更严谨的页内结构
        if (slot_id < page.data.size() / 100) { // 假设每条记录大概100字节
             // 再次简化,这里无法根据 slot_id 精确获取,因为 insert_record_into_page 是简化实现
             // 真实情况:页头部有槽位数组,每个槽位指向记录的起始偏移量和长度
             // out_data.assign(page.data.begin() + offset, page.data.begin() + offset + length);
             // For demo, just return some placeholder data
             out_data.assign(page.data.begin(), page.data.end()); // Return entire page data for now
             return true;
        }
        return false;
    }
};

// 实际中 PageManagementPolicy 需要与 DiskManager 合作
// 假设有一个 FreePageManager,它知道哪些页是空的
// 为了让 StorageEngine 的 insertRecord 正常工作,我们需要一个方法来获取 PageID
// 这里我们让 PageManagementPolicy 内部包含一个 DiskManager 指针,或者通过构造函数传递
struct BasicPageManagementPolicy {
    DiskManager* disk_manager_ = nullptr; // Needs to be initialized
    std::vector<PageID> free_pages_; // Simplified free page list
    std::mutex mtx_;

    void init(DiskManager* dm) {
        disk_manager_ = dm;
        // In a real system, load free_pages_ from disk
    }

    PageID get_available_page(size_t record_size) {
        std::lock_guard<std::mutex> lock(mtx_);
        if (!free_pages_.empty()) {
            PageID id = free_pages_.back();
            free_pages_.pop_back();
            return id;
        }
        // No free pages, allocate a new one
        if (disk_manager_) {
            return disk_manager_->allocate_page();
        }
        throw std::runtime_error("DiskManager not initialized for BasicPageManagementPolicy.");
    }

    void deallocate_page(PageID page_id) {
        std::lock_guard<std::mutex> lock(mtx_);
        free_pages_.push_back(page_id);
        if (disk_manager_) {
            disk_manager_->deallocate_page(page_id);
        }
    }

    // For simplicity, let's just make it store data sequentially and return slot as index
    SlotID insert_record_into_page(Page& page, const byte* data, size_t size) {
        // A real PageManagementPolicy needs a header in the page to manage slots
        // For this demo, we'll just append data directly to page.data as a conceptual model
        // and return a simple slot index.
        size_t current_records_count = 0; // Placeholder
        if (page.data.size() + size > page.capacity()) { // Assuming page.data.capacity() is page size
            throw std::runtime_error("Page full, cannot insert record.");
        }

        // Simulate appending to the end of existing data
        size_t old_size = page.data.size();
        page.data.resize(old_size + size);
        std::memcpy(page.data.data() + old_size, data, size);

        // This is highly simplified. A real page manager would manage slots and offsets.
        // For now, let's just return a placeholder SlotID.
        // SlotID is usually an index into a slot array in the page header.
        return static_cast<SlotID>(old_size); // Return the byte offset as slot ID for demo
    }

    bool get_record_from_page(const Page& page, SlotID slot_offset, std::vector<byte>& out_data) const {
        // This is also highly simplified. A real page manager uses slot_offset
        // to look up actual record offset and length from a slot directory.
        // For this demo, we assume the record starts at slot_offset and its length
        // can be determined by the RecordLayoutPolicy's deserialize or if it's fixed.
        // Since get_record_from_page doesn't know record length, this is a limitation.
        // In a real system, the slot entry would store length.
        // For demo, we might need a fixed size assumption or
        // pass a hint about max_record_size / actual_record_size.
        // Let's assume for FixedLengthRecordPolicy, we know the size from template.
        // For VariableLengthRecordPolicy, deserialize can read length prefix.

        // Placeholder: assume we can read up to 1024 bytes or until end of page
        size_t assumed_max_record_size = 1024; // Needs to be smarter
        if (slot_offset >= page.data.size()) return false;

        size_t available_bytes = page.data.size() - slot_offset;
        size_t bytes_to_read = std::min(available_bytes, assumed_max_record_size);

        if (bytes_to_read == 0) return false;

        out_data.assign(page.data.data() + slot_offset, page.data.data() + slot_offset + bytes_to_read);
        return true;
    }
};

3.5 策略实现示例 3:缓冲池页替换策略 (Buffer Management Policy)

缓冲池是内存中的页缓存。当缓冲池满时,需要选择一个页淘汰出去,为新页腾出空间。

策略接口要求:

  • init(DiskManager* dm): 初始化缓冲池,需要 DiskManager 来读写磁盘。
  • pin_page(PageID page_id): 将页固定在缓冲池中(增加 pin_count),如果不在则从磁盘加载。
  • unpin_page(PageID page_id, bool is_dirty): 解除页的固定(减少 pin_count),并标记是否为脏页。
  • flush_page(PageID page_id): 将指定页写回磁盘。
  • flush_all_pages(): 将所有脏页写回磁盘。
  • select_victim(): (内部方法) 选择一个可淘汰的页。

3.5.1 LRUBufferPolicy:最近最少使用

维护一个页的访问链表,最近访问的在头部,最久未访问的在尾部。淘汰尾部的页。

struct LRUBufferPolicy {
    static const size_t BUFFER_SIZE = 10; // 缓冲池大小,简化为固定值

    DiskManager* disk_manager_ = nullptr;
    std::map<PageID, Page> buffer_pool_; // 存储实际的页对象
    std::map<PageID, typename std::list<PageID>::iterator> lru_map_; // 维护页在 LRU 列表中的位置
    std::list<PageID> lru_list_; // 存放 PageID,头部是最近使用的,尾部是最久未使用的
    std::mutex mtx_;

    void init(DiskManager* dm) {
        disk_manager_ = dm;
    }

    Page* pin_page(PageID page_id) {
        std::lock_guard<std::mutex> lock(mtx_);
        if (buffer_pool_.count(page_id)) {
            // 页已在缓冲池中
            Page& page = buffer_pool_.at(page_id);
            page.pin_count++;
            // 更新 LRU 链表:移到头部
            lru_list_.erase(lru_map_.at(page_id));
            lru_list_.push_front(page_id);
            lru_map_[page_id] = lru_list_.begin();
            return &page;
        } else {
            // 页不在缓冲池中,需要加载
            if (buffer_pool_.size() >= BUFFER_SIZE) {
                // 缓冲池已满,需要淘汰一个页
                PageID victim_id = select_victim();
                if (victim_id == 0) { // 0 typically means no victim found (all pinned)
                    std::cerr << "Buffer pool full and all pages pinned! Cannot pin new page " << page_id << std::endl;
                    return nullptr;
                }
                flush_page_internal(victim_id); // 写回磁盘(如果脏)
                buffer_pool_.erase(victim_id);
                lru_map_.erase(victim_id);
                lru_list_.remove(victim_id); // Remove from LRU list
            }

            // 从磁盘加载页
            Page new_page(page_id);
            if (disk_manager_) {
                disk_manager_->read_page(page_id, new_page);
            }
            new_page.pin_count = 1;
            buffer_pool_[page_id] = new_page;

            // 加入 LRU 链表头部
            lru_list_.push_front(page_id);
            lru_map_[page_id] = lru_list_.begin();
            return &buffer_pool_[page_id];
        }
    }

    void unpin_page(PageID page_id, bool is_dirty) {
        std::lock_guard<std::mutex> lock(mtx_);
        if (buffer_pool_.count(page_id)) {
            Page& page = buffer_pool_.at(page_id);
            if (page.pin_count > 0) {
                page.pin_count--;
            }
            if (is_dirty) {
                page.is_dirty = true;
            }
        }
    }

    void flush_page(PageID page_id) {
        std::lock_guard<std::mutex> lock(mtx_);
        flush_page_internal(page_id);
    }

    void flush_all_pages() {
        std::lock_guard<std::mutex> lock(mtx_);
        for (auto const& [page_id, page] : buffer_pool_) {
            flush_page_internal(page_id);
        }
    }

private:
    PageID select_victim() {
        // 查找 LRU 链表中第一个 pin_count 为 0 的页
        for (PageID id : lru_list_) {
            if (buffer_pool_.at(id).pin_count == 0) {
                return id;
            }
        }
        return 0; // 没有可淘汰的页
    }

    void flush_page_internal(PageID page_id) {
        if (buffer_pool_.count(page_id)) {
            Page& page = buffer_pool_.at(page_id);
            if (page.is_dirty) {
                if (disk_manager_) {
                    disk_manager_->write_page(page_id, page);
                }
                page.is_dirty = false;
            }
        }
    }
};

3.6 策略实现示例 4:并发控制策略 (Concurrency Control Policy)

该策略负责管理事务对数据资源的并发访问。

策略接口要求:

  • acquireLock(TransactionID tid, LockableID resource_id, LockMode mode): 事务 tid 尝试获取 resource_id 资源的 mode 锁。
  • releaseLock(TransactionID tid, LockableID resource_id): 事务 tid 释放 resource_id 资源的锁。
  • releaseAllLocks(TransactionID tid): 事务 tid 释放所有持有的锁。
enum class LockMode { S, X }; // Shared (共享锁), Exclusive (排他锁)
using LockableID = PageID; // 简化为页级锁

struct LockEntry {
    TransactionID owner_tid;
    LockMode mode;
};

// 检查锁兼容性
bool is_lock_compatible(LockMode existing_mode, LockMode requested_mode) {
    if (requested_mode == LockMode::S && existing_mode == LockMode::S) {
        return true; // S 和 S 兼容
    }
    return false; // 其他情况不兼容 (S 和 X, X 和 S, X 和 X)
}

struct TwoPhaseLockingPolicy {
    std::map<LockableID, std::vector<LockEntry>> lock_table_; // resource_id -> list of locks
    std::map<TransactionID, std::vector<LockableID>> transaction_locks_; // tid -> list of resources locked
    std::mutex mtx_;

    // 简化:不实现等待队列和死锁检测,直接返回 false 表示获取失败
    bool acquireLock(TransactionID tid, LockableID resource_id, LockMode mode) {
        std::lock_guard<std::mutex> lock(mtx_);
        auto& current_locks = lock_table_[resource_id];

        // 检查是否存在当前事务已持有的锁
        for (const auto& entry : current_locks) {
            if (entry.owner_tid == tid) {
                // 如果当前事务已持有相同模式的锁,直接成功
                if (entry.mode == mode) return true;
                // 如果当前事务持有 S 锁,请求 X 锁,尝试升级
                if (entry.mode == LockMode::S && mode == LockMode::X) {
                    if (current_locks.size() == 1) { // 只有自己持有 S 锁才能升级
                        current_locks[0].mode = LockMode::X;
                        // std::cout << "Transaction " << tid << " upgraded lock on " << resource_id << std::endl;
                        return true;
                    }
                }
            }
        }

        // 检查兼容性
        for (const auto& entry : current_locks) {
            if (!is_lock_compatible(entry.mode, mode)) {
                // 不兼容,锁获取失败 (实际中会进入等待队列)
                // std::cout << "Transaction " << tid << " cannot acquire " << (mode == LockMode::S ? "S" : "X")
                //           << " lock on " << resource_id << " due to existing "
                //           << (entry.mode == LockMode::S ? "S" : "X") << " lock by " << entry.owner_tid << std::endl;
                return false;
            }
        }

        // 获取锁成功
        current_locks.push_back({tid, mode});
        transaction_locks_[tid].push_back(resource_id);
        // std::cout << "Transaction " << tid << " acquired " << (mode == LockMode::S ? "S" : "X")
        //           << " lock on " << resource_id << std::endl;
        return true;
    }

    void releaseLock(TransactionID tid, LockableID resource_id) {
        std::lock_guard<std::mutex> lock(mtx_);
        auto it_resource = lock_table_.find(resource_id);
        if (it_resource != lock_table_.end()) {
            auto& locks = it_resource->second;
            locks.erase(std::remove_if(locks.begin(), locks.end(),
                                       [tid](const LockEntry& entry) { return entry.owner_tid == tid; }),
                        locks.end());
            if (locks.empty()) {
                lock_table_.erase(it_resource);
            }
        }

        auto it_tid = transaction_locks_.find(tid);
        if (it_tid != transaction_locks_.end()) {
            auto& resources = it_tid->second;
            resources.erase(std::remove(resources.begin(), resources.end(), resource_id), resources.end());
            if (resources.empty()) {
                transaction_locks_.erase(it_tid);
            }
        }
        // std::cout << "Transaction " << tid << " released lock on " << resource_id << std::endl;
    }

    void releaseAllLocks(TransactionID tid) {
        std::lock_guard<std::mutex> lock(mtx_);
        auto it_tid = transaction_locks_.find(tid);
        if (it_tid != transaction_locks_.end()) {
            for (LockableID resource_id : it_tid->second) {
                auto it_resource = lock_table_.find(resource_id);
                if (it_resource != lock_table_.end()) {
                    auto& locks = it_resource->second;
                    locks.erase(std::remove_if(locks.begin(), locks.end(),
                                               [tid](const LockEntry& entry) { return entry.owner_tid == tid; }),
                                locks.end());
                    if (locks.empty()) {
                        lock_table_.erase(it_resource);
                    }
                }
            }
            transaction_locks_.erase(it_tid);
        }
        // std::cout << "Transaction " << tid << " released all locks." << std::endl;
    }
};

// MVCCPolicy (Multi-Version Concurrency Control) 简化版
// MVCC 通常通过维护多版本数据,并结合时间戳或事务ID来判断可见性,减少读写冲突
// 这里只提供一个骨架,实际实现非常复杂
struct MVCCPolicy {
    // MVCC 通常不需要读锁,只有写操作需要协调
    // 写入会创建新版本,并更新元数据
    // 读取则读取事务开始时可见的旧版本
    std::shared_mutex mtx_; // 使用读写锁来保护版本控制元数据

    bool acquireLock(TransactionID tid, LockableID resource_id, LockMode mode) {
        if (mode == LockMode::S) {
            // 在 MVCC 中,读操作通常不需要显式锁,因为它们读取的是旧版本
            // 只需要确保读取的版本的可见性规则
            mtx_.lock_shared(); // 保护共享元数据
            mtx_.unlock_shared();
            return true;
        } else { // LockMode::X
            // 写操作需要获取排他锁,以确保在创建新版本时的原子性
            // 实际中可能需要检查冲突,例如写写冲突
            mtx_.lock(); // 保护元数据和写入操作
            // 实际中,这里会启动一个新的版本创建流程,而不是传统锁
            // 检查是否有活跃事务正在修改此资源
            // 如果有,则可能等待或中止
            mtx_.unlock();
            return true;
        }
    }

    void releaseLock(TransactionID tid, LockableID resource_id) {
        // MVCC 中的锁通常与事务的提交/中止绑定,而不是单个资源
        // 当事务提交时,其写入的新版本变为可见
        // 当事务中止时,其写入的新版本被废弃
        // 所以这里可能不做任何事,或者只是清理事务相关的临时状态
    }

    void releaseAllLocks(TransactionID tid) {
        // 同 releaseLock,MVCC 更多是关于事务生命周期中的版本管理
    }
};

3.7 策略实现示例 5:索引策略 (Indexing Policy)

索引策略负责维护数据的查找结构。

策略接口要求:

  • init(BufferManagementPolicy* bmp): 初始化索引,需要缓冲池来管理索引页。
  • insert_entry(const std::string& key, RecordID rid): 插入一个键值对到索引。
  • lookup_entry(const std::string& key): 根据键查找所有对应的 RecordID。
  • delete_entry(const std::string& key, RecordID rid): 从索引中删除一个键值对。

3.7.1 BTreeIndexingPolicy:B-树索引

一个经典的 B+ 树实现。

// 简化版 BTreeIndexingPolicy
// 实际的 B+Tree 实现会非常复杂,涉及到页的读写、分裂、合并、并发控制等
struct BTreeIndexingPolicy {
    BufferManagementPolicy* buffer_manager_ = nullptr;
    std::map<std::string, std::vector<RecordID>> index_data_; // 内存中的简化索引

    void init(BufferManagementPolicy* bmp) {
        buffer_manager_ = bmp;
        // In a real B-Tree, this would load the root page and other metadata
    }

    void insert_entry(const std::string& key, RecordID rid) {
        // 真实 B+Tree 插入逻辑:
        // 1. 查找叶子页
        // 2. 将键值对插入叶子页
        // 3. 如果叶子页满,进行分裂,并向上级传播
        // 4. 更新父节点
        // 5. 使用 BufferManagementPolicy pin/unpin 索引页

        // 简化:直接插入到内存 map
        std::lock_guard<std::mutex> lock(mtx_);
        index_data_[key].push_back(rid);
        // std::cout << "Indexed: Key=" << key << ", RID=" << rid << std::endl;
    }

    std::vector<RecordID> lookup_entry(const std::string& key) {
        // 真实 B+Tree 查找逻辑:
        // 1. 从根节点开始,沿着 B+Tree 路径下降到叶子页
        // 2. 在叶子页中查找键
        // 3. 使用 BufferManagementPolicy pin/unpin 索引页
        std::lock_guard<std::mutex> lock(mtx_);
        if (index_data_.count(key)) {
            return index_data_.at(key);
        }
        return {};
    }

    void delete_entry(const std::string& key, RecordID rid) {
        // 真实 B+Tree 删除逻辑:
        // 1. 查找叶子页
        // 2. 从叶子页删除键值对
        // 3. 如果叶子页欠满,可能需要合并或借用
        // 4. 更新父节点
        std::lock_guard<std::mutex> lock(mtx_);
        if (index_data_.count(key)) {
            auto& rids = index_data_.at(key);
            rids.erase(std::remove(rids.begin(), rids.end(), rid), rids.end());
            if (rids.empty()) {
                index_data_.erase(key);
            }
        }
    }

private:
    std::mutex mtx_; // Protects index_data_ for this simplified in-memory version
};

3.8 组装一个定制化的存储引擎

现在我们有了宿主类和一些策略。我们可以轻松地创建不同配置的存储引擎。

// 定义一些假设的策略,它们可能只是空结构体或者更复杂的实现
struct AdvancedPageManagementPolicy : public BasicPageManagementPolicy {}; // 假设的更高级页管理
struct LFUBufferPolicy : public LRUBufferPolicy {}; // 假设的 LFU 策略,这里简化为LRU
struct LSMTreeIndexingPolicy : public BTreeIndexingPolicy {}; // 假设的 LSM-Tree,这里简化为BTree

int main() {
    std::cout << "--- Building SimpleFixedLengthEngine ---" << std::endl;
    // 构建一个简单的、固定长度记录、LRU 缓冲、2PL 并发控制、B-树索引的引擎
    using SimpleFixedLengthEngine = StorageEngine<
        UserRecord,
        FixedLengthRecordPolicy<UserRecord, sizeof(UserRecord)>,
        BasicPageManagementPolicy,
        LRUBufferPolicy,
        TwoPhaseLockingPolicy,
        BTreeIndexingPolicy
    >;

    SimpleFixedLengthEngine engine1;
    TransactionID tid1 = 101;
    TransactionID tid2 = 102;

    std::cout << "nInserting records into SimpleFixedLengthEngine:" << std::endl;
    try {
        UserRecord rec1(1, "Alice", 100.0);
        RecordID rid1 = engine1.insertRecord(tid1, rec1);
        UserRecord rec2(2, "Bob", 200.0);
        RecordID rid2 = engine1.insertRecord(tid1, rec2);
        engine1.commitTransaction(tid1);

        UserRecord rec3(3, "Charlie", 300.0);
        RecordID rid3 = engine1.insertRecord(tid2, rec3);
        engine1.commitTransaction(tid2);

    } catch (const std::exception& e) {
        std::cerr << "Error in SimpleFixedLengthEngine insert: " << e.what() << std::endl;
    }

    std::cout << "nLooking up records in SimpleFixedLengthEngine:" << std::endl;
    std::vector<UserRecord> results1 = engine1.lookupRecord(tid1, "1");
    for (const auto& rec : results1) {
        rec.print();
    }
    std::vector<UserRecord> results2 = engine1.lookupRecord(tid1, "2");
    for (const auto& rec : results2) {
        rec.print();
    }
    std::vector<UserRecord> results3 = engine1.lookupRecord(tid1, "3");
    for (const auto& rec : results3) {
        rec.print();
    }

    std::cout << "n--- Building AdvancedVariableLengthMVCCEngine ---" << std::endl;
    // 构建一个更高级的、可变长度记录、LFU 缓冲、MVCC 并发控制、LSM-Tree 索引的引擎
    using AdvancedVariableLengthMVCCEngine = StorageEngine<
        UserRecord,
        VariableLengthRecordPolicy<UserRecord>,
        AdvancedPageManagementPolicy,
        LFUBufferPolicy,
        MVCCPolicy,
        LSMTreeIndexingPolicy
    >;

    AdvancedVariableLengthMVCCEngine engine2;
    TransactionID tid3 = 201;

    std::cout << "nInserting records into AdvancedVariableLengthMVCCEngine:" << std::endl;
    try {
        UserRecord rec4(4, "David", 400.0);
        RecordID rid4 = engine2.insertRecord(tid3, rec4);
        UserRecord rec5(5, "Eve is a very long name that tests variable length", 500.0);
        RecordID rid5 = engine2.insertRecord(tid3, rec5);
        engine2.commitTransaction(tid3);
    } catch (const std::exception& e) {
        std::cerr << "Error in AdvancedVariableLengthMVCCEngine insert: " << e.what() << std::endl;
    }

    std::cout << "nLooking up records in AdvancedVariableLengthMVCCEngine:" << std::endl;
    std::vector<UserRecord> results4 = engine2.lookupRecord(tid3, "4");
    for (const auto& rec : results4) {
        rec.print();
    }
    std::vector<UserRecord> results5 = engine2.lookupRecord(tid3, "5");
    for (const auto& rec : results5) {
        rec.print();
    }
    std::cout << "nDemonstration complete." << std::endl;

    return 0;
}

通过修改 main 函数中的 using 声明,我们可以轻松地在不同的存储引擎配置之间切换,而无需修改 StorageEngine 宿主类或任何策略类的内部逻辑。这就是 Policy-based Design 带来的强大定制能力!

四、策略模式设计的优势、挑战与最佳实践

4.1 策略模式设计的优势

  1. 极致的定制化与灵活性:这是 PBD 最显著的优点。通过简单的模板参数替换,可以生成针对特定工作负载或环境高度优化的存储引擎变体。无需修改一行核心代码,即可改变其底层行为。
  2. 高性能(零运行时开销):由于策略选择和绑定在编译时完成,PBD 避免了虚函数调用带来的运行时开销。对于数据库存储引擎这种性能敏感的系统,这一点至关重要。
  3. 模块化与可重用性:每个策略都是一个独立的、内聚的模块,负责一个特定的功能域。它们可以独立开发、测试和维护,并且可以在不同的宿主类或不同的引擎配置中重复使用。
  4. 编译时安全性:如果策略类没有提供宿主类期望的接口(例如,缺少某个方法或类型定义),编译器会在编译时报错。这比运行时错误更容易发现和修复。
  5. 减少样板代码:宿主类定义了通用的流程和逻辑,而策略类只关注具体实现细节。这减少了代码重复,提高了代码的内聚性。
  6. 易于扩展:添加新的策略实现(例如,一个新的缓冲池替换算法)只需要创建新的策略类,而不需要修改现有代码。

4.2 策略模式设计的挑战与注意事项

  1. 模板元编程的复杂性:PBD 依赖于模板,复杂的模板代码可能难以理解和调试。特别是当策略之间存在微妙的依赖关系时,编译错误信息可能变得冗长且难以解读。
  2. 隐式接口(Duck Typing):在 C++20 concept 之前,策略类需要遵循的是隐式接口(即“如果它走起来像鸭子,叫起来像鸭子,那它就是鸭子”)。宿主类期望策略提供某些方法和类型,但这些期望通常没有在代码中明确声明。这要求严格的文档和约定。
  3. 策略间的依赖关系:虽然 PBD 鼓励策略独立,但在复杂的系统中,某些策略可能需要访问其他策略的功能或状态(例如,缓冲池策略需要磁盘管理器,索引策略需要缓冲池)。这需要在宿主类中精心设计策略的初始化和协作方式。
  4. 代码膨胀 (Code Bloat):如果为每个策略参数组合都生成大量不同的代码,可能会导致最终二进制文件的大小增加。现代编译器通常能很好地优化,但仍需注意。
  5. 调试难度:编译时错误通常比运行时错误更早发现,但模板实例化深度过高时,错误信息可能变得非常复杂。

4.3 最佳实践

  1. 明确策略接口:即使是隐式接口,也应该通过详细的注释、文档或辅助类来清晰定义策略必须提供的公共方法、类型别名和常量。在 C++20 及更高版本中,使用 std::concepts 可以将这些接口显式化,极大地改善错误信息和可读性。
  2. 策略应小而专注:每个策略应该只负责一个单一、定义明确的功能。避免创建“大而全”的策略,这会降低模块化和可重用性。
  3. 提供默认策略:为常见的用例提供一套合理的默认策略,简化用户入门的复杂性。
  4. 策略组合与聚合:对于非常复杂的功能,可以考虑将多个简单策略组合成一个复合策略。例如,一个高级的页管理策略可能内部使用一个空闲空间管理策略和一个页内记录布局策略。
  5. 彻底的测试:对每个策略进行单元测试,并对不同策略组合的宿主类进行集成测试,确保所有组合都能正确工作。
  6. 利用现代 C++ 特性
    • if constexpr (C++17):在模板函数中进行编译时条件分支,可以根据策略的特性选择不同的实现路径。
    • std::concepts (C++20):这是 PBD 的“杀手级”特性,它允许我们显式地定义策略的接口要求,并在编译时进行检查,从而提供清晰的错误信息。

五、总结与展望

Policy-based Design 为构建高度定制化、高性能的数据库存储引擎提供了一条优雅而强大的途径。它通过在编译时将行为与实现解耦,使得开发者能够像组装乐高积木一样,根据具体需求灵活地配置存储引擎的各个核心组件,从数据布局到并发控制,从缓冲管理到索引结构。这种设计不仅能够显著提升系统的灵活性和可维护性,还能在性能上达到接近手写代码的效率,避免了运行时多态的额外开销。

虽然 PBD 引入了模板元编程的复杂性,但通过清晰的接口定义、模块化的设计以及利用现代 C++ 的特性,这些挑战是完全可以克服的。掌握 PBD,将使我们能够设计出更具适应性、更强大的软件系统,为应对未来多变的数据管理需求做好准备。我鼓励大家在自己的项目中积极探索和实践这一强大的设计范式。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注