深入 Raft 协议:Leader 选举、日志复制与安全性是如何通过任期(Term)强制对齐的?
各位同仁,大家好。
在分布式系统领域,共识协议是实现数据一致性和容错性的基石。Paxos 协议以其严谨的数学推导而闻名,但也因其复杂性让许多开发者望而却步。Raft 协议的出现,旨在提供一个与 Paxos 相同安全性和性能,但更易于理解和实现的替代方案。Raft 的核心思想是“理解性是关键”(Understanding is Key),它通过明确的角色划分、简洁的 RPC 接口以及对时间周期的严格管理,极大地简化了分布式共识的复杂性。
今天,我们将深入探讨 Raft 协议中一个至关重要的概念——任期(Term)。任期是 Raft 协议的逻辑时钟,它像一条无形的纽带,将 Leader 选举、日志复制和整个系统的安全性紧密地强制对齐在一起。理解任期如何运作,是掌握 Raft 协议精髓的关键。
一、Raft 协议概述与任期(Term)的定义
在开始深入任期之前,我们先快速回顾一下 Raft 的基本概念。一个 Raft 集群通常由奇数个节点组成(例如 3 个或 5 个),以确保在网络分区或节点故障时仍能形成多数派。每个节点在任何给定时间都处于以下三种状态之一:
- Follower(追随者):被动响应 Leader 或 Candidate 的请求。
- Candidate(候选人):在 Leader 选举期间,尝试成为新的 Leader。
- Leader(领导者):处理所有客户端请求,管理日志复制。
Raft 节点之间通过两种主要 RPC 消息进行通信:
RequestVote RPC:用于 Leader 选举。AppendEntries RPC:用于日志复制和心跳机制。
现在,我们来正式定义任期(Term)。
任期(Term)是一个单调递增的整数。在 Raft 协议中,每一个任期都代表了一个 Leader 选举的周期。当一个节点发现新的 Leader 被选举出来,或者自身开始尝试成为 Leader 时,它会进入一个新的任期。任期在 Raft 中扮演着以下核心角色:
- 逻辑时钟:它提供了一个全局的、单调递增的时间概念,帮助节点理解事件发生的顺序和新旧关系。
- 权威标识:更高的任期号总是代表着更高的权威。Raft 中的所有决策,包括 Leader 的身份、日志的接受与否,都严格服从任期号的权威性。
- 强制对齐机制:它是确保 Leader 选举、日志复制和系统安全性的核心机制,所有节点都必须根据任期号来更新自己的状态,以实现全局的一致性。
每个 Raft 节点都会持久化存储一个 currentTerm 变量,表示该节点当前所处的任期。这是 Raft 协议中最重要的持久化状态之一,因为它在节点故障重启后必须恢复,以维护协议的正确性。
二、Leader 选举与任期(Term)的强制对齐
Leader 选举是 Raft 协议的起点,也是任期发挥作用的首个关键阶段。当一个 Follower 在一段时间内(称为选举超时)没有收到 Leader 的心跳或 AppendEntries RPC 时,它会认为当前 Leader 已经失效,并开始尝试进行 Leader 选举。
2.1 选举流程
-
Follower 转换为 Candidate:
- 一个 Follower 增加其
currentTerm。 - 将
votedFor(投票给谁)设置为自己。 - 向集群中的其他所有节点发送
RequestVote RPC。 - 进入 Candidate 状态。
- 一个 Follower 增加其
-
Candidate 接收投票:
- 如果 Candidate 收到多数节点的投票,它就成为 Leader。
- 如果 Candidate 在选举超时时间内没有收到多数投票,它会再次增加
currentTerm并重新发起选举。 - 如果 Candidate 在选举过程中收到来自其他 Leader 的
AppendEntries RPC(其任期号大于或等于自己的currentTerm),它会立即转换为 Follower,并接受该 Leader 的领导。
2.2 RequestVote RPC 消息结构
RequestVote RPC 包含以下重要参数:
class RequestVoteArgs:
def __init__(self, term: int, candidate_id: int, last_log_index: int, last_log_term: int):
self.term = term # 候选人的任期
self.candidate_id = candidate_id # 候选人的 ID
self.last_log_index = last_log_index # 候选人日志中最新条目的索引
self.last_log_term = last_log_term # 候选人日志中最新条目的任期
2.3 RequestVote RPC 处理逻辑与任期对齐
当一个节点(无论是 Follower 还是另一个 Candidate)收到 RequestVote RPC 时,它会根据以下规则进行处理,其中任期(term)是核心判断依据:
class RaftNode:
def __init__(self, id, initial_term=0):
self.id = id
self.current_term = initial_term # 持久化状态
self.voted_for = None # 持久化状态
self.log = [] # 持久化状态,这里简化为一个列表,实际会更复杂
self.state = "follower"
# ... 其他状态,如commitIndex, lastApplied, nextIndex, matchIndex等
def handle_request_vote(self, args: RequestVoteArgs) -> 'RequestVoteReply':
# 1. 规则:如果 RPC 的任期小于当前节点的任期,则拒绝投票
# 这意味着发送方(Candidate)是过时的。
if args.term < self.current_term:
return RequestVoteReply(self.current_term, False)
# 2. 规则:如果 RPC 的任期大于当前节点的任期,则当前节点更新其任期,
# 并转换为 Follower 状态。这是任期对齐的关键一步。
# 无论当前节点是 Follower 还是 Candidate,发现更高任期都必须服从。
if args.term > self.current_term:
self.current_term = args.term
self.state = "follower"
self.voted_for = None # 在新任期开始时,重置投票状态
# 持久化 current_term 和 voted_for
self._persist_state()
# 3. 规则:如果 RPC 的任期等于当前节点的任期:
# a. 如果当前节点已经在本任期内投票给其他候选人,则拒绝投票。
# b. 如果当前节点尚未投票(votedFor为None或为自己),
# 则检查候选人的日志是否至少和自己的日志一样新。
if args.term == self.current_term:
if self.voted_for is not None and self.voted_for != args.candidate_id:
# 已经投票给其他人
return RequestVoteReply(self.current_term, False)
else:
# 检查日志是否至少和自己一样新
last_log_index, last_log_term = self._get_last_log_info()
if self._is_log_up_to_date(args.last_log_index, args.last_log_term, last_log_index, last_log_term):
self.voted_for = args.candidate_id
# 持久化 voted_for
self._persist_state()
return RequestVoteReply(self.current_term, True)
else:
return RequestVoteReply(self.current_term, False)
# 默认情况,通常不会走到这里,但为了完整性
return RequestVoteReply(self.current_term, False)
def _get_last_log_info(self):
if not self.log:
return 0, 0 # 初始空日志
last_entry = self.log[-1]
return len(self.log), last_entry.term
# 辅助函数:判断候选人日志是否比当前节点日志更新
def _is_log_up_to_date(self, candidate_last_log_index, candidate_last_log_term,
my_last_log_index, my_last_log_term):
# 比较最新日志条目的任期
if candidate_last_log_term > my_last_log_term:
return True
if candidate_last_log_term < my_last_log_term:
return False
# 如果任期相同,比较日志长度
return candidate_last_log_index >= my_last_log_index
# 模拟持久化操作,实际会写入磁盘
def _persist_state(self):
print(f"Node {self.id}: Persisting state - currentTerm={self.current_term}, votedFor={self.voted_for}")
# 假设 LogEntry 结构
class LogEntry:
def __init__(self, term: int, command: any):
self.term = term
self.command = command
class RequestVoteReply:
def __init__(self, term: int, vote_granted: bool):
self.term = term
self.vote_granted = vote_granted
任期(Term)在这里的强制对齐作用体现在:
- 即时更新与服从权威:当一个节点收到一个
RequestVote RPC,如果其term大于自己的currentTerm,它会立即更新自己的currentTerm,并转换为 Follower。这意味着任何节点发现了一个“更高级别”的任期,都会立即放弃自己的当前状态(无论是 Follower 还是 Candidate),服从更高的权威。这确保了集群中所有节点最终会收敛到最高的任期号。 - 防止过时 Leader 产生:日志完整性检查(
_is_log_up_to_date)确保只有拥有最新日志的候选人才能当选。虽然这直接关系到日志,但任期在此处的candidate_last_log_term比较中扮演核心角色。如果两个日志长度相同,但一个日志的最新条目任期更高,则认为该日志更新。这结合term的递增性,保证了被选出的 Leader 拥有所有已提交的日志条目。 - 单任期单票:
votedFor变量确保在一个给定任期内,每个节点最多只能投一票。结合多数派原则,这保证了在任何一个任期内,最多只会有一个 Leader 被成功选出(选举安全性)。如果出现两个 Candidate 同时获得多数但相互冲突的投票,那只会是因为它们在不同的任期中获得了投票,或者没有一个 Candidate 能够获得超过一半的投票,从而导致本任期无法选出 Leader,进入下一个任期重新选举。
三、日志复制与任期(Term)的强制对齐
Leader 选举成功后,新的 Leader 开始工作。它负责接收客户端请求,将命令追加到自己的日志中,然后将日志条目复制到 Follower 节点上,并最终提交这些日志条目。日志复制是 Raft 协议的核心功能,任期在这里同样发挥着至关重要的作用,尤其是在维护日志的一致性和系统安全性方面。
3.1 Leader 的日志复制职责
- 处理客户端请求:Leader 接收客户端的命令,将其作为新的日志条目追加到自己的本地日志中。
- 发送
AppendEntries RPC:Leader 并行地向所有 Follower 发送AppendEntries RPC,将新的日志条目复制给它们。 - 心跳机制:如果没有新的日志条目需要发送,Leader 会定期发送空的
AppendEntries RPC(心跳),以维持其 Leader 地位,并防止 Follower 触发选举超时。 - 日志提交:当一个日志条目被复制到多数节点(包括 Leader 自己)的日志中时,Leader 会将其视为已提交(committed),并将其应用到状态机中。
3.2 AppendEntries RPC 消息结构
AppendEntries RPC 包含以下重要参数:
class AppendEntriesArgs:
def __init__(self, term: int, leader_id: int, prev_log_index: int,
prev_log_term: int, entries: list, leader_commit: int):
self.term = term # Leader 的任期
self.leader_id = leader_id # Leader 的 ID
self.prev_log_index = prev_log_index # 紧邻要发送的新日志条目之前的那个日志条目的索引
self.prev_log_term = prev_log_term # 紧邻要发送的新日志条目之前的那个日志条目的任期
self.entries = entries # 待复制的日志条目(可能为空,用于心跳)
self.leader_commit = leader_commit # Leader 的 commitIndex
class AppendEntriesReply:
def __init__(self, term: int, success: bool):
self.term = term
self.success = success
3.3 AppendEntries RPC 处理逻辑与任期对齐
当一个 Follower 收到 AppendEntries RPC 时,它会根据以下严格的规则进行处理:
class RaftNode:
# ... (前面的代码省略)
def handle_append_entries(self, args: AppendEntriesArgs) -> 'AppendEntriesReply':
# 1. 规则:如果 RPC 的任期小于当前节点的任期,则拒绝。
# 这意味着发送方(Leader)是过时的,它不应该再作为 Leader。
if args.term < self.current_term:
return AppendEntriesReply(self.current_term, False)
# 2. 规则:如果 RPC 的任期大于或等于当前节点的任期:
# a. 当前节点必须更新其任期为 Leader 的任期(如果 Leader 任期更高)。
# b. 转换为 Follower 状态。
# c. 重置选举计时器。
# d. 接受 Leader 的领导。
if args.term > self.current_term:
self.current_term = args.term
self.state = "follower"
self.voted_for = None # 重置投票
self._persist_state()
# 无论任期是否更新,只要收到了有效 Leader 的 AppendEntries,
# 就说明有 Leader 在线,需要重置选举超时计时器。
# 即使 args.term == self.current_term 且当前节点是 Candidate,
# 它也应该转换为 Follower。
self._reset_election_timeout()
self.state = "follower" # 确保是 Follower 状态
# 3. 日志一致性检查:
# 如果 Follower 的日志在 prev_log_index 处没有匹配的条目,
# 或者该条目的任期与 prev_log_term 不匹配,则拒绝。
# Leader 需要递减 nextIndex 并重试。
if len(self.log) < args.prev_log_index or
(args.prev_log_index > 0 and self.log[args.prev_log_index - 1].term != args.prev_log_term):
# Raft 协议的索引是从 1 开始的,这里用 Python 列表的 0-based index
# 如果 args.prev_log_index 为 0,表示是第一个条目,不需要检查。
# 否则,检查前一个条目是否存在且任期匹配。
return AppendEntriesReply(self.current_term, False)
# 4. 如果存在冲突的日志条目,删除从冲突点开始的所有后续条目。
# 冲突意味着在同一个索引上,Follower 的日志条目与 Leader 的不同(任期不同)。
i = 0
while i < len(args.entries):
log_idx = args.prev_log_index + 1 + i
if log_idx <= len(self.log):
# 如果 Follower 的日志条目存在且与 Leader 的不同
if self.log[log_idx - 1].term != args.entries[i].term:
# 删除从冲突点开始的所有条目
self.log = self.log[:log_idx - 1]
break
i += 1
# 5. 追加 Leader 日志中 Follower 缺少的条目。
for entry in args.entries[i:]: # 从 i 开始追加未冲突的或新增的条目
self.log.append(entry)
# 持久化日志
self._persist_state()
# 6. 如果 Leader 的 commitIndex 大于当前节点的 commitIndex,
# 则更新当前节点的 commitIndex 为两者中的最小值。
# (Leader 的 commitIndex 可能会比 Follower 的大,
# 但不能超过 Follower 最新日志的索引)
if args.leader_commit > self.commit_index:
self.commit_index = min(args.leader_commit, len(self.log))
self._apply_log_entries() # 将已提交的日志条目应用到状态机
return AppendEntriesReply(self.current_term, True)
def _reset_election_timeout(self):
# 模拟重置选举超时计时器
print(f"Node {self.id}: Resetting election timeout.")
def _apply_log_entries(self):
# 模拟将日志条目应用到状态机
while self.last_applied < self.commit_index:
self.last_applied += 1
print(f"Node {self.id}: Applying log entry at index {self.last_applied} (Term: {self.log[self.last_applied-1].term})")
# self.state_machine.apply(self.log[self.last_applied-1].command)
# 假设 RaftNode 类有这些成员变量
commit_index = 0
last_applied = 0
任期(Term)在这里的强制对齐作用体现在:
- Leader 降级(Stale Leader Protection):这是
AppendEntries RPC中任期最关键的作用之一。如果 Follower 收到一个AppendEntries RPC,但发现 Leader 的term小于自己的currentTerm,它会立即拒绝该 RPC。同时,它会在AppendEntriesReply中返回自己的currentTerm。当 Leader 收到这个回复时,它会发现自己的任期已经过时,从而主动将自己降级为 Follower。这个机制保证了在任何给定时刻,集群中只有一个 Leader 是活跃的,并且该 Leader 处于集群中最高的任期。 - Follower 任期更新与服从:与
RequestVote RPC类似,如果 Follower 收到一个AppendEntries RPC,其term大于自己的currentTerm,它会立即更新自己的currentTerm,转换为 Follower 状态,并重置选举计时器。这确保了所有节点都及时感知到最新的任期,并服从新 Leader 的权威。 - 日志匹配原则(Log Matching Property):
prev_log_index和prev_log_term参数是 Raft 保证日志一致性的核心。如果一个AppendEntries RPC在特定索引prev_log_index处的日志条目任期不匹配,Follower 会拒绝该 RPC。Leader 收到拒绝后,会递减其nextIndex(为该 Follower 记录的下一个要发送的日志条目索引),并重试发送更早的日志条目。这个过程会一直持续,直到 Leader 和 Follower 找到它们日志中第一个匹配的条目。一旦找到匹配点,Raft 就保证在该点之前的所有日志条目都是完全相同的。任期在此处是验证日志是否匹配的决定性因素。 - 防止旧日志条目覆盖新日志条目:日志条目一旦被提交,其任期和内容就不能被改变。当 Leader 在日志复制过程中发现 Follower 有冲突的日志条目(即在相同索引处但任期不同),它会强制 Follower 截断其日志,并用自己的日志条目覆盖。由于 Leader 总是拥有最新的、正确的日志,且任期是递增的,这保证了旧的、错误的日志条目不会被提交,最终所有节点会收敛到 Leader 的正确日志。
四、安全性(Safety)与任期(Term)的全面保障
Raft 协议设计了多项安全属性,以确保分布式系统在面对故障时数据的一致性。任期(Term)是所有这些安全属性的底层保障机制。
4.1 选举安全性(Election Safety)
定义:在一个给定的任期内,最多只能有一个 Leader 被选出。
任期如何保障:
- 单任期单票:每个节点在每个任期内最多只能投一票(
votedFor机制)。 - 多数派原则:一个 Candidate 必须从集群中的多数节点获得投票才能当选 Leader。
- 任期比较:如果两个 Candidate 同时发起选举,它们可能会在不同的任期中获得投票,或者在同一任期中平分票数导致无法选出 Leader,进而进入更高的任期重新选举。任何节点收到来自低任期的
RequestVote RPC都会直接拒绝。这些机制共同保证了在任何一个有效的任期内,只会有一个 Leader 能够获得多数投票并被选举成功。
4.2 Leader 完整性属性(Leader Completeness Property)
定义:如果 Leader 在某个任期内提交了一个日志条目,那么该条目将存在于所有后续 Leader 的日志中。
任期如何保障:
- 投票规则对日志的限制:在
RequestVote RPC的处理中,一个 Follower 只有在 Candidate 的日志至少和自己的日志一样新(即candidate.lastLogTerm > my.lastLogTerm或candidate.lastLogTerm == my.lastLogTerm && candidate.lastLogIndex >= my.lastLogIndex)时,才会给它投票。这个关键的规则确保了被选出的 Leader 必定拥有所有已提交的日志条目,因为一个已提交的条目必然存在于多数节点中,而新 Leader 必须从多数节点获得投票,这意味着它的日志必须至少和这些多数节点一样新。 - Leader 只能提交其自身任期内的日志条目:Raft 规定,Leader 只能提交其当前任期内的日志条目。对于来自早期任期的日志条目,Leader 只有在复制并提交了至少一个其自身任期内的日志条目后,才能间接地提交早期任期的日志。这个规则防止了在网络分区期间,一个旧 Leader 在少数派分区中提交了旧任期日志,而这些日志在多数派分区中已经被覆盖的情况。通过强制要求提交当前任期的日志,Raft 确保了只有在多数派同意的情况下,日志才会被提交,从而维护了日志的线性化。
4.3 日志匹配属性(Log Matching Property)
定义:如果两个日志包含具有相同索引和任期的条目,那么这两个日志在该索引之前的所有条目都将是相同的。
任期如何保障:
AppendEntries RPC的一致性检查:AppendEntries RPC消息中的prev_log_index和prev_log_term字段是实现这一属性的关键。Leader 在发送日志条目时,会包含前一个日志条目的索引和任期。Follower 只有在本地日志中找到完全匹配的prev_log_index和prev_log_term时,才会接受新的日志条目。- 任期在冲突解决中的作用:如果发现不匹配,Follower 会拒绝,Leader 会递减
nextIndex并重试。这个过程会一直回溯,直到找到一个匹配点。一旦找到匹配点,由于任期是单调递增的,并且每个日志条目都带有其生成时的任期,这确保了在该匹配点之前的所有日志条目都必然是相同的。任期在这里作为日志条目的“版本戳”,帮助系统精确地定位并解决日志分歧。
4.4 状态机安全性(State Machine Safety)
定义:如果一个服务器已经将某个索引处的日志条目应用到其状态机中,那么其他任何服务器都不会在该索引处应用不同的日志条目。
任期如何保障:
- Leader 完整性属性:确保所有已提交的日志条目都会出现在未来的 Leader 中。
- 日志匹配属性:确保日志的一致性,防止在相同索引处出现不同内容的日志。
- 提交规则(Commit Rule):日志条目只有在被复制到多数节点后才能被提交。并且,对于非当前任期的日志条目,Leader 必须至少提交一个当前任期的日志条目后才能将其视为已提交。这个规则避免了在网络分区发生时,一个少数派分区中的 Leader 提交了某些日志,而这些日志在多数派中可能已经因为新的 Leader 而被覆盖,从而导致状态机的不一致。任期在这里严格限制了哪些日志条目可以在何时被提交,确保了只有在全局多数派同意的情况下,才能最终确认并应用日志。
五、Raft 节点状态与任期管理的代码骨架
为了更好地理解任期在实际代码中的管理,我们来看一个简化的 Raft 节点代码骨架。
import threading
import time
import random
from collections import deque
# 简化 RPC 模拟
class RPC:
def __init__(self, sender_id, type, args):
self.sender_id = sender_id
self.type = type
self.args = args
# 简化 LogEntry
class LogEntry:
def __init__(self, term: int, command: str):
self.term = term
self.command = command
# 模拟网络通信
class Network:
def __init__(self):
self.nodes = {} # {node_id: RaftNode}
self.message_queue = deque() # 模拟网络消息队列
def register_node(self, node):
self.nodes[node.id] = node
def send_rpc(self, sender_id, receiver_id, rpc_type, args):
# 模拟网络延迟和不可靠性
time.sleep(random.uniform(0.01, 0.05))
if receiver_id in self.nodes:
# 实际中,这里会通过网络发送序列化的 RPC
self.message_queue.append((sender_id, receiver_id, rpc_type, args))
def deliver_messages(self):
while self.message_queue:
sender_id, receiver_id, rpc_type, args = self.message_queue.popleft()
receiver_node = self.nodes.get(receiver_id)
if receiver_node:
# 实际中,根据 RPC 类型调用不同的处理函数
if rpc_type == "RequestVote":
reply = receiver_node.handle_request_vote(args)
self.send_rpc(receiver_id, sender_id, "RequestVoteReply", reply)
elif rpc_type == "AppendEntries":
reply = receiver_node.handle_append_entries(args)
self.send_rpc(receiver_id, sender_id, "AppendEntriesReply", reply)
elif rpc_type == "RequestVoteReply":
receiver_node.handle_request_vote_reply(sender_id, args)
elif rpc_type == "AppendEntriesReply":
receiver_node.handle_append_entries_reply(sender_id, args)
class RaftNode:
def __init__(self, id: int, peers: list, network: Network):
self.id = id
self.peers = peers # 其他节点的 ID 列表
self.network = network
# Persistent state on all servers (must be updated on stable storage before responding to RPCs)
self.current_term = 0
self.voted_for = None # candidateId that received vote in current term (or None if none)
self.log = [] # list of LogEntry objects
# Volatile state on all servers
self.commit_index = 0 # index of highest log entry known to be committed
self.last_applied = 0 # index of highest log entry applied to state machine
# Volatile state on leaders (reinitialized after election)
self.next_index = {peer_id: 1 for peer_id in peers} # for each server, index of the next log entry to send to that server
self.match_index = {peer_id: 0 for peer_id in peers} # for each server, index of highest log entry known to be replicated on server
# Raft state
self.state = "follower"
self.leader_id = None
# Timers
self.election_timeout = random.uniform(0.150, 0.300) # 150-300ms
self.heartbeat_timeout = 0.050 # 50ms
self._last_heartbeat_time = time.time()
self._last_election_reset_time = time.time()
self._thread = threading.Thread(target=self._run)
self._thread.daemon = True
self._running = False
self._lock = threading.Lock()
def start(self):
self._running = True
self._thread.start()
def stop(self):
self._running = False
self._thread.join()
def _run(self):
while self._running:
with self._lock:
current_time = time.time()
if self.state == "follower":
if current_time - self._last_election_reset_time > self.election_timeout:
print(f"Node {self.id} (T{self.current_term}): Election timeout. Becoming candidate.")
self._become_candidate()
elif self.state == "candidate":
# Check if election timeout re-triggers (no majority obtained)
if current_time - self._last_election_reset_time > self.election_timeout:
print(f"Node {self.id} (T{self.current_term}): Election timeout again. Retrying election.")
self._become_candidate()
elif self.state == "leader":
if current_time - self._last_heartbeat_time > self.heartbeat_timeout:
self._send_heartbeats()
self._last_heartbeat_time = current_time
time.sleep(0.01) # Small sleep to prevent busy-waiting
def _persist_state(self):
# In a real system, this would write current_term, voted_for, and log to disk
# before responding to RPCs. For simplicity, we just print.
print(f"Node {self.id}: Persisting state - currentTerm={self.current_term}, votedFor={self.voted_for}, log_len={len(self.log)}")
def _get_last_log_info(self):
if not self.log:
return 0, 0
last_entry = self.log[-1]
return len(self.log), last_entry.term
def _is_log_up_to_date(self, candidate_last_log_index, candidate_last_log_term,
my_last_log_index, my_last_log_term):
if candidate_last_log_term > my_last_log_term:
return True
if candidate_last_log_term < my_last_log_term:
return False
return candidate_last_log_index >= my_last_log_index
def _reset_election_timeout(self):
self._last_election_reset_time = time.time()
self.election_timeout = random.uniform(0.150, 0.300) # Reset to a new random value
def _become_follower(self, term):
self.state = "follower"
self.leader_id = None
if term > self.current_term:
self.current_term = term
self.voted_for = None
self._persist_state()
self._reset_election_timeout()
print(f"Node {self.id}: Became Follower in Term {self.current_term}.")
def _become_candidate(self):
self.current_term += 1
self.state = "candidate"
self.voted_for = self.id # Vote for self
self._persist_state()
self._reset_election_timeout() # Start new election timeout
self.votes_received = {self.id} # Include own vote
self.leader_id = None
last_log_index, last_log_term = self._get_last_log_info()
print(f"Node {self.id} (T{self.current_term}): Starting election with log ({last_log_index}, T{last_log_term}).")
for peer_id in self.peers:
args = RequestVoteArgs(self.current_term, self.id, last_log_index, last_log_term)
self.network.send_rpc(self.id, peer_id, "RequestVote", args)
def _become_leader(self):
self.state = "leader"
self.leader_id = self.id
self._last_heartbeat_time = time.time() # Immediately send heartbeats
print(f"Node {self.id} (T{self.current_term}): Became Leader!")
# Reinitialize volatile state on leaders
last_log_index = len(self.log)
for peer_id in self.peers:
self.next_index[peer_id] = last_log_index + 1
self.match_index[peer_id] = 0
self._send_heartbeats() # Send initial empty AppendEntries for consistency
def _send_heartbeats(self):
# print(f"Node {self.id} (T{self.current_term}): Sending heartbeats.")
for peer_id in self.peers:
prev_log_index = self.next_index[peer_id] - 1
prev_log_term = self.log[prev_log_index - 1].term if prev_log_index > 0 else 0
# For heartbeat, entries list is empty
args = AppendEntriesArgs(self.current_term, self.id, prev_log_index, prev_log_term, [], self.commit_index)
self.network.send_rpc(self.id, peer_id, "AppendEntries", args)
# --- RPC Handlers (as defined in previous sections) ---
def handle_request_vote(self, args: RequestVoteArgs) -> 'RequestVoteReply':
with self._lock:
if args.term < self.current_term:
return RequestVoteReply(self.current_term, False)
if args.term > self.current_term:
self._become_follower(args.term) # Automatically updates term, state, votedFor
vote_granted = False
if (self.voted_for is None or self.voted_for == args.candidate_id):
last_log_index, last_log_term = self._get_last_log_info()
if self._is_log_up_to_date(args.last_log_index, args.last_log_term, last_log_index, last_log_term):
self.voted_for = args.candidate_id
self._persist_state()
vote_granted = True
print(f"Node {self.id} (T{self.current_term}): Voted {vote_granted} for {args.candidate_id} in T{args.term}.")
return RequestVoteReply(self.current_term, vote_granted)
def handle_request_vote_reply(self, sender_id: int, reply: RequestVoteReply):
with self._lock:
if self.state != "candidate":
return # Ignore if not candidate anymore
if reply.term > self.current_term:
self._become_follower(reply.term)
return
if reply.term == self.current_term and reply.vote_granted:
self.votes_received.add(sender_id)
if len(self.votes_received) > (len(self.peers) + 1) // 2: # Majority
print(f"Node {self.id} (T{self.current_term}): Received majority votes. Becoming Leader.")
self._become_leader()
elif reply.term == self.current_term and not reply.vote_granted:
print(f"Node {self.id} (T{self.current_term}): {sender_id} denied vote.")
def handle_append_entries(self, args: AppendEntriesArgs) -> 'AppendEntriesReply':
with self._lock:
if args.term < self.current_term:
return AppendEntriesReply(self.current_term, False)
# If term is greater or equal, this is a valid leader, reset timeout and become follower
self._become_follower(args.term) # Handles term update and state change
# Log consistency check
# prev_log_index == 0 means first entry or empty log
if args.prev_log_index > 0 and
(len(self.log) < args.prev_log_index or
self.log[args.prev_log_index - 1].term != args.prev_log_term):
print(f"Node {self.id} (T{self.current_term}): Log inconsistency at index {args.prev_log_index}. My log len: {len(self.log)}")
return AppendEntriesReply(self.current_term, False)
# If existing entry conflicts with new one (same index, different term), delete existing and all that follow
# Find the first conflicting entry
i = 0
while i < len(args.entries):
log_idx_to_check = args.prev_log_index + 1 + i
if log_idx_to_check <= len(self.log):
if self.log[log_idx_to_check - 1].term != args.entries[i].term:
self.log = self.log[:log_idx_to_check - 1] # Truncate conflicting entries
print(f"Node {self.id} (T{self.current_term}): Truncated log from index {log_idx_to_check}.")
break # Start appending from this point
i += 1
# Append any new entries not already in log
for entry in args.entries[i:]:
self.log.append(entry)
self._persist_state()
# Update commitIndex
if args.leader_commit > self.commit_index:
self.commit_index = min(args.leader_commit, len(self.log))
self._apply_log_entries()
return AppendEntriesReply(self.current_term, True)
def handle_append_entries_reply(self, sender_id: int, reply: AppendEntriesReply):
with self._lock:
if self.state != "leader":
return # Ignore if not leader anymore
if reply.term > self.current_term:
self._become_follower(reply.term)
return
if reply.term == self.current_term:
if reply.success:
# Update nextIndex and matchIndex for this follower
# (Simplified: assume all entries were sent, need to refine for partial sends)
# For real Raft, if AppendEntriesArgs.entries was not empty,
# nextIndex would be updated to prevLogIndex + len(entries) + 1
# and matchIndex to prevLogIndex + len(entries).
# For heartbeats (empty entries), these wouldn't change.
# This simplified logic for heartbeats or full successful replication
# assumes the Follower has matched up to the current log length.
# A real implementation needs to track what was actually sent/acked.
if self.next_index[sender_id] <= len(self.log): # Ensure next_index doesn't go backwards
self.match_index[sender_id] = len(self.log) # Assume it matched up to current leader's log length
self.next_index[sender_id] = len(self.log) + 1 # Next entry to send
# Try to advance commitIndex
self._update_commit_index()
else:
# Replication failed, decrement nextIndex and retry
self.next_index[sender_id] = max(1, self.next_index[sender_id] - 1)
# print(f"Node {self.id} (T{self.current_term}): Replication failed to {sender_id}. Decrementing nextIndex to {self.next_index[sender_id]}.")
# Immediately retry sending AppendEntries for this follower
# (In a real system, this would be part of the heartbeat/replication loop)
# self._send_append_entries_to_follower(sender_id)
def _update_commit_index(self):
# Leader attempts to advance its commitIndex
# Find N such that N > commitIndex, a majority of matchIndex[i] >= N,
# and log[N].term == currentTerm.
# The third condition is crucial for safety with entries from previous terms.
N = self.commit_index + 1
while N <= len(self.log):
if self.log[N-1].term == self.current_term: # Only commit entries from current term (or implicitly via current term entry)
count = 1 # Leader itself
for peer_id in self.peers:
if self.match_index[peer_id] >= N:
count += 1
if count > (len(self.peers) + 1) // 2: # Majority
self.commit_index = N
self._apply_log_entries()
else:
break # Cannot commit N yet
N += 1
def _apply_log_entries(self):
while self.last_applied < self.commit_index:
self.last_applied += 1
entry = self.log[self.last_applied - 1]
print(f"Node {self.id} (T{self.current_term}): Applying log entry {self.last_applied} (Term: {entry.term}, Cmd: {entry.command})")
# self.state_machine.apply(entry.command)
def client_command(self, command: str):
with self._lock:
if self.state != "leader":
print(f"Node {self.id}: Not Leader. Cannot process command '{command}'. Current state: {self.state}")
return False
new_entry = LogEntry(self.current_term, command)
self.log.append(new_entry)
self._persist_state() # Persist new log entry
print(f"Node {self.id} (T{self.current_term}): Appended client command '{command}' at index {len(self.log)}.")
# Send AppendEntries to followers with the new entry
for peer_id in self.peers:
prev_log_index = self.next_index[peer_id] - 1
prev_log_term = self.log[prev_log_index - 1].term if prev_log_index > 0 else 0
# Send all entries from nextIndex[peer_id] onwards
entries_to_send = self.log[self.next_index[peer_id]-1:]
args = AppendEntriesArgs(self.current_term, self.id, prev_log_index, prev_log_term, entries_to_send, self.commit_index)
self.network.send_rpc(self.id, peer_id, "AppendEntries", args)
return True
# --- Simulation Setup ---
if __name__ == "__main__":
node_ids = [1, 2, 3, 4, 5]
network = Network()
nodes = []
for id in node_ids:
peers_for_node = [p for p in node_ids if p != id]
node = RaftNode(id, peers_for_node, network)
network.register_node(node)
nodes.append(node)
for node in nodes:
node.start()
# Give some time for initial election
print("n--- Starting Raft Simulation ---")
time.sleep(1.0)
print("n--- Initial Election Period ---")
# Simulate client commands
command_counter = 0
for _ in range(5):
time.sleep(random.uniform(0.1, 0.5))
leader_found = False
for node in nodes:
with node._lock:
if node.state == "leader":
command_counter += 1
command = f"SET_KEY_{command_counter}_VALUE_{command_counter}"
print(f"nClient sending command: {command}")
node.client_command(command)
leader_found = True
break
if not leader_found:
print("nNo leader found to process command. Retrying...")
# Process network messages to allow RPCs and replies
network.deliver_messages()
time.sleep(0.1) # Give time for replies to be processed
print("n--- Simulating Network Partition (Node 1 and 2 isolated) ---")
# Simulate a network partition: Node 1 and 2 cannot talk to 3,4,5
# This is a simplified simulation and would require more sophisticated network mocking
# For now, we'll just stop node 1 and 2's message processing temporarily
# In a real system, you'd modify the network routing logic.
# Let's assume Node 1 was leader and gets partitioned
# A new leader should be elected from [3, 4, 5]
for node in nodes:
with node._lock:
if node.id in [1, 2]:
node._running = False # Temporarily stop processing
print(f"Node {node.id} temporarily stopped processing messages.")
time.sleep(2.0) # Allow time for new election in remaining partition
print("n--- After Partition: New Leader Election Expected ---")
for _ in range(3):
time.sleep(random.uniform(0.1, 0.5))
leader_found = False
for node in nodes:
if node.id in [3,4,5]: # Only check nodes in the majority partition
with node._lock:
if node.state == "leader":
command_counter += 1
command = f"PARTITION_CMD_{command_counter}"
print(f"nClient sending command: {command} to new leader.")
node.client_command(command)
leader_found = True
break
if not leader_found:
print("nNo leader found in majority partition to process command. Retrying...")
network.deliver_messages()
time.sleep(0.1)
print("n--- Stopping Raft Simulation ---")
for node in nodes:
node.stop()
print("nFinal State of Nodes:")
for node in nodes:
print(f"Node {node.id}: Term={node.current_term}, State={node.state}, CommitIndex={node.commit_index}, LogLen={len(node.log)}")
这个代码骨架展示了 current_term 如何贯穿整个 Raft 节点的状态管理和 RPC 处理。每次 RPC 请求和回复都会对 term 进行比较,并根据比较结果决定如何更新节点状态、是否接受请求、是否转换角色,以及是否触发日志同步逻辑。_persist_state() 模拟了将持久化状态写入稳定存储,这在 Raft 中是确保数据不丢失的关键。
六、任期在 Raft 协议中的核心价值
任期在 Raft 协议中扮演着多重角色,其核心价值在于提供了一个强大的、自洽的机制来强制整个分布式系统在面对各种故障时保持一致性:
- 统一逻辑时间:任期提供了一个全局的、单调递增的逻辑时间轴。所有节点都通过任期来同步它们对系统当前状态的认知。
- 权威性与角色转换:更高的任期总是代表着更高的权威。任何节点收到来自更高任期的 RPC 都会立即更新自己的任期并转换为 Follower,从而避免了“脑裂”(split-brain)问题,即多个节点误以为自己是 Leader。
- 日志一致性基石:任期是日志条目的一部分,它在日志复制和日志匹配过程中起到“版本戳”的作用,帮助 Leader 和 Follower 发现并解决日志分歧。
- 安全属性的保障:选举安全性、Leader 完整性、日志匹配和状态机安全性都直接或间接地依赖于任期机制。任期确保了只有拥有最新、最完整日志的 Leader 才能被选举,也确保了日志在复制和提交过程中的正确性和不可篡改性。
七、结语
Raft 协议通过引入任期(Term)这一核心概念,成功地将复杂的分布式共识问题分解为更易于理解和实现的部分。任期作为 Raft 的逻辑时钟和权威标识,严格地强制了 Leader 选举、日志复制和各种安全属性的对齐。正是这种对任期的巧妙而严谨的使用,使得 Raft 在保证强一致性和容错性的同时,也极大地提升了协议的教学和工程实践效率。理解任期如何贯穿 Raft 协议的方方面面,是我们掌握分布式系统一致性精髓的关键一步。