各位同学,大家下午好!我是你们的“内存泄漏克星”,C++ 存储引擎架构师。
今天我们不聊虚的,也不聊那些花里胡哨的 AI 框架,我们来聊聊一个让无数分布式系统工程师掉头发的老生常谈——Raft 日志压缩。
想象一下,你开了一家自助餐厅(你的存储引擎)。客人(请求)源源不断地进来点菜(写入数据)。为了确保账目不乱,你规定:每上一道新菜,必须先在账本(WAL 日志)上记一笔,然后才能上菜。这叫“预写式”,保证数据不丢。
但是,问题来了。这家餐厅开了三年,客人没走,账本厚得像砖头。现在你想要给新客人上菜,你得先把这厚砖头搬到厨房,告诉新客人“这道菜 2018 年就上过了”。如果这砖头有 10GB,你每次上菜都要搬 10GB,厨房(网络/磁盘)得累死,客人(CPU)也得饿死。
这时候,快照 机制就登场了。这就像是餐厅老板突然决定:“行了,从今天起,我们不再记流水账了,我们直接把当天的库存和账目做成一张 A4 纸的报表,以后只看这张表!旧账本?烧了!”
好,我们直接切入正题,看看在 C++ 里,这事儿到底该怎么优雅地干。
1. 日志膨胀的“窒息”感
在 Raft 协议里,Leader 的职责就是忠实地复制日志。每一条日志都有一个 Term(任期)和一个 Index(索引)。
如果你不进行压缩,随着时间的推移,你的 last_log_index 会一直增长,直到宇宙热寂。这时候,你的 WAL 文件可能从几 MB 变成了几十 GB。
这有什么坏处?
- 复制开销大: 当 Leader 收到一个新请求,它需要把这条日志复制给所有 Follower。如果 WAL 有 100GB,Leader 就得把 100GB 的数据广播出去。哪怕你只写了一个字节,网络和磁盘 IO 都得全量走一遍。这就像你想寄一封信,结果邮递员非要把你家里所有的家具都搬上卡车,只为了让你这封信能贴上邮票。
- 内存占用高: 所有的日志条目都驻留在内存里。如果日志有 1000 万条,每条 1KB,你的内存就没了。
- 恢复速度慢: 如果节点崩溃重启,它得读取 100GB 的日志来恢复状态机,这得等到猴年马月?
所以,日志压缩 是必须的。而 Raft 标准协议里,唯一合法的压缩手段就是 Snapshot(快照)。
2. 快照的本质:状态机的“时光机”
很多新手对 Snapshot 的理解有误。他们以为 Snapshot 是把数据库里的数据全拍个照存下来。
错!大错特错!
在 Raft 的语境下,Snapshot 不是数据库的快照,Snapshot 是 Log State 的快照。
Snapshot 包含两个核心信息:
- Last Include Index & Term: 这是最重要的。它告诉所有节点:“从 Term X 的 Index Y 开始,后面的日志都是多余的,可以直接扔了。”
- State Machine Data: 数据库里的持久化数据(比如 RocksDB 里的 SST 文件,或者 KV 存储里的 MemTable)。
简单来说,Snapshot 就是一个“压缩包”。它把从 [0, Y] 这段历史日志压缩成了一段状态,然后把 [Y+1, Current] 这段还没被压缩的日志扔掉。
3. C++ 实战:构建一个简易的 Snapshot 机制
为了讲清楚,我们假设一个最简化的 C++ RaftLog 类。不要嫌它简单,这是万丈高楼平地起。
3.1 数据结构设计
首先,我们需要定义一个 Snapshot 结构体。我们需要序列化它,所以它得是个 POD(Plain Old Data)类型,或者提供简单的序列化接口。
#include <vector>
#include <string>
#include <cstdint>
#include <fstream>
#include <mutex>
// 定义日志条目,为了演示简化
struct LogEntry {
uint64_t term;
uint64_t index;
std::string command; // 实际上可能是序列化的 WriteBatch
};
// 定义快照元数据
struct SnapshotMeta {
uint64_t last_included_term; // 快照包含的日志的最后一个 Term
uint64_t last_included_index; // 快照包含的日志的最后一个 Index
// 这里可以扩展:存储 State Machine 的 checksum,或者快照文件的路径
std::string snapshot_file_path;
};
class RaftLog {
private:
std::vector<LogEntry> entries_; // 内存中的日志
std::mutex mutex_; // 保护 entries_ 的互斥锁
// 持久化存储路径
std::string wal_dir_;
// 当前日志的元数据
uint64_t last_log_term_;
uint64_t last_log_index_;
public:
RaftLog(const std::string& dir) : wal_dir_(dir), last_log_index_(0), last_log_term_(0) {}
// --- 核心功能 1:创建快照 ---
/**
* 创建快照的流程:
* 1. 暂停写入(或者加锁)。
* 2. 序列化当前状态机数据。
* 3. 写入磁盘。
* 4. 更新元数据。
* 5. 截断日志。
*/
void CreateSnapshot(const std::string& snapshot_name) {
std::lock_guard<std::mutex> lock(mutex_);
uint64_t snapshot_index = last_log_index_;
uint64_t snapshot_term = last_log_term_;
// 1. 准备元数据
SnapshotMeta meta;
meta.last_included_index = snapshot_index;
meta.last_included_term = snapshot_term;
meta.snapshot_file_path = wal_dir_ + "/" + snapshot_name + ".meta";
// 2. 模拟序列化 State Machine 数据 (这里简化为写入一个文件)
// 实际场景中,这里会把 RocksDB 的 SST 文件打包,或者把 MemTable Dump 到磁盘
std::ofstream meta_file(meta.snapshot_file_path, std::ios::binary);
meta_file.write(reinterpret_cast<const char*>(&meta), sizeof(SnapshotMeta));
// ... 写入实际数据 ...
meta_file.close();
std::cout << "[RaftLog] Snapshot created at Index: " << snapshot_index
<< ", Term: " << snapshot_term << std::endl;
// 3. 截断日志
// 逻辑上:保留 entries_ 中 index < snapshot_index 的部分。
// 实际上,由于 vector 删除中间元素很慢,我们通常通过调整 last_log_index_ 来标记,
// 或者重新分配内存。这里为了演示简单,我们直接清空。
entries_.clear();
// 更新元数据指针
last_log_index_ = snapshot_index;
last_log_term_ = snapshot_term;
}
// --- 核心功能 2:应用快照 (Follower 收到 Snapshot 时调用) ---
/**
* 当 Follower 从 Leader 收到 Snapshot RPC 时,它需要:
* 1. 加载快照数据。
* 2. 覆盖当前的 State Machine。
* 3. 删除所有 index <= snapshot_index 的日志。
*/
void ApplySnapshot(const SnapshotMeta& meta) {
std::lock_guard<std::mutex> lock(mutex_);
std::cout << "[RaftLog] Applying Snapshot: Index " << meta.last_included_index
<< ", Term " << meta.last_included_term << std::endl;
// 1. 恢复 State Machine (这里只是模拟)
// 实际操作:从 meta.snapshot_file_path 加载数据到 RocksDB/MemTable
std::ifstream meta_file(meta.snapshot_file_path, std::ios::binary);
SnapshotMeta loaded_meta;
meta_file.read(reinterpret_cast<char*>(&loaded_meta), sizeof(SnapshotMeta));
meta_file.close();
// 2. 截断日志
// 这里的逻辑是:既然快照已经包含了 Index <= snapshot_index 的所有数据,
// 那么我们之前的日志就是垃圾。
entries_.clear();
// 更新本地元数据
last_log_index_ = loaded_meta.last_included_index;
last_log_term_ = loaded_meta.last_included_term;
std::cout << "[RaftLog] Log truncated to Index " << last_log_index_ << std::endl;
}
// --- 核心功能 3:追加日志 (AppendEntries) ---
void AppendEntry(const LogEntry& entry) {
std::lock_guard<std::mutex> lock(mutex_);
// 简单的检查:如果追加的 index 不连续,说明有数据丢失,需要拒绝
if (entries_.empty()) {
if (entry.index != 1) {
// 容错处理,或者返回错误
return;
}
} else {
if (entry.index != entries_.back().index + 1) {
// 如果不是连续的,说明发生了 Snapshot 或者 Log Compaction
// 这种情况下,Follower 需要请求 Snapshot
return;
}
}
entries_.push_back(entry);
last_log_index_ = entry.index;
last_log_term_ = entry.term;
}
};
4. 代码深挖:那些“坑坑洼洼”的地方
上面的代码是理想状态。在真实的 C++ 存储引擎(比如 TiKV, Etcd)中,事情要复杂得多。
4.1 内存映射文件
上面的代码用 std::ofstream 写文件。这很慢,而且容易阻塞线程。在 C++ 高性能场景下,我们会用 Memory-Mapped Files (mmap)。
为什么?因为文件 IO 和 内存 IO 在 CPU 看来,有时候没什么区别。
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
// 使用 mmap 写入 Snapshot
void WriteSnapshotWithMmap(const std::string& path, const SnapshotMeta& meta) {
int fd = open(path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0644);
if (fd == -1) {
// Handle error
}
// 调整文件大小,比如 1MB
size_t file_size = 1024 * 1024;
ftruncate(fd, file_size);
// 映射到内存
void* mapped = mmap(nullptr, file_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
// 直接像操作数组一样操作内存
memcpy(mapped, &meta, sizeof(SnapshotMeta));
// ... 写入数据 ...
// 重要:必须 msync,确保数据落盘
msync(mapped, file_size, MS_SYNC);
munmap(mapped, file_size);
close(fd);
}
专家提示: 使用 mmap 时,一定要小心段错误。如果进程崩溃,文件内容可能会被部分写入,导致数据不一致。
4.2 并发陷阱:Snapshot 期间的写入
这是最难搞的部分。假设 Leader 正在创建快照,此时它收到了一个新的客户端写入请求。它应该先写 WAL,还是先创建快照?
如果在快照创建过程中,Leader 挂了,那么:
- 如果它已经截断了 WAL,但没有保存快照元数据,它的状态机就处于“中间态”。
- 如果它没截断 WAL,那么重启后,它的日志和状态机数据就不一致了(日志里没有,但状态机里有)。
解决方案:
通常有两种流派:
- 阻塞写入: 创建快照期间,拒绝新写入。这简单,但会降低吞吐量。
- 异步快照 + Pre-Vote(预投票): 这更高级。Leader 会在后台线程创建快照。当快照完成时,它会更新一个
snapshot_meta_变量。下次收到 AppendEntries 时,它会带上这个元数据。Follower 如果发现 Leader 的日志不连续,就会请求快照。
C++ 代码示例:异步快照控制
class RaftLog {
private:
std::atomic<bool> is_snapshotting_{false};
SnapshotMeta pending_snapshot_;
public:
// 在后台线程调用
void StartSnapshot() {
// 设置标志位,防止新日志写入
is_snapshotting_ = true;
// ... 执行快照逻辑 ...
// 快照完成
SnapshotMeta new_meta;
new_meta.last_included_index = last_log_index_;
new_meta.last_included_term = last_log_term_;
// 原子更新元数据,通知上层逻辑可以截断日志了
pending_snapshot_ = new_meta;
is_snapshotting_ = false;
}
// 检查是否需要截断日志
bool NeedsSnapshot() const {
return is_snapshotting_;
}
SnapshotMeta GetSnapshotMeta() const {
return pending_snapshot_;
}
};
5. 恢复:从灰烬中重生
当节点重启时,会发生什么?
- 加载 Snapshot: 检查磁盘上是否有最新的 Snapshot 文件。如果有,先加载它,恢复 State Machine。
- 截断日志: 调用
Truncate,把所有 Index <= Snapshot Index 的日志条目从内存中移除。 - 重放 WAL: 打开 WAL 文件,从 Snapshot Index + 1 开始,读取并重放剩余的日志条目。
这就像是游戏存档。你加载了存档,然后继续玩。如果你在加载存档后还保留着之前没玩完的旧关卡数据,那就不对了。
C++ 恢复流程代码:
void RecoverFromSnapshotAndWAL(RaftLog* log, const std::string& wal_path) {
// 1. 尝试加载 Snapshot
SnapshotMeta latest_meta = LoadLatestSnapshot(wal_path + "/snapshots");
if (latest_meta.last_included_index > 0) {
std::cout << "Recovering from Snapshot at Index " << latest_meta.last_included_index << std::endl;
log->ApplySnapshot(latest_meta);
}
// 2. 重放 WAL
std::ifstream wal(wal_path + "/wal.log", std::ios::binary);
LogEntry entry;
// 从哪里开始读?
// 如果有 Snapshot,就从 Snapshot Index + 1 开始读
// 如果没有 Snapshot,从 Index 1 开始读
uint64_t start_index = (latest_meta.last_included_index > 0) ? latest_meta.last_included_index + 1 : 1;
while (wal.read(reinterpret_cast<char*>(&entry), sizeof(LogEntry))) {
if (entry.index < start_index) continue; // 跳过旧数据
// 重新构建日志
// 注意:这里需要处理 term 和 index 的校验
// 实际代码中需要更复杂的逻辑来处理 WAL 的循环使用
log->AppendEntry(entry);
}
wal.close();
}
6. 与 RocksDB 的爱恨情仇
在真实的 C++ 存储引擎中,我们很少手写状态机,而是使用 RocksDB。这时候,Snapshot 的逻辑就变成了 RocksDB Snapshot 的管理。
RocksDB 本身就提供了 CreateSnapshot() API。它的原理是创建一个不可变的 MemTable 和不可变的 SST 文件。
集成技巧:
- 利用 RocksDB 的 Snapshot: 在创建 Raft Snapshot 之前,先调用
db_->CreateSnapshot()。这会冻结当前的数据状态。 - 文件搬运: 把 RocksDB 的 Snapshot 目录(或者 SST 文件)复制到你的 Raft Snapshot 目录下。
- 注意: RocksDB 的 Snapshot 是为了读一致性,而 Raft 的 Snapshot 是为了日志压缩。它们不能混用!千万不要试图用 RocksDB 的 Snapshot API 来做 Raft 的日志压缩!
错误示范:
// 错误!这会导致并发读写冲突
void BadSnapshotCreation(rocksdb::DB* db) {
auto rocks_snap = db->CreateSnapshot(); // 拿到了一个只读锁
// 此时如果有一个写线程来了,会阻塞在这里
SaveToDisk(rocks_snap);
// ...
}
正确做法:
// 正确!先暂停写入,或者使用 Write-in-Progress (WIP) 文件
void GoodSnapshotCreation(WriteBatch* batch, rocksdb::DB* db) {
// 1. 构建一个 WriteBatch,包含当前所有要持久化的数据
// 2. 将这个 WriteBatch 持久化到磁盘文件中
// 3. 应用这个 WriteBatch 到 RocksDB
// 4. 此时,磁盘上的文件就是“快照”了
// 5. RocksDB 里的数据已经更新了,旧的 SST 文件被标记为过期
}
7. 网络传输与 RPC
快照文件通常很大(几百 MB 到几 GB)。你不能像传日志条目那样,一条一条 send() 过去。
在 C++ 中,我们通常使用 mmap 配合 sendfile 系统调用。
sendfile 的魔法:
sendfile 是 Linux 专有(但非常通用)的系统调用。它可以在内核态直接把一个文件描述符(源)的数据拷贝到另一个文件描述符(目的),不需要经过用户态缓冲区。这极大地减少了 CPU 拷贝的开销。
#include <sys/sendfile.h>
// 将快照文件发送到网络 Socket
int SendSnapshot(int socket_fd, const std::string& snapshot_path) {
int source_fd = open(snapshot_path.c_str(), O_RDONLY);
if (source_fd == -1) return -1;
struct stat stat;
fstat(source_fd, &stat);
off_t offset = 0;
// 核心魔法:sendfile
// 返回值是实际发送的字节数
int sent_bytes = sendfile(socket_fd, source_fd, &offset, stat.st_size);
close(source_fd);
return sent_bytes;
}
这比 read() -> memcpy() -> write() 快得多。这就像是直接把水管接到水龙头上,而不是用水桶一桶一桶地提。
8. 常见 Bug 与调试技巧
在 C++ 里写 Raft Snapshot,你肯定会遇到 Bug。以下是一些经典场景:
Bug 1: “快照间隙”
假设 Leader 创建了 Index 100 的快照,Follower 收到了快照。但是,Leader 在发送快照的过程中挂了,或者网络断了。
Follower 下载了快照,截断了日志到 Index 100。此时,Leader 重启,重启后它的日志是 Index 1 到 101。
Leader 恢复后,发现自己和 Follower 不一致:Leader 有 101,Follower 只有 100。
Leader 会认为 Follower 落后,然后发送 Index 101 的日志给 Follower。Follower 发现自己没有 101,于是拒绝。集群陷入“脑裂”或选举僵局。
解决方法: Leader 在重启时,必须检查自己的 last_log_index 和磁盘上最新的 Snapshot last_included_index。如果 last_log_index < snapshot_index,说明数据不一致,必须强制应用 Snapshot 并截断日志。
Bug 2: 磁盘空间耗尽
快照文件写入一半,磁盘满了。此时 Raft 节点崩溃。
重启时,它发现磁盘上有一个“损坏的”快照文件(只有一半数据)。
它会尝试加载这个快照,结果内存里全是垃圾数据。
解决方法:
- 先写后改名: 先写入临时文件,文件写完且校验通过后,再重命名为正式文件名。
- 校验和: 在 Snapshot 文件头写入 CRC32 校验和,加载前先验证。
struct SnapshotHeader {
uint32_t magic_number; // "SNAP"
uint32_t crc32; // Header 的校验和
uint64_t size; // 文件总大小
// ...
};
bool LoadSnapshotSafe(const std::string& path) {
// 1. 检查文件大小是否合法
if (GetFileSize(path) < sizeof(SnapshotHeader)) return false;
// 2. 读取 Header
SnapshotHeader header;
ReadHeader(path, &header);
// 3. 校验 CRC
if (CalculateCRC32(path) != header.crc32) return false;
// 4. 加载
return true;
}
9. 性能优化:不仅仅是“删日志”
优化 Snapshot 不只是让日志变短,它还影响整个系统的性能。
9.1 并行压缩
如果 State Machine 是 RocksDB,我们可以开启 RocksDB 的后台压缩线程。在 Raft 创建快照时,我们可以利用这些线程来合并 SST 文件。
9.2 增量快照
这属于高级话题。每次只压缩最近变化的数据,而不是全量压缩。这需要更复杂的元数据管理。对于大多数 C++ 引擎来说,全量快照已经够用了。
10. 总结与进阶
好了,同学们。我们今天从 Raft 的痛点出发,深入到了 C++ 的内存管理、文件 IO、并发控制以及系统调用的底层细节。
记住几个核心点:
- Snapshot 是 Log State 的压缩包,不是 Data Snapshot。
- 截断日志是危险的,必须先保存元数据。
- C++ 中使用
sendfile和mmap来处理大文件传输。 - 线程安全是第一要务,Snapshot 期间要处理好并发写入。
下一步进阶建议:
去读读 TiKV 的源码,看看他们是怎么处理 Snapshot 的(SnapManager)。去看看 etcd 的 Snapshot 实现。你会发现,即使是同样的 Raft 协议,不同的大佬写出来的 C++ 代码,其优雅程度和性能差异简直是天壤之别。
记住,代码写得好不好,不在于你用了多少花哨的模板元编程,而在于你能不能优雅地处理那些“边缘情况”和“并发冲突”。
好了,下课!别忘了把你的 WAL 日志清理干净,不然你的硬盘要爆炸了!