Java 服务调用链 TraceId 丢失? MDC 与异步线程上下文传递方案
各位朋友,大家好。今天我们来聊聊在分布式系统中,服务调用链追踪中TraceId丢失的问题,以及如何利用MDC(Mapped Diagnostic Context)和一些上下文传递方案来解决这个问题,尤其是在涉及到异步线程的时候。
为什么需要调用链追踪?
在微服务架构中,一个用户请求往往需要经过多个服务的协同处理。如果请求处理过程中出现问题,我们需要快速定位问题所在。调用链追踪就是为了解决这个问题而生的。它可以将一次用户请求在各个服务之间的调用关系串联起来,形成一条完整的调用链。通过分析调用链,我们可以清晰地了解请求的路径、耗时、以及各个服务的状态,从而快速定位问题。
调用链追踪的关键在于能够为每个请求生成一个唯一的ID,也就是TraceId。这个TraceId需要在整个调用链上传递,以便将各个服务的日志关联起来。
TraceId 丢失的常见场景
TraceId丢失的原因有很多,其中最常见的就是在异步线程中丢失。在多线程环境中,主线程和子线程的上下文是不同的。如果我们在主线程中设置了TraceId,但在子线程中没有正确地传递TraceId,那么子线程的日志就无法与主线程的日志关联起来,导致调用链断裂。
以下是一些可能导致TraceId丢失的常见场景:
- 异步方法调用: 使用
@Async注解或者ExecutorService提交任务时,没有进行TraceId的传递。 - 消息队列: 消息生产者设置了TraceId,但消息消费者没有正确地提取并设置TraceId。
- 线程池: 线程池中的线程被复用,如果没有在任务执行前后清理TraceId,可能会导致TraceId错乱。
- 定时任务: 定时任务在独立线程中执行,如果没有进行TraceId的传递,会导致定时任务的日志无法与请求关联。
- Servlet Filter/Interceptor: 如果Filter/Interceptor没有正确地处理异步请求,可能会导致TraceId丢失。
MDC:解决线程上下文问题的利器
MDC (Mapped Diagnostic Context) 是log4j、logback等日志框架提供的一种方便在多线程环境下存储和访问线程上下文信息的机制。它本质上是一个线程本地变量(ThreadLocal),可以在同一个线程中跨方法传递数据,而无需显式地传递参数。
我们可以利用MDC来存储TraceId,然后在需要的地方从MDC中获取TraceId。
MDC 的基本使用
-
放入 MDC: 在请求的入口处,生成TraceId,并将其放入MDC。
import org.slf4j.MDC; public class TraceIdGenerator { public static String generate() { return java.util.UUID.randomUUID().toString(); } } // 在请求入口 (例如:Servlet Filter, Spring Interceptor) String traceId = TraceIdGenerator.generate(); MDC.put("traceId", traceId); // 请求处理... // 请求结束时,移除 MDC MDC.remove("traceId"); -
日志配置: 配置日志框架,将MDC中的TraceId输出到日志中。
-
logback.xml:
<configuration> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n traceId:%X{traceId} </pattern> </encoder> </appender> <root level="info"> <appender-ref ref="STDOUT" /> </root> </configuration> -
log4j.properties:
log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c{1} - %m%n traceId:%X{traceId}
-
-
使用 MDC: 在任何需要的地方,都可以通过
MDC.get("traceId")获取TraceId。
示例:使用 Servlet Filter 设置 TraceId
import org.slf4j.MDC;
import javax.servlet.*;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
public class TraceIdFilter implements Filter {
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) request;
String traceId = httpRequest.getHeader("traceId"); // 从请求头中获取,如果不存在则生成
if (traceId == null || traceId.isEmpty()) {
traceId = TraceIdGenerator.generate();
}
MDC.put("traceId", traceId);
try {
chain.doFilter(request, response);
} finally {
MDC.remove("traceId"); // 确保请求结束后清理 MDC
}
}
@Override
public void destroy() {
}
}
在上面的例子中,我们创建了一个TraceIdFilter,它会在每个请求进入时生成或者从请求头中获取TraceId,并将其放入MDC。请求结束后,我们会从MDC中移除TraceId。
异步线程的 TraceId 传递
MDC 只能在同一个线程中传递数据。对于异步线程,我们需要手动地传递TraceId。
方案一:手动传递 TraceId
这是最直接的方法,即在创建异步任务时,显式地将TraceId传递给异步任务。
import org.slf4j.MDC;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class AsyncExample {
private static final ExecutorService executor = Executors.newFixedThreadPool(10);
public void process(String data) {
String traceId = MDC.get("traceId");
executor.submit(() -> {
try {
MDC.put("traceId", traceId); // 在子线程中设置 TraceId
// 异步处理逻辑
System.out.println("Async processing: " + data + ", TraceId: " + MDC.get("traceId"));
} finally {
MDC.remove("traceId"); // 确保子线程结束后清理 MDC
}
});
}
}
这种方法简单直接,但是需要在每个异步任务中都手动地设置和清理TraceId,比较繁琐。
方案二:使用 Callable 和 Future
Callable接口允许任务返回值,并且可以抛出异常。我们可以利用它来传递TraceId。
import org.slf4j.MDC;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CallableExample {
private static final ExecutorService executor = Executors.newFixedThreadPool(10);
public Future<String> process(String data) {
String traceId = MDC.get("traceId");
Callable<String> task = () -> {
try {
MDC.put("traceId", traceId);
// 异步处理逻辑
String result = "Async processing: " + data + ", TraceId: " + MDC.get("traceId");
System.out.println(result);
return result;
} finally {
MDC.remove("traceId");
}
};
return executor.submit(task);
}
}
这种方法与手动传递类似,但使用了Callable,可以方便地获取异步任务的结果。
方案三:自定义 ExecutorService 包装器
我们可以创建一个自定义的ExecutorService包装器,在提交任务之前自动地将TraceId传递给任务,并在任务执行结束后清理TraceId。
import org.slf4j.MDC;
import java.util.concurrent.*;
public class TraceAwareExecutorService implements ExecutorService {
private final ExecutorService delegate;
public TraceAwareExecutorService(ExecutorService delegate) {
this.delegate = delegate;
}
@Override
public void shutdown() {
delegate.shutdown();
}
@Override
public java.util.List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}
@Override
public boolean isShutdown() {
return delegate.isShutdown();
}
@Override
public boolean isTerminated() {
return delegate.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(wrap(task));
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate.submit(wrap(task), result);
}
@Override
public Future<?> submit(Runnable task) {
return delegate.submit(wrap(task));
}
@Override
public <T> java.util.List<Future<T>> invokeAll(java.util.Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate.invokeAll(wrap(tasks));
}
@Override
public <T> java.util.List<Future<T>> invokeAll(java.util.Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return delegate.invokeAll(wrap(tasks), timeout, unit);
}
@Override
public <T> T invokeAny(java.util.Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate.invokeAny(wrap(tasks));
}
@Override
public <T> T invokeAny(java.util.Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(wrap(tasks), timeout, unit);
}
@Override
public void execute(Runnable command) {
delegate.execute(wrap(command));
}
private <T> Callable<T> wrap(Callable<T> task) {
String traceId = MDC.get("traceId");
return () -> {
if (traceId != null) {
MDC.put("traceId", traceId);
}
try {
return task.call();
} finally {
MDC.remove("traceId");
}
};
}
private Runnable wrap(Runnable task) {
String traceId = MDC.get("traceId");
return () -> {
if (traceId != null) {
MDC.put("traceId", traceId);
}
try {
task.run();
} finally {
MDC.remove("traceId");
}
};
}
private <T> java.util.Collection<? extends Callable<T>> wrap(java.util.Collection<? extends Callable<T>> tasks) {
String traceId = MDC.get("traceId");
return tasks.stream().map(task -> (Callable<T>) () -> {
if (traceId != null) {
MDC.put("traceId", traceId);
}
try {
return task.call();
} finally {
MDC.remove("traceId");
}
}).toList();
}
}
使用时,只需要将原有的ExecutorService替换为TraceAwareExecutorService即可:
ExecutorService executor = new TraceAwareExecutorService(Executors.newFixedThreadPool(10));
这种方法可以避免在每个异步任务中都手动地设置和清理TraceId,减少了代码的冗余。
方案四:使用 Spring 的 TaskDecorator
Spring 提供了TaskDecorator接口,可以在任务执行前后执行一些额外的操作。我们可以利用它来传递TraceId。
import org.slf4j.MDC;
import org.springframework.core.task.TaskDecorator;
public class MDCTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
String traceId = MDC.get("traceId");
return () -> {
if (traceId != null) {
MDC.put("traceId", traceId);
}
try {
runnable.run();
} finally {
MDC.remove("traceId");
}
};
}
}
配置 Spring 的线程池:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class ThreadPoolConfig {
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
executor.setTaskDecorator(new MDCTaskDecorator());
executor.initialize();
return executor;
}
}
在 Spring Boot 应用中,可以直接配置application.properties:
spring.task.execution.decorator.bean-name=MDCTaskDecorator
这种方法与自定义ExecutorService包装器类似,但使用了 Spring 提供的TaskDecorator接口,更加方便。
方案五:使用 TransmittableThreadLocal (TTL)
TransmittableThreadLocal (TTL) 是阿里巴巴开源的一个库,它可以解决在线程池等场景下,ThreadLocal变量的值传递问题。TTL 可以自动地将父线程的ThreadLocal变量传递给子线程。
-
引入 TTL:
<dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-thread-local</artifactId> <version>2.14.2</version> </dependency> -
使用 TTL:
import com.alibaba.ttl.TransmittableThreadLocal; import org.slf4j.MDC; public class TTLExample { private static final TransmittableThreadLocal<String> traceIdHolder = new TransmittableThreadLocal<>(); public void process(String data) { String traceId = MDC.get("traceId"); traceIdHolder.set(traceId); // 使用 TTL 存储 TraceId executor.submit(() -> { try { MDC.put("traceId", traceIdHolder.get()); // 从 TTL 获取 TraceId // 异步处理逻辑 System.out.println("Async processing: " + data + ", TraceId: " + MDC.get("traceId")); } finally { MDC.remove("traceId"); traceIdHolder.remove(); // 清理 TTL } }); } } -
包装 ExecutorService:
使用TTL提供的
TtlExecutors工具类对线程池进行包装,可以简化TTL的使用。import com.alibaba.ttl.TtlExecutors; ExecutorService executor = TtlExecutors.getTtlExecutor(Executors.newFixedThreadPool(10));
TTL 可以自动地将父线程的ThreadLocal变量传递给子线程,省去了手动传递TraceId的步骤。
消息队列的 TraceId 传递
在使用消息队列时,我们需要在消息生产者将TraceId放入消息头中,然后在消息消费者从消息头中提取TraceId,并将其放入MDC。
消息生产者
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.slf4j.MDC;
public class MessageProducer {
private final RabbitTemplate rabbitTemplate;
public MessageProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(String messageBody) {
String traceId = MDC.get("traceId");
Message message = MessageBuilder.withBody(messageBody.getBytes())
.setHeader("traceId", traceId)
.build();
rabbitTemplate.send("myExchange", "myRoutingKey", message);
}
}
消息消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.Message;
import org.slf4j.MDC;
public class MessageConsumer {
@RabbitListener(queues = "myQueue")
public void receiveMessage(Message message) {
String traceId = (String) message.getHeaders().get("traceId");
if (traceId != null) {
MDC.put("traceId", traceId);
}
try {
String messageBody = new String((byte[]) message.getPayload());
System.out.println("Received message: " + messageBody + ", TraceId: " + MDC.get("traceId"));
} finally {
MDC.remove("traceId");
}
}
}
TraceId 传递方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 手动传递 TraceId | 简单直接,易于理解 | 繁琐,需要在每个异步任务中都手动地设置和清理TraceId | 小型项目,异步任务数量较少 |
| Callable 和 Future | 可以方便地获取异步任务的结果 | 仍然需要在每个异步任务中都手动地设置和清理TraceId | 小型项目,需要获取异步任务的结果 |
| 自定义 ExecutorService | 避免在每个异步任务中都手动地设置和清理TraceId,减少了代码的冗余 | 需要自定义 ExecutorService,代码量较大 | 中型项目,异步任务数量较多 |
| Spring 的 TaskDecorator | 使用 Spring 提供的 TaskDecorator 接口,更加方便 | 需要配置 Spring,依赖于 Spring | Spring 项目,异步任务数量较多 |
| TransmittableThreadLocal | 自动传递 ThreadLocal 变量,省去了手动传递TraceId的步骤,使用方便 | 引入了额外的依赖 | 大型项目,异步任务数量非常多,且需要传递多个 ThreadLocal 变量 |
| 消息队列 | 可以将TraceId传递给消息消费者,实现跨服务的调用链追踪 | 需要修改消息生产者和消息消费者的代码 | 使用消息队列的分布式系统 |
最佳实践
- 统一 TraceId 生成策略: 在整个系统中,使用统一的TraceId生成策略,例如使用UUID。
- 使用 MDC: 使用 MDC 存储 TraceId,方便在同一个线程中传递TraceId。
- 选择合适的异步线程 TraceId 传递方案: 根据项目的规模和需求,选择合适的异步线程TraceId传递方案。
- 确保请求结束后清理 MDC: 在请求结束后,务必从MDC中移除TraceId,避免TraceId错乱。
- 监控和告警: 监控调用链的完整性,如果发现TraceId丢失,及时告警。
确保调用链完整,才能快速定位问题
今天我们讨论了Java服务调用链中TraceId丢失的问题,以及如何利用MDC和一些上下文传递方案来解决这个问题,特别是在异步线程的环境下。我们分析了多种方案,包括手动传递、使用Callable和Future、自定义ExecutorService包装器、Spring的TaskDecorator以及TransmittableThreadLocal。选择合适的方案取决于项目的规模、技术栈和具体需求。希望这些方法能帮助大家构建更健壮、可追踪的分布式系统。
总结与回顾
通过MDC可以方便地在同一个线程中传递TraceId,而对于异步线程,我们需要手动地传递TraceId或者使用一些上下文传递方案。确保TraceId在整个调用链上传递,才能实现完整的调用链追踪,从而快速定位问题。