欢迎来到本次关于分布式系统核心主题的讲座。今天,我们将深入探讨 Raft 共识算法中日志的物理存储格式,这本质上是一个分布式版本的 Write-Ahead Logging (WAL) 机制。我们将剖析 Raft 如何将抽象的日志概念转化为磁盘上持久化、高效且容错的数据结构,这对于理解任何基于 Raft 的分布式数据库、消息队列或协调服务至关重要。
Raft 日志:分布式 WAL 的基石
在分布式系统中,持久化和可靠性是核心挑战。传统的单机数据库通过 Write-Ahead Logging (WAL) 机制来确保事务的原子性、持久性和崩溃恢复能力。WAL 的核心思想是:在对数据进行任何修改之前,先将这些修改记录到持久化的日志中。如果系统崩溃,可以通过重放日志来恢复到一致状态。
Raft 共识算法将这一思想推广到了分布式环境。Raft 的核心就是其“复制状态机”模型,其中每个服务器都维护一个包含一系列命令的日志。这些命令以相同的顺序应用到状态机,从而确保所有服务器上的状态机最终达到一致。Raft 日志的物理存储,正是这种分布式 WAL 机制的实现。它不仅要满足单机 WAL 的持久性、高效性要求,还要额外考虑分布式系统中的一致性、可用性、成员变更以及快照等复杂性。
Raft 日志的目标包括:
- 持久性 (Durability):即使服务器崩溃,已提交的日志条目也不会丢失。
- 顺序性 (Ordering):日志条目必须严格按照它们被领导者接收和复制的顺序写入。
- 一致性 (Consistency):所有已提交的日志条目在所有已提交的副本上必须是相同的。
- 高效性 (Efficiency):日志写入操作应尽可能快,通常通过顺序 I/O 实现。
- 可恢复性 (Recoverability):系统崩溃后,能够从持久化日志中快速准确地恢复状态。
- 可管理性 (Manageability):能够有效地管理日志的增长,包括截断和快照。
Raft 日志的抽象结构
在深入物理存储之前,我们先回顾 Raft 日志的抽象结构。Raft 日志是一个严格追加、不可变的条目序列。每个日志条目包含以下关键信息:
- 索引 (Index):一个单调递增的整数,表示日志条目在整个日志序列中的位置。从 1 开始。
- 任期 (Term):一个单调递增的整数,表示该日志条目被创建时的领导者任期。任期是 Raft 中逻辑时钟和冲突解决的关键。
- 命令 (Command):需要复制并应用到状态机的具体操作。这通常是一个序列化的字节数组,包含了操作类型和操作参数。
例如,一个银行转账操作 Transfer(account_A, account_B, amount) 将被封装成一个命令,连同其索引和任期,一起作为日志条目存储。
Raft 的核心规则之一是“领导者追加日志”。所有客户端请求都通过领导者,领导者将请求封装为日志条目,追加到自己的日志中,然后并行发送给追随者进行复制。只有当多数追随者确认接收并写入日志后,领导者才会将该日志条目标记为“已提交 (Committed)”,并将其应用到本地状态机。
物理存储的目标与挑战
将 Raft 日志的抽象结构映射到磁盘上,需要解决一系列具体的挑战:
- 数据完整性:如何确保写入磁盘的数据是完整的、未损坏的,即使在写入过程中发生崩溃。
- 性能优化:磁盘 I/O 是昂贵的操作。如何最大限度地利用顺序写入的优势,减少随机 I/O 和
fsync的开销。 - 日志容量管理:Raft 日志会持续增长。如何避免单个文件过大,如何有效地删除旧的、已提交且已快照的日志条目。
- 快速查找:如何在恢复时或处理
AppendEntriesRPC 时,快速定位到特定的日志条目。 - 原子性保证:在写入一个日志条目时,如何确保要么完全写入,要么完全不写入。
为应对这些挑战,大多数 Raft 实现(如 etcd 的 WAL、RocksDB 的 WAL 机制)都采用了分段文件 (Segmented File) 的策略。
分段文件架构
将整个 Raft 日志存储在一个巨大的文件中是不明智的。它会导致文件系统管理效率低下,删除旧日志需要复杂的截断操作,并且单个文件过大也会增加崩溃恢复时的扫描时间。因此,Raft 日志通常被分解成一系列较小的、固定大小(或达到一定大小后滚动)的日志段 (Log Segments)。
每个日志段通常包含以下文件:
- 数据文件 (Data File):存储实际的日志条目内容。
- 索引文件 (Index File):可选但常用,用于存储日志索引到数据文件偏移量的映射,加速查找。
- 元数据文件 (Metadata File):存储日志段自身的元数据,以及 Raft 节点的一些核心状态(如
HardState)。
日志目录结构示例:
/data/raft/
├── wal/
│ ├── 00000000000000000000-00000000000000000000.log # 数据文件 (起始索引-起始任期.log)
│ ├── 00000000000000000000-00000000000000000000.idx # 索引文件 (可选)
│ ├── 00000000000000000001-00000000000000000002.log # 第二个数据文件
│ ├── 00000000000000000001-00000000000000000002.idx
│ └── ...
├── snapshot/
│ ├── 00000000000000000042-00000000000000000005.snap # 快照文件 (LastIncludedIndex-LastIncludedTerm.snap)
│ └── 00000000000000000042-00000000000000000005.meta # 快照元数据
└── raft_state.meta # Raft 节点全局状态 (HardState)
文件命名约定:
日志段文件通常以其包含的第一个日志条目的 (index, term) 来命名。例如,00000000000000000001-00000000000000000002.log 表示这个日志段从索引 1、任期 2 的日志条目开始。使用固定长度的零填充数字可以确保文件按字典序排序,方便按时间或索引顺序遍历。
当当前日志段达到预设大小(例如 64MB 或 128MB)时,系统会“滚动”到一个新的日志段。这意味着当前段被关闭,并创建一个新的文件来继续追加日志。
详细文件格式解析
现在,我们深入到每个文件的具体格式。
1. 日志数据文件 (.log)
日志数据文件是核心,它以追加的方式存储所有 Raft 日志条目。为了确保数据完整性和高效读取,每个日志条目在文件中存储时都会包含一些额外的元数据。
单个日志条目的存储格式:
一个典型的日志条目在磁盘上的布局可能如下:
+------------------+------------------+------------------+------------------+------------------+------------------+
| Entry Length (4B)| Checksum (4B/8B) | Term (8B) | Index (8B) | Type (1B) | Command Data (VB)|
+------------------+------------------+------------------+------------------+------------------+------------------+
- Entry Length (4字节或8字节):这是一个变长编码 (Varint) 或固定长度的整数,表示整个日志条目(从
Checksum到Command Data结束)的字节数。这个长度前缀至关重要,它允许系统在不知道条目内容的情况下,一次性读取一个完整的条目,或者跳过损坏的条目。 - Checksum (4字节或8字节):通常是 CRC32 或 CRC64 校验和,用于验证从磁盘读取的日志条目是否完整和未被损坏。这能有效检测磁盘错误或部分写入。
- Term (8字节):日志条目的任期号。通常是一个
uint64。 - Index (8字节):日志条目的索引号。通常是一个
uint64。 - Type (1字节):日志条目的类型。Raft 协议定义了不同类型的日志条目,例如:
EntryNormal:普通的客户端命令。EntryConfChange:集群配置变更命令(如添加/删除节点)。EntryNoOp:空操作,有时用于推进commitIndex或验证领导者身份。
- Command Data (变长字节):实际的客户端命令数据,通常是一个序列化的消息(例如 Protobuf、JSON 或自定义二进制格式)。其长度由
Entry Length字段决定。
Go 语言风格的结构体和序列化/反序列化示例:
package wal
import (
"encoding/binary"
"hash/crc32"
"io"
)
// LogEntryType defines the type of a Raft log entry.
type LogEntryType byte
const (
EntryNormal LogEntryType = iota
EntryConfChange
EntryNoOp
)
// RaftLogEntry represents a single Raft log entry.
type RaftLogEntry struct {
Term uint64
Index uint64
Type LogEntryType
Command []byte // The actual command data
}
// MarshalBinary serializes a RaftLogEntry into a byte slice for disk storage.
func (e *RaftLogEntry) MarshalBinary() ([]byte, error) {
// Estimate buffer size: Term(8) + Index(8) + Type(1) + CommandLength(Varint) + CommandData
// Let's use a simple fixed-size buffer for this example, or grow dynamically.
// In a real implementation, you'd use a bytes.Buffer or protobuf.
// For simplicity, let's just combine the fields directly here.
// A real implementation would use a more robust serialization library.
buf := make([]byte, 8+8+1+len(e.Command)) // Term + Index + Type + Command
binary.BigEndian.PutUint64(buf[0:8], e.Term)
binary.BigEndian.PutUint64(buf[8:16], e.Index)
buf[16] = byte(e.Type)
copy(buf[17:], e.Command)
return buf, nil
}
// UnmarshalBinary deserializes a byte slice back into a RaftLogEntry.
func (e *RaftLogEntry) UnmarshalBinary(data []byte) error {
if len(data) < 17 { // Minimum size for Term, Index, Type
return io.ErrUnexpectedEOF
}
e.Term = binary.BigEndian.Uint64(data[0:8])
e.Index = binary.BigEndian.Uint64(data[8:16])
e.Type = LogEntryType(data[16])
e.Command = data[17:]
return nil
}
// WalSegmentWriter handles writing log entries to a WAL segment data file.
type WalSegmentWriter struct {
file *os.File
buf []byte // Internal buffer for write operations
}
// NewWalSegmentWriter creates a new writer for a WAL segment.
func NewWalSegmentWriter(filePath string) (*WalSegmentWriter, error) {
file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
return &WalSegmentWriter{file: file, buf: make([]byte, 1024)}, nil // Example buffer size
}
// AppendEntry writes a RaftLogEntry to the WAL segment.
func (w *WalSegmentWriter) AppendEntry(entry *RaftLogEntry) error {
serializedEntry, err := entry.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to marshal entry: %w", err)
}
// Calculate checksum
checksum := crc32.ChecksumIEEE(serializedEntry)
// Prepare data for writing: [length][checksum][entry_data]
entryDataSize := len(serializedEntry)
// Ensure buffer is large enough for length (4B) + checksum (4B) + entryData
requiredBufSize := 4 + 4 + entryDataSize
if cap(w.buf) < requiredBufSize {
w.buf = make([]byte, requiredBufSize)
}
binary.BigEndian.PutUint32(w.buf[0:4], uint32(entryDataSize)) // Length prefix
binary.BigEndian.PutUint32(w.buf[4:8], checksum) // Checksum
copy(w.buf[8:], serializedEntry) // Actual entry data
// Write to file
_, err = w.file.Write(w.buf[:8+entryDataSize]) // Write length + checksum + serialized data
if err != nil {
return fmt.Errorf("failed to write entry to WAL: %w", err)
}
return nil
}
// Sync ensures data is flushed to disk.
func (w *WalSegmentWriter) Sync() error {
return w.file.Sync()
}
// Close closes the underlying file.
func (w *WalSegmentWriter) Close() error {
return w.file.Close()
}
// WalSegmentReader handles reading log entries from a WAL segment data file.
type WalSegmentReader struct {
file *os.File
buf []byte
offset int64 // Current read offset in the file
}
// NewWalSegmentReader creates a new reader for a WAL segment.
func NewWalSegmentReader(filePath string) (*WalSegmentReader, error) {
file, err := os.OpenFile(filePath, os.O_RDONLY, 0644)
if err != nil {
return nil, err
}
return &WalSegmentReader{file: file, buf: make([]byte, 1024), offset: 0}, nil
}
// ReadEntry reads the next RaftLogEntry from the WAL segment.
func (r *WalSegmentReader) ReadEntry() (*RaftLogEntry, error) {
// Read length prefix (4 bytes)
_, err := r.file.ReadAt(r.buf[0:4], r.offset)
if err != nil {
if err == io.EOF {
return nil, err // No more entries
}
return nil, fmt.Errorf("failed to read entry length: %w", err)
}
r.offset += 4
entryDataSize := binary.BigEndian.Uint32(r.buf[0:4])
// Read checksum (4 bytes)
_, err = r.file.ReadAt(r.buf[0:4], r.offset)
if err != nil {
return nil, fmt.Errorf("failed to read entry checksum: %w", err)
}
r.offset += 4
expectedChecksum := binary.BigEndian.Uint32(r.buf[0:4])
// Read entry data
entryData := make([]byte, entryDataSize)
_, err = r.file.ReadAt(entryData, r.offset)
if err != nil {
return nil, fmt.Errorf("failed to read entry data: %w", err)
}
r.offset += int64(entryDataSize)
// Verify checksum
actualChecksum := crc32.ChecksumIEEE(entryData)
if actualChecksum != expectedChecksum {
return nil, fmt.Errorf("checksum mismatch: expected %d, got %d", expectedChecksum, actualChecksum)
}
// Unmarshal the entry
entry := &RaftLogEntry{}
if err := entry.UnmarshalBinary(entryData); err != nil {
return nil, fmt.Errorf("failed to unmarshal entry: %w", err)
}
return entry, nil
}
// Close closes the underlying file.
func (r *WalSegmentReader) Close() error {
return r.file.Close()
}
关于 fsync 的重要性:
仅仅将数据写入文件并不意味着它已持久化。操作系统通常会将数据缓存到内存中,然后异步写入磁盘。为了保证数据在断电后不丢失,必须调用 fsync()(或 fdatasync())系统调用,强制操作系统将文件的所有修改(包括数据和元数据)写入物理磁盘。在 Raft 中,每次领导者将日志条目发送给追随者之前,或者在提交一个批次的日志条目之前,通常会调用 fsync 来确保日志的持久性。然而,频繁的 fsync 会严重影响性能,因此通常会采用批处理 fsync 的策略。
2. 日志索引文件 (.idx)
日志索引文件是可选的,但对于需要快速随机访问日志条目的系统来说非常有用。它存储了 Raft 索引到数据文件中物理偏移量的映射。
单个索引条目的存储格式:
+------------------+------------------+------------------+
| Log Index (8B) | File Offset (8B) | Term (8B) |
+------------------+------------------+------------------+
- Log Index (8字节):日志条目的 Raft 逻辑索引。
- File Offset (8字节):该日志条目在对应的
.log数据文件中的起始偏移量。 - Term (8字节):该日志条目的任期号。在
AppendEntriesRPC 匹配日志时,需要同时匹配索引和任期,因此将任期存储在索引文件中可以避免额外读取数据文件,从而加速匹配过程。
索引文件通常也是追加写入的,每个写入的日志条目都会在索引文件中对应一个条目。由于索引条目是固定大小的,因此可以通过简单的数学计算快速定位到任何一个索引条目,例如:offset_in_idx_file = (log_index - first_log_index_in_segment) * entry_size_in_idx_file。
Go 语言风格的结构体和操作示例:
package wal
import (
"encoding/binary"
"fmt"
"os"
)
// IndexEntry represents an entry in the WAL segment index file.
type IndexEntry struct {
LogIndex uint64 // The Raft log index
FileOffset uint64 // The byte offset in the corresponding .log data file
Term uint64 // The term of the log entry
}
// IndexEntrySize is the fixed size of an IndexEntry in bytes.
const IndexEntrySize = 8 + 8 + 8 // LogIndex + FileOffset + Term
// MarshalBinary serializes an IndexEntry.
func (ie *IndexEntry) MarshalBinary() []byte {
buf := make([]byte, IndexEntrySize)
binary.BigEndian.PutUint64(buf[0:8], ie.LogIndex)
binary.BigEndian.PutUint64(buf[8:16], ie.FileOffset)
binary.BigEndian.PutUint64(buf[16:24], ie.Term)
return buf
}
// UnmarshalBinary deserializes an IndexEntry.
func (ie *IndexEntry) UnmarshalBinary(data []byte) error {
if len(data) < IndexEntrySize {
return io.ErrUnexpectedEOF
}
ie.LogIndex = binary.BigEndian.Uint64(data[0:8])
ie.FileOffset = binary.BigEndian.Uint64(data[8:16])
ie.Term = binary.BigEndian.Uint64(data[16:24])
return nil
}
// WalSegmentIndexWriter handles writing index entries to a WAL segment index file.
type WalSegmentIndexWriter struct {
file *os.File
}
// NewWalSegmentIndexWriter creates a new writer for a WAL segment index.
func NewWalSegmentIndexWriter(filePath string) (*WalSegmentIndexWriter, error) {
file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
return &WalSegmentIndexWriter{file: file}, nil
}
// AppendIndexEntry writes an IndexEntry to the index file.
func (w *WalSegmentIndexWriter) AppendIndexEntry(entry *IndexEntry) error {
_, err := w.file.Write(entry.MarshalBinary())
if err != nil {
return fmt.Errorf("failed to write index entry: %w", err)
}
return nil
}
// Sync ensures data is flushed to disk.
func (w *WalSegmentIndexWriter) Sync() error {
return w.file.Sync()
}
// Close closes the underlying file.
func (w *WalSegmentIndexWriter) Close() error {
return w.file.Close()
}
// WalSegmentIndexReader handles reading index entries from a WAL segment index file.
type WalSegmentIndexReader struct {
file *os.File
firstLogIndex uint64 // The first log index contained in this segment
}
// NewWalSegmentIndexReader creates a new reader for a WAL segment index.
// firstLogIndex is needed to calculate file offsets from logical indices.
func NewWalSegmentIndexReader(filePath string, firstLogIndex uint64) (*WalSegmentIndexReader, error) {
file, err := os.OpenFile(filePath, os.O_RDONLY, 0644)
if err != nil {
return nil, err
}
return &WalSegmentIndexReader{file: file, firstLogIndex: firstLogIndex}, nil
}
// GetIndexEntry retrieves an IndexEntry for a given Raft log index.
func (r *WalSegmentIndexReader) GetIndexEntry(logIndex uint64) (*IndexEntry, error) {
if logIndex < r.firstLogIndex {
return nil, fmt.Errorf("log index %d is before first index %d of this segment", logIndex, r.firstLogIndex)
}
relativeIndex := logIndex - r.firstLogIndex
fileOffset := int64(relativeIndex * IndexEntrySize)
entryData := make([]byte, IndexEntrySize)
_, err := r.file.ReadAt(entryData, fileOffset)
if err != nil {
return nil, fmt.Errorf("failed to read index entry at offset %d: %w", fileOffset, err)
}
entry := &IndexEntry{}
if err := entry.UnmarshalBinary(entryData); err != nil {
return nil, fmt.Errorf("failed to unmarshal index entry: %w", err)
}
if entry.LogIndex != logIndex {
// This should ideally not happen if the index file is consistent
return nil, fmt.Errorf("index mismatch: requested %d, got %d", logIndex, entry.LogIndex)
}
return entry, nil
}
// Close closes the underlying file.
func (r *WalSegmentIndexReader) Close() error {
return r.file.Close()
}
3. Raft 状态和元数据文件 (raft_state.meta 或 .meta 文件)
除了日志条目本身,Raft 节点还需要持久化其自身的关键状态,以便在崩溃后恢复。这些状态通常存储在一个独立的元数据文件中。
Raft HardState:
Raft 协议明确要求持久化 HardState,它包含:
- Current Term (8B):当前节点已知最新任期。
- Voted For (8B):在当前任期内投票给的候选人 ID(如果已投票)。
HardState 必须在响应 RequestVote RPC 或 AppendEntries RPC 之前持久化。这意味着,在节点发送投票或追加日志的响应之前,current_term 和 voted_for 必须已经安全地写入磁盘。
元数据文件中的其他信息:
除了 HardState,元数据文件可能还会存储:
- Commit Index (8B):当前已提交的最高日志索引。尽管 Raft 协议本身允许在恢复时通过日志扫描来重新计算
commitIndex,但在实践中为了加速恢复,一些实现会将其持久化。然而,更严格的实现可能会选择不持久化commitIndex,因为它是一个动态值,可以通过多数副本的确认来推导,而非节点自身的状态。 - Log Segment 的起始信息:例如,当前活跃日志段的起始索引和任期。
- 集群配置 (Configuration):当前集群的成员列表。这通常通过日志条目(
EntryConfChange)来持久化和管理,但也可以在元数据文件中保留一个最新的快照配置副本。
Go 语言风格的结构体示例:
package wal
import (
"encoding/binary"
"fmt"
"os"
"io/ioutil" // For simpler file operations
)
// HardState represents the Raft persistent state.
type HardState struct {
CurrentTerm uint64
VotedFor uint64 // Peer ID
}
// MarshalBinary serializes HardState.
func (hs *HardState) MarshalBinary() ([]byte, error) {
buf := make([]byte, 8+8) // CurrentTerm + VotedFor
binary.BigEndian.PutUint64(buf[0:8], hs.CurrentTerm)
binary.BigEndian.PutUint64(buf[8:16], hs.VotedFor)
return buf, nil
}
// UnmarshalBinary deserializes HardState.
func (hs *HardState) UnmarshalBinary(data []byte) error {
if len(data) < 16 {
return io.ErrUnexpectedEOF
}
hs.CurrentTerm = binary.BigEndian.Uint64(data[0:8])
hs.VotedFor = binary.BigEndian.Uint64(data[8:16])
return nil
}
// RaftMetadata represents other persistent metadata for a Raft node.
type RaftMetadata struct {
HardState
CommitIndex uint64 // Optional: can be recomputed from log, but often persisted for speed
FirstLogIndex uint64 // The first valid log index in the WAL (after any snapshot)
FirstLogTerm uint64 // The term of the first valid log index
// Other fields like cluster configuration, etc.
}
// PersistRaftMetadata writes the RaftMetadata to a file.
func PersistRaftMetadata(filePath string, meta *RaftMetadata) error {
hsData, err := meta.HardState.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to marshal hard state: %w", err)
}
// For simplicity, let's just write HardState for this example.
// A real RaftMetadata would combine all fields.
// Write to a temporary file first, then rename for atomic update.
tmpFilePath := filePath + ".tmp"
err = ioutil.WriteFile(tmpFilePath, hsData, 0644)
if err != nil {
return fmt.Errorf("failed to write hard state to temp file: %w", err)
}
// Sync the temp file to disk
tmpFile, err := os.OpenFile(tmpFilePath, os.O_WRONLY, 0644)
if err == nil {
err = tmpFile.Sync()
tmpFile.Close()
}
if err != nil {
return fmt.Errorf("failed to sync temp hard state file: %w", err)
}
err = os.Rename(tmpFilePath, filePath)
if err != nil {
return fmt.Errorf("failed to rename temp hard state file: %w", err)
}
// Ensure directory entry is also synced
dir := filepath.Dir(filePath)
dirFile, err := os.OpenFile(dir, os.O_RDONLY, 0)
if err == nil {
err = dirFile.Sync()
dirFile.Close()
}
if err != nil {
return fmt.Errorf("failed to sync directory after rename: %w", err)
}
return nil
}
// LoadRaftMetadata loads RaftMetadata from a file.
func LoadRaftMetadata(filePath string) (*RaftMetadata, error) {
data, err := ioutil.ReadFile(filePath)
if err != nil {
if os.IsNotExist(err) {
return &RaftMetadata{}, nil // Return empty state if file doesn't exist
}
return nil, fmt.Errorf("failed to read hard state file: %w", err)
}
hs := &HardState{}
if err := hs.UnmarshalBinary(data); err != nil {
return nil, fmt.Errorf("failed to unmarshal hard state: %w", err)
}
// In a real implementation, you'd unmarshal other fields too.
return &RaftMetadata{HardState: *hs}, nil
}
原子更新的重要性:
对于 raft_state.meta 这样的关键元数据文件,更新操作必须是原子的。这意味着要么旧文件内容不变,要么新文件内容完全写入。常用的原子更新策略是“写入临时文件 -> fsync 临时文件 -> 重命名临时文件”。重命名操作在大多数文件系统上是原子的。同时,为了确保目录条目的持久化,也需要 fsync 父目录。
4. 快照文件 (.snap)
Raft 日志会无限增长,这会消耗大量磁盘空间并延长启动时的恢复时间。因此,Raft 引入了快照机制。快照是某个时间点状态机的完整副本,它包含了所有已提交到该时间点的日志条目所生成的状态。
快照文件的作用:
- 日志截断:一旦生成并保存了快照,所有早于快照包含的最后一个日志条目的日志段都可以安全删除。
- 快速恢复:新加入的节点或崩溃后恢复的节点可以直接从领导者那里接收快照,而无需回放整个日志历史。
- 避免无限增长:防止日志占用过多磁盘空间。
快照文件的存储格式:
一个快照通常由两个文件组成:
- 快照数据文件 (
.snap):这是一个压缩的二进制文件,包含特定索引和任期下状态机的完整数据。其内部格式取决于应用状态机本身。例如,如果状态机是一个键值存储,快照可能就是整个数据库的 SStable 或一个压缩的目录存档。 - 快照元数据文件 (
.meta):存储快照的元数据,对于 Raft 协议至关重要。
快照元数据 (.meta) 包含:
+------------------+------------------+------------------+------------------+
| LastIncludedIndex(8B)| LastIncludedTerm (8B)| ClusterConfig (VB) | Checksum (4B/8B)|
+------------------+------------------+------------------+------------------+
- LastIncludedIndex (8B):快照中包含的最后一个日志条目的索引。
- LastIncludedTerm (8B):快照中包含的最后一个日志条目的任期。
- ClusterConfig (变长字节):当快照被创建时,集群的配置信息(包括所有成员的 ID 和地址)。这对于新节点加入或旧节点恢复时,能够正确地加入集群至关重要。这通常是一个序列化的 Protobuf 消息。
- Checksum (4B/8B):快照数据文件的校验和,用于验证快照数据的完整性。
Go 语言风格的结构体示例:
package wal
import (
"encoding/binary"
"fmt"
"io"
"os"
"path/filepath"
"hash/crc32"
)
// SnapshotMetadata holds the metadata for a Raft snapshot.
type SnapshotMetadata struct {
LastIncludedIndex uint64
LastIncludedTerm uint64
ClusterConfig []byte // Serialized Raft Configuration (e.g., protobuf)
Checksum uint32 // CRC32 of the actual snapshot data file
}
// MarshalBinary serializes SnapshotMetadata.
func (sm *SnapshotMetadata) MarshalBinary() ([]byte, error) {
// For simplicity, let's manually serialize. A real implementation might use protobuf.
buf := make([]byte, 8+8+len(sm.ClusterConfig)+4) // Index + Term + ConfigLen + Config + Checksum
binary.BigEndian.PutUint64(buf[0:8], sm.LastIncludedIndex)
binary.BigEndian.PutUint64(buf[8:16], sm.LastIncludedTerm)
// Store config length first, then config data
configLen := uint32(len(sm.ClusterConfig))
binary.BigEndian.PutUint32(buf[16:20], configLen)
copy(buf[20:20+configLen], sm.ClusterConfig)
binary.BigEndian.PutUint32(buf[20+configLen:20+configLen+4], sm.Checksum)
return buf, nil
}
// UnmarshalBinary deserializes SnapshotMetadata.
func (sm *SnapshotMetadata) UnmarshalBinary(data []byte) error {
if len(data) < 24 { // Min size: Index + Term + ConfigLen + Checksum (assuming ConfigLen is 0)
return io.ErrUnexpectedEOF
}
sm.LastIncludedIndex = binary.BigEndian.Uint64(data[0:8])
sm.LastIncludedTerm = binary.BigEndian.Uint64(data[8:16])
configLen := binary.BigEndian.Uint32(data[16:20])
if len(data) < int(20+configLen+4) {
return io.ErrUnexpectedEOF
}
sm.ClusterConfig = make([]byte, configLen)
copy(sm.ClusterConfig, data[20:20+configLen])
sm.Checksum = binary.BigEndian.Uint32(data[20+configLen:20+configLen+4])
return nil
}
// SaveSnapshot writes the snapshot metadata and data.
func SaveSnapshot(dir string, metadata *SnapshotMetadata, snapData io.Reader) error {
snapName := fmt.Sprintf("%020d-%020d", metadata.LastIncludedIndex, metadata.LastIncludedTerm)
snapFilePath := filepath.Join(dir, snapName+".snap")
metaFilePath := filepath.Join(dir, snapName+".meta")
// 1. Write snapshot data to a temporary file
tmpSnapFilePath := snapFilePath + ".tmp"
tmpSnapFile, err := os.OpenFile(tmpSnapFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("failed to create temp snapshot file: %w", err)
}
defer tmpSnapFile.Close()
hasher := crc32.NewIEEE()
teeReader := io.TeeReader(snapData, hasher) // Calculate checksum while writing
_, err = io.Copy(tmpSnapFile, teeReader)
if err != nil {
return fmt.Errorf("failed to write snapshot data: %w", err)
}
// Ensure snapshot data is flushed
err = tmpSnapFile.Sync()
if err != nil {
return fmt.Errorf("failed to sync temp snapshot data file: %w", err)
}
metadata.Checksum = hasher.Sum32() // Set the calculated checksum
// 2. Write snapshot metadata to a temporary file
metaData, err := metadata.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to marshal snapshot metadata: %w", err)
}
tmpMetaFilePath := metaFilePath + ".tmp"
err = ioutil.WriteFile(tmpMetaFilePath, metaData, 0644)
if err != nil {
return fmt.Errorf("failed to write temp snapshot metadata file: %w", err)
}
// Ensure metadata is flushed
tmpMetaFile, err := os.OpenFile(tmpMetaFilePath, os.O_WRONLY, 0644)
if err == nil {
err = tmpMetaFile.Sync()
tmpMetaFile.Close()
}
if err != nil {
return fmt.Errorf("failed to sync temp snapshot metadata file: %w", err)
}
// 3. Atomically rename temp files
err = os.Rename(tmpSnapFilePath, snapFilePath)
if err != nil {
return fmt.Errorf("failed to rename temp snapshot data file: %w", err)
}
err = os.Rename(tmpMetaFilePath, metaFilePath)
if err != nil {
return fmt.Errorf("failed to rename temp snapshot metadata file: %w", err)
}
// 4. Sync directory to ensure new file entries are persistent
dirFile, err := os.OpenFile(dir, os.O_RDONLY, 0)
if err == nil {
err = dirFile.Sync()
dirFile.Close()
}
if err != nil {
return fmt.Errorf("failed to sync snapshot directory: %w", err)
}
return nil
}
// LoadLatestSnapshot loads the latest snapshot from the given directory.
func LoadLatestSnapshot(dir string) (*SnapshotMetadata, *os.File, error) {
// Scan directory for .meta files, identify the one with highest index/term
// (Implementation details omitted for brevity)
// Assume we found the latest meta file and its corresponding snap file.
// For example, let's assume we know the latest snapshot is named "latest-snap.meta"
metaFilePath := filepath.Join(dir, "latest-snap.meta") // Simplified
metaData, err := ioutil.ReadFile(metaFilePath)
if err != nil {
if os.IsNotExist(err) {
return nil, nil, nil // No snapshot found
}
return nil, nil, fmt.Errorf("failed to read snapshot metadata: %w", err)
}
metadata := &SnapshotMetadata{}
if err := metadata.UnmarshalBinary(metaData); err != nil {
return nil, nil, fmt.Errorf("failed to unmarshal snapshot metadata: %w", err)
}
snapFilePath := filepath.Join(dir, fmt.Sprintf("%020d-%020d.snap", metadata.LastIncludedIndex, metadata.LastIncludedTerm))
snapFile, err := os.OpenFile(snapFilePath, os.O_RDONLY, 0644)
if err != nil {
return nil, nil, fmt.Errorf("failed to open snapshot data file: %w", err)
}
// Verify checksum of snapshot data
hasher := crc32.NewIEEE()
_, err = io.Copy(hasher, snapFile)
if err != nil {
snapFile.Close()
return nil, nil, fmt.Errorf("failed to read snapshot data for checksum verification: %w", err)
}
if hasher.Sum32() != metadata.Checksum {
snapFile.Close()
return nil, nil, fmt.Errorf("snapshot data checksum mismatch")
}
// Reset file pointer to beginning for actual data consumption
_, err = snapFile.Seek(0, io.SeekStart)
if err != nil {
snapFile.Close()
return nil, nil, fmt.Errorf("failed to seek snapshot file to start: %w", err)
}
return metadata, snapFile, nil
}
崩溃恢复机制
当 Raft 节点启动时,它必须从持久化存储中恢复其状态。这个过程通常遵循以下步骤:
- 加载
HardState:首先从raft_state.meta文件加载CurrentTerm和VotedFor。 - 加载最新快照:检查
snapshot/目录,找到最新的快照文件 (.snap和.meta)。如果存在快照,加载其元数据,并应用快照数据到状态机。快照的LastIncludedIndex和LastIncludedTerm将成为日志的起始点。 - 扫描日志段:遍历
wal/目录中的所有日志段文件,从快照的LastIncludedIndex之后(或从索引 1 开始,如果没有快照)的第一个日志段开始。- 对于每个日志段,按顺序读取其中的日志条目。
- 使用校验和验证每个条目的完整性。如果遇到损坏的条目,该日志段及之后的所有内容都将被视为无效并丢弃(截断)。
- 重建内存中的 Raft 日志结构 (例如,一个
[]RaftLogEntry或一个跳表/B树)。 - 更新
last_log_index和last_log_term。
- 初始化 Raft 状态机:根据恢复的日志和快照数据,构建 Raft 状态机,使其准备好参与共识。
处理部分写入和损坏:
- 日志数据文件:由于每个日志条目都带有长度前缀和校验和,读取器可以在遇到损坏时跳过无效条目,或直接截断到最后一个有效条目。
- 元数据文件和快照文件:使用“写入临时文件 ->
fsync-> 重命名”的原子更新策略,确保这些文件的内容要么是旧的有效版本,要么是新的完整版本。
性能优化
虽然持久化是首要任务,但性能在分布式系统中同样关键。
- 顺序 I/O:分段文件架构和追加写入模式天然支持顺序 I/O,这是磁盘操作中最快的模式。
- 批处理
fsync:频繁调用fsync是性能瓶颈。在实际系统中,Raft 领导者通常会将多个日志条目写入内存缓冲区,然后一次性将整个批次写入磁盘,并执行一次fsync。这在保证持久性的同时,显著减少了fsync的次数。 - 内存映射文件 (mmap):对于读取操作,尤其是在需要随机访问索引文件时,使用
mmap可以将文件内容直接映射到进程的虚拟地址空间,从而避免了read()系统调用的开销,并利用操作系统的页缓存进行高效管理。 - 预分配文件空间:在创建新的日志段文件时,可以预先分配好文件所需的全部空间(例如
fallocate或ftruncate)。这可以减少文件系统碎片的产生,并确保后续写入操作不会因为文件扩展而阻塞。 - Direct I/O (O_DIRECT):某些高性能场景下,可能会绕过操作系统的页缓存,直接将数据写入磁盘。这可以避免双重缓存(应用层缓存和页缓存),但会增加应用层的复杂性,需要自行管理缓存和对齐。对于 Raft WAL 来说,通常默认的带页缓存的
fsync策略已足够。
日志管理与截断
Raft 日志的无限增长是一个问题。快照机制是解决这个问题的核心:
- 创建快照:当日志达到一定大小或时间间隔后,Raft 节点会将当前状态机的完整状态写入一个快照文件。这个快照包含了所有已提交到某个索引
LastIncludedIndex的数据。 - 删除旧日志段:一旦快照成功创建并持久化,并且
LastIncludedIndex之前的日志条目不再被需要(例如,所有节点都已接收到这个快照或已经有了更高索引的日志),那么所有包含LastIncludedIndex之前日志的旧日志段文件都可以安全地删除。 - 清理快照:通常只保留最新的几个快照,旧的快照也会被删除。
总结
Raft 日志的物理存储是分布式系统持久化和一致性的基石。通过采用分段文件架构,结合数据文件、索引文件和元数据文件,Raft 实现了高效的顺序写入、可靠的崩溃恢复以及灵活的日志管理。校验和、fsync 和原子文件操作是确保数据完整性的关键,而快照机制则有效地解决了日志无限增长的问题。理解这些底层存储细节,对于设计和实现高性能、高可用的分布式系统至关重要。