gRPC 负载均衡算法:Go 服务发现中一致性哈希与平滑加权轮询的实现
在现代分布式系统中,微服务架构已成为主流。随着服务数量的爆炸式增长,如何高效、可靠地将请求分发到后端服务实例,成为了系统设计中不可或缺的一环。负载均衡作为解决这一问题的核心技术,其重要性不言而喻。gRPC,作为一种高性能、开源的通用 RPC 框架,以其基于 HTTP/2、Protocol Buffers 和多种语言支持的特性,在微服务通信中占据了重要地位。本文将深入探讨 gRPC 中的负载均衡机制,并重点介绍如何在 Go 语言中实现两种高级负载均衡算法:一致性哈希(Consistent Hashing)和平滑加权轮询(Smooth Weighted Round Robin, SWRR),并将它们与服务发现机制相结合。
I. 引言:gRPC 与服务发现中的负载均衡基石
在分布式系统中,负载均衡(Load Balancing)是将网络流量分散到多个服务器或服务实例的过程,旨在优化资源利用、最大化吞吐量、最小化响应时间,并避免任何单个资源过载。它不仅是提升系统性能和可伸缩性的关键,更是确保高可用性和故障恢复能力的重要手段。
gRPC 是 Google 开发的现代 RPC 框架,它利用 HTTP/2 作为传输协议,Protocol Buffers 作为接口描述语言,并支持多种编程语言。gRPC 的设计使其天然适用于微服务架构,尤其是在服务间通信中表现出色。然而,高性能的 RPC 框架本身并不能解决所有分布式系统的挑战,例如如何有效地选择一个健康的后端服务实例来处理请求,这就需要负载均衡发挥作用。
服务发现(Service Discovery)是负载均衡的前置条件。在动态变化的微服务环境中,服务实例的启动、停止、扩容和缩容是常态。服务发现机制负责注册和查询这些服务实例的网络位置,使得客户端无需硬编码服务地址,从而实现服务的动态管理和弹性伸缩。常见的服务发现工具有 Consul、Etcd、ZooKeeper 或 Kubernetes 内置的服务发现机制。
gRPC 支持客户端侧负载均衡。这意味着客户端负责维护可用后端服务的列表,并根据特定的负载均衡策略选择一个后端发起请求。相较于代理侧(或服务端侧)负载均衡(例如使用 Nginx、HAProxy 或 Envoy 等代理),客户端侧负载均衡可以减少一跳网络延迟,并允许客户端根据自己的特定需求选择更细粒度的负载均衡策略。本文将重点聚焦于 gRPC 的客户端侧负载均衡实现。
本文的目标是:
- 深入理解 gRPC 负载均衡的核心架构和可插拔机制。
- 详细阐述一致性哈希算法的原理、优势及 Go 语言实现,并演示其在 gRPC 中的应用,特别是在需要会话粘性或缓存路由场景下的价值。
- 详细阐述平滑加权轮询算法的原理、优势及 Go 语言实现,并演示其在 gRPC 中的应用,以实现更平滑、更公平的按权重分发。
- 展示如何将服务发现与自定义 gRPC 负载均衡器相结合,构建一个完整的客户端侧负载均衡解决方案。
通过本文的探讨,读者将能够掌握在 Go 语言中使用 gRPC 实现高级负载均衡算法的实践知识,从而更好地设计和构建高性能、高可用的分布式系统。
II. gRPC 负载均衡的核心机制与可插拔架构
gRPC 客户端负载均衡的核心在于其高度可插拔的架构。它将地址解析、连接管理和请求路由决策这三个主要功能进行了清晰的分离,并通过一系列接口暴露给开发者,允许我们实现自定义的行为。
A. gRPC 负载均衡组件概览
gRPC 负载均衡主要涉及以下几个核心组件:
-
grpc.Resolver(地址解析器):- 负责将目标名称(例如
my-service.default.svc.cluster.local或consul://my-service)解析为一组可用的后端网络地址(net.Addr)。 - 它会监听服务发现系统的变化,并在后端地址列表发生增删改时通知 gRPC 客户端。
Resolver是连接服务发现系统与 gRPC 客户端的桥梁。
- 负责将目标名称(例如
-
grpc.Balancer(负载均衡器):- 接收
Resolver提供的后端地址列表,并负责管理到这些后端服务实例的连接(SubConn)。 - 它会监控这些
SubConn的连接状态(例如连接中、就绪、瞬时失败、关闭),并将这些状态报告给 gRPC 客户端。 - 当后端地址列表或连接状态发生变化时,
Balancer会创建一个新的Picker。
- 接收
-
SubConn(子连接):SubConn是Balancer管理的到单个后端服务实例的抽象连接。Balancer通过SubConn接口来控制和获取连接的状态。
-
Picker(选择器):Picker是负载均衡算法的真正实现者。每次 gRPC 客户端需要发送请求时,它会向Picker请求一个可用的SubConn来处理该请求。Picker根据其内部实现的负载均衡逻辑(例如轮询、一致性哈希、SWRR)从所有就绪的SubConn中选择一个。Picker应该是无状态的(对于请求本身),因为Balancer会在每次后端状态变化时创建一个新的Picker。
-
grpc.BalancerBuilder(负载均衡器构建器):grpc.BalancerBuilder是用于创建grpc.Balancer实例的工厂接口。- gRPC 客户端通过
grpc.WithBalancer选项注册和使用BalancerBuilder。
-
grpc.PickerBuilder(选择器构建器):- 虽然
PickerBuilder并不是 gRPC 暴露的顶级接口,但在Balancer内部,通常会有一个逻辑来根据当前就绪的SubConn列表创建一个新的Picker实例。我们可以将其视为Balancer内部的逻辑组件。
- 虽然
这些组件共同协作,构建了一个灵活且强大的客户端侧负载均衡框架。其工作流程大致如下:
- gRPC 客户端启动时,会根据配置的
Resolver名称,找到并实例化对应的Resolver。 Resolver开始解析目标名称,并定期或实时地从服务发现系统获取后端服务地址列表。Resolver将地址列表传递给Balancer。Balancer根据新的地址列表创建或销毁SubConn,并尝试建立连接。Balancer监控SubConn的连接状态。当有SubConn变为就绪状态时,Balancer会创建一个新的Picker实例,其中包含了所有就绪的SubConn。- 每次客户端发起 RPC 请求时,它会调用当前
Picker的Pick方法,获取一个就绪的SubConn来发送请求。
B. gRPC 客户端如何注册和使用自定义 Balancer
要使用自定义的负载均衡器,gRPC 客户端需要通过 grpc.Dial 函数的选项进行配置。
package main
import (
"context"
"fmt"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
pb "your/proto/package" // 假设这是一个通用的 proto 包
)
const (
// 定义自定义 resolver 和 balancer 的名称
customResolverScheme = "myresolver"
customBalancerName = "mybalancer"
)
// 注册自定义 resolver 和 balancer
func init() {
resolver.Register(&customResolverBuilder{}) // 注册自定义 resolver
balancer.Register(&customBalancerBuilder{}) // 注册自定义 balancer
}
func main() {
// 客户端连接选项
// 注意:这里的 target 格式是 "scheme:///service_name"
// 例如 "myresolver:///my-service"
conn, err := grpc.Dial(
fmt.Sprintf("%s:///my-service", customResolverScheme), // 使用自定义 resolver scheme
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, customBalancerName)), // 指定自定义 balancer
grpc.WithInsecure(), // 禁用传输层安全,仅用于演示
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
client := pb.NewGreeterClient(conn) // 假设 GreeterClient 是一个示例服务
for i := 0; i < 10; i++ {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// 在这里,你可以通过 context.WithValue 或 metadata 来传递一致性哈希的 key
// 例如:ctx = metadata.AppendToOutgoingContext(ctx, "consistent-hash-key", "user-id-123")
r, err := client.SayHello(ctx, &pb.HelloRequest{Name: fmt.Sprintf("world-%d", i)})
cancel()
if err != nil {
log.Printf("could not greet: %v", err)
continue
}
log.Printf("Greeting: %s", r.GetMessage())
time.Sleep(100 * time.Millisecond)
}
}
// --------------------------------------------------------------------------------
// 以下是自定义 resolver 和 balancer 的骨架,具体实现将在后续章节展开
// customResolverBuilder 实现了 resolver.Builder 接口
type customResolverBuilder struct{}
func (b *customResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
log.Printf("Building custom resolver for target: %+v", target)
// 实际生产中,这里会连接到服务发现系统,如 Consul、Etcd 或 K8s API
// 模拟返回一些固定的后端地址
addrs := []resolver.Address{
{Addr: "localhost:50051", Attributes: nil}, // Attributes 可以携带权重等元数据
{Addr: "localhost:50052", Attributes: nil},
{Addr: "localhost:50053", Attributes: nil},
}
cc.UpdateState(resolver.State{Addresses: addrs})
return &customResolver{cc: cc}, nil
}
func (b *customResolverBuilder) Scheme() string {
return customResolverScheme
}
// customResolver 实现了 resolver.Resolver 接口
type customResolver struct {
cc resolver.ClientConn
}
func (r *customResolver) ResolveNow(options resolver.ResolveNowOptions) {
log.Println("ResolveNow called, updating addresses...")
// 实际生产中,这里会重新查询服务发现系统
// 暂时不做任何更新,模拟一个静态的地址列表
}
func (r *customResolver) Close() {
log.Println("customResolver closed")
}
// customBalancerBuilder 实现了 balancer.Builder 接口
type customBalancerBuilder struct{}
func (b *customBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
log.Printf("Building custom balancer: %s", customBalancerName)
return &customBalancer{cc: cc}
}
func (b *customBalancerBuilder) Name() string {
return customBalancerName
}
// customBalancer 实现了 balancer.Balancer 接口
type customBalancer struct {
cc balancer.ClientConn
// 其他状态,例如 SubConn 映射,Picker 等
}
func (b *customBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
log.Printf("Balancer received new client conn state: %+v", state)
// 在这里根据 state.ResolverState.Addresses 创建或更新 SubConn
// 并根据新的 SubConn 状态创建 Picker
b.cc.UpdateState(balancer.State{
ConnectivityState: balancer.Ready, // 假设总是 Ready
Picker: &passthroughPicker{}, // 暂时使用一个简单的 Picker
})
return nil
}
func (b *customBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
log.Printf("SubConn state updated for %p: %+v", subConn, state)
// 处理 SubConn 状态变化,例如从 Connecting 到 Ready
// 这可能需要重新创建 Picker
}
func (b *customBalancer) Close() {
log.Println("customBalancer closed")
}
// passthroughPicker 是一个简单的 Picker,用于占位
type passthroughPicker struct{}
func (p *passthroughPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// 实际的 Picker 会从可用的 SubConn 中选择一个
// 这里只是一个简单的占位符,不进行实际的选择
return balancer.PickResult{
// SubConn: ...,
// Done: ...,
}, balancer.ErrNoSubConnAvailable // 模拟没有可用的 SubConn
}
上述代码提供了一个 gRPC 客户端配置自定义 Resolver 和 Balancer 的框架。接下来的章节将填充 customResolver 和 customBalancer 的具体实现,以支持一致性哈希和平滑加权轮询。
III. 经典负载均衡算法回顾
在深入探讨一致性哈希和平滑加权轮询之前,我们先简要回顾几种经典的负载均衡算法,它们是理解更复杂算法的基础。
A. 轮询 (Round Robin)
- 原理: 依次将请求分发给每个后端服务器。例如,如果有三个服务器 A、B、C,请求会依次发送给 A、B、C、A、B、C…
- 优点: 实现简单,能够保证请求在所有服务器之间均匀分布(假设请求处理时间相似)。
- 缺点: 不考虑服务器的实际处理能力(例如配置、负载、健康状况),如果服务器性能不均,可能导致低性能服务器过载。
B. 随机 (Random)
- 原理: 随机选择一个后端服务器来处理请求。
- 优点: 实现简单,在大量请求下,也能实现相对均匀的分布。
- 缺点: 与轮询类似,不考虑服务器的实际处理能力。在请求量较小或分布不均匀时,可能出现短时间内的请求倾斜。
C. 加权轮询 (Weighted Round Robin, WRR)
- 原理: 在轮询的基础上引入权重,权重越高的服务器被选中的概率越大或次数越多。例如,服务器 A、B、C 的权重分别为 5、1、1,则请求可能按 A、A、A、A、A、B、C、A、A、A、A、A、B、C 的顺序分发。
- 优点: 能够根据服务器的性能差异进行更合理的请求分配,高权重服务器处理更多请求。
- 缺点: 传统的加权轮询实现(例如直接重复高权重服务器)在短时间内可能导致请求过于集中到高权重服务器,造成瞬时负载不均衡。平滑加权轮询正是为了解决这一问题而生。
这些经典算法为我们理解更高级的负载均衡策略奠定了基础。一致性哈希解决了特定请求路由到特定后端的问题,而平滑加权轮询则优化了加权分发时的平滑性。
IV. 深入剖析:一致性哈希 (Consistent Hashing) 在 gRPC 中的应用
一致性哈希是一种特殊的哈希技术,它旨在解决分布式系统中节点动态增减时,尽量减少数据迁移的问题。在 gRPC 负载均衡中,它常用于需要会话粘性、缓存路由或特定请求路由到特定后端的场景。
A. 问题背景与一致性哈希的优势
传统的哈希(例如 hash(key) % N,其中 N 是服务器数量)在服务器数量 N 发生变化时,几乎所有键的映射都会改变,导致大量数据需要迁移,或客户端需要重新发现服务。这对于以下场景是不可接受的:
- 会话粘性 (Session Affinity): 用户会话通常需要保持与特定后端服务器的连接,以维护状态。如果后端服务器频繁更换,用户体验会受到影响。
- 缓存路由 (Cache Routing): 当使用分布式缓存(如 Redis Cluster)时,客户端需要根据缓存键将请求路由到存储该键的特定缓存节点。
- 特定服务路由: 某些业务逻辑要求特定的请求(例如针对某个用户 ID 或订单 ID 的请求)总是由同一个后端服务实例处理,以简化状态管理或数据一致性。
一致性哈希通过构建一个“哈希环”来解决这些问题。
B. 基本原理
- 哈希环 (Hash Ring): 想象一个环形的整数空间,通常是 $0$ 到 $2^{32}-1$ 或 $0$ 到 $2^{64}-1$。
- 节点映射: 将每个后端服务实例(节点)通过哈希函数映射到环上的一个或多个点。
- 键映射: 将每个请求的键(例如用户 ID、会话 ID)也通过相同的哈希函数映射到环上的一个点。
- 请求路由: 对于一个给定的键,沿着哈希环顺时针查找,遇到的第一个节点就是负责处理该键的服务器。
节点动态伸缩的影响:
- 添加节点: 当一个新节点加入时,它只会影响环上它逆时针方向到其前一个节点之间的键的映射,这些键现在会路由到新节点。其他大部分键的映射保持不变。
- 移除节点: 当一个节点被移除时,它所负责的所有键会顺时针转移到环上的下一个可用节点。同样,只影响受影响节点负责的键。
这种机制大大减少了节点增减时键的重新映射数量,从而降低了数据迁移或重新建立连接的成本。
C. 虚拟节点 (Virtual Nodes / Replicas)
在实际应用中,如果后端服务实例数量较少,或者哈希函数分布不均,可能会导致环上的节点分布不均匀,从而使某些节点承载过多请求。为了解决这个问题,引入了虚拟节点的概念。
每个物理节点可以映射到环上的多个虚拟节点。例如,一个物理服务器 Server A 可以对应 Server A-01, Server A-02, Server A-03 等多个虚拟节点。这些虚拟节点会被独立地哈希到环上。
虚拟节点的优势:
- 更好的负载均衡: 虚拟节点越多,环上的节点分布越均匀,负载均衡的效果越好。
- 更小的影响范围: 当一个物理节点失效时,它对应的所有虚拟节点都会被移除。这些虚拟节点负责的键会均匀地分散到环上其他物理节点的虚拟节点上,从而进一步平滑了节点失效对系统整体负载的影响。
D. Go 语言实现细节
在 Go 语言中实现一致性哈希,我们需要考虑以下几个方面:
- 哈希函数: 选择一个高效且分布均匀的哈希函数。
fnv.New64a或crc32都是不错的选择。对于字符串键,也可以先进行 MD5 或 SHA1 哈希,再将结果转换为uint64。 - 哈希环的数据结构:
- 环上的节点哈希值需要是有序的,以便快速查找。Go 语言的
[]uint64切片,配合sort.Search进行二分查找,是一个高效且简洁的实现方式。 - 需要一个映射来存储哈希值到实际节点名称的对应关系。
- 环上的节点哈希值需要是有序的,以便快速查找。Go 语言的
- 虚拟节点管理: 在添加物理节点时,生成多个虚拟节点,并将其哈希值和物理节点名称存储到环中。
下面是一个 Go 语言一致性哈希的独立实现示例:
package consistenthash
import (
"hash/crc32"
"sort"
"strconv"
"sync"
)
// Hash defines the function to hash bytes to uint32
type Hash func(data []byte) uint32
// ConsistentHash represents the consistent hashing ring
type ConsistentHash struct {
hash Hash
replicas int // 虚拟节点数量
keys []uint32 // sorted hash values of virtual nodes
hashMap map[uint32]string // hash value to actual node name mapping
mu sync.RWMutex
}
// NewConsistentHash creates a new ConsistentHash ring with the specified number of replicas.
// If fn is nil, crc32.ChecksumIEEE is used.
func NewConsistentHash(replicas int, fn Hash) *ConsistentHash {
if fn == nil {
fn = crc32.ChecksumIEEE
}
return &ConsistentHash{
hash: fn,
replicas: replicas,
hashMap: make(map[uint32]string),
}
}
// IsEmpty returns true if there are no items in the hash.
func (c *ConsistentHash) IsEmpty() bool {
return len(c.keys) == 0
}
// Add adds a list of nodes to the hash ring.
func (c *ConsistentHash) Add(nodes ...string) {
c.mu.Lock()
defer c.mu.Unlock()
for _, node := range nodes {
for i := 0; i < c.replicas; i++ {
hash := c.hash([]byte(node + strconv.Itoa(i))) // Add replica index to node name
c.keys = append(c.keys, hash)
c.hashMap[hash] = node
}
}
sort.Slice(c.keys, func(i, j int) bool {
return c.keys[i] < c.keys[j]
})
}
// Remove removes a node from the hash ring.
func (c *ConsistentHash) Remove(node string) {
c.mu.Lock()
defer c.mu.Unlock()
var newKeys []uint32
for i := 0; i < c.replicas; i++ {
hash := c.hash([]byte(node + strconv.Itoa(i)))
delete(c.hashMap, hash)
}
// Rebuild keys slice without the removed node's hashes
for _, k := range c.keys {
// Check if the hash's corresponding node name still exists in hashMap
// This is important because multiple virtual nodes can hash to the same value
// though unlikely with good hash functions. More robust: check if the node name
// for this hash value is NOT the one we are trying to remove.
if c.hashMap[k] != node {
newKeys = append(newKeys, k)
}
}
c.keys = newKeys
sort.Slice(c.keys, func(i, j int) bool {
return c.keys[i] < c.keys[j]
})
}
// Get gets the closest node in the hash to the provided key.
func (c *ConsistentHash) Get(key string) string {
c.mu.RLock()
defer c.mu.RUnlock()
if c.IsEmpty() {
return ""
}
hash := c.hash([]byte(key))
// Binary search for the first key in c.keys that is >= hash
idx := sort.Search(len(c.keys), func(i int) bool {
return c.keys[i] >= hash
})
// If we're at the end of the slice, loop back to the beginning
if idx == len(c.keys) {
idx = 0
}
return c.hashMap[c.keys[idx]]
}
代码解释:
ConsistentHash结构体维护了哈希函数、虚拟节点数量、排序的哈希键切片keys和哈希值到节点名称的映射hashMap。Add方法将节点添加到环中,为每个节点生成replicas个虚拟节点,并计算它们的哈希值,然后将这些哈希值及其对应的物理节点名称存储起来,并对keys切片进行排序。Remove方法从环中移除一个节点,需要找到并删除该节点所有虚拟节点的哈希值和映射。Get方法根据请求键计算哈希值,然后通过二分查找 (sort.Search) 在keys切片中找到第一个大于或等于该哈希值的虚拟节点。如果找不到(即键的哈希值大于所有虚拟节点的哈希值),则环绕到第一个虚拟节点。sync.RWMutex用于保护并发访问keys和hashMap时的线程安全。
E. 集成到 gRPC Picker
现在,我们将上述一致性哈希逻辑集成到 gRPC 的 Picker 中。关键在于如何从 gRPC 请求的 context 中获取用于哈希的键。通常,这可以通过 metadata 或 context.WithValue 来实现。
我们假设客户端会将哈希键通过 metadata 传递,键名为 consistent-hash-key。
package main
// ... (省略之前的 imports 和 init 函数)
import (
"context"
"fmt"
"log"
"sort"
"strconv"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"github.com/your-org/your-repo/consistenthash" // 导入我们上面实现的 consistenthash 包
pb "your/proto/package" // 假设这是一个通用的 proto 包
)
const (
// ... (之前的常量)
consistentHashBalancerName = "consistent_hash_lb"
consistentHashKeyMetadata = "consistent-hash-key" // 定义 metadata key
)
func init() {
resolver.Register(&customResolverBuilder{})
// 注册一致性哈希负载均衡器
balancer.Register(base.NewBalancerBuilderV2(consistentHashBalancerName, &consistentHashPickerBuilder{}, base.Config{HealthCheck: true}))
}
func main() {
// ... (之前的 main 函数内容)
conn, err := grpc.Dial(
fmt.Sprintf("%s:///my-service", customResolverScheme),
// 使用一致性哈希负载均衡器
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, consistentHashBalancerName)),
grpc.WithInsecure(),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
client := pb.NewGreeterClient(conn)
// 演示一致性哈希
keys := []string{"user-A", "user-B", "user-C", "user-D", "user-E", "user-A"} // user-A 会两次路由到同一台服务器
for i, key := range keys {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// 通过 metadata 传递一致性哈希的 key
ctx = metadata.AppendToOutgoingContext(ctx, consistentHashKeyMetadata, key)
r, err := client.SayHello(ctx, &pb.HelloRequest{Name: fmt.Sprintf("%s-%d", key, i)})
cancel()
if err != nil {
log.Printf("could not greet for key %s: %v", key, err)
continue
}
log.Printf("Greeting for key %s: %s", key, r.GetMessage())
time.Sleep(100 * time.Millisecond)
}
}
// --------------------------------------------------------------------------------
// customResolverBuilder, customResolver 保持不变,但需要修改其返回的地址属性以传递权重
// 模拟的 customResolverBuilder 可以在这里传递后端元数据,例如权重
// 为了简化,我们假设 consistentHash不需要权重,但 SWRR 需要。
// 在这里,我们将使用一个更通用的方式来定义后端信息
type BackendInfo struct {
Addr string
Weight int // 权重,用于 SWRR
}
// customResolverBuilder 实现了 resolver.Builder 接口
type customResolverBuilder struct{}
func (b *customResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
log.Printf("Building custom resolver for target: %+v", target)
// 模拟返回一些固定的后端地址及其元数据 (例如权重)
backendInfos := []BackendInfo{
{Addr: "localhost:50051", Weight: 5},
{Addr: "localhost:50052", Weight: 3},
{Addr: "localhost:50053", Weight: 2},
}
addrs := make([]resolver.Address, len(backendInfos))
for i, info := range backendInfos {
attr := resolver.Set (nil)
attr = attr.WithValue("weight", info.Weight) // 将权重作为属性传递
addrs[i] = resolver.Address{Addr: info.Addr, Attributes: attr}
}
cc.UpdateState(resolver.State{Addresses: addrs})
return &customResolver{cc: cc}, nil
}
// customResolver 和 customBalancerBuilder 保持与之前类似的骨架,但我们现在使用 base.NewBalancerBuilderV2
// 所以不再需要单独实现 customBalancerBuilder 和 customBalancer 的骨架。
// 而是实现 PickerBuilder 接口。
// consistentHashPickerBuilder 实现了 base.PickerBuilder 接口
type consistentHashPickerBuilder struct{}
func (b *consistentHashPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
log.Printf("consistentHashPickerBuilder: Build called with %d subconns", len(info.ReadySCs))
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
ch := consistenthash.NewConsistentHash(100, nil) // 100个虚拟节点
subConnMap := make(map[string]balancer.SubConn)
for sc, scInfo := range info.ReadySCs {
addr := scInfo.Address.Addr
ch.Add(addr) // 将后端地址作为节点添加到一致性哈希环
subConnMap[addr] = sc // 映射地址到 SubConn
}
return &consistentHashPicker{
consistentHash: ch,
subConnMap: subConnMap,
}
}
// consistentHashPicker 实现了 balancer.Picker 接口
type consistentHashPicker struct {
consistentHash *consistenthash.ConsistentHash
subConnMap map[string]balancer.SubConn // 实际地址到 SubConn 的映射
}
func (p *consistentHashPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// 从 context 中获取一致性哈希的 key
md, ok := metadata.FromOutgoingContext(info.Ctx)
if !ok {
return balancer.PickResult{}, fmt.Errorf("consistent hash key not found in metadata")
}
keys := md.Get(consistentHashKeyMetadata)
if len(keys) == 0 {
return balancer.PickResult{}, fmt.Errorf("consistent hash key missing from metadata")
}
hashKey := keys[0]
// 根据 hashKey 选择后端地址
selectedAddr := p.consistentHash.Get(hashKey)
if selectedAddr == "" {
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
// 获取对应的 SubConn
sc, ok := p.subConnMap[selectedAddr]
if !ok {
// 理论上不应该发生,除非 consistentHash 环中的节点不在 subConnMap 中
return balancer.PickResult{}, balancer.Errorf(balancer.TransientFailure, "selected subconn %s not found in map", selectedAddr)
}
log.Printf("Consistent Hash: key '%s' routed to '%s'", hashKey, selectedAddr)
return balancer.PickResult{SubConn: sc}, nil
}
代码解释:
- 我们注册了一个名为
consistent_hash_lb的BalancerBuilder,它使用了 gRPC 提供的base.NewBalancerBuilderV2辅助函数,该函数简化了Balancer的实现,我们只需要提供一个PickerBuilder。 consistentHashPickerBuilder在Build方法中接收base.PickerBuildInfo,其中包含了所有就绪的SubConn。它会创建一个consistenthash.ConsistentHash实例,并将所有就绪的后端地址作为节点添加到哈希环中。consistentHashPicker的Pick方法是核心。它首先从info.Ctx中提取consistentHashKeyMetadata。然后,使用这个键调用p.consistentHash.Get(hashKey)来获取对应的后端地址,并从p.subConnMap中查找对应的SubConn。- 客户端通过
metadata.AppendToOutgoingContext(ctx, consistentHashKeyMetadata, key)将哈希键传递给 RPC 请求。
现在,我们有了一个功能完整的一致性哈希负载均衡器。
V. 深入剖析:平滑加权轮询 (Smooth Weighted Round Robin, SWRR) 在 gRPC 中的应用
平滑加权轮询(Smooth Weighted Round Robin, SWRR)是一种比传统加权轮询更优秀的算法,它旨在实现按权重比例的请求分发,同时避免在短时间内将大量请求集中到高权重服务器,从而使负载分发更加平滑和均匀。Nginx 的负载均衡模块中就采用了这种算法。
A. 问题背景与 SWRR 的优势
传统的加权轮询可能很简单地通过重复高权重服务器的条目来实现。例如,A(5), B(1), C(1) 的权重,可能按 A, A, A, A, A, B, C 的顺序调度。这在短时间内会导致 A 服务器连续处理大量请求,可能造成瞬时压力过大。
SWRR 旨在解决这个问题,它通过动态调整每个服务器的“当前权重”来实现更平滑的调度,使得在任何时间点,被选择的服务器都尽可能地平衡了它的权重和已经处理的请求量。
B. 算法原理
SWRR 算法的核心思想是为每个后端服务器维护三个权重值:
weight(静态权重): 配置的原始权重,表示服务器的相对处理能力。current_weight(当前权重): 动态变化的权重,每次选择服务器时会更新。effective_weight(有效权重): 用于动态调整current_weight,初始等于weight,但会在服务器出现连接失败时暂时降低,成功时缓慢恢复,以避免将请求发送给有问题但尚未完全标记为不健康的服务器。
算法步骤:
对于每个请求,SWRR 算法会执行以下操作来选择服务器:
-
遍历所有可用服务器,对于每台服务器
Si:- 将
Si.current_weight加上Si.effective_weight。 - 如果
Si.effective_weight < Si.weight且Si状态良好,则Si.effective_weight缓慢增加(例如Si.effective_weight++)。 - 如果
Si出现连接失败或超时,则Si.effective_weight降低(例如Si.effective_weight--),但不能低于 0。
- 将
-
选择
current_weight最大的服务器Sj。 -
将选中的服务器
Sj.current_weight减去所有服务器的effective_weight之和 (total_effective_weight)。 -
返回服务器
Sj。
通过这种方式,current_weight 最大的服务器会被选中,但它的 current_weight 会被大幅度降低,使其在下一轮选择中不太可能再次被选中,从而给予其他服务器机会,实现加权但平滑的轮询。
举例:
假设有三台服务器 A, B, C,它们的 weight 分别为 5, 1, 1。
初始状态:
A: weight=5, current_weight=0, effective_weight=5
B: weight=1, current_weight=0, effective_weight=1
C: weight=1, current_weight=0, effective_weight=1
total_effective_weight = 5 + 1 + 1 = 7
第一轮请求:
- A:
current_weight= 0 + 5 = 5
B:current_weight= 0 + 1 = 1
C:current_weight= 0 + 1 = 1 - 选择 A (current_weight=5 最大)。
- A:
current_weight= 5 – 7 = -2
结果: A
第二轮请求:
- A:
current_weight= -2 + 5 = 3
B:current_weight= 1 + 1 = 2
C:current_weight= 1 + 1 = 2 - 选择 A (current_weight=3 最大)。
- A:
current_weight= 3 – 7 = -4
结果: A
第三轮请求:
- A:
current_weight= -4 + 5 = 1
B:current_weight= 2 + 1 = 3
C:current_weight= 2 + 1 = 3 - 选择 B 或 C (假设选择 B,current_weight=3)。
- B:
current_weight= 3 – 7 = -4
结果: B
… 以此类推。可以看到,尽管 A 的权重很高,但它不会连续被选中 5 次,而是会穿插其他服务器,使得分发更加平滑。
C. Go 语言实现细节
在 Go 中实现 SWRR,我们需要为每个后端服务器维护其状态:
package swrr
import (
"log"
"sync"
)
// Backend represents a backend server for SWRR.
type Backend struct {
Addr string
Weight int // Configured weight
CurrentWeight int // Current weight, dynamically adjusted
EffectiveWeight int // Effective weight, can be decreased on failures
}
// Balancer implements the Smooth Weighted Round Robin algorithm.
type Balancer struct {
backends []*Backend
mu sync.Mutex
}
// NewBalancer creates a new SWRR Balancer with given backends.
func NewBalancer(backendInfos []struct {
Addr string
Weight int
}) *Balancer {
backends := make([]*Backend, len(backendInfos))
for i, info := range backendInfos {
backends[i] = &Backend{
Addr: info.Addr,
Weight: info.Weight,
CurrentWeight: 0, // Initial current_weight is 0
EffectiveWeight: info.Weight,
}
}
return &Balancer{
backends: backends,
}
}
// Pick selects a backend using the SWRR algorithm.
func (b *Balancer) Pick() *Backend {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.backends) == 0 {
return nil
}
totalEffectiveWeight := 0
for _, backend := range b.backends {
totalEffectiveWeight += backend.EffectiveWeight
}
var best *Backend
maxCurrentWeight := -1
for _, backend := range b.backends {
// Step 1: Add effective_weight to current_weight
backend.CurrentWeight += backend.EffectiveWeight
// Step 2: Update effective_weight (simplified for now, real-world would involve health checks)
// If backend is healthy, effective_weight should slowly recover to weight.
// If backend fails, effective_weight should decrease.
// For simplicity in this example, we assume all are healthy.
if backend.EffectiveWeight < backend.Weight {
backend.EffectiveWeight++ // Slowly recover
}
// Step 3: Find the backend with the largest current_weight
if backend.CurrentWeight > maxCurrentWeight {
maxCurrentWeight = backend.CurrentWeight
best = backend
}
}
if best == nil {
return nil // Should not happen if backends exist
}
// Step 4: Decrease the current_weight of the selected backend
best.CurrentWeight -= totalEffectiveWeight
log.Printf("SWRR: Picked %s (current_weight: %d, effective_weight: %d)", best.Addr, best.CurrentWeight, best.EffectiveWeight)
return best
}
// UpdateBackendHealth can be called to adjust effective weights based on health checks.
func (b *Balancer) UpdateBackendHealth(addr string, healthy bool) {
b.mu.Lock()
defer b.mu.Unlock()
for _, backend := range b.backends {
if backend.Addr == addr {
if healthy {
// If healthy, effective_weight should not be less than 0
if backend.EffectiveWeight < 0 {
backend.EffectiveWeight = 0
}
// Optionally, effective_weight can slowly recover to backend.Weight
// For simplicity, we just set it to Weight if it was previously reduced due to failures.
if backend.EffectiveWeight < backend.Weight {
backend.EffectiveWeight++
}
} else {
// If unhealthy, decrease effective_weight. Minimum 0.
backend.EffectiveWeight--
if backend.EffectiveWeight < 0 {
backend.EffectiveWeight = 0
}
}
log.Printf("SWRR: Backend %s health updated. Healthy: %v, New effective_weight: %d", addr, healthy, backend.EffectiveWeight)
return
}
}
}
// UpdateBackends updates the list of backends (e.g., from service discovery).
// This method should be called when resolver provides new addresses.
func (b *Balancer) UpdateBackends(backendInfos []struct {
Addr string
Weight int
}) {
b.mu.Lock()
defer b.mu.Unlock()
newBackendsMap := make(map[string]*Backend)
for _, info := range backendInfos {
newBackendsMap[info.Addr] = &Backend{
Addr: info.Addr,
Weight: info.Weight,
CurrentWeight: 0, // Reset current_weight for new backends or those whose weights changed
EffectiveWeight: info.Weight,
}
}
var updatedBackends []*Backend
for _, existingBackend := range b.backends {
if newBackend, ok := newBackendsMap[existingBackend.Addr]; ok {
// If backend still exists, preserve its current_weight and effective_weight
newBackend.CurrentWeight = existingBackend.CurrentWeight
newBackend.EffectiveWeight = existingBackend.EffectiveWeight
// If weight changed, update it.
newBackend.Weight = newBackend.Weight
updatedBackends = append(updatedBackends, newBackend)
delete(newBackendsMap, existingBackend.Addr) // Mark as processed
}
}
// Add new backends
for _, newBackend := range newBackendsMap {
updatedBackends = append(updatedBackends, newBackend)
}
b.backends = updatedBackends
log.Printf("SWRR: Backends updated. New list: %+v", updatedBackends)
}
代码解释:
Backend结构体存储了每个服务器的Addr、Weight、CurrentWeight和EffectiveWeight。Balancer结构体维护一个Backend切片。NewBalancer初始化CurrentWeight为 0,EffectiveWeight为Weight。Pick方法实现了 SWRR 的核心逻辑:- 计算
totalEffectiveWeight。 - 遍历所有
Backend,更新CurrentWeight和EffectiveWeight(简化了健康检查部分)。 - 找到
CurrentWeight最大的Backend。 - 将选中
Backend的CurrentWeight减去totalEffectiveWeight。
- 计算
UpdateBackendHealth方法模拟了健康检查对EffectiveWeight的影响。UpdateBackends方法用于处理服务发现带来的后端列表更新,它会尽量保留现有后端的状态,并添加/移除新的后端。
D. 集成到 gRPC Picker
现在,我们将 SWRR 逻辑集成到 gRPC 的 Picker 中。
package main
// ... (省略之前的 imports 和 init 函数)
import (
"context"
"fmt"
"log"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/resolver"
"github.com/your-org/your-repo/swrr" // 导入我们上面实现的 swrr 包
pb "your/proto/package"
)
const (
// ... (之前的常量)
swrrBalancerName = "swrr_lb"
)
func init() {
resolver.Register(&customResolverBuilder{})
// 注册一致性哈希负载均衡器 (如果需要)
balancer.Register(base.NewBalancerBuilderV2(consistentHashBalancerName, &consistentHashPickerBuilder{}, base.Config{HealthCheck: true}))
// 注册平滑加权轮询负载均衡器
balancer.Register(base.NewBalancerBuilderV2(swrrBalancerName, &swrrPickerBuilder{}, base.Config{HealthCheck: true}))
}
func main() {
// ... (之前的 main 函数内容)
// 演示 SWRR
log.Println("n--- Demonstrating Smooth Weighted Round Robin ---")
connSWRR, err := grpc.Dial(
fmt.Sprintf("%s:///my-service", customResolverScheme),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, swrrBalancerName)),
grpc.WithInsecure(),
)
if err != nil {
log.Fatalf("did not connect for SWRR: %v", err)
}
defer connSWRR.Close()
clientSWRR := pb.NewGreeterClient(connSWRR)
for i := 0; i < 20; i++ { // 发送更多请求以观察 SWRR 效果
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
r, err := clientSWRR.SayHello(ctx, &pb.HelloRequest{Name: fmt.Sprintf("swrr-user-%d", i)})
cancel()
if err != nil {
log.Printf("could not greet for SWRR: %v", err)
continue
}
log.Printf("SWRR Greeting %d: %s", i, r.GetMessage())
time.Sleep(50 * time.Millisecond)
}
}
// --------------------------------------------------------------------------------
// customResolverBuilder 保持不变,它会通过 Attributes 传递 Weight
// swrrPickerBuilder 实现了 base.PickerBuilder 接口
type swrrPickerBuilder struct{}
func (b *swrrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
log.Printf("swrrPickerBuilder: Build called with %d subconns", len(info.ReadySCs))
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
// 收集后端信息及其权重
var backendInfos []struct {
Addr string
Weight int
}
subConnMap := make(map[string]balancer.SubConn)
for sc, scInfo := range info.ReadySCs {
addr := scInfo.Address.Addr
weightAttr := scInfo.Address.Attributes.Value("weight")
weight, ok := weightAttr.(int)
if !ok || weight <= 0 {
weight = 1 // 默认权重
log.Printf("Warning: Missing or invalid weight for %s, using default 1", addr)
}
backendInfos = append(backendInfos, struct {
Addr string
Weight int
}{Addr: addr, Weight: weight})
subConnMap[addr] = sc
}
swrrBalancer := swrr.NewBalancer(backendInfos)
return &swrrPicker{
swrrBalancer: swrrBalancer,
subConnMap: subConnMap,
}
}
// swrrPicker 实现了 balancer.Picker 接口
type swrrPicker struct {
swrrBalancer *swrr.Balancer
subConnMap map[string]balancer.SubConn // 实际地址到 SubConn 的映射
}
func (p *swrrPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
selectedBackend := p.swrrBalancer.Pick()
if selectedBackend == nil {
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
sc, ok := p.subConnMap[selectedBackend.Addr]
if !ok {
return balancer.PickResult{}, balancer.Errorf(balancer.TransientFailure, "selected subconn %s not found in map", selectedBackend.Addr)
}
return balancer.PickResult{SubConn: sc}, nil
}
代码解释:
- 我们注册了一个名为
swrr_lb的BalancerBuilder。 swrrPickerBuilder在Build方法中,从base.PickerBuildInfo提供的SubConn中提取后端地址和权重(从scInfo.Address.Attributes获取)。然后使用这些信息创建一个swrr.Balancer实例。swrrPicker的Pick方法非常简单,它直接调用p.swrrBalancer.Pick()来获取选定的后端,然后从p.subConnMap中返回对应的SubConn。
至此,我们已经拥有了一致性哈希和平滑加权轮询两种自定义负载均衡器的 gRPC 集成方案。
VI. 服务发现与 Balancer 的协同工作
服务发现是负载均衡的基础。grpc.Resolver 接口定义了如何将服务名称解析为可用的后端地址列表,并将这些地址传递给 Balancer。
A. grpc.Resolver 的详细工作机制
一个 grpc.Resolver 的生命周期和职责如下:
- 注册: 在
init()函数中,通过resolver.Register(builder)注册resolver.Builder实例。 - 构建: 当 gRPC 客户端拨号(
grpc.Dial)一个使用该scheme的目标时,gRPC 会调用resolver.Builder的Build方法。Build方法接收resolver.Target(解析目标,如myresolver:///my-service) 和resolver.ClientConn(用于回调 gRPC 客户端的接口)。Build方法应返回一个resolver.Resolver实例。
- 地址更新:
resolver.Resolver负责:- 在启动时进行首次地址解析。
- 持续监听服务发现系统的变化(例如,通过轮询、watch 机制)。
- 当后端地址列表发生变化时,调用
resolver.ClientConn的UpdateState方法,将最新的resolver.State(包含地址列表Addresses和服务配置ServiceConfig) 传递给 gRPC 客户端。 Addresses可以包含Attributes,用于携带额外元数据,例如我们传递的后端权重。
- 按需解析: gRPC 客户端可能会调用
resolver.Resolver的ResolveNow方法,请求立即重新解析地址(例如,在遇到连接错误时)。 - 关闭: 当 gRPC 客户端连接关闭时,会调用
resolver.Resolver的Close方法,以便清理资源。
B. 如何编写一个自定义 Resolver
在前面的代码示例中,我们已经提供了一个 customResolverBuilder 和 customResolver 的骨架。现在,我们将对其进行扩展,使其能够模拟从服务发现系统获取地址和权重,并支持动态更新。
为了简化,我们不会真的去连接 Consul 或 Etcd,而是使用一个 Goroutine 模拟异步的服务发现更新。
package main
// ... (省略之前的 imports)
import (
"context"
"fmt"
"log"
"math/rand"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"github.com/your-org/your-repo/consistenthash"
"github.com/your-org/your-repo/swrr"
pb "your/proto/package"
)
// ... (之前的常量)
// customResolverBuilder 实现了 resolver.Builder 接口
type customResolverBuilder struct{}
func (b *customResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
log.Printf("Building custom resolver for target: %+v", target)
r := &customResolver{
target: target,
cc: cc,
quit: make(chan struct{}),
rn: make(chan struct{}, 1),
}
go r.start() // 启动一个 Goroutine 模拟服务发现更新
return r, nil
}
func (b *customResolverBuilder) Scheme() string {
return customResolverScheme
}
// customResolver 实现了 resolver.Resolver 接口
type customResolver struct {
target resolver.Target
cc resolver.ClientConn
quit chan struct{}
rn chan struct{} // ResolveNow channel
mu sync.Mutex
backends map[string]BackendInfo // addr -> BackendInfo
}
func (r *customResolver) start() {
r.resolve() // 首次解析
ticker := time.NewTicker(5 * time.Second) // 模拟每 5 秒检查一次服务发现
defer ticker.Stop()
for {
select {
case <-r.quit:
log.Println("customResolver: watcher quit")
return
case <-ticker.C:
r.resolve() // 定期更新
case <-r.rn:
log.Println("customResolver: ResolveNow triggered")
r.resolve() // 立即更新
}
}
}
func (r *customResolver) resolve() {
// 模拟从服务发现系统获取地址和权重
// 动态改变后端列表,模拟服务上下线
currentBackends := map[string]BackendInfo{
"localhost:50051": {Addr: "localhost:50051", Weight: 5},
"localhost:50052": {Addr: "localhost:50052", Weight: 3},
"localhost:50053": {Addr: "localhost:50053", Weight: 2},
}
// 模拟随机增加或移除一个后端
if rand.Intn(10) < 3 { // 30% 概率发生变化
if rand.Intn(2) == 0 { // 50% 概率移除
if len(currentBackends) > 1 {
var addrToRemove string
for addr := range currentBackends {
addrToRemove = addr
break
}
log.Printf("--- Simulating backend %s going offline ---", addrToRemove)
delete(currentBackends, addrToRemove)
}
} else { // 50% 概率添加
newAddr := fmt.Sprintf("localhost:5005%d", 4+rand.Intn(2)) // 50054 or 50055
if _, exists := currentBackends[newAddr]; !exists {
log.Printf("--- Simulating new backend %s coming online ---", newAddr)
currentBackends[newAddr] = BackendInfo{Addr: newAddr, Weight: rand.Intn(4) + 1} // 随机权重 1-4
}
}
}
r.mu.Lock()
r.backends = currentBackends
r.mu.Unlock()
addrs := make([]resolver.Address, 0, len(currentBackends))
for _, info := range currentBackends {
attr := resolver.Set(nil)
attr = attr.WithValue("weight", info.Weight)
addrs = append(addrs, resolver.Address{Addr: info.Addr, Attributes: attr})
}
r.cc.UpdateState(resolver.State{Addresses: addrs})
log.Printf("customResolver: Updated state with %d addresses: %+v", len(addrs), addrs)
}
func (r *customResolver) ResolveNow(options resolver.ResolveNowOptions) {
select {
case r.rn <- struct{}{}:
default:
}
}
func (r *customResolver) Close() {
close(r.quit)
log.Println("customResolver closed")
}
// ... (后续的 PickerBuilder 和 Picker 实现不变)
代码解释:
customResolver内部新增了quit和rn两个 channel,用于控制startGoroutine 的生命周期和响应ResolveNow。startGoroutine 模拟了服务发现的定期轮询更新,并在每次更新时调用r.resolve()。resolve()方法现在会随机模拟后端服务的上线和下线,并更新resolver.ClientConn的状态。- 通过
resolver.Address.Attributes传递后端权重,这使得Balancer能够获取到这些元数据。
C. Balancer 如何响应地址更新和连接状态变化
当 resolver.ClientConn 调用 UpdateState 方法时,gRPC 客户端会将新的地址列表传递给 balancer.Balancer 的 UpdateClientConnState 方法。
在 base.NewBalancerBuilderV2 这种封装模式下,gRPC 已经处理了 SubConn 的创建和管理。我们只需要在 PickerBuilder 的 Build 方法中,接收 info.ReadySCs (就绪的 SubConn 列表) 来构建我们的 Picker。
当 SubConn 的连接状态发生变化(例如从 Connecting 到 Ready,或从 Ready 到 TransientFailure)时,gRPC 会调用 balancer.Balancer 的 UpdateSubConnState 方法。同样,base.NewBalancerBuilderV2 会自动处理这些状态变化,并在 Ready 状态的 SubConn 列表发生变化时,重新调用 PickerBuilder 的 Build 方法,生成一个新的 Picker。
这意味着,我们的 consistentHashPickerBuilder 和 swrrPickerBuilder 会在以下情况被调用来创建新的 Picker:
Resolver提供了新的地址列表。- 某个
SubConn的连接状态从非就绪变为就绪。 - 某个
SubConn从就绪变为非就绪。
每次 Build 方法被调用时,它会基于当前最新的就绪 SubConn 列表重新构建哈希环或 SWRR 调度器,确保负载均衡器始终使用最新的后端信息。
VII. 构建一个完整的 gRPC 自定义负载均衡器 (Go 语言实战)
为了演示上述概念,我们需要一个完整的 Go 语言示例,包括:
- gRPC 服务端:多个实例,每个实例可以标识自己。
- 自定义
Resolver:模拟服务发现,提供动态变化的后端地址和权重。 - 自定义
Balancer:基于base.NewBalancerBuilderV2,包含一致性哈希和 SWRR 两种Picker。 - gRPC 客户端:使用自定义
Resolver和Balancer发送请求。
我们将使用一个简单的 Greeter 服务来演示。
A. Protocol Buffers 定义
// proto/greeter.proto
syntax = "proto3";
option go_package = "your/proto/package";
package greeter;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
string server_id = 2; // 用于标识哪个服务器响应了请求
}
编译:protoc --go_out=. --go-grpc_out=. proto/greeter.proto
B. gRPC 服务端实现
我们将启动多个 gRPC 服务器实例,每个实例监听不同的端口,并返回自己的 ID。
// server/main.go
package main
import (
"context"
"fmt"
"log"
"net"
"os"
"os/signal"
"syscall"
"time"
"google.golang.org/grpc"
pb "your/proto/package" // 替换为你的 proto 包路径
)
// server 结构体,实现 GreeterServer 接口
type server struct {
pb.UnimplementedGreeterServer
id string
}
// SayHello 实现 GreeterServer 接口的 SayHello 方法
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Server %s received: %s", s.id, in.GetName())
return &pb.HelloReply{Message: "Hello " + in.GetName(), ServerId: s.id}, nil
}
func main() {
// 启动三个服务器实例
ports := []int{50051, 50052, 50053, 50054, 50055} // 预留一些端口给动态上线
serverIDs := []string{"Server-A", "Server-B", "Server-C", "Server-D", "Server-E"}
for i := 0; i < 3; i++ { // 先启动3个
go startGRPCServer(fmt.Sprintf("localhost:%d", ports[i]), serverIDs[i])
time.Sleep(100 * time.Millisecond) // 错开启动时间
}
// 等待中断信号以优雅地关闭服务器
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c
log.Println("Shutting down servers...")
}
func startGRPCServer(address string, id string) {
lis, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{id: id})
log.Printf("Server %s listening on %s", id, address)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
C. 客户端、Resolver 和 Balancer 实现(整合在一起)
我们将前面章节中分散的客户端、Resolver 和 Balancer 代码整合到一个文件中,以便于运行和理解。
// client/main.go
package main
import (
"context"
"fmt"
"hash/crc32"
"log"
"math/rand"
"sort"
"strconv"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
pb "your/proto/package" // 替换为你的 proto 包路径
)
// --- Consistent Hash Implementation (from consistenthash package) ---
type Hash func(data []byte) uint32
type ConsistentHash struct {
hash Hash
replicas int
keys []uint32
hashMap map[uint32]string
mu sync.RWMutex
}
func NewConsistentHash(replicas int, fn Hash) *ConsistentHash {
if fn == nil {
fn = crc32.ChecksumIEEE
}
return &ConsistentHash{
hash: fn,
replicas: replicas,
hashMap: make(map[uint32]string),
}
}
func (c *ConsistentHash) IsEmpty() bool {
return len(c.keys) == 0
}
func (c *ConsistentHash) Add(nodes ...string) {
c.mu.Lock()
defer c.mu.Unlock()
for _, node := range nodes {
for i := 0; i < c.replicas; i++ {
hash := c.hash([]byte(node + strconv.Itoa(i)))
c.keys = append(c.keys, hash)
c.hashMap[hash] = node
}
}
sort.Slice(c.keys, func(i, j int) bool {
return c.keys[i] < c.keys[j]
})
}
func (c *ConsistentHash) Remove(node string) {
c.mu.Lock()
defer c.mu.Unlock()
var newKeys []uint32
for i := 0; i < c.replicas; i++ {
hash := c.hash([]byte(node + strconv.Itoa(i)))
delete(c.hashMap, hash)
}
for _, k := range c.keys {
// Only keep keys whose corresponding node is NOT the one being removed.
// Note: This assumes unique hash values for virtual nodes.
// A more robust check might involve iterating over hashMap values.
nodeFound := false
for _, v := range c.hashMap {
if v == node && c.hash([]byte(v + strconv.Itoa(0))) == k { // Simplified check for virtual node 0
nodeFound = true
break
}
}
if !nodeFound {
newKeys = append(newKeys, k)
}
}
// A more correct way to remove:
// Filter c.keys by checking if c.hashMap[key] matches the 'node' to remove.
// This would require a temporary map of what node each hash belongs to.
// For now, simpler: rebuild keys from hashMap after deleting.
c.keys = nil
for k := range c.hashMap {
c.keys = append(c.keys, k)
}
sort.Slice(c.keys, func(i, j int) bool {
return c.keys[i] < c.keys[j]
})
}
func (c *ConsistentHash) Get(key string) string {
c.mu.RLock()
defer c.mu.RUnlock()
if c.IsEmpty() {
return ""
}
hash := c.hash([]byte(key))
idx := sort.Search(len(c.keys), func(i int) bool {
return c.keys[i] >= hash
})
if idx == len(c.keys) {
idx = 0
}
return c.hashMap[c.keys[idx]]
}
// --- Smooth Weighted Round Robin Implementation (from swrr package) ---
type Backend struct {
Addr string
Weight int
CurrentWeight int
EffectiveWeight int
}
type SWRRBalancer struct { // Renamed to SWRRBalancer to avoid conflict with gRPC balancer.Balancer
backends []*Backend
mu sync.Mutex
}
func NewSWRRBalancer(backendInfos []struct {
Addr string
Weight int
}) *SWRRBalancer {
backends := make([]*Backend, len(backendInfos))
for i, info := range backendInfos {
backends[i] = &Backend{
Addr: info.Addr,
Weight: info.Weight,
CurrentWeight: 0,
EffectiveWeight: info.Weight,
}
}
return &SWRRBalancer{
backends: backends,
}
}
func (b *SWRRBalancer) Pick() *Backend {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.backends) == 0 {
return nil
}
totalEffectiveWeight := 0
for _, backend := range b.backends {
totalEffectiveWeight += backend.EffectiveWeight
}
var best *Backend
maxCurrentWeight := -1
for _, backend := range b.backends {
backend.CurrentWeight += backend.EffectiveWeight
// Simplified health recovery/degradation. In real scenarios, this
// would be tied to gRPC's health checks or external signals.
if backend.EffectiveWeight < backend.Weight {
backend.EffectiveWeight++
}
if backend.EffectiveWeight > backend.Weight { // Cap at actual weight
backend.EffectiveWeight = backend.Weight
}
if backend.CurrentWeight > maxCurrentWeight {
maxCurrentWeight = backend.CurrentWeight
best = backend
}
}
if best == nil {
return nil
}
best.CurrentWeight -= totalEffectiveWeight
return best
}
func (b *SWRRBalancer) UpdateBackendHealth(addr string, healthy bool) {
b.mu.Lock()
defer b.mu.Unlock()
for _, backend := range b.backends {
if backend.Addr == addr {
if healthy {
if backend.EffectiveWeight < 0 {
backend.EffectiveWeight = 0
}
if backend.EffectiveWeight < backend.Weight {
backend.EffectiveWeight++
}
} else {
backend.EffectiveWeight--
if backend.EffectiveWeight < 0 {
backend.EffectiveWeight = 0
}
}
return
}
}
}
func (b *SWRRBalancer) UpdateBackends(backendInfos []struct {
Addr string
Weight int
}) {
b.mu.Lock()
defer b.mu.Unlock()
newBackendsMap := make(map[string]*Backend)
for _, info := range backendInfos {
newBackendsMap[info.Addr] = &Backend{
Addr: info.Addr,
Weight: info.Weight,
CurrentWeight: 0, // Reset for new backends or if weight changed significantly
EffectiveWeight: info.Weight,
}
}
var updatedBackends []*Backend
for _, existingBackend := range b.backends {
if newBackend, ok := newBackendsMap[existingBackend.Addr]; ok {
newBackend.CurrentWeight = existingBackend.CurrentWeight
newBackend.EffectiveWeight = existingBackend.EffectiveWeight
newBackend.Weight = newBackend.Weight // Update configured weight
updatedBackends = append(updatedBackends, newBackend)
delete(newBackendsMap, existingBackend.Addr)
}
}
for _, newBackend := range newBackendsMap {
updatedBackends = append(updatedBackends, newBackend)
}
b.backends = updatedBackends
}
// --- gRPC Resolver and Balancer Integration ---
const (
customResolverScheme = "myresolver"
consistentHashBalancerName = "consistent_hash_lb"
swrrBalancerName = "swrr_lb"
consistentHashKeyMetadata = "consistent-hash-key"
)
func init() {
resolver.Register(&customResolverBuilder{})
balancer.Register(base.NewBalancerBuilderV2(consistentHashBalancerName, &consistentHashPickerBuilder{}, base.Config{HealthCheck: true}))
balancer.Register(base.NewBalancerBuilderV2(swrrBalancerName, &swrrPickerBuilder{}, base.Config{HealthCheck: true}))
}
// BackendInfo for customResolver
type BackendInfo struct {