JAVA Spring Cloud Gateway 请求体无法重复读取?缓存 BodyFilter 解决方案

Spring Cloud Gateway 请求体无法重复读取?缓存 BodyFilter 解决方案

大家好,今天我们来聊聊 Spring Cloud Gateway 中一个常见的痛点:请求体无法重复读取的问题,以及如何通过缓存 BodyFilter 来解决这个问题。这个问题在实际应用中非常常见,特别是在需要对请求体进行多次处理,例如校验、鉴权、日志记录等场景。

问题描述:请求体只能读取一次

Spring Cloud Gateway 基于 Netty,默认情况下,请求体被封装成 DataBuffer 对象。DataBuffer 是一个基于 Reactor 的响应式数据缓冲区,其特点是只能被读取一次。当你从 ServerHttpRequest 中获取 DataBuffer 并读取数据后,后续的过滤器或者业务逻辑就无法再次读取请求体了。

为什么会这样?

这是因为 DataBuffer 本质上是一个流式读取的数据结构。读取数据后,内部的指针会移动到流的末尾,再次读取时自然就无法获取到任何数据。

这个问题会带来哪些麻烦?

  1. 多重校验失败: 如果你在多个过滤器中都需要对请求体进行校验,那么只有第一个过滤器能够成功读取到请求体,后续的过滤器会因为无法读取请求体而导致校验失败。
  2. 日志记录缺失: 如果你需要记录请求体的内容,那么只能在第一个过滤器中进行,后续的过滤器无法获取到请求体,从而导致日志记录缺失。
  3. 鉴权失败: 如果你需要根据请求体的内容进行鉴权,那么只有第一个过滤器能够成功读取到请求体进行鉴权,后续的过滤器会因为无法读取请求体而导致鉴权失败。

如何解决请求体无法重复读取的问题?

核心思路是:在第一个过滤器中读取请求体,并将请求体的内容缓存起来,后续的过滤器可以直接从缓存中获取请求体的内容。

解决方案:缓存 BodyFilter

我们可以自定义一个 GlobalFilter,专门用于缓存请求体。这个过滤器会在所有过滤器之前执行,负责读取请求体,并将其缓存到 ServerWebExchange 的 attributes 中。后续的过滤器可以直接从 ServerWebExchange 的 attributes 中获取缓存的请求体。

具体实现步骤:

  1. 创建缓存 BodyFilter 类:

    import org.springframework.cloud.gateway.filter.GatewayFilterChain;
    import org.springframework.cloud.gateway.filter.GlobalFilter;
    import org.springframework.core.Ordered;
    import org.springframework.core.io.buffer.DataBuffer;
    import org.springframework.core.io.buffer.DataBufferUtils;
    import org.springframework.http.HttpHeaders;
    import org.springframework.http.HttpMethod;
    import org.springframework.http.MediaType;
    import org.springframework.http.server.reactive.ServerHttpRequest;
    import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
    import org.springframework.stereotype.Component;
    import org.springframework.web.server.ServerWebExchange;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Objects;
    
    @Component
    public class CachedBodyFilter implements GlobalFilter, Ordered {
    
       private static final String CACHED_REQUEST_BODY_KEY = "cachedRequestBody";
    
       @Override
       public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
           ServerHttpRequest request = exchange.getRequest();
    
           // 只缓存 POST、PUT、PATCH 请求,可以根据实际情况修改
           if (!HttpMethod.POST.equals(request.getMethod()) &&
               !HttpMethod.PUT.equals(request.getMethod()) &&
               !HttpMethod.PATCH.equals(request.getMethod())) {
               return chain.filter(exchange);
           }
    
           // 只有Content-Type为 application/json, text/plain, application/xml 才缓存,可以根据实际情况修改
           HttpHeaders headers = request.getHeaders();
           MediaType contentType = headers.getContentType();
           if (contentType == null ||
               (!MediaType.APPLICATION_JSON.equals(contentType) &&
                !MediaType.TEXT_PLAIN.equals(contentType) &&
                !MediaType.APPLICATION_XML.equals(contentType))) {
               return chain.filter(exchange);
           }
    
           // 读取请求体
           return DataBufferUtils.join(request.getBody())
               .flatMap(dataBuffer -> {
                   // 缓存请求体
                   byte[] bytes = new byte[dataBuffer.readableByteCount()];
                   dataBuffer.read(bytes);
                   DataBufferUtils.release(dataBuffer); // 释放资源
                   String requestBody = new String(bytes, StandardCharsets.UTF_8);
                   exchange.getAttributes().put(CACHED_REQUEST_BODY_KEY, requestBody);
    
                   // 重新封装请求体,传递给后续的过滤器
                   Flux<DataBuffer> cachedFlux = Flux.just(exchange.getResponse().bufferFactory().wrap(bytes));
                   ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(request) {
                       @Override
                       public Flux<DataBuffer> getBody() {
                           return cachedFlux;
                       }
    
                       @Override
                       public HttpHeaders getHeaders() {
                           HttpHeaders mutableHeaders = new HttpHeaders();
                           mutableHeaders.addAll(super.getHeaders());
                           // 重新设置Content-Length,防止body为空
                           mutableHeaders.setContentLength(bytes.length);
                           return mutableHeaders;
                       }
                   };
                   return chain.filter(exchange.mutate().request(mutatedRequest).build());
               });
       }
    
       @Override
       public int getOrder() {
           // 确保在所有过滤器之前执行
           return Ordered.HIGHEST_PRECEDENCE;
       }
    
       public static String getCachedBody(ServerWebExchange exchange) {
           return exchange.getAttribute(CACHED_REQUEST_BODY_KEY);
       }
    }
  2. 代码解释:

    • CACHED_REQUEST_BODY_KEY 定义了一个常量,用于存储缓存的请求体。
    • filter 方法: 实现了 GlobalFilter 接口的 filter 方法,用于处理请求。
      • 首先判断请求方法是否为 POST、PUT 或 PATCH,如果不是,则直接放行。可以根据实际情况修改。
      • 接着判断 Content-Type 是否为 application/jsontext/plainapplication/xml,如果不是,则直接放行。可以根据实际情况修改。
      • 使用 DataBufferUtils.join 方法将请求体转换为 DataBuffer
      • DataBuffer 中读取数据,并将其转换为字符串,存储到 ServerWebExchange 的 attributes 中。
      • 创建一个新的 ServerHttpRequestDecorator 对象,用于重新封装请求体。
      • ServerHttpRequestDecorator 对象的 getBody 方法中,返回缓存的请求体。
      • 将新的 ServerHttpRequestDecorator 对象设置到 ServerWebExchange 中,并传递给后续的过滤器。
      • 释放DataBuffer 资源,防止内存泄漏
    • getOrder 方法: 实现了 Ordered 接口的 getOrder 方法,用于指定过滤器的执行顺序。这里设置为 Ordered.HIGHEST_PRECEDENCE,确保在所有过滤器之前执行。
    • getCachedBody 方法: 提供一个静态方法,方便其他过滤器获取缓存的请求体。
  3. 使用示例:

    在其他的过滤器中,可以通过 CachedBodyFilter.getCachedBody(exchange) 方法获取缓存的请求体。

    import org.springframework.cloud.gateway.filter.GatewayFilter;
    import org.springframework.cloud.gateway.filter.GatewayFilterChain;
    import org.springframework.stereotype.Component;
    import org.springframework.web.server.ServerWebExchange;
    import reactor.core.publisher.Mono;
    
    @Component
    public class MyFilter implements GatewayFilter {
    
       @Override
       public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
           String requestBody = CachedBodyFilter.getCachedBody(exchange);
    
           // 对请求体进行处理,例如校验、鉴权、日志记录等
           if (requestBody != null) {
               System.out.println("Request Body: " + requestBody);
           }
    
           return chain.filter(exchange);
       }
    }
  4. 注册 Filter:

    需要在 Gateway 的配置类中将自定义的 Filter 注册为 Bean。如果CachedBodyFilter 类使用了@Component注解,Spring 会自动扫描并注册。

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.gateway.route.RouteLocator;
    import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
    import org.springframework.context.annotation.Bean;
    
    @SpringBootApplication
    public class GatewayApplication {
    
       public static void main(String[] args) {
           SpringApplication.run(GatewayApplication.class, args);
       }
    
       @Bean
       public RouteLocator customRouteLocator(RouteLocatorBuilder builder, CachedBodyFilter cachedBodyFilter) {
           return builder.routes()
                   .route("path_route", r -> r.path("/get")
                           .filters(f -> f.filter(cachedBodyFilter)) // 使用 CachedBodyFilter
                           .uri("http://httpbin.org"))
                   .build();
       }
    }

完整代码示例:

这里提供一个完整的示例,包括 CachedBodyFilter 和一个简单的使用 CachedBodyFilterMyFilter

// CachedBodyFilter.java
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;

@Component
public class CachedBodyFilter implements GlobalFilter, Ordered {

    private static final String CACHED_REQUEST_BODY_KEY = "cachedRequestBody";

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();

        if (!HttpMethod.POST.equals(request.getMethod()) &&
            !HttpMethod.PUT.equals(request.getMethod()) &&
            !HttpMethod.PATCH.equals(request.getMethod())) {
            return chain.filter(exchange);
        }

        HttpHeaders headers = request.getHeaders();
        MediaType contentType = headers.getContentType();
        if (contentType == null ||
            (!MediaType.APPLICATION_JSON.equals(contentType) &&
             !MediaType.TEXT_PLAIN.equals(contentType) &&
             !MediaType.APPLICATION_XML.equals(contentType))) {
            return chain.filter(exchange);
        }

        return DataBufferUtils.join(request.getBody())
            .flatMap(dataBuffer -> {
                byte[] bytes = new byte[dataBuffer.readableByteCount()];
                dataBuffer.read(bytes);
                DataBufferUtils.release(dataBuffer);
                String requestBody = new String(bytes, StandardCharsets.UTF_8);
                exchange.getAttributes().put(CACHED_REQUEST_BODY_KEY, requestBody);

                Flux<DataBuffer> cachedFlux = Flux.just(exchange.getResponse().bufferFactory().wrap(bytes));
                ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(request) {
                    @Override
                    public Flux<DataBuffer> getBody() {
                        return cachedFlux;
                    }

                    @Override
                    public HttpHeaders getHeaders() {
                        HttpHeaders mutableHeaders = new HttpHeaders();
                        mutableHeaders.addAll(super.getHeaders());
                        mutableHeaders.setContentLength(bytes.length);
                        return mutableHeaders;
                    }
                };
                return chain.filter(exchange.mutate().request(mutatedRequest).build());
            });
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }

    public static String getCachedBody(ServerWebExchange exchange) {
        return exchange.getAttribute(CACHED_REQUEST_BODY_KEY);
    }
}

// MyFilter.java
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

@Component
public class MyFilter implements GatewayFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        String requestBody = CachedBodyFilter.getCachedBody(exchange);

        if (requestBody != null) {
            System.out.println("Request Body in MyFilter: " + requestBody);
        }

        return chain.filter(exchange);
    }
}

// GatewayApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class GatewayApplication {

    public static void main(String[] args) {
        SpringApplication.run(GatewayApplication.class, args);
    }

    @Bean
    public RouteLocator customRouteLocator(RouteLocatorBuilder builder, CachedBodyFilter cachedBodyFilter, MyFilter myFilter) {
        return builder.routes()
                .route("path_route", r -> r.path("/post")
                        .filters(f -> f.filter(cachedBodyFilter)
                                       .filter(myFilter))
                        .uri("http://httpbin.org/post"))
                .build();
    }
}

注意事项:

  • 内存占用: 缓存请求体可能会占用一定的内存,特别是对于较大的请求体。需要根据实际情况评估内存占用情况,并进行相应的优化。
  • 性能影响: 读取和缓存请求体可能会带来一定的性能影响。需要根据实际情况评估性能影响,并进行相应的优化。
  • Content-Type: 上面的代码示例只缓存了 application/jsontext/plain 类型的请求体。可以根据实际情况修改。
  • 错误处理: 在读取请求体时,可能会发生异常。需要进行适当的错误处理,例如捕获异常并记录日志。

更进一步的思考:

  • 流式处理: 虽然缓存请求体可以解决请求体无法重复读取的问题,但是它也带来了一定的内存占用和性能影响。如果请求体非常大,可以考虑使用流式处理的方式,例如使用 Reactor 的 FluxMono 进行异步处理,避免将整个请求体加载到内存中。
  • 选择性缓存: 可以根据请求的 URI、请求头等信息,选择性地缓存请求体。例如,只缓存需要进行多重校验的请求。
  • 自定义缓存策略: 可以自定义缓存策略,例如使用 Redis 等缓存中间件来存储请求体,并设置缓存过期时间。

表格总结:

方案 优点 缺点 适用场景
缓存 BodyFilter 简单易用,可以解决请求体无法重复读取的问题 占用内存,影响性能,需要考虑 Content-Type 请求体不太大,需要对请求体进行多次处理,例如校验、鉴权、日志记录等
流式处理 避免将整个请求体加载到内存中,适用于较大的请求体 实现复杂,需要熟悉 Reactor 的 FluxMono 请求体非常大,需要进行异步处理
选择性缓存 可以减少内存占用和性能影响 需要根据实际情况进行配置 只需要对部分请求进行多重处理
自定义缓存策略 可以根据实际情况进行灵活配置,例如使用 Redis 等缓存中间件来存储请求体,并设置缓存过期时间 实现复杂,需要引入额外的依赖 需要更高级的缓存策略,例如需要设置缓存过期时间

请求体多次读取的解决方案总结

总而言之,Spring Cloud Gateway 请求体无法重复读取是一个常见问题,通过缓存 BodyFilter 可以有效解决。需要注意内存占用、性能影响、Content-Type 和错误处理等方面。根据实际情况选择合适的解决方案,例如流式处理、选择性缓存、自定义缓存策略等。理解这些方案的优缺点,才能在实际项目中做出最佳选择。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注