什么是 ‘Lease-based Leader Election’?利用 Go 实现一个具备高可用性且无脑裂风险的选主逻辑

各位听众,大家好。

在构建分布式系统时,我们经常面临一个核心挑战:如何确保系统在面对故障时依然能够持续提供服务,并且各个组件之间能够协同工作。其中,“选主”(Leader Election)是一个关键的机制,它允许分布式系统中的多个节点通过协商,推举出一个“领导者”来负责执行特定的任务或协调操作。这个领导者通常是唯一且权威的,它的存在简化了系统的设计和实现,避免了多个节点同时执行冲突操作的复杂性。

例如,在一个生产者-消费者系统中,可能只有一个领导者负责分配任务给消费者;在一个复制状态机中,领导者负责处理所有客户端请求并将其复制到其他节点。选主机制的核心目标是提供高可用性,并在任何时候都只有一个有效的领导者,从而避免“脑裂”(Split-Brain)问题。脑裂是指由于网络分区或其他故障,导致系统中出现两个或更多节点都认为自己是领导者的情况,这会造成数据不一致、行为冲突等严重后果。

今天,我们将深入探讨一种非常实用且在工业界广泛应用的选主机制:基于租约(Lease-based Leader Election)的选主。我们将利用 Go 语言,结合 etcd 这个强大的分布式键值存储,来实现一个具备高可用性且无脑裂风险的选主逻辑。

1. 什么是基于租约的选主?

基于租约的选主是一种利用时间限制的“租约”来授予领导权的方式。其核心思想是:

  1. 领导者(Leader):拥有一个在特定时间段内有效的租约。只要租约有效,它就认为是领导者,并执行相应的职责。
  2. 租约续期(Lease Renewal):领导者必须在租约到期前定期续期,以维持其领导地位。
  3. 追随者(Follower)/候选者(Candidate):没有租约或租约已过期,它们不能执行领导者职责。它们会监控领导者的状态,并在当前领导者的租约过期或明确放弃领导权时,尝试获取新的租约成为领导者。

这种机制的优点在于其相对简单性。与 Raft 或 Paxos 这类复杂的全分布式共识算法相比,基于租约的选主通常更轻量级,更容易理解和实现,尤其适用于那些只需要一个单一领导者来协调简单任务的场景。

租约的生命周期

阶段 描述 关键操作
竞选 (Campaign) 节点尝试获取领导权。它会尝试在一个共享存储中写入自己的身份信息,并附带一个租约。 1. 创建一个具有 TTL (Time-To-Live) 的租约。2. 使用原子操作 (如 Compare-And-Swap, CAS) 尝试将自己的 ID 和租约 ID 写入一个预定义的领导者键。如果键不存在或其租约已过期,则竞选成功。
领导 (Lead) 节点成功获取租约并成为领导者。它开始执行领导者职责。 1. 周期性地续期租约,防止其过期。2. 执行领导者任务。3. 监控自身状态,如果无法续期或遇到严重错误,则主动放弃领导权。
追随 (Follow) 节点未能获取领导权或当前不是领导者。它会监控领导者键,等待当前领导者的租约过期或领导者主动放弃。 1. 观察领导者键的变化。2. 如果领导者键被删除、修改,或者其关联的租约过期,则重新进入竞选阶段。3. 不执行领导者任务。

2. 基于 etcd 实现选主

选择 etcd 作为我们的底层存储有其充分的理由。etcd 是一个高可用的分布式键值存储,它基于 Raft 共识算法,提供了强一致性、高可用性和可靠性。etcd 提供了以下关键特性,使其成为实现租约选主的理想选择:

  • Lease API:etcd 允许为键设置一个租约,租约有 TTL。当租约到期时,所有与该租约关联的键都会被自动删除。领导者可以通过续期租约来维持其键的存活。
  • Transaction API (Txn):etcd 支持原子性的多操作事务。这对于实现“Compare-And-Swap”语义至关重要,即“如果键满足某个条件,则执行写操作”。这保证了在并发竞选时,只有一个节点能成功抢占领导权,从而有效防止脑裂。
  • Watch API:etcd 允许客户端观察特定键或键前缀的变化。追随者可以利用这个特性来实时感知领导者键的状态变化,从而快速响应领导者故障。

2.1 etcd 选主的核心原理

  1. 节点身份:每个参与选主的节点都有一个唯一的 ID。
  2. 领导者键:在 etcd 中定义一个共享的键(例如 /leader_election/my_service/leader),用来存储当前领导者的信息。
  3. 租约创建:每个节点在尝试竞选时,首先会从 etcd 创建一个具有特定 TTL 的租约。
  4. 原子竞选:节点会尝试执行一个 etcd 事务:
    • 条件:检查领导者键是否存在,或者如果存在,其关联的租约是否已经过期。
    • 操作:如果条件满足,则将自己的 ID 写入领导者键,并将其与之前创建的租约关联起来。
  5. 领导者职责:成功写入的节点成为领导者,它会定期续期其租约。
  6. 追随者职责:未成为领导者的节点会持续观察领导者键。如果领导者键被删除(意味着租约过期或领导者主动放弃),或者其值发生变化,追随者就会知道领导者已失效,然后重新进入竞选流程。

3. Go 语言实现:设计思路

我们将构建一个 LeaderElection 服务,它封装了与 etcd 交互的所有逻辑。

核心组件

  • LeaderElection 结构体:包含 etcd 客户端、节点 ID、选主键路径、当前租约 ID、当前状态等。
  • Run() 方法:启动选主循环的主入口。
  • campaign():尝试获取领导权的核心逻辑。
  • observeLeader():追随者模式下监控领导者状态。
  • renewLease():领导者模式下续期租约。
  • 状态机:Candidate(候选者)、Leader(领导者)、Follower(追随者)。

高可用性与无脑裂

  • 高可用性:etcd 本身是高可用的。我们的选主逻辑通过 Watch 机制能快速感知领导者失效,并立即启动新一轮竞选。多个节点同时竞选,只要 etcd 集群健康,总能选出一个领导者。
  • 无脑裂:这是基于 etcd TxnLease 的核心保证。
    • 原子性Txn 保证了只有一次成功的写入,即只有一个节点能成功将自己注册为领导者。
    • 时间限制Lease 保证了即使某个旧领导者因网络分区等原因无法与 etcd 通信,其领导权也会在租约到期后自动失效,从而防止它在“自以为是”的情况下继续执行领导者职责。新领导者只有在旧领导者租约过期后才能成功注册。

4. Go 语言实现:代码详情

首先,我们需要导入 etcd 客户端库:

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "sync"
    "time"

    "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency" // etcd 提供的并发工具包,虽然我们这次主要自己实现,但了解它很有用
)

4.1 LeaderElection 结构体定义

// State 定义了选主节点可能处于的状态
type State int

const (
    Follower  State = iota // 追随者,不持有领导权,监控领导者
    Candidate              // 候选者,正在尝试获取领导权
    Leader                 // 领导者,持有领导权并执行领导者职责
)

func (s State) String() string {
    switch s {
    case Follower:
        return "Follower"
    case Candidate:
        return "Candidate"
    case Leader:
        return "Leader"
    default:
        return "Unknown"
    }
}

// LeaderElection 封装了基于 etcd 的领导者选举逻辑
type LeaderElection struct {
    client      *clientv3.Client
    nodeID      string        // 当前节点的唯一标识
    electionKey string        // etcd 中用于选主的键路径
    leaseID     clientv3.LeaseID // 当前节点持有的租约 ID
    state       State         // 当前节点的状态
    stateMu     sync.RWMutex  // 保护 state 字段的读写锁

    leaseTTL    time.Duration // 租约的生存时间
    renewInterval time.Duration // 租约续期的间隔
    retryInterval time.Duration // 失败重试的间隔

    leaderCtx    context.Context    // 领导者任务的上下文
    leaderCancel context.CancelFunc // 取消领导者任务的函数

    stopCtx    context.Context    // 控制 LeaderElection 服务的停止
    stopCancel context.CancelFunc // 取消 LeaderElection 服务的函数
    wg         sync.WaitGroup     // 等待所有 goroutine 退出
}

// LeaderInfo 存储领导者的信息
type LeaderInfo struct {
    ID        string    `json:"id"`
    LeaseID   int64     `json:"lease_id"`
    Timestamp time.Time `json:"timestamp"`
}

4.2 构造函数 NewLeaderElection

// NewLeaderElection 创建一个新的 LeaderElection 实例
func NewLeaderElection(
    etcdEndpoints []string,
    nodeID string,
    electionKey string,
    leaseTTL time.Duration,
    renewInterval time.Duration,
    retryInterval time.Duration,
) (*LeaderElection, error) {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   etcdEndpoints,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        return nil, fmt.Errorf("failed to connect to etcd: %w", err)
    }

    stopCtx, stopCancel := context.WithCancel(context.Background())

    return &LeaderElection{
        client:        cli,
        nodeID:        nodeID,
        electionKey:   electionKey,
        state:         Follower, // 初始状态为追随者
        leaseTTL:      leaseTTL,
        renewInterval: renewInterval,
        retryInterval: retryInterval,
        stopCtx:       stopCtx,
        stopCancel:    stopCancel,
    }, nil
}

// Close 关闭 etcd 客户端连接
func (le *LeaderElection) Close() error {
    le.stopCancel() // 停止所有正在运行的 goroutine
    le.wg.Wait()    // 等待所有 goroutine 退出
    log.Printf("[%s] LeaderElection service stopped.", le.nodeID)
    return le.client.Close()
}

4.3 状态管理

为了线程安全地管理状态,我们使用 sync.RWMutex

func (le *LeaderElection) setState(s State) {
    le.stateMu.Lock()
    defer le.stateMu.Unlock()
    if le.state != s {
        log.Printf("[%s] State changed from %s to %sn", le.nodeID, le.state, s)
        le.state = s
    }
}

func (le *LeaderElection) getState() State {
    le.stateMu.RLock()
    defer le.stateMu.RUnlock()
    return le.state
}

// IsLeader 检查当前节点是否是领导者
func (le *LeaderElection) IsLeader() bool {
    return le.getState() == Leader
}

4.4 选主主循环 Run

Run 方法是选主逻辑的入口,它会根据当前状态进行循环。

// Run 启动领导者选举的主循环
func (le *LeaderElection) Run() {
    le.wg.Add(1)
    defer le.wg.Done()

    log.Printf("[%s] LeaderElection service started. Election Key: %s", le.nodeID, le.electionKey)

    for {
        select {
        case <-le.stopCtx.Done():
            log.Printf("[%s] LeaderElection Run loop exiting due to stop signal.", le.nodeID)
            return
        default:
            // 继续执行
        }

        switch le.getState() {
        case Follower:
            log.Printf("[%s] In Follower state, observing leader...", le.nodeID)
            le.observeLeader()
        case Candidate:
            log.Printf("[%s] In Candidate state, attempting to campaign...", le.nodeID)
            le.campaign()
        case Leader:
            log.Printf("[%s] In Leader state, maintaining leadership...", le.nodeID)
            le.lead()
        }
    }
}

4.5 竞选逻辑 campaign

这是选主的核心。它会尝试创建一个租约,然后使用 etcd 的事务(Txn)进行原子性的“Compare-And-Swap”操作。

// campaign 尝试获取领导权
func (le *LeaderElection) campaign() {
    // 1. 创建一个新的租约
    resp, err := le.client.Grant(le.stopCtx, int64(le.leaseTTL.Seconds()))
    if err != nil {
        log.Printf("[%s] Failed to grant lease: %v. Retrying in %v...", le.nodeID, err, le.retryInterval)
        time.Sleep(le.retryInterval)
        return
    }
    le.leaseID = resp.ID

    leaderInfo := LeaderInfo{
        ID:        le.nodeID,
        LeaseID:   int64(le.leaseID),
        Timestamp: time.Now(),
    }
    leaderInfoBytes, err := json.Marshal(leaderInfo)
    if err != nil {
        log.Printf("[%s] Failed to marshal leader info: %v. Retrying in %v...", le.nodeID, err, le.retryInterval)
        time.Sleep(le.retryInterval)
        return
    }

    // 2. 尝试使用事务将自己设置为领导者
    // 条件:如果 key 不存在,或者 key 存在但其租约已过期 (ModifiedRevision != 0 && lease is invalid)
    // etcd v3 的事务非常强大,我们可以检查 key 的版本或者关联的租约是否存在。
    // 更简洁的方式是检查 key 的版本,或者直接判断 key 是否存在。
    // 这里的 If 条件是:如果 le.electionKey 不存在 (Version == 0)
    txnResp, err := le.client.Txn(le.stopCtx).
        If(clientv3.Compare(clientv3.Version(le.electionKey), "=", 0)).
        Then(clientv3.OpPut(le.electionKey, string(leaderInfoBytes), clientv3.WithLease(le.leaseID))).
        Commit()

    if err != nil {
        log.Printf("[%s] Failed to commit transaction for leadership: %v. Revoking lease %d. Retrying in %v...", le.nodeID, err, le.leaseID, le.retryInterval)
        // 事务失败,撤销已创建的租约
        _, revokeErr := le.client.Revoke(le.stopCtx, le.leaseID)
        if revokeErr != nil {
            log.Printf("[%s] Failed to revoke lease %d after txn error: %v", le.nodeID, le.leaseID, revokeErr)
        }
        le.leaseID = clientv3.NoLease
        time.Sleep(le.retryInterval)
        le.setState(Follower) // 竞选失败,回到追随者状态
        return
    }

    if txnResp.Succeeded {
        // 成功成为领导者
        log.Printf("[%s] Successfully became leader with lease ID %d.", le.nodeID, le.leaseID)
        le.setState(Leader)
    } else {
        // 未能成为领导者,因为 key 已经存在(其他节点已成为领导者)
        log.Printf("[%s] Failed to become leader, another node already holds the leadership. Revoking lease %d. Waiting for leader to change...", le.nodeID, le.leaseID)
        _, revokeErr := le.client.Revoke(le.stopCtx, le.leaseID)
        if revokeErr != nil {
            log.Printf("[%s] Failed to revoke lease %d after losing campaign: %v", le.nodeID, le.leaseID, revokeErr)
        }
        le.leaseID = clientv3.NoLease
        le.setState(Follower) // 竞选失败,回到追随者状态
        // 不需要等待 retryInterval,立即开始观察领导者
    }
}

关于 If(clientv3.Compare(clientv3.Version(le.electionKey), "=", 0)) 的说明
这个条件判断的是 le.electionKey 的版本是否为 0。在 etcd 中,一个键的版本为 0 意味着它不存在。所以这个事务的含义是:“如果 le.electionKey 不存在,那么就将我的领导者信息写入这个键,并关联我的租约”。这确保了只有第一个成功写入的节点能成为领导者。如果这个键已经存在,说明有其他节点已经成功竞选,那么当前节点的事务就会失败。

4.6 领导者逻辑 lead

领导者需要周期性地续期租约,并执行其核心任务。为了模拟领导者任务,我们在这里简单地打印日志。

// lead 节点作为领导者时执行的逻辑
func (le *LeaderElection) lead() {
    le.leaderCtx, le.leaderCancel = context.WithCancel(le.stopCtx) // 为领导者任务创建独立的上下文

    // 启动租约续期 goroutine
    le.wg.Add(1)
    go func() {
        defer le.wg.Done()
        le.renewLease()
    }()

    // 模拟领导者执行任务
    // 实际应用中,这里会是领导者的核心业务逻辑
    ticker := time.NewTicker(le.renewInterval) // 可以用与续期相同的间隔进行一些领导者心跳或任务检查
    defer ticker.Stop()

    for {
        select {
        case <-le.leaderCtx.Done():
            log.Printf("[%s] Leader context cancelled. Stepping down from leadership.", le.nodeID)
            le.stepDown() // 主动放弃领导权
            return
        case <-ticker.C:
            // 领导者在正常工作
            log.Printf("[%s] Still the leader. Performing leader duties...", le.nodeID)
        }
    }
}

// renewLease 领导者周期性续期租约
func (le *LeaderElection) renewLease() {
    // 使用 etcd 的 KeepAlive 机制简化租约续期
    keepAliveChan, err := le.client.KeepAlive(le.leaderCtx, le.leaseID)
    if err != nil {
        log.Printf("[%s] Failed to start KeepAlive for lease %d: %v. Stepping down.", le.nodeID, le.leaseID, err)
        le.leaderCancel() // 出现错误,取消领导者上下文,导致主循环退出领导者状态
        return
    }

    log.Printf("[%s] Started KeepAlive for lease %d (TTL: %v).", le.nodeID, le.leaseID, le.leaseTTL)

    for {
        select {
        case <-le.leaderCtx.Done():
            log.Printf("[%s] KeepAlive goroutine for lease %d exiting.", le.nodeID, le.leaseID)
            return
        case kaResp, ok := <-keepAliveChan:
            if !ok {
                log.Printf("[%s] KeepAlive channel closed for lease %d. Lease probably expired or etcd connection lost. Stepping down.", le.nodeID, le.leaseID)
                le.leaderCancel() // KeepAlive 失败,取消领导者上下文
                return
            }
            // log.Printf("[%s] Lease %d renewed at TTL %d.", le.nodeID, le.leaseID, kaResp.TTL)
            _ = kaResp // 忽略续期响应,只要 channel 没关就是成功
        }
    }
}

// stepDown 领导者主动放弃领导权
func (le *LeaderElection) stepDown() {
    log.Printf("[%s] Stepping down from leadership. Revoking lease %d...", le.nodeID, le.leaseID)
    // 撤销租约,这将导致 leaderKey 被删除
    _, err := le.client.Revoke(le.stopCtx, le.leaseID)
    if err != nil {
        log.Printf("[%s] Failed to revoke lease %d: %v", le.nodeID, le.leaseID, err)
    }
    le.leaseID = clientv3.NoLease
    le.setState(Follower) // 退回到追随者状态
}

这里我们使用了 clientv3.KeepAlive,它是 etcd 客户端提供的一个非常方便的函数,它会在后台自动为指定租约发送续期请求,直到上下文被取消或连接断开。这比我们手动在 Ticker 中调用 client.KeepAliveOnce 要简洁和健壮得多。

4.7 追随者逻辑 observeLeader

追随者需要监控领导者键,以便在领导者失效时立即发起竞选。

// observeLeader 追随者观察领导者键的变化
func (le *LeaderElection) observeLeader() {
    // 尝试获取当前领导者的信息
    // 即使 leaderKey 不存在,Get 操作也不会报错
    getResp, err := le.client.Get(le.stopCtx, le.electionKey)
    if err != nil {
        log.Printf("[%s] Failed to get leader info: %v. Retrying in %v...", le.nodeID, err, le.retryInterval)
        time.Sleep(le.retryInterval)
        return
    }

    if len(getResp.Kvs) == 0 {
        // 当前没有领导者,或者领导者租约已过期,尝试成为候选者
        log.Printf("[%s] No leader found or leader key missing. Becoming candidate.", le.nodeID)
        le.setState(Candidate)
        return
    }

    // 存在领导者,解析信息
    var currentLeaderInfo LeaderInfo
    err = json.Unmarshal(getResp.Kvs[0].Value, &currentLeaderInfo)
    if err != nil {
        log.Printf("[%s] Failed to unmarshal leader info: %v. Becoming candidate (potential data corruption).", le.nodeID, err)
        le.setState(Candidate) // 如果数据损坏,也尝试竞选
        return
    }

    log.Printf("[%s] Current leader is %s (lease ID: %d). Watching for changes...", le.nodeID, currentLeaderInfo.ID, currentLeaderInfo.LeaseID)

    // 启动 Watcher 监控领导者键
    watchCtx, watchCancel := context.WithCancel(le.stopCtx)
    defer watchCancel() // 确保在函数退出时取消 watcher

    watchChan := le.client.Watch(watchCtx, le.electionKey)

    for {
        select {
        case <-le.stopCtx.Done():
            log.Printf("[%s] Watcher for leader key exiting due to stop signal.", le.nodeID)
            return
        case watchResp, ok := <-watchChan:
            if !ok {
                log.Printf("[%s] Watch channel closed. Etcd connection issue? Retrying in %v...", le.nodeID, le.retryInterval)
                time.Sleep(le.retryInterval)
                return // 重新开始 observeLeader 流程
            }
            if watchResp.Err() != nil {
                log.Printf("[%s] Watcher error: %v. Retrying in %v...", le.nodeID, watchResp.Err(), le.retryInterval)
                time.Sleep(le.retryInterval)
                return // 重新开始 observeLeader 流程
            }

            // 处理 Watch 事件
            for _, event := range watchResp.Events {
                if event.Type == clientv3.EventTypeDelete {
                    log.Printf("[%s] Leader key deleted. Leader %s (lease %d) has stepped down or lease expired. Becoming candidate.", le.nodeID, currentLeaderInfo.ID, currentLeaderInfo.LeaseID)
                    le.setState(Candidate)
                    return // 退出当前 observeLeader 循环,进入 campaign
                } else if event.Type == clientv3.EventTypePut {
                    // 领导者键被更新了,可能是新的领导者上位了,或者当前领导者续期了(KeepAlive 也会触发 Put 事件)
                    // 我们需要检查新的值是否代表新的领导者
                    var newLeaderInfo LeaderInfo
                    err := json.Unmarshal(event.Kv.Value, &newLeaderInfo)
                    if err != nil {
                        log.Printf("[%s] Failed to unmarshal new leader info from watch event: %v. Re-observing.", le.nodeID, err)
                        le.setState(Candidate) // 数据损坏,尝试竞选
                        return
                    }

                    if newLeaderInfo.ID != currentLeaderInfo.ID || newLeaderInfo.LeaseID != currentLeaderInfo.LeaseID {
                        log.Printf("[%s] Leader key updated to new leader %s (lease %d). Was %s (lease %d). Becoming candidate.",
                            le.nodeID, newLeaderInfo.ID, newLeaderInfo.LeaseID, currentLeaderInfo.ID, currentLeaderInfo.LeaseID)
                        le.setState(Candidate)
                        return // 退出当前 observeLeader 循环,进入 campaign
                    }
                    // 否则,是当前领导者的 KeepAlive 续期,继续观察
                    // log.Printf("[%s] Leader %s lease %d renewed. Continue watching.", le.nodeID, currentLeaderInfo.ID, currentLeaderInfo.LeaseID)
                }
            }
        }
    }
}

4.8 完整的 main.go 示例

为了方便测试,我们将创建一个简单的 main 函数,它允许我们通过命令行参数指定节点 ID 和 etcd 端点。

package main

import (
    "context"
    "encoding/json"
    "flag"
    "fmt"
    "log"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"

    "go.etcd.io/etcd/client/v3"
)

// LeaderInfo 存储领导者的信息 (需要与 LeaderElection 结构体中的定义一致)
type LeaderInfo struct {
    ID        string    `json:"id"`
    LeaseID   int64     `json:"lease_id"`
    Timestamp time.Time `json:"timestamp"`
}

// State 定义了选主节点可能处于的状态 (需要与 LeaderElection 结构体中的定义一致)
type State int

const (
    Follower  State = iota
    Candidate
    Leader
)

func (s State) String() string {
    switch s {
    case Follower:
        return "Follower"
    case Candidate:
        return "Candidate"
    case Leader:
        return "Leader"
    default:
        return "Unknown"
    }
}

// LeaderElection 封装了基于 etcd 的领导者选举逻辑 (需要与 LeaderElection 结构体中的定义一致)
type LeaderElection struct {
    client        *clientv3.Client
    nodeID        string
    electionKey   string
    leaseID       clientv3.LeaseID
    state         State
    stateMu       sync.RWMutex
    leaseTTL      time.Duration
    renewInterval time.Duration
    retryInterval time.Duration
    leaderCtx     context.Context
    leaderCancel  context.CancelFunc
    stopCtx       context.Context
    stopCancel    context.CancelFunc
    wg            sync.WaitGroup
}

// NewLeaderElection 创建一个新的 LeaderElection 实例 (需要与 LeaderElection 结构体中的定义一致)
func NewLeaderElection(
    etcdEndpoints []string,
    nodeID string,
    electionKey string,
    leaseTTL time.Duration,
    renewInterval time.Duration,
    retryInterval time.Duration,
) (*LeaderElection, error) {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   etcdEndpoints,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        return nil, fmt.Errorf("failed to connect to etcd: %w", err)
    }

    stopCtx, stopCancel := context.WithCancel(context.Background())

    return &LeaderElection{
        client:        cli,
        nodeID:        nodeID,
        electionKey:   electionKey,
        state:         Follower,
        leaseTTL:      leaseTTL,
        renewInterval: renewInterval,
        retryInterval: retryInterval,
        stopCtx:       stopCtx,
        stopCancel:    stopCancel,
    }, nil
}

// Close 关闭 etcd 客户端连接 (需要与 LeaderElection 结构体中的定义一致)
func (le *LeaderElection) Close() error {
    le.stopCancel()
    le.wg.Wait()
    log.Printf("[%s] LeaderElection service stopped.", le.nodeID)
    return le.client.Close()
}

func (le *LeaderElection) setState(s State) {
    le.stateMu.Lock()
    defer le.stateMu.Unlock()
    if le.state != s {
        log.Printf("[%s] State changed from %s to %sn", le.nodeID, le.state, s)
        le.state = s
    }
}

func (le *LeaderElection) getState() State {
    le.stateMu.RLock()
    defer le.stateMu.RUnlock()
    return le.state
}

func (le *LeaderElection) IsLeader() bool {
    return le.getState() == Leader
}

func (le *LeaderElection) Run() {
    le.wg.Add(1)
    defer le.wg.Done()

    log.Printf("[%s] LeaderElection service started. Election Key: %s", le.nodeID, le.electionKey)

    for {
        select {
        case <-le.stopCtx.Done():
            log.Printf("[%s] LeaderElection Run loop exiting due to stop signal.", le.nodeID)
            return
        default:
            // 继续执行
        }

        switch le.getState() {
        case Follower:
            log.Printf("[%s] In Follower state, observing leader...", le.nodeID)
            le.observeLeader()
        case Candidate:
            log.Printf("[%s] In Candidate state, attempting to campaign...", le.nodeID)
            le.campaign()
        case Leader:
            log.Printf("[%s] In Leader state, maintaining leadership...", le.nodeID)
            le.lead()
        }
    }
}

func (le *LeaderElection) campaign() {
    resp, err := le.client.Grant(le.stopCtx, int64(le.leaseTTL.Seconds()))
    if err != nil {
        log.Printf("[%s] Failed to grant lease: %v. Retrying in %v...", le.nodeID, err, le.retryInterval)
        time.Sleep(le.retryInterval)
        le.setState(Follower) // 租约创建失败,回到追随者状态
        return
    }
    le.leaseID = resp.ID

    leaderInfo := LeaderInfo{
        ID:        le.nodeID,
        LeaseID:   int64(le.leaseID),
        Timestamp: time.Now(),
    }
    leaderInfoBytes, err := json.Marshal(leaderInfo)
    if err != nil {
        log.Printf("[%s] Failed to marshal leader info: %v. Revoking lease %d. Retrying in %v...", le.nodeID, err, le.leaseID, le.retryInterval)
        _, revokeErr := le.client.Revoke(le.stopCtx, le.leaseID)
        if revokeErr != nil {
            log.Printf("[%s] Failed to revoke lease %d after marshal error: %v", le.nodeID, le.leaseID, revokeErr)
        }
        le.leaseID = clientv3.NoLease
        time.Sleep(le.retryInterval)
        le.setState(Follower)
        return
    }

    txnResp, err := le.client.Txn(le.stopCtx).
        If(clientv3.Compare(clientv3.Version(le.electionKey), "=", 0)).
        Then(clientv3.OpPut(le.electionKey, string(leaderInfoBytes), clientv3.WithLease(le.leaseID))).
        Commit()

    if err != nil {
        log.Printf("[%s] Failed to commit transaction for leadership: %v. Revoking lease %d. Retrying in %v...", le.nodeID, err, le.leaseID, le.retryInterval)
        _, revokeErr := le.client.Revoke(le.stopCtx, le.leaseID)
        if revokeErr != nil {
            log.Printf("[%s] Failed to revoke lease %d after txn error: %v", le.nodeID, le.leaseID, revokeErr)
        }
        le.leaseID = clientv3.NoLease
        time.Sleep(le.retryInterval)
        le.setState(Follower)
        return
    }

    if txnResp.Succeeded {
        log.Printf("[%s] Successfully became leader with lease ID %d.", le.nodeID, le.leaseID)
        le.setState(Leader)
    } else {
        log.Printf("[%s] Failed to become leader, another node already holds the leadership. Revoking lease %d. Waiting for leader to change...", le.nodeID, le.leaseID)
        _, revokeErr := le.client.Revoke(le.stopCtx, le.leaseID)
        if revokeErr != nil {
            log.Printf("[%s] Failed to revoke lease %d after losing campaign: %v", le.nodeID, le.leaseID, revokeErr)
        }
        le.leaseID = clientv3.NoLease
        le.setState(Follower)
    }
}

func (le *LeaderElection) lead() {
    le.leaderCtx, le.leaderCancel = context.WithCancel(le.stopCtx)

    le.wg.Add(1)
    go func() {
        defer le.wg.Done()
        le.renewLease()
    }()

    ticker := time.NewTicker(le.renewInterval)
    defer ticker.Stop()

    for {
        select {
        case <-le.leaderCtx.Done():
            log.Printf("[%s] Leader context cancelled. Stepping down from leadership.", le.nodeID)
            le.stepDown()
            return
        case <-ticker.C:
            log.Printf("[%s] Still the leader. Performing leader duties...", le.nodeID)
            // 可以在这里添加领导者的实际业务逻辑
        }
    }
}

func (le *LeaderElection) renewLease() {
    keepAliveChan, err := le.client.KeepAlive(le.leaderCtx, le.leaseID)
    if err != nil {
        log.Printf("[%s] Failed to start KeepAlive for lease %d: %v. Stepping down.", le.nodeID, le.leaseID, err)
        le.leaderCancel()
        return
    }

    log.Printf("[%s] Started KeepAlive for lease %d (TTL: %v).", le.nodeID, le.leaseID, le.leaseTTL)

    for {
        select {
        case <-le.leaderCtx.Done():
            log.Printf("[%s] KeepAlive goroutine for lease %d exiting.", le.nodeID, le.leaseID)
            return
        case kaResp, ok := <-keepAliveChan:
            if !ok {
                log.Printf("[%s] KeepAlive channel closed for lease %d. Lease probably expired or etcd connection lost. Stepping down.", le.nodeID, le.leaseID)
                le.leaderCancel()
                return
            }
            _ = kaResp
        }
    }
}

func (le *LeaderElection) stepDown() {
    log.Printf("[%s] Stepping down from leadership. Revoking lease %d...", le.nodeID, le.leaseID)
    _, err := le.client.Revoke(le.stopCtx, le.leaseID)
    if err != nil {
        log.Printf("[%s] Failed to revoke lease %d: %v", le.nodeID, le.leaseID, err)
    }
    le.leaseID = clientv3.NoLease
    le.setState(Follower)
}

func (le *LeaderElection) observeLeader() {
    getResp, err := le.client.Get(le.stopCtx, le.electionKey)
    if err != nil {
        log.Printf("[%s] Failed to get leader info: %v. Retrying in %v...", le.nodeID, err, le.retryInterval)
        time.Sleep(le.retryInterval)
        return
    }

    if len(getResp.Kvs) == 0 {
        log.Printf("[%s] No leader found or leader key missing. Becoming candidate.", le.nodeID)
        le.setState(Candidate)
        return
    }

    var currentLeaderInfo LeaderInfo
    err = json.Unmarshal(getResp.Kvs[0].Value, &currentLeaderInfo)
    if err != nil {
        log.Printf("[%s] Failed to unmarshal leader info: %v. Becoming candidate (potential data corruption).", le.nodeID, err)
        le.setState(Candidate)
        return
    }

    log.Printf("[%s] Current leader is %s (lease ID: %d). Watching for changes...", le.nodeID, currentLeaderInfo.ID, currentLeaderInfo.LeaseID)

    watchCtx, watchCancel := context.WithCancel(le.stopCtx)
    defer watchCancel()

    watchChan := le.client.Watch(watchCtx, le.electionKey)

    for {
        select {
        case <-le.stopCtx.Done():
            log.Printf("[%s] Watcher for leader key exiting due to stop signal.", le.nodeID)
            return
        case watchResp, ok := <-watchChan:
            if !ok {
                log.Printf("[%s] Watch channel closed. Etcd connection issue? Retrying in %v...", le.nodeID, le.retryInterval)
                time.Sleep(le.retryInterval)
                return
            }
            if watchResp.Err() != nil {
                log.Printf("[%s] Watcher error: %v. Retrying in %v...", le.nodeID, watchResp.Err(), le.retryInterval)
                time.Sleep(le.retryInterval)
                return
            }

            for _, event := range watchResp.Events {
                if event.Type == clientv3.EventTypeDelete {
                    log.Printf("[%s] Leader key deleted. Leader %s (lease %d) has stepped down or lease expired. Becoming candidate.", le.nodeID, currentLeaderInfo.ID, currentLeaderInfo.LeaseID)
                    le.setState(Candidate)
                    return
                } else if event.Type == clientv3.EventTypePut {
                    var newLeaderInfo LeaderInfo
                    err := json.Unmarshal(event.Kv.Value, &newLeaderInfo)
                    if err != nil {
                        log.Printf("[%s] Failed to unmarshal new leader info from watch event: %v. Re-observing.", le.nodeID, err)
                        le.setState(Candidate)
                        return
                    }

                    if newLeaderInfo.ID != currentLeaderInfo.ID || newLeaderInfo.LeaseID != currentLeaderInfo.LeaseID {
                        log.Printf("[%s] Leader key updated to new leader %s (lease %d). Was %s (lease %d). Becoming candidate.",
                            le.nodeID, newLeaderInfo.ID, newLeaderInfo.LeaseID, currentLeaderInfo.ID, currentLeaderInfo.LeaseID)
                        le.setState(Candidate)
                        return
                    }
                }
            }
        }
    }
}

func main() {
    var nodeID string
    var etcdEndpointsStr string
    var electionKey string
    var leaseTTLSeconds int
    var renewIntervalSeconds int
    var retryIntervalSeconds int

    flag.StringVar(&nodeID, "id", "", "Unique ID for this election node")
    flag.StringVar(&etcdEndpointsStr, "etcd", "127.0.0.1:2379", "Comma-separated etcd endpoints")
    flag.StringVar(&electionKey, "key", "/leader_election/my_service", "Etcd key path for leader election")
    flag.IntVar(&leaseTTLSeconds, "ttl", 10, "Lease Time-To-Live in seconds")
    flag.IntVar(&renewIntervalSeconds, "renew-interval", 3, "Lease renewal interval in seconds")
    flag.IntVar(&retryIntervalSeconds, "retry-interval", 2, "Retry interval for failed operations in seconds")
    flag.Parse()

    if nodeID == "" {
        log.Fatal("Node ID must be provided using -id flag.")
    }

    etcdEndpoints := []string{etcdEndpointsStr} // 简单起见,只支持一个 etcd 端点,实际可解析多个

    election, err := NewLeaderElection(
        etcdEndpoints,
        nodeID,
        electionKey,
        time.Duration(leaseTTLSeconds)*time.Second,
        time.Duration(renewIntervalSeconds)*time.Second,
        time.Duration(retryIntervalSeconds)*time.Second,
    )
    if err != nil {
        log.Fatalf("Failed to create LeaderElection: %v", err)
    }
    defer election.Close()

    // 启动选主逻辑
    go election.Run()

    // 监听系统信号,优雅关闭
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    <-sigChan

    log.Printf("[%s] Received shutdown signal. Stopping LeaderElection service...", nodeID)
}

如何运行

  1. 启动 etcd 集群
    如果你没有 etcd 运行,最简单的方式是使用 Docker:

    docker run -d --name etcd-server -p 2379:2379 -p 2380:2380 quay.io/coreos/etcd:v3.5.0 etcd -advertise-client-urls http://0.0.0.0:2379 -listen-client-urls http://0.0.0.0:2379
  2. 编译 Go 程序
    go mod init leader_election
    go mod tidy
    go build -o election_node main.go
  3. 运行多个节点
    打开多个终端窗口,分别运行:

    ./election_node -id node-1
    ./election_node -id node-2
    ./election_node -id node-3

    你将看到其中一个节点会成为领导者,其他节点成为追随者。当你停止领导者节点(Ctrl+C),其他追随者会立即发起竞选,并选出新的领导者。

5. 高可用性与无脑裂的保障

我们实现的基于 etcd 租约的选主机制,在设计上已经充分考虑了高可用性和无脑裂问题。

5.1 高可用性

  • etcd 集群本身的高可用:etcd 是一个分布式系统,通常部署为多节点集群(推荐奇数节点,如 3 或 5 个)。它采用 Raft 协议保证数据的一致性和高可用性。即使部分 etcd 节点发生故障,只要多数节点仍然存活(形成法定人数 Quorum),etcd 服务就能继续对外提供服务。
  • 快速故障检测与恢复
    • 租约 TTL:领导者持有的租约有明确的生命周期。如果领导者崩溃或因网络问题无法续期,其租约会在 TTL 到期后自动失效,etcd 会自动删除对应的领导者键。
    • Watch 机制:追随者通过 etcd.Watch 机制实时监控领导者键。一旦键被删除或更新(表示领导者失效或更替),追随者会立即收到通知,并迅速进入竞选状态。这显著缩短了故障恢复时间(MTTR)。
    • 并发竞选:多个追随者会同时尝试竞选,etcd 的事务机制保证了只有一个能成功。这确保了在旧领导者失效后,新领导者能尽快被选出。

5.2 无脑裂风险

无脑裂是分布式系统选主机制中最重要的安全属性之一。基于 etcd 租约的方案从根本上消除了脑裂的可能性:

  • 原子性的 Compare-And-Swap (CAS):这是防止脑裂的核心。le.client.Txn(...).If(clientv3.Compare(clientv3.Version(le.electionKey), "=", 0)).Then(...) 这段代码确保了只有当 electionKey 不存在时,当前节点才能成功写入自己的信息并成为领导者。如果 electionKey 已经存在(说明已有领导者),其他节点就无法通过这个事务成功注册。etcd 的 Raft 协议保证了事务的原子性和线性一致性,即所有 etcd 节点对 electionKey 的状态都有一个一致的视图。
  • 租约的强制过期:即使一个旧的领导者因为网络分区等原因,错误地认为自己仍然是领导者,但它无法与 etcd 集群的多数节点通信并续期租约。一旦租约过期,etcd 会自动删除其领导者键。此时,新的领导者可以在 etcd 中成功注册。旧领导者如果尝试执行领导者操作(例如写入数据),由于其租约已过期,其写入操作将失败或被拒绝,从而实现了“自我隔离”(Self-Fencing)。
  • etcd 法定人数(Quorum):etcd 集群只有在多数节点存活并达成共识时才能进行写入操作。如果网络分区导致领导者处于少数派分区,它将无法与多数派 etcd 节点通信,也就无法续期租约。最终,它的租约会过期,领导权丧失。而多数派分区中的节点可以继续正常选举新的领导者。这从 etcd 层面保证了即使网络分区,也只有一个有效领导者。

6. 进阶考量与权衡

虽然基于 etcd 租约的选主机制相对简单和高效,但在实际生产环境中,仍有一些进阶考量和权衡:

  • 租约时长(Lease TTL)的设定
    • TTL 过短:会导致频繁的租约续期请求,增加 etcd 和网络的负载。同时,如果领导者在短时间内因网络抖动无法续期,可能过早地失去领导权,导致不必要的领导者切换。
    • TTL 过长:在领导者真正崩溃时,需要等待更长的时间才能检测到其失效并选举出新的领导者,这会增加系统的不可用时间。
    • 权衡:通常需要根据系统的容忍度和恢复时间目标来选择。renewInterval 通常设置为 leaseTTL / 3leaseTTL / 2,以确保在 TTL 到期前有足够的续期机会。
  • 网络分区处理:etcd 自身的法定人数机制很好地处理了网络分区。但如果 etcd 集群本身无法形成法定人数(例如,3 节点集群有两个节点同时故障),那么在 etcd 恢复之前,将无法进行任何写入操作,包括选主。这意味着在 etcd 集群彻底失效的情况下,我们的选主服务也将无法工作,这是分布式系统固有的可用性-一致性权衡。
  • 时钟同步问题:尽管 etcd 内部的租约管理是基于 etcd 服务器的内部时钟,但客户端在判断“现在是否应该发起竞选”时,会依赖于本地时钟。如果客户端节点之间存在严重的本地时钟漂移,可能会影响其对领导者失效的感知时间,但 etcd 的原子性保证了最终只有一个领导者。
  • 领导者任务的幂等性:尽管选主机制保证了单一领导者,但在领导者切换的瞬间,尤其是旧领导者在租约过期前(但已经无法续期)执行了最后一些操作,新领导者又立即接管并执行了相同的操作,可能会导致任务重复。因此,领导者执行的核心业务逻辑通常需要设计成幂等的,即多次执行相同操作产生的结果与一次执行相同。
  • 资源消耗:每个参与选主的节点都会与 etcd 保持一个连接,并在追随者状态下维持一个 Watch。领导者会进行频繁的 KeepAlive 请求。在拥有大量节点的集群中,需要评估 etcd 的性能和网络带宽是否能支持。

7. 对基于租约选主的展望

基于租约的选主机制因其简洁、高效和可靠性,在许多分布式系统中得到了广泛应用。它非常适合那些不需要全分布式共识(如状态机复制)的场景,而仅仅需要一个单一协调者来简化系统设计。

例如,Kubernetes 中的控制器管理器(Controller Manager)就使用 etcd 实现了基于租约的选主,确保同一时间只有一个控制器实例在活跃,避免了控制器之间的冲突。

未来,随着分布式系统复杂性的增加,对更灵活、更智能的选主机制的需求可能会出现。例如,结合服务网格和可观测性工具,实现更细粒度的故障检测和更平滑的领导者切换。然而,在大多数场景下,基于 etcd 租约的选主仍然是一个坚实且可靠的选择。

总结

今天我们探讨了基于租约的领导者选举机制,并利用 Go 语言与 etcd 实现了一个具备高可用性且无脑裂风险的选主服务。通过 etcd 的 Lease、Txn 和 Watch API,我们能够构建一个健壮的选主方案,确保分布式系统中始终只有一个有效领导者,并在故障发生时能够快速恢复。这种模式在简化分布式系统复杂性和提高其韧性方面发挥着关键作用。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注