深入 Raft 协议:Leader 选举、日志复制与安全性是如何通过任期(Term)强制对齐的?
分布式系统中的共识(Consensus)是构建高可用、容错服务的基础。Raft 协议,作为 Paxos 协议的一种更易于理解的替代方案,通过其清晰的模块化设计和强一致性保证,受到了广泛的关注。Raft 将共识问题分解为三个子问题:Leader 选举、日志复制和安全性(Safety),并巧妙地引入了“任期”(Term)这一核心概念,如同一个系统级的逻辑时钟,强制性地对齐所有节点的状态,从而保障了整个协议的正确性。
今天,我们将深入探讨 Raft 协议的内部机制,重点剖析任期(Term)如何在 Leader 选举、日志复制以及最终的安全性保障中扮演其不可或缺的角色。我们将通过 Go 语言风格的伪代码,一步步揭示这些机制的实现细节。
1. 任期(Term):Raft 协议的逻辑时钟
在 Raft 协议中,任期(Term)是一个单调递增的整数。它代表了时间的一个逻辑周期,Raft 集群中的每个节点都维护一个当前任期号。每次发生 Leader 选举时,新的任期就会开始,并且任期号会递增。任期是 Raft 协议中一切决策的基础,它像一个权威的时间戳,用于识别过时的信息,确保节点状态的一致性。
任期的核心作用:
- 识别过时信息: 任何带有旧任期号的 RPC 请求或响应都会被立即拒绝。这就像一个过期证件,无法用于执行当前的操作。
- 强制状态更新: 如果一个节点发现其他节点拥有更高的任期号,它会立即更新自己的任期号,并转变为 Follower 状态。这确保了集群中所有节点最终都会收敛到最新的逻辑时间周期。
- 确保“后来者居上”: 拥有更高任期号的 Leader 或 Candidate 总是被认为是更权威的。这是 Raft 协议中解决冲突和推进系统进展的基本原则。
为了更好地理解任期的作用,我们首先来看 Raft 节点的基本状态和数据结构。一个 Raft 节点在任何给定时间都处于以下三种状态之一:
| 状态 | 描述 |
|---|---|
Follower |
被动状态,响应来自 Leader 或 Candidate 的 RPC 请求。 |
Candidate |
竞选 Leader 的状态,向其他节点请求投票。 |
Leader |
活跃状态,处理客户端请求,管理日志复制和发送心跳。 |
import (
"math/rand"
"sync"
"time"
)
// ClientEnd 模拟与其他节点的网络连接
type ClientEnd struct {
// 实际生产环境中会包含网络连接和 RPC 客户端
// 这里仅为演示目的,不实现具体网络通信
}
func (ce *ClientEnd) Call(svcMethod string, args interface{}, reply interface{}) bool {
// 模拟网络延迟和失败
time.Sleep(time.Duration(rand.Intn(10)+1) * time.Millisecond)
if rand.Intn(100) < 5 { // 5% 概率模拟 RPC 失败
return false
}
// 实际中会进行 RPC 调用,这里直接返回 true 模拟成功
return true
}
// State 定义了 Raft 节点可能处于的状态
type State int
const (
Follower State = iota // 跟随者
Candidate // 候选者
Leader // 领导者
)
// LogEntry 代表一条日志记录
type LogEntry struct {
Term int // 记录该日志条目被 Leader 接收时的任期
Command []byte // 要执行的命令
}
// RaftNode 代表一个 Raft 节点实例
type RaftNode struct {
mu sync.Mutex // 互斥锁,保护节点状态
peers []*ClientEnd // 与其他节点的网络连接
me int // 当前节点的 ID
dead int32 // 节点是否已停止运行
currentTerm int // 节点已知最新的任期号(启动时初始化为0,单调递增)
votedFor int // 当前任期内投票给的 Candidate ID(如果没有则为-1)
log []LogEntry // 日志条目;第一个条目索引为1(索引0是一个虚拟占位符)
state State // 当前节点的状态 (Follower, Candidate, Leader)
commitIndex int // 已知已提交的最高日志条目的索引(初始化为0)
lastApplied int // 已应用到状态机的最高日志条目的索引(初始化为0)
// Leader 独有的状态(用于日志复制)
nextIndex []int // 对于每个 Follower,下一个要发送给它的日志条目的索引(初始化为 Leader last log index + 1)
matchIndex []int // 对于每个 Follower,已知它已复制的最高日志条目的索引(初始化为0)
// 计时器相关
electionTimeout time.Duration
heartbeatInterval time.Duration
lastHeartbeatTime time.Time
lastElectionResetTime time.Time // 上次重置选举计时器的时间点
// 实际应用中,这里还需要一个 channel 来通知应用层日志已提交
// applyCh chan ApplyMsg
}
// MakeRaftNode 初始化一个 Raft 节点
func MakeRaftNode(peers []*ClientEnd, me int) *RaftNode {
rf := &RaftNode{}
rf.peers = peers
rf.me = me
rf.currentTerm = 0 // 初始任期为0
rf.votedFor = -1 // 初始未投票
// Raft 协议的日志索引从1开始。索引0处是一个虚拟的、空的日志条目,简化边界条件处理。
rf.log = make([]LogEntry, 1)
rf.state = Follower
rf.commitIndex = 0
rf.lastApplied = 0
// 随机化选举超时时间,避免多个 Follower 同时超时发起选举
rf.electionTimeout = time.Duration(300+rand.Intn(200)) * time.Millisecond // 300-500ms
rf.heartbeatInterval = 100 * time.Millisecond // 心跳间隔一般远小于选举超时
rf.lastElectionResetTime = time.Now() // 初始重置计时器
// 启动后台协程来处理选举和日志应用
go rf.ticker() // 主循环,检查选举超时和发送心跳
// go rf.applyLogTicker() // 另一个协程处理日志应用
return rf
}
// ticker 是 Raft 节点的主循环,用于检查选举超时和发送心跳
func (rf *RaftNode) ticker() {
for !rf.killed() { // 节点未被杀死
rf.mu.Lock()
state := rf.state
lastElectionResetTime := rf.lastElectionResetTime
rf.mu.Unlock()
switch state {
case Follower, Candidate:
// 如果选举超时,则发起新选举
if time.Since(lastElectionResetTime) > rf.electionTimeout {
rf.startElection()
}
case Leader:
// Leader 定期发送心跳
rf.sendHeartbeats()
// Leader 心跳间隔可以固定
time.Sleep(rf.heartbeatInterval)
}
// 其他状态下,等待一段时间再检查
if state != Leader {
time.Sleep(50 * time.Millisecond) // 检查频率可以调整
}
}
}
// killed 检查节点是否已停止运行
func (rf *RaftNode) killed() bool {
// 实际中可能通过 atomic.LoadInt32(&rf.dead) 来判断
return false // 简化演示,假设节点一直存活
}
// resetElectionTimer 重置选举计时器
func (rf *RaftNode) resetElectionTimer() {
rf.lastElectionResetTime = time.Now()
// 每次重置时,重新随机化选举超时时间,进一步减少冲突概率
rf.electionTimeout = time.Duration(300+rand.Intn(200)) * time.Millisecond
}
注意 currentTerm 字段,它将贯穿我们后续的所有讨论。它是协议中所有逻辑判断的核心。
2. Leader 选举与任期:秩序的建立
Raft 协议中,系统通过选举产生一个 Leader,所有客户端请求都将由 Leader 处理。如果当前没有 Leader,或者 Follower 在一段时间内(选举超时)没有收到 Leader 的心跳,它就会认为 Leader 已失效,并开始新的选举。任期在选举过程中起着决定性的作用,它保证了在任何一个特定任期内最多只有一个 Leader 被选举出来,并确保了新 Leader 拥有最新的已提交日志。
选举流程概述:
- Follower 状态: 节点处于 Follower 状态,监听 Leader 的心跳或 RPC 请求。
- 选举超时: 如果在选举超时时间内没有收到 Leader 的心跳(
AppendEntriesRPC)或投票请求(RequestVoteRPC),Follower 会认为 Leader 已失联,并转变为 Candidate 状态。 - Candidate 状态:
- 递增
currentTerm: 这是新选举周期的开始,所以任期号必须递增。 - 给自己投票 (
votedFor = me): Candidate 首先投票给自己。 - 向所有其他节点发送
RequestVoteRPC: 请求其他节点投票。RPC 请求中包含了 Candidate 的currentTerm以及其日志的最新信息 (lastLogIndex,lastLogTerm)。 - 等待投票: Candidate 处于等待状态,直到以下情况之一发生:
- 获得大多数节点的投票,成为 Leader。
- 收到来自更高任期 Leader 的
AppendEntriesRPC,转变为 Follower。 - 选举超时,再次递增任期并发起新一轮选举。
- 递增
- Leader 状态: 如果 Candidate 收到来自大多数节点的投票,它就成为 Leader。
- 发送心跳: 成为 Leader 后,它会立即向所有 Follower 发送心跳(不包含日志条目的
AppendEntriesRPC),以宣示自己的权威,并重置所有 Follower 的选举计时器。 - 初始化 Leader 独有的状态: (
nextIndex和matchIndex),这些是用于日志复制的关键状态。
- 发送心跳: 成为 Leader 后,它会立即向所有 Follower 发送心跳(不包含日志条目的
2.1 RequestVote RPC 中的任期检查
当一个 Candidate 发送 RequestVote RPC 给其他节点时,它会携带自己的 currentTerm 以及其日志的最新信息 (lastLogIndex, lastLogTerm)。接收方会根据这些信息决定是否投票。
// RequestVoteArgs 是 RequestVote RPC 的参数
type RequestVoteArgs struct {
Term int // Candidate 的任期
CandidateId int // 请求投票的 Candidate 的 ID
LastLogIndex int // Candidate 最新日志条目的索引
LastLogTerm int // Candidate 最新日志条目的任期
}
// RequestVoteReply 是 RequestVote RPC 的响应
type RequestVoteReply struct {
Term int // 接收者当前的任期,如果 Candidate 的任期小于接收者,用于更新 Candidate 的任期
VoteGranted bool // 如果 Candidate 获得了投票,则为 true
}
// RequestVote 是处理 RequestVote RPC 的处理函数
func (rf *RaftNode) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
// 规则 1: 任期检查 - 如果 Candidate 的任期小于接收者的当前任期,则拒绝投票。
// 这确保了任何带有旧任期号的 Candidate 都会被拒绝,它们无法在更高的逻辑时间周期内获得支持。
// 收到旧任期号的 RPC 不会改变接收者的状态。
if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
reply.VoteGranted = false
return
}
// 规则 2: 任期更新 - 如果 Candidate 的任期大于接收者的当前任期,则更新自己的任期,并转变为 Follower。
// 这是一个核心规则,确保节点总是以最新的任期运行,并服从更高任期的权威。
// 无论当前状态是 Follower, Candidate 还是 Leader,一旦发现更高任期,都必须降级。
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.state = Follower
rf.votedFor = -1 // 重置投票,因为这是一个新的任期,可以投票给新的 Candidate
rf.resetElectionTimer() // 重置选举计时器,因为现在可能有一个新的 Leader 或 Candidate 出现
}
reply.Term = rf.currentTerm // 总是返回接收者当前的任期,以便 Candidate 更新自己的任期(如果需要)
// 规则 3: 投票限制 - 如果在当前任期内已经投票给其他 Candidate,则拒绝投票。
// 确保一个节点在一个任期内最多只投一票,防止“双重投票”导致选举混乱。
if rf.votedFor != -1 && rf.votedFor != args.CandidateId {
reply.VoteGranted = false
return
}
// 规则 4: 选举限制 (Election Restriction)
// Raft 强制规定,Candidate 必须拥有至少和自己一样新的日志才能被选为 Leader。
// 这里的 "更新" 定义为:
// 1. Candidate 的 `LastLogTerm` 更大,或者
// 2. `LastLogTerm` 相同,但 `LastLogIndex` 更大。
// 这个规则保证了 Leader Completeness Property:一旦一个日志条目被提交,那么所有未来的 Leader 都必须包含该条目。
lastLogIndex := len(rf.log) - 1
lastLogTerm := rf.log[lastLogIndex].Term
candidateIsMoreUpToDate := func() bool {
// 比较 Candidate 的日志是否至少和接收者一样新
if args.LastLogTerm > lastLogTerm {
return true
}
if args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex {
return true
}
return false
}()
if !candidateIsMoreUpToDate {
reply.VoteGranted = false
return
}
// 如果通过了所有检查,则投票给 Candidate
rf.votedFor = args.CandidateId
reply.VoteGranted = true
// 投票后重置选举计时器,因为已经有了一个活跃的 Candidate,避免自己再次发起选举
rf.resetElectionTimer()
}
// startElection 是节点发起选举的函数
func (rf *RaftNode) startElection() {
rf.mu.Lock()
rf.state = Candidate
rf.currentTerm++ // 递增任期
rf.votedFor = rf.me // 投票给自己
rf.resetElectionTimer() // 重置选举计时器
term := rf.currentTerm
candidateId := rf.me
lastLogIndex := len(rf.log) - 1
lastLogTerm := rf.log[lastLogIndex].Term
rf.mu.Unlock()
votesReceived := 1 // 自己的一票
// 创建一个用于等待投票结果的同步机制
var mu sync.Mutex
cond := sync.NewCond(&mu)
// 向所有其他节点发送 RequestVote RPC
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
go func(server int) {
args := RequestVoteArgs{
Term: term,
CandidateId: candidateId,
LastLogIndex: lastLogIndex,
LastLogTerm: lastLogTerm,
}
reply := RequestVoteReply{}
ok := rf.sendRequestVote(server, &args, &reply)
mu.Lock()
defer mu.Unlock()
// 如果 RPC 调用失败,或者节点已不再是 Candidate,则忽略
rf.mu.Lock()
currentTerm := rf.currentTerm
currentState := rf.state
rf.mu.Unlock()
if !ok || currentState != Candidate || term != currentTerm {
return
}
// 规则:如果收到更高任期的响应,Candidate 降级为 Follower
if reply.Term > currentTerm {
rf.mu.Lock()
if reply.Term > rf.currentTerm { // 再次检查,防止并发问题
rf.currentTerm = reply.Term
rf.state = Follower
rf.votedFor = -1
rf.resetElectionTimer()
}
rf.mu.Unlock()
cond.Broadcast() // 通知所有等待者,选举已结束 (Candidate 降级)
return
}
if reply.VoteGranted {
votesReceived++
// 如果获得大多数投票,则成为 Leader
if votesReceived*2 > len(rf.peers) {
rf.mu.Lock()
if rf.state == Candidate && rf.currentTerm == term { // 再次确认状态和任期
rf.becomeLeader()
}
rf.mu.Unlock()
cond.Broadcast() // 通知所有等待者,选举已结束 (已成为 Leader)
}
}
}(i)
}
// 等待选举结果,避免 goroutine 泄露,也可以设置超时
mu.Lock()
cond.Wait() // 阻塞直到被通知
mu.Unlock()
}
// sendRequestVote 发送 RequestVote RPC
func (rf *RaftNode) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
ok := rf.peers[server].Call("RaftNode.RequestVote", args, reply)
return ok
}
// becomeLeader 是节点成为 Leader 后的操作
func (rf *RaftNode) becomeLeader() {
rf.state = Leader
rf.resetElectionTimer() // Leader 也需要定期发送心跳,这里重置是为了立即发送第一批心跳
// nextIndex: 对于每个 Follower,Leader 认为它下一个要发送的日志条目的索引
// 初始化为 Leader 的最后一条日志的索引 + 1
lastLogIndex := len(rf.log) - 1
rf.nextIndex = make([]int, len(rf.peers))
rf.matchIndex = make([]int, len(rf.peers))
for i := 0; i < len(rf.peers); i++ {
rf.nextIndex[i] = lastLogIndex + 1
rf.matchIndex[i] = 0 // matchIndex: 对于每个 Follower,Leader 知道它已经复制的最高日志条目的索引
}
// 立即发送心跳,宣布自己是新 Leader
// 注意:这里只是触发,实际发送在 leaderAppendEntriesLoop 中完成
go rf.sendHeartbeats()
}
任期在选举中的关键作用点:
args.Term < rf.currentTerm: 这是最基本的任期检查。如果一个 Candidate 试图以一个低于接收者当前任期的任期发起选举,其请求会立即被拒绝。这避免了网络延迟或分区导致的旧 Leader 或 Candidate 干扰当前选举。一个拥有更高任期的节点不会响应来自旧任期的请求,确保了协议的“时间”总是向前推进。args.Term > rf.currentTerm: 这是任期机制的强大之处。如果一个节点(无论是 Follower, Candidate 还是 Leader)收到一个来自更高任期的RequestVoteRPC,它会立即更新自己的currentTerm,并降级为 Follower。这意味着,任何节点一旦发现一个拥有更高任期的权威出现,就必须承认其合法性并服从。这在选举中至关重要,它强制所有节点向拥有更高任期的 Candidate 妥协,从而有助于打破僵局,产生新的 Leader。- 选举限制(
LastLogTerm和LastLogIndex比较): 这个规则与任期紧密结合,确保了只有拥有“最新”日志的 Candidate 才能被选为 Leader。“最新”是相对于投票者而言的。这个规则保证了新 Leader 至少拥有所有已提交的日志条目,因为已提交的日志条目必然存在于多数节点中,而新 Leader 必须获得多数票,所以它至少会包含这些已提交的日志。
3. 日志复制与任期:一致性的基石
Raft 协议的日志复制机制是其核心,它确保了所有节点的状态机都按相同的顺序执行相同的命令。Leader 负责接收客户端请求,将命令作为日志条目附加到自己的日志中,然后复制给 Follower。任期在日志复制中发挥着多重作用,它不仅用于检查 Leader 的权威性,还用于解决日志不一致的问题,并最终确保日志的安全性。
3.1 AppendEntries RPC 中的任期检查
Leader 通过 AppendEntries RPC 来复制日志条目和发送心跳。心跳是空的 AppendEntries RPC。
// AppendEntriesArgs 是 AppendEntries RPC 的参数
type AppendEntriesArgs struct {
Term int // Leader 的任期
LeaderId int // Leader 的 ID
PrevLogIndex int // 紧邻新日志条目之前的那个日志条目的索引
PrevLogTerm int // 紧邻新日志条目之前的那个日志条目的任期
Entries []LogEntry // 要存储的日志条目(心跳时为空;可能发送多个)
LeaderCommit int // Leader 已知已提交的最高日志条目的索引
}
// AppendEntriesReply 是 AppendEntries RPC 的响应
type AppendEntriesReply struct {
Term int // Follower 当前的任期,用于 Leader 更新自己
Success bool // 如果 Follower 包含了匹配 PrevLogIndex 和 PrevLogTerm 的条目,则为 true
// For optimization: 如果不成功,Follower 可以告诉 Leader 冲突的任期和其在该任期内的第一个索引
ConflictTerm int // 冲突日志条目的任期
ConflictIndex int // 冲突日志条目在该任期内的第一个索引
}
// AppendEntries 是处理 AppendEntries RPC 的处理函数
func (rf *RaftNode) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
// 规则 1: 任期检查 - 如果 Leader 的任期小于 Follower 的当前任期,则拒绝。
// 这防止了旧 Leader 干扰当前系统的运行。Follower 知道有更高任期的 Leader 存在(或者它自己在更高任期),
// 因此会拒绝任何来自旧任期的请求。
if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
reply.Success = false
return
}
// 规则 2: 任期更新 - 如果 Leader 的任期大于或等于 Follower 的当前任期,则更新自己的任期,并转变为 Follower。
// 无论当前节点是 Leader, Candidate 还是 Follower,只要发现一个合法 Leader 的任期高于自己,
// 就必须更新任期并降级为 Follower。这是 Leader 发现自己已经过时时必须做的事情。
// 它强制节点遵循更高任期的 Leader,是防止“脑裂”的关键。
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.state = Follower
rf.votedFor = -1 // 重置投票
}
// 无论如何,只要 Leader 的任期不低于 Follower,Follower 就需要重置选举计时器。
// 这意味着它看到了一个有效的 Leader 或 Candidate 的存在。
rf.resetElectionTimer()
reply.Term = rf.currentTerm
// 规则 3: 日志一致性检查
// Raft 协议的日志匹配属性(Log Matching Property)要求:
// 如果两个日志包含相同索引和任期的条目,那么它们在该索引之前的所有条目都必须相同。
// 这里的检查就是验证这一属性。
// 如果 Follower 的日志在 PrevLogIndex 处没有匹配 PrevLogTerm 的条目,
// 说明 Leader 和 Follower 的日志在该点上不一致。
// Follower 必须拒绝,并告知 Leader 冲突信息,Leader 需要回溯其 nextIndex。
// 场景 1: Follower 日志不够长,无法包含 PrevLogIndex
if args.PrevLogIndex > len(rf.log)-1 {
reply.Success = false
// 优化:告知 Leader Follower 最新的日志索引,Leader 可以直接将 nextIndex 设为该值
reply.ConflictTerm = -1 // 表示日志长度不足,而非任期冲突
reply.ConflictIndex = len(rf.log)
return
}
// 场景 2: PrevLogIndex 处的任期不匹配
if args.PrevLogIndex > 0 && rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
reply.Success = false
reply.ConflictTerm = rf.log[args.PrevLogIndex].Term
// 优化:找到 ConflictTerm 在 Follower 日志中第一次出现的索引
// Leader 收到后可以直接将 nextIndex 跳到该任期的第一个条目,而不是一个一个回溯
conflictIndex := args.PrevLogIndex
for conflictIndex > 0 && rf.log[conflictIndex-1].Term == reply.ConflictTerm {
conflictIndex--
}
reply.ConflictIndex = conflictIndex
return
}
// 规则 4: 追加新日志条目
// 找到 Leader 和 Follower 日志开始分歧的点,并进行截断和追加。
// 遍历 Leader 传来的新条目 (args.Entries)。
index := args.PrevLogIndex + 1
for i, entry := range args.Entries {
if index+i > len(rf.log)-1 { // Follower 日志较短,直接追加所有剩余条目
rf.log = append(rf.log, args.Entries[i:]...)
break
}
// 如果日志条目冲突(索引相同但任期不同),则截断 Follower 日志并追加 Leader 的条目。
// 由于日志匹配属性,一旦发现冲突,该点之后的所有条目都可能不一致,需要被 Leader 的日志覆盖。
if rf.log[index+i].Term != entry.Term {
rf.log = rf.log[:index+i] // 截断冲突点及之后的所有条目
rf.log = append(rf.log, args.Entries[i:]...)
break
}
// 如果日志条目一致,继续检查下一个,直到所有条目都匹配或新条目被追加。
}
// 规则 5: 更新 commitIndex
// 如果 LeaderCommit > commitIndex,则更新 commitIndex 为 min(LeaderCommit, last new entry index)。
// 这确保了 Follower 不会提交 Leader 尚未提交的条目,也不会提交超出自己日志范围的条目。
if args.LeaderCommit > rf.commitIndex {
lastNewEntryIndex := len(rf.log) - 1
rf.commitIndex = min(args.LeaderCommit, lastNewEntryIndex)
}
reply.Success = true
// 触发日志应用到状态机(实际中会通过通道进行异步处理)
rf.applyLogToStateMachine()
}
func min(a, b int) int {
if a < b {
return a
}
return b
}
任期在日志复制中的关键作用点:
args.Term < rf.currentTerm: 这是 Leader 权威性的核心检查。如果 Leader 试图以一个低于 Follower 当前任期的任期复制日志,其请求会被拒绝。这确保了只有当前合法 Leader 或更高任期的 Leader 才能成功复制日志。旧 Leader 的AppendEntries请求会被忽略,防止其影响系统状态。args.Term >= rf.currentTerm时更新任期并降级: 这是 Leader 发现自己已经过时时必须采取的行动。当 Leader 尝试联系一个拥有更高任期的 Follower 时,Follower 会在AppendEntriesReply中返回其更高的任期。Leader 收到这个响应后,会发现自己的任期已过期,从而更新自己的任期并降级为 Follower。这提供了一个反馈机制,使得旧 Leader 能够及时发现自己的过时状态,并避免继续操作,从而防止“脑裂”。PrevLogTerm和PrevLogIndex检查: 这是 Raft 保证日志一致性(Log Matching Property)的核心机制。如果两个日志在相同的索引处有相同的任期号,那么它们在该索引之前的所有日志都必须是相同的。这个检查通过比对PrevLogIndex处的日志条目任期,来验证 Leader 和 Follower 的日志是否匹配。如果不匹配,Follower 会拒绝,Leader 则会递减nextIndex并重试,直到找到一个匹配点,或者直到nextIndex到达日志起始点。任期在这里作为日志条目内容的唯一标识,使得 Leader 能够可靠地定位并修复 Follower 的不一致日志。
3.2 Leader 的日志复制循环
Leader 会不断地向 Follower 发送 AppendEntries RPC,以保持心跳并复制日志。
// sendHeartbeats 是 Leader 定期发送心跳(空 AppendEntries RPC)的函数
func (rf *RaftNode) sendHeartbeats() {
rf.mu.Lock()
if rf.state != Leader {
rf.mu.Unlock()
return
}
currentTerm := rf.currentTerm
leaderId := rf.me
commitIndex := rf.commitIndex
lastLogIndex := len(rf.log) - 1
rf.mu.Unlock()
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
// 针对每个 Follower 启动一个独立的协程进行日志复制,包括心跳
go rf.leaderAppendEntriesLoop(i)
}
}
// leaderAppendEntriesLoop 是 Leader 用于向特定 Follower 复制日志的协程
func (rf *RaftNode) leaderAppendEntriesLoop(server int) {
// 这是一个持续运行的循环,负责保持与特定 Follower 的日志同步
for !rf.killed() {
rf.mu.Lock()
if rf.state != Leader { // 如果不再是 Leader,则退出此协程
rf.mu.Unlock()
return
}
// 根据 nextIndex 准备要发送的日志条目
prevLogIndex := rf.nextIndex[server] - 1
// 确保 prevLogIndex 在 Leader 的日志范围内,最小可以是0 (for dummy entry)
if prevLogIndex < 0 {
prevLogIndex = 0
}
// 确保 prevLogIndex 不会超出 Leader 自己的日志范围
if prevLogIndex >= len(rf.log) {
// 这通常不应该发生,除非 nextIndex 逻辑有误
prevLogIndex = len(rf.log) - 1
}
prevLogTerm := rf.log[prevLogIndex].Term
entriesToSend := make([]LogEntry, 0)
if rf.nextIndex[server] <= len(rf.log)-1 {
entriesToSend = rf.log[rf.nextIndex[server]:]
}
args := AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
Entries: entriesToSend,
LeaderCommit: rf.commitIndex,
}
rf.mu.Unlock()
reply := AppendEntriesReply{}
ok := rf.sendAppendEntries(server, &args, &reply)
rf.mu.Lock()
// 在 RPC 返回后再次检查是否仍是 Leader,以及任期是否匹配
if rf.state != Leader || args.Term != rf.currentTerm {
rf.mu.Unlock()
return
}
if !ok { // RPC 调用失败,稍后重试
rf.mu.Unlock()
time.Sleep(10 * time.Millisecond) // 简单重试间隔
continue
}
// 规则 1: Leader 收到更高任期的响应
// 这是 Leader 发现自己已过时的主要方式。如果 Follower 返回的任期更高,Leader 必须降级。
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.state = Follower
rf.votedFor = -1
rf.resetElectionTimer()
rf.mu.Unlock()
return // Leader 已降级,退出循环
}
// 处理成功的响应
if reply.Success {
// 更新 nextIndex 和 matchIndex
// matchIndex 代表 Follower 已经成功复制的最高日志索引
// newMatchIndex 是 Leader 认为 Follower 应该达到的最新 matchIndex
newMatchIndex := args.PrevLogIndex + len(args.Entries)
if newMatchIndex > rf.matchIndex[server] { // 避免回溯更新
rf.matchIndex[server] = newMatchIndex
}
rf.nextIndex[server] = rf.matchIndex[server] + 1
rf.updateCommitIndex() // 尝试更新 commitIndex
} else { // 处理失败的响应 (日志不匹配)
// 根据 Follower 返回的冲突信息优化 nextIndex 的回退策略
if reply.ConflictTerm != -1 { // Follower 提供了冲突任期信息
// 在 Leader 自己的日志中查找 ConflictTerm 的最后一个条目
leaderConflictIndex := -1
for i := len(rf.log) - 1; i > 0; i-- {
if rf.log[i].Term == reply.ConflictTerm {
leaderConflictIndex = i
break
}
}
if leaderConflictIndex != -1 {
// 如果 Leader 也有这个冲突任期,则从该任期之后开始发送
rf.nextIndex[server] = leaderConflictIndex + 1
} else {
// 如果 Leader 没有这个冲突任期,则直接跳到 Follower 提供的 ConflictIndex
// (这是该任期在 Follower 日志中首次出现的索引)
rf.nextIndex[server] = reply.ConflictIndex
}
} else { // Follower 只说日志长度不够 (ConflictTerm == -1)
// Leader 直接将 nextIndex 回退到 Follower 的日志长度
rf.nextIndex[server] = reply.ConflictIndex // ConflictIndex 此时是 Follower 的 len(log)
}
// 确保 nextIndex 至少为1(日志起始点)
if rf.nextIndex[server] < 1 {
rf.nextIndex[server] = 1
}
}
rf.mu.Unlock()
time.Sleep(rf.heartbeatInterval) // 发送心跳/日志复制的间隔
}
}
// sendAppendEntries 发送 AppendEntries RPC 给指定的 peer
func (rf *RaftNode) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
ok := rf.peers[server].Call("RaftNode.AppendEntries", args, reply)
return ok
}
这里 reply.Term > rf.currentTerm 的检查至关重要。它提供了一个反馈机制:如果 Leader 发现一个 Follower 已经在一个更高的任期中,那么 Leader 必须承认自己已经过时,并立即降级为 Follower。这是 Raft 协议中防止“脑裂”(Split Brain)的关键机制之一。通过这种方式,任期号确保了集群中永远只有一个真正的 Leader,并且所有节点都最终会服从这个 Leader。
4. 安全性(Safety)保障与任期:最终一致性的守护
Raft 协议定义了几项重要的安全性属性,确保即使在网络分区、节点崩溃等故障下,系统也能保持一致性。任期是这些安全属性得以强制执行的核心。
4.1 选举安全 (Election Safety)
属性: 在一个给定的任期内,最多只能选举出一个 Leader。
任期作用:
- 单任期单票: 在
RequestVoteRPC 中,一个节点在一个任期内最多只投一票 (votedFor字段)。这是通过检查rf.votedFor和args.CandidateId来实现的。这个简单的规则保证了即使在网络分区的情况下,也不会出现多个 Candidate 在同一任期内获得多数票。 - 任期更新与降级: 如果一个 Candidate 成功赢得选举,它就将以其当前任期作为 Leader。任何其他 Candidate 或旧 Leader 试图以相同或更低的任期进行操作时,其请求都会被拥有新 Leader 任期的节点拒绝或导致发起者降级。例如,一个旧 Leader 收到来自新 Leader 的
AppendEntriesRPC 时,发现新 Leader 的任期更高,就会立即降级为 Follower。这确保了旧 Leader 不会继续操作,从而避免了多个 Leader 同时存在的风险。
4.2 Leader Completeness Property (领导者完整性属性)
属性: 如果一个日志条目在一个任期内被提交,那么所有未来任期的 Leader 都必须包含该日志条目。
任期作用: 这是 Raft 最重要的安全属性之一,它通过 选举限制 和 日志匹配属性 来实现。
- 选举限制: 在
RequestVoteRPC 中,一个 Candidate 只有在拥有至少和投票者一样新的日志时才能获得投票。这里的“新”由LastLogTerm和LastLogIndex共同决定。这意味着,要成为 Leader,Candidate 必须拥有所有已提交的日志条目,因为已提交的日志条目必然存在于多数节点中,而新 Leader 必须获得多数票,这意味着它必须拥有至少和这些多数节点一样新的日志。任期号作为日志的“时间戳”,在这个比较中起到了决定性的作用。 - 日志匹配属性: 如果两个日志包含相同索引和任期的条目,那么它们在该索引之前的所有条目都必须相同。这个属性是通过
AppendEntriesRPC 中的PrevLogIndex和PrevLogTerm检查来强制执行的。Leader 总是找到它和 Follower 日志的第一个不匹配点,然后从该点开始强制 Follower 复制 Leader 的日志。由于任期是唯一的,相同的索引和任期意味着相同的日志内容,从而保证了日志的前缀一致性。
4.3 State Machine Safety (状态机安全)
属性: 如果一个 Leader 在某个任期内将一个日志条目提交到其状态机,那么其他任何 Leader 都不会在该索引处提交不同的日志条目。
任期作用: 这是通过精心设计的 日志提交规则 来实现的,其中任期扮演了关键角色。
日志提交规则:
一个 Leader 可以提交一个日志条目,如果:
- 该条目已复制到大多数 Follower 上。
- 并且,该条目必须是当前 Leader 任期内的条目。
为什么是“当前 Leader 任期内的条目”?
这是一个非常巧妙且重要的规则,它解决了 Raft 协议中最复杂的一个潜在问题:一个旧任期的、已复制到多数节点但未被提交的日志条目,可能会在一个新的 Leader 产生后被“回滚”或被新的日志覆盖,导致不一致。
考虑以下场景:
- 初始状态: S1 (Leader, Term 2):
log = [ (1,x), (2,y) ]。S2, S3 也是