JAVA 微服务网关丢请求体?BodyRewriteFilter 与 Reactive Stream 处理机制

JAVA 微服务网关丢请求体?BodyRewriteFilter 与 Reactive Stream 处理机制

各位好,今天我们来聊聊一个在微服务架构中经常遇到的问题:微服务网关丢请求体。特别是在使用 Spring Cloud Gateway 框架,并利用 BodyRewriteFilter 修改请求体时,这个问题更容易发生。我们将深入探讨这个问题的原因,以及如何利用 Reactive Streams 的特性来正确处理请求体,避免数据丢失。

一、问题描述:请求体丢失的场景

假设我们有一个微服务网关,它的主要职责是将外部请求转发到内部的微服务。我们使用 Spring Cloud Gateway,并配置了一个 BodyRewriteFilter,用于修改请求体。例如,我们可能需要将请求体中的某些字段进行加密、转换格式,或者添加一些额外的元数据。

以下是一个简单的 BodyRewriteFilter 配置示例:

@Configuration
public class GatewayConfig {

    @Bean
    public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
        return builder.routes()
                .route("rewrite_request_body", r -> r.path("/api/**")
                        .filters(f -> f.rewriteBody(String.class, String.class, (serverRequest, body) -> {
                            // 在这里对请求体进行修改
                            String modifiedBody = "Modified: " + body;
                            return Mono.just(modifiedBody);
                        }))
                        .uri("http://localhost:8081")) // 假设内部微服务运行在 8081 端口
                .build();
    }
}

这个配置定义了一个路由,当请求的路径以 /api/** 开头时,会经过 rewriteBody 过滤器,将请求体加上 "Modified: " 前缀,然后将请求转发到 http://localhost:8081

然而,在某些情况下,我们可能会发现,内部微服务接收到的请求体为空,或者是不完整的。这表明请求体在经过网关时丢失了。

二、问题根源:Reactive Stream 的背压机制与数据消费

问题的根源在于 Spring Cloud Gateway 基于 Reactive Streams 构建,而 Reactive Streams 具有背压 (Backpressure) 机制。理解背压机制对于解决这个问题至关重要。

  • Reactive Streams 的背压机制: 在 Reactive Streams 中,数据流的生产者(例如,接收HTTP请求的组件)可以生成数据,而数据流的消费者(例如,将数据转发到下游微服务的组件)可以消费数据。如果生产者生成数据的速度快于消费者消费数据的速度,就会发生背压。为了避免消费者被大量数据淹没,Reactive Streams 提供了一种背压机制,允许消费者告知生产者自己能处理多少数据。

  • DataBuffer 的处理: 在 Spring WebFlux 中,HTTP 请求体被表示为 Flux<DataBuffer>DataBuffer 是一个字节缓冲区,它包含了请求体的一部分数据。Flux<DataBuffer> 表示一个包含多个 DataBuffer 的流,这些 DataBuffer 按照顺序组成了完整的请求体。

  • BodyRewriteFilter 的消费行为:BodyRewriteFilter 读取 Flux<DataBuffer> 时,它会消费这个流。这意味着一旦 BodyRewriteFilter 读取了 Flux<DataBuffer> 中的数据,这些数据就不再可用了。如果我们在 BodyRewriteFilter 中读取了请求体,却没有正确地将修改后的请求体重新写入到请求中,那么下游的微服务将无法接收到请求体。

  • 问题发生的原因: 在一些错误的实现中,开发者可能会直接从 Flux<DataBuffer> 中读取数据,例如使用 DataBufferUtils.join(serverRequest.bodyToFlux(DataBuffer.class)).block() 这样的代码。 这会直接将流中的所有数据读取并阻塞等待完成,但没有将数据重新写入请求,导致后续的过滤器或转发逻辑无法访问请求体。 另外,如果没有正确处理 DataBuffer 的引用计数,也可能导致 DataBuffer 被提前释放,从而导致数据丢失。

三、正确处理请求体:使用 ServerHttpRequestDecoratorDataBufferFactory

为了解决请求体丢失的问题,我们需要使用 ServerHttpRequestDecoratorDataBufferFactory 来正确地处理请求体。

  • ServerHttpRequestDecorator: 这是一个非常有用的类,它允许我们创建一个新的 ServerHttpRequest 对象,该对象可以修改原始请求的某些属性,例如请求头、请求体等。我们可以使用 ServerHttpRequestDecorator 来替换原始请求的请求体,从而将修改后的请求体传递给下游的微服务。

  • DataBufferFactory: 这是一个用于创建 DataBuffer 对象的工厂类。我们需要使用 DataBufferFactory 来创建新的 DataBuffer 对象,并将修改后的请求体写入到这些 DataBuffer 对象中。

以下是一个改进后的 BodyRewriteFilter 配置示例:

@Configuration
public class GatewayConfig {

    @Bean
    public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
        return builder.routes()
                .route("rewrite_request_body", r -> r.path("/api/**")
                        .filters(f -> f.modifyRequestBody(String.class, String.class, (serverRequest, body) -> {
                            String modifiedBody = "Modified: " + body;
                            DataBufferFactory bufferFactory = serverRequest.exchange().getApplicationContext().getBean(DataBufferFactory.class);
                            DataBuffer dataBuffer = bufferFactory.wrap(modifiedBody.getBytes(StandardCharsets.UTF_8));
                            return Mono.just(dataBuffer);
                        }))
                        .uri("http://localhost:8081")) // 假设内部微服务运行在 8081 端口
                .build();
    }
}

在这个示例中,我们使用了 modifyRequestBody 过滤器,而不是 rewriteBodymodifyRequestBody 内部使用了 ServerHttpRequestDecoratorDataBufferFactory 来处理请求体。让我们分析一下这个示例:

  1. 获取 DataBufferFactory: 我们首先从 ServerWebExchange 中获取 DataBufferFactoryServerWebExchange 包含了当前请求的所有上下文信息。
  2. 创建 DataBuffer: 我们使用 DataBufferFactory 创建一个新的 DataBuffer 对象,并将修改后的请求体写入到这个 DataBuffer 对象中。 注意,我们需要使用 StandardCharsets.UTF_8 来指定字符编码。
  3. 返回 Mono<DataBuffer>: 我们将新的 DataBuffer 对象封装在一个 Mono 对象中,并将其返回。 Spring Cloud Gateway 会自动将这个 Mono<DataBuffer> 对象设置为新的请求体。

四、更细致的控制:自定义 BodyRewriteFilter

如果我们需要更细致地控制请求体的处理过程,我们可以自定义 BodyRewriteFilter。以下是一个自定义 BodyRewriteFilter 的示例:

import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;

@Component
public class CustomBodyRewriteFilter implements GatewayFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        HttpMethod method = request.getMethod();
        MediaType contentType = request.getHeaders().getContentType();

        // 只处理 POST 和 PUT 请求,并且 Content-Type 是 application/json
        if ((HttpMethod.POST.equals(method) || HttpMethod.PUT.equals(method)) &&
            MediaType.APPLICATION_JSON.equals(contentType)) {

            return request.getBody()
                    .collectList() // 将 Flux<DataBuffer> 收集成 List<DataBuffer>
                    .flatMap(dataBuffers -> {
                        // 将 List<DataBuffer> 合并成一个 DataBuffer
                        DataBuffer joinedDataBuffer = exchange.getResponse().bufferFactory().join(dataBuffers);

                        // 将 DataBuffer 转换成 String
                        String requestBody = new String(joinedDataBuffer.asByteBuffer().array(), StandardCharsets.UTF_8);

                        // 修改请求体
                        String modifiedBody = "Custom Modified: " + requestBody;

                        // 创建新的 DataBuffer
                        DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
                        DataBuffer modifiedDataBuffer = bufferFactory.wrap(modifiedBody.getBytes(StandardCharsets.UTF_8));

                        // 创建新的 ServerHttpRequest
                        ServerHttpRequest modifiedRequest = new ServerHttpRequestDecorator(request) {
                            @Override
                            public Flux<DataBuffer> getBody() {
                                return Flux.just(modifiedDataBuffer);
                            }

                            @Override
                            public HttpHeaders getHeaders() {
                                HttpHeaders headers = new HttpHeaders();
                                headers.putAll(super.getHeaders());
                                // 注意:需要重新设置 Content-Length,否则下游服务可能无法正确解析请求体
                                headers.setContentLength(modifiedBody.getBytes(StandardCharsets.UTF_8).length);
                                return headers;
                            }
                        };

                        // 传递修改后的请求
                        return chain.filter(exchange.mutate().request(modifiedRequest).build());
                    });
        } else {
            // 对于其他类型的请求,直接传递
            return chain.filter(exchange);
        }
    }
}

在这个示例中,我们做了以下几件事情:

  1. 判断请求类型: 我们首先判断请求的类型是否为 POSTPUT,以及 Content-Type 是否为 application/json。 只有满足这些条件的请求才会被修改。
  2. 读取请求体: 我们使用 request.getBody().collectList()Flux<DataBuffer> 收集成 List<DataBuffer>,然后使用 exchange.getResponse().bufferFactory().join(dataBuffers)List<DataBuffer> 合并成一个 DataBuffer。 最后,我们将 DataBuffer 转换成 String
  3. 修改请求体: 我们对请求体进行修改。
  4. 创建新的 DataBuffer: 我们使用 DataBufferFactory 创建一个新的 DataBuffer 对象,并将修改后的请求体写入到这个 DataBuffer 对象中。
  5. 创建新的 ServerHttpRequest: 我们使用 ServerHttpRequestDecorator 创建一个新的 ServerHttpRequest 对象,并将新的 DataBuffer 对象设置为新的请求体。 非常重要的一点是,我们需要重新设置 Content-Length 请求头,否则下游服务可能无法正确解析请求体。 如果不设置,下游服务可能会因为读取到的数据长度与 Content-Length 不一致而报错。
  6. 传递修改后的请求: 我们使用 exchange.mutate().request(modifiedRequest).build() 创建一个新的 ServerWebExchange 对象,并将修改后的请求传递给下游的过滤器。

五、高级话题:处理流式请求体(Streaming Body)

在某些情况下,请求体可能非常大,无法一次性读取到内存中。这时,我们需要使用流式处理的方式来处理请求体。

以下是一个处理流式请求体的示例:

import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;

@Component
public class StreamingBodyRewriteFilter implements GatewayFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        HttpMethod method = request.getMethod();
        MediaType contentType = request.getHeaders().getContentType();

        // 只处理 POST 和 PUT 请求,并且 Content-Type 是 application/json
        if ((HttpMethod.POST.equals(method) || HttpMethod.PUT.equals(method)) &&
            MediaType.APPLICATION_JSON.equals(contentType)) {

            DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();

            Flux<DataBuffer> modifiedBody = request.getBody()
                    .map(dataBuffer -> {
                        // 读取 DataBuffer 中的数据
                        byte[] content = new byte[dataBuffer.readableByteCount()];
                        dataBuffer.read(content);
                        String chunk = new String(content, StandardCharsets.UTF_8);

                        // 修改数据块
                        String modifiedChunk = "Chunk Modified: " + chunk;

                        // 释放 DataBuffer
                        org.springframework.core.io.buffer.DataBufferUtils.release(dataBuffer);

                        // 创建新的 DataBuffer
                        return bufferFactory.wrap(modifiedChunk.getBytes(StandardCharsets.UTF_8));
                    });

            ServerHttpRequest modifiedRequest = new ServerHttpRequestDecorator(request) {
                @Override
                public Flux<DataBuffer> getBody() {
                    return modifiedBody;
                }

                @Override
                public HttpHeaders getHeaders() {
                    HttpHeaders headers = new HttpHeaders();
                    headers.putAll(super.getHeaders());
                    // 注意:移除 Content-Length,让下游服务使用 Transfer-Encoding: chunked
                    headers.remove(HttpHeaders.CONTENT_LENGTH);
                    return headers;
                }
            };

            return chain.filter(exchange.mutate().request(modifiedRequest).build());
        } else {
            // 对于其他类型的请求,直接传递
            return chain.filter(exchange);
        }
    }
}

在这个示例中,我们做了以下几件事情:

  1. 使用 map 操作符: 我们使用 map 操作符来对 Flux<DataBuffer> 中的每个 DataBuffer 进行处理。
  2. 读取 DataBuffer 中的数据: 我们读取 DataBuffer 中的数据,并将其转换成 String
  3. 修改数据块: 我们对数据块进行修改。
  4. 创建新的 DataBuffer: 我们使用 DataBufferFactory 创建一个新的 DataBuffer 对象,并将修改后的数据块写入到这个 DataBuffer 对象中。
  5. 释放 DataBuffer: 在处理完 DataBuffer 后,我们需要使用 DataBufferUtils.release(dataBuffer) 来释放 DataBuffer这是非常重要的,否则会导致内存泄漏。
  6. 移除 Content-Length: 由于我们无法提前知道修改后的请求体的总长度,因此我们需要移除 Content-Length 请求头。 这样,下游服务会使用 Transfer-Encoding: chunked 来接收请求体。

六、最佳实践与注意事项

  • 选择合适的过滤器: Spring Cloud Gateway 提供了多种过滤器,例如 rewriteBodymodifyRequestBody 等。 选择合适的过滤器可以简化代码,并提高性能。 modifyRequestBody 通常是更安全的选择,因为它内部处理了 ServerHttpRequestDecoratorDataBufferFactory
  • 正确处理 DataBuffer: 在使用 DataBuffer 时,务必注意引用计数和释放。 如果没有正确释放 DataBuffer,会导致内存泄漏。
  • 设置 Content-Length: 如果可以提前知道修改后的请求体的总长度,务必设置 Content-Length 请求头。 否则,移除 Content-Length 请求头,让下游服务使用 Transfer-Encoding: chunked
  • 处理异常: 在处理请求体时,可能会发生各种异常,例如字符编码错误、IO 错误等。 务必处理这些异常,避免程序崩溃。
  • 测试: 对修改请求体的过滤器进行充分的测试,确保其能够正确处理各种类型的请求,并且不会导致数据丢失。
  • 监控: 对网关进行监控,观察请求体的处理情况,及时发现和解决问题。

七、一些有用的工具类和方法

工具类/方法 功能描述
DataBufferFactory 用于创建 DataBuffer 对象。
ServerHttpRequestDecorator 用于创建一个新的 ServerHttpRequest 对象,该对象可以修改原始请求的某些属性。
DataBufferUtils.join() 将多个 DataBuffer 对象合并成一个 DataBuffer 对象。
DataBufferUtils.release() 释放 DataBuffer 对象,避免内存泄漏。
StandardCharsets.UTF_8 指定字符编码。

八、总结:正确理解 Reactive Stream,避免请求体丢失

理解 Reactive Streams 的背压机制以及 DataBuffer 的处理方式是解决网关请求体丢失问题的关键。通过使用 ServerHttpRequestDecoratorDataBufferFactory,我们可以正确地修改请求体,并将修改后的请求传递给下游的微服务。在处理流式请求体时,我们需要使用 map 操作符,并注意释放 DataBuffer,避免内存泄漏。 务必记住,在修改请求体后,需要重新设置 Content-Length 请求头,或者移除该请求头。 最后,充分的测试和监控是保证网关稳定运行的重要手段。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注