什么是 ‘Write-Ahead Logging’ (WAL)?利用 C++ 实现具备奔溃恢复能力的高性能持久化存储

各位专家、工程师们,欢迎来到今天的讲座。我们将深入探讨一个在构建高可靠、高性能持久化存储系统时至关重要的技术——Write-Ahead Logging (WAL)。我将以一名编程专家的视角,不仅阐述WAL的理论基础,更将带领大家逐步解构并利用C++实现一个具备奔溃恢复能力的持久化存储系统,探讨其内部机制与性能优化策略。


1. 持久化存储的挑战与WAL的诞生

在现代信息系统中,数据是核心资产。我们构建的各种应用,从简单的配置存储到复杂的事务型数据库,无一例外都需要将数据安全地保存下来,即使系统遭遇电源故障、软件崩溃等意外情况,数据也必须能够恢复到一致状态。这就是持久化存储的核心需求。

然而,实现一个高性能、高可靠的持久化存储系统面临着诸多挑战:

  • 奔溃恢复 (Crash Recovery):这是最关键的需求。系统可能在任何时候崩溃,例如断电。崩溃后,系统必须能够将数据恢复到一个有效且一致的状态,所有已提交的事务必须持久化,所有未提交的事务必须被回滚。
  • 原子性 (Atomicity):事务的原子性要求一个事务中的所有操作要么全部成功,要么全部失败。没有中间状态。例如,银行转账操作,包括扣款和入账,必须作为一个不可分割的整体。
  • 持久性 (Durability):一旦事务被提交,其所做的更改必须永久保存,即使系统崩溃也依然存在。
  • 并发性 (Concurrency):多个事务可能同时读写数据。系统需要确保并发操作不会导致数据损坏或不一致。
  • 性能 (Performance):数据读写操作需要尽可能快,尤其是在高并发负载下。

传统的"in-place update"(就地更新)策略,即直接在数据文件中修改数据,难以同时满足原子性、持久性和高性能的要求。例如,如果一个事务修改了多个数据页,在所有修改都写入磁盘之前发生崩溃,那么磁盘上的数据将处于不一致状态,且无法判断哪些修改是已提交事务的一部分,哪些是未提交事务的一部分。

为了解决这些问题,数据库领域引入了Write-Ahead Logging (WAL)机制。WAL的核心思想是:任何数据修改都必须首先记录在日志中,并且这些日志记录必须在数据页被写入磁盘之前,先被持久化到稳定的存储介质上。

这个简单而强大的原则,为我们构建高可靠的持久化存储系统奠定了基石。


2. Write-Ahead Logging (WAL) 的核心原理

WAL机制的核心在于“先写日志,后写数据”。它将数据修改操作分解为两个阶段:

  1. 记录日志 (Logging):将数据修改的意图和内容(包括旧值和新值)以日志记录的形式写入一个专门的日志文件(通常是顺序写入)。
  2. 应用修改 (Applying Changes):将日志中记录的修改实际应用到内存中的数据页(缓冲区),这些数据页随后会异步或批量地写入磁盘。

WAL遵循两个关键规则:

  • WAL Rule (Redo Rule):一个事务的任何数据页修改,其对应的日志记录必须在数据页本身被写入磁盘之前,先被写入日志文件并持久化(fsync)。这意味着,如果系统崩溃,只要日志记录已持久化,我们就能通过重放日志来恢复数据。
  • Undo Rule (Force-Log-at-Commit):在事务提交时,所有与该事务相关的日志记录(包括事务开始、数据修改和事务提交记录)都必须被强制写入日志文件并持久化,才能向客户端确认事务已成功提交。这确保了已提交事务的持久性。

2.1 WAL如何保证ACID属性?

WAL是实现ACID(Atomicity, Consistency, Isolation, Durability)属性的关键机制之一。

  • 原子性 (Atomicity)
    • 如果一个事务在提交前崩溃,WAL日志中会留下该事务的修改记录,但没有提交记录。在恢复过程中,系统可以通过回溯日志(Undo)来撤销这些未提交的修改,将数据恢复到事务开始前的状态。
  • 持久性 (Durability)
    • 一旦事务提交,其提交记录以及所有相关修改日志都已写入并持久化到日志文件。即使系统在数据页写入磁盘之前崩溃,恢复时,系统可以重放(Redo)这些日志记录,将数据恢复到事务提交后的状态。
  • 一致性 (Consistency)
    • WAL本身不直接保证业务逻辑上的一致性,但它为原子性和持久性提供了基础,确保了在系统层面数据状态的逻辑转换是完整的。事务管理器结合WAL,通过隔离级别和约束条件来维护一致性。
  • 隔离性 (Isolation)
    • WAL通常与并发控制机制(如两阶段锁2PL、多版本并发控制MVCC)协同工作。日志记录可以包含锁信息或版本信息,以支持并发事务的正确隔离。WAL日志本身是串行写入的,这简化了并发控制在日志层面的实现。

2.2 WAL带来的性能优势

WAL不仅提升了可靠性,也带来了显著的性能优势:

  • 顺序写入 (Sequential Writes):日志文件通常是顺序追加写入的。顺序写入比随机写入快得多,因为它们更好地利用了磁盘的机械特性(对于HDD)或闪存的内部机制(对于SSD)。而数据文件上的修改可能是随机的。
  • 减少磁盘I/O (Reduced Disk I/O)
    • 数据页的修改首先发生在内存中的缓冲区。只有当缓冲区中的脏页需要被驱逐(evict)或系统执行检查点(checkpoint)时,才需要将它们写入磁盘。这减少了对数据文件的随机写入次数。
    • 多个逻辑修改可以合并为一次物理写入。
  • 组提交 (Group Commit):多个并发事务的提交请求可以被批量处理。系统可以将它们的提交日志记录一次性写入日志文件并进行一次fsync操作,而不是每个事务都独立执行一次fsync,从而显著降低fsync的开销。
  • 后台写入 (Background Writes):脏数据页的写入可以异步在后台进行,不会阻塞前端的事务处理。

3. C++ 实现具备奔溃恢复能力的持久化存储系统架构

我们将设计一个简化但功能完备的持久化存储系统,它将包含以下核心组件:

  1. LogRecord (日志记录):定义不同类型的日志记录结构。
  2. LogManager (日志管理器):负责日志的写入、读取和持久化。
  3. Page (数据页):存储实际数据的基本单元。
  4. DiskManager (磁盘管理器):负责数据文件和日志文件的底层物理读写。
  5. BufferPoolManager (缓冲区池管理器):管理内存中的数据页,作为磁盘与CPU之间的缓存。
  6. Transaction (事务):定义事务的生命周期和状态。
  7. TransactionManager (事务管理器):管理事务的开始、提交和中止。
  8. RecoveryManager (恢复管理器):负责系统启动时的奔溃恢复。
  9. StorageEngine (存储引擎):整合所有组件,提供高层接口。

我们将使用C++标准库,并利用其提供的文件I/O、多线程和同步原语。

3.1 核心数据结构设计

3.1.1 LogRecord:日志记录

日志记录是WAL机制的基石。每当数据发生变化或事务状态发生改变,都会生成一个对应的日志记录。

// common.h
#pragma once
#include <cstdint>
#include <vector>
#include <string>
#include <memory>
#include <atomic>
#include <mutex>
#include <fstream>
#include <unordered_map>
#include <list>
#include <iostream>
#include <chrono>

// 定义类型别名
using LSN = uint64_t;      // Log Sequence Number
using TxID = uint64_t;     // Transaction ID
using PageID = uint64_t;   // Page ID
using FrameID = uint32_t;  // Buffer Pool Frame ID
const size_t PAGE_SIZE = 4096; // 数据页大小

// LogRecordType: 日志记录类型
enum class LogRecordType : uint8_t {
    INVALID = 0,
    BEGIN,
    COMMIT,
    ABORT,
    UPDATE,
    CLR // Compensation Log Record for UNDO operations
};

// LogRecordHeader: 所有日志记录的通用头部
struct LogRecordHeader {
    LSN lsn;            // 当前日志记录的LSN
    LSN prev_lsn;       // 同一事务前一个日志记录的LSN,用于构建事务日志链
    TxID tx_id;         // 事务ID
    LogRecordType type; // 日志记录类型
    uint32_t payload_size; // 实际数据(payload)的大小,不包括头部

    // 默认构造函数
    LogRecordHeader() : lsn(0), prev_lsn(0), tx_id(0), type(LogRecordType::INVALID), payload_size(0) {}
};

// LogRecord:基类,所有具体日志记录的父类
class LogRecord {
public:
    LogRecordHeader header;

    LogRecord(LSN lsn = 0, LSN prev_lsn = 0, TxID tx_id = 0, LogRecordType type = LogRecordType::INVALID, uint32_t payload_size = 0)
        : header({lsn, prev_lsn, tx_id, type, payload_size}) {}

    virtual ~LogRecord() = default;

    // 序列化日志记录到字节流
    virtual std::vector<char> serialize() const {
        std::vector<char> data(sizeof(LogRecordHeader));
        memcpy(data.data(), &header, sizeof(LogRecordHeader));
        return data;
    }

    // 从字节流反序列化日志记录(基类只处理头部)
    virtual void deserialize(const char* data) {
        memcpy(&header, data, sizeof(LogRecordHeader));
    }

    virtual std::string toString() const {
        return "Type: " + std::to_string(static_cast<int>(header.type)) +
               ", TxID: " + std::to_string(header.tx_id) +
               ", LSN: " + std::to_string(header.lsn) +
               ", PrevLSN: " + std::to_string(header.prev_lsn);
    }
};

// UpdateLogRecord:数据更新日志记录
class UpdateLogRecord : public LogRecord {
public:
    PageID page_id;
    uint32_t offset;
    std::vector<char> old_value;
    std::vector<char> new_value;

    UpdateLogRecord(LSN lsn, LSN prev_lsn, TxID tx_id, PageID page_id, uint32_t offset,
                    const std::vector<char>& old_val, const std::vector<char>& new_val)
        : LogRecord(lsn, prev_lsn, tx_id, LogRecordType::UPDATE,
                    sizeof(PageID) + sizeof(uint32_t) + old_val.size() + new_val.size()),
          page_id(page_id), offset(offset), old_value(old_val), new_value(new_val) {}

    // 默认构造函数
    UpdateLogRecord() : LogRecord(0, 0, 0, LogRecordType::UPDATE, 0), page_id(0), offset(0) {}

    std::vector<char> serialize() const override {
        size_t total_size = sizeof(LogRecordHeader) + sizeof(PageID) + sizeof(uint32_t) + old_value.size() + new_value.size();
        std::vector<char> data(total_size);
        char* ptr = data.data();

        memcpy(ptr, &header, sizeof(LogRecordHeader));
        ptr += sizeof(LogRecordHeader);

        memcpy(ptr, &page_id, sizeof(PageID));
        ptr += sizeof(PageID);

        memcpy(ptr, &offset, sizeof(uint32_t));
        ptr += sizeof(uint32_t);

        memcpy(ptr, old_value.data(), old_value.size());
        ptr += old_value.size();

        memcpy(ptr, new_value.data(), new_value.size());

        return data;
    }

    void deserialize(const char* data) override {
        const char* ptr = data;
        memcpy(&header, ptr, sizeof(LogRecordHeader));
        ptr += sizeof(LogRecordHeader);

        memcpy(&page_id, ptr, sizeof(PageID));
        ptr += sizeof(PageID);

        memcpy(&offset, ptr, sizeof(uint32_t));
        ptr += sizeof(uint32_t);

        old_value.resize(header.payload_size - sizeof(PageID) - sizeof(uint32_t) - new_value.size()); // 预估 old_value 大小
        memcpy(old_value.data(), ptr, old_value.size());
        ptr += old_value.size();

        new_value.resize(header.payload_size - sizeof(PageID) - sizeof(uint32_t) - old_value.size()); // 实际 new_value 大小
        memcpy(new_value.data(), ptr, new_value.size());
    }

    std::string toString() const override {
        return LogRecord::toString() +
               ", PageID: " + std::to_string(page_id) +
               ", Offset: " + std::to_string(offset) +
               ", OldValueSize: " + std::to_string(old_value.size()) +
               ", NewValueSize: " + std::to_string(new_value.size());
    }
};

// 其他日志记录类型(Begin, Commit, Abort, CLR)相对简单,只需要头部信息或少量额外信息
// 例如:
class BeginLogRecord : public LogRecord {
public:
    BeginLogRecord(LSN lsn, LSN prev_lsn, TxID tx_id)
        : LogRecord(lsn, prev_lsn, tx_id, LogRecordType::BEGIN, 0) {} // 无额外payload
    BeginLogRecord() : LogRecord(0, 0, 0, LogRecordType::BEGIN, 0) {}
};

class CommitLogRecord : public LogRecord {
public:
    CommitLogRecord(LSN lsn, LSN prev_lsn, TxID tx_id)
        : LogRecord(lsn, prev_lsn, tx_id, LogRecordType::COMMIT, 0) {}
    CommitLogRecord() : LogRecord(0, 0, 0, LogRecordType::COMMIT, 0) {}
};

class AbortLogRecord : public LogRecord {
public:
    AbortLogRecord(LSN lsn, LSN prev_lsn, TxID tx_id)
        : LogRecord(lsn, prev_lsn, tx_id, LogRecordType::ABORT, 0) {}
    AbortLogRecord() : LogRecord(0, 0, 0, LogRecordType::ABORT, 0) {}
};

// Compensation Log Record (CLR): 用于UNDO操作,记录UNDO操作本身
class CLRLogRecord : public LogRecord {
public:
    PageID page_id;
    uint32_t offset;
    std::vector<char> old_value; // CLR的old_value是它所undo的update的new_value
    LSN undo_next_lsn; // 指向下一个需要undo的日志记录的LSN

    CLRLogRecord(LSN lsn, LSN prev_lsn, TxID tx_id, PageID page_id, uint32_t offset,
                 const std::vector<char>& old_val, LSN undo_next_lsn)
        : LogRecord(lsn, prev_lsn, tx_id, LogRecordType::CLR,
                    sizeof(PageID) + sizeof(uint32_t) + old_val.size() + sizeof(LSN)),
          page_id(page_id), offset(offset), old_value(old_val), undo_next_lsn(undo_next_lsn) {}

    CLRLogRecord() : LogRecord(0, 0, 0, LogRecordType::CLR, 0), page_id(0), offset(0), undo_next_lsn(0) {}

    std::vector<char> serialize() const override {
        size_t total_size = sizeof(LogRecordHeader) + sizeof(PageID) + sizeof(uint32_t) + old_value.size() + sizeof(LSN);
        std::vector<char> data(total_size);
        char* ptr = data.data();

        memcpy(ptr, &header, sizeof(LogRecordHeader));
        ptr += sizeof(LogRecordHeader);

        memcpy(ptr, &page_id, sizeof(PageID));
        ptr += sizeof(PageID);

        memcpy(ptr, &offset, sizeof(uint32_t));
        ptr += sizeof(uint32_t);

        memcpy(ptr, old_value.data(), old_value.size());
        ptr += old_value.size();

        memcpy(ptr, &undo_next_lsn, sizeof(LSN));

        return data;
    }

    void deserialize(const char* data) override {
        const char* ptr = data;
        memcpy(&header, ptr, sizeof(LogRecordHeader));
        ptr += sizeof(LogRecordHeader);

        memcpy(&page_id, ptr, sizeof(PageID));
        ptr += sizeof(PageID);

        memcpy(&offset, ptr, sizeof(uint32_t));
        ptr += sizeof(uint32_t);

        old_value.resize(header.payload_size - sizeof(PageID) - sizeof(uint32_t) - sizeof(LSN));
        memcpy(old_value.data(), ptr, old_value.size());
        ptr += old_value.size();

        memcpy(&undo_next_lsn, ptr, sizeof(LSN));
    }

    std::string toString() const override {
        return LogRecord::toString() +
               ", PageID: " + std::to_string(page_id) +
               ", Offset: " + std::to_string(offset) +
               ", OldValueSize: " + std::to_string(old_value.size()) +
               ", UndoNextLSN: " + std::to_string(undo_next_lsn);
    }
};

// 辅助函数:根据类型反序列化完整的LogRecord
std::unique_ptr<LogRecord> deserialize_log_record(const char* data) {
    LogRecordHeader temp_header;
    memcpy(&temp_header, data, sizeof(LogRecordHeader));

    std::unique_ptr<LogRecord> record;
    switch (temp_header.type) {
        case LogRecordType::BEGIN:
            record = std::make_unique<BeginLogRecord>();
            break;
        case LogRecordType::COMMIT:
            record = std::make_unique<CommitLogRecord>();
            break;
        case LogRecordType::ABORT:
            record = std::make_unique<AbortLogRecord>();
            break;
        case LogRecordType::UPDATE:
            record = std::make_unique<UpdateLogRecord>();
            break;
        case LogRecordType::CLR:
            record = std::make_unique<CLRLogRecord>();
            break;
        default:
            return nullptr; // 或者抛出异常
    }
    record->deserialize(data);
    return record;
}

3.1.2 Page:数据页

数据页是存储引擎中数据存储和管理的最小单位。

// page.h
class Page {
public:
    PageID page_id_;
    LSN page_lsn_; // LSN of the last log record that modified this page
    char data_[PAGE_SIZE];
    int pin_count_; // 引用计数,表示有多少线程正在使用此页
    bool is_dirty_; // 标记此页是否被修改过,需要写回磁盘
    std::mutex page_latch_; // 保护页内容的并发访问(短时间持有)

    Page() : page_id_(0), page_lsn_(0), pin_count_(0), is_dirty_(false) {
        std::fill(data_, data_ + PAGE_SIZE, 0); // 初始化数据为0
    }

    // 重置页状态,用于从BufferPool中回收
    void reset() {
        page_id_ = 0;
        page_lsn_ = 0;
        pin_count_ = 0;
        is_dirty_ = false;
        std::fill(data_, data_ + PAGE_SIZE, 0);
    }

    // 读写数据接口
    void read_data(uint32_t offset, uint32_t length, std::vector<char>& result) {
        if (offset + length > PAGE_SIZE) {
            throw std::runtime_error("Read out of page bounds");
        }
        result.resize(length);
        std::memcpy(result.data(), data_ + offset, length);
    }

    void write_data(uint32_t offset, const std::vector<char>& value) {
        if (offset + value.size() > PAGE_SIZE) {
            throw std::runtime_error("Write out of page bounds");
        }
        std::memcpy(data_ + offset, value.data(), value.size());
        is_dirty_ = true;
    }
};

3.1.3 Transaction:事务

// transaction.h
enum class TransactionStatus {
    RUNNING,
    COMMITTED,
    ABORTED
};

class Transaction {
public:
    TxID tx_id_;
    TransactionStatus status_;
    LSN prev_lsn_; // LSN of the last log record written by this transaction

    // 假设这里可以存储事务持有的锁,但在本次WAL主题中简化,不深入锁实现
    // std::vector<std::pair<PageID, LockMode>> held_locks_; 

    Transaction(TxID id) : tx_id_(id), status_(TransactionStatus::RUNNING), prev_lsn_(0) {}

    // 获取事务状态
    TransactionStatus get_status() const { return status_; }
    // 设置事务状态
    void set_status(TransactionStatus status) { status_ = status; }
    // 获取事务ID
    TxID get_tx_id() const { return tx_id_; }
    // 获取前一个LSN
    LSN get_prev_lsn() const { return prev_lsn_; }
    // 设置前一个LSN
    void set_prev_lsn(LSN lsn) { prev_lsn_ = lsn; }
};

3.2 核心组件实现

3.2.1 DiskManager:磁盘管理器

负责与物理磁盘进行交互,读写数据文件和日志文件。这里使用std::fstream进行文件操作,实际高性能系统中可能采用mmap或异步I/O。

// disk_manager.h
class DiskManager {
private:
    std::fstream data_file_;
    std::fstream log_file_;
    std::mutex data_file_mutex_; // 保护数据文件访问
    std::mutex log_file_mutex_;  // 保护日志文件访问
    PageID next_page_id_;        // 用于分配新的PageID

public:
    DiskManager(const std::string& data_filepath, const std::string& log_filepath) {
        // 打开数据文件,如果不存在则创建
        data_file_.open(data_filepath, std::ios::binary | std::ios::in | std::ios::out | std::ios::app);
        if (!data_file_.is_open()) {
            data_file_.clear(); // 清除错误标志
            data_file_.open(data_filepath, std::ios::binary | std::ios::in | std::ios::out | std::ios::trunc); // 创建新文件
            if (!data_file_.is_open()) {
                throw std::runtime_error("Failed to open or create data file: " + data_filepath);
            }
        }
        // 计算下一个可用的PageID
        data_file_.seekg(0, std::ios::end);
        size_t file_size = data_file_.tellg();
        next_page_id_ = file_size / PAGE_SIZE;

        // 打开日志文件,如果不存在则创建
        log_file_.open(log_filepath, std::ios::binary | std::ios::in | std::ios::out | std::ios::app);
        if (!log_file_.is_open()) {
            log_file_.clear();
            log_file_.open(log_filepath, std::ios::binary | std::ios::in | std::ios::out | std::ios::trunc);
            if (!log_file_.is_open()) {
                throw std::runtime_error("Failed to open or create log file: " + log_filepath);
            }
        }
        // 确保文件指针在末尾,以便追加写入
        log_file_.seekp(0, std::ios::end);
    }

    ~DiskManager() {
        if (data_file_.is_open()) {
            data_file_.close();
        }
        if (log_file_.is_open()) {
            log_file_.close();
        }
    }

    // 读取数据页
    void read_page(PageID page_id, char* data) {
        std::lock_guard<std::mutex> lock(data_file_mutex_);
        std::streampos offset = static_cast<std::streampos>(page_id) * PAGE_SIZE;
        data_file_.seekg(offset);
        if (!data_file_.read(data, PAGE_SIZE)) {
            // 如果读取失败,可能文件末尾或页不存在,填充0
            std::fill(data, data + PAGE_SIZE, 0);
            data_file_.clear(); // 清除eof或fail标志
        }
    }

    // 写入数据页
    void write_page(PageID page_id, const char* data) {
        std::lock_guard<std::mutex> lock(data_file_mutex_);
        std::streampos offset = static_cast<std::streampos>(page_id) * PAGE_SIZE;
        data_file_.seekp(offset);
        if (!data_file_.write(data, PAGE_SIZE)) {
            throw std::runtime_error("Failed to write data page " + std::to_string(page_id));
        }
        data_file_.flush(); // 确保写入磁盘
    }

    // 分配一个新的数据页ID
    PageID allocate_page() {
        std::lock_guard<std::mutex> lock(data_file_mutex_);
        PageID allocated_page_id = next_page_id_++;
        // 预写入一个空页,确保文件大小增长
        char zero_page[PAGE_SIZE] = {0};
        write_page(allocated_page_id, zero_page);
        return allocated_page_id;
    }

    // 追加日志记录
    void append_log_record_data(const std::vector<char>& log_data) {
        std::lock_guard<std::mutex> lock(log_file_mutex_);
        log_file_.write(log_data.data(), log_data.size());
        if (!log_file_) {
            throw std::runtime_error("Failed to append log record to log file.");
        }
    }

    // 强制日志数据写入磁盘 (fsync)
    void flush_log() {
        std::lock_guard<std::mutex> lock(log_file_mutex_);
        log_file_.flush();
        // 实际的fsync操作需要平台特定的调用,例如`_commit` (Windows) 或 `fsync` (Unix)
        // std::fsync(log_file_.fd()); // 伪代码,需要实际实现
    }

    // 读取日志文件内容
    std::vector<char> read_log_file() {
        std::lock_guard<std::mutex> lock(log_file_mutex_);
        log_file_.seekg(0, std::ios::end);
        size_t size = log_file_.tellg();
        log_file_.seekg(0, std::ios::beg);

        std::vector<char> buffer(size);
        if (size > 0) {
            if (!log_file_.read(buffer.data(), size)) {
                throw std::runtime_error("Failed to read log file.");
            }
        }
        log_file_.clear(); // 清除EOF标志,以便后续写入
        log_file_.seekp(0, std::ios::end); // 恢复写指针到文件末尾
        return buffer;
    }

    // 获取日志文件当前大小
    size_t get_log_file_size() {
        std::lock_guard<std::mutex> lock(log_file_mutex_);
        log_file_.seekp(0, std::ios::end);
        return log_file_.tellp();
    }
};

关于fsync的说明std::fstream::flush()只保证数据从C++缓冲区写入操作系统缓冲区。要真正保证数据写入磁盘,需要调用操作系统的fsync(Unix-like)或_commit(Windows)函数。这通常需要获取底层文件句柄。在实际系统中,这是一个关键点。为了简化示例,我们暂时依赖fstream::flush,但在高性能和高可靠性场景下,必须使用真正的fsync

3.2.2 LogManager:日志管理器

负责生成、追加和持久化日志记录,并维护当前的LSN。

// log_manager.h
class LogManager {
private:
    DiskManager& disk_manager_;
    std::atomic<LSN> next_lsn_;     // 下一个可用的LSN
    std::atomic<LSN> flushed_lsn_;  // 已经持久化到磁盘的LSN
    std::mutex log_buffer_mutex_;   // 保护日志缓冲区
    std::vector<char> log_buffer_;  // 日志缓冲区,用于批量写入
    size_t buffer_offset_;          // 缓冲区当前写入位置
    const size_t buffer_capacity_ = 8 * PAGE_SIZE; // 日志缓冲区大小

    // 条件变量用于等待日志刷新
    std::condition_variable cv_flush_;

public:
    LogManager(DiskManager& dm) : disk_manager_(dm), next_lsn_(1), flushed_lsn_(0), buffer_offset_(0) {
        log_buffer_.resize(buffer_capacity_);
    }

    // 追加日志记录
    LSN append_log_record(const std::unique_ptr<LogRecord>& record) {
        std::lock_guard<std::mutex> lock(log_buffer_mutex_);
        LSN current_lsn = next_lsn_++;
        record->header.lsn = current_lsn;

        // 获取该事务的最新LSN,并更新prev_lsn (这里需要TransactionManager的协作,简化为0)
        // 实际中,TransactionManager会在事务对象中维护last_lsn
        // record->header.prev_lsn = transaction->get_prev_lsn();
        // transaction->set_prev_lsn(current_lsn);

        std::vector<char> record_data = record->serialize();
        size_t record_size = record_data.size();

        if (buffer_offset_ + record_size > buffer_capacity_) {
            // 缓冲区不足,先刷新缓冲区
            flush_buffer_and_wait();
        }

        // 写入到缓冲区
        memcpy(log_buffer_.data() + buffer_offset_, record_data.data(), record_size);
        buffer_offset_ += record_size;

        return current_lsn;
    }

    // 强制刷新日志到磁盘,直到指定的LSN
    void flush_log(LSN target_lsn) {
        std::unique_lock<std::mutex> lock(log_buffer_mutex_);
        // 如果目标LSN已经持久化,则直接返回
        if (flushed_lsn_ >= target_lsn) {
            return;
        }

        // 写入缓冲区内容到磁盘
        if (buffer_offset_ > 0) {
            std::vector<char> data_to_write(log_buffer_.begin(), log_buffer_.begin() + buffer_offset_);
            disk_manager_.append_log_record_data(data_to_write);
            disk_manager_.flush_log(); // 真正的fsync

            flushed_lsn_.store(next_lsn_ - 1); // 更新已刷新LSN为当前已写入的最新LSN
            buffer_offset_ = 0; // 清空缓冲区
        } else {
            // 缓冲区为空,但可能目标LSN比flushed_lsn大,说明有其他线程写入了但还没刷新
            // 此时应该等待其他线程刷新或检查是否需要空刷新
            // For simplicity, we assume an empty buffer means nothing needs to be flushed.
            // In a more complex system, this might involve a dedicated flusher thread.
        }

        cv_flush_.notify_all(); // 通知所有等待的线程
    }

    // 刷新缓冲区并等待其写入磁盘
    void flush_buffer_and_wait() {
        std::unique_lock<std::mutex> lock(log_buffer_mutex_);
        if (buffer_offset_ == 0) return; // 缓冲区已空

        std::vector<char> data_to_write(log_buffer_.begin(), log_buffer_.begin() + buffer_offset_);
        disk_manager_.append_log_record_data(data_to_write);
        disk_manager_.flush_log(); // 真正的fsync

        flushed_lsn_.store(next_lsn_ - 1); // 更新已刷新LSN
        buffer_offset_ = 0; // 清空缓冲区
        cv_flush_.notify_all(); // 通知所有等待的线程
    }

    // 获取日志文件的所有记录,用于恢复
    std::vector<std::unique_ptr<LogRecord>> read_all_log_records() {
        std::vector<std::unique_ptr<LogRecord>> records;
        std::vector<char> raw_log_data = disk_manager_.read_log_file();

        size_t current_offset = 0;
        while (current_offset < raw_log_data.size()) {
            if (raw_log_data.size() - current_offset < sizeof(LogRecordHeader)) {
                // 不完整的头部,可能是崩溃导致的
                break;
            }
            LogRecordHeader temp_header;
            memcpy(&temp_header, raw_log_data.data() + current_offset, sizeof(LogRecordHeader));

            if (temp_header.lsn == 0 || temp_header.type == LogRecordType::INVALID) {
                // 无效日志记录,可能是文件末尾的垃圾数据
                break;
            }

            size_t record_total_size = sizeof(LogRecordHeader) + temp_header.payload_size;
            if (current_offset + record_total_size > raw_log_data.size()) {
                // 不完整的日志记录
                break;
            }

            std::unique_ptr<LogRecord> record = deserialize_log_record(raw_log_data.data() + current_offset);
            if (record) {
                records.push_back(std::move(record));
            } else {
                // 无法反序列化,跳过或报错
                std::cerr << "Warning: Could not deserialize log record at offset " << current_offset << std::endl;
            }
            current_offset += record_total_size;
        }
        return records;
    }

    // 获取下一个LSN
    LSN get_next_lsn() const { return next_lsn_.load(); }
    // 获取已刷新LSN
    LSN get_flushed_lsn() const { return flushed_lsn_.load(); }
};

3.2.3 BufferPoolManager:缓冲区池管理器

管理内存中的数据页,实现了LRU(最近最少使用)淘汰策略。

// buffer_pool_manager.h
class BufferPoolManager {
private:
    DiskManager& disk_manager_;
    std::vector<Page> pages_; // 内存中的页面帧
    size_t pool_size_;        // 缓冲区池大小(页帧数量)

    // PageID -> FrameID 映射表,快速查找内存中的页
    std::unordered_map<PageID, FrameID> page_table_;
    // LRU 链表,用于页面淘汰策略
    std::list<FrameID> lru_list_;
    // LRU 链表迭代器映射,快速删除和移动
    std::unordered_map<FrameID, std::list<FrameID>::iterator> lru_map_;
    // 空闲页帧列表
    std::list<FrameID> free_list_;

    std::mutex buffer_mutex_; // 保护缓冲区池的元数据结构

public:
    BufferPoolManager(size_t pool_size, DiskManager& dm)
        : disk_manager_(dm), pool_size_(pool_size) {
        pages_.resize(pool_size_);
        for (FrameID i = 0; i < pool_size_; ++i) {
            free_list_.push_back(i); // 初始化所有帧为空闲
        }
    }

    // 获取一个页面,如果不在内存则从磁盘读取
    Page* fetch_page(PageID page_id) {
        std::lock_guard<std::mutex> lock(buffer_mutex_);

        // 1. 检查页面是否已在缓冲区中
        if (page_table_.count(page_id)) {
            FrameID frame_id = page_table_[page_id];
            // 移动到LRU链表头部(最近使用)
            lru_list_.erase(lru_map_[frame_id]);
            lru_list_.push_front(frame_id);
            lru_map_[frame_id] = lru_list_.begin();
            pages_[frame_id].pin_count_++;
            return &pages_[frame_id];
        }

        // 2. 页面不在缓冲区,需要从磁盘读取
        FrameID frame_id = -1;
        if (!free_list_.empty()) {
            // 2.1 从空闲列表获取一个帧
            frame_id = free_list_.front();
            free_list_.pop_front();
        } else {
            // 2.2 否则,执行LRU淘汰
            for (auto it = lru_list_.rbegin(); it != lru_list_.rend(); ++it) {
                if (pages_[*it].pin_count_ == 0) { // 找到一个未被固定的页
                    frame_id = *it;
                    // 如果被淘汰的页是脏页,写回磁盘
                    if (pages_[frame_id].is_dirty_) {
                        disk_manager_.write_page(pages_[frame_id].page_id_, pages_[frame_id].data_);
                    }
                    // 从page_table和LRU中移除
                    page_table_.erase(pages_[frame_id].page_id_);
                    lru_list_.erase(lru_map_[frame_id]);
                    lru_map_.erase(frame_id);
                    break;
                }
            }
            if (frame_id == -1) {
                throw std::runtime_error("No unpinned page available for eviction.");
            }
        }

        // 3. 将新页载入帧
        Page* page = &pages_[frame_id];
        page->reset(); // 重置页状态
        page->page_id_ = page_id;
        disk_manager_.read_page(page_id, page->data_); // 从磁盘读取数据
        page->pin_count_ = 1; // 页面被固定
        page_table_[page_id] = frame_id;
        lru_list_.push_front(frame_id);
        lru_map_[frame_id] = lru_list_.begin();

        return page;
    }

    // 解除页面固定
    void unpin_page(PageID page_id, bool is_dirty) {
        std::lock_guard<std::mutex> lock(buffer_mutex_);
        if (!page_table_.count(page_id)) {
            // 页面不在缓冲区
            return;
        }
        FrameID frame_id = page_table_[page_id];
        Page* page = &pages_[frame_id];
        page->is_dirty_ = page->is_dirty_ || is_dirty; // 更新脏页状态
        if (page->pin_count_ > 0) {
            page->pin_count_--;
        }
        // 如果pin_count归零,且它是LRU链表中未被固定的页,它现在有资格被淘汰
        // 不需要立即将其从LRU链表移除,LRU淘汰逻辑会处理
    }

    // 强制将指定页面刷新到磁盘
    void flush_page(PageID page_id) {
        std::lock_guard<std::mutex> lock(buffer_mutex_);
        if (!page_table_.count(page_id)) {
            // 页面不在缓冲区,可能已经被淘汰或从未加载
            return;
        }
        FrameID frame_id = page_table_[page_id];
        Page* page = &pages_[frame_id];
        if (page->is_dirty_) {
            disk_manager_.write_page(page->page_id_, page->data_);
            page->is_dirty_ = false;
        }
    }

    // 强制将所有脏页刷新到磁盘
    void flush_all_pages() {
        std::lock_guard<std::mutex> lock(buffer_mutex_);
        for (FrameID i = 0; i < pool_size_; ++i) {
            Page* page = &pages_[i];
            if (page->page_id_ != 0 && page->is_dirty_) { // 检查是否是有效页且是脏页
                disk_manager_.write_page(page->page_id_, page->data_);
                page->is_dirty_ = false;
            }
        }
    }

    // 创建一个新页面
    Page* new_page(PageID& new_page_id) {
        std::lock_guard<std::mutex> lock(buffer_mutex_);
        new_page_id = disk_manager_.allocate_page(); // 从磁盘管理器获取新页ID

        // 逻辑与fetch_page类似,只是不需要从磁盘读取
        FrameID frame_id = -1;
        if (!free_list_.empty()) {
            frame_id = free_list_.front();
            free_list_.pop_front();
        } else {
            for (auto it = lru_list_.rbegin(); it != lru_list_.rend(); ++it) {
                if (pages_[*it].pin_count_ == 0) {
                    frame_id = *it;
                    if (pages_[frame_id].is_dirty_) {
                        disk_manager_.write_page(pages_[frame_id].page_id_, pages_[frame_id].data_);
                    }
                    page_table_.erase(pages_[frame_id].page_id_);
                    lru_list_.erase(lru_map_[frame_id]);
                    lru_map_.erase(frame_id);
                    break;
                }
            }
            if (frame_id == -1) {
                throw std::runtime_error("No unpinned page available for new page allocation.");
            }
        }

        Page* page = &pages_[frame_id];
        page->reset();
        page->page_id_ = new_page_id;
        page->pin_count_ = 1;
        page->is_dirty_ = true; // 新页默认是脏的,需要写回
        page_table_[new_page_id] = frame_id;
        lru_list_.push_front(frame_id);
        lru_map_[frame_id] = lru_list_.begin();

        return page;
    }
};

3.2.4 TransactionManager:事务管理器

管理事务的生命周期,包括事务的开始、提交和中止。它与LogManager协同工作,确保事务的ACID属性。

// transaction_manager.h
class TransactionManager {
private:
    std::atomic<TxID> next_tx_id_;
    std::unordered_map<TxID, Transaction> active_transactions_; // 活跃事务表
    std::mutex tx_table_mutex_; // 保护active_transactions_ map

    LogManager& log_manager_;
    BufferPoolManager& buffer_pool_manager_;

public:
    TransactionManager(LogManager& lm, BufferPoolManager& bpm)
        : next_tx_id_(1), log_manager_(lm), buffer_pool_manager_(bpm) {}

    // 开始一个新事务
    Transaction* begin_transaction() {
        TxID tx_id = next_tx_id_++;
        std::lock_guard<std::mutex> lock(tx_table_mutex_);
        active_transactions_.emplace(tx_id, Transaction(tx_id));
        Transaction* txn = &active_transactions_.at(tx_id);

        // 写入BEGIN日志记录
        std::unique_ptr<LogRecord> begin_log = std::make_unique<BeginLogRecord>(0, txn->get_prev_lsn(), tx_id);
        LSN current_lsn = log_manager_.append_log_record(begin_log);
        txn->set_prev_lsn(current_lsn); // 更新事务的last_lsn

        return txn;
    }

    // 提交事务
    void commit_transaction(Transaction* txn) {
        // 写入COMMIT日志记录
        std::unique_ptr<LogRecord> commit_log = std::make_unique<CommitLogRecord>(0, txn->get_prev_lsn(), txn->get_tx_id());
        LSN commit_lsn = log_manager_.append_log_record(commit_log);
        txn->set_prev_lsn(commit_lsn);

        // 强制刷新日志到磁盘 (Force-Log-at-Commit)
        log_manager_.flush_log(commit_lsn);

        // 更新事务状态
        std::lock_guard<std::mutex> lock(tx_table_mutex_);
        txn->set_status(TransactionStatus::COMMITTED);
        // 通常在这里解除所有锁,并从活跃事务表中移除,但为了简化,我们仅设置状态
        active_transactions_.erase(txn->get_tx_id());
    }

    // 中止事务
    void abort_transaction(Transaction* txn) {
        // 写入ABORT日志记录
        std::unique_ptr<LogRecord> abort_log = std::make_unique<AbortLogRecord>(0, txn->get_prev_lsn(), txn->get_tx_id());
        LSN abort_lsn = log_manager_.append_log_record(abort_log);
        txn->set_prev_lsn(abort_lsn);

        // 执行UNDO操作 (这里只是一个占位符,实际UNDO逻辑在RecoveryManager中)
        // 在线事务中止需要遍历该事务的所有日志记录并生成CLR
        // For simplicity, we just mark as aborted and remove from active transactions.

        // 更新事务状态
        std::lock_guard<std::mutex> lock(tx_table_mutex_);
        txn->set_status(TransactionStatus::ABORTED);
        active_transactions_.erase(txn->get_tx_id());
    }

    // 获取事务对象
    Transaction* get_transaction(TxID tx_id) {
        std::lock_guard<std::mutex> lock(tx_table_mutex_);
        if (active_transactions_.count(tx_id)) {
            return &active_transactions_.at(tx_id);
        }
        return nullptr;
    }

    // 获取所有活跃事务的快照 (用于恢复)
    std::unordered_map<TxID, Transaction> get_active_transactions_snapshot() {
        std::lock_guard<std::mutex> lock(tx_table_mutex_);
        return active_transactions_;
    }
};

3.2.5 RecoveryManager:恢复管理器

这是WAL机制中实现奔溃恢复的核心组件。它遵循ARIES(Algorithm for Recovery and Isolation Exploiting Semantics)恢复算法的三阶段:分析 (Analysis)重做 (Redo)回滚 (Undo)

// recovery_manager.h
struct ActiveTransactionEntry {
    LSN last_lsn;          // 事务写入的最后一个日志记录的LSN
    TransactionStatus status; // 事务状态 (RUNNING, COMMITTED, ABORTED)
    // 其他如锁信息等...
};

struct DirtyPageEntry {
    LSN rec_lsn; // LSN of the first log record that dirtied this page
};

class RecoveryManager {
private:
    LogManager& log_manager_;
    BufferPoolManager& buffer_pool_manager_;
    DiskManager& disk_manager_;
    TransactionManager& transaction_manager_;

    // Recovery Data Structures
    std::unordered_map<TxID, ActiveTransactionEntry> active_tx_table_; // 活跃事务表
    std::unordered_map<PageID, DirtyPageEntry> dirty_page_table_;     // 脏页表
    LSN last_checkpoint_lsn_; // 最后一个检查点的LSN (简化,本次不实现检查点,从头扫描)

public:
    RecoveryManager(LogManager& lm, BufferPoolManager& bpm, DiskManager& dm, TransactionManager& tm)
        : log_manager_(lm), buffer_pool_manager_(bpm), disk_manager_(dm), transaction_manager_(tm),
          last_checkpoint_lsn_(0) {}

    // 执行奔溃恢复
    void recover() {
        std::cout << "Starting recovery process..." << std::endl;
        std::vector<std::unique_ptr<LogRecord>> all_logs = log_manager_.read_all_log_records();
        if (all_logs.empty()) {
            std::cout << "No log records found. System clean." << std::endl;
            return;
        }

        // 1. Analysis Phase (分析阶段)
        // 扫描所有日志记录(或从最近的检查点开始),构建活跃事务表和脏页表。
        std::cout << "Recovery: Analysis Phase..." << std::endl;
        for (const auto& log : all_logs) {
            LSN current_lsn = log->header.lsn;
            TxID tx_id = log->header.tx_id;

            // 更新活跃事务表
            if (active_tx_table_.find(tx_id) == active_tx_table_.end()) {
                active_tx_table_[tx_id] = {current_lsn, TransactionStatus::RUNNING};
            } else {
                active_tx_table_[tx_id].last_lsn = current_lsn;
            }

            switch (log->header.type) {
                case LogRecordType::BEGIN:
                    active_tx_table_[tx_id].status = TransactionStatus::RUNNING;
                    break;
                case LogRecordType::COMMIT:
                    active_tx_table_[tx_id].status = TransactionStatus::COMMITTED;
                    break;
                case LogRecordType::ABORT:
                    active_tx_table_[tx_id].status = TransactionStatus::ABORTED;
                    break;
                case LogRecordType::UPDATE: {
                    UpdateLogRecord* update_log = static_cast<UpdateLogRecord*>(log.get());
                    // 将被修改的页面加入脏页表,记录其RecLSN
                    if (dirty_page_table_.find(update_log->page_id) == dirty_page_table_.end()) {
                        dirty_page_table_[update_log->page_id] = {current_lsn};
                    }
                    break;
                }
                case LogRecordType::CLR: {
                    CLRLogRecord* clr_log = static_cast<CLRLogRecord*>(log.get());
                     if (dirty_page_table_.find(clr_log->page_id) == dirty_page_table_.end()) {
                        dirty_page_table_[clr_log->page_id] = {current_lsn};
                    }
                    break;
                }
                default:
                    break;
            }
        }

        // 从活跃事务表中移除已提交或已中止的事务
        auto it = active_tx_table_.begin();
        while (it != active_tx_table_.end()) {
            if (it->second.status == TransactionStatus::COMMITTED || it->second.status == TransactionStatus::ABORTED) {
                it = active_tx_table_.erase(it);
            } else {
                ++it;
            }
        }
        std::cout << "Analysis Phase completed. Active transactions: " << active_tx_table_.size() << std::endl;
        std::cout << "Dirty pages: " << dirty_page_table_.size() << std::endl;

        // 2. Redo Phase (重做阶段)
        // 从最小的RecLSN(或检查点)开始,正向扫描日志,重做所有已记录的操作。
        // 目标是让所有数据页恢复到崩溃前的最新状态。
        std::cout << "Recovery: Redo Phase..." << std::endl;
        LSN redo_start_lsn = 0; // 简化,从头开始
        // 实际应为 min(所有dirty_page_table_[page_id].rec_lsn, last_checkpoint_lsn_)

        for (const auto& log : all_logs) {
            if (log->header.lsn < redo_start_lsn) continue; // 跳过早于重做起始点的日志

            // 只有当页面不在DPT中,或者页面的RecLSN小于等于当前日志记录的LSN时,才重做
            // 并且,只有当页面上记录的LSN小于日志记录的LSN时才重做
            if (log->header.type == LogRecordType::UPDATE) {
                UpdateLogRecord* update_log = static_cast<UpdateLogRecord*>(log.get());
                Page* page = buffer_pool_manager_.fetch_page(update_log->page_id);
                // 确保页面被固定,并且加锁
                std::lock_guard<std::mutex> page_lock(page->page_latch_);

                if (page->page_lsn_ < update_log->header.lsn) {
                    // 只有当页面上的LSN小于日志记录的LSN时,才重做该操作
                    // 这避免了重复应用已经存在的修改
                    page->write_data(update_log->offset, update_log->new_value);
                    page->page_lsn_ = update_log->header.lsn; // 更新页面LSN
                }
                buffer_pool_manager_.unpin_page(update_log->page_id, true); // 标记为脏页
            } else if (log->header.type == LogRecordType::CLR) {
                 CLRLogRecord* clr_log = static_cast<CLRLogRecord*>(log.get());
                Page* page = buffer_pool_manager_.fetch_page(clr_log->page_id);
                std::lock_guard<std::mutex> page_lock(page->page_latch_);

                if (page->page_lsn_ < clr_log->header.lsn) {
                    // CLR的"old_value"实际上是它所撤销的UPDATE的"new_value"
                    // 这里我们重做CLR,即把数据恢复到CLR的old_value
                    page->write_data(clr_log->offset, clr_log->old_value);
                    page->page_lsn_ = clr_log->header.lsn;
                }
                buffer_pool_manager_.unpin_page(clr_log->page_id, true);
            }
        }
        std::cout << "Redo Phase completed." << std::endl;

        // 3. Undo Phase (回滚阶段)
        // 逆序扫描活跃事务的日志链,回滚所有未提交事务的操作。
        // 对于每个回滚操作,生成一个CLR(Compensation Log Record)。
        std::cout << "Recovery: Undo Phase..." << std::endl;
        std::list<LSN> undo_list; // 存储需要undo的LSN,按降序排列
        for (const auto& entry : active_tx_table_) {
            undo_list.push_back(entry.second.last_lsn);
        }
        undo_list.sort(std::greater<LSN>()); // 降序排列

        while (!undo_list.empty()) {
            LSN current_undo_lsn = undo_list.front();
            undo_list.pop_front();

            // 查找对应的日志记录
            std::unique_ptr<LogRecord> log_to_undo;
            for (const auto& log : all_logs) {
                if (log->header.lsn == current_undo_lsn) {
                    log_to_undo = deserialize_log_record(log->serialize().data()); // 拷贝一份
                    break;
                }
            }
            if (!log_to_undo) {
                // 找不到日志记录,可能日志文件损坏或逻辑错误
                std::cerr << "Error: Log record " << current_undo_lsn << " not found for undo." << std::endl;
                continue;
            }

            // 对于UPDATE记录,执行UNDO操作
            if (log_to_undo->header.type == LogRecordType::UPDATE) {
                UpdateLogRecord* update_log = static_cast<UpdateLogRecord*>(log_to_undo.get());
                Page* page = buffer_pool_manager_.fetch_page(update_log->page_id);
                std::lock_guard<std::mutex> page_lock(page->page_latch_); // 加锁

                // 只有当页面上的LSN等于日志记录的LSN时,才执行UNDO
                // 避免UNDO已经被覆盖的修改
                if (page->page_lsn_ == update_log->header.lsn) {
                    page->write_data(update_log->offset, update_log->old_value); // 恢复旧值
                    page->page_lsn_ = log_manager_.get_next_lsn(); // 更新页面LSN为CLR的LSN
                }
                buffer_pool_manager_.unpin_page(update_log->page_id, true); // 标记为脏页

                // 生成CLR (Compensation Log Record)
                // CLR的prev_lsn指向被undo的日志记录的prev_lsn
                // CLR的undo_next_lsn指向被undo的日志记录的prev_lsn
                std::unique_ptr<CLRLogRecord> clr_log = std::make_unique<CLRLogRecord>(
                    0, update_log->header.prev_lsn, update_log->header.tx_id,
                    update_log->page_id, update_log->offset, update_log->old_value,
                    update_log->header.prev_lsn // 指向下一个需要undo的LSN
                );
                LSN clr_lsn = log_manager_.append_log_record(clr_log);
                log_manager_.flush_log(clr_lsn); // CLR也需要持久化

                // 将CLR的undo_next_lsn加入到undo_list中,以便继续回滚该事务的下一个操作
                if (update_log->header.prev_lsn != 0) {
                    undo_list.push_back(update_log->header.prev_lsn);
                    undo_list.sort(std::greater<LSN>()); // 重新排序
                }
            } else if (log_to_undo->header.type == LogRecordType::CLR) {
                // 如果遇到CLR,则按照CLR的undo_next_lsn继续回滚
                CLRLogRecord* clr_log = static_cast<CLRLogRecord*>(log_to_undo.get());
                if (clr_log->undo_next_lsn != 0) {
                    undo_list.push_back(clr_log->undo_next_lsn);
                    undo_list.sort(std::greater<LSN>());
                }
            } else if (log_to_undo->header.type == LogRecordType::BEGIN) {
                // 遇到BEGIN日志记录,说明该事务的所有操作都已回滚
                // 写入ABORT日志记录
                std::unique_ptr<LogRecord> abort_log = std::make_unique<AbortLogRecord>(
                    0, log_to_undo->header.lsn, log_to_undo->header.tx_id);
                LSN abort_lsn = log_manager_.append_log_record(abort_log);
                log_manager_.flush_log(abort_lsn);
                // 从活跃事务表中移除
                active_tx_table_.erase(log_to_undo->header.tx_id);
            }
        }
        std::cout << "Undo Phase completed." << std::endl;
        std::cout << "Recovery process finished. System ready." << std::endl;
    }

    // 创建检查点 (简化实现,仅刷新所有脏页和日志)
    void create_checkpoint() {
        std::cout << "Creating checkpoint..." << std::endl;
        // 1. 停止所有新事务的开始 (简化,实际更复杂)
        // 2. 刷新所有脏页到磁盘
        buffer_pool_manager_.flush_all_pages();
        // 3. 强制刷新所有日志到磁盘
        log_manager_.flush_log(log_manager_.get_next_lsn() - 1);
        // 4. 写入一个Checkpoint Begin Record
        // 5. 写入一个Checkpoint End Record,包含DPT和ATT信息
        // 更新 last_checkpoint_lsn_
        last_checkpoint_lsn_ = log_manager_.get_flushed_lsn();
        std::cout << "Checkpoint created. Last checkpoint LSN: " << last_checkpoint_lsn_ << std::endl;
    }
};

3.2.6 StorageEngine:存储引擎

作为对外接口,协调各个组件完成数据操作。

// storage_engine.h
class StorageEngine {
private:
    DiskManager disk_manager_;
    BufferPoolManager buffer_pool_manager_;
    LogManager log_manager_;
    TransactionManager transaction_manager_;
    RecoveryManager recovery_manager_;

public:
    StorageEngine(const std::string& data_filepath, const std::string& log_filepath, size_t buffer_pool_size)
        : disk_manager_(data_filepath, log_filepath),
          buffer_pool_manager_(buffer_pool_size, disk_manager_),
          log_manager_(disk_manager_),
          transaction_manager_(log_manager_, buffer_pool_manager_),
          recovery_manager_(log_manager_, buffer_pool_manager_, disk_manager_, transaction_manager_) {}

    void start() {
        // 在系统启动时执行恢复
        recovery_manager_.recover();
    }

    void shutdown() {
        // 在关闭时创建检查点并刷新所有脏页和日志
        recovery_manager_.create_checkpoint();
        buffer_pool_manager_.flush_all_pages();
        // 确保所有日志都已持久化
        log_manager_.flush_log(log_manager_.get_next_lsn() - 1);
        std::cout << "Storage Engine shutdown complete." << std::endl;
    }

    // 示例:高层PUT操作
    void put(Transaction* txn, PageID page_id, uint32_t offset, const std::vector<char>& value) {
        Page* page = buffer_pool_manager_.fetch_page(page_id);
        std::lock_guard<std::mutex> page_lock(page->page_latch_); // 锁住页面

        // 获取旧值
        std::vector<char> old_value;
        page->read_data(offset, value.size(), old_value);

        // 写入UPDATE日志记录
        std::unique_ptr<LogRecord> update_log = std::make_unique<UpdateLogRecord>(
            0, txn->get_prev_lsn(), txn->get_tx_id(), page_id, offset, old_value, value);
        LSN current_lsn = log_manager_.append_log_record(update_log);
        txn->set_prev_lsn(current_lsn); // 更新事务的last_lsn

        // 应用修改到内存页
        page->write_data(offset, value);
        page->page_lsn_ = current_lsn; // 更新页面LSN

        // 解除页面固定,标记为脏页
        buffer_pool_manager_.unpin_page(page_id, true);
    }

    // 示例:高层GET操作
    std::vector<char> get(Transaction* txn, PageID page_id, uint32_t offset, uint32_t length) {
        Page* page = buffer_pool_manager_.fetch_page(page_id);
        std::lock_guard<std::mutex> page_lock(page->page_latch_); // 锁住页面

        std::vector<char> result;
        page->read_data(offset, length, result);

        // 解除页面固定
        buffer_pool_manager_.unpin_page(page_id, false);
        return result;
    }

    // 示例:分配新页
    PageID new_page(Transaction* txn) {
        PageID new_id;
        Page* page = buffer_pool_manager_.new_page(new_id);
        // 新页的创建也应该被记录,但此处简化
        // LogRecord new_page_log(...)
        // log_manager_.append_log_record(new_page_log);
        // txn->set_prev_lsn(new_page_log.lsn);
        buffer_pool_manager_.unpin_page(new_id, true); // 新页是脏的
        return new_id;
    }

    // 事务接口
    Transaction* begin_transaction() {
        return transaction_manager_.begin_transaction();
    }

    void commit_transaction(Transaction* txn) {
        transaction_manager_.commit_transaction(txn);
    }

    void abort_transaction(Transaction* txn) {
        transaction_manager_.abort_transaction(txn);
    }
};

3.3 示例使用


#include "storage_engine.h"
#include <thread>
#include <filesystem>

void simulate_crash_and_recovery(const std::string& data_file, const std::string& log_file) {
    std::cout << "n--- SIMULATION START: First Run ---" << std::endl;
    {
        StorageEngine engine(data_file, log_file, 10); // 10页缓冲区
        engine.start(); // 恢复 (第一次运行无日志,直接启动)

        Transaction* txn1 = engine.begin_transaction();
        PageID page0 = engine.new_page(txn1); // 分配页ID 0
        PageID page1 = engine.new_page(txn1); // 分配页ID 1

        std::cout << "Txn " << txn1->get_tx_id() << ": Writing 'Hello WAL' to Page " << page0 << " Offset 0" << std::endl;
        std::vector<char> value1 = {'H', 'e', 'l', 'l', 'o', ' ', 'W', 'A', 'L'};
        engine.put(txn1, page0, 0, value1);

        std::cout << "Txn " << txn1->get_tx_id() << ": Writing 'Database' to Page " << page1 << " Offset 0" << std::endl;
        std::vector<char> value2 = {'D', 'a', 't', 'a', 'b', 'a', 's', 'e'};
        engine.put(txn1, page1, 0, value2);

        engine.commit_transaction(txn1);
        std::cout << "Txn " << txn1->get_tx_id() << " committed." << std::endl;

        // 模拟一个未提交事务
        Transaction* txn2 = engine.begin_transaction();
        std::cout << "Txn " << txn2->get_tx_id() << ": Writing 'Uncommitted' to Page " << page0 << " Offset 10" << std::endl;
        std::vector<char> value_uncommitted = {'U', 'n', 'c', 'o', 'm', 'm', 'i', 't', 't', 'e', 'd'};
        engine.put(txn2, page0,

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注