Spring Cloud Sleuth traceId跨线程池丢失?MDC透传与Reactor Context融合方案

好的,下面是一篇关于Spring Cloud Sleuth traceId跨线程池丢失以及MDC透传与Reactor Context融合方案的技术文章,以讲座的形式呈现。

Spring Cloud Sleuth TraceId 跨线程池丢失与解决方案

大家好,今天我们来聊聊在使用 Spring Cloud Sleuth 进行链路追踪时,经常会遇到的一个问题:traceId 在跨线程池时丢失。这个问题会导致链路追踪不完整,给问题排查带来很大的麻烦。本次讲座将深入探讨 traceId 丢失的原因,并提供基于 MDC 透传和 Reactor Context 融合的解决方案。

问题背景:Sleuth 与 TraceId

首先,我们简单回顾一下 Spring Cloud Sleuth 的作用。Sleuth 是一个分布式追踪解决方案,它可以帮助我们追踪请求在微服务架构中的调用链。每个请求都会被分配一个唯一的 traceId,这个 traceId 会在整个调用链中传递,从而将所有相关的日志关联起来。

Sleuth 依赖于 Brave 库来实现追踪功能。它通过拦截 HTTP 请求、消息队列消息等方式,自动生成和传递 traceId

TraceId 丢失的原因分析

traceId 丢失通常发生在以下场景:

  1. 显式使用线程池: 当我们在代码中显式地创建和使用线程池时,traceId 可能会丢失。这是因为线程池中的线程是由线程池管理的,而不是由 Sleuth 直接管理的。
  2. 异步执行任务: 使用 @Async 注解或 CompletableFuture 等异步编程方式时,traceId 传递也可能出现问题。
  3. Reactor 或 RxJava: 在使用 Reactor 或 RxJava 等响应式编程框架时,如果没有正确地处理 Context,traceId 也会丢失。

根本原因:ThreadLocal 的局限性

Sleuth 默认使用 ThreadLocal 来存储 traceId 等追踪信息。ThreadLocal 的特点是线程隔离,即每个线程都拥有自己的 ThreadLocal 变量副本。当请求从一个线程传递到另一个线程时,ThreadLocal 中的 traceId 不会被自动传递过去,这就导致了 traceId 丢失。

解决方案一:MDC (Mapped Diagnostic Context) 透传

MDC 是 Logback 和 Log4j 等日志框架提供的一种机制,允许我们将一些上下文信息(例如 traceId)放入 MDC 中,然后在日志输出时自动包含这些信息。

原理:

MDC 本质上也是一个 ThreadLocal,但我们可以通过一些手段在线程之间传递 MDC 的内容。

实现步骤:

  1. 配置 MDC:logback.xmllog4j2.xml 中配置日志格式,使其包含 traceIdspanId 等信息。例如,在 logback.xml 中:

    <configuration>
        <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n traceId: %X{traceId}, spanId: %X{spanId}</pattern>
            </encoder>
        </appender>
    
        <root level="INFO">
            <appender-ref ref="CONSOLE"/>
        </root>
    </configuration>
  2. 使用 TraceContext 手动设置 MDC: 在线程池执行任务之前,从 TraceContext 中获取 traceIdspanId,并将其放入 MDC 中。在任务执行完毕后,清除 MDC。

    import org.slf4j.MDC;
    import org.springframework.cloud.sleuth.Tracer;
    import org.springframework.cloud.sleuth.Span;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class MdcExample {
    
        private final ExecutorService executorService = Executors.newFixedThreadPool(10);
        private final Tracer tracer;
    
        public MdcExample(Tracer tracer) {
            this.tracer = tracer;
        }
    
        public void submitTask(Runnable task) {
            Span currentSpan = tracer.currentSpan();
            String traceId = currentSpan != null ? currentSpan.context().traceId() : null;
            String spanId = currentSpan != null ? currentSpan.context().spanId() : null;
    
            executorService.submit(() -> {
                try {
                    if (traceId != null) {
                        MDC.put("traceId", traceId);
                    }
                    if (spanId != null) {
                        MDC.put("spanId", spanId);
                    }
                    task.run();
                } finally {
                    MDC.remove("traceId");
                    MDC.remove("spanId");
                }
            });
        }
    }

优点:

  • 实现简单,易于理解。
  • 适用于各种线程池场景。

缺点:

  • 需要手动设置和清除 MDC,代码侵入性较强。
  • 容易忘记清除 MDC,导致 traceId 泄露。

解决方案二: Reactor Context 融合

Reactor 是一个基于响应式编程模型的框架,它提供了一种强大的方式来处理异步数据流。Reactor Context 是一种特殊的上下文,它可以携带一些信息,并在整个数据流中传递。

原理:

我们可以将 traceId 放入 Reactor Context 中,然后通过 Reactor 的操作符(例如 contextWritecontextRead)来传递 traceId

实现步骤:

  1. 获取 TraceContext 并放入 Context: 在 Reactor 数据流的入口处,从 TraceContext 中获取 traceIdspanId,并将其放入 Context 中。

    import org.springframework.cloud.sleuth.Tracer;
    import reactor.core.publisher.Mono;
    import reactor.util.context.Context;
    
    public class ReactorContextExample {
    
        private final Tracer tracer;
    
        public ReactorContextExample(Tracer tracer) {
            this.tracer = tracer;
        }
    
        public Mono<String> processData(String data) {
            return Mono.just(data)
                    .flatMap(this::doSomething)
                    .contextWrite(context -> {
                        if (tracer.currentSpan() != null) {
                            return context.put("traceId", tracer.currentSpan().context().traceId())
                                    .put("spanId", tracer.currentSpan().context().spanId());
                        }
                        return context;
                    });
        }
    
        private Mono<String> doSomething(String data) {
            return Mono.just("Processed: " + data)
                    .contextRead(context -> {
                        String traceId = context.getOrDefault("traceId", "N/A").toString();
                        String spanId = context.getOrDefault("spanId", "N/A").toString();
                        System.out.println("TraceId in doSomething: " + traceId + ", SpanId: " + spanId);
                        return context;
                    });
        }
    }
  2. 使用 contextRead 从 Context 中获取 traceId 在需要使用 traceId 的地方,使用 contextRead 从 Context 中获取 traceId

  3. 配置 MDC 传递: 在订阅的时候,使用doOnEach手动将traceId 放入到MDC

    import org.slf4j.MDC;
    import reactor.core.publisher.Mono;
    
    public class ReactorContextExample {
    
        public static void main(String[] args) {
            Mono<String> reactiveOperation = Mono.just("data")
                    .contextWrite(context -> context.put("traceId", "12345")) // 模拟放入traceId
                    .flatMap(data -> Mono.deferContextual(contextView -> {
                        String traceId = contextView.get("traceId");
                        return Mono.just("Processed: " + data + " with traceId: " + traceId);
                    }))
                    .doOnEach(signal -> {
                        if (signal.getContextView().hasKey("traceId")) {
                            MDC.put("traceId", signal.getContextView().get("traceId").toString());
                        }
                    })
                    .doFinally(signalType -> {
                        MDC.remove("traceId"); // 清除MDC
                    });
    
            reactiveOperation.subscribe(
                    result -> System.out.println("Result: " + result),
                    error -> System.err.println("Error: " + error.getMessage()),
                    () -> System.out.println("Completed")
            );
        }
    }

优点:

  • 与 Reactor 框架集成良好,代码风格统一。
  • 避免了手动设置和清除 MDC,减少了代码侵入性。
  • 可以方便地传递其他上下文信息。

缺点:

  • 只适用于 Reactor 框架。
  • 需要对 Reactor Context 有一定的了解。

代码示例:集成 MDC 和 Reactor Context

为了更好地说明如何将 MDC 和 Reactor Context 结合使用,我们提供一个更完整的示例,它演示了如何将 traceId 从 HTTP 请求传递到 Reactor 数据流,并在线程池中执行任务。

import org.slf4j.MDC;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@RestController
public class TraceController {

    private final Tracer tracer;
    private final ExecutorService executorService = Executors.newFixedThreadPool(10);

    public TraceController(Tracer tracer) {
        this.tracer = tracer;
    }

    @GetMapping("/trace")
    public Mono<String> trace() {
        return Mono.just("Start")
                .flatMap(this::processData)
                .contextWrite(context -> {
                    if (tracer.currentSpan() != null) {
                        return context.put("traceId", tracer.currentSpan().context().traceId())
                                .put("spanId", tracer.currentSpan().context().spanId());
                    }
                    return context;
                })
                .doOnEach(signal -> {
                    if (signal.getContextView().hasKey("traceId")) {
                        MDC.put("traceId", signal.getContextView().get("traceId").toString());
                    }
                })
                .doFinally(signalType -> {
                    MDC.remove("traceId"); // 清除MDC
                })
                .subscribeOn(Schedulers.fromExecutor(executorService));
    }

    private Mono<String> processData(String data) {
        return Mono.just("Processed: " + data)
                .contextRead(context -> {
                    String traceId = context.getOrDefault("traceId", "N/A").toString();
                    String spanId = context.getOrDefault("spanId", "N/A").toString();
                    System.out.println("TraceId in processData: " + traceId + ", SpanId: " + spanId);
                    return context;
                });
    }
}

代码解释:

  1. @RestController 注解:将 TraceController 标记为一个 REST 控制器。
  2. @GetMapping("/trace") 注解:将 trace() 方法映射到 /trace 路径。
  3. tracer.currentSpan():获取当前的 Span 对象,从中可以获取 traceIdspanId
  4. contextWrite():将 traceIdspanId 放入 Reactor Context 中。
  5. contextRead():从 Reactor Context 中获取 traceIdspanId
  6. subscribeOn(Schedulers.fromExecutor(executorService)):将 Reactor 数据流的执行切换到线程池中。
  7. doOnEach()doFinally():在Reactor的生命周期中,手动将traceId放入MDC和移除MDC.

测试步骤:

  1. 启动应用程序。
  2. 发送 HTTP 请求到 /trace 路径。
  3. 查看日志,确认 traceIdspanId 在整个调用链中都被正确地传递。

其他注意事项

  • 线程池配置: 建议使用 Spring 提供的 ThreadPoolTaskExecutor,它可以更好地与 Sleuth 集成。
  • 自定义 Span: 可以使用 Tracer 接口手动创建和结束 Span,以更精确地控制追踪范围。
  • 采样率: 可以通过配置采样率来控制 Sleuth 的追踪频率,以减少性能开销。

方案对比:MDC vs Reactor Context

为了更清晰地对比两种方案,我们提供一个表格:

特性 MDC 透传 Reactor Context 融合
适用场景 各种线程池场景 Reactor 框架
代码侵入性 较强,需要手动设置和清除 MDC 较低,与 Reactor 框架集成
易用性 简单易懂 需要对 Reactor Context 有一定的了解
性能 较好 可能会有轻微的性能影响 (Context 传递开销)
框架依赖 日志框架 (Logback, Log4j) Reactor 框架

总结:如何选择合适的方案

在选择解决方案时,需要根据具体的应用场景进行权衡。

  • 如果你的应用没有使用 Reactor 框架,或者只需要在少量地方进行线程池操作,那么 MDC 透传可能是一个更简单的选择。
  • 如果你的应用大量使用了 Reactor 框架,并且希望代码风格保持一致,那么 Reactor Context 融合可能更适合你。

无论选择哪种方案,都需要确保 traceId 在整个调用链中都被正确地传递,从而保证链路追踪的完整性。

最后想说

希望本次讲座能够帮助大家解决 Spring Cloud Sleuth 中 traceId 跨线程池丢失的问题。记住,理解问题的本质,选择合适的解决方案,并进行充分的测试,才能保证链路追踪的可靠性。

关于TraceId 跨线程池丢失的问题,理解本质,选择合适的解决方案,并进行充分的测试。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注