各位专家、工程师们,欢迎来到今天的讲座。我们将深入探讨一个在构建高可靠、高性能持久化存储系统时至关重要的技术——Write-Ahead Logging (WAL)。我将以一名编程专家的视角,不仅阐述WAL的理论基础,更将带领大家逐步解构并利用C++实现一个具备奔溃恢复能力的持久化存储系统,探讨其内部机制与性能优化策略。
1. 持久化存储的挑战与WAL的诞生
在现代信息系统中,数据是核心资产。我们构建的各种应用,从简单的配置存储到复杂的事务型数据库,无一例外都需要将数据安全地保存下来,即使系统遭遇电源故障、软件崩溃等意外情况,数据也必须能够恢复到一致状态。这就是持久化存储的核心需求。
然而,实现一个高性能、高可靠的持久化存储系统面临着诸多挑战:
- 奔溃恢复 (Crash Recovery):这是最关键的需求。系统可能在任何时候崩溃,例如断电。崩溃后,系统必须能够将数据恢复到一个有效且一致的状态,所有已提交的事务必须持久化,所有未提交的事务必须被回滚。
- 原子性 (Atomicity):事务的原子性要求一个事务中的所有操作要么全部成功,要么全部失败。没有中间状态。例如,银行转账操作,包括扣款和入账,必须作为一个不可分割的整体。
- 持久性 (Durability):一旦事务被提交,其所做的更改必须永久保存,即使系统崩溃也依然存在。
- 并发性 (Concurrency):多个事务可能同时读写数据。系统需要确保并发操作不会导致数据损坏或不一致。
- 性能 (Performance):数据读写操作需要尽可能快,尤其是在高并发负载下。
传统的"in-place update"(就地更新)策略,即直接在数据文件中修改数据,难以同时满足原子性、持久性和高性能的要求。例如,如果一个事务修改了多个数据页,在所有修改都写入磁盘之前发生崩溃,那么磁盘上的数据将处于不一致状态,且无法判断哪些修改是已提交事务的一部分,哪些是未提交事务的一部分。
为了解决这些问题,数据库领域引入了Write-Ahead Logging (WAL)机制。WAL的核心思想是:任何数据修改都必须首先记录在日志中,并且这些日志记录必须在数据页被写入磁盘之前,先被持久化到稳定的存储介质上。
这个简单而强大的原则,为我们构建高可靠的持久化存储系统奠定了基石。
2. Write-Ahead Logging (WAL) 的核心原理
WAL机制的核心在于“先写日志,后写数据”。它将数据修改操作分解为两个阶段:
- 记录日志 (Logging):将数据修改的意图和内容(包括旧值和新值)以日志记录的形式写入一个专门的日志文件(通常是顺序写入)。
- 应用修改 (Applying Changes):将日志中记录的修改实际应用到内存中的数据页(缓冲区),这些数据页随后会异步或批量地写入磁盘。
WAL遵循两个关键规则:
- WAL Rule (Redo Rule):一个事务的任何数据页修改,其对应的日志记录必须在数据页本身被写入磁盘之前,先被写入日志文件并持久化(
fsync)。这意味着,如果系统崩溃,只要日志记录已持久化,我们就能通过重放日志来恢复数据。 - Undo Rule (Force-Log-at-Commit):在事务提交时,所有与该事务相关的日志记录(包括事务开始、数据修改和事务提交记录)都必须被强制写入日志文件并持久化,才能向客户端确认事务已成功提交。这确保了已提交事务的持久性。
2.1 WAL如何保证ACID属性?
WAL是实现ACID(Atomicity, Consistency, Isolation, Durability)属性的关键机制之一。
- 原子性 (Atomicity):
- 如果一个事务在提交前崩溃,WAL日志中会留下该事务的修改记录,但没有提交记录。在恢复过程中,系统可以通过回溯日志(Undo)来撤销这些未提交的修改,将数据恢复到事务开始前的状态。
- 持久性 (Durability):
- 一旦事务提交,其提交记录以及所有相关修改日志都已写入并持久化到日志文件。即使系统在数据页写入磁盘之前崩溃,恢复时,系统可以重放(Redo)这些日志记录,将数据恢复到事务提交后的状态。
- 一致性 (Consistency):
- WAL本身不直接保证业务逻辑上的一致性,但它为原子性和持久性提供了基础,确保了在系统层面数据状态的逻辑转换是完整的。事务管理器结合WAL,通过隔离级别和约束条件来维护一致性。
- 隔离性 (Isolation):
- WAL通常与并发控制机制(如两阶段锁2PL、多版本并发控制MVCC)协同工作。日志记录可以包含锁信息或版本信息,以支持并发事务的正确隔离。WAL日志本身是串行写入的,这简化了并发控制在日志层面的实现。
2.2 WAL带来的性能优势
WAL不仅提升了可靠性,也带来了显著的性能优势:
- 顺序写入 (Sequential Writes):日志文件通常是顺序追加写入的。顺序写入比随机写入快得多,因为它们更好地利用了磁盘的机械特性(对于HDD)或闪存的内部机制(对于SSD)。而数据文件上的修改可能是随机的。
- 减少磁盘I/O (Reduced Disk I/O):
- 数据页的修改首先发生在内存中的缓冲区。只有当缓冲区中的脏页需要被驱逐(evict)或系统执行检查点(checkpoint)时,才需要将它们写入磁盘。这减少了对数据文件的随机写入次数。
- 多个逻辑修改可以合并为一次物理写入。
- 组提交 (Group Commit):多个并发事务的提交请求可以被批量处理。系统可以将它们的提交日志记录一次性写入日志文件并进行一次
fsync操作,而不是每个事务都独立执行一次fsync,从而显著降低fsync的开销。 - 后台写入 (Background Writes):脏数据页的写入可以异步在后台进行,不会阻塞前端的事务处理。
3. C++ 实现具备奔溃恢复能力的持久化存储系统架构
我们将设计一个简化但功能完备的持久化存储系统,它将包含以下核心组件:
- LogRecord (日志记录):定义不同类型的日志记录结构。
- LogManager (日志管理器):负责日志的写入、读取和持久化。
- Page (数据页):存储实际数据的基本单元。
- DiskManager (磁盘管理器):负责数据文件和日志文件的底层物理读写。
- BufferPoolManager (缓冲区池管理器):管理内存中的数据页,作为磁盘与CPU之间的缓存。
- Transaction (事务):定义事务的生命周期和状态。
- TransactionManager (事务管理器):管理事务的开始、提交和中止。
- RecoveryManager (恢复管理器):负责系统启动时的奔溃恢复。
- StorageEngine (存储引擎):整合所有组件,提供高层接口。
我们将使用C++标准库,并利用其提供的文件I/O、多线程和同步原语。
3.1 核心数据结构设计
3.1.1 LogRecord:日志记录
日志记录是WAL机制的基石。每当数据发生变化或事务状态发生改变,都会生成一个对应的日志记录。
// common.h
#pragma once
#include <cstdint>
#include <vector>
#include <string>
#include <memory>
#include <atomic>
#include <mutex>
#include <fstream>
#include <unordered_map>
#include <list>
#include <iostream>
#include <chrono>
// 定义类型别名
using LSN = uint64_t; // Log Sequence Number
using TxID = uint64_t; // Transaction ID
using PageID = uint64_t; // Page ID
using FrameID = uint32_t; // Buffer Pool Frame ID
const size_t PAGE_SIZE = 4096; // 数据页大小
// LogRecordType: 日志记录类型
enum class LogRecordType : uint8_t {
INVALID = 0,
BEGIN,
COMMIT,
ABORT,
UPDATE,
CLR // Compensation Log Record for UNDO operations
};
// LogRecordHeader: 所有日志记录的通用头部
struct LogRecordHeader {
LSN lsn; // 当前日志记录的LSN
LSN prev_lsn; // 同一事务前一个日志记录的LSN,用于构建事务日志链
TxID tx_id; // 事务ID
LogRecordType type; // 日志记录类型
uint32_t payload_size; // 实际数据(payload)的大小,不包括头部
// 默认构造函数
LogRecordHeader() : lsn(0), prev_lsn(0), tx_id(0), type(LogRecordType::INVALID), payload_size(0) {}
};
// LogRecord:基类,所有具体日志记录的父类
class LogRecord {
public:
LogRecordHeader header;
LogRecord(LSN lsn = 0, LSN prev_lsn = 0, TxID tx_id = 0, LogRecordType type = LogRecordType::INVALID, uint32_t payload_size = 0)
: header({lsn, prev_lsn, tx_id, type, payload_size}) {}
virtual ~LogRecord() = default;
// 序列化日志记录到字节流
virtual std::vector<char> serialize() const {
std::vector<char> data(sizeof(LogRecordHeader));
memcpy(data.data(), &header, sizeof(LogRecordHeader));
return data;
}
// 从字节流反序列化日志记录(基类只处理头部)
virtual void deserialize(const char* data) {
memcpy(&header, data, sizeof(LogRecordHeader));
}
virtual std::string toString() const {
return "Type: " + std::to_string(static_cast<int>(header.type)) +
", TxID: " + std::to_string(header.tx_id) +
", LSN: " + std::to_string(header.lsn) +
", PrevLSN: " + std::to_string(header.prev_lsn);
}
};
// UpdateLogRecord:数据更新日志记录
class UpdateLogRecord : public LogRecord {
public:
PageID page_id;
uint32_t offset;
std::vector<char> old_value;
std::vector<char> new_value;
UpdateLogRecord(LSN lsn, LSN prev_lsn, TxID tx_id, PageID page_id, uint32_t offset,
const std::vector<char>& old_val, const std::vector<char>& new_val)
: LogRecord(lsn, prev_lsn, tx_id, LogRecordType::UPDATE,
sizeof(PageID) + sizeof(uint32_t) + old_val.size() + new_val.size()),
page_id(page_id), offset(offset), old_value(old_val), new_value(new_val) {}
// 默认构造函数
UpdateLogRecord() : LogRecord(0, 0, 0, LogRecordType::UPDATE, 0), page_id(0), offset(0) {}
std::vector<char> serialize() const override {
size_t total_size = sizeof(LogRecordHeader) + sizeof(PageID) + sizeof(uint32_t) + old_value.size() + new_value.size();
std::vector<char> data(total_size);
char* ptr = data.data();
memcpy(ptr, &header, sizeof(LogRecordHeader));
ptr += sizeof(LogRecordHeader);
memcpy(ptr, &page_id, sizeof(PageID));
ptr += sizeof(PageID);
memcpy(ptr, &offset, sizeof(uint32_t));
ptr += sizeof(uint32_t);
memcpy(ptr, old_value.data(), old_value.size());
ptr += old_value.size();
memcpy(ptr, new_value.data(), new_value.size());
return data;
}
void deserialize(const char* data) override {
const char* ptr = data;
memcpy(&header, ptr, sizeof(LogRecordHeader));
ptr += sizeof(LogRecordHeader);
memcpy(&page_id, ptr, sizeof(PageID));
ptr += sizeof(PageID);
memcpy(&offset, ptr, sizeof(uint32_t));
ptr += sizeof(uint32_t);
old_value.resize(header.payload_size - sizeof(PageID) - sizeof(uint32_t) - new_value.size()); // 预估 old_value 大小
memcpy(old_value.data(), ptr, old_value.size());
ptr += old_value.size();
new_value.resize(header.payload_size - sizeof(PageID) - sizeof(uint32_t) - old_value.size()); // 实际 new_value 大小
memcpy(new_value.data(), ptr, new_value.size());
}
std::string toString() const override {
return LogRecord::toString() +
", PageID: " + std::to_string(page_id) +
", Offset: " + std::to_string(offset) +
", OldValueSize: " + std::to_string(old_value.size()) +
", NewValueSize: " + std::to_string(new_value.size());
}
};
// 其他日志记录类型(Begin, Commit, Abort, CLR)相对简单,只需要头部信息或少量额外信息
// 例如:
class BeginLogRecord : public LogRecord {
public:
BeginLogRecord(LSN lsn, LSN prev_lsn, TxID tx_id)
: LogRecord(lsn, prev_lsn, tx_id, LogRecordType::BEGIN, 0) {} // 无额外payload
BeginLogRecord() : LogRecord(0, 0, 0, LogRecordType::BEGIN, 0) {}
};
class CommitLogRecord : public LogRecord {
public:
CommitLogRecord(LSN lsn, LSN prev_lsn, TxID tx_id)
: LogRecord(lsn, prev_lsn, tx_id, LogRecordType::COMMIT, 0) {}
CommitLogRecord() : LogRecord(0, 0, 0, LogRecordType::COMMIT, 0) {}
};
class AbortLogRecord : public LogRecord {
public:
AbortLogRecord(LSN lsn, LSN prev_lsn, TxID tx_id)
: LogRecord(lsn, prev_lsn, tx_id, LogRecordType::ABORT, 0) {}
AbortLogRecord() : LogRecord(0, 0, 0, LogRecordType::ABORT, 0) {}
};
// Compensation Log Record (CLR): 用于UNDO操作,记录UNDO操作本身
class CLRLogRecord : public LogRecord {
public:
PageID page_id;
uint32_t offset;
std::vector<char> old_value; // CLR的old_value是它所undo的update的new_value
LSN undo_next_lsn; // 指向下一个需要undo的日志记录的LSN
CLRLogRecord(LSN lsn, LSN prev_lsn, TxID tx_id, PageID page_id, uint32_t offset,
const std::vector<char>& old_val, LSN undo_next_lsn)
: LogRecord(lsn, prev_lsn, tx_id, LogRecordType::CLR,
sizeof(PageID) + sizeof(uint32_t) + old_val.size() + sizeof(LSN)),
page_id(page_id), offset(offset), old_value(old_val), undo_next_lsn(undo_next_lsn) {}
CLRLogRecord() : LogRecord(0, 0, 0, LogRecordType::CLR, 0), page_id(0), offset(0), undo_next_lsn(0) {}
std::vector<char> serialize() const override {
size_t total_size = sizeof(LogRecordHeader) + sizeof(PageID) + sizeof(uint32_t) + old_value.size() + sizeof(LSN);
std::vector<char> data(total_size);
char* ptr = data.data();
memcpy(ptr, &header, sizeof(LogRecordHeader));
ptr += sizeof(LogRecordHeader);
memcpy(ptr, &page_id, sizeof(PageID));
ptr += sizeof(PageID);
memcpy(ptr, &offset, sizeof(uint32_t));
ptr += sizeof(uint32_t);
memcpy(ptr, old_value.data(), old_value.size());
ptr += old_value.size();
memcpy(ptr, &undo_next_lsn, sizeof(LSN));
return data;
}
void deserialize(const char* data) override {
const char* ptr = data;
memcpy(&header, ptr, sizeof(LogRecordHeader));
ptr += sizeof(LogRecordHeader);
memcpy(&page_id, ptr, sizeof(PageID));
ptr += sizeof(PageID);
memcpy(&offset, ptr, sizeof(uint32_t));
ptr += sizeof(uint32_t);
old_value.resize(header.payload_size - sizeof(PageID) - sizeof(uint32_t) - sizeof(LSN));
memcpy(old_value.data(), ptr, old_value.size());
ptr += old_value.size();
memcpy(&undo_next_lsn, ptr, sizeof(LSN));
}
std::string toString() const override {
return LogRecord::toString() +
", PageID: " + std::to_string(page_id) +
", Offset: " + std::to_string(offset) +
", OldValueSize: " + std::to_string(old_value.size()) +
", UndoNextLSN: " + std::to_string(undo_next_lsn);
}
};
// 辅助函数:根据类型反序列化完整的LogRecord
std::unique_ptr<LogRecord> deserialize_log_record(const char* data) {
LogRecordHeader temp_header;
memcpy(&temp_header, data, sizeof(LogRecordHeader));
std::unique_ptr<LogRecord> record;
switch (temp_header.type) {
case LogRecordType::BEGIN:
record = std::make_unique<BeginLogRecord>();
break;
case LogRecordType::COMMIT:
record = std::make_unique<CommitLogRecord>();
break;
case LogRecordType::ABORT:
record = std::make_unique<AbortLogRecord>();
break;
case LogRecordType::UPDATE:
record = std::make_unique<UpdateLogRecord>();
break;
case LogRecordType::CLR:
record = std::make_unique<CLRLogRecord>();
break;
default:
return nullptr; // 或者抛出异常
}
record->deserialize(data);
return record;
}
3.1.2 Page:数据页
数据页是存储引擎中数据存储和管理的最小单位。
// page.h
class Page {
public:
PageID page_id_;
LSN page_lsn_; // LSN of the last log record that modified this page
char data_[PAGE_SIZE];
int pin_count_; // 引用计数,表示有多少线程正在使用此页
bool is_dirty_; // 标记此页是否被修改过,需要写回磁盘
std::mutex page_latch_; // 保护页内容的并发访问(短时间持有)
Page() : page_id_(0), page_lsn_(0), pin_count_(0), is_dirty_(false) {
std::fill(data_, data_ + PAGE_SIZE, 0); // 初始化数据为0
}
// 重置页状态,用于从BufferPool中回收
void reset() {
page_id_ = 0;
page_lsn_ = 0;
pin_count_ = 0;
is_dirty_ = false;
std::fill(data_, data_ + PAGE_SIZE, 0);
}
// 读写数据接口
void read_data(uint32_t offset, uint32_t length, std::vector<char>& result) {
if (offset + length > PAGE_SIZE) {
throw std::runtime_error("Read out of page bounds");
}
result.resize(length);
std::memcpy(result.data(), data_ + offset, length);
}
void write_data(uint32_t offset, const std::vector<char>& value) {
if (offset + value.size() > PAGE_SIZE) {
throw std::runtime_error("Write out of page bounds");
}
std::memcpy(data_ + offset, value.data(), value.size());
is_dirty_ = true;
}
};
3.1.3 Transaction:事务
// transaction.h
enum class TransactionStatus {
RUNNING,
COMMITTED,
ABORTED
};
class Transaction {
public:
TxID tx_id_;
TransactionStatus status_;
LSN prev_lsn_; // LSN of the last log record written by this transaction
// 假设这里可以存储事务持有的锁,但在本次WAL主题中简化,不深入锁实现
// std::vector<std::pair<PageID, LockMode>> held_locks_;
Transaction(TxID id) : tx_id_(id), status_(TransactionStatus::RUNNING), prev_lsn_(0) {}
// 获取事务状态
TransactionStatus get_status() const { return status_; }
// 设置事务状态
void set_status(TransactionStatus status) { status_ = status; }
// 获取事务ID
TxID get_tx_id() const { return tx_id_; }
// 获取前一个LSN
LSN get_prev_lsn() const { return prev_lsn_; }
// 设置前一个LSN
void set_prev_lsn(LSN lsn) { prev_lsn_ = lsn; }
};
3.2 核心组件实现
3.2.1 DiskManager:磁盘管理器
负责与物理磁盘进行交互,读写数据文件和日志文件。这里使用std::fstream进行文件操作,实际高性能系统中可能采用mmap或异步I/O。
// disk_manager.h
class DiskManager {
private:
std::fstream data_file_;
std::fstream log_file_;
std::mutex data_file_mutex_; // 保护数据文件访问
std::mutex log_file_mutex_; // 保护日志文件访问
PageID next_page_id_; // 用于分配新的PageID
public:
DiskManager(const std::string& data_filepath, const std::string& log_filepath) {
// 打开数据文件,如果不存在则创建
data_file_.open(data_filepath, std::ios::binary | std::ios::in | std::ios::out | std::ios::app);
if (!data_file_.is_open()) {
data_file_.clear(); // 清除错误标志
data_file_.open(data_filepath, std::ios::binary | std::ios::in | std::ios::out | std::ios::trunc); // 创建新文件
if (!data_file_.is_open()) {
throw std::runtime_error("Failed to open or create data file: " + data_filepath);
}
}
// 计算下一个可用的PageID
data_file_.seekg(0, std::ios::end);
size_t file_size = data_file_.tellg();
next_page_id_ = file_size / PAGE_SIZE;
// 打开日志文件,如果不存在则创建
log_file_.open(log_filepath, std::ios::binary | std::ios::in | std::ios::out | std::ios::app);
if (!log_file_.is_open()) {
log_file_.clear();
log_file_.open(log_filepath, std::ios::binary | std::ios::in | std::ios::out | std::ios::trunc);
if (!log_file_.is_open()) {
throw std::runtime_error("Failed to open or create log file: " + log_filepath);
}
}
// 确保文件指针在末尾,以便追加写入
log_file_.seekp(0, std::ios::end);
}
~DiskManager() {
if (data_file_.is_open()) {
data_file_.close();
}
if (log_file_.is_open()) {
log_file_.close();
}
}
// 读取数据页
void read_page(PageID page_id, char* data) {
std::lock_guard<std::mutex> lock(data_file_mutex_);
std::streampos offset = static_cast<std::streampos>(page_id) * PAGE_SIZE;
data_file_.seekg(offset);
if (!data_file_.read(data, PAGE_SIZE)) {
// 如果读取失败,可能文件末尾或页不存在,填充0
std::fill(data, data + PAGE_SIZE, 0);
data_file_.clear(); // 清除eof或fail标志
}
}
// 写入数据页
void write_page(PageID page_id, const char* data) {
std::lock_guard<std::mutex> lock(data_file_mutex_);
std::streampos offset = static_cast<std::streampos>(page_id) * PAGE_SIZE;
data_file_.seekp(offset);
if (!data_file_.write(data, PAGE_SIZE)) {
throw std::runtime_error("Failed to write data page " + std::to_string(page_id));
}
data_file_.flush(); // 确保写入磁盘
}
// 分配一个新的数据页ID
PageID allocate_page() {
std::lock_guard<std::mutex> lock(data_file_mutex_);
PageID allocated_page_id = next_page_id_++;
// 预写入一个空页,确保文件大小增长
char zero_page[PAGE_SIZE] = {0};
write_page(allocated_page_id, zero_page);
return allocated_page_id;
}
// 追加日志记录
void append_log_record_data(const std::vector<char>& log_data) {
std::lock_guard<std::mutex> lock(log_file_mutex_);
log_file_.write(log_data.data(), log_data.size());
if (!log_file_) {
throw std::runtime_error("Failed to append log record to log file.");
}
}
// 强制日志数据写入磁盘 (fsync)
void flush_log() {
std::lock_guard<std::mutex> lock(log_file_mutex_);
log_file_.flush();
// 实际的fsync操作需要平台特定的调用,例如`_commit` (Windows) 或 `fsync` (Unix)
// std::fsync(log_file_.fd()); // 伪代码,需要实际实现
}
// 读取日志文件内容
std::vector<char> read_log_file() {
std::lock_guard<std::mutex> lock(log_file_mutex_);
log_file_.seekg(0, std::ios::end);
size_t size = log_file_.tellg();
log_file_.seekg(0, std::ios::beg);
std::vector<char> buffer(size);
if (size > 0) {
if (!log_file_.read(buffer.data(), size)) {
throw std::runtime_error("Failed to read log file.");
}
}
log_file_.clear(); // 清除EOF标志,以便后续写入
log_file_.seekp(0, std::ios::end); // 恢复写指针到文件末尾
return buffer;
}
// 获取日志文件当前大小
size_t get_log_file_size() {
std::lock_guard<std::mutex> lock(log_file_mutex_);
log_file_.seekp(0, std::ios::end);
return log_file_.tellp();
}
};
关于fsync的说明:std::fstream::flush()只保证数据从C++缓冲区写入操作系统缓冲区。要真正保证数据写入磁盘,需要调用操作系统的fsync(Unix-like)或_commit(Windows)函数。这通常需要获取底层文件句柄。在实际系统中,这是一个关键点。为了简化示例,我们暂时依赖fstream::flush,但在高性能和高可靠性场景下,必须使用真正的fsync。
3.2.2 LogManager:日志管理器
负责生成、追加和持久化日志记录,并维护当前的LSN。
// log_manager.h
class LogManager {
private:
DiskManager& disk_manager_;
std::atomic<LSN> next_lsn_; // 下一个可用的LSN
std::atomic<LSN> flushed_lsn_; // 已经持久化到磁盘的LSN
std::mutex log_buffer_mutex_; // 保护日志缓冲区
std::vector<char> log_buffer_; // 日志缓冲区,用于批量写入
size_t buffer_offset_; // 缓冲区当前写入位置
const size_t buffer_capacity_ = 8 * PAGE_SIZE; // 日志缓冲区大小
// 条件变量用于等待日志刷新
std::condition_variable cv_flush_;
public:
LogManager(DiskManager& dm) : disk_manager_(dm), next_lsn_(1), flushed_lsn_(0), buffer_offset_(0) {
log_buffer_.resize(buffer_capacity_);
}
// 追加日志记录
LSN append_log_record(const std::unique_ptr<LogRecord>& record) {
std::lock_guard<std::mutex> lock(log_buffer_mutex_);
LSN current_lsn = next_lsn_++;
record->header.lsn = current_lsn;
// 获取该事务的最新LSN,并更新prev_lsn (这里需要TransactionManager的协作,简化为0)
// 实际中,TransactionManager会在事务对象中维护last_lsn
// record->header.prev_lsn = transaction->get_prev_lsn();
// transaction->set_prev_lsn(current_lsn);
std::vector<char> record_data = record->serialize();
size_t record_size = record_data.size();
if (buffer_offset_ + record_size > buffer_capacity_) {
// 缓冲区不足,先刷新缓冲区
flush_buffer_and_wait();
}
// 写入到缓冲区
memcpy(log_buffer_.data() + buffer_offset_, record_data.data(), record_size);
buffer_offset_ += record_size;
return current_lsn;
}
// 强制刷新日志到磁盘,直到指定的LSN
void flush_log(LSN target_lsn) {
std::unique_lock<std::mutex> lock(log_buffer_mutex_);
// 如果目标LSN已经持久化,则直接返回
if (flushed_lsn_ >= target_lsn) {
return;
}
// 写入缓冲区内容到磁盘
if (buffer_offset_ > 0) {
std::vector<char> data_to_write(log_buffer_.begin(), log_buffer_.begin() + buffer_offset_);
disk_manager_.append_log_record_data(data_to_write);
disk_manager_.flush_log(); // 真正的fsync
flushed_lsn_.store(next_lsn_ - 1); // 更新已刷新LSN为当前已写入的最新LSN
buffer_offset_ = 0; // 清空缓冲区
} else {
// 缓冲区为空,但可能目标LSN比flushed_lsn大,说明有其他线程写入了但还没刷新
// 此时应该等待其他线程刷新或检查是否需要空刷新
// For simplicity, we assume an empty buffer means nothing needs to be flushed.
// In a more complex system, this might involve a dedicated flusher thread.
}
cv_flush_.notify_all(); // 通知所有等待的线程
}
// 刷新缓冲区并等待其写入磁盘
void flush_buffer_and_wait() {
std::unique_lock<std::mutex> lock(log_buffer_mutex_);
if (buffer_offset_ == 0) return; // 缓冲区已空
std::vector<char> data_to_write(log_buffer_.begin(), log_buffer_.begin() + buffer_offset_);
disk_manager_.append_log_record_data(data_to_write);
disk_manager_.flush_log(); // 真正的fsync
flushed_lsn_.store(next_lsn_ - 1); // 更新已刷新LSN
buffer_offset_ = 0; // 清空缓冲区
cv_flush_.notify_all(); // 通知所有等待的线程
}
// 获取日志文件的所有记录,用于恢复
std::vector<std::unique_ptr<LogRecord>> read_all_log_records() {
std::vector<std::unique_ptr<LogRecord>> records;
std::vector<char> raw_log_data = disk_manager_.read_log_file();
size_t current_offset = 0;
while (current_offset < raw_log_data.size()) {
if (raw_log_data.size() - current_offset < sizeof(LogRecordHeader)) {
// 不完整的头部,可能是崩溃导致的
break;
}
LogRecordHeader temp_header;
memcpy(&temp_header, raw_log_data.data() + current_offset, sizeof(LogRecordHeader));
if (temp_header.lsn == 0 || temp_header.type == LogRecordType::INVALID) {
// 无效日志记录,可能是文件末尾的垃圾数据
break;
}
size_t record_total_size = sizeof(LogRecordHeader) + temp_header.payload_size;
if (current_offset + record_total_size > raw_log_data.size()) {
// 不完整的日志记录
break;
}
std::unique_ptr<LogRecord> record = deserialize_log_record(raw_log_data.data() + current_offset);
if (record) {
records.push_back(std::move(record));
} else {
// 无法反序列化,跳过或报错
std::cerr << "Warning: Could not deserialize log record at offset " << current_offset << std::endl;
}
current_offset += record_total_size;
}
return records;
}
// 获取下一个LSN
LSN get_next_lsn() const { return next_lsn_.load(); }
// 获取已刷新LSN
LSN get_flushed_lsn() const { return flushed_lsn_.load(); }
};
3.2.3 BufferPoolManager:缓冲区池管理器
管理内存中的数据页,实现了LRU(最近最少使用)淘汰策略。
// buffer_pool_manager.h
class BufferPoolManager {
private:
DiskManager& disk_manager_;
std::vector<Page> pages_; // 内存中的页面帧
size_t pool_size_; // 缓冲区池大小(页帧数量)
// PageID -> FrameID 映射表,快速查找内存中的页
std::unordered_map<PageID, FrameID> page_table_;
// LRU 链表,用于页面淘汰策略
std::list<FrameID> lru_list_;
// LRU 链表迭代器映射,快速删除和移动
std::unordered_map<FrameID, std::list<FrameID>::iterator> lru_map_;
// 空闲页帧列表
std::list<FrameID> free_list_;
std::mutex buffer_mutex_; // 保护缓冲区池的元数据结构
public:
BufferPoolManager(size_t pool_size, DiskManager& dm)
: disk_manager_(dm), pool_size_(pool_size) {
pages_.resize(pool_size_);
for (FrameID i = 0; i < pool_size_; ++i) {
free_list_.push_back(i); // 初始化所有帧为空闲
}
}
// 获取一个页面,如果不在内存则从磁盘读取
Page* fetch_page(PageID page_id) {
std::lock_guard<std::mutex> lock(buffer_mutex_);
// 1. 检查页面是否已在缓冲区中
if (page_table_.count(page_id)) {
FrameID frame_id = page_table_[page_id];
// 移动到LRU链表头部(最近使用)
lru_list_.erase(lru_map_[frame_id]);
lru_list_.push_front(frame_id);
lru_map_[frame_id] = lru_list_.begin();
pages_[frame_id].pin_count_++;
return &pages_[frame_id];
}
// 2. 页面不在缓冲区,需要从磁盘读取
FrameID frame_id = -1;
if (!free_list_.empty()) {
// 2.1 从空闲列表获取一个帧
frame_id = free_list_.front();
free_list_.pop_front();
} else {
// 2.2 否则,执行LRU淘汰
for (auto it = lru_list_.rbegin(); it != lru_list_.rend(); ++it) {
if (pages_[*it].pin_count_ == 0) { // 找到一个未被固定的页
frame_id = *it;
// 如果被淘汰的页是脏页,写回磁盘
if (pages_[frame_id].is_dirty_) {
disk_manager_.write_page(pages_[frame_id].page_id_, pages_[frame_id].data_);
}
// 从page_table和LRU中移除
page_table_.erase(pages_[frame_id].page_id_);
lru_list_.erase(lru_map_[frame_id]);
lru_map_.erase(frame_id);
break;
}
}
if (frame_id == -1) {
throw std::runtime_error("No unpinned page available for eviction.");
}
}
// 3. 将新页载入帧
Page* page = &pages_[frame_id];
page->reset(); // 重置页状态
page->page_id_ = page_id;
disk_manager_.read_page(page_id, page->data_); // 从磁盘读取数据
page->pin_count_ = 1; // 页面被固定
page_table_[page_id] = frame_id;
lru_list_.push_front(frame_id);
lru_map_[frame_id] = lru_list_.begin();
return page;
}
// 解除页面固定
void unpin_page(PageID page_id, bool is_dirty) {
std::lock_guard<std::mutex> lock(buffer_mutex_);
if (!page_table_.count(page_id)) {
// 页面不在缓冲区
return;
}
FrameID frame_id = page_table_[page_id];
Page* page = &pages_[frame_id];
page->is_dirty_ = page->is_dirty_ || is_dirty; // 更新脏页状态
if (page->pin_count_ > 0) {
page->pin_count_--;
}
// 如果pin_count归零,且它是LRU链表中未被固定的页,它现在有资格被淘汰
// 不需要立即将其从LRU链表移除,LRU淘汰逻辑会处理
}
// 强制将指定页面刷新到磁盘
void flush_page(PageID page_id) {
std::lock_guard<std::mutex> lock(buffer_mutex_);
if (!page_table_.count(page_id)) {
// 页面不在缓冲区,可能已经被淘汰或从未加载
return;
}
FrameID frame_id = page_table_[page_id];
Page* page = &pages_[frame_id];
if (page->is_dirty_) {
disk_manager_.write_page(page->page_id_, page->data_);
page->is_dirty_ = false;
}
}
// 强制将所有脏页刷新到磁盘
void flush_all_pages() {
std::lock_guard<std::mutex> lock(buffer_mutex_);
for (FrameID i = 0; i < pool_size_; ++i) {
Page* page = &pages_[i];
if (page->page_id_ != 0 && page->is_dirty_) { // 检查是否是有效页且是脏页
disk_manager_.write_page(page->page_id_, page->data_);
page->is_dirty_ = false;
}
}
}
// 创建一个新页面
Page* new_page(PageID& new_page_id) {
std::lock_guard<std::mutex> lock(buffer_mutex_);
new_page_id = disk_manager_.allocate_page(); // 从磁盘管理器获取新页ID
// 逻辑与fetch_page类似,只是不需要从磁盘读取
FrameID frame_id = -1;
if (!free_list_.empty()) {
frame_id = free_list_.front();
free_list_.pop_front();
} else {
for (auto it = lru_list_.rbegin(); it != lru_list_.rend(); ++it) {
if (pages_[*it].pin_count_ == 0) {
frame_id = *it;
if (pages_[frame_id].is_dirty_) {
disk_manager_.write_page(pages_[frame_id].page_id_, pages_[frame_id].data_);
}
page_table_.erase(pages_[frame_id].page_id_);
lru_list_.erase(lru_map_[frame_id]);
lru_map_.erase(frame_id);
break;
}
}
if (frame_id == -1) {
throw std::runtime_error("No unpinned page available for new page allocation.");
}
}
Page* page = &pages_[frame_id];
page->reset();
page->page_id_ = new_page_id;
page->pin_count_ = 1;
page->is_dirty_ = true; // 新页默认是脏的,需要写回
page_table_[new_page_id] = frame_id;
lru_list_.push_front(frame_id);
lru_map_[frame_id] = lru_list_.begin();
return page;
}
};
3.2.4 TransactionManager:事务管理器
管理事务的生命周期,包括事务的开始、提交和中止。它与LogManager协同工作,确保事务的ACID属性。
// transaction_manager.h
class TransactionManager {
private:
std::atomic<TxID> next_tx_id_;
std::unordered_map<TxID, Transaction> active_transactions_; // 活跃事务表
std::mutex tx_table_mutex_; // 保护active_transactions_ map
LogManager& log_manager_;
BufferPoolManager& buffer_pool_manager_;
public:
TransactionManager(LogManager& lm, BufferPoolManager& bpm)
: next_tx_id_(1), log_manager_(lm), buffer_pool_manager_(bpm) {}
// 开始一个新事务
Transaction* begin_transaction() {
TxID tx_id = next_tx_id_++;
std::lock_guard<std::mutex> lock(tx_table_mutex_);
active_transactions_.emplace(tx_id, Transaction(tx_id));
Transaction* txn = &active_transactions_.at(tx_id);
// 写入BEGIN日志记录
std::unique_ptr<LogRecord> begin_log = std::make_unique<BeginLogRecord>(0, txn->get_prev_lsn(), tx_id);
LSN current_lsn = log_manager_.append_log_record(begin_log);
txn->set_prev_lsn(current_lsn); // 更新事务的last_lsn
return txn;
}
// 提交事务
void commit_transaction(Transaction* txn) {
// 写入COMMIT日志记录
std::unique_ptr<LogRecord> commit_log = std::make_unique<CommitLogRecord>(0, txn->get_prev_lsn(), txn->get_tx_id());
LSN commit_lsn = log_manager_.append_log_record(commit_log);
txn->set_prev_lsn(commit_lsn);
// 强制刷新日志到磁盘 (Force-Log-at-Commit)
log_manager_.flush_log(commit_lsn);
// 更新事务状态
std::lock_guard<std::mutex> lock(tx_table_mutex_);
txn->set_status(TransactionStatus::COMMITTED);
// 通常在这里解除所有锁,并从活跃事务表中移除,但为了简化,我们仅设置状态
active_transactions_.erase(txn->get_tx_id());
}
// 中止事务
void abort_transaction(Transaction* txn) {
// 写入ABORT日志记录
std::unique_ptr<LogRecord> abort_log = std::make_unique<AbortLogRecord>(0, txn->get_prev_lsn(), txn->get_tx_id());
LSN abort_lsn = log_manager_.append_log_record(abort_log);
txn->set_prev_lsn(abort_lsn);
// 执行UNDO操作 (这里只是一个占位符,实际UNDO逻辑在RecoveryManager中)
// 在线事务中止需要遍历该事务的所有日志记录并生成CLR
// For simplicity, we just mark as aborted and remove from active transactions.
// 更新事务状态
std::lock_guard<std::mutex> lock(tx_table_mutex_);
txn->set_status(TransactionStatus::ABORTED);
active_transactions_.erase(txn->get_tx_id());
}
// 获取事务对象
Transaction* get_transaction(TxID tx_id) {
std::lock_guard<std::mutex> lock(tx_table_mutex_);
if (active_transactions_.count(tx_id)) {
return &active_transactions_.at(tx_id);
}
return nullptr;
}
// 获取所有活跃事务的快照 (用于恢复)
std::unordered_map<TxID, Transaction> get_active_transactions_snapshot() {
std::lock_guard<std::mutex> lock(tx_table_mutex_);
return active_transactions_;
}
};
3.2.5 RecoveryManager:恢复管理器
这是WAL机制中实现奔溃恢复的核心组件。它遵循ARIES(Algorithm for Recovery and Isolation Exploiting Semantics)恢复算法的三阶段:分析 (Analysis)、重做 (Redo) 和 回滚 (Undo)。
// recovery_manager.h
struct ActiveTransactionEntry {
LSN last_lsn; // 事务写入的最后一个日志记录的LSN
TransactionStatus status; // 事务状态 (RUNNING, COMMITTED, ABORTED)
// 其他如锁信息等...
};
struct DirtyPageEntry {
LSN rec_lsn; // LSN of the first log record that dirtied this page
};
class RecoveryManager {
private:
LogManager& log_manager_;
BufferPoolManager& buffer_pool_manager_;
DiskManager& disk_manager_;
TransactionManager& transaction_manager_;
// Recovery Data Structures
std::unordered_map<TxID, ActiveTransactionEntry> active_tx_table_; // 活跃事务表
std::unordered_map<PageID, DirtyPageEntry> dirty_page_table_; // 脏页表
LSN last_checkpoint_lsn_; // 最后一个检查点的LSN (简化,本次不实现检查点,从头扫描)
public:
RecoveryManager(LogManager& lm, BufferPoolManager& bpm, DiskManager& dm, TransactionManager& tm)
: log_manager_(lm), buffer_pool_manager_(bpm), disk_manager_(dm), transaction_manager_(tm),
last_checkpoint_lsn_(0) {}
// 执行奔溃恢复
void recover() {
std::cout << "Starting recovery process..." << std::endl;
std::vector<std::unique_ptr<LogRecord>> all_logs = log_manager_.read_all_log_records();
if (all_logs.empty()) {
std::cout << "No log records found. System clean." << std::endl;
return;
}
// 1. Analysis Phase (分析阶段)
// 扫描所有日志记录(或从最近的检查点开始),构建活跃事务表和脏页表。
std::cout << "Recovery: Analysis Phase..." << std::endl;
for (const auto& log : all_logs) {
LSN current_lsn = log->header.lsn;
TxID tx_id = log->header.tx_id;
// 更新活跃事务表
if (active_tx_table_.find(tx_id) == active_tx_table_.end()) {
active_tx_table_[tx_id] = {current_lsn, TransactionStatus::RUNNING};
} else {
active_tx_table_[tx_id].last_lsn = current_lsn;
}
switch (log->header.type) {
case LogRecordType::BEGIN:
active_tx_table_[tx_id].status = TransactionStatus::RUNNING;
break;
case LogRecordType::COMMIT:
active_tx_table_[tx_id].status = TransactionStatus::COMMITTED;
break;
case LogRecordType::ABORT:
active_tx_table_[tx_id].status = TransactionStatus::ABORTED;
break;
case LogRecordType::UPDATE: {
UpdateLogRecord* update_log = static_cast<UpdateLogRecord*>(log.get());
// 将被修改的页面加入脏页表,记录其RecLSN
if (dirty_page_table_.find(update_log->page_id) == dirty_page_table_.end()) {
dirty_page_table_[update_log->page_id] = {current_lsn};
}
break;
}
case LogRecordType::CLR: {
CLRLogRecord* clr_log = static_cast<CLRLogRecord*>(log.get());
if (dirty_page_table_.find(clr_log->page_id) == dirty_page_table_.end()) {
dirty_page_table_[clr_log->page_id] = {current_lsn};
}
break;
}
default:
break;
}
}
// 从活跃事务表中移除已提交或已中止的事务
auto it = active_tx_table_.begin();
while (it != active_tx_table_.end()) {
if (it->second.status == TransactionStatus::COMMITTED || it->second.status == TransactionStatus::ABORTED) {
it = active_tx_table_.erase(it);
} else {
++it;
}
}
std::cout << "Analysis Phase completed. Active transactions: " << active_tx_table_.size() << std::endl;
std::cout << "Dirty pages: " << dirty_page_table_.size() << std::endl;
// 2. Redo Phase (重做阶段)
// 从最小的RecLSN(或检查点)开始,正向扫描日志,重做所有已记录的操作。
// 目标是让所有数据页恢复到崩溃前的最新状态。
std::cout << "Recovery: Redo Phase..." << std::endl;
LSN redo_start_lsn = 0; // 简化,从头开始
// 实际应为 min(所有dirty_page_table_[page_id].rec_lsn, last_checkpoint_lsn_)
for (const auto& log : all_logs) {
if (log->header.lsn < redo_start_lsn) continue; // 跳过早于重做起始点的日志
// 只有当页面不在DPT中,或者页面的RecLSN小于等于当前日志记录的LSN时,才重做
// 并且,只有当页面上记录的LSN小于日志记录的LSN时才重做
if (log->header.type == LogRecordType::UPDATE) {
UpdateLogRecord* update_log = static_cast<UpdateLogRecord*>(log.get());
Page* page = buffer_pool_manager_.fetch_page(update_log->page_id);
// 确保页面被固定,并且加锁
std::lock_guard<std::mutex> page_lock(page->page_latch_);
if (page->page_lsn_ < update_log->header.lsn) {
// 只有当页面上的LSN小于日志记录的LSN时,才重做该操作
// 这避免了重复应用已经存在的修改
page->write_data(update_log->offset, update_log->new_value);
page->page_lsn_ = update_log->header.lsn; // 更新页面LSN
}
buffer_pool_manager_.unpin_page(update_log->page_id, true); // 标记为脏页
} else if (log->header.type == LogRecordType::CLR) {
CLRLogRecord* clr_log = static_cast<CLRLogRecord*>(log.get());
Page* page = buffer_pool_manager_.fetch_page(clr_log->page_id);
std::lock_guard<std::mutex> page_lock(page->page_latch_);
if (page->page_lsn_ < clr_log->header.lsn) {
// CLR的"old_value"实际上是它所撤销的UPDATE的"new_value"
// 这里我们重做CLR,即把数据恢复到CLR的old_value
page->write_data(clr_log->offset, clr_log->old_value);
page->page_lsn_ = clr_log->header.lsn;
}
buffer_pool_manager_.unpin_page(clr_log->page_id, true);
}
}
std::cout << "Redo Phase completed." << std::endl;
// 3. Undo Phase (回滚阶段)
// 逆序扫描活跃事务的日志链,回滚所有未提交事务的操作。
// 对于每个回滚操作,生成一个CLR(Compensation Log Record)。
std::cout << "Recovery: Undo Phase..." << std::endl;
std::list<LSN> undo_list; // 存储需要undo的LSN,按降序排列
for (const auto& entry : active_tx_table_) {
undo_list.push_back(entry.second.last_lsn);
}
undo_list.sort(std::greater<LSN>()); // 降序排列
while (!undo_list.empty()) {
LSN current_undo_lsn = undo_list.front();
undo_list.pop_front();
// 查找对应的日志记录
std::unique_ptr<LogRecord> log_to_undo;
for (const auto& log : all_logs) {
if (log->header.lsn == current_undo_lsn) {
log_to_undo = deserialize_log_record(log->serialize().data()); // 拷贝一份
break;
}
}
if (!log_to_undo) {
// 找不到日志记录,可能日志文件损坏或逻辑错误
std::cerr << "Error: Log record " << current_undo_lsn << " not found for undo." << std::endl;
continue;
}
// 对于UPDATE记录,执行UNDO操作
if (log_to_undo->header.type == LogRecordType::UPDATE) {
UpdateLogRecord* update_log = static_cast<UpdateLogRecord*>(log_to_undo.get());
Page* page = buffer_pool_manager_.fetch_page(update_log->page_id);
std::lock_guard<std::mutex> page_lock(page->page_latch_); // 加锁
// 只有当页面上的LSN等于日志记录的LSN时,才执行UNDO
// 避免UNDO已经被覆盖的修改
if (page->page_lsn_ == update_log->header.lsn) {
page->write_data(update_log->offset, update_log->old_value); // 恢复旧值
page->page_lsn_ = log_manager_.get_next_lsn(); // 更新页面LSN为CLR的LSN
}
buffer_pool_manager_.unpin_page(update_log->page_id, true); // 标记为脏页
// 生成CLR (Compensation Log Record)
// CLR的prev_lsn指向被undo的日志记录的prev_lsn
// CLR的undo_next_lsn指向被undo的日志记录的prev_lsn
std::unique_ptr<CLRLogRecord> clr_log = std::make_unique<CLRLogRecord>(
0, update_log->header.prev_lsn, update_log->header.tx_id,
update_log->page_id, update_log->offset, update_log->old_value,
update_log->header.prev_lsn // 指向下一个需要undo的LSN
);
LSN clr_lsn = log_manager_.append_log_record(clr_log);
log_manager_.flush_log(clr_lsn); // CLR也需要持久化
// 将CLR的undo_next_lsn加入到undo_list中,以便继续回滚该事务的下一个操作
if (update_log->header.prev_lsn != 0) {
undo_list.push_back(update_log->header.prev_lsn);
undo_list.sort(std::greater<LSN>()); // 重新排序
}
} else if (log_to_undo->header.type == LogRecordType::CLR) {
// 如果遇到CLR,则按照CLR的undo_next_lsn继续回滚
CLRLogRecord* clr_log = static_cast<CLRLogRecord*>(log_to_undo.get());
if (clr_log->undo_next_lsn != 0) {
undo_list.push_back(clr_log->undo_next_lsn);
undo_list.sort(std::greater<LSN>());
}
} else if (log_to_undo->header.type == LogRecordType::BEGIN) {
// 遇到BEGIN日志记录,说明该事务的所有操作都已回滚
// 写入ABORT日志记录
std::unique_ptr<LogRecord> abort_log = std::make_unique<AbortLogRecord>(
0, log_to_undo->header.lsn, log_to_undo->header.tx_id);
LSN abort_lsn = log_manager_.append_log_record(abort_log);
log_manager_.flush_log(abort_lsn);
// 从活跃事务表中移除
active_tx_table_.erase(log_to_undo->header.tx_id);
}
}
std::cout << "Undo Phase completed." << std::endl;
std::cout << "Recovery process finished. System ready." << std::endl;
}
// 创建检查点 (简化实现,仅刷新所有脏页和日志)
void create_checkpoint() {
std::cout << "Creating checkpoint..." << std::endl;
// 1. 停止所有新事务的开始 (简化,实际更复杂)
// 2. 刷新所有脏页到磁盘
buffer_pool_manager_.flush_all_pages();
// 3. 强制刷新所有日志到磁盘
log_manager_.flush_log(log_manager_.get_next_lsn() - 1);
// 4. 写入一个Checkpoint Begin Record
// 5. 写入一个Checkpoint End Record,包含DPT和ATT信息
// 更新 last_checkpoint_lsn_
last_checkpoint_lsn_ = log_manager_.get_flushed_lsn();
std::cout << "Checkpoint created. Last checkpoint LSN: " << last_checkpoint_lsn_ << std::endl;
}
};
3.2.6 StorageEngine:存储引擎
作为对外接口,协调各个组件完成数据操作。
// storage_engine.h
class StorageEngine {
private:
DiskManager disk_manager_;
BufferPoolManager buffer_pool_manager_;
LogManager log_manager_;
TransactionManager transaction_manager_;
RecoveryManager recovery_manager_;
public:
StorageEngine(const std::string& data_filepath, const std::string& log_filepath, size_t buffer_pool_size)
: disk_manager_(data_filepath, log_filepath),
buffer_pool_manager_(buffer_pool_size, disk_manager_),
log_manager_(disk_manager_),
transaction_manager_(log_manager_, buffer_pool_manager_),
recovery_manager_(log_manager_, buffer_pool_manager_, disk_manager_, transaction_manager_) {}
void start() {
// 在系统启动时执行恢复
recovery_manager_.recover();
}
void shutdown() {
// 在关闭时创建检查点并刷新所有脏页和日志
recovery_manager_.create_checkpoint();
buffer_pool_manager_.flush_all_pages();
// 确保所有日志都已持久化
log_manager_.flush_log(log_manager_.get_next_lsn() - 1);
std::cout << "Storage Engine shutdown complete." << std::endl;
}
// 示例:高层PUT操作
void put(Transaction* txn, PageID page_id, uint32_t offset, const std::vector<char>& value) {
Page* page = buffer_pool_manager_.fetch_page(page_id);
std::lock_guard<std::mutex> page_lock(page->page_latch_); // 锁住页面
// 获取旧值
std::vector<char> old_value;
page->read_data(offset, value.size(), old_value);
// 写入UPDATE日志记录
std::unique_ptr<LogRecord> update_log = std::make_unique<UpdateLogRecord>(
0, txn->get_prev_lsn(), txn->get_tx_id(), page_id, offset, old_value, value);
LSN current_lsn = log_manager_.append_log_record(update_log);
txn->set_prev_lsn(current_lsn); // 更新事务的last_lsn
// 应用修改到内存页
page->write_data(offset, value);
page->page_lsn_ = current_lsn; // 更新页面LSN
// 解除页面固定,标记为脏页
buffer_pool_manager_.unpin_page(page_id, true);
}
// 示例:高层GET操作
std::vector<char> get(Transaction* txn, PageID page_id, uint32_t offset, uint32_t length) {
Page* page = buffer_pool_manager_.fetch_page(page_id);
std::lock_guard<std::mutex> page_lock(page->page_latch_); // 锁住页面
std::vector<char> result;
page->read_data(offset, length, result);
// 解除页面固定
buffer_pool_manager_.unpin_page(page_id, false);
return result;
}
// 示例:分配新页
PageID new_page(Transaction* txn) {
PageID new_id;
Page* page = buffer_pool_manager_.new_page(new_id);
// 新页的创建也应该被记录,但此处简化
// LogRecord new_page_log(...)
// log_manager_.append_log_record(new_page_log);
// txn->set_prev_lsn(new_page_log.lsn);
buffer_pool_manager_.unpin_page(new_id, true); // 新页是脏的
return new_id;
}
// 事务接口
Transaction* begin_transaction() {
return transaction_manager_.begin_transaction();
}
void commit_transaction(Transaction* txn) {
transaction_manager_.commit_transaction(txn);
}
void abort_transaction(Transaction* txn) {
transaction_manager_.abort_transaction(txn);
}
};
3.3 示例使用
#include "storage_engine.h"
#include <thread>
#include <filesystem>
void simulate_crash_and_recovery(const std::string& data_file, const std::string& log_file) {
std::cout << "n--- SIMULATION START: First Run ---" << std::endl;
{
StorageEngine engine(data_file, log_file, 10); // 10页缓冲区
engine.start(); // 恢复 (第一次运行无日志,直接启动)
Transaction* txn1 = engine.begin_transaction();
PageID page0 = engine.new_page(txn1); // 分配页ID 0
PageID page1 = engine.new_page(txn1); // 分配页ID 1
std::cout << "Txn " << txn1->get_tx_id() << ": Writing 'Hello WAL' to Page " << page0 << " Offset 0" << std::endl;
std::vector<char> value1 = {'H', 'e', 'l', 'l', 'o', ' ', 'W', 'A', 'L'};
engine.put(txn1, page0, 0, value1);
std::cout << "Txn " << txn1->get_tx_id() << ": Writing 'Database' to Page " << page1 << " Offset 0" << std::endl;
std::vector<char> value2 = {'D', 'a', 't', 'a', 'b', 'a', 's', 'e'};
engine.put(txn1, page1, 0, value2);
engine.commit_transaction(txn1);
std::cout << "Txn " << txn1->get_tx_id() << " committed." << std::endl;
// 模拟一个未提交事务
Transaction* txn2 = engine.begin_transaction();
std::cout << "Txn " << txn2->get_tx_id() << ": Writing 'Uncommitted' to Page " << page0 << " Offset 10" << std::endl;
std::vector<char> value_uncommitted = {'U', 'n', 'c', 'o', 'm', 'm', 'i', 't', 't', 'e', 'd'};
engine.put(txn2, page0,