在现代分布式系统中,API 限流(Rate Limiting)是一项至关重要的技术。它不仅能保护后端服务免受过载,还能确保资源的公平使用,并有效抵御恶意攻击,如拒绝服务(DoS)攻击。一个设计良好的限流机制能够保障系统的稳定性和可用性,同时优化用户体验。本讲座将深入探讨两种最经典的限流算法——令牌桶(Token Bucket)和漏桶(Leaky Bucket),并着重演示如何利用 Go 语言的原子操作(atomic operations)实现高性能、并发安全的限流器。
API 限流的必要性与基本概念
为什么需要 API 限流?
- 保护后端服务过载: 当流量突增时,限流可以防止请求淹没服务,导致服务响应缓慢甚至崩溃。
- 资源公平分配: 确保所有用户或客户端都能获得一定量的服务资源,防止少数用户耗尽所有资源。
- 防止恶意攻击: 限制单个 IP 地址、用户或应用在短时间内发起大量请求,有效抵御 DoS 或暴力破解等攻击。
- 控制成本: 对于按量付费的云服务或第三方 API,限流可以帮助控制使用成本。
- 维护服务质量: 通过限制请求速率,可以确保服务在可接受的延迟范围内运行。
限流的基本挑战
最简单的限流方法是固定窗口计数器(Fixed Window Counter)。例如,每分钟允许 100 次请求。这种方法的实现相对简单,但存在明显的弊端:
- 窗口边缘效应(Burst at Window Edge): 在一个窗口的末尾和下一个窗口的开头,可能会允许两倍于限额的请求量,造成瞬时流量高峰。例如,在第 59 秒发送 100 个请求,在第 60 秒(新窗口开始)立即又发送 100 个请求,实际在 1 秒内处理了 200 个请求。
- 无法平滑流量: 请求可能在窗口内任意时间点集中涌入,无法保证请求的均匀分布。
为了克服这些挑战,我们需要更精细、更智能的限流算法,其中令牌桶和漏桶是两种最常用且有效的解决方案。
核心限流算法:令牌桶与漏桶
令牌桶和漏桶算法虽然都用于限制流量,但它们的内部机制和对外表现有着显著的区别。理解这些区别对于选择合适的算法至关重要。
漏桶算法 (Leaky Bucket)
概念与原理
漏桶算法的核心思想是将请求(或数据包)视为水滴,将限流器视为一个带有固定容量的桶,桶的底部有一个恒定速率出水的孔。
- 请求流入: 当有请求到来时,它会尝试流入桶中。
- 桶的容量: 如果桶未满,请求会成功流入。
- 桶满溢出: 如果桶已满,新来的请求将被拒绝(丢弃)或放入等待队列。
- 恒定流出: 桶中的请求会以一个恒定的速率“漏”出,即被处理。
特点
- 平滑输出速率: 漏桶算法强制请求以一个恒定的速率处理。无论请求的突发情况如何,离开漏桶的请求流都是平滑的。这使得它非常适合需要稳定输出速率的场景,例如网络流量整形(traffic shaping)。
- 不处理突发流量: 如果请求速率超过漏桶的流出速率,并且桶已满,额外的请求将被直接拒绝。漏桶本身不具备处理突发流量的能力,因为它始终以恒定速率处理请求。
- 实现相对复杂: 需要维护桶的当前水量、桶的容量以及漏出速率。
适用场景
- 需要严格控制下游系统处理速率的场景。
- 网络流量整形,确保数据包以稳定速率发送。
- 保护数据库写入等对吞吐量敏感的后端服务。
令牌桶算法 (Token Bucket)
概念与原理
令牌桶算法与漏桶算法在某些方面是互补的。它的核心思想是维护一个固定容量的令牌桶,并以恒定速率向桶中添加令牌。每个请求需要消耗一个或多个令牌才能被处理。
- 令牌生成: 令牌以一个恒定的速率被添加到桶中。
- 桶的容量: 桶有最大容量,如果桶已满,新生成的令牌会被丢弃。
- 请求消耗令牌: 当有请求到来时,它会尝试从桶中获取一个令牌。
- 允许/拒绝: 如果桶中有足够的令牌,请求被允许,并消耗相应数量的令牌。如果桶中没有令牌,请求将被拒绝(丢弃)或放入等待队列。
特点
- 允许突发流量: 令牌桶的关键优势在于它能够处理突发流量。如果桶中积累了足够的令牌,请求可以以远高于令牌生成速率的速度被处理,直到令牌耗尽。突发流量的大小受到桶容量的限制。
- 平均速率控制: 令牌的生成速率决定了长期平均请求速率的上限。即使允许突发,平均速率也不会超过令牌生成速率。
- 实现相对简单: 需要维护桶中当前令牌数量、桶的容量和令牌生成速率。
适用场景
- Web API 限流,允许短时间的流量突发,同时控制长期平均速率。
- 允许用户在短时间内进行高频操作,但整体使用量受到限制的场景。
- 对用户体验要求高,希望在不影响系统稳定性的前提下,尽可能快速响应请求的场景。
Go 原子操作与高性能并发
Go 语言以其强大的并发特性而闻名,内置的 Goroutine 和 Channel 使得编写并发程序变得相对容易。然而,在实现高性能的并发限流器时,我们需要关注更底层的同步机制。sync/atomic 包提供了原子操作,可以在不使用互斥锁(sync.Mutex)的情况下安全地修改共享变量,从而避免了锁的开销,显著提升性能。
为什么需要原子操作?
考虑一个简单的计数器 count++。这在 Go 语言中并非原子操作,它通常被编译为三个独立的指令:
- 从内存中读取
count的值。 - 将读取的值加 1。
- 将新值写回内存中的
count。
在多 Goroutine 环境中,如果两个 Goroutine 同时执行 count++,可能会发生竞态条件:
- Goroutine A 读取
count(假设为 0)。 - Goroutine B 读取
count(仍然为 0)。 - Goroutine A 将
count加 1 (变为 1)。 - Goroutine B 将
count加 1 (变为 1)。 - Goroutine A 将 1 写回
count。 - Goroutine B 将 1 写回
count。
最终 count 的值是 1,而不是期望的 2。这就是数据竞争。
传统的解决方案是使用 sync.Mutex 来保护对共享变量的访问。然而,互斥锁涉及到系统调用和上下文切换,在高并发场景下可能会引入显著的性能开销。
sync/atomic 包提供了一组原子操作,例如 AddInt64、LoadInt64、StoreInt64、CompareAndSwapInt64 等。这些操作是 CPU 级别保证的,它们在一个不可分割的步骤中完成,从而避免了竞态条件,并且通常比互斥锁具有更低的延迟和更高的吞吐量。
Go 原子操作常用函数
atomic.AddInt64(addr *int64, delta int64) int64:原子地将delta添加到addr指向的值,并返回新值。atomic.LoadInt64(addr *int64) int64:原子地加载addr指向的值。atomic.StoreInt64(addr *int64, val int64):原子地将val存储到addr指向的位置。atomic.CompareAndSwapInt64(addr *int64, old, new int64) bool:如果addr指向的值等于old,则原子地将其更新为new,并返回true。否则不进行更新,返回false。这是一个非常强大的原语,常用于实现无锁数据结构。
在实现限流器时,我们需要频繁地读取和更新桶的状态(例如当前令牌数量、上一次更新时间),这些操作非常适合使用原子操作来提高性能。
基于 Go 原子操作的漏桶实现
我们将实现一个基于 Go 原子操作的漏桶,它能够以恒定速率处理请求,并丢弃超出容量的请求。
package ratelimiter
import (
"sync/atomic"
"time"
)
// LeakyBucket 漏桶限流器
type LeakyBucket struct {
rate int64 // 漏出速率,每秒允许通过的请求数
capacity int64 // 桶的容量,即最多能积攒的请求数
bucket int64 // 当前桶中的请求数 (原子操作)
lastLeakNs int64 // 上一次漏出操作的时间戳 (纳秒) (原子操作)
}
// NewLeakyBucket 创建一个新的漏桶限流器
// rate: 每秒允许通过的请求数
// capacity: 桶的容量
func NewLeakyBucket(rate, capacity int64) *LeakyBucket {
if rate <= 0 || capacity <= 0 {
panic("rate and capacity must be positive")
}
return &LeakyBucket{
rate: rate,
capacity: capacity,
bucket: 0,
lastLeakNs: time.Now().UnixNano(),
}
}
// Allow 尝试允许一个请求通过
// 如果请求被允许,返回 true;否则返回 false
func (lb *LeakyBucket) Allow() bool {
nowNs := time.Now().UnixNano()
// 1. 计算自上次漏出以来应该漏出的请求数量
// 这是一个关键步骤,我们需要确保在计算和更新期间,其他Goroutine不会干扰lastLeakNs
// 使用CAS循环来更新lastLeakNs
var oldLastLeakNs int64
for {
oldLastLeakNs = atomic.LoadInt64(&lb.lastLeakNs)
// 计算时间差,转换为秒
elapsedSeconds := float64(nowNs-oldLastLeakNs) / float64(time.Second)
// 根据漏出速率计算应该漏出的请求数
// 这里我们假设每次Allow()调用时,漏桶都会尝试漏出
leaked := int64(elapsedSeconds * float64(lb.rate))
if leaked < 0 { // 防止时钟回拨或计算错误导致负值
leaked = 0
}
// 2. 更新桶中的请求数量
var currentBucket int64
for {
currentBucket = atomic.LoadInt64(&lb.bucket)
// 新的桶中请求数 = 旧桶中请求数 - 漏出的请求数
newBucket := currentBucket - leaked
if newBucket < 0 {
newBucket = 0 // 桶中请求数不能为负
}
// 尝试原子地更新桶中请求数
if atomic.CompareAndSwapInt64(&lb.bucket, currentBucket, newBucket) {
break // 更新成功,跳出内层循环
}
// 如果更新失败,说明lb.bucket已被其他Goroutine修改,重试
}
// 3. 尝试将新请求放入桶中
// 再次加载桶中请求数,因为在更新lastLeakNs之前可能又发生了变化
currentBucket = atomic.LoadInt64(&lb.bucket)
if currentBucket < lb.capacity {
// 桶未满,尝试放入一个请求
if atomic.CompareAndSwapInt64(&lb.bucket, currentBucket, currentBucket+1) {
// 放入成功,并且更新lastLeakNs
// 只有在请求成功放入桶中,并且桶的实际漏出逻辑被处理后,我们才更新lastLeakNs
// 否则,如果请求被拒绝,我们不应该更新lastLeakNs,因为它没有实际“漏出”
if atomic.CompareAndSwapInt64(&lb.lastLeakNs, oldLastLeakNs, nowNs) {
return true // 请求允许通过
}
// 如果更新lastLeakNs失败,意味着有其他Goroutine在我们更新lb.bucket之后先更新了lastLeakNs
// 这种情况比较复杂,我们选择回滚lb.bucket并重试外层循环
// 实际生产中,可能需要更精细的错误处理或重试策略
atomic.CompareAndSwapInt64(&lb.bucket, currentBucket+1, currentBucket) // 回滚
continue // 重试外层循环
}
// 如果放入失败,说明lb.bucket已被其他Goroutine修改,重试外层循环
continue
}
// 桶已满,无法放入新请求
// 如果桶已满,且我们之前成功更新了lastLeakNs,我们应该回滚lastLeakNs
// 但为了简化,这里直接返回false,不更新lastLeakNs
return false
}
}
漏桶实现分析:
rate和capacity: 分别代表每秒漏出的请求数和桶的最大容量。bucket: 使用atomic.Int64存储当前桶中的请求数量,这是一个原子变量。lastLeakNs: 使用atomic.Int64存储上次更新bucket和lastLeakNs的纳秒时间戳。Allow()方法:- 首先获取当前时间
nowNs。 - 核心逻辑: 使用一个
for循环(CAS 循环)来确保lastLeakNs的原子更新。elapsedSeconds:计算自上次漏出以来经过的时间。leaked:根据elapsedSeconds和rate计算在这段时间内应该漏出的请求数量。- 内层
for循环:原子地更新lb.bucket,减去leaked的数量,并确保bucket不为负。 - 请求入桶: 再次检查
lb.bucket是否小于lb.capacity。如果小于,则尝试原子地将lb.bucket加 1。 - 原子更新
lastLeakNs: 只有当请求成功放入桶中并且桶的漏出逻辑被处理后,才原子地更新lastLeakNs为nowNs。这个原子更新是至关重要的,它确保了在计算leaked时所基于的oldLastLeakNs是最新的、准确的。 - CAS 失败处理: 如果
CompareAndSwapInt64返回false,说明在尝试更新时,其他 Goroutine 已经修改了该变量,需要重试。
- 如果桶已满,或者无法成功将请求放入桶中(在 CAS 循环中),则返回
false。
- 首先获取当前时间
注意: 漏桶的 Allow 方法中的 CAS 循环逻辑需要特别小心。为了简化并确保 lastLeakNs 和 bucket 状态的一致性,这里的实现采取了一种策略:先尝试更新桶中的水(漏出),然后尝试放入新的请求,最后在成功放入请求时才更新 lastLeakNs。如果任何一步失败(例如,桶满或者 CAS 失败),可能需要重试整个过程。实际生产级的漏桶实现可能需要更复杂的 CAS 循环或考虑读写锁与原子操作的混合使用,以平衡性能和代码复杂度。
基于 Go 原子操作的令牌桶实现
现在我们来实现一个基于 Go 原子操作的令牌桶,它能够允许短时间的突发流量,同时控制长期的平均速率。
package ratelimiter
import (
"sync/atomic"
"time"
)
// TokenBucket 令牌桶限流器
type TokenBucket struct {
rate int64 // 令牌生成速率,每秒生成的令牌数
capacity int64 // 桶的容量,即最多能积攒的令牌数
tokens int64 // 当前桶中的令牌数 (原子操作)
lastRefillNs int64 // 上一次令牌填充操作的时间戳 (纳秒) (原子操作)
}
// NewTokenBucket 创建一个新的令牌桶限流器
// rate: 每秒生成的令牌数
// capacity: 桶的容量
func NewTokenBucket(rate, capacity int64) *TokenBucket {
if rate <= 0 || capacity <= 0 {
panic("rate and capacity must be positive")
}
return &TokenBucket{
rate: rate,
capacity: capacity,
tokens: capacity, // 初始化时桶是满的
lastRefillNs: time.Now().UnixNano(),
}
}
// Allow 尝试允许一个请求通过
// 如果请求被允许(成功获取一个令牌),返回 true;否则返回 false
func (tb *TokenBucket) Allow() bool {
nowNs := time.Now().UnixNano()
// 1. 计算自上次填充以来应该生成的令牌数量,并更新桶中的令牌数
// 这是一个CAS循环,确保lastRefillNs和tokens的原子更新
var oldLastRefillNs int64
for {
oldLastRefillNs = atomic.LoadInt64(&tb.lastRefillNs)
oldTokens := atomic.LoadInt64(&tb.tokens)
// 计算时间差,转换为秒
elapsedSeconds := float64(nowNs-oldLastRefillNs) / float64(time.Second)
// 根据令牌生成速率计算应该生成的令牌数
generatedTokens := int64(elapsedSeconds * float64(tb.rate))
if generatedTokens < 0 { // 防止时钟回拨或计算错误导致负值
generatedTokens = 0
}
// 新的令牌数 = 旧令牌数 + 生成的令牌数,但不能超过容量
newTokens := oldTokens + generatedTokens
if newTokens > tb.capacity {
newTokens = tb.capacity
}
// 尝试原子地更新令牌数和上一次填充时间
// 这里的关键是:我们希望在更新tokens的同时,也更新lastRefillNs
// 但CompareAndSwap只能针对一个变量。
// 最稳健的方式是先更新tokens,然后更新lastRefillNs。
// 如果两者都成功,则视为一次原子操作。如果lastRefillNs更新失败,需要重试整个过程。
// 更好的做法是,将“填充”和“消耗”逻辑分开处理,或者使用一个更复杂的CAS loop来处理两个变量。
// 为简化,这里先尝试更新tokens,再更新lastRefillNs。
// 实际生产中,对于涉及多个变量的原子操作,通常会使用一个CAS循环来确保所有相关变量在同一逻辑批次中更新。
// 尝试更新令牌数量
// 如果oldTokens不再是当前tb.tokens的值,说明其他Goroutine已经修改了它,需要重试整个循环
if !atomic.CompareAndSwapInt64(&tb.tokens, oldTokens, newTokens) {
continue // 重试外层循环
}
// 如果令牌数量更新成功,尝试更新lastRefillNs
// 同样,如果oldLastRefillNs不再是当前tb.lastRefillNs的值,说明其他Goroutine已经修改了它
// 此时,我们回滚tokens的更新,并重试整个循环
if !atomic.CompareAndSwapInt64(&tb.lastRefillNs, oldLastRefillNs, nowNs) {
// 回滚tokens的更新,因为lastRefillNs未成功更新,导致本次填充操作不完整
atomic.CompareAndSwapInt64(&tb.tokens, newTokens, oldTokens)
continue // 重试外层循环
}
// 走到这里,说明令牌填充和时间戳更新都成功了。
// 此时,tb.tokens已经包含了最新填充的令牌。
break // 填充成功,跳出外层循环
}
// 2. 尝试消耗一个令牌
// 再次加载当前令牌数,因为在填充之后可能其他Goroutine已经消耗了令牌
var currentTokens int64
for {
currentTokens = atomic.LoadInt64(&tb.tokens)
if currentTokens > 0 {
// 桶中有令牌,尝试消耗一个
if atomic.CompareAndSwapInt64(&tb.tokens, currentTokens, currentTokens-1) {
return true // 消耗成功,请求允许通过
}
// 如果消耗失败,说明tb.tokens已被其他Goroutine修改,重试
continue
}
// 桶中没有令牌
return false // 请求被拒绝
}
}
令牌桶实现分析:
rate和capacity: 分别代表每秒生成的令牌数和桶的最大容量。tokens: 使用atomic.Int64存储当前桶中的令牌数量。lastRefillNs: 使用atomic.Int64存储上次令牌填充操作的纳秒时间戳。Allow()方法:- 获取当前时间
nowNs。 - 核心逻辑: 同样使用一个
for循环(CAS 循环)来确保lastRefillNs和tokens的原子更新。elapsedSeconds:计算自上次填充以来经过的时间。generatedTokens:根据elapsedSeconds和rate计算在这段时间内应该生成的令牌数量。newTokens:计算新的令牌数量,并确保不超过capacity。- 原子更新
tokens和lastRefillNs: 这是一个双重 CAS 操作。首先尝试更新tokens,如果成功,再尝试更新lastRefillNs。如果lastRefillNs更新失败,意味着其他 Goroutine 已经提前更新了时间戳,因此需要回滚tokens的更新,并重试整个过程。这种方式是为了保证tokens和lastRefillNs之间的一致性。 - CAS 失败处理: 如果任何一步
CompareAndSwapInt64返回false,说明在尝试更新时,其他 Goroutine 已经修改了该变量,需要重试整个 CAS 循环。
- 消耗令牌: 在令牌填充完成后(或者说,桶状态更新到最新后),再使用一个内层
for循环尝试原子地从tb.tokens中减去 1。如果tokens大于 0 且CompareAndSwapInt64成功,则允许请求并返回true。 - 如果桶中没有令牌,则返回
false。
- 获取当前时间
注意: 令牌桶的 Allow 方法中,对于 tokens 和 lastRefillNs 这两个相关状态的更新,采用了先更新 tokens 再更新 lastRefillNs 的方式,并在 lastRefillNs 更新失败时回滚 tokens。这种模式在并发环境下处理多个相关原子变量时,需要仔细设计以确保正确性和避免活锁。更复杂的场景可能需要利用更高阶的并发原语或设计更精妙的 CAS 循环。
漏桶与令牌桶的比较
下表总结了漏桶和令牌桶算法的主要区别:
| 特性 | 漏桶算法 (Leaky Bucket) | 令牌桶算法 (Token Bucket) |
|---|---|---|
| 核心机制 | 固定容量的桶,请求以恒定速率处理(漏出),桶满则拒绝。 | 固定容量的桶,以恒定速率添加令牌,请求消耗令牌,无令牌则拒绝。 |
| 处理突发流量 | 不允许。请求以平滑速率处理,超出部分直接拒绝。 | 允许。桶中积累的令牌可用于处理短时突发,突发大小受桶容量限制。 |
| 输出速率 | 恒定、平滑。 | 允许突发,但长期平均速率受令牌生成速率限制。 |
| 流量整形 | 主要用于流量整形,强制输出平滑。 | 主要用于限制平均速率,同时允许短时突破。 |
| 类比 | 带有孔的水桶,水流入和流出。 | 装满令牌的桶,令牌生成和消耗。 |
| 适用场景 | 需要严格控制输出速率、保护下游系统不被瞬间流量压垮。 | API 限流,允许用户短时高频操作,同时控制整体使用量。 |
| 实现复杂度 | 相对复杂,需要精确计算漏出量和更新桶状态。 | 相对简单,主要围绕令牌的生成和消耗。 |
何时选择哪种算法?
- 选择漏桶: 当你希望强制所有请求以一个非常平滑、恒定的速率被处理,并且不允许任何形式的突发时。例如,向外部系统发送数据,而该系统有严格的每秒处理能力上限。
- 选择令牌桶: 当你希望在限制平均请求速率的同时,允许客户端在短时间内发送一波请求(突发),以提高响应性和用户体验时。例如,大多数 RESTful API 的限流场景。
高级考虑与实际应用
分布式限流
上述实现是单机限流器。在分布式系统中,多个服务实例可能同时处理请求,因此需要一个全局一致的限流机制。
- 集中式存储: 使用 Redis 或其他分布式缓存存储限流器的状态(例如,当前令牌数、上次更新时间)。所有服务实例都从这个集中式存储读取和更新状态。Redis 的原子操作(如
INCR、GETSET、Lua 脚本)非常适合实现分布式令牌桶或漏桶。 - 分布式锁或共识: 使用 ZooKeeper、etcd 或 Consul 等工具协调不同实例的限流状态,但这会引入较高的复杂度和性能开销。
- 客户端侧限流: 通过 HTTP 响应头(如
RateLimit-Limit,RateLimit-Remaining,RateLimit-Reset)告知客户端当前的限流状态,让客户端自行进行流量控制。
动态限流
在某些场景下,限流策略需要动态调整。例如:
- 根据用户等级(VIP 用户有更高限额)。
- 根据系统负载(系统负载高时降低限额)。
- 根据资源可用性。
这通常需要在限流器中引入配置热加载机制,或者从配置中心(如 Nacos, Apollo)获取最新的限额。
优雅降级与错误处理
当请求被限流时,不应该直接返回一个生硬的错误。
- HTTP 状态码: 通常返回
HTTP 429 Too Many Requests。 - 响应头: 包含
Retry-After头,告知客户端在多少秒后可以重试。 - 日志与监控: 记录被拒绝的请求,并对限流器的状态进行监控,以便及时发现问题。
- 排队: 对于非实时性要求高的请求,可以将其放入消息队列,稍后处理,而不是直接拒绝。
时间精度与系统时钟
限流算法对时间精度非常敏感。使用 time.Now().UnixNano() 可以提供纳秒级的精度。然而,系统时钟跳变(例如,通过 NTP 同步)可能会导致时间回溯,这会对限流器的计算造成负面影响。在设计时需要考虑如何处理这种情况(例如,忽略回溯,或者限制时间回溯的幅度)。
性能考量
使用 sync/atomic 包可以显著提高并发访问共享状态的性能,因为它避免了操作系统级别的锁开销。但在某些极端高并发的场景下,CPU 缓存一致性协议(MESI 协议)导致的缓存线颠簸(cache line contention)仍可能成为瓶颈。在这种情况下,可能需要考虑:
- 分片(Sharding): 将限流器分解为多个独立的限流器,每个限流器负责一部分请求,减少单个限流器的竞争。
- 批量处理: 允许请求批量消耗令牌或批量流入漏桶,减少原子操作的频率。
示例:将限流器集成到 Go HTTP 服务
为了展示如何将上述限流器集成到实际应用中,我们可以创建一个简单的 Go HTTP 中间件。
package main
import (
"fmt"
"log"
"net/http"
"time"
"your_module_path/ratelimiter" // 假设ratelimiter包在你的模块路径下
)
// LimitMiddleware 是一个 HTTP 中间件,用于应用限流
func LimitMiddleware(limiter *ratelimiter.TokenBucket) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !limiter.Allow() {
// 请求被限流
w.Header().Set("Retry-After", "1") // 建议客户端1秒后重试
http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
return
}
// 请求未被限流,继续处理
next.ServeHTTP(w, r)
})
}
}
func main() {
// 创建一个令牌桶限流器:每秒生成10个令牌,桶容量为20
// 意味着平均每秒允许10个请求,但可以应对短时20个请求的突发
tokenBucketLimiter := ratelimiter.NewTokenBucket(10, 20)
log.Printf("Token Bucket Limiter initialized: rate=%d/s, capacity=%d", tokenBucketLimiter.rate, tokenBucketLimiter.capacity)
// 创建一个漏桶限流器:每秒漏出5个请求,桶容量为10
// 意味着请求将以每秒5个的恒定速率处理,多余的会被拒绝
leakyBucketLimiter := ratelimiter.NewLeakyBucket(5, 10)
log.Printf("Leaky Bucket Limiter initialized: rate=%d/s, capacity=%d", leakyBucketLimiter.rate, leakyBucketLimiter.capacity)
// 定义一个处理器
helloHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello, you are allowed! Time: %s", time.Now().Format(time.RFC3339Nano))
})
// 将令牌桶限流器应用于 /token-bucket 路径
http.Handle("/token-bucket", LimitMiddleware(tokenBucketLimiter)(helloHandler))
log.Println("Server listening on :8080 for /token-bucket with Token Bucket limiter")
// 将漏桶限流器应用于 /leaky-bucket 路径
// 注意:漏桶的Allow()方法在并发高时会因CAS循环而更复杂,这里仅作演示
// 实际应用中需要根据具体需求选择。
http.Handle("/leaky-bucket", func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !leakyBucketLimiter.Allow() {
w.Header().Set("Retry-After", "1")
http.Error(w, "Too Many Requests (Leaky Bucket)", http.StatusTooManyRequests)
return
}
next.ServeHTTP(w, r)
})
}(helloHandler))
log.Println("Server listening on :8080 for /leaky-bucket with Leaky Bucket limiter")
// 启动 HTTP 服务器
log.Fatal(http.ListenAndServe(":8080", nil))
}
运行此示例后,你可以尝试使用 curl 命令快速发起大量请求:
- 令牌桶 (
/token-bucket): 尝试for i in $(seq 1 30); do curl -s "http://localhost:8080/token-bucket" & done。你会发现前 20 个请求可能会立即通过,然后会有一些请求被拒绝,之后又会以每秒 10 个左右的速率通过。 - 漏桶 (
/leaky-bucket): 尝试for i in $(seq 1 30); do curl -s "http://localhost:8080/leaky-bucket" & done。你会发现请求通过的速率会非常平滑,大约每秒 5 个,多余的请求会立即被拒绝。
通过上述示例和分析,我们深入理解了令牌桶和漏桶这两种经典的限流算法,并学习了如何利用 Go 语言的原子操作实现高性能的并发限流器。选择合适的限流算法,并结合 Go 的并发特性,是构建稳定、高效分布式系统的关键一步。在实际应用中,还需要综合考虑分布式环境、动态调整、优雅降级等因素,以构建一个健壮的限流解决方案。