深入 Raft 协议:Leader 选举、日志复制与安全性(Safety)是如何通过任期(Term)强制对齐的?

深入 Raft 协议:Leader 选举、日志复制与安全性是如何通过任期(Term)强制对齐的?

各位同仁,大家好。

在分布式系统领域,共识协议是实现数据一致性和容错性的基石。Paxos 协议以其严谨的数学推导而闻名,但也因其复杂性让许多开发者望而却步。Raft 协议的出现,旨在提供一个与 Paxos 相同安全性和性能,但更易于理解和实现的替代方案。Raft 的核心思想是“理解性是关键”(Understanding is Key),它通过明确的角色划分、简洁的 RPC 接口以及对时间周期的严格管理,极大地简化了分布式共识的复杂性。

今天,我们将深入探讨 Raft 协议中一个至关重要的概念——任期(Term)。任期是 Raft 协议的逻辑时钟,它像一条无形的纽带,将 Leader 选举、日志复制和整个系统的安全性紧密地强制对齐在一起。理解任期如何运作,是掌握 Raft 协议精髓的关键。

一、Raft 协议概述与任期(Term)的定义

在开始深入任期之前,我们先快速回顾一下 Raft 的基本概念。一个 Raft 集群通常由奇数个节点组成(例如 3 个或 5 个),以确保在网络分区或节点故障时仍能形成多数派。每个节点在任何给定时间都处于以下三种状态之一:

  1. Follower(追随者):被动响应 Leader 或 Candidate 的请求。
  2. Candidate(候选人):在 Leader 选举期间,尝试成为新的 Leader。
  3. Leader(领导者):处理所有客户端请求,管理日志复制。

Raft 节点之间通过两种主要 RPC 消息进行通信:

  1. RequestVote RPC:用于 Leader 选举。
  2. AppendEntries RPC:用于日志复制和心跳机制。

现在,我们来正式定义任期(Term)

任期(Term)是一个单调递增的整数。在 Raft 协议中,每一个任期都代表了一个 Leader 选举的周期。当一个节点发现新的 Leader 被选举出来,或者自身开始尝试成为 Leader 时,它会进入一个新的任期。任期在 Raft 中扮演着以下核心角色:

  • 逻辑时钟:它提供了一个全局的、单调递增的时间概念,帮助节点理解事件发生的顺序和新旧关系。
  • 权威标识:更高的任期号总是代表着更高的权威。Raft 中的所有决策,包括 Leader 的身份、日志的接受与否,都严格服从任期号的权威性。
  • 强制对齐机制:它是确保 Leader 选举、日志复制和系统安全性的核心机制,所有节点都必须根据任期号来更新自己的状态,以实现全局的一致性。

每个 Raft 节点都会持久化存储一个 currentTerm 变量,表示该节点当前所处的任期。这是 Raft 协议中最重要的持久化状态之一,因为它在节点故障重启后必须恢复,以维护协议的正确性。

二、Leader 选举与任期(Term)的强制对齐

Leader 选举是 Raft 协议的起点,也是任期发挥作用的首个关键阶段。当一个 Follower 在一段时间内(称为选举超时)没有收到 Leader 的心跳或 AppendEntries RPC 时,它会认为当前 Leader 已经失效,并开始尝试进行 Leader 选举。

2.1 选举流程

  1. Follower 转换为 Candidate

    • 一个 Follower 增加其 currentTerm
    • votedFor(投票给谁)设置为自己。
    • 向集群中的其他所有节点发送 RequestVote RPC
    • 进入 Candidate 状态。
  2. Candidate 接收投票

    • 如果 Candidate 收到多数节点的投票,它就成为 Leader。
    • 如果 Candidate 在选举超时时间内没有收到多数投票,它会再次增加 currentTerm 并重新发起选举。
    • 如果 Candidate 在选举过程中收到来自其他 Leader 的 AppendEntries RPC(其任期号大于或等于自己的 currentTerm),它会立即转换为 Follower,并接受该 Leader 的领导。

2.2 RequestVote RPC 消息结构

RequestVote RPC 包含以下重要参数:

class RequestVoteArgs:
    def __init__(self, term: int, candidate_id: int, last_log_index: int, last_log_term: int):
        self.term = term  # 候选人的任期
        self.candidate_id = candidate_id  # 候选人的 ID
        self.last_log_index = last_log_index  # 候选人日志中最新条目的索引
        self.last_log_term = last_log_term  # 候选人日志中最新条目的任期

2.3 RequestVote RPC 处理逻辑与任期对齐

当一个节点(无论是 Follower 还是另一个 Candidate)收到 RequestVote RPC 时,它会根据以下规则进行处理,其中任期(term)是核心判断依据:

class RaftNode:
    def __init__(self, id, initial_term=0):
        self.id = id
        self.current_term = initial_term  # 持久化状态
        self.voted_for = None  # 持久化状态
        self.log = []  # 持久化状态,这里简化为一个列表,实际会更复杂
        self.state = "follower"
        # ... 其他状态,如commitIndex, lastApplied, nextIndex, matchIndex等

    def handle_request_vote(self, args: RequestVoteArgs) -> 'RequestVoteReply':
        # 1. 规则:如果 RPC 的任期小于当前节点的任期,则拒绝投票
        #    这意味着发送方(Candidate)是过时的。
        if args.term < self.current_term:
            return RequestVoteReply(self.current_term, False)

        # 2. 规则:如果 RPC 的任期大于当前节点的任期,则当前节点更新其任期,
        #    并转换为 Follower 状态。这是任期对齐的关键一步。
        #    无论当前节点是 Follower 还是 Candidate,发现更高任期都必须服从。
        if args.term > self.current_term:
            self.current_term = args.term
            self.state = "follower"
            self.voted_for = None  # 在新任期开始时,重置投票状态
            # 持久化 current_term 和 voted_for
            self._persist_state()

        # 3. 规则:如果 RPC 的任期等于当前节点的任期:
        #    a. 如果当前节点已经在本任期内投票给其他候选人,则拒绝投票。
        #    b. 如果当前节点尚未投票(votedFor为None或为自己),
        #       则检查候选人的日志是否至少和自己的日志一样新。
        if args.term == self.current_term:
            if self.voted_for is not None and self.voted_for != args.candidate_id:
                # 已经投票给其他人
                return RequestVoteReply(self.current_term, False)
            else:
                # 检查日志是否至少和自己一样新
                last_log_index, last_log_term = self._get_last_log_info()
                if self._is_log_up_to_date(args.last_log_index, args.last_log_term, last_log_index, last_log_term):
                    self.voted_for = args.candidate_id
                    # 持久化 voted_for
                    self._persist_state()
                    return RequestVoteReply(self.current_term, True)
                else:
                    return RequestVoteReply(self.current_term, False)

        # 默认情况,通常不会走到这里,但为了完整性
        return RequestVoteReply(self.current_term, False)

    def _get_last_log_info(self):
        if not self.log:
            return 0, 0 # 初始空日志
        last_entry = self.log[-1]
        return len(self.log), last_entry.term

    # 辅助函数:判断候选人日志是否比当前节点日志更新
    def _is_log_up_to_date(self, candidate_last_log_index, candidate_last_log_term,
                           my_last_log_index, my_last_log_term):
        # 比较最新日志条目的任期
        if candidate_last_log_term > my_last_log_term:
            return True
        if candidate_last_log_term < my_last_log_term:
            return False
        # 如果任期相同,比较日志长度
        return candidate_last_log_index >= my_last_log_index

    # 模拟持久化操作,实际会写入磁盘
    def _persist_state(self):
        print(f"Node {self.id}: Persisting state - currentTerm={self.current_term}, votedFor={self.voted_for}")

# 假设 LogEntry 结构
class LogEntry:
    def __init__(self, term: int, command: any):
        self.term = term
        self.command = command

class RequestVoteReply:
    def __init__(self, term: int, vote_granted: bool):
        self.term = term
        self.vote_granted = vote_granted

任期(Term)在这里的强制对齐作用体现在:

  • 即时更新与服从权威:当一个节点收到一个 RequestVote RPC,如果其 term 大于自己的 currentTerm,它会立即更新自己的 currentTerm,并转换为 Follower。这意味着任何节点发现了一个“更高级别”的任期,都会立即放弃自己的当前状态(无论是 Follower 还是 Candidate),服从更高的权威。这确保了集群中所有节点最终会收敛到最高的任期号。
  • 防止过时 Leader 产生:日志完整性检查(_is_log_up_to_date)确保只有拥有最新日志的候选人才能当选。虽然这直接关系到日志,但任期在此处的 candidate_last_log_term 比较中扮演核心角色。如果两个日志长度相同,但一个日志的最新条目任期更高,则认为该日志更新。这结合 term 的递增性,保证了被选出的 Leader 拥有所有已提交的日志条目。
  • 单任期单票votedFor 变量确保在一个给定任期内,每个节点最多只能投一票。结合多数派原则,这保证了在任何一个任期内,最多只会有一个 Leader 被成功选出(选举安全性)。如果出现两个 Candidate 同时获得多数但相互冲突的投票,那只会是因为它们在不同的任期中获得了投票,或者没有一个 Candidate 能够获得超过一半的投票,从而导致本任期无法选出 Leader,进入下一个任期重新选举。

三、日志复制与任期(Term)的强制对齐

Leader 选举成功后,新的 Leader 开始工作。它负责接收客户端请求,将命令追加到自己的日志中,然后将日志条目复制到 Follower 节点上,并最终提交这些日志条目。日志复制是 Raft 协议的核心功能,任期在这里同样发挥着至关重要的作用,尤其是在维护日志的一致性和系统安全性方面。

3.1 Leader 的日志复制职责

  1. 处理客户端请求:Leader 接收客户端的命令,将其作为新的日志条目追加到自己的本地日志中。
  2. 发送 AppendEntries RPC:Leader 并行地向所有 Follower 发送 AppendEntries RPC,将新的日志条目复制给它们。
  3. 心跳机制:如果没有新的日志条目需要发送,Leader 会定期发送空的 AppendEntries RPC(心跳),以维持其 Leader 地位,并防止 Follower 触发选举超时。
  4. 日志提交:当一个日志条目被复制到多数节点(包括 Leader 自己)的日志中时,Leader 会将其视为已提交(committed),并将其应用到状态机中。

3.2 AppendEntries RPC 消息结构

AppendEntries RPC 包含以下重要参数:

class AppendEntriesArgs:
    def __init__(self, term: int, leader_id: int, prev_log_index: int,
                 prev_log_term: int, entries: list, leader_commit: int):
        self.term = term  # Leader 的任期
        self.leader_id = leader_id  # Leader 的 ID
        self.prev_log_index = prev_log_index  # 紧邻要发送的新日志条目之前的那个日志条目的索引
        self.prev_log_term = prev_log_term  # 紧邻要发送的新日志条目之前的那个日志条目的任期
        self.entries = entries  # 待复制的日志条目(可能为空,用于心跳)
        self.leader_commit = leader_commit  # Leader 的 commitIndex

class AppendEntriesReply:
    def __init__(self, term: int, success: bool):
        self.term = term
        self.success = success

3.3 AppendEntries RPC 处理逻辑与任期对齐

当一个 Follower 收到 AppendEntries RPC 时,它会根据以下严格的规则进行处理:

class RaftNode:
    # ... (前面的代码省略)

    def handle_append_entries(self, args: AppendEntriesArgs) -> 'AppendEntriesReply':
        # 1. 规则:如果 RPC 的任期小于当前节点的任期,则拒绝。
        #    这意味着发送方(Leader)是过时的,它不应该再作为 Leader。
        if args.term < self.current_term:
            return AppendEntriesReply(self.current_term, False)

        # 2. 规则:如果 RPC 的任期大于或等于当前节点的任期:
        #    a. 当前节点必须更新其任期为 Leader 的任期(如果 Leader 任期更高)。
        #    b. 转换为 Follower 状态。
        #    c. 重置选举计时器。
        #    d. 接受 Leader 的领导。
        if args.term > self.current_term:
            self.current_term = args.term
            self.state = "follower"
            self.voted_for = None # 重置投票
            self._persist_state()

        # 无论任期是否更新,只要收到了有效 Leader 的 AppendEntries,
        # 就说明有 Leader 在线,需要重置选举超时计时器。
        # 即使 args.term == self.current_term 且当前节点是 Candidate,
        # 它也应该转换为 Follower。
        self._reset_election_timeout()
        self.state = "follower" # 确保是 Follower 状态

        # 3. 日志一致性检查:
        #    如果 Follower 的日志在 prev_log_index 处没有匹配的条目,
        #    或者该条目的任期与 prev_log_term 不匹配,则拒绝。
        #    Leader 需要递减 nextIndex 并重试。
        if len(self.log) < args.prev_log_index or 
           (args.prev_log_index > 0 and self.log[args.prev_log_index - 1].term != args.prev_log_term):
            # Raft 协议的索引是从 1 开始的,这里用 Python 列表的 0-based index
            # 如果 args.prev_log_index 为 0,表示是第一个条目,不需要检查。
            # 否则,检查前一个条目是否存在且任期匹配。
            return AppendEntriesReply(self.current_term, False)

        # 4. 如果存在冲突的日志条目,删除从冲突点开始的所有后续条目。
        #    冲突意味着在同一个索引上,Follower 的日志条目与 Leader 的不同(任期不同)。
        i = 0
        while i < len(args.entries):
            log_idx = args.prev_log_index + 1 + i
            if log_idx <= len(self.log):
                # 如果 Follower 的日志条目存在且与 Leader 的不同
                if self.log[log_idx - 1].term != args.entries[i].term:
                    # 删除从冲突点开始的所有条目
                    self.log = self.log[:log_idx - 1]
                    break
            i += 1

        # 5. 追加 Leader 日志中 Follower 缺少的条目。
        for entry in args.entries[i:]: # 从 i 开始追加未冲突的或新增的条目
            self.log.append(entry)

        # 持久化日志
        self._persist_state()

        # 6. 如果 Leader 的 commitIndex 大于当前节点的 commitIndex,
        #    则更新当前节点的 commitIndex 为两者中的最小值。
        #    (Leader 的 commitIndex 可能会比 Follower 的大,
        #     但不能超过 Follower 最新日志的索引)
        if args.leader_commit > self.commit_index:
            self.commit_index = min(args.leader_commit, len(self.log))
            self._apply_log_entries() # 将已提交的日志条目应用到状态机

        return AppendEntriesReply(self.current_term, True)

    def _reset_election_timeout(self):
        # 模拟重置选举超时计时器
        print(f"Node {self.id}: Resetting election timeout.")

    def _apply_log_entries(self):
        # 模拟将日志条目应用到状态机
        while self.last_applied < self.commit_index:
            self.last_applied += 1
            print(f"Node {self.id}: Applying log entry at index {self.last_applied} (Term: {self.log[self.last_applied-1].term})")
            # self.state_machine.apply(self.log[self.last_applied-1].command)

    # 假设 RaftNode 类有这些成员变量
    commit_index = 0
    last_applied = 0

任期(Term)在这里的强制对齐作用体现在:

  • Leader 降级(Stale Leader Protection):这是 AppendEntries RPC 中任期最关键的作用之一。如果 Follower 收到一个 AppendEntries RPC,但发现 Leader 的 term 小于自己的 currentTerm,它会立即拒绝该 RPC。同时,它会在 AppendEntriesReply 中返回自己的 currentTerm。当 Leader 收到这个回复时,它会发现自己的任期已经过时,从而主动将自己降级为 Follower。这个机制保证了在任何给定时刻,集群中只有一个 Leader 是活跃的,并且该 Leader 处于集群中最高的任期。
  • Follower 任期更新与服从:与 RequestVote RPC 类似,如果 Follower 收到一个 AppendEntries RPC,其 term 大于自己的 currentTerm,它会立即更新自己的 currentTerm,转换为 Follower 状态,并重置选举计时器。这确保了所有节点都及时感知到最新的任期,并服从新 Leader 的权威。
  • 日志匹配原则(Log Matching Property)prev_log_indexprev_log_term 参数是 Raft 保证日志一致性的核心。如果一个 AppendEntries RPC 在特定索引 prev_log_index 处的日志条目任期不匹配,Follower 会拒绝该 RPC。Leader 收到拒绝后,会递减其 nextIndex(为该 Follower 记录的下一个要发送的日志条目索引),并重试发送更早的日志条目。这个过程会一直持续,直到 Leader 和 Follower 找到它们日志中第一个匹配的条目。一旦找到匹配点,Raft 就保证在该点之前的所有日志条目都是完全相同的。任期在此处是验证日志是否匹配的决定性因素。
  • 防止旧日志条目覆盖新日志条目:日志条目一旦被提交,其任期和内容就不能被改变。当 Leader 在日志复制过程中发现 Follower 有冲突的日志条目(即在相同索引处但任期不同),它会强制 Follower 截断其日志,并用自己的日志条目覆盖。由于 Leader 总是拥有最新的、正确的日志,且任期是递增的,这保证了旧的、错误的日志条目不会被提交,最终所有节点会收敛到 Leader 的正确日志。

四、安全性(Safety)与任期(Term)的全面保障

Raft 协议设计了多项安全属性,以确保分布式系统在面对故障时数据的一致性。任期(Term)是所有这些安全属性的底层保障机制。

4.1 选举安全性(Election Safety)

定义:在一个给定的任期内,最多只能有一个 Leader 被选出。

任期如何保障

  • 单任期单票:每个节点在每个任期内最多只能投一票(votedFor 机制)。
  • 多数派原则:一个 Candidate 必须从集群中的多数节点获得投票才能当选 Leader。
  • 任期比较:如果两个 Candidate 同时发起选举,它们可能会在不同的任期中获得投票,或者在同一任期中平分票数导致无法选出 Leader,进而进入更高的任期重新选举。任何节点收到来自低任期的 RequestVote RPC 都会直接拒绝。这些机制共同保证了在任何一个有效的任期内,只会有一个 Leader 能够获得多数投票并被选举成功。

4.2 Leader 完整性属性(Leader Completeness Property)

定义:如果 Leader 在某个任期内提交了一个日志条目,那么该条目将存在于所有后续 Leader 的日志中。

任期如何保障

  • 投票规则对日志的限制:在 RequestVote RPC 的处理中,一个 Follower 只有在 Candidate 的日志至少和自己的日志一样新(即 candidate.lastLogTerm > my.lastLogTermcandidate.lastLogTerm == my.lastLogTerm && candidate.lastLogIndex >= my.lastLogIndex)时,才会给它投票。这个关键的规则确保了被选出的 Leader 必定拥有所有已提交的日志条目,因为一个已提交的条目必然存在于多数节点中,而新 Leader 必须从多数节点获得投票,这意味着它的日志必须至少和这些多数节点一样新。
  • Leader 只能提交其自身任期内的日志条目:Raft 规定,Leader 只能提交其当前任期内的日志条目。对于来自早期任期的日志条目,Leader 只有在复制并提交了至少一个其自身任期内的日志条目后,才能间接地提交早期任期的日志。这个规则防止了在网络分区期间,一个旧 Leader 在少数派分区中提交了旧任期日志,而这些日志在多数派分区中已经被覆盖的情况。通过强制要求提交当前任期的日志,Raft 确保了只有在多数派同意的情况下,日志才会被提交,从而维护了日志的线性化。

4.3 日志匹配属性(Log Matching Property)

定义:如果两个日志包含具有相同索引和任期的条目,那么这两个日志在该索引之前的所有条目都将是相同的。

任期如何保障

  • AppendEntries RPC 的一致性检查AppendEntries RPC 消息中的 prev_log_indexprev_log_term 字段是实现这一属性的关键。Leader 在发送日志条目时,会包含前一个日志条目的索引和任期。Follower 只有在本地日志中找到完全匹配的 prev_log_indexprev_log_term 时,才会接受新的日志条目。
  • 任期在冲突解决中的作用:如果发现不匹配,Follower 会拒绝,Leader 会递减 nextIndex 并重试。这个过程会一直回溯,直到找到一个匹配点。一旦找到匹配点,由于任期是单调递增的,并且每个日志条目都带有其生成时的任期,这确保了在该匹配点之前的所有日志条目都必然是相同的。任期在这里作为日志条目的“版本戳”,帮助系统精确地定位并解决日志分歧。

4.4 状态机安全性(State Machine Safety)

定义:如果一个服务器已经将某个索引处的日志条目应用到其状态机中,那么其他任何服务器都不会在该索引处应用不同的日志条目。

任期如何保障

  • Leader 完整性属性:确保所有已提交的日志条目都会出现在未来的 Leader 中。
  • 日志匹配属性:确保日志的一致性,防止在相同索引处出现不同内容的日志。
  • 提交规则(Commit Rule):日志条目只有在被复制到多数节点后才能被提交。并且,对于非当前任期的日志条目,Leader 必须至少提交一个当前任期的日志条目后才能将其视为已提交。这个规则避免了在网络分区发生时,一个少数派分区中的 Leader 提交了某些日志,而这些日志在多数派中可能已经因为新的 Leader 而被覆盖,从而导致状态机的不一致。任期在这里严格限制了哪些日志条目可以在何时被提交,确保了只有在全局多数派同意的情况下,才能最终确认并应用日志。

五、Raft 节点状态与任期管理的代码骨架

为了更好地理解任期在实际代码中的管理,我们来看一个简化的 Raft 节点代码骨架。

import threading
import time
import random
from collections import deque

# 简化 RPC 模拟
class RPC:
    def __init__(self, sender_id, type, args):
        self.sender_id = sender_id
        self.type = type
        self.args = args

# 简化 LogEntry
class LogEntry:
    def __init__(self, term: int, command: str):
        self.term = term
        self.command = command

# 模拟网络通信
class Network:
    def __init__(self):
        self.nodes = {} # {node_id: RaftNode}
        self.message_queue = deque() # 模拟网络消息队列

    def register_node(self, node):
        self.nodes[node.id] = node

    def send_rpc(self, sender_id, receiver_id, rpc_type, args):
        # 模拟网络延迟和不可靠性
        time.sleep(random.uniform(0.01, 0.05))
        if receiver_id in self.nodes:
            # 实际中,这里会通过网络发送序列化的 RPC
            self.message_queue.append((sender_id, receiver_id, rpc_type, args))

    def deliver_messages(self):
        while self.message_queue:
            sender_id, receiver_id, rpc_type, args = self.message_queue.popleft()
            receiver_node = self.nodes.get(receiver_id)
            if receiver_node:
                # 实际中,根据 RPC 类型调用不同的处理函数
                if rpc_type == "RequestVote":
                    reply = receiver_node.handle_request_vote(args)
                    self.send_rpc(receiver_id, sender_id, "RequestVoteReply", reply)
                elif rpc_type == "AppendEntries":
                    reply = receiver_node.handle_append_entries(args)
                    self.send_rpc(receiver_id, sender_id, "AppendEntriesReply", reply)
                elif rpc_type == "RequestVoteReply":
                    receiver_node.handle_request_vote_reply(sender_id, args)
                elif rpc_type == "AppendEntriesReply":
                    receiver_node.handle_append_entries_reply(sender_id, args)

class RaftNode:
    def __init__(self, id: int, peers: list, network: Network):
        self.id = id
        self.peers = peers # 其他节点的 ID 列表
        self.network = network

        # Persistent state on all servers (must be updated on stable storage before responding to RPCs)
        self.current_term = 0
        self.voted_for = None # candidateId that received vote in current term (or None if none)
        self.log = [] # list of LogEntry objects

        # Volatile state on all servers
        self.commit_index = 0 # index of highest log entry known to be committed
        self.last_applied = 0 # index of highest log entry applied to state machine

        # Volatile state on leaders (reinitialized after election)
        self.next_index = {peer_id: 1 for peer_id in peers} # for each server, index of the next log entry to send to that server
        self.match_index = {peer_id: 0 for peer_id in peers} # for each server, index of highest log entry known to be replicated on server

        # Raft state
        self.state = "follower"
        self.leader_id = None

        # Timers
        self.election_timeout = random.uniform(0.150, 0.300) # 150-300ms
        self.heartbeat_timeout = 0.050 # 50ms
        self._last_heartbeat_time = time.time()
        self._last_election_reset_time = time.time()

        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True
        self._running = False
        self._lock = threading.Lock()

    def start(self):
        self._running = True
        self._thread.start()

    def stop(self):
        self._running = False
        self._thread.join()

    def _run(self):
        while self._running:
            with self._lock:
                current_time = time.time()

                if self.state == "follower":
                    if current_time - self._last_election_reset_time > self.election_timeout:
                        print(f"Node {self.id} (T{self.current_term}): Election timeout. Becoming candidate.")
                        self._become_candidate()
                elif self.state == "candidate":
                    # Check if election timeout re-triggers (no majority obtained)
                    if current_time - self._last_election_reset_time > self.election_timeout:
                        print(f"Node {self.id} (T{self.current_term}): Election timeout again. Retrying election.")
                        self._become_candidate()
                elif self.state == "leader":
                    if current_time - self._last_heartbeat_time > self.heartbeat_timeout:
                        self._send_heartbeats()
                        self._last_heartbeat_time = current_time
            time.sleep(0.01) # Small sleep to prevent busy-waiting

    def _persist_state(self):
        # In a real system, this would write current_term, voted_for, and log to disk
        # before responding to RPCs. For simplicity, we just print.
        print(f"Node {self.id}: Persisting state - currentTerm={self.current_term}, votedFor={self.voted_for}, log_len={len(self.log)}")

    def _get_last_log_info(self):
        if not self.log:
            return 0, 0
        last_entry = self.log[-1]
        return len(self.log), last_entry.term

    def _is_log_up_to_date(self, candidate_last_log_index, candidate_last_log_term,
                           my_last_log_index, my_last_log_term):
        if candidate_last_log_term > my_last_log_term:
            return True
        if candidate_last_log_term < my_last_log_term:
            return False
        return candidate_last_log_index >= my_last_log_index

    def _reset_election_timeout(self):
        self._last_election_reset_time = time.time()
        self.election_timeout = random.uniform(0.150, 0.300) # Reset to a new random value

    def _become_follower(self, term):
        self.state = "follower"
        self.leader_id = None
        if term > self.current_term:
            self.current_term = term
            self.voted_for = None
            self._persist_state()
        self._reset_election_timeout()
        print(f"Node {self.id}: Became Follower in Term {self.current_term}.")

    def _become_candidate(self):
        self.current_term += 1
        self.state = "candidate"
        self.voted_for = self.id # Vote for self
        self._persist_state()
        self._reset_election_timeout() # Start new election timeout
        self.votes_received = {self.id} # Include own vote
        self.leader_id = None

        last_log_index, last_log_term = self._get_last_log_info()
        print(f"Node {self.id} (T{self.current_term}): Starting election with log ({last_log_index}, T{last_log_term}).")

        for peer_id in self.peers:
            args = RequestVoteArgs(self.current_term, self.id, last_log_index, last_log_term)
            self.network.send_rpc(self.id, peer_id, "RequestVote", args)

    def _become_leader(self):
        self.state = "leader"
        self.leader_id = self.id
        self._last_heartbeat_time = time.time() # Immediately send heartbeats
        print(f"Node {self.id} (T{self.current_term}): Became Leader!")

        # Reinitialize volatile state on leaders
        last_log_index = len(self.log)
        for peer_id in self.peers:
            self.next_index[peer_id] = last_log_index + 1
            self.match_index[peer_id] = 0

        self._send_heartbeats() # Send initial empty AppendEntries for consistency

    def _send_heartbeats(self):
        # print(f"Node {self.id} (T{self.current_term}): Sending heartbeats.")
        for peer_id in self.peers:
            prev_log_index = self.next_index[peer_id] - 1
            prev_log_term = self.log[prev_log_index - 1].term if prev_log_index > 0 else 0

            # For heartbeat, entries list is empty
            args = AppendEntriesArgs(self.current_term, self.id, prev_log_index, prev_log_term, [], self.commit_index)
            self.network.send_rpc(self.id, peer_id, "AppendEntries", args)

    # --- RPC Handlers (as defined in previous sections) ---
    def handle_request_vote(self, args: RequestVoteArgs) -> 'RequestVoteReply':
        with self._lock:
            if args.term < self.current_term:
                return RequestVoteReply(self.current_term, False)

            if args.term > self.current_term:
                self._become_follower(args.term) # Automatically updates term, state, votedFor

            vote_granted = False
            if (self.voted_for is None or self.voted_for == args.candidate_id):
                last_log_index, last_log_term = self._get_last_log_info()
                if self._is_log_up_to_date(args.last_log_index, args.last_log_term, last_log_index, last_log_term):
                    self.voted_for = args.candidate_id
                    self._persist_state()
                    vote_granted = True

            print(f"Node {self.id} (T{self.current_term}): Voted {vote_granted} for {args.candidate_id} in T{args.term}.")
            return RequestVoteReply(self.current_term, vote_granted)

    def handle_request_vote_reply(self, sender_id: int, reply: RequestVoteReply):
        with self._lock:
            if self.state != "candidate":
                return # Ignore if not candidate anymore

            if reply.term > self.current_term:
                self._become_follower(reply.term)
                return

            if reply.term == self.current_term and reply.vote_granted:
                self.votes_received.add(sender_id)
                if len(self.votes_received) > (len(self.peers) + 1) // 2: # Majority
                    print(f"Node {self.id} (T{self.current_term}): Received majority votes. Becoming Leader.")
                    self._become_leader()
            elif reply.term == self.current_term and not reply.vote_granted:
                print(f"Node {self.id} (T{self.current_term}): {sender_id} denied vote.")

    def handle_append_entries(self, args: AppendEntriesArgs) -> 'AppendEntriesReply':
        with self._lock:
            if args.term < self.current_term:
                return AppendEntriesReply(self.current_term, False)

            # If term is greater or equal, this is a valid leader, reset timeout and become follower
            self._become_follower(args.term) # Handles term update and state change

            # Log consistency check
            # prev_log_index == 0 means first entry or empty log
            if args.prev_log_index > 0 and 
               (len(self.log) < args.prev_log_index or 
                self.log[args.prev_log_index - 1].term != args.prev_log_term):
                print(f"Node {self.id} (T{self.current_term}): Log inconsistency at index {args.prev_log_index}. My log len: {len(self.log)}")
                return AppendEntriesReply(self.current_term, False)

            # If existing entry conflicts with new one (same index, different term), delete existing and all that follow
            # Find the first conflicting entry
            i = 0
            while i < len(args.entries):
                log_idx_to_check = args.prev_log_index + 1 + i
                if log_idx_to_check <= len(self.log):
                    if self.log[log_idx_to_check - 1].term != args.entries[i].term:
                        self.log = self.log[:log_idx_to_check - 1] # Truncate conflicting entries
                        print(f"Node {self.id} (T{self.current_term}): Truncated log from index {log_idx_to_check}.")
                        break # Start appending from this point
                i += 1

            # Append any new entries not already in log
            for entry in args.entries[i:]:
                self.log.append(entry)

            self._persist_state()

            # Update commitIndex
            if args.leader_commit > self.commit_index:
                self.commit_index = min(args.leader_commit, len(self.log))
                self._apply_log_entries()

            return AppendEntriesReply(self.current_term, True)

    def handle_append_entries_reply(self, sender_id: int, reply: AppendEntriesReply):
        with self._lock:
            if self.state != "leader":
                return # Ignore if not leader anymore

            if reply.term > self.current_term:
                self._become_follower(reply.term)
                return

            if reply.term == self.current_term:
                if reply.success:
                    # Update nextIndex and matchIndex for this follower
                    # (Simplified: assume all entries were sent, need to refine for partial sends)
                    # For real Raft, if AppendEntriesArgs.entries was not empty,
                    # nextIndex would be updated to prevLogIndex + len(entries) + 1
                    # and matchIndex to prevLogIndex + len(entries).
                    # For heartbeats (empty entries), these wouldn't change.

                    # This simplified logic for heartbeats or full successful replication
                    # assumes the Follower has matched up to the current log length.
                    # A real implementation needs to track what was actually sent/acked.
                    if self.next_index[sender_id] <= len(self.log): # Ensure next_index doesn't go backwards
                         self.match_index[sender_id] = len(self.log) # Assume it matched up to current leader's log length
                         self.next_index[sender_id] = len(self.log) + 1 # Next entry to send

                    # Try to advance commitIndex
                    self._update_commit_index()
                else:
                    # Replication failed, decrement nextIndex and retry
                    self.next_index[sender_id] = max(1, self.next_index[sender_id] - 1)
                    # print(f"Node {self.id} (T{self.current_term}): Replication failed to {sender_id}. Decrementing nextIndex to {self.next_index[sender_id]}.")
                    # Immediately retry sending AppendEntries for this follower
                    # (In a real system, this would be part of the heartbeat/replication loop)
                    # self._send_append_entries_to_follower(sender_id)

    def _update_commit_index(self):
        # Leader attempts to advance its commitIndex
        # Find N such that N > commitIndex, a majority of matchIndex[i] >= N,
        # and log[N].term == currentTerm.
        # The third condition is crucial for safety with entries from previous terms.

        N = self.commit_index + 1
        while N <= len(self.log):
            if self.log[N-1].term == self.current_term: # Only commit entries from current term (or implicitly via current term entry)
                count = 1 # Leader itself
                for peer_id in self.peers:
                    if self.match_index[peer_id] >= N:
                        count += 1

                if count > (len(self.peers) + 1) // 2: # Majority
                    self.commit_index = N
                    self._apply_log_entries()
                else:
                    break # Cannot commit N yet
            N += 1

    def _apply_log_entries(self):
        while self.last_applied < self.commit_index:
            self.last_applied += 1
            entry = self.log[self.last_applied - 1]
            print(f"Node {self.id} (T{self.current_term}): Applying log entry {self.last_applied} (Term: {entry.term}, Cmd: {entry.command})")
            # self.state_machine.apply(entry.command)

    def client_command(self, command: str):
        with self._lock:
            if self.state != "leader":
                print(f"Node {self.id}: Not Leader. Cannot process command '{command}'. Current state: {self.state}")
                return False

            new_entry = LogEntry(self.current_term, command)
            self.log.append(new_entry)
            self._persist_state() # Persist new log entry

            print(f"Node {self.id} (T{self.current_term}): Appended client command '{command}' at index {len(self.log)}.")

            # Send AppendEntries to followers with the new entry
            for peer_id in self.peers:
                prev_log_index = self.next_index[peer_id] - 1
                prev_log_term = self.log[prev_log_index - 1].term if prev_log_index > 0 else 0

                # Send all entries from nextIndex[peer_id] onwards
                entries_to_send = self.log[self.next_index[peer_id]-1:]

                args = AppendEntriesArgs(self.current_term, self.id, prev_log_index, prev_log_term, entries_to_send, self.commit_index)
                self.network.send_rpc(self.id, peer_id, "AppendEntries", args)
            return True

# --- Simulation Setup ---
if __name__ == "__main__":
    node_ids = [1, 2, 3, 4, 5]
    network = Network()
    nodes = []

    for id in node_ids:
        peers_for_node = [p for p in node_ids if p != id]
        node = RaftNode(id, peers_for_node, network)
        network.register_node(node)
        nodes.append(node)

    for node in nodes:
        node.start()

    # Give some time for initial election
    print("n--- Starting Raft Simulation ---")
    time.sleep(1.0)
    print("n--- Initial Election Period ---")

    # Simulate client commands
    command_counter = 0
    for _ in range(5):
        time.sleep(random.uniform(0.1, 0.5))
        leader_found = False
        for node in nodes:
            with node._lock:
                if node.state == "leader":
                    command_counter += 1
                    command = f"SET_KEY_{command_counter}_VALUE_{command_counter}"
                    print(f"nClient sending command: {command}")
                    node.client_command(command)
                    leader_found = True
                    break
        if not leader_found:
            print("nNo leader found to process command. Retrying...")

        # Process network messages to allow RPCs and replies
        network.deliver_messages()
        time.sleep(0.1) # Give time for replies to be processed

    print("n--- Simulating Network Partition (Node 1 and 2 isolated) ---")
    # Simulate a network partition: Node 1 and 2 cannot talk to 3,4,5
    # This is a simplified simulation and would require more sophisticated network mocking
    # For now, we'll just stop node 1 and 2's message processing temporarily
    # In a real system, you'd modify the network routing logic.

    # Let's assume Node 1 was leader and gets partitioned
    # A new leader should be elected from [3, 4, 5]

    for node in nodes:
        with node._lock:
            if node.id in [1, 2]:
                node._running = False # Temporarily stop processing
                print(f"Node {node.id} temporarily stopped processing messages.")

    time.sleep(2.0) # Allow time for new election in remaining partition

    print("n--- After Partition: New Leader Election Expected ---")
    for _ in range(3):
        time.sleep(random.uniform(0.1, 0.5))
        leader_found = False
        for node in nodes:
            if node.id in [3,4,5]: # Only check nodes in the majority partition
                with node._lock:
                    if node.state == "leader":
                        command_counter += 1
                        command = f"PARTITION_CMD_{command_counter}"
                        print(f"nClient sending command: {command} to new leader.")
                        node.client_command(command)
                        leader_found = True
                        break
        if not leader_found:
            print("nNo leader found in majority partition to process command. Retrying...")

        network.deliver_messages()
        time.sleep(0.1)

    print("n--- Stopping Raft Simulation ---")
    for node in nodes:
        node.stop()

    print("nFinal State of Nodes:")
    for node in nodes:
        print(f"Node {node.id}: Term={node.current_term}, State={node.state}, CommitIndex={node.commit_index}, LogLen={len(node.log)}")

这个代码骨架展示了 current_term 如何贯穿整个 Raft 节点的状态管理和 RPC 处理。每次 RPC 请求和回复都会对 term 进行比较,并根据比较结果决定如何更新节点状态、是否接受请求、是否转换角色,以及是否触发日志同步逻辑。_persist_state() 模拟了将持久化状态写入稳定存储,这在 Raft 中是确保数据不丢失的关键。

六、任期在 Raft 协议中的核心价值

任期在 Raft 协议中扮演着多重角色,其核心价值在于提供了一个强大的、自洽的机制来强制整个分布式系统在面对各种故障时保持一致性:

  1. 统一逻辑时间:任期提供了一个全局的、单调递增的逻辑时间轴。所有节点都通过任期来同步它们对系统当前状态的认知。
  2. 权威性与角色转换:更高的任期总是代表着更高的权威。任何节点收到来自更高任期的 RPC 都会立即更新自己的任期并转换为 Follower,从而避免了“脑裂”(split-brain)问题,即多个节点误以为自己是 Leader。
  3. 日志一致性基石:任期是日志条目的一部分,它在日志复制和日志匹配过程中起到“版本戳”的作用,帮助 Leader 和 Follower 发现并解决日志分歧。
  4. 安全属性的保障:选举安全性、Leader 完整性、日志匹配和状态机安全性都直接或间接地依赖于任期机制。任期确保了只有拥有最新、最完整日志的 Leader 才能被选举,也确保了日志在复制和提交过程中的正确性和不可篡改性。

七、结语

Raft 协议通过引入任期(Term)这一核心概念,成功地将复杂的分布式共识问题分解为更易于理解和实现的部分。任期作为 Raft 的逻辑时钟和权威标识,严格地强制了 Leader 选举、日志复制和各种安全属性的对齐。正是这种对任期的巧妙而严谨的使用,使得 Raft 在保证强一致性和容错性的同时,也极大地提升了协议的教学和工程实践效率。理解任期如何贯穿 Raft 协议的方方面面,是我们掌握分布式系统一致性精髓的关键一步。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注