尊敬的各位专家、同事们,大家好!
在当今瞬息万变的数字化时代,分布式系统已成为支撑现代应用不可或缺的基石。从云计算平台到大数据处理,从微服务架构到区块链技术,分布式系统的身影无处不在。然而,随着系统规模的扩大和复杂性的增加,如何确保这些系统的高可用性、数据一致性与故障容忍能力,成为了我们面临的核心挑战。今天,我将与大家深入探讨一个在分布式系统中至关重要的议题:如何利用C++和异步状态机,实现基于Paxos算法的高可靠选主逻辑,从而构建一个坚如磐石的分布式协同系统。
分布式系统的基石:为何需要选主?
我们先从分布式系统的基本概念和选主的需求说起。一个分布式系统由多台独立的计算机节点组成,它们通过网络协同工作,共同完成某个任务或提供服务。这种架构带来了诸多优势,如高可用性(单个节点故障不影响整体服务)、可伸缩性(通过增加节点扩展处理能力)和高性能(并行处理)。
然而,分布式系统也伴随着固有挑战:
- 并发性与一致性: 多个节点同时操作共享数据时,如何保证数据的一致性?
- 故障容忍: 节点可能随时崩溃、网络可能出现分区、消息可能丢失或延迟,系统如何在这种不确定性下保持正常运行?
- 协调与同步: 节点之间如何有效地协调行动,避免冲突和死锁?
在许多分布式场景中,为了简化系统设计、确保决策的唯一性和数据的一致性,我们需要一个“领导者”(Leader)角色。领导者负责协调所有从属节点(Follower)的行为,例如:
- 主备复制: 领导者负责接收所有写入请求,并将其复制到备份节点。
- 分布式锁: 领导者可以作为分布式锁服务的协调者,确保资源互斥访问。
- 任务调度: 领导者负责分配任务给工作节点。
- 配置管理: 领导者统一管理和分发系统配置。
选主机制的目标是:在任何时候,系统中只能有一个被公认的领导者,并且当当前领导者失效时,系统能够自动、快速、可靠地选举出新的领导者。如果选主机制出现问题,例如出现“脑裂”(Split-Brain,即多个节点都认为自己是领导者),将导致数据不一致、服务混乱甚至系统崩溃的灾难性后果。因此,一个健壮的选主算法是构建高可靠分布式系统的核心。
Paxos:分布式共识的基石
在众多分布式共识算法中,Paxos算法以其卓越的容错性和严谨的数学证明,被誉为分布式领域的“圣杯”。它由Leslie Lamport于1990年提出,旨在解决分布式系统在面对节点故障、网络延迟和消息丢失等不确定性时,如何就某个单一值达成一致的问题。
Paxos算法的核心思想是,通过Proposer(提议者)、Acceptor(接受者)和Learner(学习者)这三种角色的协作,即使在多数节点故障的情况下,也能保证最终只有一个值被“选定”(chosen)。在选主场景中,这个“值”就是当前集群中领导者的ID。
Paxos算法的角色
在Paxos算法中,节点可以扮演以下一个或多个角色:
| 角色 | 职责 |
|---|---|
| Proposer (提议者) | 提出一个值(在选主场景中是希望成为领导者的节点ID),并试图说服Acceptor接受这个值。Proposer必须生成一个唯一的提案编号,并在收到多数Acceptor的承诺后,才能将提案发送给Acceptor。 |
| Acceptor (接受者) | 接收Proposer的提案。Acceptor会根据提案编号和自身的内部状态(最高已承诺的提案编号和最高已接受的提案)来决定是否承诺或接受提案。一个Acceptor可以承诺接受多个提案,但只能接受一个值。 |
| Learner (学习者) | 从Acceptor那里学习被选定的值。Learner本身不参与提案过程,只负责接收并记录最终的共识结果。在选主场景中,所有节点最终都应知道谁是当前的领导者。 |
在一个实际的分布式系统中,一个物理节点通常会同时扮演Proposer、Acceptor和Learner的角色。当它尝试成为领导者时,它就是Proposer;当它响应其他节点的提案时,它就是Acceptor;当它得知最终的领导者是谁时,它就是Learner。
Paxos算法的两个阶段
Paxos算法的核心是两个阶段:准备阶段(Phase 1) 和 接受阶段(Phase 2)。
阶段1:准备阶段 (Prepare Phase)
目标:Proposer尝试获取Acceptor的“承诺”,以便后续提交自己的提案。
-
1a. Prepare 请求:
- Proposer选择一个唯一的提案编号
n(这个编号必须是单调递增的,通常由一个递增的计数器和Proposer的ID组合而成,以确保全局唯一性)。 - Proposer向所有(或多数)Acceptor发送一个
Prepare(n)请求。
- Proposer选择一个唯一的提案编号
-
1b. Promise 响应:
- Acceptor收到
Prepare(n)请求后,会检查n是否大于它之前已经承诺过的任何提案编号。 - 如果
n小于等于 Acceptor 已经承诺的最高提案编号,Acceptor 会忽略这个请求或者返回一个拒绝响应。 - 如果
n大于 Acceptor 已经承诺的最高提案编号:- Acceptor 记录下
n作为它承诺的最高提案编号,并“承诺”不再接受任何小于n的提案。 - Acceptor 回复 Proposer 一个
Promise(n, accepted_value, accepted_proposal_number)响应。如果 Acceptor 之前已经接受过某个值accepted_value及其提案编号accepted_proposal_number,它会一并告知Proposer。否则,accepted_value为空。
- Acceptor 记录下
- Acceptor收到
阶段2:接受阶段 (Accept Phase)
目标:Proposer在获得多数Acceptor的承诺后,尝试让它们接受一个特定的值。
-
2a. Accept 请求:
- Proposer在收到来自多数(
N/2 + 1)Acceptor的Promise响应后,进入此阶段。 - Proposer需要决定要提议的值
v:- 如果在所有收到的
Promise响应中,有任何一个Acceptor报告它已经接受过一个值,Proposer必须选择其中提案编号最大的那个值作为v。 - 如果所有收到的
Promise响应都表示它们从未接受过任何值,Proposer可以自由选择一个新值v(在选主场景中,这个v就是Proposer自身的ID)。
- 如果在所有收到的
- Proposer向所有(或多数)Acceptor发送一个
Accept(n, v)请求,其中n是之前Prepare请求中的提案编号,v是决定好的值。
- Proposer在收到来自多数(
-
2b. Accepted 响应:
- Acceptor收到
Accept(n, v)请求后,会检查n是否大于或等于它当前已承诺的最高提案编号。 - 如果
n小于它已承诺的最高提案编号,Acceptor 会拒绝这个请求。 - 如果
n大于或等于它已承诺的最高提案编号:- Acceptor 接受这个提案,记录下
n和v作为它已接受的提案和值。 - Acceptor 回复 Proposer 一个
Accepted(n, v)响应。同时,Acceptor 可以将这个(n, v)通知给所有Learner。
- Acceptor 接受这个提案,记录下
- Acceptor收到
Paxos的安全性与活性
- 安全性 (Safety): Paxos保证最终只有一个值被选定。一旦一个值被选定,它将永远不会改变。这是通过严格的提案编号比较和多数派机制来实现的。
- 活性 (Liveness): Paxos在非拜占庭故障(即节点不会恶意行为,只会崩溃或网络延迟)和多数节点可用的情况下,最终能够选定一个值。然而,在某些极端情况下(例如多个Proposer同时发起提案且每次都未能获得多数),可能会出现“活锁”,即系统不断地进行选举,但没有一个提案能最终被接受。实际实现中需要加入随机退避和超时机制来缓解。
在选主场景中,Paxos的强大之处在于,即使在复杂的分布式环境下,也能保证最终所有节点都能一致地识别出唯一的领导者。
C++与异步状态机实现高可靠选主逻辑
现在,我们将Paxos算法的理论应用于实践,探讨如何使用C++和异步状态机来实现一个高可靠的选主模块。C++以其高性能、精细的内存控制和丰富的并发原语,是构建底层分布式服务理想的选择。异步状态机模式则能优雅地处理网络通信的非确定性、超时机制以及复杂的协议状态转换。
异步状态机设计理念
一个分布式节点在进行选主时,其内部状态会根据接收到的消息、定时器事件以及自身发起的行为而不断变化。异步状态机(Asynchronous State Machine, ASM)是管理这种复杂行为的有效模式。
ASM的优势:
- 清晰的逻辑: 将系统的行为分解为明确定义的状态和状态转换,易于理解和维护。
- 非阻塞: 避免使用阻塞I/O,通过事件驱动的方式处理并发操作,提高系统吞吐量。
- 故障容忍: 能够更好地管理超时和重试逻辑,应对网络不可靠性。
我们将为每个参与选主的节点设计一个状态机。
核心组件与数据结构
-
NodeId: 节点的唯一标识符,通常是整数或字符串。 -
ProposalNumber: 提案编号,由(epoch_timestamp, node_id)组成,保证全局唯一性和单调递增。epoch_timestamp可以是当前时间戳,node_id用于在同一时间戳下区分不同Proposer的提案。 -
消息结构: 定义用于节点间通信的消息类型。
#include <string> #include <vector> #include <chrono> #include <map> #include <mutex> #include <thread> #include <condition_variable> #include <iostream> #include <random> #include <set> #include <optional> // 节点ID using NodeId = int; // 提案编号,由时间戳和节点ID组成,保证全局唯一且单调递增 struct ProposalNumber { long long timestamp; NodeId proposer_id; bool operator<(const ProposalNumber& other) const { if (timestamp != other.timestamp) { return timestamp < other.timestamp; } return proposer_id < other.proposer_id; } bool operator==(const ProposalNumber& other) const { return timestamp == other.timestamp && proposer_id == other.proposer_id; } bool operator<=(const ProposalNumber& other) const { return *this < other || *this == other; } bool operator>(const ProposalNumber& other) const { return !(*this <= other); } bool operator>=(const ProposalNumber& other) const { return !(*this < other); } }; // 为std::map和std::set使用ProposalNumber作为键提供hash支持 namespace std { template <> struct hash<ProposalNumber> { size_t operator()(const ProposalNumber& p) const { return hash<long long>()(p.timestamp) ^ (hash<NodeId>()(p.proposer_id) << 1); } }; } // Paxos消息类型 enum class MessageType { PREPARE, PROMISE, ACCEPT, ACCEPTED, HEARTBEAT // 用于领导者维持心跳 }; // 所有消息的基类 struct BaseMessage { MessageType type; NodeId sender_id; NodeId receiver_id; // 接收者ID,如果是广播则可能为-1或特殊值 BaseMessage(MessageType t, NodeId s, NodeId r) : type(t), sender_id(s), receiver_id(r) {} virtual ~BaseMessage() = default; }; // Prepare消息 struct PrepareMessage : public BaseMessage { ProposalNumber proposal_num; PrepareMessage(NodeId s, NodeId r, ProposalNumber pn) : BaseMessage(MessageType::PREPARE, s, r), proposal_num(pn) {} }; // Promise消息 struct PromiseMessage : public BaseMessage { ProposalNumber proposal_num; // 承诺的提案编号 std::optional<ProposalNumber> accepted_proposal_num; // 如果Acceptor之前接受过值,则为该值的提案编号 std::optional<NodeId> accepted_value; // 如果Acceptor之前接受过值,则为该值 (leader ID) PromiseMessage(NodeId s, NodeId r, ProposalNumber pn, std::optional<ProposalNumber> apn = std::nullopt, std::optional<NodeId> av = std::nullopt) : BaseMessage(MessageType::PROMISE, s, r), proposal_num(pn), accepted_proposal_num(apn), accepted_value(av) {} }; // Accept消息 struct AcceptMessage : public BaseMessage { ProposalNumber proposal_num; // 提议的提案编号 NodeId value; // 提议的值 (leader ID) AcceptMessage(NodeId s, NodeId r, ProposalNumber pn, NodeId v) : BaseMessage(MessageType::ACCEPT, s, r), proposal_num(pn), value(v) {} }; // Accepted消息 struct AcceptedMessage : public BaseMessage { ProposalNumber proposal_num; // 已接受的提案编号 NodeId value; // 已接受的值 (leader ID) AcceptedMessage(NodeId s, NodeId r, ProposalNumber pn, NodeId v) : BaseMessage(MessageType::ACCEPTED, s, r), proposal_num(pn), value(v) {} }; // Heartbeat消息 struct HeartbeatMessage : public BaseMessage { NodeId leader_id; // 当前领导者ID long long term; // 领导者任期,用于区分过期的心跳 HeartbeatMessage(NodeId s, NodeId r, NodeId lid, long long t) : BaseMessage(MessageType::HEARTBEAT, s, r), leader_id(lid), term(t) {} }; // 抽象网络接口,用于发送和接收消息 class NetworkInterface { public: virtual ~NetworkInterface() = default; virtual void send_message(NodeId receiver_id, std::unique_ptr<BaseMessage> msg) = 0; virtual void broadcast_message(std::unique_ptr<BaseMessage> msg, const std::set<NodeId>& exclude_nodes) = 0; // 模拟消息接收,实际中通常通过回调或队列实现 virtual std::unique_ptr<BaseMessage> receive_message() = 0; }; -
PaxosNode状态: 节点可能处于的几种状态。enum class PaxosNodeState { FOLLOWER, // 跟随者,等待领导者心跳或发起选举 CANDIDATE, // 候选者,正在尝试成为领导者 (Proposer 角色) LEADER // 领导者,负责发送心跳和协调 }; -
PaxosNode类: 封装一个Paxos节点的全部逻辑。class PaxosNode { private: NodeId my_id; std::set<NodeId> peer_ids; // 其他节点的ID std::unique_ptr<NetworkInterface> network_interface; PaxosNodeState current_state; NodeId current_leader_id; // 当前已知的领导者ID long long current_term; // 当前领导者任期 // Proposer 状态 ProposalNumber current_proposal_num; // 当前Proposer使用的提案编号 std::map<NodeId, PromiseMessage> received_promises; // 存储收到的Promise响应 int promise_count; // 收到承诺的数量 // Acceptor 状态 ProposalNumber highest_promised_num; // 接受者已承诺的最高提案编号 std::optional<ProposalNumber> accepted_proposal_num; // 接受者已接受的提案编号 std::optional<NodeId> accepted_value; // 接受者已接受的值 (leader ID) // Learner 状态 NodeId decided_leader_id; // 最终确认的领导者ID // 选举超时计时器 std::chrono::steady_clock::time_point last_heartbeat_time; std::chrono::milliseconds election_timeout_min; std::chrono::milliseconds election_timeout_max; std::chrono::milliseconds heartbeat_interval; std::unique_ptr<std::thread> timer_thread; std::unique_ptr<std::thread> receiver_thread; std::mutex mtx; std::condition_variable cv_timer; std::condition_variable cv_message; bool running; std::queue<std::unique_ptr<BaseMessage>> message_queue; // 异步消息队列 std::default_random_engine random_engine; void generate_new_proposal_number() { current_proposal_num = {std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch()).count(), my_id}; std::cout << "Node " << my_id << " generated new proposal number: " << current_proposal_num.timestamp << "." << current_proposal_num.proposer_id << std::endl; } void reset_election_timeout() { last_heartbeat_time = std::chrono::steady_clock::now(); // 在FOLLOWER状态下,随机化超时,避免活锁 if (current_state == PaxosNodeState::FOLLOWER) { std::uniform_int_distribution<int> dist(election_timeout_min.count(), election_timeout_max.count()); election_timeout_min = std::chrono::milliseconds(dist(random_engine)); // 更新下次超时 } std::cout << "Node " << my_id << " reset election timeout, next timeout in " << election_timeout_min.count() << "ms." << std::endl; } void timer_loop() { while (running) { std::unique_lock<std::mutex> lock(mtx); auto now = std::chrono::steady_clock::now(); std::chrono::milliseconds time_since_last_heartbeat = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_heartbeat_time); if (current_state == PaxosNodeState::LEADER) { // 领导者定期发送心跳 if (time_since_last_heartbeat >= heartbeat_interval) { lock.unlock(); // 释放锁,防止发送消息时阻塞其他操作 send_heartbeat(); lock.lock(); // 重新获取锁 last_heartbeat_time = std::chrono::steady_clock::now(); // 重置心跳计时 } } else if (current_state == PaxosNodeState::FOLLOWER || current_state == PaxosNodeState::CANDIDATE) { // 跟随者和候选者等待领导者心跳或进行选举 if (time_since_last_heartbeat >= election_timeout_min) { std::cout << "Node " << my_id << " election timeout! Current state: " << (current_state == PaxosNodeState::FOLLOWER ? "FOLLOWER" : "CANDIDATE") << ". Starting new election." << std::endl; start_election(); reset_election_timeout(); // 每次超时后重置计时器 } } cv_timer.wait_for(lock, std::chrono::milliseconds(100)); // 每100ms检查一次 } } void receiver_loop() { while (running) { std::unique_ptr<BaseMessage> msg = network_interface->receive_message(); if (msg) { std::unique_lock<std::mutex> lock(mtx); message_queue.push(std::move(msg)); cv_message.notify_one(); // 通知主线程有新消息 } // 避免忙等,适当休眠 std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } void process_message_queue() { while (running) { std::unique_lock<std::mutex> lock(mtx); cv_message.wait(lock, [this]{ return !message_queue.empty() || !running; }); if (!running) break; while (!message_queue.empty()) { std::unique_ptr<BaseMessage> msg = std::move(message_queue.front()); message_queue.pop(); lock.unlock(); // 处理消息时释放锁,允许其他线程访问共享资源 handle_message(std::move(msg)); lock.lock(); // 处理完消息后重新获取锁 } } } public: PaxosNode(NodeId id, const std::set<NodeId>& peers, std::unique_ptr<NetworkInterface> net_if) : my_id(id), peer_ids(peers), network_interface(std::move(net_if)), current_state(PaxosNodeState::FOLLOWER), current_leader_id(0), // 初始未知 current_term(0), promise_count(0), highest_promised_num({0, 0}), running(false), election_timeout_min(150), // 默认最小选举超时 election_timeout_max(300), // 默认最大选举超时 heartbeat_interval(50), random_engine(std::chrono::system_clock::now().time_since_epoch().count() + id) { reset_election_timeout(); } ~PaxosNode() { stop(); } void start() { if (running) return; running = true; timer_thread = std::make_unique<std::thread>(&PaxosNode::timer_loop, this); receiver_thread = std::make_unique<std::thread>(&PaxosNode::receiver_loop, this); // 主线程也可以处理消息,这里用一个单独的线程模拟主循环处理消息 // 或者可以合并到timer_loop或main函数中 // std::thread msg_processor_thread(&PaxosNode::process_message_queue, this); // msg_processor_thread.detach(); // 模拟后台处理 std::cout << "Node " << my_id << " started." << std::endl; } void stop() { if (!running) return; running = false; cv_timer.notify_all(); cv_message.notify_all(); if (timer_thread && timer_thread->joinable()) { timer_thread->join(); } if (receiver_thread && receiver_thread->joinable()) { receiver_thread->join(); } std::cout << "Node " << my_id << " stopped." << std::endl; } // 处理接收到的消息 void handle_message(std::unique_ptr<BaseMessage> msg) { std::unique_lock<std::mutex> lock(mtx); // 保护内部状态 std::cout << "Node " << my_id << " received message from " << msg->sender_id << " of type " << static_cast<int>(msg->type) << std::endl; switch (msg->type) { case MessageType::PREPARE: handle_prepare(static_cast<PrepareMessage&>(*msg)); break; case MessageType::PROMISE: handle_promise(static_cast<PromiseMessage&>(*msg)); break; case MessageType::ACCEPT: handle_accept(static_cast<AcceptMessage&>(*msg)); break; case MessageType::ACCEPTED: handle_accepted(static_cast<AcceptedMessage&>(*msg)); break; case MessageType::HEARTBEAT: handle_heartbeat(static_cast<HeartbeatMessage&>(*msg)); break; default: std::cerr << "Node " << my_id << " received unknown message type." << std::endl; } } private: // Paxos Proposer/Acceptor/Learner 逻辑实现 void start_election() { std::lock_guard<std::mutex> lock(mtx); current_state = PaxosNodeState::CANDIDATE; current_term++; // 每次选举开始,增加任期 generate_new_proposal_number(); // 生成新的提案编号 received_promises.clear(); promise_count = 0; std::cout << "Node " << my_id << " is starting an election (term " << current_term << ") with proposal " << current_proposal_num.timestamp << "." << current_proposal_num.proposer_id << std::endl; // Phase 1a: Send Prepare messages for (NodeId peer_id : peer_ids) { if (peer_id == my_id) continue; network_interface->send_message(peer_id, std::make_unique<PrepareMessage>(my_id, peer_id, current_proposal_num)); } } void handle_prepare(const PrepareMessage& msg) { std::lock_guard<std::mutex> lock(mtx); // 如果收到更高任期或更高提案编号的Prepare,则退回FOLLOWER if (msg.proposal_num > highest_promised_num) { highest_promised_num = msg.proposal_num; // 如果当前是领导者,但收到更高提案的Prepare,也需要退位 if (current_state == PaxosNodeState::LEADER) { std::cout << "Node " << my_id << " (LEADER) received higher PREPARE from " << msg.sender_id << ", stepping down to FOLLOWER." << std::endl; current_state = PaxosNodeState::FOLLOWER; } // 如果是候选者,但收到更高提案的Prepare,也需要退回FOLLOWER else if (current_state == PaxosNodeState::CANDIDATE && msg.proposal_num > current_proposal_num) { std::cout << "Node " << my_id << " (CANDIDATE) received higher PREPARE from " << msg.sender_id << ", stepping down to FOLLOWER." << std::endl; current_state = PaxosNodeState::FOLLOWER; } reset_election_timeout(); // 收到有效Prepare,重置超时 // Phase 1b: Send Promise network_interface->send_message(msg.sender_id, std::make_unique<PromiseMessage>(my_id, msg.sender_id, msg.proposal_num, accepted_proposal_num, accepted_value)); } else { std::cout << "Node " << my_id << " rejected PREPARE from " << msg.sender_id << " (higher_promised_num: " << highest_promised_num.timestamp << "." << highest_promised_num.proposer_id << " vs received: " << msg.proposal_num.timestamp << "." << msg.proposal_num.proposer_id << ")" << std::endl; // 可以选择发送拒绝消息,或直接忽略 } } void handle_promise(const PromiseMessage& msg) { std::lock_guard<std::mutex> lock(mtx); if (current_state != PaxosNodeState::CANDIDATE || !(msg.proposal_num == current_proposal_num)) { std::cout << "Node " << my_id << " ignored PROMISE from " << msg.sender_id << " (not CANDIDATE or proposal mismatch)." << std::endl; return; // 忽略过期的Promise或非当前选举的Promise } // 只处理相同提案编号的Promise if (received_promises.find(msg.sender_id) == received_promises.end()) { received_promises[msg.sender_id] = msg; promise_count++; std::cout << "Node " << my_id << " received PROMISE from " << msg.sender_id << ". Total promises: " << promise_count << std::endl; // 检查是否获得多数承诺 if (promise_count > peer_ids.size() / 2) { std::cout << "Node " << my_id << " received majority promises (" << promise_count << "/" << peer_ids.size() << ")!" << std::endl; // Phase 2a: Send Accept message NodeId value_to_propose = my_id; // 默认提议自己作为领导者 std::optional<ProposalNumber> max_accepted_proposal_num = std::nullopt; // 找出所有Promise中,提案编号最高的已接受值 for (const auto& pair : received_promises) { if (pair.second.accepted_proposal_num && pair.second.accepted_value) { if (!max_accepted_proposal_num || *pair.second.accepted_proposal_num > *max_accepted_proposal_num) { max_accepted_proposal_num = pair.second.accepted_proposal_num; value_to_propose = *pair.second.accepted_value; } } } // 发送Accept消息 std::cout << "Node " << my_id << " proposing value: " << value_to_propose << std::endl; for (NodeId peer_id : peer_ids) { if (peer_id == my_id) continue; network_interface->send_message(peer_id, std::make_unique<AcceptMessage>(my_id, peer_id, current_proposal_num, value_to_propose)); } // 自身也接受这个值 accepted_proposal_num = current_proposal_num; accepted_value = value_to_propose; decided_leader_id = value_to_propose; // 暂时认为自己已经决定 current_leader_id = value_to_propose; // 更新当前领导者 } } } void handle_accept(const AcceptMessage& msg) { std::lock_guard<std::mutex> lock(mtx); // Acceptor 逻辑:如果提案编号大于或等于已承诺的最高提案编号 if (msg.proposal_num >= highest_promised_num) { highest_promised_num = msg.proposal_num; // 更新承诺 accepted_proposal_num = msg.proposal_num; // 接受此提案 accepted_value = msg.value; // 接受此值 std::cout << "Node " << my_id << " accepted ACCEPT from " << msg.sender_id << " for proposal " << msg.proposal_num.timestamp << "." << msg.proposal_num.proposer_id << " with value " << msg.value << std::endl; reset_election_timeout(); // 收到有效Accept,重置超时 // Phase 2b: Send Accepted network_interface->send_message(msg.sender_id, std::make_unique<AcceptedMessage>(my_id, msg.sender_id, msg.proposal_num, msg.value)); // 同时,将这个接受结果广播给所有Learner (包括自己) // 实际中可能由Proposer在收到多数Accepted后广播,这里简化为Acceptor广播 network_interface->broadcast_message(std::make_unique<AcceptedMessage>(my_id, -1, msg.proposal_num, msg.value), {my_id}); // 排除自己,因为自己已经处理 decided_leader_id = msg.value; // 学习到新的领导者 current_leader_id = msg.value; // 更新当前领导者 } else { std::cout << "Node " << my_id << " rejected ACCEPT from " << msg.sender_id << " (higher_promised_num: " << highest_promised_num.timestamp << "." << highest_promised_num.proposer_id << " vs received: " << msg.proposal_num.timestamp << "." << msg.proposal_num.proposer_id << ")" << std::endl; } } void handle_accepted(const AcceptedMessage& msg) { std::lock_guard<std::mutex> lock(mtx); // Learner 逻辑:学习被接受的值 if (decided_leader_id != msg.value) { // 如果是新的决定 decided_leader_id = msg.value; current_leader_id = msg.value; // 更新当前领导者 std::cout << "Node " << my_id << " learned leader is: " << decided_leader_id << std::endl; // 如果自己是被选定的领导者,则转换状态 if (decided_leader_id == my_id && current_state != PaxosNodeState::LEADER) { std::cout << "Node " << my_id << " elected as LEADER!" << std::endl; current_state = PaxosNodeState::LEADER; last_heartbeat_time = std::chrono::steady_clock::now(); // 立即发送心跳 } else if (decided_leader_id != my_id && current_state != PaxosNodeState::FOLLOWER) { std::cout << "Node " << my_id << " is now FOLLOWER to leader " << decided_leader_id << std::endl; current_state = PaxosNodeState::FOLLOWER; reset_election_timeout(); // 学习到领导者后重置选举超时 } } } void handle_heartbeat(const HeartbeatMessage& msg) { std::lock_guard<std::mutex> lock(mtx); // 如果心跳的任期更高,或者心跳来自当前领导者 if (msg.term >= current_term || (msg.leader_id == current_leader_id && current_leader_id != 0)) { if (msg.term > current_term) { current_term = msg.term; } current_leader_id = msg.leader_id; decided_leader_id = msg.leader_id; // 学习到新的领导者 current_state = PaxosNodeState::FOLLOWER; // 收到心跳,退回跟随者 reset_election_timeout(); // 重置选举超时 // std::cout << "Node " << my_id << " received heartbeat from leader " << msg.leader_id // << " (term " << msg.term << "), resetting timeout." << std::endl; } else { // 收到过期心跳,忽略 // std::cout << "Node " << my_id << " ignored old heartbeat from " << msg.sender_id // << " (term " << msg.term << " vs current " << current_term << ")." << std::endl; } } void send_heartbeat() { std::lock_guard<std::mutex> lock(mtx); if (current_state == PaxosNodeState::LEADER) { // std::cout << "Node " << my_id << " (LEADER) sending heartbeat (term " << current_term << ")." << std::endl; network_interface->broadcast_message(std::make_unique<HeartbeatMessage>(my_id, -1, my_id, current_term), {my_id}); } } };
异步网络与消息处理
在实际系统中,NetworkInterface 会是一个复杂的组件,可能基于TCP/UDP、gRPC、ZeroMQ等实现。为了简化示例,这里我们模拟一个简单的网络接口,通过队列进行消息传递。
// 模拟网络环境,所有节点共享一个消息队列
class MockNetworkInterface : public NetworkInterface {
private:
NodeId my_id;
std::map<NodeId, std::queue<std::unique_ptr<BaseMessage>>>* global_message_queues;
std::mutex* global_queue_mtx;
std::condition_variable* global_queue_cv;
public:
MockNetworkInterface(NodeId id, std::map<NodeId, std::queue<std::unique_ptr<BaseMessage>>>* queues,
std::mutex* mtx, std::condition_variable* cv)
: my_id(id), global_message_queues(queues), global_queue_mtx(mtx), global_queue_cv(cv) {}
void send_message(NodeId receiver_id, std::unique_ptr<BaseMessage> msg) override {
// 模拟消息延迟
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 20 + 10)); // 10-30ms延迟
std::unique_lock<std::mutex> lock(*global_queue_mtx);
if (global_message_queues->count(receiver_id)) {
msg->receiver_id = receiver_id; // 确保接收者ID正确
global_message_queues->at(receiver_id).push(std::move(msg));
global_queue_cv->notify_all(); // 通知所有节点,可能有新消息
} else {
std::cerr << "MockNetwork: Receiver " << receiver_id << " not found for message from " << my_id << std::endl;
}
}
void broadcast_message(std::unique_ptr<BaseMessage> msg, const std::set<NodeId>& exclude_nodes) override {
// 模拟消息延迟
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 20 + 10)); // 10-30ms延迟
std::unique_lock<std::mutex> lock(*global_queue_mtx);
for (auto const& [node_id, queue] : *global_message_queues) {
if (node_id == my_id || exclude_nodes.count(node_id)) continue;
// 复制消息发送给每个接收者
std::unique_ptr<BaseMessage> msg_copy;
// 根据消息类型创建副本
if (msg->type == MessageType::HEARTBEAT) {
const HeartbeatMessage& hb_msg = static_cast<const HeartbeatMessage&>(*msg);
msg_copy = std::make_unique<HeartbeatMessage>(hb_msg.sender_id, node_id, hb_msg.leader_id, hb_msg.term);
} else if (msg->type == MessageType::ACCEPTED) {
const AcceptedMessage& acc_msg = static_cast<const AcceptedMessage&>(*msg);
msg_copy = std::make_unique<AcceptedMessage>(acc_msg.sender_id, node_id, acc_msg.proposal_num, acc_msg.value);
}
// ... 其他消息类型也需要复制
if (msg_copy) {
global_message_queues->at(node_id).push(std::move(msg_copy));
}
}
global_queue_cv->notify_all();
}
std::unique_ptr<BaseMessage> receive_message() override {
std::unique_lock<std::mutex> lock(*global_queue_mtx);
global_queue_cv->wait(lock, [this]{
return global_message_queues->count(my_id) && !global_message_queues->at(my_id).empty();
});
if (global_message_queues->count(my_id) && !global_message_queues->at(my_id).empty()) {
std::unique_ptr<BaseMessage> msg = std::move(global_message_queues->at(my_id).front());
global_message_queues->at(my_id).pop();
return msg;
}
return nullptr;
}
};
// Main函数用于模拟集群运行
int main() {
std::srand(std::time(0)); // 初始化随机数种子
const int num_nodes = 5;
std::set<NodeId> all_node_ids;
for (int i = 1; i <= num_nodes; ++i) {
all_node_ids.insert(i);
}
std::map<NodeId, std::queue<std::unique_ptr<BaseMessage>>> global_message_queues;
std::mutex global_queue_mtx;
std::condition_variable global_queue_cv;
std::vector<std::unique_ptr<PaxosNode>> nodes;
std::vector<std::thread> node_message_processing_threads;
for (int i = 1; i <= num_nodes; ++i) {
global_message_queues[i] = std::queue<std::unique_ptr<BaseMessage>>(); // 为每个节点创建消息队列
std::unique_ptr<MockNetworkInterface> net_if = std::make_unique<MockNetworkInterface>(
i, &global_message_queues, &global_queue_mtx, &global_queue_cv);
nodes.push_back(std::make_unique<PaxosNode>(i, all_node_ids, std::move(net_if)));
}
for (auto& node : nodes) {
node->start();
// 每个节点启动一个线程来处理其消息队列
node_message_processing_threads.emplace_back([&node]{
while(true) { // 持续处理消息
std::unique_ptr<BaseMessage> msg = node->network_interface->receive_message();
if (msg) {
node->handle_message(std::move(msg));
}
// std::this_thread::sleep_for(std::chrono::milliseconds(5)); // 避免忙等
}
});
}
// 运行一段时间,观察选主过程
std::this_thread::sleep_for(std::chrono::seconds(10));
// 模拟节点故障 (例如,节点3崩溃)
std::cout << "n--- Simulating Node 3 failure ---n" << std::endl;
nodes[2]->stop(); // 节点ID为3的节点在vector中索引为2
node_message_processing_threads[2].join(); // 等待其消息处理线程结束
std::this_thread::sleep_for(std::chrono::seconds(10)); // 观察新一轮选举
std::cout << "n--- Simulating Node 1 failure ---n" << std::endl;
nodes[0]->stop();
node_message_processing_threads[0].join();
std::this_thread::sleep_for(std::chrono::seconds(10)); // 观察在仅剩多数节点时的选举
// 停止所有节点
for (size_t i = 0; i < nodes.size(); ++i) {
if (nodes[i]) { // 确保节点仍在运行
nodes[i]->stop();
if (node_message_processing_threads[i].joinable()) {
node_message_processing_threads[i].join();
}
}
}
return 0;
}
代码解析:
ProposalNumber结构体: 使用long long timestamp和NodeId proposer_id组合确保提案编号的全局唯一性和单调递增性。时间戳保证了新提案的编号通常更大,proposer_id解决了同一时间戳下多个Proposer的问题。BaseMessage及其派生类: 定义了不同Paxos阶段所需的消息结构,便于序列化和反序列化(在实际网络中)。NetworkInterface抽象类和MockNetworkInterface: 模拟了网络通信,包括消息发送、广播和接收,并加入了随机延迟以模拟真实网络环境。global_message_queues模拟了所有节点的信箱。PaxosNode类:- 状态管理:
current_state(FOLLOWER, CANDIDATE, LEADER) 驱动着节点的行为。 - Proposer 状态:
current_proposal_num,received_promises,promise_count用于跟踪选举进程。 - Acceptor 状态:
highest_promised_num,accepted_proposal_num,accepted_value维护了Acceptor的承诺和接受记录。 - Learner 状态:
decided_leader_id存储了当前学习到的领导者。 - 异步机制:
timer_thread:负责管理心跳发送和选举超时检测。receiver_thread:专门负责从网络接口接收消息,并将其放入内部队列。message_queue:用于缓冲接收到的消息,通过cv_message通知主逻辑线程处理。handle_message方法是消息处理的入口,根据消息类型分发到具体的处理函数。
- Paxos逻辑:
start_election,handle_prepare,handle_promise,handle_accept,handle_accepted,handle_heartbeat等方法严格按照Paxos算法的两个阶段和角色职责实现。 - 超时与随机化:
reset_election_timeout在FOLLOWER状态下使用随机超时,以减少活锁的概率。 - 线程安全: 使用
std::mutex保护所有共享状态,确保并发访问的正确性。
- 状态管理:
状态转换示例
| 当前状态 | 事件/消息 | 条件 “`cpp
include
#include <vector>
#include <string>
#include <map>
#include <set>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <optional>
#include <random>
#include <algorithm> // For std::max and std::min
// 节点ID
using NodeId = int;
// 提案编号,由时间戳和节点ID组成,保证全局唯一且单调递增
// 使用 std::chrono::system_clock::now().time_since_epoch().count() 获取毫秒级时间戳
struct ProposalNumber {
long long timestamp;
NodeId proposer_id;
// 重载比较运算符以支持排序和比较
bool operator<(const ProposalNumber& other) const {
if (timestamp != other.timestamp) {
return timestamp < other.timestamp;
}
return proposer_id < other.proposer_id;
}
bool operator==(const ProposalNumber& other) const {
return timestamp == other.timestamp && proposer_id == other.proposer_id;
}
bool operator<=(const ProposalNumber& other) const {
return *this < other || *this == other;
}
bool operator>(const ProposalNumber& other) const {
return !(*this <= other);
}
bool operator>=(const ProposalNumber& other) const {
return !(*this < other);
}
};
// 为 std::map 和 std::set 使用 ProposalNumber 作为键提供 hash 支持 (可选,但推荐)
namespace std {
template <> struct hash<ProposalNumber> {
size_t operator()(const ProposalNumber& p) const {
// 简单的组合哈希,确保不同 ProposalNumber 有不同哈希值
return hash<long long>()(p.timestamp) ^ (hash<NodeId>()(p.proposer_id) << 1);
}
};
}
// Paxos 消息类型
enum class MessageType {
PREPARE,
PROMISE,
ACCEPT,
ACCEPTED,
HEARTBEAT // 用于领导者维持心跳
};
// 所有消息的基类
struct BaseMessage {
MessageType type;
NodeId sender_id;
NodeId receiver_id; // 接收者ID,如果是广播则可能为-1或特殊值
BaseMessage(MessageType t, NodeId s, NodeId r) : type(t), sender_id(s), receiver_id(r) {}
virtual ~BaseMessage() = default; // 虚析构函数,确保正确释放派生类资源
virtual std::unique_ptr<BaseMessage> clone() const = 0; // 用于消息复制
};
// Prepare 消息
struct PrepareMessage : public BaseMessage {
ProposalNumber proposal_num;
PrepareMessage(NodeId s, NodeId r, ProposalNumber pn)
: BaseMessage(MessageType::PREPARE, s, r), proposal_num(pn) {}
std::unique_ptr<BaseMessage> clone() const override {
return std::make_unique<PrepareMessage>(*this);
}
};
// Promise 消息
struct PromiseMessage : public BaseMessage {
ProposalNumber proposal_num; // 承诺的提案编号
std::optional<ProposalNumber> accepted_proposal_num; // 如果 Acceptor 之前接受过值,则为该值的提案编号
std::optional<NodeId> accepted_value; // 如果 Acceptor 之前接受过值,则为该值 (leader ID)
PromiseMessage(NodeId s, NodeId r, ProposalNumber pn,
std::optional<ProposalNumber> apn = std::nullopt,
std::optional<NodeId> av = std::nullopt)
: BaseMessage(MessageType::PROMISE, s, r),
proposal_num(pn), accepted_proposal_num(apn), accepted_value(av) {}
std::unique_ptr<BaseMessage> clone() const override {
return std::make_unique<PromiseMessage>(*this);
}
};
// Accept 消息
struct AcceptMessage : public BaseMessage {
ProposalNumber proposal_num; // 提议的提案编号
NodeId value; // 提议的值 (leader ID)
AcceptMessage(NodeId s, NodeId r, ProposalNumber pn, NodeId v)
: BaseMessage(MessageType::ACCEPT, s, r), proposal_num(pn), value(v) {}
std::unique_ptr<BaseMessage> clone() const override {
return std::make_unique<AcceptMessage>(*this);
}
};
// Accepted 消息
struct AcceptedMessage : public BaseMessage {
ProposalNumber proposal_num; // 已接受的提案编号
NodeId value; // 已接受的值 (leader ID)
AcceptedMessage(NodeId s, NodeId r, ProposalNumber pn, NodeId v)
: BaseMessage(MessageType::ACCEPTED, s, r), proposal_num(pn), value(v) {}
std::unique_ptr<BaseMessage> clone() const override {
return std::make_unique<AcceptedMessage>(*this);
}
};
// Heartbeat 消息
struct HeartbeatMessage : public BaseMessage {
NodeId leader_id; // 当前领导者ID
long long term; // 领导者任期,用于区分过期的心跳
HeartbeatMessage(NodeId s, NodeId r, NodeId lid, long long t)
: BaseMessage(MessageType::HEARTBEAT, s, r), leader_id(lid), term(t) {}
std::unique_ptr<BaseMessage> clone() const override {
return std::make_unique<HeartbeatMessage>(*this);
}
};
// 抽象网络接口,用于发送和接收消息
class NetworkInterface {
public:
virtual ~NetworkInterface() = default;
virtual void send_message(NodeId receiver_id, std::unique_ptr<BaseMessage> msg) = 0;
virtual void broadcast_message(std::unique_ptr<BaseMessage> msg, const std::set<NodeId>& exclude_nodes) = 0;
// 模拟消息接收,实际中通常通过回调或队列实现
virtual std::unique_ptr<BaseMessage> receive_message() = 0;
};
// PaxosNode 的状态
enum class PaxosNodeState {
FOLLOWER, // 跟随者,等待领导者心跳或发起选举
CANDIDATE, // 候选者,正在尝试成为领导者 (Proposer 角色)
LEADER // 领导者,负责发送心跳和协调
};
// Paxos 节点的实现
class PaxosNode {
private:
NodeId my_id;
std::set<NodeId> peer_ids; // 集群中所有其他节点的ID,包括自己
std::unique_ptr<NetworkInterface> network_interface;
PaxosNodeState current_state;
NodeId current_leader_id; // 当前已知的领导者ID
long long current_term; // 当前领导者任期,用于领导者心跳和选举的上下文
// Proposer 状态 (用于 CANDIDATE 状态)
ProposalNumber current_proposal_num; // 当前 Proposer 使用的提案编号
std::map<NodeId, PromiseMessage> received_promises; // 存储收到的 Promise 响应
int promise_count; // 收到承诺的数量,用于判断是否达到多数
// Acceptor 状态 (所有节点都扮演 Acceptor 角色)
ProposalNumber highest_promised_num; // 接受者已承诺的最高提案编号
std::optional<ProposalNumber> accepted_proposal_num; // 接受者已接受的提案编号
std::optional<NodeId> accepted_value; // 接受者已接受的值 (leader ID)
// Learner 状态 (所有节点都扮演 Learner 角色)
// NodeId decided_leader_id; // 最终确认的领导者ID,这里合并到 current_leader_id
// 选举超时计时器相关
std::chrono::steady_clock::time_point last_heartbeat_time; // 上次收到心跳或发送心跳的时间
std::chrono::milliseconds election_timeout_min; // 选举超时最小随机值
std::chrono::milliseconds election_timeout_max; // 选举超时最大随机值
std::chrono::milliseconds heartbeat_interval; // 领导者心跳间隔
// 线程与同步
std::unique_ptr<std::thread> timer_thread; // 计时器线程
std::unique_ptr<std::thread> receiver_thread; // 消息接收线程
std::mutex mtx; // 保护节点内部状态的互斥锁
std::condition_variable cv_timer; // 用于计时器线程等待
std::condition_variable cv_message; // 用于消息处理线程等待新消息
bool running; // 节点运行标志
std::queue<std::unique_ptr<BaseMessage>> message_queue; // 异步消息队列
std::default_random_engine random_engine; // 随机数生成器,用于随机化选举超时
// 生成新的提案编号
void generate_new_proposal_number() {
// 使用当前系统时间作为时间戳,结合节点ID确保唯一性
current_proposal_num = {std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count(),
my_id};
std::cout << "[Node " << my_id << "] Generated new proposal number: "
<< current_proposal_num.timestamp << "." << current_proposal_num.proposer_id << std::endl;
}
// 重置选举超时
void reset_election_timeout() {
last_heartbeat_time = std::chrono::steady_clock::now();
// 在 FOLLOWER 状态下,为了避免活锁,每次超时后随机化下一次的超时时间
if (current_state == PaxosNodeState::FOLLOWER) {
std::uniform_int_distribution<int> dist(election_timeout_min.count(), election_timeout_max.count());
// 更新 election_timeout_min 以存储本次随机出的超时值
election_timeout_min = std::chrono::milliseconds(dist(random_engine));
}
// std::cout << "[Node " << my_id << "] Reset election timeout, next timeout in "
// << election_timeout_min.count() << "ms." << std::endl;
}
// 计时器循环,处理心跳和选举超时
void timer_loop() {
while (running) {
std::unique_lock<std::mutex> lock(mtx);
auto now = std::chrono::steady_clock::now();
std::chrono::milliseconds time_since_last_event =
std::chrono::duration_cast<std::chrono::milliseconds>(now - last_heartbeat_time);
if (current_state == PaxosNodeState::LEADER) {
// 领导者定期发送心跳
if (time_since_last_event >= heartbeat_interval) {
lock.unlock(); // 释放锁,防止发送消息时阻塞其他操作
send_heartbeat();
lock.lock(); // 重新获取锁
last_heartbeat_time = std::chrono::steady_clock::now(); // 重置心跳计时
}
} else if (current_state == PaxosNodeState::FOLLOWER || current_state == PaxosNodeState::CANDIDATE) {
// 跟随者和候选者等待领导者心跳或进行选举
if (time_since_last_event >= election_timeout_min) {
std::cout << "[Node " << my_id << "] Election timeout! Current state: "
<< (current_state == PaxosNodeState::FOLLOWER ? "FOLLOWER" : "CANDIDATE")
<< ". Starting new election." << std::endl;
start_election(); // 发起新的选举
reset_election_timeout(); // 每次超时后重置计时器
}
}
// 等待下一个检查点,避免忙等
cv_timer.wait_for(lock, std::chrono::milliseconds(50));
}
}
// 消息接收循环,将网络消息放入内部队列
void receiver_loop() {
while (running) {
std::unique_ptr<BaseMessage> msg = network_interface->receive_message();
if (msg) {
std::unique_lock<std::mutex> lock(mtx);
message_queue.push(std::move(msg));
cv_message.notify_one(); // 通知主线程有新消息
}
// 避免忙等,适当休眠
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}
// 主消息处理循环(可以由主线程或单独的worker线程调用)
void process_message_queue() {
while (running) {
std::unique_lock<std::mutex> lock(mtx);
// 等待队列非空或节点停止
cv_message.wait(lock, [this]{ return !message_queue.empty() || !running; });
if (!running) break;
while (!message_queue.empty()) {
std::unique_ptr<BaseMessage> msg = std::move(message_queue.front());
message_queue.pop();
lock.unlock(); // 处理消息时释放锁,允许其他线程访问共享资源
handle_message_internal(std::move(msg)); // 调用实际的消息处理逻辑
lock.lock(); // 处理完消息后重新获取锁
}
}
}
// 内部消息处理函数,用于状态保护
void handle_message_internal(std::unique_ptr<BaseMessage> msg) {
std::lock_guard<std::mutex> lock(mtx); // 保护内部状态
// std::cout << "[Node " << my_id << "] Received message from " << msg->sender_id << " of type "
// << static_cast<int>(msg->type) << std::endl;
switch (msg->type) {
case MessageType::PREPARE:
handle_prepare(static_cast<PrepareMessage&>(*msg));
break;
case MessageType::PROMISE:
handle_promise(static_cast<PromiseMessage&>(*msg));
break;
case MessageType::ACCEPT:
handle_accept(static_cast<AcceptMessage&>(*msg));
break;
case MessageType::ACCEPTED:
handle_accepted(static_cast<AcceptedMessage&>(*msg));
break;
case MessageType::HEARTBEAT:
handle_heartbeat(static_cast<HeartbeatMessage&>(*msg));
break;
default:
std::cerr << "[Node " << my_id << "] Received unknown message type." << std::endl;
}
}
public:
// 构造函数
PaxosNode(NodeId id, const std::set<NodeId>& peers, std::unique_ptr<NetworkInterface> net_if)
: my_id(id), peer_ids(peers), network_interface(std::move(net_if)),
current_state(PaxosNodeState::FOLLOWER),
current_leader_id(0), // 初始未知领导者
current_term(0), // 初始任期为0
promise_count(0),
highest_promised_num({0, 0}), // 初始为最小提案编号
running(false),
election_timeout_min(150), // 默认最小选举超时
election_timeout_max(300), // 默认最大选举超时
heartbeat_interval(50), // 默认心跳间隔
random_engine(std::chrono::system_clock::now().time_since_epoch().count() + id) { // 使用时间+ID作为种子
reset_election_timeout(); // 初始化选举超时
}
// 析构函数,确保线程正确停止
~PaxosNode() {
stop();
}
// 启动节点
void start() {
if (running) return;
running = true;
timer_thread = std::make_unique<std::thread>(&PaxosNode::timer_loop, this);
receiver_thread = std::make_unique<std::thread>(&PaxosNode::receiver_loop, this);
// 启动一个单独的线程来处理消息队列,避免阻塞主线程或计时器线程
std::thread msg_processor_thread(&PaxosNode::process_message_queue, this);
msg_processor_thread.detach(); // 让其后台运行
std::cout << "[Node " << my_id << "] Started." << std::endl;
}
// 停止节点
void stop() {
if (!running) return;
running = false;
cv_timer.notify_all(); // 唤醒计时器线程
cv_message.notify_all(); // 唤醒消息处理线程
if (timer_thread && timer_thread->joinable()) {
timer_thread->join(); // 等待计时器线程结束
}
if (receiver_thread && receiver_thread->joinable()) {
receiver_thread->join(); // 等待消息接收线程结束
}
std::cout << "[Node " << my_id << "] Stopped." << std::endl;
}
// 获取当前状态 (用于外部查询)
PaxosNodeState get_state() const {
std::lock_guard<std::mutex> lock(mtx);
return current_state;
}
// 获取当前领导者ID (用于外部查询)
NodeId get_leader_id() const {
std::lock_guard<std::mutex> lock(mtx);
return current_leader_id;
}
private:
// Paxos Proposer 逻辑
void start_election() {
std::lock_guard<std::mutex> lock(mtx); // 保护状态变更
current_state = PaxosNodeState