Spring Cloud Gateway 响应式限流 Redis Lua 脚本在 Redis Cluster 模式下的 HashTag 分片策略
大家好!今天我们来深入探讨一个在微服务架构中非常常见且关键的问题:Spring Cloud Gateway 如何利用 Redis Lua 脚本实现响应式限流,并且特别关注当 Redis 部署在 Cluster 模式下,数据分片导致 KEYS 分布在不同 Slot 时,如何使用 HashTag 强制分片,确保 Lua 脚本的原子性执行。
1. 限流的必要性及 Spring Cloud Gateway 的选择
在微服务架构中,服务间的调用非常频繁,如果不加以控制,可能会出现以下问题:
- 资源耗尽: 大量请求涌入,导致服务资源(CPU、内存、网络带宽)耗尽,影响服务的正常运行。
- 服务雪崩: 某个服务出现故障,导致依赖它的服务也崩溃,最终整个系统瘫痪。
- 恶意攻击: 恶意用户发起大量请求,消耗服务资源,影响正常用户的体验。
因此,限流是保护服务,提高系统稳定性的重要手段。
Spring Cloud Gateway 是 Spring Cloud 官方提供的网关组件,它基于 Spring WebFlux 和 Project Reactor 实现,具有以下优点:
- 高性能: 基于响应式编程模型,能够处理高并发请求。
- 灵活的路由配置: 支持基于各种条件(路径、Header、请求参数等)的动态路由。
- 强大的扩展性: 提供了丰富的 Filter 和 Route Predicate,可以自定义扩展功能。
- 易于集成: 与 Spring Cloud 生态系统完美集成。
Spring Cloud Gateway 通过 RateLimiter Filter 来实现限流功能。 我们可以选择不同的 RateLimiter 实现,例如基于内存的、基于 Redis 的等。
2. 基于 Redis Lua 脚本的限流方案
基于 Redis 的限流方案具有以下优点:
- 高性能: Redis 是内存数据库,读写速度非常快。
- 原子性: Redis 的 Lua 脚本能够保证多个操作的原子性。
- 持久化: Redis 可以将数据持久化到磁盘,防止数据丢失。
- 分布式: Redis Cluster 支持分布式部署,能够应对高并发场景。
Lua 脚本在 Redis 中执行具有原子性,这意味着脚本中的所有命令要么全部执行成功,要么全部不执行。 这对于限流场景至关重要,可以避免并发竞争导致的限流失效问题。
以下是一个简单的 Lua 脚本,用于实现基于滑动窗口的限流:
-- KEYS[1]: 限流的 key,例如:rate_limit:user_id
-- ARGV[1]: 时间窗口大小,单位:秒
-- ARGV[2]: 允许的请求数量
-- ARGV[3]: 当前时间戳,单位:秒
local key = KEYS[1]
local window_size = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
-- 移除窗口外的数据
redis.call('ZREMRANGEBYSCORE', key, 0, current_time - window_size)
-- 获取窗口内的请求数量
local count = redis.call('ZCARD', key)
-- 判断是否超过限制
if count < limit then
-- 允许请求,将当前时间戳添加到窗口
redis.call('ZADD', key, current_time, current_time)
return 1
else
-- 拒绝请求
return 0
end
这个脚本的逻辑如下:
- 移除过期数据: 使用
ZREMRANGEBYSCORE命令移除在时间窗口之外的请求记录。 - 获取当前请求数量: 使用
ZCARD命令获取当前时间窗口内的请求数量。 - 判断是否超过限制: 如果请求数量小于允许的请求数量,则允许请求,并将当前时间戳添加到有序集合中。否则,拒绝请求。
3. Redis Cluster 模式下的数据分片问题
Redis Cluster 通过将数据分散存储在多个节点上来提高性能和可用性。 Redis Cluster 使用 Hash Slot 来进行数据分片。 总共有 16384 个 Hash Slot,每个 key 会被分配到一个 Hash Slot。 Redis Cluster 会将这些 Hash Slot 分配给不同的节点。
默认情况下,Redis Cluster 使用以下公式计算 Key 对应的 Hash Slot:
HASH_SLOT = CRC16(key) mod 16384
这意味着,如果多个 Key 的 CRC16 值不同,它们可能会被分配到不同的 Hash Slot,从而存储在不同的节点上。
问题:
在 Redis Cluster 模式下,上面的 Lua 脚本存在一个严重的问题:KEYS[1] (即限流的 key) 可能会被分配到不同的节点上。 这会导致 Lua 脚本无法原子性地执行,因为 Lua 脚本只能在一个 Redis 节点上执行。
例如,如果我们要对 user_id 为 1001 和 1002 的用户进行限流,那么对应的 Key 可能是 rate_limit:1001 和 rate_limit:1002。 这两个 Key 可能会被分配到不同的节点上,导致 Lua 脚本无法正确地进行限流。
4. HashTag 强制分片策略
为了解决这个问题,我们需要使用 HashTag 强制分片策略。 HashTag 允许我们将 Key 的一部分用 {} 包裹起来,Redis Cluster 会使用 {} 中的内容计算 Hash Slot,而不是整个 Key。
例如,我们可以将 Key 修改为 rate_limit:{user}:1001 和 rate_limit:{user}:1002。 这样,Redis Cluster 会使用 user 计算 Hash Slot,而不是 rate_limit:{user}:1001 或 rate_limit:{user}:1002。 只要 {} 中的内容相同,那么所有的 Key 都会被分配到同一个 Hash Slot,从而存储在同一个节点上。
修改后的 Lua 脚本:
无需修改 Lua 脚本本身,只需要修改传递给 Lua 脚本的 Key。
示例:
假设我们要对不同的 user_id 进行限流,我们可以使用以下 Key:
rate_limit:{user}:1001rate_limit:{user}:1002rate_limit:{user}:1003
由于所有的 Key 都包含 {user},因此它们都会被分配到同一个 Hash Slot,从而存储在同一个节点上。 这保证了 Lua 脚本能够原子性地执行。
5. Spring Cloud Gateway 中的实现
我们需要修改 Spring Cloud Gateway 中的 RedisRateLimiter,使其使用 HashTag 强制分片策略。
以下是一个简单的示例:
import org.springframework.cloud.gateway.filter.ratelimit.AbstractRateLimiter;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
public class CustomRedisRateLimiter extends AbstractRateLimiter<CustomRedisRateLimiter.Config> {
private final ReactiveRedisTemplate<String, String> redisTemplate;
private final RedisScript<List<Long>> redisScript;
public CustomRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate,
RedisScript<List<Long>> redisScript) {
super(Config.class);
this.redisTemplate = redisTemplate;
this.redisScript = redisScript;
}
@Override
public Mono<Response> isAllowed(String routeId, String id) {
// 使用 HashTag 强制分片
String key = "rate_limit:{user}:" + id; // 使用user 作为hashTag 保证都在一个redis节点上
Number replenishRate = getConfig().getReplenishRate();
Number burstCapacity = getConfig().getBurstCapacity();
return redisTemplate.execute(this.redisScript,
Arrays.asList(key),
Arrays.asList(replenishRate.toString(), burstCapacity.toString(),
Instant.now().getEpochSecond() + "")
)
.flatMap(results -> {
long allowed = results.get(0);
long tokensLeft = results.get(1);
Response response = new Response(allowed == 1, toHeaders(tokensLeft));
return Mono.just(response);
}).onErrorResume(throwable -> {
// log.error("Error determining if user {} is allowed", id, throwable);
return Mono.just(new Response(true, toHeaders(-1)));
});
}
private Headers toHeaders(long tokensLeft) {
Headers headers = new Headers();
headers.add("X-RateLimit-Remaining", String.valueOf(tokensLeft));
return headers;
}
public static class Config {
private int replenishRate;
private int burstCapacity;
public int getReplenishRate() {
return replenishRate;
}
public Config setReplenishRate(int replenishRate) {
this.replenishRate = replenishRate;
return this;
}
public int getBurstCapacity() {
return burstCapacity;
}
public Config setBurstCapacity(int burstCapacity) {
this.burstCapacity = burstCapacity;
return this;
}
}
}
代码解释:
CustomRedisRateLimiter类: 继承自AbstractRateLimiter,实现了自定义的限流逻辑。redisTemplate和redisScript: 分别是 RedisTemplate 和 Lua 脚本的实例。isAllowed方法: 实现了限流的核心逻辑。- 构建 Key: 使用
rate_limit:{user}:+id构建 Key,其中{user}是 HashTag。 - 执行 Lua 脚本: 使用
redisTemplate.execute方法执行 Lua 脚本,并将 Key 和参数传递给 Lua 脚本。 - 处理结果: 从 Lua 脚本的返回值中获取是否允许请求和剩余的令牌数量,并构建
Response对象。
- 构建 Key: 使用
Config类: 定义了限流的配置参数,例如replenishRate和burstCapacity。
Lua 脚本 (refillToken.lua):
local key = KEYS[1]
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local last_refreshed = redis.call("get", key .. ":last_refreshed")
if not last_refreshed then
last_refreshed = 0
end
last_refreshed = tonumber(last_refreshed)
local tokens = redis.call("get", key .. ":tokens")
if not tokens then
tokens = capacity
else
tokens = tonumber(tokens)
end
local delta = math.max(0, now - last_refreshed)
local refill_amount = delta * rate
tokens = math.min(capacity, tokens + refill_amount)
if tokens >= 1 then
tokens = tokens - 1
redis.call("set", key .. ":tokens", tokens)
redis.call("set", key .. ":last_refreshed", now)
return {1, tokens}
else
redis.call("set", key .. ":tokens", tokens)
redis.call("set", key .. ":last_refreshed", now)
return {0, tokens}
end
Spring 配置:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
@Configuration
public class RedisRateLimiterConfig {
@Bean
public RedisScript<List<Long>> redisScript() {
return RedisScript.of(new ClassPathResource("refillToken.lua"), List.class);
}
@Bean
public CustomRedisRateLimiter customRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate, RedisScript<List<Long>> redisScript) {
return new CustomRedisRateLimiter(redisTemplate, redisScript);
}
}
配置 Spring Cloud Gateway 路由:
spring:
cloud:
gateway:
routes:
- id: user-route
uri: http://localhost:8081
predicates:
- Path=/user/**
filters:
- name: CustomRateLimiter
args:
replenishRate: 10 # 每秒补充10个令牌
burstCapacity: 30 # 令牌桶的容量为30
6. 总结
本文详细介绍了 Spring Cloud Gateway 如何利用 Redis Lua 脚本实现响应式限流,并重点讨论了在 Redis Cluster 模式下,数据分片导致 KEYS 分布在不同 Slot 时,如何使用 HashTag 强制分片,确保 Lua 脚本的原子性执行。 通过使用 HashTag,我们可以保证相关的 Key 都被分配到同一个 Redis 节点上,从而确保 Lua 脚本的原子性,避免并发竞争导致的限流失效问题。 同时,我们提供了具体的代码示例,展示了如何在 Spring Cloud Gateway 中实现基于 HashTag 的 Redis 限流方案。
7. 展望未来
随着微服务架构的不断发展,限流技术也在不断演进。 未来,我们可以探索更高级的限流算法,例如基于漏桶算法的限流、基于令牌桶算法的限流等。 此外,我们还可以结合监控系统,动态调整限流策略,以适应不同的业务场景。 同时,将限流策略与熔断、降级等策略相结合,构建更加健壮的微服务系统。