深入 Raft 协议:Leader 选举、日志复制与安全性(Safety)是如何通过任期(Term)强制对齐的?

深入 Raft 协议:Leader 选举、日志复制与安全性是如何通过任期(Term)强制对齐的?

分布式系统中的共识(Consensus)是构建高可用、容错服务的基础。Raft 协议,作为 Paxos 协议的一种更易于理解的替代方案,通过其清晰的模块化设计和强一致性保证,受到了广泛的关注。Raft 将共识问题分解为三个子问题:Leader 选举、日志复制和安全性(Safety),并巧妙地引入了“任期”(Term)这一核心概念,如同一个系统级的逻辑时钟,强制性地对齐所有节点的状态,从而保障了整个协议的正确性。

今天,我们将深入探讨 Raft 协议的内部机制,重点剖析任期(Term)如何在 Leader 选举、日志复制以及最终的安全性保障中扮演其不可或缺的角色。我们将通过 Go 语言风格的伪代码,一步步揭示这些机制的实现细节。

1. 任期(Term):Raft 协议的逻辑时钟

在 Raft 协议中,任期(Term)是一个单调递增的整数。它代表了时间的一个逻辑周期,Raft 集群中的每个节点都维护一个当前任期号。每次发生 Leader 选举时,新的任期就会开始,并且任期号会递增。任期是 Raft 协议中一切决策的基础,它像一个权威的时间戳,用于识别过时的信息,确保节点状态的一致性。

任期的核心作用:

  • 识别过时信息: 任何带有旧任期号的 RPC 请求或响应都会被立即拒绝。这就像一个过期证件,无法用于执行当前的操作。
  • 强制状态更新: 如果一个节点发现其他节点拥有更高的任期号,它会立即更新自己的任期号,并转变为 Follower 状态。这确保了集群中所有节点最终都会收敛到最新的逻辑时间周期。
  • 确保“后来者居上”: 拥有更高任期号的 Leader 或 Candidate 总是被认为是更权威的。这是 Raft 协议中解决冲突和推进系统进展的基本原则。

为了更好地理解任期的作用,我们首先来看 Raft 节点的基本状态和数据结构。一个 Raft 节点在任何给定时间都处于以下三种状态之一:

状态 描述
Follower 被动状态,响应来自 Leader 或 Candidate 的 RPC 请求。
Candidate 竞选 Leader 的状态,向其他节点请求投票。
Leader 活跃状态,处理客户端请求,管理日志复制和发送心跳。
import (
    "math/rand"
    "sync"
    "time"
)

// ClientEnd 模拟与其他节点的网络连接
type ClientEnd struct {
    // 实际生产环境中会包含网络连接和 RPC 客户端
    // 这里仅为演示目的,不实现具体网络通信
}

func (ce *ClientEnd) Call(svcMethod string, args interface{}, reply interface{}) bool {
    // 模拟网络延迟和失败
    time.Sleep(time.Duration(rand.Intn(10)+1) * time.Millisecond)
    if rand.Intn(100) < 5 { // 5% 概率模拟 RPC 失败
        return false
    }
    // 实际中会进行 RPC 调用,这里直接返回 true 模拟成功
    return true
}

// State 定义了 Raft 节点可能处于的状态
type State int

const (
    Follower  State = iota // 跟随者
    Candidate              // 候选者
    Leader                 // 领导者
)

// LogEntry 代表一条日志记录
type LogEntry struct {
    Term    int         // 记录该日志条目被 Leader 接收时的任期
    Command []byte      // 要执行的命令
}

// RaftNode 代表一个 Raft 节点实例
type RaftNode struct {
    mu        sync.Mutex          // 互斥锁,保护节点状态
    peers     []*ClientEnd        // 与其他节点的网络连接
    me        int                 // 当前节点的 ID
    dead      int32               // 节点是否已停止运行

    currentTerm int         // 节点已知最新的任期号(启动时初始化为0,单调递增)
    votedFor    int         // 当前任期内投票给的 Candidate ID(如果没有则为-1)
    log         []LogEntry  // 日志条目;第一个条目索引为1(索引0是一个虚拟占位符)

    state     State       // 当前节点的状态 (Follower, Candidate, Leader)

    commitIndex int         // 已知已提交的最高日志条目的索引(初始化为0)
    lastApplied int         // 已应用到状态机的最高日志条目的索引(初始化为0)

    // Leader 独有的状态(用于日志复制)
    nextIndex   []int       // 对于每个 Follower,下一个要发送给它的日志条目的索引(初始化为 Leader last log index + 1)
    matchIndex  []int       // 对于每个 Follower,已知它已复制的最高日志条目的索引(初始化为0)

    // 计时器相关
    electionTimeout       time.Duration
    heartbeatInterval     time.Duration
    lastHeartbeatTime     time.Time
    lastElectionResetTime time.Time // 上次重置选举计时器的时间点
    // 实际应用中,这里还需要一个 channel 来通知应用层日志已提交
    // applyCh chan ApplyMsg
}

// MakeRaftNode 初始化一个 Raft 节点
func MakeRaftNode(peers []*ClientEnd, me int) *RaftNode {
    rf := &RaftNode{}
    rf.peers = peers
    rf.me = me
    rf.currentTerm = 0 // 初始任期为0
    rf.votedFor = -1   // 初始未投票
    // Raft 协议的日志索引从1开始。索引0处是一个虚拟的、空的日志条目,简化边界条件处理。
    rf.log = make([]LogEntry, 1)
    rf.state = Follower
    rf.commitIndex = 0
    rf.lastApplied = 0

    // 随机化选举超时时间,避免多个 Follower 同时超时发起选举
    rf.electionTimeout = time.Duration(300+rand.Intn(200)) * time.Millisecond // 300-500ms
    rf.heartbeatInterval = 100 * time.Millisecond // 心跳间隔一般远小于选举超时

    rf.lastElectionResetTime = time.Now() // 初始重置计时器

    // 启动后台协程来处理选举和日志应用
    go rf.ticker() // 主循环,检查选举超时和发送心跳
    // go rf.applyLogTicker() // 另一个协程处理日志应用

    return rf
}

// ticker 是 Raft 节点的主循环,用于检查选举超时和发送心跳
func (rf *RaftNode) ticker() {
    for !rf.killed() { // 节点未被杀死
        rf.mu.Lock()
        state := rf.state
        lastElectionResetTime := rf.lastElectionResetTime
        rf.mu.Unlock()

        switch state {
        case Follower, Candidate:
            // 如果选举超时,则发起新选举
            if time.Since(lastElectionResetTime) > rf.electionTimeout {
                rf.startElection()
            }
        case Leader:
            // Leader 定期发送心跳
            rf.sendHeartbeats()
            // Leader 心跳间隔可以固定
            time.Sleep(rf.heartbeatInterval)
        }
        // 其他状态下,等待一段时间再检查
        if state != Leader {
            time.Sleep(50 * time.Millisecond) // 检查频率可以调整
        }
    }
}

// killed 检查节点是否已停止运行
func (rf *RaftNode) killed() bool {
    // 实际中可能通过 atomic.LoadInt32(&rf.dead) 来判断
    return false // 简化演示,假设节点一直存活
}

// resetElectionTimer 重置选举计时器
func (rf *RaftNode) resetElectionTimer() {
    rf.lastElectionResetTime = time.Now()
    // 每次重置时,重新随机化选举超时时间,进一步减少冲突概率
    rf.electionTimeout = time.Duration(300+rand.Intn(200)) * time.Millisecond
}

注意 currentTerm 字段,它将贯穿我们后续的所有讨论。它是协议中所有逻辑判断的核心。

2. Leader 选举与任期:秩序的建立

Raft 协议中,系统通过选举产生一个 Leader,所有客户端请求都将由 Leader 处理。如果当前没有 Leader,或者 Follower 在一段时间内(选举超时)没有收到 Leader 的心跳,它就会认为 Leader 已失效,并开始新的选举。任期在选举过程中起着决定性的作用,它保证了在任何一个特定任期内最多只有一个 Leader 被选举出来,并确保了新 Leader 拥有最新的已提交日志。

选举流程概述:

  1. Follower 状态: 节点处于 Follower 状态,监听 Leader 的心跳或 RPC 请求。
  2. 选举超时: 如果在选举超时时间内没有收到 Leader 的心跳(AppendEntries RPC)或投票请求(RequestVote RPC),Follower 会认为 Leader 已失联,并转变为 Candidate 状态。
  3. Candidate 状态:
    • 递增 currentTerm 这是新选举周期的开始,所以任期号必须递增。
    • 给自己投票 (votedFor = me): Candidate 首先投票给自己。
    • 向所有其他节点发送 RequestVote RPC: 请求其他节点投票。RPC 请求中包含了 Candidate 的 currentTerm 以及其日志的最新信息 (lastLogIndex, lastLogTerm)。
    • 等待投票: Candidate 处于等待状态,直到以下情况之一发生:
      • 获得大多数节点的投票,成为 Leader。
      • 收到来自更高任期 Leader 的 AppendEntries RPC,转变为 Follower。
      • 选举超时,再次递增任期并发起新一轮选举。
  4. Leader 状态: 如果 Candidate 收到来自大多数节点的投票,它就成为 Leader。
    • 发送心跳: 成为 Leader 后,它会立即向所有 Follower 发送心跳(不包含日志条目的 AppendEntries RPC),以宣示自己的权威,并重置所有 Follower 的选举计时器。
    • 初始化 Leader 独有的状态: (nextIndexmatchIndex),这些是用于日志复制的关键状态。

2.1 RequestVote RPC 中的任期检查

当一个 Candidate 发送 RequestVote RPC 给其他节点时,它会携带自己的 currentTerm 以及其日志的最新信息 (lastLogIndex, lastLogTerm)。接收方会根据这些信息决定是否投票。

// RequestVoteArgs 是 RequestVote RPC 的参数
type RequestVoteArgs struct {
    Term         int // Candidate 的任期
    CandidateId  int // 请求投票的 Candidate 的 ID
    LastLogIndex int // Candidate 最新日志条目的索引
    LastLogTerm  int // Candidate 最新日志条目的任期
}

// RequestVoteReply 是 RequestVote RPC 的响应
type RequestVoteReply struct {
    Term        int  // 接收者当前的任期,如果 Candidate 的任期小于接收者,用于更新 Candidate 的任期
    VoteGranted bool // 如果 Candidate 获得了投票,则为 true
}

// RequestVote 是处理 RequestVote RPC 的处理函数
func (rf *RaftNode) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    // 规则 1: 任期检查 - 如果 Candidate 的任期小于接收者的当前任期,则拒绝投票。
    // 这确保了任何带有旧任期号的 Candidate 都会被拒绝,它们无法在更高的逻辑时间周期内获得支持。
    // 收到旧任期号的 RPC 不会改变接收者的状态。
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        reply.VoteGranted = false
        return
    }

    // 规则 2: 任期更新 - 如果 Candidate 的任期大于接收者的当前任期,则更新自己的任期,并转变为 Follower。
    // 这是一个核心规则,确保节点总是以最新的任期运行,并服从更高任期的权威。
    // 无论当前状态是 Follower, Candidate 还是 Leader,一旦发现更高任期,都必须降级。
    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.state = Follower
        rf.votedFor = -1 // 重置投票,因为这是一个新的任期,可以投票给新的 Candidate
        rf.resetElectionTimer() // 重置选举计时器,因为现在可能有一个新的 Leader 或 Candidate 出现
    }

    reply.Term = rf.currentTerm // 总是返回接收者当前的任期,以便 Candidate 更新自己的任期(如果需要)

    // 规则 3: 投票限制 - 如果在当前任期内已经投票给其他 Candidate,则拒绝投票。
    // 确保一个节点在一个任期内最多只投一票,防止“双重投票”导致选举混乱。
    if rf.votedFor != -1 && rf.votedFor != args.CandidateId {
        reply.VoteGranted = false
        return
    }

    // 规则 4: 选举限制 (Election Restriction)
    // Raft 强制规定,Candidate 必须拥有至少和自己一样新的日志才能被选为 Leader。
    // 这里的 "更新" 定义为:
    // 1. Candidate 的 `LastLogTerm` 更大,或者
    // 2. `LastLogTerm` 相同,但 `LastLogIndex` 更大。
    // 这个规则保证了 Leader Completeness Property:一旦一个日志条目被提交,那么所有未来的 Leader 都必须包含该条目。
    lastLogIndex := len(rf.log) - 1
    lastLogTerm := rf.log[lastLogIndex].Term

    candidateIsMoreUpToDate := func() bool {
        // 比较 Candidate 的日志是否至少和接收者一样新
        if args.LastLogTerm > lastLogTerm {
            return true
        }
        if args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex {
            return true
        }
        return false
    }()

    if !candidateIsMoreUpToDate {
        reply.VoteGranted = false
        return
    }

    // 如果通过了所有检查,则投票给 Candidate
    rf.votedFor = args.CandidateId
    reply.VoteGranted = true
    // 投票后重置选举计时器,因为已经有了一个活跃的 Candidate,避免自己再次发起选举
    rf.resetElectionTimer()
}

// startElection 是节点发起选举的函数
func (rf *RaftNode) startElection() {
    rf.mu.Lock()
    rf.state = Candidate
    rf.currentTerm++         // 递增任期
    rf.votedFor = rf.me      // 投票给自己
    rf.resetElectionTimer()  // 重置选举计时器
    term := rf.currentTerm
    candidateId := rf.me
    lastLogIndex := len(rf.log) - 1
    lastLogTerm := rf.log[lastLogIndex].Term
    rf.mu.Unlock()

    votesReceived := 1 // 自己的一票

    // 创建一个用于等待投票结果的同步机制
    var mu sync.Mutex
    cond := sync.NewCond(&mu)

    // 向所有其他节点发送 RequestVote RPC
    for i := 0; i < len(rf.peers); i++ {
        if i == rf.me {
            continue
        }
        go func(server int) {
            args := RequestVoteArgs{
                Term:         term,
                CandidateId:  candidateId,
                LastLogIndex: lastLogIndex,
                LastLogTerm:  lastLogTerm,
            }
            reply := RequestVoteReply{}
            ok := rf.sendRequestVote(server, &args, &reply)

            mu.Lock()
            defer mu.Unlock()

            // 如果 RPC 调用失败,或者节点已不再是 Candidate,则忽略
            rf.mu.Lock()
            currentTerm := rf.currentTerm
            currentState := rf.state
            rf.mu.Unlock()
            if !ok || currentState != Candidate || term != currentTerm {
                return
            }

            // 规则:如果收到更高任期的响应,Candidate 降级为 Follower
            if reply.Term > currentTerm {
                rf.mu.Lock()
                if reply.Term > rf.currentTerm { // 再次检查,防止并发问题
                    rf.currentTerm = reply.Term
                    rf.state = Follower
                    rf.votedFor = -1
                    rf.resetElectionTimer()
                }
                rf.mu.Unlock()
                cond.Broadcast() // 通知所有等待者,选举已结束 (Candidate 降级)
                return
            }

            if reply.VoteGranted {
                votesReceived++
                // 如果获得大多数投票,则成为 Leader
                if votesReceived*2 > len(rf.peers) {
                    rf.mu.Lock()
                    if rf.state == Candidate && rf.currentTerm == term { // 再次确认状态和任期
                        rf.becomeLeader()
                    }
                    rf.mu.Unlock()
                    cond.Broadcast() // 通知所有等待者,选举已结束 (已成为 Leader)
                }
            }
        }(i)
    }

    // 等待选举结果,避免 goroutine 泄露,也可以设置超时
    mu.Lock()
    cond.Wait() // 阻塞直到被通知
    mu.Unlock()
}

// sendRequestVote 发送 RequestVote RPC
func (rf *RaftNode) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
    ok := rf.peers[server].Call("RaftNode.RequestVote", args, reply)
    return ok
}

// becomeLeader 是节点成为 Leader 后的操作
func (rf *RaftNode) becomeLeader() {
    rf.state = Leader
    rf.resetElectionTimer() // Leader 也需要定期发送心跳,这里重置是为了立即发送第一批心跳

    // nextIndex: 对于每个 Follower,Leader 认为它下一个要发送的日志条目的索引
    // 初始化为 Leader 的最后一条日志的索引 + 1
    lastLogIndex := len(rf.log) - 1
    rf.nextIndex = make([]int, len(rf.peers))
    rf.matchIndex = make([]int, len(rf.peers))
    for i := 0; i < len(rf.peers); i++ {
        rf.nextIndex[i] = lastLogIndex + 1
        rf.matchIndex[i] = 0 // matchIndex: 对于每个 Follower,Leader 知道它已经复制的最高日志条目的索引
    }

    // 立即发送心跳,宣布自己是新 Leader
    // 注意:这里只是触发,实际发送在 leaderAppendEntriesLoop 中完成
    go rf.sendHeartbeats()
}

任期在选举中的关键作用点:

  • args.Term < rf.currentTerm 这是最基本的任期检查。如果一个 Candidate 试图以一个低于接收者当前任期的任期发起选举,其请求会立即被拒绝。这避免了网络延迟或分区导致的旧 Leader 或 Candidate 干扰当前选举。一个拥有更高任期的节点不会响应来自旧任期的请求,确保了协议的“时间”总是向前推进。
  • args.Term > rf.currentTerm 这是任期机制的强大之处。如果一个节点(无论是 Follower, Candidate 还是 Leader)收到一个来自更高任期的 RequestVote RPC,它会立即更新自己的 currentTerm,并降级为 Follower。这意味着,任何节点一旦发现一个拥有更高任期的权威出现,就必须承认其合法性并服从。这在选举中至关重要,它强制所有节点向拥有更高任期的 Candidate 妥协,从而有助于打破僵局,产生新的 Leader。
  • 选举限制(LastLogTermLastLogIndex 比较): 这个规则与任期紧密结合,确保了只有拥有“最新”日志的 Candidate 才能被选为 Leader。“最新”是相对于投票者而言的。这个规则保证了新 Leader 至少拥有所有已提交的日志条目,因为已提交的日志条目必然存在于多数节点中,而新 Leader 必须获得多数票,所以它至少会包含这些已提交的日志。

3. 日志复制与任期:一致性的基石

Raft 协议的日志复制机制是其核心,它确保了所有节点的状态机都按相同的顺序执行相同的命令。Leader 负责接收客户端请求,将命令作为日志条目附加到自己的日志中,然后复制给 Follower。任期在日志复制中发挥着多重作用,它不仅用于检查 Leader 的权威性,还用于解决日志不一致的问题,并最终确保日志的安全性。

3.1 AppendEntries RPC 中的任期检查

Leader 通过 AppendEntries RPC 来复制日志条目和发送心跳。心跳是空的 AppendEntries RPC。

// AppendEntriesArgs 是 AppendEntries RPC 的参数
type AppendEntriesArgs struct {
    Term         int        // Leader 的任期
    LeaderId     int        // Leader 的 ID
    PrevLogIndex int        // 紧邻新日志条目之前的那个日志条目的索引
    PrevLogTerm  int        // 紧邻新日志条目之前的那个日志条目的任期
    Entries      []LogEntry // 要存储的日志条目(心跳时为空;可能发送多个)
    LeaderCommit int        // Leader 已知已提交的最高日志条目的索引
}

// AppendEntriesReply 是 AppendEntries RPC 的响应
type AppendEntriesReply struct {
    Term    int  // Follower 当前的任期,用于 Leader 更新自己
    Success bool // 如果 Follower 包含了匹配 PrevLogIndex 和 PrevLogTerm 的条目,则为 true
    // For optimization: 如果不成功,Follower 可以告诉 Leader 冲突的任期和其在该任期内的第一个索引
    ConflictTerm  int // 冲突日志条目的任期
    ConflictIndex int // 冲突日志条目在该任期内的第一个索引
}

// AppendEntries 是处理 AppendEntries RPC 的处理函数
func (rf *RaftNode) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    // 规则 1: 任期检查 - 如果 Leader 的任期小于 Follower 的当前任期,则拒绝。
    // 这防止了旧 Leader 干扰当前系统的运行。Follower 知道有更高任期的 Leader 存在(或者它自己在更高任期),
    // 因此会拒绝任何来自旧任期的请求。
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        reply.Success = false
        return
    }

    // 规则 2: 任期更新 - 如果 Leader 的任期大于或等于 Follower 的当前任期,则更新自己的任期,并转变为 Follower。
    // 无论当前节点是 Leader, Candidate 还是 Follower,只要发现一个合法 Leader 的任期高于自己,
    // 就必须更新任期并降级为 Follower。这是 Leader 发现自己已经过时时必须做的事情。
    // 它强制节点遵循更高任期的 Leader,是防止“脑裂”的关键。
    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.state = Follower
        rf.votedFor = -1 // 重置投票
    }
    // 无论如何,只要 Leader 的任期不低于 Follower,Follower 就需要重置选举计时器。
    // 这意味着它看到了一个有效的 Leader 或 Candidate 的存在。
    rf.resetElectionTimer()
    reply.Term = rf.currentTerm

    // 规则 3: 日志一致性检查
    // Raft 协议的日志匹配属性(Log Matching Property)要求:
    // 如果两个日志包含相同索引和任期的条目,那么它们在该索引之前的所有条目都必须相同。
    // 这里的检查就是验证这一属性。
    // 如果 Follower 的日志在 PrevLogIndex 处没有匹配 PrevLogTerm 的条目,
    // 说明 Leader 和 Follower 的日志在该点上不一致。
    // Follower 必须拒绝,并告知 Leader 冲突信息,Leader 需要回溯其 nextIndex。

    // 场景 1: Follower 日志不够长,无法包含 PrevLogIndex
    if args.PrevLogIndex > len(rf.log)-1 {
        reply.Success = false
        // 优化:告知 Leader Follower 最新的日志索引,Leader 可以直接将 nextIndex 设为该值
        reply.ConflictTerm = -1 // 表示日志长度不足,而非任期冲突
        reply.ConflictIndex = len(rf.log)
        return
    }

    // 场景 2: PrevLogIndex 处的任期不匹配
    if args.PrevLogIndex > 0 && rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
        reply.Success = false
        reply.ConflictTerm = rf.log[args.PrevLogIndex].Term
        // 优化:找到 ConflictTerm 在 Follower 日志中第一次出现的索引
        // Leader 收到后可以直接将 nextIndex 跳到该任期的第一个条目,而不是一个一个回溯
        conflictIndex := args.PrevLogIndex
        for conflictIndex > 0 && rf.log[conflictIndex-1].Term == reply.ConflictTerm {
            conflictIndex--
        }
        reply.ConflictIndex = conflictIndex
        return
    }

    // 规则 4: 追加新日志条目
    // 找到 Leader 和 Follower 日志开始分歧的点,并进行截断和追加。
    // 遍历 Leader 传来的新条目 (args.Entries)。
    index := args.PrevLogIndex + 1
    for i, entry := range args.Entries {
        if index+i > len(rf.log)-1 { // Follower 日志较短,直接追加所有剩余条目
            rf.log = append(rf.log, args.Entries[i:]...)
            break
        }
        // 如果日志条目冲突(索引相同但任期不同),则截断 Follower 日志并追加 Leader 的条目。
        // 由于日志匹配属性,一旦发现冲突,该点之后的所有条目都可能不一致,需要被 Leader 的日志覆盖。
        if rf.log[index+i].Term != entry.Term {
            rf.log = rf.log[:index+i] // 截断冲突点及之后的所有条目
            rf.log = append(rf.log, args.Entries[i:]...)
            break
        }
        // 如果日志条目一致,继续检查下一个,直到所有条目都匹配或新条目被追加。
    }

    // 规则 5: 更新 commitIndex
    // 如果 LeaderCommit > commitIndex,则更新 commitIndex 为 min(LeaderCommit, last new entry index)。
    // 这确保了 Follower 不会提交 Leader 尚未提交的条目,也不会提交超出自己日志范围的条目。
    if args.LeaderCommit > rf.commitIndex {
        lastNewEntryIndex := len(rf.log) - 1
        rf.commitIndex = min(args.LeaderCommit, lastNewEntryIndex)
    }

    reply.Success = true
    // 触发日志应用到状态机(实际中会通过通道进行异步处理)
    rf.applyLogToStateMachine()
}

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}

任期在日志复制中的关键作用点:

  • args.Term < rf.currentTerm 这是 Leader 权威性的核心检查。如果 Leader 试图以一个低于 Follower 当前任期的任期复制日志,其请求会被拒绝。这确保了只有当前合法 Leader 或更高任期的 Leader 才能成功复制日志。旧 Leader 的 AppendEntries 请求会被忽略,防止其影响系统状态。
  • args.Term >= rf.currentTerm 时更新任期并降级: 这是 Leader 发现自己已经过时时必须采取的行动。当 Leader 尝试联系一个拥有更高任期的 Follower 时,Follower 会在 AppendEntriesReply 中返回其更高的任期。Leader 收到这个响应后,会发现自己的任期已过期,从而更新自己的任期并降级为 Follower。这提供了一个反馈机制,使得旧 Leader 能够及时发现自己的过时状态,并避免继续操作,从而防止“脑裂”。
  • PrevLogTermPrevLogIndex 检查: 这是 Raft 保证日志一致性(Log Matching Property)的核心机制。如果两个日志在相同的索引处有相同的任期号,那么它们在该索引之前的所有日志都必须是相同的。这个检查通过比对 PrevLogIndex 处的日志条目任期,来验证 Leader 和 Follower 的日志是否匹配。如果不匹配,Follower 会拒绝,Leader 则会递减 nextIndex 并重试,直到找到一个匹配点,或者直到 nextIndex 到达日志起始点。任期在这里作为日志条目内容的唯一标识,使得 Leader 能够可靠地定位并修复 Follower 的不一致日志。

3.2 Leader 的日志复制循环

Leader 会不断地向 Follower 发送 AppendEntries RPC,以保持心跳并复制日志。

// sendHeartbeats 是 Leader 定期发送心跳(空 AppendEntries RPC)的函数
func (rf *RaftNode) sendHeartbeats() {
    rf.mu.Lock()
    if rf.state != Leader {
        rf.mu.Unlock()
        return
    }
    currentTerm := rf.currentTerm
    leaderId := rf.me
    commitIndex := rf.commitIndex
    lastLogIndex := len(rf.log) - 1
    rf.mu.Unlock()

    for i := 0; i < len(rf.peers); i++ {
        if i == rf.me {
            continue
        }
        // 针对每个 Follower 启动一个独立的协程进行日志复制,包括心跳
        go rf.leaderAppendEntriesLoop(i)
    }
}

// leaderAppendEntriesLoop 是 Leader 用于向特定 Follower 复制日志的协程
func (rf *RaftNode) leaderAppendEntriesLoop(server int) {
    // 这是一个持续运行的循环,负责保持与特定 Follower 的日志同步
    for !rf.killed() {
        rf.mu.Lock()
        if rf.state != Leader { // 如果不再是 Leader,则退出此协程
            rf.mu.Unlock()
            return
        }

        // 根据 nextIndex 准备要发送的日志条目
        prevLogIndex := rf.nextIndex[server] - 1
        // 确保 prevLogIndex 在 Leader 的日志范围内,最小可以是0 (for dummy entry)
        if prevLogIndex < 0 {
            prevLogIndex = 0
        }
        // 确保 prevLogIndex 不会超出 Leader 自己的日志范围
        if prevLogIndex >= len(rf.log) {
            // 这通常不应该发生,除非 nextIndex 逻辑有误
            prevLogIndex = len(rf.log) - 1
        }
        prevLogTerm := rf.log[prevLogIndex].Term

        entriesToSend := make([]LogEntry, 0)
        if rf.nextIndex[server] <= len(rf.log)-1 {
            entriesToSend = rf.log[rf.nextIndex[server]:]
        }

        args := AppendEntriesArgs{
            Term:         rf.currentTerm,
            LeaderId:     rf.me,
            PrevLogIndex: prevLogIndex,
            PrevLogTerm:  prevLogTerm,
            Entries:      entriesToSend,
            LeaderCommit: rf.commitIndex,
        }
        rf.mu.Unlock()

        reply := AppendEntriesReply{}
        ok := rf.sendAppendEntries(server, &args, &reply)

        rf.mu.Lock()
        // 在 RPC 返回后再次检查是否仍是 Leader,以及任期是否匹配
        if rf.state != Leader || args.Term != rf.currentTerm {
            rf.mu.Unlock()
            return
        }

        if !ok { // RPC 调用失败,稍后重试
            rf.mu.Unlock()
            time.Sleep(10 * time.Millisecond) // 简单重试间隔
            continue
        }

        // 规则 1: Leader 收到更高任期的响应
        // 这是 Leader 发现自己已过时的主要方式。如果 Follower 返回的任期更高,Leader 必须降级。
        if reply.Term > rf.currentTerm {
            rf.currentTerm = reply.Term
            rf.state = Follower
            rf.votedFor = -1
            rf.resetElectionTimer()
            rf.mu.Unlock()
            return // Leader 已降级,退出循环
        }

        // 处理成功的响应
        if reply.Success {
            // 更新 nextIndex 和 matchIndex
            // matchIndex 代表 Follower 已经成功复制的最高日志索引
            // newMatchIndex 是 Leader 认为 Follower 应该达到的最新 matchIndex
            newMatchIndex := args.PrevLogIndex + len(args.Entries)
            if newMatchIndex > rf.matchIndex[server] { // 避免回溯更新
                rf.matchIndex[server] = newMatchIndex
            }
            rf.nextIndex[server] = rf.matchIndex[server] + 1
            rf.updateCommitIndex() // 尝试更新 commitIndex
        } else { // 处理失败的响应 (日志不匹配)
            // 根据 Follower 返回的冲突信息优化 nextIndex 的回退策略
            if reply.ConflictTerm != -1 { // Follower 提供了冲突任期信息
                // 在 Leader 自己的日志中查找 ConflictTerm 的最后一个条目
                leaderConflictIndex := -1
                for i := len(rf.log) - 1; i > 0; i-- {
                    if rf.log[i].Term == reply.ConflictTerm {
                        leaderConflictIndex = i
                        break
                    }
                }
                if leaderConflictIndex != -1 {
                    // 如果 Leader 也有这个冲突任期,则从该任期之后开始发送
                    rf.nextIndex[server] = leaderConflictIndex + 1
                } else {
                    // 如果 Leader 没有这个冲突任期,则直接跳到 Follower 提供的 ConflictIndex
                    // (这是该任期在 Follower 日志中首次出现的索引)
                    rf.nextIndex[server] = reply.ConflictIndex
                }
            } else { // Follower 只说日志长度不够 (ConflictTerm == -1)
                // Leader 直接将 nextIndex 回退到 Follower 的日志长度
                rf.nextIndex[server] = reply.ConflictIndex // ConflictIndex 此时是 Follower 的 len(log)
            }

            // 确保 nextIndex 至少为1(日志起始点)
            if rf.nextIndex[server] < 1 {
                rf.nextIndex[server] = 1
            }
        }
        rf.mu.Unlock()
        time.Sleep(rf.heartbeatInterval) // 发送心跳/日志复制的间隔
    }
}

// sendAppendEntries 发送 AppendEntries RPC 给指定的 peer
func (rf *RaftNode) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
    ok := rf.peers[server].Call("RaftNode.AppendEntries", args, reply)
    return ok
}

这里 reply.Term > rf.currentTerm 的检查至关重要。它提供了一个反馈机制:如果 Leader 发现一个 Follower 已经在一个更高的任期中,那么 Leader 必须承认自己已经过时,并立即降级为 Follower。这是 Raft 协议中防止“脑裂”(Split Brain)的关键机制之一。通过这种方式,任期号确保了集群中永远只有一个真正的 Leader,并且所有节点都最终会服从这个 Leader。

4. 安全性(Safety)保障与任期:最终一致性的守护

Raft 协议定义了几项重要的安全性属性,确保即使在网络分区、节点崩溃等故障下,系统也能保持一致性。任期是这些安全属性得以强制执行的核心。

4.1 选举安全 (Election Safety)

属性: 在一个给定的任期内,最多只能选举出一个 Leader。
任期作用:

  • 单任期单票:RequestVote RPC 中,一个节点在一个任期内最多只投一票 (votedFor 字段)。这是通过检查 rf.votedForargs.CandidateId 来实现的。这个简单的规则保证了即使在网络分区的情况下,也不会出现多个 Candidate 在同一任期内获得多数票。
  • 任期更新与降级: 如果一个 Candidate 成功赢得选举,它就将以其当前任期作为 Leader。任何其他 Candidate 或旧 Leader 试图以相同或更低的任期进行操作时,其请求都会被拥有新 Leader 任期的节点拒绝或导致发起者降级。例如,一个旧 Leader 收到来自新 Leader 的 AppendEntries RPC 时,发现新 Leader 的任期更高,就会立即降级为 Follower。这确保了旧 Leader 不会继续操作,从而避免了多个 Leader 同时存在的风险。

4.2 Leader Completeness Property (领导者完整性属性)

属性: 如果一个日志条目在一个任期内被提交,那么所有未来任期的 Leader 都必须包含该日志条目。
任期作用: 这是 Raft 最重要的安全属性之一,它通过 选举限制日志匹配属性 来实现。

  • 选举限制:RequestVote RPC 中,一个 Candidate 只有在拥有至少和投票者一样新的日志时才能获得投票。这里的“新”由 LastLogTermLastLogIndex 共同决定。这意味着,要成为 Leader,Candidate 必须拥有所有已提交的日志条目,因为已提交的日志条目必然存在于多数节点中,而新 Leader 必须获得多数票,这意味着它必须拥有至少和这些多数节点一样新的日志。任期号作为日志的“时间戳”,在这个比较中起到了决定性的作用。
  • 日志匹配属性: 如果两个日志包含相同索引和任期的条目,那么它们在该索引之前的所有条目都必须相同。这个属性是通过 AppendEntries RPC 中的 PrevLogIndexPrevLogTerm 检查来强制执行的。Leader 总是找到它和 Follower 日志的第一个不匹配点,然后从该点开始强制 Follower 复制 Leader 的日志。由于任期是唯一的,相同的索引和任期意味着相同的日志内容,从而保证了日志的前缀一致性。

4.3 State Machine Safety (状态机安全)

属性: 如果一个 Leader 在某个任期内将一个日志条目提交到其状态机,那么其他任何 Leader 都不会在该索引处提交不同的日志条目。
任期作用: 这是通过精心设计的 日志提交规则 来实现的,其中任期扮演了关键角色。

日志提交规则:

一个 Leader 可以提交一个日志条目,如果:

  1. 该条目已复制到大多数 Follower 上。
  2. 并且,该条目必须是当前 Leader 任期内的条目

为什么是“当前 Leader 任期内的条目”?

这是一个非常巧妙且重要的规则,它解决了 Raft 协议中最复杂的一个潜在问题:一个旧任期的、已复制到多数节点但未被提交的日志条目,可能会在一个新的 Leader 产生后被“回滚”或被新的日志覆盖,导致不一致。

考虑以下场景:

  • 初始状态: S1 (Leader, Term 2): log = [ (1,x), (2,y) ]。S2, S3 也是

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注