Python实现基于OpenTelemetry的ML全链路追踪:跨框架、跨服务的Context传递
大家好,今天我们来聊聊如何利用 OpenTelemetry 在 Python 的机器学习(ML)项目中实现全链路追踪,重点关注跨框架、跨服务的 Context 传递。 传统的ML项目追踪往往停留在单个服务或框架内部,难以洞察整个ML pipeline的性能瓶颈和数据流转情况。OpenTelemetry 作为云原生可观测性事实标准,提供了一套标准化的 API、SDK 和数据格式,可以帮助我们构建一个统一、完整的 ML 全链路追踪系统。
1. 为什么需要ML全链路追踪?
在复杂的ML系统中,一个完整的pipeline可能涉及多个阶段,例如数据预处理、特征工程、模型训练、模型评估和在线Serving。这些阶段可能由不同的服务或框架处理,比如:
- 数据预处理: 使用 Spark 或 Dask 进行大规模数据清洗和转换。
- 特征工程: 使用 Pandas 或 Scikit-learn 进行特征提取和选择。
- 模型训练: 使用 TensorFlow、PyTorch 或 XGBoost 进行模型训练。
- 模型评估: 使用自定义脚本或 MLflow 进行模型性能评估。
- 在线Serving: 使用 Flask、FastAPI 或 Triton Inference Server 提供模型服务。
如果没有全链路追踪,很难回答以下问题:
- 某个请求在哪个阶段花费了最长时间?
- 哪个服务或框架导致了性能瓶颈?
- 某个模型在训练阶段的输入数据是什么?
- 在线 Serving 阶段的模型预测结果是否符合预期?
- 数据在不同服务之间如何流转的?
全链路追踪可以帮助我们:
- 快速定位问题: 缩短故障排除时间,提高系统可用性。
- 优化性能: 发现性能瓶颈,改进算法和架构。
- 监控数据质量: 确保数据在各个阶段的正确性和一致性。
- 理解系统行为: 更好地理解整个ML pipeline的运行机制。
2. OpenTelemetry 核心概念
在深入代码之前,我们先了解一下 OpenTelemetry 的几个核心概念:
- Trace: 一个完整的请求或事务的端到端视图。例如,一个用户请求从Web应用到数据预处理服务,再到模型训练服务,最后返回结果的整个过程就是一个Trace。
- Span: Trace 中的一个单独的操作或步骤。例如,一个函数调用、一个HTTP请求或一个数据库查询都可以是一个Span。Span包含操作的名称、开始时间和结束时间等信息。
- Context: 用于在不同的 Span 之间传递追踪信息的机制。Context 包含 Trace ID、Span ID 和其他自定义信息。
- Tracer: 用于创建和管理 Span 的组件。
- Propagator: 用于在不同服务之间传递 Context 的组件。常见的 Propagator 有 W3C Trace Context 和 B3。
- Exporter: 用于将 Trace 数据发送到后端存储系统的组件。常见的 Exporter 有 Jaeger、Zipkin 和 Prometheus。
3. OpenTelemetry Python SDK 安装和配置
首先,我们需要安装 OpenTelemetry Python SDK:
pip install opentelemetry-sdk opentelemetry-api
opentelemetry-exporter-jaeger
opentelemetry-instrumentation-logging
opentelemetry-instrumentation-requests
opentelemetry-instrumentation-flask
这里我们安装了:
opentelemetry-sdk:OpenTelemetry Python SDK 的核心库。opentelemetry-api:OpenTelemetry API 接口。opentelemetry-exporter-jaeger:Jaeger Exporter,用于将 Trace 数据发送到 Jaeger。opentelemetry-instrumentation-logging:Logging Instrumentation,用于自动追踪 Python logging 日志。opentelemetry-instrumentation-requests:Requests Instrumentation,用于自动追踪 HTTP 请求。opentelemetry-instrumentation-flask:Flask Instrumentation,用于自动追踪 Flask 应用。
然后,我们需要配置 OpenTelemetry SDK。创建一个名为 tracer.py 的文件,包含以下代码:
import logging
import os
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.flask import FlaskInstrumentor
# 服务名称
SERVICE_NAME = os.getenv("SERVICE_NAME", "my-ml-service")
# Jaeger Agent 地址
JAEGER_HOST = os.getenv("JAEGER_HOST", "localhost")
JAEGER_PORT = int(os.getenv("JAEGER_PORT", "6831"))
# 创建资源,包含服务名称等信息
resource = Resource.create({
ResourceAttributes.SERVICE_NAME: SERVICE_NAME,
ResourceAttributes.SERVICE_VERSION: "1.0.0",
ResourceAttributes.DEPLOYMENT_ENVIRONMENT: "production"
})
# 创建 TracerProvider
tracer_provider = TracerProvider(resource=resource)
# 创建 Jaeger Exporter
jaeger_exporter = JaegerExporter(
collector_endpoint=f"http://{JAEGER_HOST}:{JAEGER_PORT}/api/traces?format=jaeger.thrift", # 使用collector endpoint代替agent address
service_name=SERVICE_NAME,
)
# 创建 BatchSpanProcessor,将 Span 批量发送到 Jaeger
span_processor = BatchSpanProcessor(jaeger_exporter)
tracer_provider.add_span_processor(span_processor)
# 设置全局 TracerProvider
trace.set_tracer_provider(tracer_provider)
# 获取 Tracer
tracer = trace.get_tracer(__name__)
# 自动追踪 logging
LoggingInstrumentor().instrument(log_level=logging.INFO, tracer_provider=tracer_provider)
# 自动追踪 requests
RequestsInstrumentor().instrument(tracer_provider=tracer_provider)
def init_tracer(app=None):
"""
初始化 tracer,如果传入 app,则对 Flask 应用进行 instrument
"""
if app:
FlaskInstrumentor().instrument_app(app, tracer_provider=tracer_provider)
return tracer
这段代码做了以下几件事:
- 定义服务名称和 Jaeger 地址: 从环境变量中读取服务名称和 Jaeger 地址,如果没有设置,则使用默认值。
- 创建 Resource: Resource 包含有关服务的元数据,例如服务名称、版本和部署环境。
- 创建 TracerProvider: TracerProvider 是 Tracer 的提供者,负责创建和管理 Tracer。
- 创建 Jaeger Exporter: Jaeger Exporter 将 Trace 数据发送到 Jaeger。
- 创建 BatchSpanProcessor: BatchSpanProcessor 将 Span 批量发送到 Jaeger,可以提高性能。
- 设置全局 TracerProvider: 将 TracerProvider 设置为全局 TracerProvider,这样就可以在任何地方使用
trace.get_tracer()获取 Tracer。 - 获取 Tracer: 使用
trace.get_tracer(__name__)获取 Tracer。 - 自动追踪 logging 和 requests: 使用
LoggingInstrumentor和RequestsInstrumentor自动追踪 logging 日志和 HTTP 请求。 - 初始化 Flask 应用:
init_tracer函数可以对 Flask 应用进行 instrument,自动追踪 Flask 请求。
4. 跨框架的 Context 传递
现在我们来演示如何在不同的框架之间传递 Context。假设我们有一个 Flask 应用,它调用一个使用 Scikit-learn 的函数进行特征工程。
首先,创建一个 Flask 应用 app.py:
from flask import Flask, request
from tracer import init_tracer, tracer
from feature_engineering import feature_engineering
app = Flask(__name__)
init_tracer(app) # 初始化 Flask instrumentation
@app.route("/predict")
def predict():
with tracer.start_as_current_span("predict_request"):
data = request.args.get("data")
# 调用特征工程函数
features = feature_engineering(data)
# 模拟模型预测
prediction = sum(features)
return f"Prediction: {prediction}"
if __name__ == "__main__":
app.run(debug=True)
然后,创建一个 feature_engineering.py 文件,包含以下代码:
from opentelemetry import context, trace
tracer = trace.get_tracer(__name__)
def feature_engineering(data):
with tracer.start_as_current_span("feature_engineering") as span:
# 获取当前 Context
current_context = context.get_current()
# 在 Span 中添加属性
span.set_attribute("input_data", data)
# 模拟特征工程过程
features = [float(x) for x in data.split(",")]
# 返回特征
return features
在这个例子中,我们使用了 context.get_current() 来获取当前的 Context。虽然 feature_engineering 函数没有显式地接收 Context 参数,但是 OpenTelemetry 会自动将 Context 从 Flask 应用传递到 feature_engineering 函数中。
5. 跨服务的 Context 传递
接下来,我们演示如何在不同的服务之间传递 Context。假设我们有一个 Flask 应用,它调用一个远程服务进行模型预测。
首先,创建一个 Flask 应用 app.py:
from flask import Flask, request
import requests
from tracer import init_tracer, tracer
app = Flask(__name__)
init_tracer(app)
MODEL_SERVICE_URL = "http://localhost:5001/predict" # 模型服务地址
@app.route("/predict")
def predict():
with tracer.start_as_current_span("predict_request"):
data = request.args.get("data")
# 调用模型服务
try:
response = requests.get(MODEL_SERVICE_URL, params={"data": data})
response.raise_for_status() # 检查状态码
prediction = response.text
return f"Prediction: {prediction}"
except requests.exceptions.RequestException as e:
return f"Error: {e}", 500
if __name__ == "__main__":
app.run(debug=True)
然后,创建一个模型服务 model_service.py:
from flask import Flask, request
from tracer import init_tracer, tracer
app = Flask(__name__)
init_tracer(app)
@app.route("/predict")
def predict():
with tracer.start_as_current_span("model_prediction"):
data = request.args.get("data")
# 模拟模型预测
features = [float(x) for x in data.split(",")]
prediction = sum(features) * 2
return f"Prediction: {prediction}"
if __name__ == "__main__":
app.run(debug=True, port=5001) # 运行在5001端口
在这个例子中,我们使用了 requests 库来调用模型服务。由于我们安装了 opentelemetry-instrumentation-requests,OpenTelemetry 会自动将 Context 注入到 HTTP 请求头中,并传递到模型服务。
6. 自定义 Span 和属性
除了自动追踪之外,我们还可以创建自定义 Span 和属性,以提供更详细的追踪信息。例如,我们可以在 feature_engineering 函数中添加自定义 Span 和属性:
from opentelemetry import context, trace
tracer = trace.get_tracer(__name__)
def feature_engineering(data):
with tracer.start_as_current_span("feature_engineering") as span:
# 获取当前 Context
current_context = context.get_current()
# 在 Span 中添加属性
span.set_attribute("input_data", data)
# 模拟特征工程过程
features = [float(x) for x in data.split(",")]
# 添加更多属性
span.set_attribute("feature_count", len(features))
span.set_attribute("feature_mean", sum(features) / len(features))
# 返回特征
return features
我们使用了 span.set_attribute() 方法来添加自定义属性。这些属性可以帮助我们更好地理解特征工程过程。
7. 使用 Baggage 进行 Context 传递
Baggage 是 OpenTelemetry 中用于传递自定义上下文信息的机制。与 Span Context 不同,Baggage 可以跨服务边界传递任意数据。
例如,我们可以在 Flask 应用中添加 Baggage:
from flask import Flask, request
import requests
from tracer import init_tracer, tracer
from opentelemetry import baggage
app = Flask(__name__)
init_tracer(app)
MODEL_SERVICE_URL = "http://localhost:5001/predict"
@app.route("/predict")
def predict():
with tracer.start_as_current_span("predict_request"):
data = request.args.get("data")
# 创建 Baggage
b = baggage.set_baggage("user_id", "12345")
# 调用模型服务
try:
response = requests.get(MODEL_SERVICE_URL, params={"data": data})
response.raise_for_status()
prediction = response.text
return f"Prediction: {prediction}"
except requests.exceptions.RequestException as e:
return f"Error: {e}", 500
然后在模型服务中获取 Baggage:
from flask import Flask, request
from tracer import init_tracer, tracer
from opentelemetry import baggage
app = Flask(__name__)
init_tracer(app)
@app.route("/predict")
def predict():
with tracer.start_as_current_span("model_prediction"):
data = request.args.get("data")
# 获取 Baggage
user_id = baggage.get_baggage("user_id")
if user_id:
print(f"User ID: {user_id}") # 打印 user_id
# 模拟模型预测
features = [float(x) for x in data.split(",")]
prediction = sum(features) * 2
return f"Prediction: {prediction}"
在这个例子中,我们使用了 baggage.set_baggage() 方法来设置 Baggage,并使用 baggage.get_baggage() 方法来获取 Baggage。
8. 总结:构建清晰可观测的ML Pipeline
通过以上步骤,我们可以使用 OpenTelemetry 在 Python 的 ML 项目中实现全链路追踪,包括跨框架和跨服务的 Context 传递。 这使得我们能够清晰地观测整个 ML pipeline 的性能和行为,快速定位问题并优化性能。
关键步骤与最佳实践:
| 步骤/实践 | 说明 |
|---|---|
| 安装 OpenTelemetry SDK | 使用 pip 安装必要的 OpenTelemetry 库,包括 SDK、API、Exporter 和 Instrumentation。 |
| 配置 TracerProvider | 创建 TracerProvider,配置 Resource 和 Exporter,并将 TracerProvider 设置为全局 TracerProvider。 |
| 自动 Instrumentation | 使用 Instrumentation 自动追踪常用的框架和库,例如 logging、requests 和 Flask。 |
| 手动创建 Span | 使用 tracer.start_as_current_span() 方法手动创建 Span,并使用 span.set_attribute() 方法添加自定义属性。 |
| Context 传递 | OpenTelemetry 会自动在同一进程内的不同函数之间传递 Context。对于跨服务的调用,需要确保使用了支持 Context 传递的 HTTP 客户端,例如 requests。 |
| 使用 Baggage | 使用 Baggage 传递自定义的上下文信息,例如用户 ID 或会话 ID。 |
| 选择合适的 Exporter | 根据实际需求选择合适的 Exporter,例如 Jaeger、Zipkin 或 Prometheus。 |
9. 深入应用,提升可观测性
OpenTelemetry 的强大之处在于它的灵活性和可扩展性。 通过自定义 Span、属性和 Baggage,我们可以构建一个高度定制化的全链路追踪系统,满足各种复杂的 ML 应用场景的需求。 不断探索和实践,将 OpenTelemetry 融入到你的 ML 项目中,提升系统的可观测性和可维护性。
更多IT精英技术系列讲座,到智猿学院