各位专家、同仁们:
欢迎来到本次关于分布式系统核心主题的深入探讨。今天,我们将聚焦于一个既经典又极具挑战性的分布式一致性算法——Multi-Paxos,并尝试在 Go 语言的并发模型下,构建一个其“简化版”实现。我们尤其会关注在 Go 并发环境下,如何优雅且高效地处理 Multi-Paxos 中复杂的决议冲突,确保系统的一致性和活性。
1. Multi-Paxos:从基础到优化
在深入 Go 实现之前,我们首先需要对 Paxos 算法及其优化版 Multi-Paxos 有一个清晰的认识。
1.1 Paxos 算法回顾
Paxos 是 Leslie Lamport 提出的一种解决分布式系统中一致性问题的算法。它的目标是在面对节点故障、网络延迟和消息丢失的情况下,让一组节点就某个值达成共识,并且一旦达成共识,这个值就不可改变。
Paxos 算法将节点分为三类角色:
- Proposer (提议者):提出值,并尝试说服 Acceptor 接受该值。
- Acceptor (接受者):响应 Proposer 的提议,投票决定是否接受某个值。
- Learner (学习者):从 Acceptor 处学习到最终被接受的值。
Paxos 算法分为两个阶段:
阶段 1: Prepare (准备) 和 Promise (承诺)
- Proposer 发送
Prepare(n)消息给大多数 Acceptor,其中n是一个全局唯一的、递增的提议编号。 - Acceptor 收到
Prepare(n)后:- 如果
n小于它已经承诺过的任何提议编号,则忽略该Prepare请求。 - 否则,Acceptor 承诺不再接受任何小于
n的提议,并回复Promise(n, accepted_n, accepted_v)给 Proposer。accepted_n是它之前接受过的最高提议编号,accepted_v是对应的被接受的值。如果从未接受过任何值,则accepted_n和accepted_v为空。
- 如果
阶段 2: Accept (接受) 和 Accepted (已接受)
- Proposer 收到大多数 Acceptor 的
Promise响应后:- 如果所有
Promise响应都没有包含accepted_v,Proposer 可以选择它自己想要提议的值v。 - 如果至少有一个
Promise响应包含了accepted_v,Proposer 必须选择所有Promise响应中accepted_n最高的那个accepted_v作为它的提议值v。 - Proposer 发送
Accept(n, v)消息给大多数 Acceptor。
- 如果所有
- Acceptor 收到
Accept(n, v)后:- 如果
n大于或等于它已经承诺过的提议编号(在阶段 1 承诺的),则接受(n, v),并回复Accepted(n)给 Proposer。 - 否则,拒绝该
Accept请求。
- 如果
学习阶段 (Learning)
当 Proposer 收到大多数 Acceptor 的 Accepted 响应时,它就知道值 v 已经被接受了。然后它可以通知所有的 Learner 这个被接受的值。Learner 也可以通过询问多数 Acceptor 来学习到被接受的值。
1.2 Multi-Paxos 的引入与优化
单次 Paxos 算法虽然能够保证一致性,但其每次提议都需要经过两阶段消息交换,效率较低。在实际应用中,我们通常需要对一系列操作(例如一个分布式日志)进行排序和复制,这就需要连续地执行多次 Paxos 实例。Multi-Paxos 正是为了优化这一场景而生。
Multi-Paxos 的核心思想是:选举一个稳定的 Leader,并在其领导任期内,通过跳过 Phase 1 (Prepare) 来优化后续的 Paxos 实例。
其主要特点和优化点如下:
- Leader 选举:系统中会选举出一个 Leader。这个 Leader 负责协调所有的 Paxos 实例。Leader 的选举本身可以使用一个 Paxos 实例,或者其他如 Raft 的选举机制。Leader 通常拥有一个“租约(Lease)”或“任期号(Epoch Number)”,在其租约有效期间,它被认为是合法的 Leader。
- 日志序列:Multi-Paxos 将共识问题转化为对一个日志序列的复制。日志中的每一个槽位(slot)对应一个独立的 Paxos 实例。
- Phase 1 优化:一旦 Leader 成功地完成了某个日志槽位的 Phase 1,并且其租约有效,它就可以在后续的日志槽位直接进入 Phase 2 (Accept)。这意味着在 Leader 稳定且租约有效的情况下,每个新的提议只需要一轮消息(Accept 请求和 Accepted 响应)。
- Proposer 与 Acceptor/Learner 合一:在 Multi-Paxos 中,通常 Leader 节点扮演 Proposer 和 Learner 的角色,而其他 Follower 节点则扮演 Acceptor 和 Learner 的角色。
Multi-Paxos 的优势在于,在没有 Leader 切换的稳定状态下,它能够以非常高的效率(单轮 RPC)复制日志。这也是许多分布式存储系统(如 Google Chubby, Apache Zookeeper, etcd)选择类似 Paxos 或 Raft 算法作为其核心一致性协议的原因。
1.3 简化版 Multi-Paxos 的范围
我们的“简化版”实现将聚焦于 Multi-Paxos 的核心机制,即 Leader 如何管理日志、如何提议新值以及如何通过 Go 的并发模型处理冲突。我们将暂时简化或抽象掉以下方面,但在实际生产系统中,它们是必不可少的:
- 持久化存储:假设节点状态(日志、提议编号等)在内存中,不考虑崩溃恢复后的数据持久性加载。
- 网络分区后的自动恢复:我们将主要通过超时和 Leader 切换来模拟,不会实现复杂的网络拓扑感知。
- 成员变更 (Membership Change):集群中节点的动态增删。
- 快照 (Snapshotting):为了避免日志无限增长。
- 完整的 Leader 选举协议:我们将假设有一个外部机制(或一个简化的 Paxos 实例)已经选出了一个 Leader,并且该 Leader 有一个唯一的“任期号”或“租约ID”。
我们的重点是:
- 核心数据结构:如何在 Go 中表示 Paxos 消息和节点状态。
- Go 并发模型:如何使用 Goroutines 和 Channels 来模拟分布式通信和并发处理。
- Leader 逻辑:Leader 如何推进日志,处理客户端请求。
- Acceptor 逻辑:Acceptor 如何响应 Proposer 的请求,维护其状态。
- 冲突处理:当多个 Leader 出现、网络不稳定或提议冲突时,Go 如何协调这些复杂情况。
2. Go 并发模型下的核心组件
我们将使用 Go 语言的 goroutine 和 channel 来构建我们的分布式系统骨架。每个节点将运行在一个或多个 goroutine 中,节点间的通信通过模拟 RPC 调用来实现。
2.1 消息结构定义
首先定义我们在 Paxos 协议中使用的消息类型。为了简化,我们假设所有节点都有一个唯一的 ID。
package main
import (
"fmt"
"sync"
"time"
)
// 定义节点ID类型
type NodeID int
// ProposalNum 包含任期号(epoch)和在该任期内的递增计数
type ProposalNum struct {
Epoch int // Leader的任期号,每次Leader切换都会递增
Counter int // 在当前任期内,Proposer递增的计数
ProposerID NodeID // 哪个Proposer发出的,用于打破平局
}
// Compare 比较两个ProposalNum
func (pn ProposalNum) Compare(other ProposalNum) int {
if pn.Epoch != other.Epoch {
return pn.Epoch - other.Epoch
}
if pn.Counter != other.Counter {
return pn.Counter - other.Counter
}
return int(pn.ProposerID - other.ProposerID) // 提议者ID用于打破平局
}
// ValueType 提议的值的类型,可以是任意数据
type ValueType string
// PaxosLogEntry 代表日志中的一个槽位
type PaxosLogEntry struct {
Index int // 日志槽位索引
ProposalNum ProposalNum // 接受该值的提议编号
Value ValueType // 接受的值
Committed bool // 是否已提交(被多数Learner知晓)
}
// PrepareRequest 阶段1请求
type PrepareRequest struct {
FromNode NodeID // 请求来源
SlotIndex int // 针对哪个日志槽位
ProposalNum ProposalNum // Proposer的提议编号
}
// PromiseResponse 阶段1响应
type PromiseResponse struct {
FromNode NodeID // 响应来源
SlotIndex int // 针对哪个日志槽位
Err error // 如果有错误
PromisedProposalNum ProposalNum // Acceptor承诺不再接受小于此编号的提议
AcceptedProposalNum ProposalNum // Acceptor之前接受过的最高提议编号
AcceptedValue ValueType // 对应AcceptedProposalNum的值
}
// AcceptRequest 阶段2请求
type AcceptRequest struct {
FromNode NodeID // 请求来源
SlotIndex int // 针对哪个日志槽位
ProposalNum ProposalNum // Proposer的提议编号
Value ValueType // Proposer提议的值
}
// AcceptResponse 阶段2响应
type AcceptResponse struct {
FromNode NodeID // 响应来源
SlotIndex int // 针对哪个日志槽位
Err error // 如果有错误
AcceptedProposalNum ProposalNum // Acceptor实际接受的提议编号 (可能高于请求的,表示冲突)
}
// ClientProposal 客户端请求,携带一个值
type ClientProposal struct {
Value ValueType
ResponseC chan ClientResponse // 用于返回结果给客户端
}
// ClientResponse 客户端响应
type ClientResponse struct {
Err error
}
// HeartbeatRequest Leader发送的心跳
type HeartbeatRequest struct {
FromNode NodeID // Leader ID
LeaderEpoch int // Leader的任期号
CommitIndex int // Leader已提交的最高日志索引
LogEntries []PaxosLogEntry // 用于同步落后Follower的日志
}
// HeartbeatResponse Follower对心跳的响应
type HeartbeatResponse struct {
FromNode NodeID // Follower ID
CurrentEpoch int // Follower当前看到的最高任期号
LastLogIndex int // Follower的最新日志索引
Err error // 错误信息
}
// LeaderChange 通知Leader状态变化的内部消息
type LeaderChange struct {
NewLeaderID NodeID
NewLeaderEpoch int
}
ProposalNum 的设计:为了处理 Multi-Paxos 中的 Leader 切换和冲突,ProposalNum 不仅仅是一个简单的整数。它包含 Epoch(任期号,每次 Leader 切换都会递增)、Counter(在当前任期内递增的提议计数)和 ProposerID(用于打破平局)。这种设计使得不同 Leader 的提议天然具有优先级,高 Epoch 的提议总是优先于低 Epoch 的提议。
2.2 节点状态定义
每个分布式节点需要维护其自身的 Paxos 状态。
// Node 结构体定义了一个分布式节点
type Node struct {
ID NodeID
IsLeader bool
Peers []NodeID // 集群中的所有其他节点ID
PeerRPCs map[NodeID]chan interface{} // 模拟节点间RPC通信的通道
mu sync.RWMutex // 保护节点状态的读写锁
// Paxos 状态
currentEpoch int // 节点当前知道的最高任期号
promisedProposal ProposalNum // Acceptor 承诺过的最高提议编号
log map[int]*PaxosLogEntry // 分布式日志,key是槽位索引
commitIndex int // 已提交的最高日志槽位索引
lastApplied int // 已应用到状态机的最高日志槽位索引
// Leader 状态 (仅当 IsLeader 为 true 时有效)
nextProposalCounter int // Leader在其当前任期内,下一个Proposer计数
leaderLease *time.Timer // Leader租约计时器
leaderLeaseDuration time.Duration // 租约持续时间
nextIndex map[NodeID]int // 对于每个Follower,Leader下一个要发送给它的日志索引
matchIndex map[NodeID]int // 对于每个Follower,Leader已知其复制的最高日志索引
pendingProposals []ClientProposal // 待处理的客户端请求
// 通信通道
prepareC chan PrepareRequest
acceptC chan AcceptRequest
clientProposeC chan ClientProposal
heartbeatC chan HeartbeatRequest
leaderChangeC chan LeaderChange // 内部通知Leader变化
quitC chan struct{} // 退出信号
}
// NewNode 创建一个新的分布式节点
func NewNode(id NodeID, peers []NodeID, peerRPCs map[NodeID]chan interface{}) *Node {
n := &Node{
ID: id,
Peers: peers,
PeerRPCs: peerRPCs,
log: make(map[int]*PaxosLogEntry),
commitIndex: -1, // 初始为-1,表示没有提交任何日志
lastApplied: -1,
leaderLeaseDuration: 500 * time.Millisecond, // 示例租约时长
nextIndex: make(map[NodeID]int),
matchIndex: make(map[NodeID]int),
prepareC: make(chan PrepareRequest),
acceptC: make(chan AcceptRequest),
clientProposeC: make(chan ClientProposal),
heartbeatC: make(chan HeartbeatRequest),
leaderChangeC: make(chan LeaderChange),
quitC: make(chan struct{}),
}
// 初始化nextIndex和matchIndex
for _, peerID := range peers {
n.nextIndex[peerID] = 0 // 假设从日志索引0开始
n.matchIndex[peerID] = -1
}
return n
}
核心状态解释:
currentEpoch:节点当前知道的最高任期号。这是处理 Leader 冲突的关键。promisedProposal:Acceptor 角色使用的,记录它承诺过的最高提议编号。log:存储已接受的日志条目,使用map[int]*PaxosLogEntry可以方便地处理日志中的“空洞”(holes)。commitIndex:已被多数节点确认复制的最高日志索引。nextProposalCounter:Leader 在其当前currentEpoch内使用的递增计数。leaderLease:模拟 Leader 租约,一旦租约过期,Leader 可能需要重新进行 Phase 1 或者重新选举。nextIndex,matchIndex:Leader 维护的每个 Follower 的日志复制进度,类似于 Raft。
2.3 模拟 RPC 通信
我们通过 Go channel 来模拟节点间的 RPC 通信。每个节点有一个 PeerRPCs map,其中 key 是对端节点 ID,value 是一个 chan interface{},用于接收来自该对端节点的消息。
// simulateRPC 发送消息到目标节点的RPC通道
func (n *Node) simulateRPC(targetID NodeID, msg interface{}) {
if targetID == n.ID {
// 消息发给自己,直接处理
n.handleIncomingMessage(msg)
return
}
if peerChan, ok := n.PeerRPCs[targetID]; ok {
select {
case peerChan <- msg:
// 消息发送成功
case <-time.After(10 * time.Millisecond): // 模拟网络延迟或拥堵
fmt.Printf("Node %d: RPC to %d timed out for message type %Tn", n.ID, targetID, msg)
}
} else {
fmt.Printf("Node %d: Unknown peer %dn", n.ID, targetID)
}
}
// handleIncomingMessage 处理接收到的消息
func (n *Node) handleIncomingMessage(msg interface{}) {
switch req := msg.(type) {
case PrepareRequest:
n.prepareC <- req
case PromiseResponse:
// PromiseResponse通常由Leader在发送Prepare后异步接收
// 因此需要一个机制将响应路由回发起Prepare的goroutine
// 在我们的简化模型中,Leader会直接在发送后等待chan响应
// 这里暂不处理PromiseResponse的直接路由,而是通过RPC调用返回
case AcceptRequest:
n.acceptC <- req
case AcceptResponse:
// AcceptResponse同上
case HeartbeatRequest:
n.heartbeatC <- req
case HeartbeatResponse:
// HeartbeatResponse同上
default:
fmt.Printf("Node %d: Unhandled message type %Tn", n.ID, msg)
}
}
为了使 RPC 模拟更真实,simulateRPC 包含了超时机制。PeerRPCs 的 chan interface{} 允许我们将任何消息类型发送给目标节点。每个节点的主循环会从其对应的 PeerRPCs 通道中接收消息并分发到特定的处理通道(prepareC, acceptC 等)。
3. 节点核心逻辑与 Go 并发处理
每个节点都将在其独立的 goroutine 中运行,通过 select 语句并发处理各种事件:客户端请求、来自其他节点的 RPC 请求、内部定时器等。
3.1 节点主循环 (Run 方法)
// Run 启动节点的主循环
func (n *Node) Run() {
go n.rpcListener() // 启动RPC监听器
ticker := time.NewTicker(n.leaderLeaseDuration / 2) // 心跳间隔短于租约
defer ticker.Stop()
for {
select {
case <-n.quitC:
fmt.Printf("Node %d: Shutting down.n", n.ID)
return
case req := <-n.prepareC:
// 异步处理 Prepare 请求
go n.handlePrepareRequest(req)
case req := <-n.acceptC:
// 异步处理 Accept 请求
go n.handleAcceptRequest(req)
case req := <-n.heartbeatC:
// 异步处理 Heartbeat 请求
go n.handleHeartbeatRequest(req)
case clientReq := <-n.clientProposeC:
n.mu.RLock()
isLeader := n.IsLeader
n.mu.RUnlock()
if !isLeader {
// 如果不是Leader,拒绝客户端请求或转发给Leader
clientReq.ResponseC <- ClientResponse{Err: fmt.Errorf("Node %d is not leader", n.ID)}
} else {
n.mu.Lock()
n.pendingProposals = append(n.pendingProposals, clientReq)
n.mu.Unlock()
// 立即尝试处理待处理的提案
go n.processPendingProposals()
}
case <-ticker.C:
n.mu.RLock()
isLeader := n.IsLeader
currentEpoch := n.currentEpoch
n.mu.RUnlock()
if isLeader {
// Leader发送心跳
go n.sendHeartbeats(currentEpoch)
// 检查租约
n.checkLeaderLease()
} else {
// Follower检测Leader活性,简化为等待LeaderChange通知
// 实际中这里会检测超时并触发选举
}
case lc := <-n.leaderChangeC:
n.mu.Lock()
if lc.NewLeaderEpoch > n.currentEpoch {
fmt.Printf("Node %d: Leader change detected. New Leader: %d, Epoch: %dn", n.ID, lc.NewLeaderID, lc.NewLeaderEpoch)
n.IsLeader = (lc.NewLeaderID == n.ID)
n.currentEpoch = lc.NewLeaderEpoch
n.promisedProposal = ProposalNum{Epoch: lc.NewLeaderEpoch, Counter: 0, ProposerID: n.ID} // 提升承诺的提议编号
if n.IsLeader {
n.nextProposalCounter = 0 // Leader重置计数器
n.resetLeaderState()
n.startLeaderLease()
fmt.Printf("Node %d: I am the new leader for epoch %d!n", n.ID, n.currentEpoch)
go n.processPendingProposals() // 作为新Leader处理可能有的待处理请求
go n.sendHeartbeats(n.currentEpoch) // 新Leader立即发送心跳
} else {
n.leaderLease = nil // 非Leader清除租约
n.pendingProposals = nil // 清空待处理请求
}
}
n.mu.Unlock()
}
}
}
// rpcListener 负责从PeerRPCs通道接收消息并分发
func (n *Node) rpcListener() {
for {
select {
case <-n.quitC:
return
case msg := <-n.PeerRPCs[n.ID]: // 每个节点通过自己的RPC通道接收消息
n.handleIncomingMessage(msg)
}
}
}
Node.Run 关键点:
select多路复用:这是 Go 并发编程的核心。节点可以同时监听多个通道,处理不同类型的事件。- 异步处理:对于 RPC 请求(
prepareC,acceptC,heartbeatC),我们通常会在一个新的 goroutine 中处理,避免阻塞主循环。 - Leader 状态管理:Leader 会周期性发送心跳,并检查租约。Follower 则通过接收心跳来感知 Leader 活性。
leaderChangeC:这是一个简化的 Leader 选举机制。当外部(或其他节点)检测到 Leader 变更时,会通过此通道通知节点,节点根据通知更新自己的 Leader 状态和任期号。高Epoch的 Leader 总是优先。sync.RWMutex:保护节点共享状态的并发访问。读锁 (RLock) 允许多个 goroutine 同时读取,写锁 (Lock) 确保排他性写入。
3.2 Leader 角色的实现
Leader 是 Multi-Paxos 的核心。它负责接收客户端请求,将它们转化为 Paxos 提议,并协调 Acceptor 达成共识。
3.2.1 处理客户端提案 (processPendingProposals)
// processPendingProposals 处理待处理的客户端请求
func (n *Node) processPendingProposals() {
n.mu.Lock()
if !n.IsLeader {
n.mu.Unlock()
return
}
// 在处理完当前所有 pending proposals 之前,不接受新的 client proposals
// 这是一个简化的处理,实际中会有一个更复杂的请求队列和并发控制
proposalsToProcess := make([]ClientProposal, len(n.pendingProposals))
copy(proposalsToProcess, n.pendingProposals)
n.pendingProposals = nil // 清空待处理队列
n.mu.Unlock()
for _, clientReq := range proposalsToProcess {
if !n.IsLeader { // Leader可能在处理过程中下台
clientReq.ResponseC <- ClientResponse{Err: fmt.Errorf("Node %d lost leadership", n.ID)}
return
}
// 找到下一个可用的日志槽位
n.mu.RLock()
nextSlot := n.commitIndex + 1 // 尝试在已提交日志的下一个槽位进行提议
// 也可以从 n.log 中找到最大的 key + 1
for {
if _, exists := n.log[nextSlot]; exists {
nextSlot++
} else {
break
}
}
currentEpoch := n.currentEpoch
n.mu.RUnlock()
// Multi-Paxos 优化: Leader可以直接进入Phase 2 (Accept)
// 除非它不确定自己是否仍是Leader,或者需要填充空洞
// 这里简化为直接进入Accept,如果失败则可能需要重新Phase 1或下台
proposalNum := ProposalNum{Epoch: currentEpoch, Counter: n.nextProposalCounter, ProposerID: n.ID}
// 尝试进行 Accept 阶段
accepted, err := n.runAcceptPhase(nextSlot, proposalNum, clientReq.Value)
if err != nil || !accepted {
fmt.Printf("Node %d (Leader): Failed to get majority for slot %d, value %s. Error: %vn", n.ID, nextSlot, clientReq.Value, err)
clientReq.ResponseC <- ClientResponse{Err: fmt.Errorf("Leader failed to commit value: %v", err)}
// 如果Accept失败,Leader可能需要重新发起Prepare或下台
// 触发LeaderChange让系统重新选举或Leader自我降级
n.mu.Lock()
if n.IsLeader { // 确保仍是Leader才降级
fmt.Printf("Node %d (Leader): Stepping down due to failed proposal.n", n.ID)
n.IsLeader = false
n.currentEpoch++ // 提升Epoch,促使新Leader选举
for _, peerID := range n.Peers {
if peerID != n.ID {
n.simulateRPC(peerID, LeaderChange{NewLeaderID: peerID, NewLeaderEpoch: n.currentEpoch})
}
}
// 通知自己Leader变更
n.leaderChangeC <- LeaderChange{NewLeaderID: n.ID, NewLeaderEpoch: n.currentEpoch} // 实际是触发自己的降级
}
n.mu.Unlock()
continue
}
// 提议成功,更新Leader状态
n.mu.Lock()
n.nextProposalCounter++
n.log[nextSlot] = &PaxosLogEntry{Index: nextSlot, ProposalNum: proposalNum, Value: clientReq.Value, Committed: true}
if nextSlot > n.commitIndex {
n.commitIndex = nextSlot
}
fmt.Printf("Node %d (Leader): Value '%s' committed at slot %d (Epoch: %d, Counter: %d)n", n.ID, clientReq.Value, nextSlot, proposalNum.Epoch, proposalNum.Counter)
n.mu.Unlock()
clientReq.ResponseC <- ClientResponse{Err: nil}
}
}
// runAcceptPhase 执行Paxos的Accept阶段
func (n *Node) runAcceptPhase(slotIndex int, proposalNum ProposalNum, value ValueType) (bool, error) {
acceptRequests := make(chan AcceptResponse, len(n.Peers))
var wg sync.WaitGroup
// 发送 Accept 请求给所有 Acceptor
for _, peerID := range n.Peers {
wg.Add(1)
go func(id NodeID) {
defer wg.Done()
// 模拟RPC调用
responseC := make(chan AcceptResponse, 1)
req := AcceptRequest{FromNode: n.ID, SlotIndex: slotIndex, ProposalNum: proposalNum, Value: value}
// 在实际RPC中,会等待一个响应
// 这里我们直接调用目标节点的RPC处理函数,并假定它会把响应发回
// 简化:直接构造响应并发送到通道
resp := n.callAcceptRPC(id, req) // 这是一个同步阻塞的RPC模拟
responseC <- resp
acceptRequests <- resp // 将响应发送到收集通道
}(peerID)
}
wg.Wait() // 等待所有RPC调用完成
// 收集响应,判断是否获得多数
majorityCount := len(n.Peers)/2 + 1
acceptedCount := 0
highestConflictProposalNum := ProposalNum{} // 记录遇到的最高冲突提议编号
for i := 0; i < len(n.Peers); i++ {
resp := <-acceptRequests
if resp.Err == nil {
// 检查Acceptor是否接受了当前的proposalNum
if resp.AcceptedProposalNum.Compare(proposalNum) == 0 {
acceptedCount++
} else {
// Acceptor接受了更高编号的提议,说明Leader可能不是最新的
if resp.AcceptedProposalNum.Compare(highestConflictProposalNum) > 0 {
highestConflictProposalNum = resp.AcceptedProposalNum
}
}
} else {
fmt.Printf("Node %d (Leader): Accept RPC to %d for slot %d failed: %vn", n.ID, resp.FromNode, slotIndex, resp.Err)
}
}
if acceptedCount >= majorityCount {
return true, nil
}
// 如果没有获得多数接受,并且遇到了更高编号的提议,Leader应该自我降级
if highestConflictProposalNum.Epoch > proposalNum.Epoch {
n.mu.Lock()
if n.IsLeader {
fmt.Printf("Node %d (Leader): Detected higher epoch %d from Acceptor. Stepping down.n", n.ID, highestConflictProposalNum.Epoch)
n.IsLeader = false
n.currentEpoch = highestConflictProposalNum.Epoch // 更新自己的epoch
n.promisedProposal = highestConflictProposalNum // 提升承诺
// 通知其他节点Leader变更
for _, peerID := range n.Peers {
if peerID != n.ID {
n.simulateRPC(peerID, LeaderChange{NewLeaderID: highestConflictProposalNum.ProposerID, NewLeaderEpoch: highestConflictProposalNum.Epoch})
}
}
// 触发自己的降级和可能的LeaderChange处理
n.leaderChangeC <- LeaderChange{NewLeaderID: n.ID, NewLeaderEpoch: n.currentEpoch} // 这是一个自我降级的信号
}
n.mu.Unlock()
return false, fmt.Errorf("Leader detected higher proposal number %v, stepping down", highestConflictProposalNum)
}
return false, fmt.Errorf("Failed to get majority acceptance for slot %d", slotIndex)
}
// callAcceptRPC 模拟对某个节点的 Accept RPC 调用
func (n *Node) callAcceptRPC(targetID NodeID, req AcceptRequest) AcceptResponse {
// 实际RPC会通过网络发送请求并等待响应
// 简化:直接调用目标节点的处理函数
// 为了确保消息能被处理,需要将请求发送到目标节点的RPC监听通道
// 并需要一个机制来获取响应
responseChan := make(chan AcceptResponse, 1)
// 模拟RPC请求和响应的传递
go func() {
// 模拟RPC请求
n.simulateRPC(targetID, req)
// 模拟从目标节点接收响应。这在真实RPC中是自动的
// 在我们的模拟中,需要一个方法让目标节点知道响应应该发到哪里
// 这里我们简化:目标节点直接将响应“发送”到这个临时的responseChan
// 这是一个不完美的模拟,真实RPC需要一个请求ID来匹配响应
// 更准确的模拟需要:
// 1. targetID的rpcListener收到req
// 2. targetID的handleAcceptRequest处理req
// 3. handleAcceptRequest将response发送回发起者的某个通道 (通过req中的replyTo字段)
// 为了简化,我们假设这个函数会阻塞直到收到响应
// 实际上,这在并发系统中会更复杂。这里仅为演示逻辑
// 暂时通过一个同步回调或共享通道来模拟
// 让我们重构一下,让handleAcceptRequest直接返回给一个临时的通道
// 但这会破坏handleAcceptRequest的异步性
// 妥协:为了保持handleAcceptRequest的异步性,且避免复杂的回调
// 我们假设这里有一个机制,能在 targetID 节点处理完 req 后,
// 将结果通过某个路由机制(例如带有req.FromNode的通道)发送回本节点
// 暂时用一个简单的模拟方式:直接调用处理函数,并“捕获”其结果
// 这种模拟在并发场景下很脆弱,仅为概念演示
// 更鲁棒的方案是为每个请求生成一个唯一ID,并在响应中带回
// 或者为每个请求创建一个临时的response channel,并在请求中传递
}()
// 更好的模拟:目标节点处理请求后,通过其simulateRPC将响应发回
// 但simulateRPC发送的是 interface{},我们需要一个方式来区分响应
// 鉴于此,我们的simulateRPC模型在处理 RPC 响应时显得不足
// 让我们改变一下 callAcceptRPC 的工作方式:
// 它不会直接调用 simulateRPC,而是直接调用目标节点的处理函数,并期望一个同步响应。
// 这牺牲了一点模拟的真实性,但简化了响应路由。
// 在真实的分布式系统中,RPC库会处理这些。
if targetID == n.ID {
// 自己处理
resp := n.doHandleAcceptRequest(req)
return resp
}
// 模拟远程调用:创建一个临时的请求/响应通道
// 这是一个简化的RPC模式,实际更复杂
rpcReq := struct {
Request AcceptRequest
Response chan AcceptResponse
}{
Request: req,
Response: make(chan AcceptResponse, 1),
}
// 将带响应通道的请求发送到目标节点的RPC通道
n.simulateRPC(targetID, rpcReq) // simulateRPC现在需要能处理这种结构体
select {
case resp := <-rpcReq.Response:
return resp
case <-time.After(50 * time.Millisecond): // RPC超时
return AcceptResponse{FromNode: n.ID, SlotIndex: req.SlotIndex, Err: fmt.Errorf("RPC to %d timed out for AcceptRequest", targetID)}
}
}
processPendingProposals 解释:
- Leader 身份检查:每次处理前检查
!n.IsLeader,如果已经不是 Leader,则停止处理并返回错误。 - 槽位查找:
nextSlot寻找下一个可用的日志槽位,通常是commitIndex + 1,但也要处理可能存在的空洞。 - Multi-Paxos 优化:Leader 直接进入
runAcceptPhase(Phase 2),跳过 Phase 1。 - 冲突处理:如果在
runAcceptPhase中发现多数 Acceptor 接受了更高ProposalNum的提议,Leader 会自我降级,更新currentEpoch,并通知其他节点 Leader 变更。 - 日志更新:成功提交后,更新
commitIndex和log。
3.2.2 Leader 心跳 (sendHeartbeats 和 checkLeaderLease)
// sendHeartbeats Leader周期性发送心跳给Follower
func (n *Node) sendHeartbeats(currentEpoch int) {
n.mu.RLock()
if !n.IsLeader || n.currentEpoch != currentEpoch { // 如果Leader已变或Epoch不匹配,则停止发送
n.mu.RUnlock()
return
}
commitIndex := n.commitIndex
logEntriesToSync := make(map[NodeID][]PaxosLogEntry) // 为每个Follower准备要同步的日志
for _, peerID := range n.Peers {
if peerID == n.ID {
continue
}
// 简化:为每个Follower发送完整的日志(不高效,实际中会根据nextIndex发送增量)
// 在这里,假设HeartbeatRequest的LogEntries是用于快速同步部分落后日志
// 实际应根据 n.nextIndex[peerID] 来发送从该索引开始的日志
var entries []PaxosLogEntry
for i := n.nextIndex[peerID]; i <= commitIndex; i++ {
if entry, ok := n.log[i]; ok {
entries = append(entries, *entry)
}
}
logEntriesToSync[peerID] = entries
}
n.mu.RUnlock()
for _, peerID := range n.Peers {
if peerID == n.ID {
continue
}
go func(id NodeID) {
req := HeartbeatRequest{
FromNode: n.ID,
LeaderEpoch: currentEpoch,
CommitIndex: commitIndex,
LogEntries: logEntriesToSync[id],
}
// 模拟RPC,并等待响应
// 真实RPC会有一个响应机制,这里简化
responseC := make(chan HeartbeatResponse, 1)
// 再次为了简化,我们让目标节点直接处理并返回
resp := n.callHeartbeatRPC(id, req)
responseC <- resp
select {
case r := <-responseC:
n.mu.Lock()
if r.Err != nil {
fmt.Printf("Node %d (Leader): Heartbeat to %d failed: %vn", n.ID, id, r.Err)
} else {
// 更新Follower的matchIndex和nextIndex
if r.LastLogIndex > n.matchIndex[id] {
n.matchIndex[id] = r.LastLogIndex
n.nextIndex[id] = r.LastLogIndex + 1
}
// 如果Follower的Epoch更高,Leader需要降级
if r.CurrentEpoch > n.currentEpoch {
fmt.Printf("Node %d (Leader): Follower %d has higher epoch %d. Stepping down.n", n.ID, id, r.CurrentEpoch)
n.IsLeader = false
n.currentEpoch = r.CurrentEpoch
// 通知其他节点Leader变更
for _, pID := range n.Peers {
if pID != n.ID {
n.simulateRPC(pID, LeaderChange{NewLeaderID: id, NewLeaderEpoch: n.currentEpoch})
}
}
n.leaderChangeC <- LeaderChange{NewLeaderID: n.ID, NewLeaderEpoch: n.currentEpoch} // 自我降级
}
}
n.mu.Unlock()
case <-time.After(n.leaderLeaseDuration / 4): // 心跳响应超时
fmt.Printf("Node %d (Leader): Heartbeat to %d timed out.n", n.ID, id)
}
}(peerID)
}
}
// callHeartbeatRPC 模拟对某个节点的 Heartbeat RPC 调用
func (n *Node) callHeartbeatRPC(targetID NodeID, req HeartbeatRequest) HeartbeatResponse {
if targetID == n.ID {
return n.doHandleHeartbeatRequest(req)
}
rpcReq := struct {
Request HeartbeatRequest
Response chan HeartbeatResponse
}{
Request: req,
Response: make(chan HeartbeatResponse, 1),
}
n.simulateRPC(targetID, rpcReq) // simulateRPC需要能处理这种结构体
select {
case resp := <-rpcReq.Response:
return resp
case <-time.After(50 * time.Millisecond):
return HeartbeatResponse{FromNode: n.ID, Err: fmt.Errorf("RPC to %d timed out for HeartbeatRequest", targetID)}
}
}
// checkLeaderLease 检查Leader租约是否过期
func (n *Node) checkLeaderLease() {
n.mu.Lock()
defer n.mu.Unlock()
if !n.IsLeader || n.leaderLease == nil {
return
}
select {
case <-n.leaderLease.C:
// 租约过期,Leader应该降级
fmt.Printf("Node %d (Leader): Lease expired. Stepping down.n", n.ID)
n.IsLeader = false
n.currentEpoch++ // 提升Epoch,促使新Leader选举
// 通知其他节点Leader变更
for _, peerID := range n.Peers {
if peerID != n.ID {
n.simulateRPC(peerID, LeaderChange{NewLeaderID: peerID, NewLeaderEpoch: n.currentEpoch})
}
}
n.leaderChangeC <- LeaderChange{NewLeaderID: n.ID, NewLeaderEpoch: n.currentEpoch} // 自我降级
n.leaderLease = nil
default:
// 租约未过期,重置计时器
n.leaderLease.Reset(n.leaderLeaseDuration)
}
}
// startLeaderLease 启动Leader租约计时器
func (n *Node) startLeaderLease() {
if n.leaderLease != nil {
n.leaderLease.Stop()
}
n.leaderLease = time.NewTimer(n.leaderLeaseDuration)
}
// resetLeaderState 重置Leader相关的状态
func (n *Node) resetLeaderState() {
for _, peerID := range n.Peers {
if peerID == n.ID {
continue
}
// Leader刚上任,假设所有Follower都落后
n.nextIndex[peerID] = n.commitIndex + 1 // 或者根据实际情况初始化
n.matchIndex[peerID] = -1
}
n.pendingProposals = nil
}
心跳机制和租约:
- Leader 定期发送
HeartbeatRequest给所有 Follower,包含其当前LeaderEpoch和commitIndex。 - Follower 收到心跳后,会更新其
commitIndex并同步日志。如果 Follower 发现自己的currentEpoch比 Leader 的LeaderEpoch高,它会拒绝心跳并告知 Leader,促使 Leader 降级。 - Leader 还会维护一个
leaderLease。如果在租约时间内没有成功向多数 Follower 发送心跳并获得响应,或者租约过期,Leader 就会自我降级,并尝试触发一次新的 Leader 选举(通过提升currentEpoch)。
3.3 Acceptor (Follower) 角色的实现
Follower 节点扮演 Acceptor 和 Learner 的角色。它们接收 Leader 的 Prepare 和 Accept 请求,并响应。
// handlePrepareRequest 处理 Prepare 请求
func (n *Node) handlePrepareRequest(req PrepareRequest) {
resp := n.doHandlePrepareRequest(req)
// 将响应发送回发起者
n.simulateRPC(req.FromNode, resp) // simulateRPC现在需要能处理PromiseResponse
}
// doHandlePrepareRequest 实际处理 Prepare 请求并返回响应
func (n *Node) doHandlePrepareRequest(req PrepareRequest) PromiseResponse {
n.mu.Lock()
defer n.mu.Unlock()
// 更新自己的最高任期号
if req.ProposalNum.Epoch > n.currentEpoch {
n.currentEpoch = req.ProposalNum.Epoch
n.IsLeader = false // 如果发现更高Epoch,自己肯定不是Leader
}
// 提议编号必须大于或等于已承诺的编号
if req.ProposalNum.Compare(n.promisedProposal) < 0 {
return PromiseResponse{
FromNode: n.ID,
SlotIndex: req.SlotIndex,
Err: fmt.Errorf("received Prepare with lower proposal num %v than promised %v", req.ProposalNum, n.promisedProposal),
PromisedProposalNum: n.promisedProposal, // 返回已承诺的最高编号
}
}
// 承诺接受此提议编号
n.promisedProposal = req.ProposalNum
// 返回之前接受过的最高提议和值(如果有)
var acceptedProposalNum ProposalNum
var acceptedValue ValueType
if entry, ok := n.log[req.SlotIndex]; ok {
acceptedProposalNum = entry.ProposalNum
acceptedValue = entry.Value
}
fmt.Printf("Node %d (Acceptor): Promised for slot %d with proposal num %vn", n.ID, req.SlotIndex, req.ProposalNum)
return PromiseResponse{
FromNode: n.ID,
SlotIndex: req.SlotIndex,
Err: nil,
PromisedProposalNum: n.promisedProposal,
AcceptedProposalNum: acceptedProposalNum,
AcceptedValue: acceptedValue,
}
}
// handleAcceptRequest 处理 Accept 请求
func (n *Node) handleAcceptRequest(req AcceptRequest) {
resp := n.doHandleAcceptRequest(req)
n.simulateRPC(req.FromNode, resp) // simulateRPC现在需要能处理AcceptResponse
}
// doHandleAcceptRequest 实际处理 Accept 请求并返回响应
func (n *Node) doHandleAcceptRequest(req AcceptRequest) AcceptResponse {
n.mu.Lock()
defer n.mu.Unlock()
// 更新自己的最高任期号
if req.ProposalNum.Epoch > n.currentEpoch {
n.currentEpoch = req.ProposalNum.Epoch
n.IsLeader = false // 如果发现更高Epoch,自己肯定不是Leader
}
// 提议编号必须大于或等于已承诺的编号
if req.ProposalNum.Compare(n.promisedProposal) < 0 {
return AcceptResponse{
FromNode: n.ID,
SlotIndex: req.SlotIndex,
Err: fmt.Errorf("received Accept with lower proposal num %v than promised %v", req.ProposalNum, n.promisedProposal),
AcceptedProposalNum: n.promisedProposal, // 返回已承诺的最高编号
}
}
// 接受该提议
n.promisedProposal = req.ProposalNum // 更新承诺
n.log[req.SlotIndex] = &PaxosLogEntry{
Index: req.SlotIndex,
ProposalNum: req.ProposalNum,
Value: req.Value,
Committed: false, // 此时只是Accepted,未Committed
}
// 更新commitIndex(如果Leader的commitIndex更高,会在心跳中同步)
// 这里Acceptor只知道接受了,还不知道Leader是否已成功提交
if req.SlotIndex > n.commitIndex {
// 只有当Leader在心跳中告知了committedIndex,Follower才能更新自己的committedIndex
// 这里的commitIndex只代表Acceptor本地的最高已知接受日志索引
}
fmt.Printf("Node %d (Acceptor): Accepted value '%s' for slot %d with proposal num %vn", n.ID, req.Value, req.SlotIndex, req.ProposalNum)
return AcceptResponse{
FromNode: n.ID,
SlotIndex: req.SlotIndex,
Err: nil,
AcceptedProposalNum: req.ProposalNum, // 返回自己接受的提议编号
}
}
// handleHeartbeatRequest 处理心跳请求
func (n *Node) handleHeartbeatRequest(req HeartbeatRequest) {
resp := n.doHandleHeartbeatRequest(req)
n.simulateRPC(req.FromNode, resp)
}
// doHandleHeartbeatRequest 实际处理心跳请求
func (n *Node) doHandleHeartbeatRequest(req HeartbeatRequest) HeartbeatResponse {
n.mu.Lock()
defer n.mu.Unlock()
// 如果Leader的Epoch比我低,拒绝
if req.LeaderEpoch < n.currentEpoch {
return HeartbeatResponse{
FromNode: n.ID,
Err: fmt.Errorf("Leader %d has lower epoch %d than current %d", req.FromNode, req.LeaderEpoch, n.currentEpoch),
CurrentEpoch: n.currentEpoch,
LastLogIndex: n.commitIndex,
}
}
// 如果Leader的Epoch比我高,更新自己的Epoch,并确认其领导地位
if req.LeaderEpoch > n.currentEpoch {
n.currentEpoch = req.LeaderEpoch
n.IsLeader = false // 我肯定不是Leader
// 通知自己Leader变更
n.leaderChangeC <- LeaderChange{NewLeaderID: req.FromNode, NewLeaderEpoch: req.LeaderEpoch}
}
// Leader的Epoch等于我的Epoch,确认其领导地位
n.IsLeader = false // 如果我之前是Leader,现在发现真Leader,则降级
// 同步日志
for _, entry := range req.LogEntries {
if existingEntry, ok := n.log[entry.Index]; ok {
// 如果已有日志,且Leader的提议编号更高,则更新
if entry.ProposalNum.Compare(existingEntry.ProposalNum) > 0 {
n.log[entry.Index] = &entry
}
} else {
n.log[entry.Index] = &entry
}
// 标记为已提交
n.log[entry.Index].Committed = true
}
// 更新自己的commitIndex
if req.CommitIndex > n.commitIndex {
fmt.Printf("Node %d (Follower): Updated commit index to %d from Leader %dn", n.ID, req.CommitIndex, req.FromNode)
n.commitIndex = req.CommitIndex
}
return HeartbeatResponse{
FromNode: n.ID,
CurrentEpoch: n.currentEpoch,
LastLogIndex: n.commitIndex,
Err: nil,
}
}
Acceptor 关键逻辑:
promisedProposal检查:所有Prepare和Accept请求都会检查ProposalNum是否大于等于promisedProposal。这是 Paxos 的核心安全性保障。Epoch更新:任何时候,如果 Acceptor 收到一个带更高Epoch的请求,它都会更新自己的currentEpoch,并确保自己不是 Leader。这是处理 Leader 冲突和Leader 切换的关键。- 日志复制:在
handleAcceptRequest中,Acceptor 会将接受的值写入其本地日志。在handleHeartbeatRequest中,Follower 会根据 Leader 发来的LogEntries同步自己的日志,并更新commitIndex。
3.4 模拟 RPC 通道改进
为了更好地模拟 RPC 响应,我们需要在 simulateRPC 函数中做一些调整,使其能处理带有响应通道的请求。
// simulateRPC 改进版:发送消息到目标节点的RPC通道,并处理带有响应通道的请求
func (n *Node) simulateRPC(targetID NodeID, msg interface{}) {
if peerChan, ok := n.PeerRPCs[targetID]; ok {
select {
case peerChan <- msg:
// 消息发送成功
case <-time.After(10 * time.Millisecond): // 模拟网络延迟或拥堵
//fmt.Printf("Node %d: RPC to %d timed out for message type %Tn", n.ID, targetID, msg)
}
} else {
fmt.Printf("Node %d: Unknown peer %dn", n.ID, targetID)
}
}
// rpcListener 负责从PeerRPCs通道接收消息并分发,现在需要处理包装的RPC请求
func (n *Node) rpcListener() {
for {
select {
case <-n.quitC:
return
case msg := <-n.PeerRPCs[n.ID]:
// 处理带有响应通道的RPC请求
if reqWithResp, ok := msg.(struct {
Request interface{}
Response chan interface{} // 泛型响应通道
}); ok {
// 在一个新的goroutine中处理请求,并将结果发回响应通道
go func() {
var resp interface{}
switch r := reqWithResp.Request.(type) {
case PrepareRequest:
resp = n.doHandlePrepareRequest(r)
case AcceptRequest:
resp = n.doHandleAcceptRequest(r)
case HeartbeatRequest:
resp = n.doHandleHeartbeatRequest(r)
default:
fmt.Printf("Node %d: Unhandled RPC request type %Tn", n.ID, r)
resp = fmt.Errorf("Unhandled RPC type") // 错误响应
}
select {
case reqWithResp.Response <- resp:
case <-time.After(10 * time.Millisecond):
fmt.Printf("Node %d: Failed to send RPC response back to sender for type %Tn", n.ID, reqWithResp.Request)
}
}()
} else {
// 处理不带响应通道的普通消息 (例如LeaderChange)
n.handleIncomingMessage(msg)
}
}
}
}
// callAcceptRPC 改进版,使用新的RPC模拟方式
func (n *Node) callAcceptRPC(targetID NodeID, req AcceptRequest) AcceptResponse {
if targetID == n.ID {
return n.doHandleAcceptRequest(req)
}
responseChan := make(chan AcceptResponse, 1)
rpcReq := struct {
Request AcceptRequest
Response chan AcceptResponse
}{
Request: req,
Response: responseChan,
}
n.simulateRPC(targetID, rpcReq)
select {
case resp := <-responseChan:
return resp
case <-time.After(50 * time.Millisecond):
return AcceptResponse{FromNode: n.ID, SlotIndex: req.SlotIndex, Err: fmt.Errorf("RPC to %d timed out for AcceptRequest", targetID)}
}
}
// callHeartbeatRPC 改进版
func (n *Node) callHeartbeatRPC(targetID NodeID, req HeartbeatRequest) HeartbeatResponse {
if targetID == n.ID {
return n.doHandleHeartbeatRequest(req)
}
responseChan := make(chan HeartbeatResponse, 1)
rpcReq := struct {
Request HeartbeatRequest
Response chan HeartbeatResponse
}{
Request: req,
Response: responseChan,
}
n.simulateRPC(targetID, rpcReq)
select {
case resp := <-responseChan:
return resp
case <-time.After(50 * time.Millisecond):
return HeartbeatResponse{FromNode: n.ID, Err: fmt.Errorf("RPC to %d timed out for HeartbeatRequest", targetID)}
}
}
现在,simulateRPC 可以发送一个包含实际请求和响应通道的匿名结构体。rpcListener 会识别这种结构体,在新 goroutine 中处理请求,并将结果通过提供的响应通道发送回去。这使得 RPC 模拟更加接近真实。
4. 复杂决议冲突的处理
在 Multi-Paxos 中,复杂的决议冲突主要围绕 Leader 切换、日志不一致和提议编号的竞争。Go 的并发模型提供了强大的工具来处理这些。
4.1 Leader 选举与任期号 (Epoch)
冲突场景:两个节点都认为自己是 Leader,或一个旧 Leader 仍然尝试提议。
Go 处理方式:
- 任期号 (
Epoch):这是最核心的机制。ProposalNum中包含Epoch。所有 Paxos 消息(Prepare,Accept,Heartbeat)都携带Epoch。 - Leader 降级:
- 当 Leader 收到来自任何 Acceptor 的响应,发现其中包含一个比自己当前
Epoch更高的ProposalNum或currentEpoch,它就会立即自我降级 (n.IsLeader = false),并更新自己的currentEpoch。 - Leader 的租约过期也会导致自我降级。
- 当 Leader 收到来自任何 Acceptor 的响应,发现其中包含一个比自己当前
- Follower 提升
Epoch:当 Follower 收到一个带更高Epoch的请求时,它会更新自己的currentEpoch,并拒绝所有低于此Epoch的提议。 leaderChangeC通道:我们用一个内部通道来模拟 Leader 变更通知。当节点发现(或被告知)存在一个更高Epoch的 Leader 时,它会向leaderChangeC发送消息,触发主循环中的 Leader 状态更新逻辑。这确保了所有节点对当前 Leader 及其Epoch的认知保持一致。
// 冲突处理的Go代码片段 (已散布在上面的代码中)
// Leader在runAcceptPhase中检测到更高ProposalNum时降级:
// if highestConflictProposalNum.Epoch > proposalNum.Epoch { ... n.IsLeader = false; n.currentEpoch = highestConflictProposalNum.Epoch ... }
// Follower在doHandlePrepareRequest/doHandleAcceptRequest/doHandleHeartbeatRequest中更新Epoch:
// if req.ProposalNum.Epoch > n.currentEpoch { n.currentEpoch = req.ProposalNum.Epoch; n.IsLeader = false; ... }
// if req.LeaderEpoch > n.currentEpoch { n.currentEpoch = req.LeaderEpoch; n.IsLeader = false; ... n.leaderChangeC <- ... }
Go 并发优势:select 语句允许节点同时监听来自其他节点的消息和内部计时器(如租约),一旦发现冲突信号(如更高 Epoch),可以立即响应,通过通道进行状态同步,实现快速的 Leader 切换和冲突解决。
4.2 日志不一致与空洞 (Holes)
冲突场景:新 Leader 上任时,其日志可能不完整或与其他节点的日志存在差异。网络分区可能导致某些 Follower 错过 Leader 的提议,从而在日志中产生空洞。
Go 处理方式:
- *`log map[int]PaxosLogEntry
**:使用map而不是slice来存储日志,可以自然地处理日志中的空洞,因为map` 的 key 是日志索引。 - Leader 补齐日志:
- 新 Leader 上任时,它首先需要通过 Phase 1 (Prepare) 来发现所有 Acceptor 上已接受的最高
ProposalNum和Value,从而重建其自身的日志视图。这是 Leader 发现其commitIndex和填补自身空洞的关键。 - 在我们的简化版中,Leader 在
sendHeartbeats时会尝试根据nextIndex同步日志。如果 Follower 落后,Leader 会发送缺失的日志条目。 - 更完善的 Leader 补齐 (未完全实现,但思路如下):当 Leader 首次当选或发现 Follower 日志存在较大差异时,它会进行一个“日志检查”或“追赶”阶段。这可能涉及:
- Leader 向所有 Follower 发送
AppendEntries或CatchUp请求,其中包含 Leader 自己的commitIndex和日志元数据。 - Follower 收到后,比较自身日志与 Leader 日志的差异。如果 Follower 发现自己有 Leader 没有的日志条目,或者两者在某个索引处的值不同,Follower 会告知 Leader。
- Leader 可能会为每个有空洞的槽位重新发起一个 Phase 1/Phase 2 的 Paxos 实例,以确保这些槽位最终达成共识。如果某个槽位已经有值被多数 Acceptor 接受,Leader 必须选择那个值;否则,Leader 可以提议一个
NoOp值来填充空洞,以保证日志的连续性。
- Leader 向所有 Follower 发送
- 新 Leader 上任时,它首先需要通过 Phase 1 (Prepare) 来发现所有 Acceptor 上已接受的最高
// 日志同步片段 (在sendHeartbeats中)
// for i := n.nextIndex[peerID]; i <= commitIndex; i++ {
// if entry, ok := n.log[i]; ok {
// entries = append(entries, *entry)
// }
// }
// req := HeartbeatRequest{... LogEntries: logEntriesToSync[id], ...}
// Follower在handleHeartbeatRequest中处理日志同步:
// for _, entry := range req.LogEntries { ... n.log[entry.Index] = &entry ... }
Go 并发优势:Leader 可以为每个 Follower 启动独立的 goroutine 来进行日志同步(sendHeartbeats 内部就是这样做的),避免一个慢速 Follower 阻塞整个日志复制过程。通过 nextIndex 和 matchIndex 这样的状态,Leader 可以精确地知道每个 Follower 的同步进度。
4.3 并发提议竞争
冲突场景:在 Leader 切换期间,可能有两个 Leader 同时尝试在同一个日志槽位提议不同的值。
Go 处理方式:
- Paxos 协议保障:这是 Paxos 算法本身提供的核心保障。即使有多个 Proposer 同时活跃,只要它们使用递增的
ProposalNum,最终只有一个值能被多数 Acceptor 接受。 ProposalNum的Epoch和Counter:ProposalNum的Epoch确保了新 Leader 的提议总是优先于旧 Leader。在同一个Epoch内,Counter确保了 Leader 内部提议的顺序性。ProposerID进一步打破平局。- Acceptor 的
promisedProposal:Acceptor 只会接受ProposalNum大于或等于其promisedProposal的提议。这确保了“活锁”的避免(即 Proposer 不断提高ProposalNum但从未成功)。 - Leader 发现冲突并降级:如前所述,如果 Leader 的
Accept请求被多数 Acceptor 拒绝,并且 Acceptor 返回了更高的AcceptedProposalNum,Leader 会发现冲突并降级。
// Acceptor 在 doHandleAcceptRequest 中的核心逻辑
// if req.ProposalNum.Compare(n.promisedProposal) < 0 {
// // 拒绝,因为它承诺了更高或同等优先级的提议
// return AcceptResponse{... AcceptedProposalNum: n.promisedProposal ...}
// }
// // 接受,并更新承诺
// n.promisedProposal = req.ProposalNum
// n.log[req.SlotIndex] = ...
Go 并发优势:sync.Mutex 保护了 Acceptor 状态的并发访问。多个并发的 AcceptRequest 都会通过锁串行化地访问 promisedProposal 和 log,确保了 Paxos 规则的正确执行。
4.4 客户端请求的幂等性与重试
冲突场景:客户端发送请求,但 Leader 崩溃或网络分区导致客户端未收到响应。客户端可能重试,导致重复操作。
Go 处理方式:
- 客户端重试:客户端在未收到响应或收到错误时,会重试请求,可能发给不同的节点。
- 幂等性:Multi-Paxos 本身通过日志槽位提供了某种程度的幂等性。如果一个值已经被写入某个日志槽位,重复的相同请求(如果它们能被映射到同一个逻辑操作)不应改变最终状态。在实际应用中,客户端通常会为每个操作生成一个唯一的请求 ID,Leader 会维护一个已处理请求的映射,以确保即使重复提交,也只执行一次。
- Leader 转发:非 Leader 节点收到客户端请求时,应将其转发给当前的 Leader,或者告知客户端 Leader 的 ID。
// 客户端在Node.Run中的处理
// if !isLeader {
// clientReq.ResponseC <- ClientResponse{Err: fmt.Errorf("Node %d is not leader", n.ID)}
// } else {
// n.mu.Lock()
// n.pendingProposals = append(n.pendingProposals, clientReq)
// n.mu.Unlock()
// go n.processPendingProposals()
// }
Go 并发优势:Go 的 chan 机制使得客户端可以很容易地通过 ClientProposal 中的 ResponseC 接收异步响应。客户端可以启动一个 goroutine 来发送请求并等待响应,同时设置超时。
5. 展望与总结
我们已经详细探讨了一个简化版 Multi-Paxos 在 Go 并发模型下的实现,并着重分析了如何处理 Leader 冲突、日志不一致和提议竞争等复杂决议冲突。通过 Epoch、ProposalNum、Leader 租约、select 语句以及 sync.Mutex 等 Go 特性,我们构建了一个能够维持一致性和活性的分布式共识系统骨架。
本实现简化了持久化、全面的 Leader 选举、成员变更和快照等生产级特性。一个完整的 Multi-Paxos 或 Raft 实现需要更复杂的错误处理、存储管理和网络通信层。然而,这个简化版为我们理解 Multi-Paxos 的核心机制及其在 Go 并发环境下的工作原理,提供了一个坚实的基础。
通过 Go 的并发原语,我们可以清晰地模拟分布式系统的行为,直观地理解 Paxos 算法的精妙之处,并设计出健壮的分布式一致性解决方案。