Spring Cloud Gateway 请求体无法重复读取?缓存 BodyFilter 解决方案
大家好,今天我们来聊聊 Spring Cloud Gateway 中一个常见的痛点:请求体无法重复读取的问题,以及如何通过缓存 BodyFilter 来解决这个问题。这个问题在实际应用中非常常见,特别是在需要对请求体进行多次处理,例如校验、鉴权、日志记录等场景。
问题描述:请求体只能读取一次
Spring Cloud Gateway 基于 Netty,默认情况下,请求体被封装成 DataBuffer 对象。DataBuffer 是一个基于 Reactor 的响应式数据缓冲区,其特点是只能被读取一次。当你从 ServerHttpRequest 中获取 DataBuffer 并读取数据后,后续的过滤器或者业务逻辑就无法再次读取请求体了。
为什么会这样?
这是因为 DataBuffer 本质上是一个流式读取的数据结构。读取数据后,内部的指针会移动到流的末尾,再次读取时自然就无法获取到任何数据。
这个问题会带来哪些麻烦?
- 多重校验失败: 如果你在多个过滤器中都需要对请求体进行校验,那么只有第一个过滤器能够成功读取到请求体,后续的过滤器会因为无法读取请求体而导致校验失败。
- 日志记录缺失: 如果你需要记录请求体的内容,那么只能在第一个过滤器中进行,后续的过滤器无法获取到请求体,从而导致日志记录缺失。
- 鉴权失败: 如果你需要根据请求体的内容进行鉴权,那么只有第一个过滤器能够成功读取到请求体进行鉴权,后续的过滤器会因为无法读取请求体而导致鉴权失败。
如何解决请求体无法重复读取的问题?
核心思路是:在第一个过滤器中读取请求体,并将请求体的内容缓存起来,后续的过滤器可以直接从缓存中获取请求体的内容。
解决方案:缓存 BodyFilter
我们可以自定义一个 GlobalFilter,专门用于缓存请求体。这个过滤器会在所有过滤器之前执行,负责读取请求体,并将其缓存到 ServerWebExchange 的 attributes 中。后续的过滤器可以直接从 ServerWebExchange 的 attributes 中获取缓存的请求体。
具体实现步骤:
-
创建缓存 BodyFilter 类:
import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.core.Ordered; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequestDecorator; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Objects; @Component public class CachedBodyFilter implements GlobalFilter, Ordered { private static final String CACHED_REQUEST_BODY_KEY = "cachedRequestBody"; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); // 只缓存 POST、PUT、PATCH 请求,可以根据实际情况修改 if (!HttpMethod.POST.equals(request.getMethod()) && !HttpMethod.PUT.equals(request.getMethod()) && !HttpMethod.PATCH.equals(request.getMethod())) { return chain.filter(exchange); } // 只有Content-Type为 application/json, text/plain, application/xml 才缓存,可以根据实际情况修改 HttpHeaders headers = request.getHeaders(); MediaType contentType = headers.getContentType(); if (contentType == null || (!MediaType.APPLICATION_JSON.equals(contentType) && !MediaType.TEXT_PLAIN.equals(contentType) && !MediaType.APPLICATION_XML.equals(contentType))) { return chain.filter(exchange); } // 读取请求体 return DataBufferUtils.join(request.getBody()) .flatMap(dataBuffer -> { // 缓存请求体 byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); DataBufferUtils.release(dataBuffer); // 释放资源 String requestBody = new String(bytes, StandardCharsets.UTF_8); exchange.getAttributes().put(CACHED_REQUEST_BODY_KEY, requestBody); // 重新封装请求体,传递给后续的过滤器 Flux<DataBuffer> cachedFlux = Flux.just(exchange.getResponse().bufferFactory().wrap(bytes)); ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(request) { @Override public Flux<DataBuffer> getBody() { return cachedFlux; } @Override public HttpHeaders getHeaders() { HttpHeaders mutableHeaders = new HttpHeaders(); mutableHeaders.addAll(super.getHeaders()); // 重新设置Content-Length,防止body为空 mutableHeaders.setContentLength(bytes.length); return mutableHeaders; } }; return chain.filter(exchange.mutate().request(mutatedRequest).build()); }); } @Override public int getOrder() { // 确保在所有过滤器之前执行 return Ordered.HIGHEST_PRECEDENCE; } public static String getCachedBody(ServerWebExchange exchange) { return exchange.getAttribute(CACHED_REQUEST_BODY_KEY); } } -
代码解释:
CACHED_REQUEST_BODY_KEY: 定义了一个常量,用于存储缓存的请求体。filter方法: 实现了GlobalFilter接口的filter方法,用于处理请求。- 首先判断请求方法是否为 POST、PUT 或 PATCH,如果不是,则直接放行。可以根据实际情况修改。
- 接着判断
Content-Type是否为application/json或text/plain或application/xml,如果不是,则直接放行。可以根据实际情况修改。 - 使用
DataBufferUtils.join方法将请求体转换为DataBuffer。 - 从
DataBuffer中读取数据,并将其转换为字符串,存储到ServerWebExchange的 attributes 中。 - 创建一个新的
ServerHttpRequestDecorator对象,用于重新封装请求体。 - 在
ServerHttpRequestDecorator对象的getBody方法中,返回缓存的请求体。 - 将新的
ServerHttpRequestDecorator对象设置到ServerWebExchange中,并传递给后续的过滤器。 - 释放DataBuffer 资源,防止内存泄漏
getOrder方法: 实现了Ordered接口的getOrder方法,用于指定过滤器的执行顺序。这里设置为Ordered.HIGHEST_PRECEDENCE,确保在所有过滤器之前执行。getCachedBody方法: 提供一个静态方法,方便其他过滤器获取缓存的请求体。
-
使用示例:
在其他的过滤器中,可以通过
CachedBodyFilter.getCachedBody(exchange)方法获取缓存的请求体。import org.springframework.cloud.gateway.filter.GatewayFilter; import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; @Component public class MyFilter implements GatewayFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { String requestBody = CachedBodyFilter.getCachedBody(exchange); // 对请求体进行处理,例如校验、鉴权、日志记录等 if (requestBody != null) { System.out.println("Request Body: " + requestBody); } return chain.filter(exchange); } } -
注册 Filter:
需要在 Gateway 的配置类中将自定义的 Filter 注册为 Bean。如果
CachedBodyFilter类使用了@Component注解,Spring 会自动扫描并注册。import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.gateway.route.RouteLocator; import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder; import org.springframework.context.annotation.Bean; @SpringBootApplication public class GatewayApplication { public static void main(String[] args) { SpringApplication.run(GatewayApplication.class, args); } @Bean public RouteLocator customRouteLocator(RouteLocatorBuilder builder, CachedBodyFilter cachedBodyFilter) { return builder.routes() .route("path_route", r -> r.path("/get") .filters(f -> f.filter(cachedBodyFilter)) // 使用 CachedBodyFilter .uri("http://httpbin.org")) .build(); } }
完整代码示例:
这里提供一个完整的示例,包括 CachedBodyFilter 和一个简单的使用 CachedBodyFilter 的 MyFilter。
// CachedBodyFilter.java
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.charset.StandardCharsets;
@Component
public class CachedBodyFilter implements GlobalFilter, Ordered {
private static final String CACHED_REQUEST_BODY_KEY = "cachedRequestBody";
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
if (!HttpMethod.POST.equals(request.getMethod()) &&
!HttpMethod.PUT.equals(request.getMethod()) &&
!HttpMethod.PATCH.equals(request.getMethod())) {
return chain.filter(exchange);
}
HttpHeaders headers = request.getHeaders();
MediaType contentType = headers.getContentType();
if (contentType == null ||
(!MediaType.APPLICATION_JSON.equals(contentType) &&
!MediaType.TEXT_PLAIN.equals(contentType) &&
!MediaType.APPLICATION_XML.equals(contentType))) {
return chain.filter(exchange);
}
return DataBufferUtils.join(request.getBody())
.flatMap(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
String requestBody = new String(bytes, StandardCharsets.UTF_8);
exchange.getAttributes().put(CACHED_REQUEST_BODY_KEY, requestBody);
Flux<DataBuffer> cachedFlux = Flux.just(exchange.getResponse().bufferFactory().wrap(bytes));
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(request) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
@Override
public HttpHeaders getHeaders() {
HttpHeaders mutableHeaders = new HttpHeaders();
mutableHeaders.addAll(super.getHeaders());
mutableHeaders.setContentLength(bytes.length);
return mutableHeaders;
}
};
return chain.filter(exchange.mutate().request(mutatedRequest).build());
});
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
public static String getCachedBody(ServerWebExchange exchange) {
return exchange.getAttribute(CACHED_REQUEST_BODY_KEY);
}
}
// MyFilter.java
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
@Component
public class MyFilter implements GatewayFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String requestBody = CachedBodyFilter.getCachedBody(exchange);
if (requestBody != null) {
System.out.println("Request Body in MyFilter: " + requestBody);
}
return chain.filter(exchange);
}
}
// GatewayApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class GatewayApplication {
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder, CachedBodyFilter cachedBodyFilter, MyFilter myFilter) {
return builder.routes()
.route("path_route", r -> r.path("/post")
.filters(f -> f.filter(cachedBodyFilter)
.filter(myFilter))
.uri("http://httpbin.org/post"))
.build();
}
}
注意事项:
- 内存占用: 缓存请求体可能会占用一定的内存,特别是对于较大的请求体。需要根据实际情况评估内存占用情况,并进行相应的优化。
- 性能影响: 读取和缓存请求体可能会带来一定的性能影响。需要根据实际情况评估性能影响,并进行相应的优化。
- Content-Type: 上面的代码示例只缓存了
application/json和text/plain类型的请求体。可以根据实际情况修改。 - 错误处理: 在读取请求体时,可能会发生异常。需要进行适当的错误处理,例如捕获异常并记录日志。
更进一步的思考:
- 流式处理: 虽然缓存请求体可以解决请求体无法重复读取的问题,但是它也带来了一定的内存占用和性能影响。如果请求体非常大,可以考虑使用流式处理的方式,例如使用 Reactor 的
Flux和Mono进行异步处理,避免将整个请求体加载到内存中。 - 选择性缓存: 可以根据请求的 URI、请求头等信息,选择性地缓存请求体。例如,只缓存需要进行多重校验的请求。
- 自定义缓存策略: 可以自定义缓存策略,例如使用 Redis 等缓存中间件来存储请求体,并设置缓存过期时间。
表格总结:
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 缓存 BodyFilter | 简单易用,可以解决请求体无法重复读取的问题 | 占用内存,影响性能,需要考虑 Content-Type | 请求体不太大,需要对请求体进行多次处理,例如校验、鉴权、日志记录等 |
| 流式处理 | 避免将整个请求体加载到内存中,适用于较大的请求体 | 实现复杂,需要熟悉 Reactor 的 Flux 和 Mono |
请求体非常大,需要进行异步处理 |
| 选择性缓存 | 可以减少内存占用和性能影响 | 需要根据实际情况进行配置 | 只需要对部分请求进行多重处理 |
| 自定义缓存策略 | 可以根据实际情况进行灵活配置,例如使用 Redis 等缓存中间件来存储请求体,并设置缓存过期时间 | 实现复杂,需要引入额外的依赖 | 需要更高级的缓存策略,例如需要设置缓存过期时间 |
请求体多次读取的解决方案总结
总而言之,Spring Cloud Gateway 请求体无法重复读取是一个常见问题,通过缓存 BodyFilter 可以有效解决。需要注意内存占用、性能影响、Content-Type 和错误处理等方面。根据实际情况选择合适的解决方案,例如流式处理、选择性缓存、自定义缓存策略等。理解这些方案的优缺点,才能在实际项目中做出最佳选择。