Observability in Java Applications: Unified Collection of Metrics, Traces, and Logs, and Context Propagation


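Hello everyone. Today we take a close look at observability in Java applications, focusing on the unified collection of metrics, traces, and logs and on context propagation. Observability is a key part of modern software development: it lets us understand a system's internal state, diagnose problems, and tune performance. A good observability setup helps us not only find problems quickly but also understand why they happened, which improves the stability and reliability of the system.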

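The Three Pillars of Observability: Metrics, Traces, Logs

Observability is usually described as resting on three pillars: metrics, traces, and logs. They complement one another and together give a complete view of how the system behaves.

  • Metrics: numeric measurements of system behavior over time. They are usually represented as time series, for example CPU utilization, memory usage, request latency, and error rate. Metrics let us monitor system health, identify performance bottlenecks, and set up alerts.

  • Traces: records of a single request or transaction from start to finish. They show the path a request takes across services, where the bottlenecks are, and where latency accumulates. A trace consists of multiple spans; each span represents one operation performed for the request, and a single service may contribute several spans.

  • Logs: records of events that occur while the application runs. They can contain error messages, warnings, debug information, and audit entries. Logs help us diagnose problems, understand system behavior, and carry out security analysis.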

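The table below summarizes the three pillars:

| Aspect | Metrics | Traces | Logs |
|---|---|---|---|
| Data type | Numeric time series | Call-chain data for distributed requests | Text |
| Purpose | Monitor system health, identify performance bottlenecks, drive alerts | Follow requests across services, analyze latency | Diagnose problems, understand system behavior, support security analysis |
| Data volume | Usually small; the data is aggregated | Usually large, but controllable through sampling | Usually large; depends on log level and application behavior |
| Query style | Aggregations over time ranges; alerting | Lookup by trace ID to view a request's full path | Keyword and time-range search for specific events |
| Storage | Time series databases (TSDB), e.g. Prometheus, InfluxDB | Distributed tracing systems, e.g. Jaeger, Zipkin | Log management systems, e.g. ELK Stack, Splunk |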
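The table notes that trace volume can be controlled through sampling. One common idea is to derive the keep-or-drop decision from the trace ID itself, so that every service reaches the same decision for the same trace without coordination. The following is a minimal JDK-only sketch of that idea. The class name RatioSampler and its method are made up for illustration and are not part of any library, and the sketch assumes trace IDs are 32 lowercase hex characters.

```java
/** Illustrative stand-in for ratio-based sampling; not a library class. */
final class RatioSampler {
    private final double ratio;

    RatioSampler(double ratio) {
        // The negated form also rejects NaN.
        if (!(ratio >= 0.0 && ratio <= 1.0)) {
            throw new IllegalArgumentException("ratio must be in [0, 1]: " + ratio);
        }
        this.ratio = ratio;
    }

    /** Decides from the trace ID alone, so the result is the same on every service. */
    boolean shouldSample(String traceIdHex) {
        if (traceIdHex.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters");
        }
        if (ratio >= 1.0) {
            return true;
        }
        // Take the low 64 bits of the ID and clear the sign bit to get a non-negative value.
        long low = Long.parseUnsignedLong(traceIdHex.substring(16), 16) & Long.MAX_VALUE;
        // Keep the trace if that value falls in the lowest `ratio` share of the range.
        long bound = (long) (ratio * Long.MAX_VALUE);
        return low < bound;
    }
}
```

Because the decision is a pure function of the trace ID, a trace is either kept in full or dropped in full, provided every service uses the same ratio.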

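Unified Collection: OpenTelemetry

To simplify the collection of metrics, traces, and logs, we can use OpenTelemetry. OpenTelemetry is an open-source observability framework that provides a set of APIs, SDKs, and tools for generating, collecting, and exporting telemetry data. It supports many programming languages, including Java, and integrates with a wide range of backends.

Adding the OpenTelemetry dependencies:

Add the following dependencies to pom.xml: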

<dependencies>
    <!-- OpenTelemetry API -->
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-api</artifactId>
        <version>1.34.1</version>
    </dependency>
    <!-- OpenTelemetry SDK -->
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-sdk</artifactId>
        <version>1.34.1</version>
    </dependency>
    <!-- OpenTelemetry Exporter (OTLP) -->
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-exporter-otlp</artifactId>
        <version>1.34.1</version>
    </dependency>
    <!-- OpenTelemetry SDK autoconfiguration (optional; this is not the auto-instrumentation agent) -->
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-sdk-autoconfigure</artifactId>
        <version>1.34.1</version>
    </dependency>
    <!-- Logging framework (SLF4J) -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.11</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.11</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>

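Configuring the OpenTelemetry SDK:

We need to configure the OpenTelemetry SDK so that it exports data to a backend. For example, we can use OTLP (OpenTelemetry Protocol) to send data to an OpenTelemetry Collector, or directly to a backend that accepts OTLP, such as Jaeger. A backend without an OTLP receiver is typically reached through the Collector.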

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;

import java.util.concurrent.TimeUnit;

public class OpenTelemetryConfig {

    public static OpenTelemetry initOpenTelemetry(String serviceName, String otlpEndpoint) {
        // Identify this service in exported telemetry. The plain "service.name" key is used
        // because the semantic-conventions artifact is not among the declared dependencies.
        Resource resource = Resource.getDefault().toBuilder()
                .put("service.name", serviceName)
                .build();

        // Configure the Span exporter
        OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder()
                .setEndpoint(otlpEndpoint) // e.g., "http://localhost:4317"
                .setTimeout(30, TimeUnit.SECONDS)
                .build();

        // Configure the Tracer provider
        SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
                .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build())
                .setResource(resource)
                .build();

        // Configure the Metric exporter
        OtlpGrpcMetricExporter metricExporter = OtlpGrpcMetricExporter.builder()
                .setEndpoint(otlpEndpoint) // e.g., "http://localhost:4317"
                .setTimeout(30, TimeUnit.SECONDS)
                .build();

        // Configure the Meter provider
        SdkMeterProvider meterProvider = SdkMeterProvider.builder()
                .registerMetricReader(PeriodicMetricReader.builder(metricExporter).setInterval(10, TimeUnit.SECONDS).build())
                .setResource(resource)
                .build();

        // Build the OpenTelemetry SDK
        OpenTelemetrySdk openTelemetrySdk = OpenTelemetrySdk.builder()
                .setTracerProvider(tracerProvider)
                .setMeterProvider(meterProvider)
                .buildAndRegisterGlobal();

        return openTelemetrySdk;
    }

    public static void main(String[] args) {
        //Example Usage
        OpenTelemetry openTelemetry = initOpenTelemetry("my-java-service", "http://localhost:4317");

        // Your application code here
        // Add tracing, metrics, and logging using the OpenTelemetry API

        // Shutdown OpenTelemetry SDK when the application is shutting down
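        // initOpenTelemetry returns the OpenTelemetry interface, so check for the SDK type before closing
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (openTelemetry instanceof OpenTelemetrySdk) {
                ((OpenTelemetrySdk) openTelemetry).close();
            }
        }));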
    }
}

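This code configures the OpenTelemetry SDK to export traces and metrics over OTLP to the given endpoint. Make sure an OTLP receiver (for example Jaeger or the OpenTelemetry Collector) is running and listening on that port.

Collecting Metrics:

The OpenTelemetry API makes it straightforward to record metrics. For example, we can create a Counter that tracks the number of requests.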

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;

public class MetricsExample {

    private final LongCounter requestCounter;

    public MetricsExample(OpenTelemetry openTelemetry) {
        Meter meter = openTelemetry.getMeter("my-java-service", "1.0.0");
        requestCounter = meter.counterBuilder("request_count")
                .setDescription("Number of requests")
                .setUnit("requests")
                .build();
    }

    public void incrementRequestCount() {
        requestCounter.add(1);
    }

    public static void main(String[] args) {
        // Initialize OpenTelemetry (replace with your actual initialization)
        OpenTelemetry openTelemetry = OpenTelemetryConfig.initOpenTelemetry("my-java-service", "http://localhost:4317");

        MetricsExample metricsExample = new MetricsExample(openTelemetry);

        // Simulate some requests
        for (int i = 0; i < 10; i++) {
            metricsExample.incrementRequestCount();
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        System.out.println("Metrics collected. Check your OpenTelemetry backend.");

        // Shutdown OpenTelemetry (important to flush metrics)
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (openTelemetry instanceof io.opentelemetry.sdk.OpenTelemetrySdk) {
                ((io.opentelemetry.sdk.OpenTelemetrySdk) openTelemetry).close();
            }
        }));
    }
}

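This code creates a Counter named request_count and increments it by one on every call to incrementRequestCount().

Collecting Traces:

The OpenTelemetry API also makes it straightforward to record traces. For example, we can create a Span that measures how long a method takes to run.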

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;

public class TracingExample {

    private final Tracer tracer;

    public TracingExample(OpenTelemetry openTelemetry) {
        tracer = openTelemetry.getTracer("my-java-service", "1.0.0");
    }

    public void doSomething() {
        Span span = tracer.spanBuilder("doSomething").startSpan();
        try {
            // Your code here
            Thread.sleep(50); // Simulate some work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            span.end();
        }
    }

    public static void main(String[] args) {
        // Initialize OpenTelemetry (replace with your actual initialization)
        OpenTelemetry openTelemetry = OpenTelemetryConfig.initOpenTelemetry("my-java-service", "http://localhost:4317");

        TracingExample tracingExample = new TracingExample(openTelemetry);

        // Perform some operations
        for (int i = 0; i < 5; i++) {
            tracingExample.doSomething();
        }

        System.out.println("Traces collected. Check your OpenTelemetry backend.");

        // Shutdown OpenTelemetry (important to flush traces)
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (openTelemetry instanceof io.opentelemetry.sdk.OpenTelemetrySdk) {
                ((io.opentelemetry.sdk.OpenTelemetrySdk) openTelemetry).close();
            }
        }));
    }
}

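This code creates a Span named doSomething. It calls startSpan() before the work and end() after it, so the span's duration covers the method's execution.

Collecting Logs:

Most applications already log through an established framework, so the usual first step is to correlate those logs with traces rather than replace the logging API. OpenTelemetry provides mechanisms for this, including bridges for logging frameworks such as Logback and Log4j2. The simplest approach, shown here, is to add the trace ID and span ID to log messages so that log lines can be tied to a specific request.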

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.opentelemetry.context.Context;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;

public class LoggingExample {

    private static final Logger logger = LoggerFactory.getLogger(LoggingExample.class);
    private final Tracer tracer;

    public LoggingExample(OpenTelemetry openTelemetry) {
        tracer = openTelemetry.getTracer("my-java-service", "1.0.0");
    }

    public void doSomething() {
        Span span = tracer.spanBuilder("doSomething").startSpan();
        try {
            // Your code here
            logger.info("Doing something...");
            logContext(span); // Log with Trace ID and Span ID
            Thread.sleep(50); // Simulate some work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Interrupted!", e);
            logContext(span); // Log with Trace ID and Span ID
        } finally {
            span.end();
        }
    }

    private void logContext(Span span) {
        SpanContext spanContext = span.getSpanContext();
        String traceId = spanContext.getTraceId();
        String spanId = spanContext.getSpanId();
        TraceFlags traceFlags = spanContext.getTraceFlags();
        TraceState traceState = spanContext.getTraceState();

        logger.info("Trace ID: {}, Span ID: {}, Trace Flags: {}, Trace State: {}", traceId, spanId, traceFlags, traceState);

    }

    public static void main(String[] args) {
        // Initialize OpenTelemetry (replace with your actual initialization)
        OpenTelemetry openTelemetry = OpenTelemetryConfig.initOpenTelemetry("my-java-service", "http://localhost:4317");

        LoggingExample loggingExample = new LoggingExample(openTelemetry);

        // Perform some operations
        for (int i = 0; i < 5; i++) {
            loggingExample.doSomething();
        }

        System.out.println("Logs collected (check log output). Traces also collected. Check your OpenTelemetry backend.");

        // Shutdown OpenTelemetry (important to flush data)
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (openTelemetry instanceof io.opentelemetry.sdk.OpenTelemetrySdk) {
                ((io.opentelemetry.sdk.OpenTelemetrySdk) openTelemetry).close();
            }
        }));
    }
}

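The logContext method writes the trace ID and span ID into a log message explicitly. In practice, MDC (Mapped Diagnostic Context) can attach this information to every log message automatically; many logging frameworks, including Logback and Log4j2, support MDC. The current span is also available anywhere through Span.current(), from which the SpanContext can be read, provided the span has been made current with makeCurrent().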
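To make the payoff of correlation concrete, here is a small JDK-only sketch: each log line carries the trace ID and span ID as key-value fields, and all lines belonging to one request can then be pulled out by trace ID. The class LogCorrelation and the field names trace_id and span_id are assumptions chosen for this illustration. With MDC, a logging pattern would emit comparable fields without touching each log call.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative helper; the field names are an assumption, not a fixed standard. */
final class LogCorrelation {

    /** Prefixes a message with the IDs of the span that was active when it was logged. */
    static String format(String traceId, String spanId, String message) {
        return "trace_id=" + traceId + " span_id=" + spanId + " " + message;
    }

    /** Returns only the lines that belong to the given trace, in their original order. */
    static List<String> forTrace(List<String> lines, String traceId) {
        String prefix = "trace_id=" + traceId + " ";
        List<String> matches = new ArrayList<>();
        for (String line : lines) {
            if (line.startsWith(prefix)) {
                matches.add(line);
            }
        }
        return matches;
    }
}
```

A log backend performs the same kind of lookup at scale: given a trace ID copied from the tracing UI, it returns every log line written while that request was being handled.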

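Context Propagation

Context propagation means carrying request-scoped context across service or thread boundaries. The context can include the trace ID, span ID, user ID, request ID, and similar values. Propagating it lets us follow a request as it moves between services and tie the related logs and metrics back to that request.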
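Between services, the context usually travels in request headers. The W3C Trace Context specification defines a traceparent header of the form version-traceid-spanid-flags, where the trace ID is 32 hex characters, the span ID is 16, and the lowest flag bit marks the trace as sampled. The sketch below formats and parses that header with the JDK only. The class TraceParent is hypothetical and handles only version 00. In a real application the OpenTelemetry propagators do this work.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative traceparent codec for version 00 only; not a library class. */
final class TraceParent {
    private static final Pattern FORMAT =
            Pattern.compile("00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})");

    final String traceId;
    final String spanId;
    final boolean sampled;

    private TraceParent(String traceId, String spanId, boolean sampled) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.sampled = sampled;
    }

    /** Builds the header value that the calling service attaches to an outgoing request. */
    static String format(String traceId, String spanId, boolean sampled) {
        return "00-" + traceId + "-" + spanId + "-" + (sampled ? "01" : "00");
    }

    /** Parses an incoming header value; returns null if it is malformed. */
    static TraceParent parse(String header) {
        Matcher m = FORMAT.matcher(header);
        if (!m.matches()) {
            return null;
        }
        String traceId = m.group(1);
        String spanId = m.group(2);
        // All-zero IDs are defined as invalid.
        if (traceId.matches("0+") || spanId.matches("0+")) {
            return null;
        }
        boolean sampled = (Integer.parseInt(m.group(3), 16) & 1) == 1;
        return new TraceParent(traceId, spanId, sampled);
    }
}
```

The receiving service starts its own span with the parsed trace ID and uses the parsed span ID as the parent, which is what keeps the trace connected across the network hop.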

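OpenTelemetry provides a set of APIs for managing Context. Context.current() returns the current Context. Context.with() returns a new, immutable Context that carries the added value; it does not become current by itself. To install a Context or a span as current, call makeCurrent() and close the returned Scope when done.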

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextPropagationExample {

    private final Tracer tracer;
    private final ExecutorService executorService = Executors.newFixedThreadPool(2);

    public ContextPropagationExample(OpenTelemetry openTelemetry) {
        tracer = openTelemetry.getTracer("my-java-service", "1.0.0");
    }

    public void handleRequest() {
        Span parentSpan = tracer.spanBuilder("handleRequest").startSpan();
        try (Scope scope = parentSpan.makeCurrent()) {
            // Simulate work in the main thread
            System.out.println("Handling request in main thread...");
            doSomeWork("Main Thread");

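            // Capture the context on the calling thread. Context.current() evaluated inside
            // the task would return the worker thread's context, which has no parent span.
            Context parentContext = Context.current();

            // Submit a task to another thread, propagating the captured context
            executorService.submit(() -> {
                Span childSpan = tracer.spanBuilder("asyncTask").setParent(parentContext).startSpan();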
                try (Scope childScope = childSpan.makeCurrent()) {
                    System.out.println("Handling request in async thread...");
                    doSomeWork("Async Thread");
                } finally {
                    childSpan.end();
                }
            });
        } finally {
            parentSpan.end();
        }

        // Shut down the executor service
        executorService.shutdown();
    }

    private void doSomeWork(String threadName) {
        Span span = Span.current();
        span.setAttribute("thread.name", threadName);
        System.out.println("Doing some work in " + threadName + " - Span ID: " + span.getSpanContext().getSpanId());
        try {
            Thread.sleep(100); // Simulate some work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Initialize OpenTelemetry (replace with your actual initialization)
        OpenTelemetry openTelemetry = OpenTelemetryConfig.initOpenTelemetry("my-java-service", "http://localhost:4317");

        ContextPropagationExample contextPropagationExample = new ContextPropagationExample(openTelemetry);
        contextPropagationExample.handleRequest();

        // Shutdown OpenTelemetry and ExecutorService
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (openTelemetry instanceof io.opentelemetry.sdk.OpenTelemetrySdk) {
                ((io.opentelemetry.sdk.OpenTelemetrySdk) openTelemetry).close();
            }
            contextPropagationExample.executorService.shutdownNow();
        }));
    }
}

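This example shows how to propagate Context across threads. Span.makeCurrent() returns a Scope; using it in a try-with-resources block ensures that the previous Context is restored when the block exits. setParent(...) makes the given Context the parent of the new span, which keeps the trace continuous. The Context passed to setParent must be obtained on the submitting thread: the current Context is tracked per thread, so Context.current() evaluated on a pool thread does not see the caller's span.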
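The capture-then-restore pattern behind cross-thread propagation can be sketched with the JDK alone. Below, a ThreadLocal named CURRENT stands in for the current Context, and a hypothetical wrap helper captures its value on the submitting thread, installs it on the worker for the duration of the task, and restores the worker's previous value afterwards. The OpenTelemetry Context API offers wrappers in this spirit, such as Context.current().wrap(runnable). The names in this sketch are illustrative only.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative stand-in for context wrapping; not the OpenTelemetry implementation. */
final class ContextWrapDemo {
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Captures the caller's value now and installs it around the task when it runs. */
    static Runnable wrap(Runnable task) {
        String captured = CURRENT.get(); // read on the submitting thread
        return () -> {
            String previous = CURRENT.get(); // whatever the worker thread had before
            CURRENT.set(captured);
            try {
                task.run();
            } finally {
                // Restore the worker's earlier state so nothing leaks into the next task.
                if (previous == null) {
                    CURRENT.remove();
                } else {
                    CURRENT.set(previous);
                }
            }
        };
    }

    /** Returns {seen by a plain task, seen by a wrapped task, left on the worker afterwards}. */
    static String[] observe() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<String> plain = new AtomicReference<>();
        AtomicReference<String> wrapped = new AtomicReference<>();
        AtomicReference<String> after = new AtomicReference<>();
        CURRENT.set("trace-abc");
        try {
            pool.submit(() -> plain.set(CURRENT.get())).get();
            pool.submit(wrap(() -> wrapped.set(CURRENT.get()))).get();
            pool.submit(() -> after.set(CURRENT.get())).get();
            return new String[] {plain.get(), wrapped.get(), after.get()};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT.remove();
            pool.shutdown();
        }
    }
}
```

Only the wrapped task observes the caller's value. The plain task sees nothing, and the worker thread is left clean once the wrapped task has finished.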

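Drawbacks of using ThreadLocal for context propagation:

A hand-rolled ThreadLocal can hold request context, but in multi-threaded or asynchronous code it tends to cause leaked or inconsistent context. A ThreadLocal value belongs to one thread: it is not visible to tasks handed to other threads, and if a pooled thread is reused without the value being cleared, the next request on that thread sees stale data. The OpenTelemetry Context API is therefore the recommended way to manage context. By default it also keeps the current Context per thread, but it pairs that with Scope, which restores the previous Context on close, and with explicit ways to carry a Context to another thread.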
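The stale-value problem is easy to reproduce with a single-thread pool, where consecutive tasks are guaranteed to share a thread. In this JDK-only sketch, the first task sets a ThreadLocal and the second task reads it. The class and field names are made up for the demonstration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Demonstrates a ThreadLocal value surviving into the next task on a reused thread. */
final class ThreadLocalLeakDemo {
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    /** Runs two tasks on one pooled thread and returns what the second task observes. */
    static String secondTaskSees(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Runnable firstRequest = () -> {
            REQUEST_ID.set("request-1");
            try {
                // ... handle request 1 ...
            } finally {
                if (cleanUp) {
                    REQUEST_ID.remove(); // without this, the value outlives the request
                }
            }
        };
        Callable<String> secondRequest = REQUEST_ID::get;
        try {
            pool.submit(firstRequest).get();
            return pool.submit(secondRequest).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Without cleanup, the second task reads the first request's ID. With the remove() call in the finally block, it reads null. A Scope closed by try-with-resources plays the role of that finally block.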

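Automatic Instrumentation

OpenTelemetry also offers automatic instrumentation, which collects metrics and traces without changes to application code. The OpenTelemetry Java agent detects common frameworks and libraries, such as Spring, Servlet, and JDBC, and generates the corresponding metrics and traces.

To use it, add the agent's options when starting the Java application. Port 4317 in the command below is the conventional OTLP/gRPC port; depending on the agent version, the default OTLP protocol may instead be http/protobuf (conventionally port 4318), in which case set otel.exporter.otlp.protocol=grpc or adjust the endpoint.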

java -javaagent:/path/to/opentelemetry-javaagent.jar \
     -Dotel.service.name=my-java-service \
     -Dotel.exporter.otlp.endpoint=http://localhost:4317 \
     -jar my-application.jar

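Summary: Unified Observability and Why Context Propagation Matters

We covered the three pillars of observability in Java applications (metrics, traces, and logs) and showed how to collect them in a unified way with OpenTelemetry. Context propagation is what keeps a request's telemetry connected across services and threads, and OpenTelemetry provides APIs for managing that context. Finally, automatic instrumentation reduces the setup work. Together, these techniques let us build a complete observability solution and improve the stability and reliability of the system.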
