JAVA 微服务链路追踪信息缺失?Sleuth TraceId 跨线程传递方案

好的,现在开始我们的讲座,主题是“JAVA 微服务链路追踪信息缺失?Sleuth TraceId 跨线程传递方案”。

大家好,今天我们要探讨一个在微服务架构中经常遇到的问题:在使用Spring Cloud Sleuth进行链路追踪时,由于多线程的存在,导致TraceId和SpanId等关键信息丢失,从而无法完整追踪请求链路。

一、问题背景:Sleuth与多线程

Spring Cloud Sleuth是一个优秀的分布式链路追踪解决方案,它能够自动为我们的微服务应用添加链路追踪所需的HTTP Headers,比如X-B3-TraceIdX-B3-SpanIdX-B3-ParentSpanIdX-B3-SampledX-B3-Flags等。这些Header会随着请求在各个微服务之间传递,从而串联起整个调用链。

然而,在多线程环境下,Sleuth的自动传递机制会失效。这是因为Sleuth默认使用ThreadLocal来存储Trace信息。ThreadLocal顾名思义,是线程本地变量,每个线程拥有独立的变量副本。当我们在主线程接收到请求并生成TraceId和SpanId后,如果将任务提交给一个新的线程执行,那么新的线程无法访问到主线程的ThreadLocal变量,导致Trace信息丢失。

举个简单的例子:

@RestController
public class MyController {

    @Autowired
    private ExecutorService executorService;

    @GetMapping("/hello")
    public String hello() {
        System.out.println("Controller Thread: " + Thread.currentThread().getName());
        // 打印当前线程的 TraceId
        System.out.println("Controller TraceId: " + MDC.get("traceId"));

        executorService.submit(() -> {
            System.out.println("Async Thread: " + Thread.currentThread().getName());
            // 打印异步线程的 TraceId
            System.out.println("Async TraceId: " + MDC.get("traceId")); // 可能会是 null
            return null;
        });

        return "Hello";
    }
}

在这个例子中,executorService是一个线程池。当/hello接口被调用时,主线程(也就是Controller所在的线程)会打印出TraceId,然后将一个任务提交给线程池执行。在异步线程中,我们尝试打印TraceId,但很有可能打印出来的是null,因为异步线程无法直接访问到主线程的Trace信息。

二、解决方案:TraceContext传播

解决这个问题的关键在于如何将TraceContext(包含TraceId、SpanId等信息)从主线程传播到子线程。常见的解决方案有以下几种:

  1. 使用TraceContext对象手动传递

    这是最直接的方式。我们可以从主线程获取TraceContext对象,然后将其传递给子线程。子线程在执行任务之前,先将TraceContext设置到当前线程的ThreadLocal中。

    @RestController
    public class MyController {
    
        @Autowired
        private ExecutorService executorService;
    
        @Autowired
        private Tracer tracer;
    
        @GetMapping("/hello")
        public String hello() {
            TraceContext traceContext = tracer.currentSpan().context(); // 获取当前TraceContext
    
            executorService.submit(() -> {
                // 设置TraceContext到当前线程
                try (Tracer.SpanInScope ws = tracer.withSpan(tracer.nextSpan(traceContext))) {
                    System.out.println("Async Thread: " + Thread.currentThread().getName());
                    System.out.println("Async TraceId: " + MDC.get("traceId"));
                    // 模拟一些业务逻辑
                    Thread.sleep(100);
                    tracer.currentSpan().tag("async.task", "completed");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return null;
            });
    
            return "Hello";
        }
    }

    这种方式需要手动获取和设置TraceContext,比较繁琐,容易出错。

  2. 使用TraceRunnableTraceCallable

    Sleuth提供了一个TraceRunnable类,它可以自动将当前的TraceContext传递给Runnable对象。类似的,还有一个TraceCallable类,用于传递给Callable对象。

    @RestController
    public class MyController {
    
        @Autowired
        private ExecutorService executorService;
    
        @Autowired
        private Tracer tracer;
    
        @GetMapping("/hello")
        public String hello() {
            executorService.submit(new TraceRunnable(tracer, () -> {
                System.out.println("Async Thread: " + Thread.currentThread().getName());
                System.out.println("Async TraceId: " + MDC.get("traceId"));
                // 模拟一些业务逻辑
                try {
                    Thread.sleep(100);
                    tracer.currentSpan().tag("async.task", "completed");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
    
            return "Hello";
        }
    }

    这种方式更加简洁,但是需要修改现有的Runnable和Callable对象。

  3. 自定义ThreadFactory

    这是最优雅的解决方案。我们可以自定义一个ThreadFactory,在创建新线程时,自动将当前的TraceContext传递给新线程。这样,我们就可以透明地解决Trace信息丢失的问题,而无需修改现有的代码。

    首先,创建一个自定义的ThreadFactory

    import brave.Tracer;
    import brave.propagation.TraceContext;
    import org.slf4j.MDC;
    
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class TraceableThreadFactory implements ThreadFactory {
    
        private final String namePrefix;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final Tracer tracer;
    
        public TraceableThreadFactory(String namePrefix, Tracer tracer) {
            this.namePrefix = namePrefix;
            this.tracer = tracer;
        }
    
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
            // 传递TraceContext
            TraceContext traceContext = tracer.currentSpan() != null ? tracer.currentSpan().context() : null;
            if (traceContext != null) {
                return new TraceableThread(thread, traceContext, tracer);
            }
            return thread;
        }
    
        private static class TraceableThread extends Thread {
            private final TraceContext traceContext;
            private final Tracer tracer;
    
            public TraceableThread(Thread thread, TraceContext traceContext, Tracer tracer) {
                super(thread.getRunnable(), thread.getName());
                this.traceContext = traceContext;
                this.tracer = tracer;
            }
    
            @Override
            public void run() {
                try (Tracer.SpanInScope ws = tracer.withSpan(tracer.nextSpan(traceContext))) {
                    super.run();
                }
            }
        }
    }

    然后,在配置线程池时,使用自定义的ThreadFactory

    @Configuration
    public class ExecutorConfig {
    
        @Autowired
        private Tracer tracer;
    
        @Bean
        public ExecutorService executorService() {
            return Executors.newFixedThreadPool(10, new TraceableThreadFactory("my-thread-pool", tracer));
        }
    }

    最后,在Controller中使用线程池:

    @RestController
    public class MyController {
    
        @Autowired
        private ExecutorService executorService;
    
        @Autowired
        private Tracer tracer;
    
        @GetMapping("/hello")
        public String hello() {
            executorService.submit(() -> {
                System.out.println("Async Thread: " + Thread.currentThread().getName());
                System.out.println("Async TraceId: " + MDC.get("traceId"));
                // 模拟一些业务逻辑
                try {
                    Thread.sleep(100);
                    tracer.currentSpan().tag("async.task", "completed");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
    
            return "Hello";
        }
    }

    这样,我们就可以在异步线程中正确地获取TraceId,从而保证链路追踪的完整性。

  4. 使用InheritableThreadLocal

    InheritableThreadLocalThreadLocal 的一个子类,它允许子线程继承父线程的值。虽然它看似能解决问题,但需要谨慎使用。

    • 适用场景: 当线程创建后,值只需要传递一次,后续子线程的值不需要再与父线程同步时。
    • 局限性:
      • 数据竞争: 如果父线程在创建子线程之后修改了 InheritableThreadLocal 的值,子线程不会得到更新。
      • 内存泄漏: 如果线程池中的线程被重用,旧线程的 InheritableThreadLocal 值可能会被错误地传递给新的任务。
      • 不适用于线程池: 由于线程池线程的复用特性,InheritableThreadLocal 往往不能正确传递 TraceId,因为线程池中的线程可能已经执行过其他的任务,其 InheritableThreadLocal 中保存的可能是之前的 TraceId

    尽管如此,了解 InheritableThreadLocal 仍然是有意义的,因为它在某些特定的场景下可以作为一种简单的解决方案。

    以下是一个使用 InheritableThreadLocal 的示例(不推荐在线程池中使用):

    private static final InheritableThreadLocal<String> traceIdHolder = new InheritableThreadLocal<>();
    
    @GetMapping("/hello")
    public String hello() {
       String traceId = MDC.get("traceId");
       traceIdHolder.set(traceId); // 设置到 InheritableThreadLocal
    
       new Thread(() -> {
           String childTraceId = traceIdHolder.get();
           System.out.println("Child Thread TraceId: " + childTraceId);
       }).start();
    
       return "Hello";
    }

    总结: InheritableThreadLocal 在微服务和链路追踪的场景中,尤其是在使用线程池时,通常不是一个可靠的解决方案。 推荐使用前面介绍的 TraceableThreadFactoryTraceRunnable/TraceCallable 等方式。

三、不同方案的比较

为了更好地理解各种方案的优缺点,我们将其总结在一个表格中:

方案 优点 缺点 适用场景
手动传递TraceContext 简单易懂 繁琐,容易出错,需要修改大量代码 适用于简单的、少量使用多线程的场景,或者需要对TraceContext进行精细控制的场景
使用TraceRunnable/Callable 相对简洁 需要修改现有的Runnable和Callable对象 适用于可以修改Runnable和Callable对象的场景
自定义ThreadFactory 透明,无需修改现有代码,优雅 需要自定义ThreadFactory 适用于需要大量使用多线程,且希望以最优雅的方式解决Trace信息丢失问题的场景。 尤其是在使用线程池的场景中,此方案最佳。
使用InheritableThreadLocal 简单 不适用于线程池,可能导致数据竞争和内存泄漏,不推荐使用在链路追踪场景中 适用于线程创建后,值只需要传递一次,后续子线程的值不需要再与父线程同步的特定场景。 不推荐在微服务和链路追踪中使用,尤其是在使用线程池时。

四、代码示例:完整的可运行示例

下面提供一个完整的可运行示例,使用自定义ThreadFactory来解决Trace信息丢失的问题。

1. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>sleuth-trace-propagation</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>sleuth-trace-propagation</name>
    <description>Sleuth Trace Propagation Example</description>
    <properties>
        <java.version>17</java.version>
        <spring-cloud.version>2023.0.0</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2. TraceableThreadFactory.java

package com.example.suthtracepropagation;

import brave.Tracer;
import brave.propagation.TraceContext;
import org.slf4j.MDC;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class TraceableThreadFactory implements ThreadFactory {

    private final String namePrefix;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final Tracer tracer;

    public TraceableThreadFactory(String namePrefix, Tracer tracer) {
        this.namePrefix = namePrefix;
        this.tracer = tracer;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
        // 传递TraceContext
        TraceContext traceContext = tracer.currentSpan() != null ? tracer.currentSpan().context() : null;
        if (traceContext != null) {
            return new TraceableThread(thread, traceContext, tracer);
        }
        return thread;
    }

    private static class TraceableThread extends Thread {
        private final TraceContext traceContext;
        private final Tracer tracer;

        public TraceableThread(Thread thread, TraceContext traceContext, Tracer tracer) {
            super(thread.getRunnable(), thread.getName());
            this.traceContext = traceContext;
            this.tracer = tracer;
        }

        @Override
        public void run() {
            try (Tracer.SpanInScope ws = tracer.withSpan(tracer.nextSpan(traceContext))) {
                super.run();
            }
        }
    }
}

3. ExecutorConfig.java

package com.example.suthtracepropagation;

import brave.Tracer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Configuration
public class ExecutorConfig {

    @Autowired
    private Tracer tracer;

    @Bean
    public ExecutorService executorService() {
        return Executors.newFixedThreadPool(10, new TraceableThreadFactory("my-thread-pool", tracer));
    }
}

4. MyController.java

package com.example.suthtracepropagation;

import brave.Tracer;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutorService;

@RestController
public class MyController {

    @Autowired
    private ExecutorService executorService;

    @Autowired
    private Tracer tracer;

    @GetMapping("/hello")
    public String hello() {
        System.out.println("Controller Thread: " + Thread.currentThread().getName());
        System.out.println("Controller TraceId: " + MDC.get("traceId"));

        executorService.submit(() -> {
            System.out.println("Async Thread: " + Thread.currentThread().getName());
            System.out.println("Async TraceId: " + MDC.get("traceId"));
            try {
                Thread.sleep(100);
                tracer.currentSpan().tag("async.task", "completed");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        return "Hello";
    }
}

5. SleuthTracePropagationApplication.java

package com.example.suthtracepropagation;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SleuthTracePropagationApplication {

    public static void main(String[] args) {
        SpringApplication.run(SleuthTracePropagationApplication.class, args);
    }

}

运行这个示例,访问/hello接口,你将在控制台中看到异步线程中也正确地打印了TraceId。

五、其他注意事项

  • MDC(Mapped Diagnostic Context): Sleuth会将TraceId和SpanId等信息放入MDC中,方便我们在日志中使用。在使用多线程时,需要确保MDC也能够正确传递。自定义ThreadFactory的例子中已经包含了MDC的传递。
  • 异步框架: 如果使用Spring的@Async注解,Spring会自动处理TraceContext的传递。但需要确保开启了@EnableAsync注解。
  • 响应式编程: 在使用Reactor等响应式编程框架时,需要使用Sleuth提供的相应的工具类来传递TraceContext,例如reactor-context-propagation

六、选择合适的方案

选择哪种方案取决于具体的应用场景和需求。一般来说,自定义ThreadFactory是最通用、最优雅的解决方案,能够透明地解决Trace信息丢失的问题。如果只是在少量的地方使用多线程,可以考虑手动传递TraceContext或者使用TraceRunnable/Callable。 但是一定要避开 InheritableThreadLocal 在线程池中的使用。

确保链路追踪完整

TraceId在多线程环境下的正确传递,对于保证微服务链路追踪的完整性和准确性至关重要。选择合适的TraceContext传播方案,能够帮助我们更好地监控和诊断分布式系统的性能问题。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注