什么是 ‘Multi-Paxos’ 的简化版实现?探讨在 Go 并发模型下如何处理复杂的决议冲突

各位专家、同仁们:

欢迎来到本次关于分布式系统核心主题的深入探讨。今天,我们将聚焦于一个既经典又极具挑战性的分布式一致性算法——Multi-Paxos,并尝试在 Go 语言的并发模型下,构建一个其“简化版”实现。我们尤其会关注在 Go 并发环境下,如何优雅且高效地处理 Multi-Paxos 中复杂的决议冲突,确保系统的一致性和活性。

1. Multi-Paxos:从基础到优化

在深入 Go 实现之前,我们首先需要对 Paxos 算法及其优化版 Multi-Paxos 有一个清晰的认识。

1.1 Paxos 算法回顾

Paxos 是 Leslie Lamport 提出的一种解决分布式系统中一致性问题的算法。它的目标是在面对节点故障、网络延迟和消息丢失的情况下,让一组节点就某个值达成共识,并且一旦达成共识,这个值就不可改变。

Paxos 算法将节点分为三类角色:

  • Proposer (提议者):提出值,并尝试说服 Acceptor 接受该值。
  • Acceptor (接受者):响应 Proposer 的提议,投票决定是否接受某个值。
  • Learner (学习者):从 Acceptor 处学习到最终被接受的值。

Paxos 算法分为两个阶段:

阶段 1: Prepare (准备) 和 Promise (承诺)

  1. Proposer 发送 Prepare(n) 消息给大多数 Acceptor,其中 n 是一个全局唯一的、递增的提议编号。
  2. Acceptor 收到 Prepare(n) 后:
    • 如果 n 小于它已经承诺过的任何提议编号,则忽略该 Prepare 请求。
    • 否则,Acceptor 承诺不再接受任何小于 n 的提议,并回复 Promise(n, accepted_n, accepted_v) 给 Proposer。accepted_n 是它之前接受过的最高提议编号,accepted_v 是对应的被接受的值。如果从未接受过任何值,则 accepted_naccepted_v 为空。

阶段 2: Accept (接受) 和 Accepted (已接受)

  1. Proposer 收到大多数 Acceptor 的 Promise 响应后:
    • 如果所有 Promise 响应都没有包含 accepted_v,Proposer 可以选择它自己想要提议的值 v
    • 如果至少有一个 Promise 响应包含了 accepted_v,Proposer 必须选择所有 Promise 响应中 accepted_n 最高的那个 accepted_v 作为它的提议值 v
    • Proposer 发送 Accept(n, v) 消息给大多数 Acceptor。
  2. Acceptor 收到 Accept(n, v) 后:
    • 如果 n 大于或等于它已经承诺过的提议编号(在阶段 1 承诺的),则接受 (n, v),并回复 Accepted(n) 给 Proposer。
    • 否则,拒绝该 Accept 请求。

学习阶段 (Learning)

当 Proposer 收到大多数 Acceptor 的 Accepted 响应时,它就知道值 v 已经被接受了。然后它可以通知所有的 Learner 这个被接受的值。Learner 也可以通过询问多数 Acceptor 来学习到被接受的值。

1.2 Multi-Paxos 的引入与优化

单次 Paxos 算法虽然能够保证一致性,但其每次提议都需要经过两阶段消息交换,效率较低。在实际应用中,我们通常需要对一系列操作(例如一个分布式日志)进行排序和复制,这就需要连续地执行多次 Paxos 实例。Multi-Paxos 正是为了优化这一场景而生。

Multi-Paxos 的核心思想是:选举一个稳定的 Leader,并在其领导任期内,通过跳过 Phase 1 (Prepare) 来优化后续的 Paxos 实例

其主要特点和优化点如下:

  1. Leader 选举:系统中会选举出一个 Leader。这个 Leader 负责协调所有的 Paxos 实例。Leader 的选举本身可以使用一个 Paxos 实例,或者其他如 Raft 的选举机制。Leader 通常拥有一个“租约(Lease)”或“任期号(Epoch Number)”,在其租约有效期间,它被认为是合法的 Leader。
  2. 日志序列:Multi-Paxos 将共识问题转化为对一个日志序列的复制。日志中的每一个槽位(slot)对应一个独立的 Paxos 实例。
  3. Phase 1 优化:一旦 Leader 成功地完成了某个日志槽位的 Phase 1,并且其租约有效,它就可以在后续的日志槽位直接进入 Phase 2 (Accept)。这意味着在 Leader 稳定且租约有效的情况下,每个新的提议只需要一轮消息(Accept 请求和 Accepted 响应)。
  4. Proposer 与 Acceptor/Learner 合一:在 Multi-Paxos 中,通常 Leader 节点扮演 Proposer 和 Learner 的角色,而其他 Follower 节点则扮演 Acceptor 和 Learner 的角色。

Multi-Paxos 的优势在于,在没有 Leader 切换的稳定状态下,它能够以非常高的效率(单轮 RPC)复制日志。这也是许多分布式存储系统(如 Google Chubby, Apache Zookeeper, etcd)选择类似 Paxos 或 Raft 算法作为其核心一致性协议的原因。

1.3 简化版 Multi-Paxos 的范围

我们的“简化版”实现将聚焦于 Multi-Paxos 的核心机制,即 Leader 如何管理日志、如何提议新值以及如何通过 Go 的并发模型处理冲突。我们将暂时简化或抽象掉以下方面,但在实际生产系统中,它们是必不可少的:

  • 持久化存储:假设节点状态(日志、提议编号等)在内存中,不考虑崩溃恢复后的数据持久性加载。
  • 网络分区后的自动恢复:我们将主要通过超时和 Leader 切换来模拟,不会实现复杂的网络拓扑感知。
  • 成员变更 (Membership Change):集群中节点的动态增删。
  • 快照 (Snapshotting):为了避免日志无限增长。
  • 完整的 Leader 选举协议:我们将假设有一个外部机制(或一个简化的 Paxos 实例)已经选出了一个 Leader,并且该 Leader 有一个唯一的“任期号”或“租约ID”。

我们的重点是:

  1. 核心数据结构:如何在 Go 中表示 Paxos 消息和节点状态。
  2. Go 并发模型:如何使用 Goroutines 和 Channels 来模拟分布式通信和并发处理。
  3. Leader 逻辑:Leader 如何推进日志,处理客户端请求。
  4. Acceptor 逻辑:Acceptor 如何响应 Proposer 的请求,维护其状态。
  5. 冲突处理:当多个 Leader 出现、网络不稳定或提议冲突时,Go 如何协调这些复杂情况。

2. Go 并发模型下的核心组件

我们将使用 Go 语言的 goroutinechannel 来构建我们的分布式系统骨架。每个节点将运行在一个或多个 goroutine 中,节点间的通信通过模拟 RPC 调用来实现。

2.1 消息结构定义

首先定义我们在 Paxos 协议中使用的消息类型。为了简化,我们假设所有节点都有一个唯一的 ID

package main

import (
    "fmt"
    "sync"
    "time"
)

// 定义节点ID类型
type NodeID int

// ProposalNum 包含任期号(epoch)和在该任期内的递增计数
type ProposalNum struct {
    Epoch   int    // Leader的任期号,每次Leader切换都会递增
    Counter int    // 在当前任期内,Proposer递增的计数
    ProposerID NodeID // 哪个Proposer发出的,用于打破平局
}

// Compare 比较两个ProposalNum
func (pn ProposalNum) Compare(other ProposalNum) int {
    if pn.Epoch != other.Epoch {
        return pn.Epoch - other.Epoch
    }
    if pn.Counter != other.Counter {
        return pn.Counter - other.Counter
    }
    return int(pn.ProposerID - other.ProposerID) // 提议者ID用于打破平局
}

// ValueType 提议的值的类型,可以是任意数据
type ValueType string

// PaxosLogEntry 代表日志中的一个槽位
type PaxosLogEntry struct {
    Index       int         // 日志槽位索引
    ProposalNum ProposalNum // 接受该值的提议编号
    Value       ValueType   // 接受的值
    Committed   bool        // 是否已提交(被多数Learner知晓)
}

// PrepareRequest 阶段1请求
type PrepareRequest struct {
    FromNode    NodeID      // 请求来源
    SlotIndex   int         // 针对哪个日志槽位
    ProposalNum ProposalNum // Proposer的提议编号
}

// PromiseResponse 阶段1响应
type PromiseResponse struct {
    FromNode          NodeID      // 响应来源
    SlotIndex         int         // 针对哪个日志槽位
    Err               error       // 如果有错误
    PromisedProposalNum ProposalNum // Acceptor承诺不再接受小于此编号的提议
    AcceptedProposalNum ProposalNum // Acceptor之前接受过的最高提议编号
    AcceptedValue     ValueType   // 对应AcceptedProposalNum的值
}

// AcceptRequest 阶段2请求
type AcceptRequest struct {
    FromNode    NodeID      // 请求来源
    SlotIndex   int         // 针对哪个日志槽位
    ProposalNum ProposalNum // Proposer的提议编号
    Value       ValueType   // Proposer提议的值
}

// AcceptResponse 阶段2响应
type AcceptResponse struct {
    FromNode          NodeID      // 响应来源
    SlotIndex         int         // 针对哪个日志槽位
    Err               error       // 如果有错误
    AcceptedProposalNum ProposalNum // Acceptor实际接受的提议编号 (可能高于请求的,表示冲突)
}

// ClientProposal 客户端请求,携带一个值
type ClientProposal struct {
    Value     ValueType
    ResponseC chan ClientResponse // 用于返回结果给客户端
}

// ClientResponse 客户端响应
type ClientResponse struct {
    Err error
}

// HeartbeatRequest Leader发送的心跳
type HeartbeatRequest struct {
    FromNode    NodeID      // Leader ID
    LeaderEpoch int         // Leader的任期号
    CommitIndex int         // Leader已提交的最高日志索引
    LogEntries  []PaxosLogEntry // 用于同步落后Follower的日志
}

// HeartbeatResponse Follower对心跳的响应
type HeartbeatResponse struct {
    FromNode      NodeID // Follower ID
    CurrentEpoch  int    // Follower当前看到的最高任期号
    LastLogIndex  int    // Follower的最新日志索引
    Err           error  // 错误信息
}

// LeaderChange 通知Leader状态变化的内部消息
type LeaderChange struct {
    NewLeaderID NodeID
    NewLeaderEpoch int
}

ProposalNum 的设计:为了处理 Multi-Paxos 中的 Leader 切换和冲突,ProposalNum 不仅仅是一个简单的整数。它包含 Epoch(任期号,每次 Leader 切换都会递增)、Counter(在当前任期内递增的提议计数)和 ProposerID(用于打破平局)。这种设计使得不同 Leader 的提议天然具有优先级,高 Epoch 的提议总是优先于低 Epoch 的提议。

2.2 节点状态定义

每个分布式节点需要维护其自身的 Paxos 状态。

// Node 结构体定义了一个分布式节点
type Node struct {
    ID        NodeID
    IsLeader  bool
    Peers     []NodeID // 集群中的所有其他节点ID
    PeerRPCs  map[NodeID]chan interface{} // 模拟节点间RPC通信的通道

    mu sync.RWMutex // 保护节点状态的读写锁

    // Paxos 状态
    currentEpoch      int                  // 节点当前知道的最高任期号
    promisedProposal  ProposalNum          // Acceptor 承诺过的最高提议编号
    log               map[int]*PaxosLogEntry // 分布式日志,key是槽位索引
    commitIndex       int                  // 已提交的最高日志槽位索引
    lastApplied       int                  // 已应用到状态机的最高日志槽位索引

    // Leader 状态 (仅当 IsLeader 为 true 时有效)
    nextProposalCounter int                 // Leader在其当前任期内,下一个Proposer计数
    leaderLease         *time.Timer         // Leader租约计时器
    leaderLeaseDuration time.Duration       // 租约持续时间
    nextIndex           map[NodeID]int      // 对于每个Follower,Leader下一个要发送给它的日志索引
    matchIndex          map[NodeID]int      // 对于每个Follower,Leader已知其复制的最高日志索引
    pendingProposals    []ClientProposal    // 待处理的客户端请求

    // 通信通道
    prepareC      chan PrepareRequest
    acceptC       chan AcceptRequest
    clientProposeC chan ClientProposal
    heartbeatC    chan HeartbeatRequest
    leaderChangeC chan LeaderChange // 内部通知Leader变化
    quitC         chan struct{}      // 退出信号
}

// NewNode 创建一个新的分布式节点
func NewNode(id NodeID, peers []NodeID, peerRPCs map[NodeID]chan interface{}) *Node {
    n := &Node{
        ID:        id,
        Peers:     peers,
        PeerRPCs:  peerRPCs,
        log:       make(map[int]*PaxosLogEntry),
        commitIndex: -1, // 初始为-1,表示没有提交任何日志
        lastApplied: -1,
        leaderLeaseDuration: 500 * time.Millisecond, // 示例租约时长
        nextIndex:   make(map[NodeID]int),
        matchIndex:  make(map[NodeID]int),
        prepareC:      make(chan PrepareRequest),
        acceptC:       make(chan AcceptRequest),
        clientProposeC: make(chan ClientProposal),
        heartbeatC:    make(chan HeartbeatRequest),
        leaderChangeC: make(chan LeaderChange),
        quitC:         make(chan struct{}),
    }
    // 初始化nextIndex和matchIndex
    for _, peerID := range peers {
        n.nextIndex[peerID] = 0 // 假设从日志索引0开始
        n.matchIndex[peerID] = -1
    }
    return n
}

核心状态解释:

  • currentEpoch:节点当前知道的最高任期号。这是处理 Leader 冲突的关键。
  • promisedProposal:Acceptor 角色使用的,记录它承诺过的最高提议编号。
  • log:存储已接受的日志条目,使用 map[int]*PaxosLogEntry 可以方便地处理日志中的“空洞”(holes)。
  • commitIndex:已被多数节点确认复制的最高日志索引。
  • nextProposalCounter:Leader 在其当前 currentEpoch 内使用的递增计数。
  • leaderLease:模拟 Leader 租约,一旦租约过期,Leader 可能需要重新进行 Phase 1 或者重新选举。
  • nextIndex, matchIndex:Leader 维护的每个 Follower 的日志复制进度,类似于 Raft。

2.3 模拟 RPC 通信

我们通过 Go channel 来模拟节点间的 RPC 通信。每个节点有一个 PeerRPCs map,其中 key 是对端节点 ID,value 是一个 chan interface{},用于接收来自该对端节点的消息。

// simulateRPC 发送消息到目标节点的RPC通道
func (n *Node) simulateRPC(targetID NodeID, msg interface{}) {
    if targetID == n.ID {
        // 消息发给自己,直接处理
        n.handleIncomingMessage(msg)
        return
    }
    if peerChan, ok := n.PeerRPCs[targetID]; ok {
        select {
        case peerChan <- msg:
            // 消息发送成功
        case <-time.After(10 * time.Millisecond): // 模拟网络延迟或拥堵
            fmt.Printf("Node %d: RPC to %d timed out for message type %Tn", n.ID, targetID, msg)
        }
    } else {
        fmt.Printf("Node %d: Unknown peer %dn", n.ID, targetID)
    }
}

// handleIncomingMessage 处理接收到的消息
func (n *Node) handleIncomingMessage(msg interface{}) {
    switch req := msg.(type) {
    case PrepareRequest:
        n.prepareC <- req
    case PromiseResponse:
        // PromiseResponse通常由Leader在发送Prepare后异步接收
        // 因此需要一个机制将响应路由回发起Prepare的goroutine
        // 在我们的简化模型中,Leader会直接在发送后等待chan响应
        // 这里暂不处理PromiseResponse的直接路由,而是通过RPC调用返回
    case AcceptRequest:
        n.acceptC <- req
    case AcceptResponse:
        // AcceptResponse同上
    case HeartbeatRequest:
        n.heartbeatC <- req
    case HeartbeatResponse:
        // HeartbeatResponse同上
    default:
        fmt.Printf("Node %d: Unhandled message type %Tn", n.ID, msg)
    }
}

为了使 RPC 模拟更真实,simulateRPC 包含了超时机制。PeerRPCschan interface{} 允许我们将任何消息类型发送给目标节点。每个节点的主循环会从其对应的 PeerRPCs 通道中接收消息并分发到特定的处理通道(prepareC, acceptC 等)。

3. 节点核心逻辑与 Go 并发处理

每个节点都将在其独立的 goroutine 中运行,通过 select 语句并发处理各种事件:客户端请求、来自其他节点的 RPC 请求、内部定时器等。

3.1 节点主循环 (Run 方法)

// Run 启动节点的主循环
func (n *Node) Run() {
    go n.rpcListener() // 启动RPC监听器

    ticker := time.NewTicker(n.leaderLeaseDuration / 2) // 心跳间隔短于租约
    defer ticker.Stop()

    for {
        select {
        case <-n.quitC:
            fmt.Printf("Node %d: Shutting down.n", n.ID)
            return

        case req := <-n.prepareC:
            // 异步处理 Prepare 请求
            go n.handlePrepareRequest(req)

        case req := <-n.acceptC:
            // 异步处理 Accept 请求
            go n.handleAcceptRequest(req)

        case req := <-n.heartbeatC:
            // 异步处理 Heartbeat 请求
            go n.handleHeartbeatRequest(req)

        case clientReq := <-n.clientProposeC:
            n.mu.RLock()
            isLeader := n.IsLeader
            n.mu.RUnlock()

            if !isLeader {
                // 如果不是Leader,拒绝客户端请求或转发给Leader
                clientReq.ResponseC <- ClientResponse{Err: fmt.Errorf("Node %d is not leader", n.ID)}
            } else {
                n.mu.Lock()
                n.pendingProposals = append(n.pendingProposals, clientReq)
                n.mu.Unlock()
                // 立即尝试处理待处理的提案
                go n.processPendingProposals()
            }

        case <-ticker.C:
            n.mu.RLock()
            isLeader := n.IsLeader
            currentEpoch := n.currentEpoch
            n.mu.RUnlock()

            if isLeader {
                // Leader发送心跳
                go n.sendHeartbeats(currentEpoch)
                // 检查租约
                n.checkLeaderLease()
            } else {
                // Follower检测Leader活性,简化为等待LeaderChange通知
                // 实际中这里会检测超时并触发选举
            }

        case lc := <-n.leaderChangeC:
            n.mu.Lock()
            if lc.NewLeaderEpoch > n.currentEpoch {
                fmt.Printf("Node %d: Leader change detected. New Leader: %d, Epoch: %dn", n.ID, lc.NewLeaderID, lc.NewLeaderEpoch)
                n.IsLeader = (lc.NewLeaderID == n.ID)
                n.currentEpoch = lc.NewLeaderEpoch
                n.promisedProposal = ProposalNum{Epoch: lc.NewLeaderEpoch, Counter: 0, ProposerID: n.ID} // 提升承诺的提议编号
                if n.IsLeader {
                    n.nextProposalCounter = 0 // Leader重置计数器
                    n.resetLeaderState()
                    n.startLeaderLease()
                    fmt.Printf("Node %d: I am the new leader for epoch %d!n", n.ID, n.currentEpoch)
                    go n.processPendingProposals() // 作为新Leader处理可能有的待处理请求
                    go n.sendHeartbeats(n.currentEpoch) // 新Leader立即发送心跳
                } else {
                    n.leaderLease = nil // 非Leader清除租约
                    n.pendingProposals = nil // 清空待处理请求
                }
            }
            n.mu.Unlock()
        }
    }
}

// rpcListener 负责从PeerRPCs通道接收消息并分发
func (n *Node) rpcListener() {
    for {
        select {
        case <-n.quitC:
            return
        case msg := <-n.PeerRPCs[n.ID]: // 每个节点通过自己的RPC通道接收消息
            n.handleIncomingMessage(msg)
        }
    }
}

Node.Run 关键点:

  • select 多路复用:这是 Go 并发编程的核心。节点可以同时监听多个通道,处理不同类型的事件。
  • 异步处理:对于 RPC 请求(prepareC, acceptC, heartbeatC),我们通常会在一个新的 goroutine 中处理,避免阻塞主循环。
  • Leader 状态管理:Leader 会周期性发送心跳,并检查租约。Follower 则通过接收心跳来感知 Leader 活性。
  • leaderChangeC:这是一个简化的 Leader 选举机制。当外部(或其他节点)检测到 Leader 变更时,会通过此通道通知节点,节点根据通知更新自己的 Leader 状态和任期号。高 Epoch 的 Leader 总是优先。
  • sync.RWMutex:保护节点共享状态的并发访问。读锁 (RLock) 允许多个 goroutine 同时读取,写锁 (Lock) 确保排他性写入。

3.2 Leader 角色的实现

Leader 是 Multi-Paxos 的核心。它负责接收客户端请求,将它们转化为 Paxos 提议,并协调 Acceptor 达成共识。

3.2.1 处理客户端提案 (processPendingProposals)

// processPendingProposals 处理待处理的客户端请求
func (n *Node) processPendingProposals() {
    n.mu.Lock()
    if !n.IsLeader {
        n.mu.Unlock()
        return
    }
    // 在处理完当前所有 pending proposals 之前,不接受新的 client proposals
    // 这是一个简化的处理,实际中会有一个更复杂的请求队列和并发控制
    proposalsToProcess := make([]ClientProposal, len(n.pendingProposals))
    copy(proposalsToProcess, n.pendingProposals)
    n.pendingProposals = nil // 清空待处理队列
    n.mu.Unlock()

    for _, clientReq := range proposalsToProcess {
        if !n.IsLeader { // Leader可能在处理过程中下台
            clientReq.ResponseC <- ClientResponse{Err: fmt.Errorf("Node %d lost leadership", n.ID)}
            return
        }

        // 找到下一个可用的日志槽位
        n.mu.RLock()
        nextSlot := n.commitIndex + 1 // 尝试在已提交日志的下一个槽位进行提议
        // 也可以从 n.log 中找到最大的 key + 1
        for {
            if _, exists := n.log[nextSlot]; exists {
                nextSlot++
            } else {
                break
            }
        }
        currentEpoch := n.currentEpoch
        n.mu.RUnlock()

        // Multi-Paxos 优化: Leader可以直接进入Phase 2 (Accept)
        // 除非它不确定自己是否仍是Leader,或者需要填充空洞
        // 这里简化为直接进入Accept,如果失败则可能需要重新Phase 1或下台
        proposalNum := ProposalNum{Epoch: currentEpoch, Counter: n.nextProposalCounter, ProposerID: n.ID}

        // 尝试进行 Accept 阶段
        accepted, err := n.runAcceptPhase(nextSlot, proposalNum, clientReq.Value)
        if err != nil || !accepted {
            fmt.Printf("Node %d (Leader): Failed to get majority for slot %d, value %s. Error: %vn", n.ID, nextSlot, clientReq.Value, err)
            clientReq.ResponseC <- ClientResponse{Err: fmt.Errorf("Leader failed to commit value: %v", err)}
            // 如果Accept失败,Leader可能需要重新发起Prepare或下台
            // 触发LeaderChange让系统重新选举或Leader自我降级
            n.mu.Lock()
            if n.IsLeader { // 确保仍是Leader才降级
                fmt.Printf("Node %d (Leader): Stepping down due to failed proposal.n", n.ID)
                n.IsLeader = false
                n.currentEpoch++ // 提升Epoch,促使新Leader选举
                for _, peerID := range n.Peers {
                    if peerID != n.ID {
                        n.simulateRPC(peerID, LeaderChange{NewLeaderID: peerID, NewLeaderEpoch: n.currentEpoch})
                    }
                }
                // 通知自己Leader变更
                n.leaderChangeC <- LeaderChange{NewLeaderID: n.ID, NewLeaderEpoch: n.currentEpoch} // 实际是触发自己的降级
            }
            n.mu.Unlock()
            continue
        }

        // 提议成功,更新Leader状态
        n.mu.Lock()
        n.nextProposalCounter++
        n.log[nextSlot] = &PaxosLogEntry{Index: nextSlot, ProposalNum: proposalNum, Value: clientReq.Value, Committed: true}
        if nextSlot > n.commitIndex {
            n.commitIndex = nextSlot
        }
        fmt.Printf("Node %d (Leader): Value '%s' committed at slot %d (Epoch: %d, Counter: %d)n", n.ID, clientReq.Value, nextSlot, proposalNum.Epoch, proposalNum.Counter)
        n.mu.Unlock()

        clientReq.ResponseC <- ClientResponse{Err: nil}
    }
}

// runAcceptPhase 执行Paxos的Accept阶段
func (n *Node) runAcceptPhase(slotIndex int, proposalNum ProposalNum, value ValueType) (bool, error) {
    acceptRequests := make(chan AcceptResponse, len(n.Peers))
    var wg sync.WaitGroup

    // 发送 Accept 请求给所有 Acceptor
    for _, peerID := range n.Peers {
        wg.Add(1)
        go func(id NodeID) {
            defer wg.Done()
            // 模拟RPC调用
            responseC := make(chan AcceptResponse, 1)
            req := AcceptRequest{FromNode: n.ID, SlotIndex: slotIndex, ProposalNum: proposalNum, Value: value}
            // 在实际RPC中,会等待一个响应
            // 这里我们直接调用目标节点的RPC处理函数,并假定它会把响应发回
            // 简化:直接构造响应并发送到通道
            resp := n.callAcceptRPC(id, req) // 这是一个同步阻塞的RPC模拟
            responseC <- resp
            acceptRequests <- resp // 将响应发送到收集通道
        }(peerID)
    }

    wg.Wait() // 等待所有RPC调用完成

    // 收集响应,判断是否获得多数
    majorityCount := len(n.Peers)/2 + 1
    acceptedCount := 0
    highestConflictProposalNum := ProposalNum{} // 记录遇到的最高冲突提议编号

    for i := 0; i < len(n.Peers); i++ {
        resp := <-acceptRequests
        if resp.Err == nil {
            // 检查Acceptor是否接受了当前的proposalNum
            if resp.AcceptedProposalNum.Compare(proposalNum) == 0 {
                acceptedCount++
            } else {
                // Acceptor接受了更高编号的提议,说明Leader可能不是最新的
                if resp.AcceptedProposalNum.Compare(highestConflictProposalNum) > 0 {
                    highestConflictProposalNum = resp.AcceptedProposalNum
                }
            }
        } else {
            fmt.Printf("Node %d (Leader): Accept RPC to %d for slot %d failed: %vn", n.ID, resp.FromNode, slotIndex, resp.Err)
        }
    }

    if acceptedCount >= majorityCount {
        return true, nil
    }

    // 如果没有获得多数接受,并且遇到了更高编号的提议,Leader应该自我降级
    if highestConflictProposalNum.Epoch > proposalNum.Epoch {
        n.mu.Lock()
        if n.IsLeader {
            fmt.Printf("Node %d (Leader): Detected higher epoch %d from Acceptor. Stepping down.n", n.ID, highestConflictProposalNum.Epoch)
            n.IsLeader = false
            n.currentEpoch = highestConflictProposalNum.Epoch // 更新自己的epoch
            n.promisedProposal = highestConflictProposalNum    // 提升承诺
            // 通知其他节点Leader变更
            for _, peerID := range n.Peers {
                if peerID != n.ID {
                    n.simulateRPC(peerID, LeaderChange{NewLeaderID: highestConflictProposalNum.ProposerID, NewLeaderEpoch: highestConflictProposalNum.Epoch})
                }
            }
            // 触发自己的降级和可能的LeaderChange处理
            n.leaderChangeC <- LeaderChange{NewLeaderID: n.ID, NewLeaderEpoch: n.currentEpoch} // 这是一个自我降级的信号
        }
        n.mu.Unlock()
        return false, fmt.Errorf("Leader detected higher proposal number %v, stepping down", highestConflictProposalNum)
    }

    return false, fmt.Errorf("Failed to get majority acceptance for slot %d", slotIndex)
}

// callAcceptRPC 模拟对某个节点的 Accept RPC 调用
func (n *Node) callAcceptRPC(targetID NodeID, req AcceptRequest) AcceptResponse {
    // 实际RPC会通过网络发送请求并等待响应
    // 简化:直接调用目标节点的处理函数
    // 为了确保消息能被处理,需要将请求发送到目标节点的RPC监听通道
    // 并需要一个机制来获取响应
    responseChan := make(chan AcceptResponse, 1)

    // 模拟RPC请求和响应的传递
    go func() {
        // 模拟RPC请求
        n.simulateRPC(targetID, req)

        // 模拟从目标节点接收响应。这在真实RPC中是自动的
        // 在我们的模拟中,需要一个方法让目标节点知道响应应该发到哪里
        // 这里我们简化:目标节点直接将响应“发送”到这个临时的responseChan
        // 这是一个不完美的模拟,真实RPC需要一个请求ID来匹配响应
        // 更准确的模拟需要:
        // 1. targetID的rpcListener收到req
        // 2. targetID的handleAcceptRequest处理req
        // 3. handleAcceptRequest将response发送回发起者的某个通道 (通过req中的replyTo字段)
        // 为了简化,我们假设这个函数会阻塞直到收到响应
        // 实际上,这在并发系统中会更复杂。这里仅为演示逻辑
        // 暂时通过一个同步回调或共享通道来模拟
        // 让我们重构一下,让handleAcceptRequest直接返回给一个临时的通道
        // 但这会破坏handleAcceptRequest的异步性

        // 妥协:为了保持handleAcceptRequest的异步性,且避免复杂的回调
        // 我们假设这里有一个机制,能在 targetID 节点处理完 req 后,
        // 将结果通过某个路由机制(例如带有req.FromNode的通道)发送回本节点
        // 暂时用一个简单的模拟方式:直接调用处理函数,并“捕获”其结果
        // 这种模拟在并发场景下很脆弱,仅为概念演示
        // 更鲁棒的方案是为每个请求生成一个唯一ID,并在响应中带回
        // 或者为每个请求创建一个临时的response channel,并在请求中传递
    }()

    // 更好的模拟:目标节点处理请求后,通过其simulateRPC将响应发回
    // 但simulateRPC发送的是 interface{},我们需要一个方式来区分响应
    // 鉴于此,我们的simulateRPC模型在处理 RPC 响应时显得不足
    // 让我们改变一下 callAcceptRPC 的工作方式:
    // 它不会直接调用 simulateRPC,而是直接调用目标节点的处理函数,并期望一个同步响应。
    // 这牺牲了一点模拟的真实性,但简化了响应路由。
    // 在真实的分布式系统中,RPC库会处理这些。

    if targetID == n.ID {
        // 自己处理
        resp := n.doHandleAcceptRequest(req)
        return resp
    }

    // 模拟远程调用:创建一个临时的请求/响应通道
    // 这是一个简化的RPC模式,实际更复杂
    rpcReq := struct {
        Request AcceptRequest
        Response chan AcceptResponse
    }{
        Request: req,
        Response: make(chan AcceptResponse, 1),
    }
    // 将带响应通道的请求发送到目标节点的RPC通道
    n.simulateRPC(targetID, rpcReq) // simulateRPC现在需要能处理这种结构体

    select {
    case resp := <-rpcReq.Response:
        return resp
    case <-time.After(50 * time.Millisecond): // RPC超时
        return AcceptResponse{FromNode: n.ID, SlotIndex: req.SlotIndex, Err: fmt.Errorf("RPC to %d timed out for AcceptRequest", targetID)}
    }
}

processPendingProposals 解释:

  • Leader 身份检查:每次处理前检查 !n.IsLeader,如果已经不是 Leader,则停止处理并返回错误。
  • 槽位查找nextSlot 寻找下一个可用的日志槽位,通常是 commitIndex + 1,但也要处理可能存在的空洞。
  • Multi-Paxos 优化:Leader 直接进入 runAcceptPhase(Phase 2),跳过 Phase 1。
  • 冲突处理:如果在 runAcceptPhase 中发现多数 Acceptor 接受了更高 ProposalNum 的提议,Leader 会自我降级,更新 currentEpoch,并通知其他节点 Leader 变更。
  • 日志更新:成功提交后,更新 commitIndexlog

3.2.2 Leader 心跳 (sendHeartbeatscheckLeaderLease)

// sendHeartbeats Leader周期性发送心跳给Follower
func (n *Node) sendHeartbeats(currentEpoch int) {
    n.mu.RLock()
    if !n.IsLeader || n.currentEpoch != currentEpoch { // 如果Leader已变或Epoch不匹配,则停止发送
        n.mu.RUnlock()
        return
    }
    commitIndex := n.commitIndex
    logEntriesToSync := make(map[NodeID][]PaxosLogEntry) // 为每个Follower准备要同步的日志
    for _, peerID := range n.Peers {
        if peerID == n.ID {
            continue
        }
        // 简化:为每个Follower发送完整的日志(不高效,实际中会根据nextIndex发送增量)
        // 在这里,假设HeartbeatRequest的LogEntries是用于快速同步部分落后日志
        // 实际应根据 n.nextIndex[peerID] 来发送从该索引开始的日志
        var entries []PaxosLogEntry
        for i := n.nextIndex[peerID]; i <= commitIndex; i++ {
            if entry, ok := n.log[i]; ok {
                entries = append(entries, *entry)
            }
        }
        logEntriesToSync[peerID] = entries
    }
    n.mu.RUnlock()

    for _, peerID := range n.Peers {
        if peerID == n.ID {
            continue
        }
        go func(id NodeID) {
            req := HeartbeatRequest{
                FromNode:    n.ID,
                LeaderEpoch: currentEpoch,
                CommitIndex: commitIndex,
                LogEntries:  logEntriesToSync[id],
            }
            // 模拟RPC,并等待响应
            // 真实RPC会有一个响应机制,这里简化
            responseC := make(chan HeartbeatResponse, 1)
            // 再次为了简化,我们让目标节点直接处理并返回
            resp := n.callHeartbeatRPC(id, req)
            responseC <- resp

            select {
            case r := <-responseC:
                n.mu.Lock()
                if r.Err != nil {
                    fmt.Printf("Node %d (Leader): Heartbeat to %d failed: %vn", n.ID, id, r.Err)
                } else {
                    // 更新Follower的matchIndex和nextIndex
                    if r.LastLogIndex > n.matchIndex[id] {
                        n.matchIndex[id] = r.LastLogIndex
                        n.nextIndex[id] = r.LastLogIndex + 1
                    }
                    // 如果Follower的Epoch更高,Leader需要降级
                    if r.CurrentEpoch > n.currentEpoch {
                        fmt.Printf("Node %d (Leader): Follower %d has higher epoch %d. Stepping down.n", n.ID, id, r.CurrentEpoch)
                        n.IsLeader = false
                        n.currentEpoch = r.CurrentEpoch
                        // 通知其他节点Leader变更
                        for _, pID := range n.Peers {
                            if pID != n.ID {
                                n.simulateRPC(pID, LeaderChange{NewLeaderID: id, NewLeaderEpoch: n.currentEpoch})
                            }
                        }
                        n.leaderChangeC <- LeaderChange{NewLeaderID: n.ID, NewLeaderEpoch: n.currentEpoch} // 自我降级
                    }
                }
                n.mu.Unlock()
            case <-time.After(n.leaderLeaseDuration / 4): // 心跳响应超时
                fmt.Printf("Node %d (Leader): Heartbeat to %d timed out.n", n.ID, id)
            }
        }(peerID)
    }
}

// callHeartbeatRPC 模拟对某个节点的 Heartbeat RPC 调用
func (n *Node) callHeartbeatRPC(targetID NodeID, req HeartbeatRequest) HeartbeatResponse {
    if targetID == n.ID {
        return n.doHandleHeartbeatRequest(req)
    }
    rpcReq := struct {
        Request HeartbeatRequest
        Response chan HeartbeatResponse
    }{
        Request: req,
        Response: make(chan HeartbeatResponse, 1),
    }
    n.simulateRPC(targetID, rpcReq) // simulateRPC需要能处理这种结构体
    select {
    case resp := <-rpcReq.Response:
        return resp
    case <-time.After(50 * time.Millisecond):
        return HeartbeatResponse{FromNode: n.ID, Err: fmt.Errorf("RPC to %d timed out for HeartbeatRequest", targetID)}
    }
}

// checkLeaderLease 检查Leader租约是否过期
func (n *Node) checkLeaderLease() {
    n.mu.Lock()
    defer n.mu.Unlock()

    if !n.IsLeader || n.leaderLease == nil {
        return
    }

    select {
    case <-n.leaderLease.C:
        // 租约过期,Leader应该降级
        fmt.Printf("Node %d (Leader): Lease expired. Stepping down.n", n.ID)
        n.IsLeader = false
        n.currentEpoch++ // 提升Epoch,促使新Leader选举
        // 通知其他节点Leader变更
        for _, peerID := range n.Peers {
            if peerID != n.ID {
                n.simulateRPC(peerID, LeaderChange{NewLeaderID: peerID, NewLeaderEpoch: n.currentEpoch})
            }
        }
        n.leaderChangeC <- LeaderChange{NewLeaderID: n.ID, NewLeaderEpoch: n.currentEpoch} // 自我降级
        n.leaderLease = nil
    default:
        // 租约未过期,重置计时器
        n.leaderLease.Reset(n.leaderLeaseDuration)
    }
}

// startLeaderLease 启动Leader租约计时器
func (n *Node) startLeaderLease() {
    if n.leaderLease != nil {
        n.leaderLease.Stop()
    }
    n.leaderLease = time.NewTimer(n.leaderLeaseDuration)
}

// resetLeaderState 重置Leader相关的状态
func (n *Node) resetLeaderState() {
    for _, peerID := range n.Peers {
        if peerID == n.ID {
            continue
        }
        // Leader刚上任,假设所有Follower都落后
        n.nextIndex[peerID] = n.commitIndex + 1 // 或者根据实际情况初始化
        n.matchIndex[peerID] = -1
    }
    n.pendingProposals = nil
}

心跳机制和租约

  • Leader 定期发送 HeartbeatRequest 给所有 Follower,包含其当前 LeaderEpochcommitIndex
  • Follower 收到心跳后,会更新其 commitIndex 并同步日志。如果 Follower 发现自己的 currentEpoch 比 Leader 的 LeaderEpoch 高,它会拒绝心跳并告知 Leader,促使 Leader 降级。
  • Leader 还会维护一个 leaderLease。如果在租约时间内没有成功向多数 Follower 发送心跳并获得响应,或者租约过期,Leader 就会自我降级,并尝试触发一次新的 Leader 选举(通过提升 currentEpoch)。

3.3 Acceptor (Follower) 角色的实现

Follower 节点扮演 Acceptor 和 Learner 的角色。它们接收 Leader 的 PrepareAccept 请求,并响应。

// handlePrepareRequest 处理 Prepare 请求
func (n *Node) handlePrepareRequest(req PrepareRequest) {
    resp := n.doHandlePrepareRequest(req)
    // 将响应发送回发起者
    n.simulateRPC(req.FromNode, resp) // simulateRPC现在需要能处理PromiseResponse
}

// doHandlePrepareRequest 实际处理 Prepare 请求并返回响应
func (n *Node) doHandlePrepareRequest(req PrepareRequest) PromiseResponse {
    n.mu.Lock()
    defer n.mu.Unlock()

    // 更新自己的最高任期号
    if req.ProposalNum.Epoch > n.currentEpoch {
        n.currentEpoch = req.ProposalNum.Epoch
        n.IsLeader = false // 如果发现更高Epoch,自己肯定不是Leader
    }

    // 提议编号必须大于或等于已承诺的编号
    if req.ProposalNum.Compare(n.promisedProposal) < 0 {
        return PromiseResponse{
            FromNode:          n.ID,
            SlotIndex:         req.SlotIndex,
            Err:               fmt.Errorf("received Prepare with lower proposal num %v than promised %v", req.ProposalNum, n.promisedProposal),
            PromisedProposalNum: n.promisedProposal, // 返回已承诺的最高编号
        }
    }

    // 承诺接受此提议编号
    n.promisedProposal = req.ProposalNum

    // 返回之前接受过的最高提议和值(如果有)
    var acceptedProposalNum ProposalNum
    var acceptedValue ValueType
    if entry, ok := n.log[req.SlotIndex]; ok {
        acceptedProposalNum = entry.ProposalNum
        acceptedValue = entry.Value
    }

    fmt.Printf("Node %d (Acceptor): Promised for slot %d with proposal num %vn", n.ID, req.SlotIndex, req.ProposalNum)
    return PromiseResponse{
        FromNode:          n.ID,
        SlotIndex:         req.SlotIndex,
        Err:               nil,
        PromisedProposalNum: n.promisedProposal,
        AcceptedProposalNum: acceptedProposalNum,
        AcceptedValue:     acceptedValue,
    }
}

// handleAcceptRequest 处理 Accept 请求
func (n *Node) handleAcceptRequest(req AcceptRequest) {
    resp := n.doHandleAcceptRequest(req)
    n.simulateRPC(req.FromNode, resp) // simulateRPC现在需要能处理AcceptResponse
}

// doHandleAcceptRequest 实际处理 Accept 请求并返回响应
func (n *Node) doHandleAcceptRequest(req AcceptRequest) AcceptResponse {
    n.mu.Lock()
    defer n.mu.Unlock()

    // 更新自己的最高任期号
    if req.ProposalNum.Epoch > n.currentEpoch {
        n.currentEpoch = req.ProposalNum.Epoch
        n.IsLeader = false // 如果发现更高Epoch,自己肯定不是Leader
    }

    // 提议编号必须大于或等于已承诺的编号
    if req.ProposalNum.Compare(n.promisedProposal) < 0 {
        return AcceptResponse{
            FromNode:          n.ID,
            SlotIndex:         req.SlotIndex,
            Err:               fmt.Errorf("received Accept with lower proposal num %v than promised %v", req.ProposalNum, n.promisedProposal),
            AcceptedProposalNum: n.promisedProposal, // 返回已承诺的最高编号
        }
    }

    // 接受该提议
    n.promisedProposal = req.ProposalNum // 更新承诺
    n.log[req.SlotIndex] = &PaxosLogEntry{
        Index:       req.SlotIndex,
        ProposalNum: req.ProposalNum,
        Value:       req.Value,
        Committed:   false, // 此时只是Accepted,未Committed
    }
    // 更新commitIndex(如果Leader的commitIndex更高,会在心跳中同步)
    // 这里Acceptor只知道接受了,还不知道Leader是否已成功提交
    if req.SlotIndex > n.commitIndex {
        // 只有当Leader在心跳中告知了committedIndex,Follower才能更新自己的committedIndex
        // 这里的commitIndex只代表Acceptor本地的最高已知接受日志索引
    }
    fmt.Printf("Node %d (Acceptor): Accepted value '%s' for slot %d with proposal num %vn", n.ID, req.Value, req.SlotIndex, req.ProposalNum)

    return AcceptResponse{
        FromNode:          n.ID,
        SlotIndex:         req.SlotIndex,
        Err:               nil,
        AcceptedProposalNum: req.ProposalNum, // 返回自己接受的提议编号
    }
}

// handleHeartbeatRequest 处理心跳请求
func (n *Node) handleHeartbeatRequest(req HeartbeatRequest) {
    resp := n.doHandleHeartbeatRequest(req)
    n.simulateRPC(req.FromNode, resp)
}

// doHandleHeartbeatRequest 实际处理心跳请求
func (n *Node) doHandleHeartbeatRequest(req HeartbeatRequest) HeartbeatResponse {
    n.mu.Lock()
    defer n.mu.Unlock()

    // 如果Leader的Epoch比我低,拒绝
    if req.LeaderEpoch < n.currentEpoch {
        return HeartbeatResponse{
            FromNode:     n.ID,
            Err:          fmt.Errorf("Leader %d has lower epoch %d than current %d", req.FromNode, req.LeaderEpoch, n.currentEpoch),
            CurrentEpoch: n.currentEpoch,
            LastLogIndex: n.commitIndex,
        }
    }

    // 如果Leader的Epoch比我高,更新自己的Epoch,并确认其领导地位
    if req.LeaderEpoch > n.currentEpoch {
        n.currentEpoch = req.LeaderEpoch
        n.IsLeader = false // 我肯定不是Leader
        // 通知自己Leader变更
        n.leaderChangeC <- LeaderChange{NewLeaderID: req.FromNode, NewLeaderEpoch: req.LeaderEpoch}
    }

    // Leader的Epoch等于我的Epoch,确认其领导地位
    n.IsLeader = false // 如果我之前是Leader,现在发现真Leader,则降级

    // 同步日志
    for _, entry := range req.LogEntries {
        if existingEntry, ok := n.log[entry.Index]; ok {
            // 如果已有日志,且Leader的提议编号更高,则更新
            if entry.ProposalNum.Compare(existingEntry.ProposalNum) > 0 {
                n.log[entry.Index] = &entry
            }
        } else {
            n.log[entry.Index] = &entry
        }
        // 标记为已提交
        n.log[entry.Index].Committed = true
    }

    // 更新自己的commitIndex
    if req.CommitIndex > n.commitIndex {
        fmt.Printf("Node %d (Follower): Updated commit index to %d from Leader %dn", n.ID, req.CommitIndex, req.FromNode)
        n.commitIndex = req.CommitIndex
    }

    return HeartbeatResponse{
        FromNode:     n.ID,
        CurrentEpoch: n.currentEpoch,
        LastLogIndex: n.commitIndex,
        Err:          nil,
    }
}

Acceptor 关键逻辑:

  • promisedProposal 检查:所有 PrepareAccept 请求都会检查 ProposalNum 是否大于等于 promisedProposal。这是 Paxos 的核心安全性保障。
  • Epoch 更新:任何时候,如果 Acceptor 收到一个带更高 Epoch 的请求,它都会更新自己的 currentEpoch,并确保自己不是 Leader。这是处理 Leader 冲突和Leader 切换的关键。
  • 日志复制:在 handleAcceptRequest 中,Acceptor 会将接受的值写入其本地日志。在 handleHeartbeatRequest 中,Follower 会根据 Leader 发来的 LogEntries 同步自己的日志,并更新 commitIndex

3.4 模拟 RPC 通道改进

为了更好地模拟 RPC 响应,我们需要在 simulateRPC 函数中做一些调整,使其能处理带有响应通道的请求。

// simulateRPC 改进版:发送消息到目标节点的RPC通道,并处理带有响应通道的请求
func (n *Node) simulateRPC(targetID NodeID, msg interface{}) {
    if peerChan, ok := n.PeerRPCs[targetID]; ok {
        select {
        case peerChan <- msg:
            // 消息发送成功
        case <-time.After(10 * time.Millisecond): // 模拟网络延迟或拥堵
            //fmt.Printf("Node %d: RPC to %d timed out for message type %Tn", n.ID, targetID, msg)
        }
    } else {
        fmt.Printf("Node %d: Unknown peer %dn", n.ID, targetID)
    }
}

// rpcListener 负责从PeerRPCs通道接收消息并分发,现在需要处理包装的RPC请求
func (n *Node) rpcListener() {
    for {
        select {
        case <-n.quitC:
            return
        case msg := <-n.PeerRPCs[n.ID]:
            // 处理带有响应通道的RPC请求
            if reqWithResp, ok := msg.(struct {
                Request  interface{}
                Response chan interface{} // 泛型响应通道
            }); ok {
                // 在一个新的goroutine中处理请求,并将结果发回响应通道
                go func() {
                    var resp interface{}
                    switch r := reqWithResp.Request.(type) {
                    case PrepareRequest:
                        resp = n.doHandlePrepareRequest(r)
                    case AcceptRequest:
                        resp = n.doHandleAcceptRequest(r)
                    case HeartbeatRequest:
                        resp = n.doHandleHeartbeatRequest(r)
                    default:
                        fmt.Printf("Node %d: Unhandled RPC request type %Tn", n.ID, r)
                        resp = fmt.Errorf("Unhandled RPC type") // 错误响应
                    }
                    select {
                    case reqWithResp.Response <- resp:
                    case <-time.After(10 * time.Millisecond):
                        fmt.Printf("Node %d: Failed to send RPC response back to sender for type %Tn", n.ID, reqWithResp.Request)
                    }
                }()
            } else {
                // 处理不带响应通道的普通消息 (例如LeaderChange)
                n.handleIncomingMessage(msg)
            }
        }
    }
}

// callAcceptRPC 改进版,使用新的RPC模拟方式
func (n *Node) callAcceptRPC(targetID NodeID, req AcceptRequest) AcceptResponse {
    if targetID == n.ID {
        return n.doHandleAcceptRequest(req)
    }

    responseChan := make(chan AcceptResponse, 1)
    rpcReq := struct {
        Request AcceptRequest
        Response chan AcceptResponse
    }{
        Request: req,
        Response: responseChan,
    }

    n.simulateRPC(targetID, rpcReq)

    select {
    case resp := <-responseChan:
        return resp
    case <-time.After(50 * time.Millisecond):
        return AcceptResponse{FromNode: n.ID, SlotIndex: req.SlotIndex, Err: fmt.Errorf("RPC to %d timed out for AcceptRequest", targetID)}
    }
}

// callHeartbeatRPC 改进版
func (n *Node) callHeartbeatRPC(targetID NodeID, req HeartbeatRequest) HeartbeatResponse {
    if targetID == n.ID {
        return n.doHandleHeartbeatRequest(req)
    }

    responseChan := make(chan HeartbeatResponse, 1)
    rpcReq := struct {
        Request HeartbeatRequest
        Response chan HeartbeatResponse
    }{
        Request: req,
        Response: responseChan,
    }

    n.simulateRPC(targetID, rpcReq)

    select {
    case resp := <-responseChan:
        return resp
    case <-time.After(50 * time.Millisecond):
        return HeartbeatResponse{FromNode: n.ID, Err: fmt.Errorf("RPC to %d timed out for HeartbeatRequest", targetID)}
    }
}

现在,simulateRPC 可以发送一个包含实际请求和响应通道的匿名结构体。rpcListener 会识别这种结构体,在新 goroutine 中处理请求,并将结果通过提供的响应通道发送回去。这使得 RPC 模拟更加接近真实。

4. 复杂决议冲突的处理

在 Multi-Paxos 中,复杂的决议冲突主要围绕 Leader 切换、日志不一致和提议编号的竞争。Go 的并发模型提供了强大的工具来处理这些。

4.1 Leader 选举与任期号 (Epoch)

冲突场景:两个节点都认为自己是 Leader,或一个旧 Leader 仍然尝试提议。

Go 处理方式

  • 任期号 (Epoch):这是最核心的机制。ProposalNum 中包含 Epoch。所有 Paxos 消息(Prepare, Accept, Heartbeat)都携带 Epoch
  • Leader 降级
    • 当 Leader 收到来自任何 Acceptor 的响应,发现其中包含一个比自己当前 Epoch 更高的 ProposalNumcurrentEpoch,它就会立即自我降级 (n.IsLeader = false),并更新自己的 currentEpoch
    • Leader 的租约过期也会导致自我降级。
  • Follower 提升 Epoch:当 Follower 收到一个带更高 Epoch 的请求时,它会更新自己的 currentEpoch,并拒绝所有低于此 Epoch 的提议。
  • leaderChangeC 通道:我们用一个内部通道来模拟 Leader 变更通知。当节点发现(或被告知)存在一个更高 Epoch 的 Leader 时,它会向 leaderChangeC 发送消息,触发主循环中的 Leader 状态更新逻辑。这确保了所有节点对当前 Leader 及其 Epoch 的认知保持一致。
// 冲突处理的Go代码片段 (已散布在上面的代码中)
// Leader在runAcceptPhase中检测到更高ProposalNum时降级:
// if highestConflictProposalNum.Epoch > proposalNum.Epoch { ... n.IsLeader = false; n.currentEpoch = highestConflictProposalNum.Epoch ... }

// Follower在doHandlePrepareRequest/doHandleAcceptRequest/doHandleHeartbeatRequest中更新Epoch:
// if req.ProposalNum.Epoch > n.currentEpoch { n.currentEpoch = req.ProposalNum.Epoch; n.IsLeader = false; ... }
// if req.LeaderEpoch > n.currentEpoch { n.currentEpoch = req.LeaderEpoch; n.IsLeader = false; ... n.leaderChangeC <- ... }

Go 并发优势select 语句允许节点同时监听来自其他节点的消息和内部计时器(如租约),一旦发现冲突信号(如更高 Epoch),可以立即响应,通过通道进行状态同步,实现快速的 Leader 切换和冲突解决。

4.2 日志不一致与空洞 (Holes)

冲突场景:新 Leader 上任时,其日志可能不完整或与其他节点的日志存在差异。网络分区可能导致某些 Follower 错过 Leader 的提议,从而在日志中产生空洞。

Go 处理方式

  • *`log map[int]PaxosLogEntry**:使用map而不是slice来存储日志,可以自然地处理日志中的空洞,因为map` 的 key 是日志索引。
  • Leader 补齐日志
    • 新 Leader 上任时,它首先需要通过 Phase 1 (Prepare) 来发现所有 Acceptor 上已接受的最高 ProposalNumValue,从而重建其自身的日志视图。这是 Leader 发现其 commitIndex 和填补自身空洞的关键。
    • 在我们的简化版中,Leader 在 sendHeartbeats 时会尝试根据 nextIndex 同步日志。如果 Follower 落后,Leader 会发送缺失的日志条目。
    • 更完善的 Leader 补齐 (未完全实现,但思路如下):当 Leader 首次当选或发现 Follower 日志存在较大差异时,它会进行一个“日志检查”或“追赶”阶段。这可能涉及:
      1. Leader 向所有 Follower 发送 AppendEntriesCatchUp 请求,其中包含 Leader 自己的 commitIndex 和日志元数据。
      2. Follower 收到后,比较自身日志与 Leader 日志的差异。如果 Follower 发现自己有 Leader 没有的日志条目,或者两者在某个索引处的值不同,Follower 会告知 Leader。
      3. Leader 可能会为每个有空洞的槽位重新发起一个 Phase 1/Phase 2 的 Paxos 实例,以确保这些槽位最终达成共识。如果某个槽位已经有值被多数 Acceptor 接受,Leader 必须选择那个值;否则,Leader 可以提议一个 NoOp 值来填充空洞,以保证日志的连续性。
// 日志同步片段 (在sendHeartbeats中)
// for i := n.nextIndex[peerID]; i <= commitIndex; i++ {
//  if entry, ok := n.log[i]; ok {
//      entries = append(entries, *entry)
//  }
// }
// req := HeartbeatRequest{... LogEntries: logEntriesToSync[id], ...}

// Follower在handleHeartbeatRequest中处理日志同步:
// for _, entry := range req.LogEntries { ... n.log[entry.Index] = &entry ... }

Go 并发优势:Leader 可以为每个 Follower 启动独立的 goroutine 来进行日志同步(sendHeartbeats 内部就是这样做的),避免一个慢速 Follower 阻塞整个日志复制过程。通过 nextIndexmatchIndex 这样的状态,Leader 可以精确地知道每个 Follower 的同步进度。

4.3 并发提议竞争

冲突场景:在 Leader 切换期间,可能有两个 Leader 同时尝试在同一个日志槽位提议不同的值。

Go 处理方式

  • Paxos 协议保障:这是 Paxos 算法本身提供的核心保障。即使有多个 Proposer 同时活跃,只要它们使用递增的 ProposalNum,最终只有一个值能被多数 Acceptor 接受。
  • ProposalNumEpochCounterProposalNumEpoch 确保了新 Leader 的提议总是优先于旧 Leader。在同一个 Epoch 内,Counter 确保了 Leader 内部提议的顺序性。ProposerID 进一步打破平局。
  • Acceptor 的 promisedProposal:Acceptor 只会接受 ProposalNum 大于或等于其 promisedProposal 的提议。这确保了“活锁”的避免(即 Proposer 不断提高 ProposalNum 但从未成功)。
  • Leader 发现冲突并降级:如前所述,如果 Leader 的 Accept 请求被多数 Acceptor 拒绝,并且 Acceptor 返回了更高的 AcceptedProposalNum,Leader 会发现冲突并降级。
// Acceptor 在 doHandleAcceptRequest 中的核心逻辑
// if req.ProposalNum.Compare(n.promisedProposal) < 0 {
//     // 拒绝,因为它承诺了更高或同等优先级的提议
//     return AcceptResponse{... AcceptedProposalNum: n.promisedProposal ...}
// }
// // 接受,并更新承诺
// n.promisedProposal = req.ProposalNum
// n.log[req.SlotIndex] = ...

Go 并发优势sync.Mutex 保护了 Acceptor 状态的并发访问。多个并发的 AcceptRequest 都会通过锁串行化地访问 promisedProposallog,确保了 Paxos 规则的正确执行。

4.4 客户端请求的幂等性与重试

冲突场景:客户端发送请求,但 Leader 崩溃或网络分区导致客户端未收到响应。客户端可能重试,导致重复操作。

Go 处理方式

  • 客户端重试:客户端在未收到响应或收到错误时,会重试请求,可能发给不同的节点。
  • 幂等性:Multi-Paxos 本身通过日志槽位提供了某种程度的幂等性。如果一个值已经被写入某个日志槽位,重复的相同请求(如果它们能被映射到同一个逻辑操作)不应改变最终状态。在实际应用中,客户端通常会为每个操作生成一个唯一的请求 ID,Leader 会维护一个已处理请求的映射,以确保即使重复提交,也只执行一次。
  • Leader 转发:非 Leader 节点收到客户端请求时,应将其转发给当前的 Leader,或者告知客户端 Leader 的 ID。
// 客户端在Node.Run中的处理
// if !isLeader {
//  clientReq.ResponseC <- ClientResponse{Err: fmt.Errorf("Node %d is not leader", n.ID)}
// } else {
//  n.mu.Lock()
//  n.pendingProposals = append(n.pendingProposals, clientReq)
//  n.mu.Unlock()
//  go n.processPendingProposals()
// }

Go 并发优势:Go 的 chan 机制使得客户端可以很容易地通过 ClientProposal 中的 ResponseC 接收异步响应。客户端可以启动一个 goroutine 来发送请求并等待响应,同时设置超时。

5. 展望与总结

我们已经详细探讨了一个简化版 Multi-Paxos 在 Go 并发模型下的实现,并着重分析了如何处理 Leader 冲突、日志不一致和提议竞争等复杂决议冲突。通过 EpochProposalNum、Leader 租约、select 语句以及 sync.Mutex 等 Go 特性,我们构建了一个能够维持一致性和活性的分布式共识系统骨架。

本实现简化了持久化、全面的 Leader 选举、成员变更和快照等生产级特性。一个完整的 Multi-Paxos 或 Raft 实现需要更复杂的错误处理、存储管理和网络通信层。然而,这个简化版为我们理解 Multi-Paxos 的核心机制及其在 Go 并发环境下的工作原理,提供了一个坚实的基础。

通过 Go 的并发原语,我们可以清晰地模拟分布式系统的行为,直观地理解 Paxos 算法的精妙之处,并设计出健壮的分布式一致性解决方案。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注