各位同仁、技术专家们,大家好!
今天,我们齐聚一堂,共同探讨一个在AI时代,尤其是大型语言模型(LLM)应用中日益凸显的、至关重要的话题——Data Drift(数据漂移)监测。当我们的LLM系统从实验室走向真实世界,面对海量、动态的用户输入时,如何确保其表现始终如一,不偏离我们的预期?又如何在偏离发生时,第一时间通过LangSmith这样的强大工具,及时获得预警?这正是我们今天要深入剖析的核心。
我们将以一场技术讲座的形式,从概念定义出发,逐步深入到实战代码,力求逻辑严谨,洞察深刻。
引言:LLM应用中的数据漂移——沉默的杀手
在机器学习领域,数据漂移是一个众所周知的问题。它指的是生产环境中输入数据的统计特性随着时间推移发生变化,导致模型性能下降的现象。对于传统机器学习模型,如图像分类或推荐系统,数据漂移可能表现为图像分辨率的变化、用户行为模式的改变等。
然而,在大型语言模型(LLM)的世界里,数据漂移的含义被赋予了更深层次的复杂性。这里的“数据”是文本,其变化远不止数值统计那么简单。我们面对的可能是:
- 主题漂移 (Topic Drift):用户开始询问与模型训练时预期完全不同的主题或领域。
- 语义漂移 (Semantic Drift):用户使用新的词汇、俚语、缩写,或者表达相同概念的方式发生变化,导致模型难以理解。
- 风格漂移 (Style Drift):用户输入的语气、正式程度、复杂性等发生变化。
- 意图漂移 (Intent Drift):用户在与模型交互时,其潜在的意图发生了改变,例如从信息查询转向寻求情感支持。
- 质量漂移 (Quality Drift):用户输入中包含更多错别字、语法错误,或者变得更加模糊、不完整。
这些漂移,无论哪一种,都可能悄无声息地侵蚀LLM的性能,导致输出质量下降、响应不准确、用户满意度降低,甚至引发安全或伦理问题。更糟糕的是,这些问题往往不会立即显现,而是随着时间逐渐累积,直到用户开始抱怨,我们才后知后觉。
因此,建立一套健壮、及时的数据漂移监测机制,对于LLM应用的长期稳定运行至关重要。而LangSmith,作为LangChain生态系统的核心组成部分,为我们提供了实现这一目标的关键基础设施。
LangSmith:LLM运维监控的神经中枢
在深入探讨如何监测数据漂移之前,我们首先需要理解LangSmith在整个LLM生命周期中的定位和作用。LangSmith是一个强大的平台,旨在帮助开发者:
- 调试与可观测性 (Observability):记录LLM调用的所有中间步骤、输入、输出、耗时等,形成完整的追踪链条 (Traces)。这对于理解复杂Agent的决策过程至关重要。
- 评估与测试 (Evaluation & Testing):支持创建数据集、运行评估、比较不同版本模型的性能,并为评估结果提供可视化界面。
- 监控与警报 (Monitoring & Alerting):通过持续的评估和性能指标收集,发现潜在问题并配置警报。
- 数据收集与标注 (Data Collection & Labeling):从生产流量中收集有用的数据点,用于模型微调或数据集构建。
正是LangSmith的“可观测性”、“评估”和“监控”这三大核心能力,为我们实施数据漂移监测奠定了坚实的基础。我们将利用它来记录生产数据、存储基线数据、运行自定义的漂移检测逻辑,并最终在检测到漂移时发出预警。
定义“测试集”与“偏离”:建立漂移的基准
在讨论任何形式的“偏离”之前,我们必须首先明确“正常”或“基线”是什么。在数据漂移监测的语境中,这个基线通常来源于两个方面:
-
初始测试集 (Initial Test Set):这是模型上线前经过精心策划和验证的数据集。它代表了我们期望模型处理的理想输入分布。它应该是:
- 代表性 (Representative):尽可能覆盖目标用户群体的各种输入类型、主题和风格。
- 多样性 (Diverse):包含各种长度、复杂度和语义的内容。
- 高质量 (High Quality):通常经过人工审核,确保其准确性。
- 标签化 (Labeled, if applicable):如果模型有特定任务(如分类、问答),测试集应包含对应的真实标签,以便进行性能评估。
-
早期生产数据 (Early Production Data):在模型上线初期,收集一小段时间内真实的生产用户输入,作为辅助基线。这有助于捕捉到测试集可能未完全覆盖的早期真实世界特征。
如何定义“偏离”?
“偏离”并非一个单一的指标,而是一系列统计量或距离度量的集合,它们用于量化当前生产数据与基线数据之间的差异。这些度量可以从不同维度捕捉数据分布的变化:
- 统计学特征变化:如输入文本的平均长度、词汇丰富度、情感倾向、复杂度等。
- 语义特征变化:通过词嵌入或句子嵌入来量化文本的语义空间变化。
- 主题分布变化:通过主题模型(如LDA、BERTopic)来比较主题构成。
- 语言特征变化:如特定关键词的频率、语法结构的变化、甚至语言本身(如果支持多语言)。
我们的目标是,当这些关键特征的分布与基线显著不同时,就发出预警。
特征工程:从原始文本到可量化指标
为了监测数据漂移,我们首先需要将原始的用户输入文本转化为可量化的数值特征。这些特征将作为我们进行统计比较的基础。
以下是一些常用的特征工程策略:
1. 简单统计特征 (Simple Statistical Features)
这些特征易于计算,能快速反映文本的基本属性。
- 文本长度:字符数、词数。
- 词汇丰富度:唯一词汇数 / 总词数(Type-Token Ratio)。
- 平均词长:反映文本的复杂性。
- 标点符号使用率:感叹号、问号等。
- 数字比例:文本中包含数字的比例。
2. 可读性指标 (Readability Scores)
衡量文本的易读性,通常与受众的阅读水平相关。
- Flesch Reading Ease Score
- Flesch-Kincaid Grade Level
- Automated Readability Index (ARI)
3. 情感分析 (Sentiment Analysis)
检测文本的情感倾向(积极、消极、中性),对于客服、评论分析等场景尤其重要。
4. 命名实体识别 (Named Entity Recognition, NER)
提取人名、地名、组织名等,可以监测特定实体类型的出现频率变化。
5. 关键词与短语提取 (Keyword/Phrase Extraction)
识别文本中的重要关键词或短语,追踪它们在生产数据中的频率变化。
6. 主题模型 (Topic Modeling)
使用LDA (Latent Dirichlet Allocation)、NMF (Non-negative Matrix Factorization) 或更现代的BERTopic等技术,将文本映射到一组主题,然后比较主题分布的变化。
7. 文本嵌入 (Text Embeddings)
这是捕捉语义漂移最强大的方法。通过预训练的语言模型(如BERT、RoBERTa、Sentence-BERT等)将文本转换为高维向量。这些向量在语义上相似的文本在向量空间中距离较近。
我们可以:
- 计算生产输入嵌入与基线输入嵌入之间的平均距离。
- 使用聚类算法识别新的语义簇。
- 监测嵌入空间中数据点的密度变化。
示例:使用Python和Sentence Transformers进行特征提取
import numpy as np
import spacy
from textstat import textstat
from sentence_transformers import SentenceTransformer
from collections import Counter
import re
from typing import List, Dict, Any
# 加载Spacy模型用于NLP处理,这里使用一个较小的模型,生产环境可能需要更大的
try:
nlp = spacy.load("en_core_web_sm")
except OSError:
print("Downloading spacy model 'en_core_web_sm'...")
from spacy.cli import download
download("en_core_web_sm")
nlp = spacy.load("en_core_web_sm")
# 加载Sentence Transformer模型用于生成文本嵌入
# sentence_transformer_model = SentenceTransformer('all-MiniLM-L6-v2')
# 为了避免下载大型模型,这里模拟一个,实际使用时请取消注释上一行
class MockSentenceTransformer:
def encode(self, sentences, convert_to_numpy=True):
# 模拟生成一个固定维度的向量
if isinstance(sentences, str):
sentences = [sentences]
embeddings = np.random.rand(len(sentences), 384) # all-MiniLM-L6-v2 produces 384-dim embeddings
return embeddings if convert_to_numpy else list(embeddings)
sentence_transformer_model = MockSentenceTransformer()
def extract_features(text: str) -> Dict[str, Any]:
"""
从单个文本中提取一系列特征。
"""
doc = nlp(text)
# 1. 简单统计特征
char_count = len(text)
word_count = len([token for token in doc if not token.is_punct and not token.is_space])
unique_word_count = len(set(token.text.lower() for token in doc if not token.is_punct and not token.is_space))
type_token_ratio = unique_word_count / word_count if word_count > 0 else 0
avg_word_length = np.mean([len(token.text) for token in doc if not token.is_punct and not token.is_space]) if word_count > 0 else 0
num_digit_chars = sum(1 for char in text if char.isdigit())
digit_ratio = num_digit_chars / char_count if char_count > 0 else 0
# 2. 可读性指标 (需要处理空文本)
flesch_reading_ease = textstat.flesch_reading_ease(text) if text.strip() else 0
flesch_kincaid_grade = textstat.flesch_kincaid_grade(text) if text.strip() else 0
# 3. 情感分析 (这里使用一个简单的基于规则的模拟,实际应使用专门库如NLTK VADER或Transformer模型)
positive_words = ["good", "great", "excellent", "happy", "love"]
negative_words = ["bad", "terrible", "horrible", "sad", "hate"]
sentiment_score = 0
text_lower = text.lower()
for p_word in positive_words:
sentiment_score += text_lower.count(p_word)
for n_word in negative_words:
sentiment_score -= text_lower.count(n_word)
# 4. 命名实体计数 (示例,只计算最常见的实体类型)
ner_counts = Counter([ent.label_ for ent in doc.ents])
# 5. 文本嵌入 (高维向量,通常用于计算相似度而不是直接统计)
embedding = sentence_transformer_model.encode([text])[0]
return {
"char_count": char_count,
"word_count": word_count,
"type_token_ratio": type_token_ratio,
"avg_word_length": avg_word_length,
"digit_ratio": digit_ratio,
"flesch_reading_ease": flesch_reading_ease,
"flesch_kincaid_grade": flesch_kincaid_grade,
"sentiment_score": sentiment_score,
"ner_person_count": ner_counts.get("PERSON", 0),
"ner_org_count": ner_counts.get("ORG", 0),
"embedding": embedding.tolist() # 转换为列表以便存储或传输
}
def batch_extract_features(texts: List[str]) -> List[Dict[str, Any]]:
"""
批量提取文本特征。
"""
all_features = []
# 批量生成嵌入
embeddings = sentence_transformer_model.encode(texts)
for i, text in enumerate(texts):
features = extract_features(text)
features["embedding"] = embeddings[i].tolist() # 替换为实际批量生成的嵌入
all_features.append(features)
return all_features
# 示例使用
if __name__ == "__main__":
sample_text_1 = "Hello, what is the capital of France? I need to know for my trip."
sample_text_2 = "Tell me about the latest advancements in quantum computing. This is truly fascinating."
sample_text_3 = "Where is John Doe working now? Is it Google or Microsoft?"
features_1 = extract_features(sample_text_1)
features_2 = extract_features(sample_text_2)
features_3 = extract_features(sample_text_3)
print("Sample Text 1 Features:", {k: v for k, v in features_1.items() if k != 'embedding'})
print("Sample Text 2 Features:", {k: v for k, v in features_2.items() if k != 'embedding'})
print("Sample Text 3 Features:", {k: v for k, v in features_3.items() if k != 'embedding'})
batch_texts = [sample_text_1, sample_text_2, sample_text_3]
batch_feats = batch_extract_features(batch_texts)
print("nBatch Features (first item, excluding embedding):", {k: v for k, v in batch_feats[0].items() if k != 'embedding'})
统计方法:量化“偏离”的程度
一旦我们有了可量化的特征,下一步就是比较当前生产数据与基线数据的特征分布。这里有几种常用的统计方法:
1. 均值/中位数比较 (Mean/Median Comparison)
对于数值特征(如词数、情感分数),可以直接比较它们的均值或中位数是否发生显著变化。
- 指标:当前均值 vs 基线均值。
- 阈值:设定一个绝对差值或百分比差值。
2. 分布比较 (Distribution Comparison)
对于数值特征的完整分布,我们可以使用统计检验来判断两个分布是否来自同一个总体。
- Kolmogorov-Smirnov (KS) Test:用于比较两个连续、单变量分布。它能检测出分布形状、位置、尺度的差异。
- Chi-Squared Test:用于比较分类特征的分布(如NER标签的频率)。
3. 距离度量 (Distance Metrics)
对于高维特征(如文本嵌入),我们通常使用距离度量来量化它们之间的差异。
- 余弦相似度 (Cosine Similarity):衡量两个向量之间的角度,广泛用于文本相似度。当平均余弦相似度下降时,可能意味着语义漂移。
- 欧氏距离 (Euclidean Distance):两个向量在欧氏空间中的直线距离。
- Wasserstein Distance (Earth Mover’s Distance):衡量将一个分布转换为另一个分布所需的“工作量”,对于比较高维分布更为鲁棒。
4. 异常检测 (Anomaly Detection)
将漂移视为一种异常。在基线数据上训练一个异常检测模型(如Isolation Forest, One-Class SVM),然后用它来检测生产数据中的异常点。
示例:使用Python进行统计比较
import numpy as np
from scipy.stats import ks_2samp, chi2_contingency
from sklearn.metrics.pairwise import cosine_similarity
def calculate_drift_metrics(baseline_features: List[Dict[str, Any]], current_features: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
计算当前特征与基线特征之间的漂移度量。
"""
drift_results = {}
if not baseline_features or not current_features:
return {"error": "Baseline or current features are empty."}
# 提取所有数值特征的列表
baseline_char_counts = [f["char_count"] for f in baseline_features]
current_char_counts = [f["char_count"] for f in current_features]
baseline_word_counts = [f["word_count"] for f in baseline_features]
current_word_counts = [f["word_count"] for f in current_features]
baseline_flesch_scores = [f["flesch_reading_ease"] for f in baseline_features]
current_flesch_scores = [f["flesch_reading_ease"] for f in current_features]
baseline_sentiment_scores = [f["sentiment_score"] for f in baseline_features]
current_sentiment_scores = [f["sentiment_score"] for f in current_features]
# 1. 均值比较示例
drift_results["mean_char_count_diff_pct"] = (np.mean(current_char_counts) - np.mean(baseline_char_counts)) / np.mean(baseline_char_counts) if np.mean(baseline_char_counts) > 0 else 0
drift_results["mean_word_count_diff_pct"] = (np.mean(current_word_counts) - np.mean(baseline_word_counts)) / np.mean(baseline_word_counts) if np.mean(baseline_word_counts) > 0 else 0
drift_results["mean_sentiment_score_diff"] = np.mean(current_sentiment_scores) - np.mean(baseline_sentiment_scores)
# 2. KS检验示例 (用于连续数值特征)
# KS检验返回statistic和p-value,p-value越小表示两个分布差异越大
# 我们关注statistic,它是一个距离度量
ks_stat_char_count, ks_p_char_count = ks_2samp(baseline_char_counts, current_char_counts)
drift_results["ks_stat_char_count"] = ks_stat_char_count
# drift_results["ks_p_char_count"] = ks_p_char_count # 可以根据p-value设置更严格的阈值
ks_stat_flesch, ks_p_flesch = ks_2samp(baseline_flesch_scores, current_flesch_scores)
drift_results["ks_stat_flesch_reading_ease"] = ks_stat_flesch
# 3. 余弦相似度 (针对嵌入)
# 计算当前批次中每个嵌入与基线批次中所有嵌入的平均最大相似度
# 或者计算当前批次所有嵌入的平均嵌入,然后与基线平均嵌入比较
baseline_embeddings = np.array([f["embedding"] for f in baseline_features])
current_embeddings = np.array([f["embedding"] for f in current_features])
if baseline_embeddings.shape[0] > 0 and current_embeddings.shape[0] > 0:
# 计算当前批次中每个样本与基线所有样本的最高相似度,然后取平均
# 这可以反映当前数据点是否仍然“接近”基线中的某些点
max_similarities = []
for current_emb in current_embeddings:
sims = cosine_similarity([current_emb], baseline_embeddings)[0]
max_similarities.append(np.max(sims))
drift_results["avg_max_cosine_similarity_to_baseline"] = np.mean(max_similarities)
# 也可以计算两个批次的平均嵌入的相似度
mean_baseline_embedding = np.mean(baseline_embeddings, axis=0).reshape(1, -1)
mean_current_embedding = np.mean(current_embeddings, axis=0).reshape(1, -1)
drift_results["cosine_similarity_mean_embeddings"] = cosine_similarity(mean_baseline_embedding, mean_current_embedding)[0][0]
else:
drift_results["avg_max_cosine_similarity_to_baseline"] = 0
drift_results["cosine_similarity_mean_embeddings"] = 0
# 4. Chi-Squared Test 示例 (针对分类特征,如NER计数)
# 假设我们只关心 "PERSON" 和 "ORG" 实体的比例
baseline_ner_person_total = sum([f["ner_person_count"] for f in baseline_features])
baseline_ner_org_total = sum([f["ner_org_count"] for f in baseline_features])
baseline_ner_other_total = sum([f["word_count"] - f["ner_person_count"] - f["ner_org_count"] for f in baseline_features]) # 简化处理,其他词汇
current_ner_person_total = sum([f["ner_person_count"] for f in current_features])
current_ner_org_total = sum([f["ner_org_count"] for f in current_features])
current_ner_other_total = sum([f["word_count"] - f["ner_person_count"] - f["ner_org_count"] for f in current_features])
# 构建列联表
# 为了避免零导致卡方检验失败,添加小值平滑
epsilon = 1e-9
baseline_counts = np.array([baseline_ner_person_total + epsilon, baseline_ner_org_total + epsilon, baseline_ner_other_total + epsilon])
current_counts = np.array([current_ner_person_total + epsilon, current_ner_org_total + epsilon, current_ner_other_total + epsilon])
# 对比的是比例,所以需要归一化
baseline_proportions = baseline_counts / np.sum(baseline_counts)
current_proportions = current_counts / np.sum(current_counts)
# 卡方检验通常需要原始计数,这里我们模拟一个场景,假设我们有足够多的观测值
# 实际应用中,如果数据量太小,卡方检验可能不适用
# 假设我们有1000个样本,乘以比例得到期望计数
total_baseline_samples = len(baseline_features)
total_current_samples = len(current_features)
contingency_table = np.array([
baseline_proportions * total_baseline_samples,
current_proportions * total_current_samples
])
if np.min(contingency_table) > 0 and contingency_table.shape == (2,3): # 卡方检验要求期望频数不为0
chi2_stat, chi2_p, _, _ = chi2_contingency(contingency_table)
drift_results["chi2_stat_ner_distribution"] = chi2_stat
else:
drift_results["chi2_stat_ner_distribution"] = 0 # 无法计算
return drift_results
# 示例使用
if __name__ == "__main__":
# 模拟基线数据
baseline_texts = [
"What is the weather like today in London?",
"How do I reset my password for my account?",
"Can you tell me about the history of artificial intelligence?",
"I love this product, it's so easy to use.",
"What time is the flight to New York?",
"Who invented the light bulb?",
"Can Google help me find a good restaurant?",
"Tell me about the CEO of Apple, Tim Cook."
]
baseline_feats = batch_extract_features(baseline_texts)
# 模拟当前生产数据 (漂移示例: 更多关于体育和名人八卦,更短的查询,更口语化)
current_drift_texts = [
"Who won the last football match?",
"Did Messi score?",
"Tell me about Taylor Swift's new album.",
"This movie was terrible.",
"Quick, what's happening?",
"Is Tom Hanks in that new film?",
"Where's the nearest Starbucks?",
"My laptop is broken, help!"
]
current_drift_feats = batch_extract_features(current_drift_texts)
# 模拟当前生产数据 (无漂移示例: 类似基线)
current_no_drift_texts = [
"What's the weather forecast for Paris tomorrow?",
"How to change my email settings?",
"Explain the concept of machine learning.",
"I am very happy with your service.",
"When does the train to Boston leave?",
"Who discovered penicillin?",
"Can Microsoft Bing assist with my search?",
"Tell me about Satya Nadella, CEO of Microsoft."
]
current_no_drift_feats = batch_extract_features(current_no_drift_texts)
print("n--- Drift Detection Results (Drifted Data) ---")
drift_results_drifted = calculate_drift_metrics(baseline_feats, current_drift_feats)
for k, v in drift_results_drifted.items():
print(f"{k}: {v:.4f}")
print("n--- Drift Detection Results (No Drift Data) ---")
drift_results_no_drift = calculate_drift_metrics(baseline_feats, current_no_drift_feats)
for k, v in drift_results_no_drift.items():
print(f"{k}: {v:.4f}")
# 预期:
# drift_results_drifted 的均值差异、KS统计量、NER卡方统计量会更大
# avg_max_cosine_similarity_to_baseline 和 cosine_similarity_mean_embeddings 会更小
实施漂移监测:LangSmith实践
现在,我们已经有了特征提取和漂移检测的工具,接下来是如何将它们集成到LangSmith中,以实现自动化监测和预警。
核心思路是:
- 持续记录生产流量:将所有真实用户输入和LLM的响应发送到LangSmith。
- 创建基线数据集:在LangSmith中创建一个代表“正常”输入的数据集。
- 开发自定义评估器:这个评估器将执行我们的特征提取和漂移检测逻辑。
- 定期运行评估:LangSmith可以自动或手动触发针对生产数据的评估运行。
- 配置警报:基于评估结果中的漂移指标,在LangSmith中设置阈值警报。
步骤一:设置LangSmith环境与LangChain应用
首先,确保你安装了必要的库并配置了LangSmith环境变量。
pip install langchain langsmith openai spacy textstat sentence-transformers numpy scipy scikit-learn
python -m spacy download en_core_web_sm
设置环境变量:
export LANGCHAIN_TRACING_V2="true"
export LANGCHAIN_API_KEY="YOUR_LANGSMITH_API_KEY"
export LANGCHAIN_PROJECT="llm-data-drift-monitor" # 你的项目名称
export OPENAI_API_KEY="YOUR_OPENAI_API_KEY"
一个简单的LangChain应用示例:
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langsmith import traceable, Client
# 初始化LangSmith客户端
client = Client()
# 简单的LLM链
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful AI assistant. Answer the user's questions concisely."),
("user", "{question}")
])
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
output_parser = StrOutputParser()
chain = prompt | llm | output_parser
@traceable(run_type="llm_app", project_name=os.environ["LANGCHAIN_PROJECT"])
def invoke_llm_app(user_input: str):
"""
模拟LLM应用的调用,并自动追踪到LangSmith。
"""
print(f"Processing input: '{user_input}'")
response = chain.invoke({"question": user_input})
print(f"LLM Response: '{response}'")
return response
# 示例调用
if __name__ == "__main__":
invoke_llm_app("What is the capital of France?")
invoke_llm_app("Explain quantum entanglement in simple terms.")
运行上述代码,你会在LangSmith UI中看到llm-data-drift-monitor项目下生成了一条条追踪记录。
步骤二:创建基线数据集 (Baseline Dataset) 到LangSmith
我们将使用前面提取的特征来代表基线。在LangSmith中,可以手动创建数据集,也可以通过API上传。我们将上传一个包含基线输入及其提取特征的数据集。
import os
from langsmith import Client, Dataset
from typing import List, Dict, Any
# 假设我们在本地已经准备好了基线文本和它们的特征
# baseline_texts = [...] # 从你的测试集或早期生产数据中获取
# baseline_features_list = batch_extract_features(baseline_texts) # 调用之前的特征提取函数
# 为了演示,我们再次生成一些模拟基线数据
baseline_texts_for_dataset = [
"What is the weather like today in London?",
"How do I reset my password for my account?",
"Can you tell me about the history of artificial intelligence?",
"I love this product, it's so easy to use.",
"What time is the flight to New York?",
"Who invented the light bulb?",
"Can Google help me find a good restaurant?",
"Tell me about the CEO of Apple, Tim Cook.",
"Explain the concept of gravity.",
"What are the benefits of meditation?"
]
baseline_features_for_dataset = batch_extract_features(baseline_texts_for_dataset)
# LangSmith客户端
client = Client()
# 创建或获取数据集
dataset_name = "Baseline-LLM-Inputs-with-Features"
try:
dataset = client.read_dataset(dataset_name=dataset_name)
print(f"Dataset '{dataset_name}' already exists. Recreating it for demonstration.")
client.delete_dataset(dataset_name=dataset_name) # 为了演示,先删除再创建
dataset = client.create_dataset(
dataset_name=dataset_name,
description="Baseline inputs for LLM drift detection, with pre-extracted features."
)
except Exception:
dataset = client.create_dataset(
dataset_name=dataset_name,
description="Baseline inputs for LLM drift detection, with pre-extracted features."
)
# 添加示例到数据集
for i, (text, features) in enumerate(zip(baseline_texts_for_dataset, baseline_features_for_dataset)):
client.create_example(
inputs={"input_text": text, "features": features},
outputs={"expected_output": "Not applicable for drift baseline"}, # drift baseline通常不需要期望输出
dataset_id=dataset.id,
name=f"baseline-example-{i}"
)
print(f"Uploaded {len(baseline_texts_for_dataset)} examples to dataset '{dataset_name}'.")
现在,LangSmith中就有了一个名为Baseline-LLM-Inputs-with-Features的数据集,包含了基线文本和它们的特征。
步骤三:开发自定义评估器 (Custom Evaluator)
这是核心部分。我们将创建一个Python函数,它接收生产运行的输入(以及可能的输出),提取其特征,然后与我们存储在LangSmith基线数据集中的特征进行比较,计算出漂移指标。
LangSmith的评估器函数需要遵循特定接口,它接收一个run对象和一个example对象(如果评估是针对某个特定示例)。但对于数据漂移,我们通常是评估一批生产运行与一个基线数据集的整体。
我们可以创建一个独立的Python脚本来执行这个逻辑,并使用client.create_feedback将结果发送回LangSmith。
import os
import time
from datetime import datetime, timedelta
from typing import List, Dict, Any, Tuple
import numpy as np
from langsmith import Client, Run
from langsmith.schemas import Example, Feedback, RunTypeEnum
# 导入之前定义的特征提取和漂移检测函数
# from your_feature_extraction_module import batch_extract_features
# from your_drift_detection_module import calculate_drift_metrics
# 假设这些函数已经导入或定义在当前文件中
# (为了代码的完整性,这里再次包含它们,实际应用中可以模块化)
# --- Start of re-defined functions for self-containment ---
import spacy
from textstat import textstat
from sentence_transformers import SentenceTransformer
from collections import Counter
import re
from scipy.stats import ks_2samp, chi2_contingency
from sklearn.metrics.pairwise import cosine_similarity
try:
nlp = spacy.load("en_core_web_sm")
except OSError:
from spacy.cli import download
download("en_core_web_sm")
nlp = spacy.load("en_core_web_sm")
class MockSentenceTransformer:
def encode(self, sentences, convert_to_numpy=True):
if isinstance(sentences, str):
sentences = [sentences]
embeddings = np.random.rand(len(sentences), 384)
return embeddings if convert_to_numpy else list(embeddings)
sentence_transformer_model = MockSentenceTransformer()
def extract_features(text: str) -> Dict[str, Any]:
doc = nlp(text)
char_count = len(text)
word_count = len([token for token in doc if not token.is_punct and not token.is_space])
unique_word_count = len(set(token.text.lower() for token in doc if not token.is_punct and not token.is_space))
type_token_ratio = unique_word_count / word_count if word_count > 0 else 0
avg_word_length = np.mean([len(token.text) for token in doc if not token.is_punct and not token.is_space]) if word_count > 0 else 0
num_digit_chars = sum(1 for char in text if char.isdigit())
digit_ratio = num_digit_chars / char_count if char_count > 0 else 0
flesch_reading_ease = textstat.flesch_reading_ease(text) if text.strip() else 0
flesch_kincaid_grade = textstat.flesch_kincaid_grade(text) if text.strip() else 0
positive_words = ["good", "great", "excellent", "happy", "love"]
negative_words = ["bad", "terrible", "horrible", "sad", "hate"]
sentiment_score = 0
text_lower = text.lower()
for p_word in positive_words:
sentiment_score += text_lower.count(p_word)
for n_word in negative_words:
sentiment_score -= text_lower.count(n_word)
ner_counts = Counter([ent.label_ for ent in doc.ents])
embedding = sentence_transformer_model.encode([text])[0]
return {
"char_count": char_count,
"word_count": word_count,
"type_token_ratio": type_token_ratio,
"avg_word_length": avg_word_length,
"digit_ratio": digit_ratio,
"flesch_reading_ease": flesch_reading_ease,
"flesch_kincaid_grade": flesch_kincaid_grade,
"sentiment_score": sentiment_score,
"ner_person_count": ner_counts.get("PERSON", 0),
"ner_org_count": ner_counts.get("ORG", 0),
"embedding": embedding.tolist()
}
def batch_extract_features(texts: List[str]) -> List[Dict[str, Any]]:
all_features = []
if not texts: return []
embeddings = sentence_transformer_model.encode(texts)
for i, text in enumerate(texts):
features = extract_features(text)
features["embedding"] = embeddings[i].tolist()
all_features.append(features)
return all_features
def calculate_drift_metrics(baseline_features: List[Dict[str, Any]], current_features: List[Dict[str, Any]]) -> Dict[str, Any]:
drift_results = {}
if not baseline_features or not current_features:
return {"error": "Baseline or current features are empty."}
baseline_char_counts = [f["char_count"] for f in baseline_features]
current_char_counts = [f["char_count"] for f in current_features]
baseline_word_counts = [f["word_count"] for f in baseline_features]
current_word_counts = [f["word_count"] for f in current_features]
baseline_flesch_scores = [f["flesch_reading_ease"] for f in baseline_features]
current_flesch_scores = [f["flesch_reading_ease"] for f in current_features]
baseline_sentiment_scores = [f["sentiment_score"] for f in baseline_features]
current_sentiment_scores = [f["sentiment_score"] for f in current_features]
drift_results["mean_char_count_diff_pct"] = (np.mean(current_char_counts) - np.mean(baseline_char_counts)) / np.mean(baseline_char_counts) if np.mean(baseline_char_counts) > 0 else 0
drift_results["mean_word_count_diff_pct"] = (np.mean(current_word_counts) - np.mean(baseline_word_counts)) / np.mean(baseline_word_counts) if np.mean(baseline_word_counts) > 0 else 0
drift_results["mean_sentiment_score_diff"] = np.mean(current_sentiment_scores) - np.mean(baseline_sentiment_scores)
ks_stat_char_count, _ = ks_2samp(baseline_char_counts, current_char_counts)
drift_results["ks_stat_char_count"] = ks_stat_char_count
ks_stat_flesch, _ = ks_2samp(baseline_flesch_scores, current_flesch_scores)
drift_results["ks_stat_flesch_reading_ease"] = ks_stat_flesch
baseline_embeddings = np.array([f["embedding"] for f in baseline_features])
current_embeddings = np.array([f["embedding"] for f in current_features])
if baseline_embeddings.shape[0] > 0 and current_embeddings.shape[0] > 0:
max_similarities = []
for current_emb in current_embeddings:
sims = cosine_similarity([current_emb], baseline_embeddings)[0]
max_similarities.append(np.max(sims))
drift_results["avg_max_cosine_similarity_to_baseline"] = np.mean(max_similarities)
mean_baseline_embedding = np.mean(baseline_embeddings, axis=0).reshape(1, -1)
mean_current_embedding = np.mean(current_embeddings, axis=0).reshape(1, -1)
drift_results["cosine_similarity_mean_embeddings"] = cosine_similarity(mean_baseline_embedding, mean_current_embedding)[0][0]
else:
drift_results["avg_max_cosine_similarity_to_baseline"] = 0
drift_results["cosine_similarity_mean_embeddings"] = 0
baseline_ner_person_total = sum([f["ner_person_count"] for f in baseline_features])
baseline_ner_org_total = sum([f["ner_org_count"] for f in baseline_features])
baseline_ner_other_total = sum([f["word_count"] - f["ner_person_count"] - f["ner_org_count"] for f in baseline_features])
current_ner_person_total = sum([f["ner_person_count"] for f in current_features])
current_ner_org_total = sum([f["ner_org_count"] for f in current_features])
current_ner_other_total = sum([f["word_count"] - f["ner_person_count"] - f["ner_org_count"] for f in current_features])
epsilon = 1e-9
baseline_counts = np.array([baseline_ner_person_total + epsilon, baseline_ner_org_total + epsilon, baseline_ner_other_total + epsilon])
current_counts = np.array([current_ner_person_total + epsilon, current_ner_org_total + epsilon, current_ner_other_total + epsilon])
baseline_proportions = baseline_counts / np.sum(baseline_counts)
current_proportions = current_counts / np.sum(current_counts)
total_baseline_samples = len(baseline_features)
total_current_samples = len(current_features)
contingency_table = np.array([
baseline_proportions * total_baseline_samples,
current_proportions * total_current_samples
])
if np.min(contingency_table) > 0 and contingency_table.shape == (2,3):
chi2_stat, _, _, _ = chi2_contingency(contingency_table)
drift_results["chi2_stat_ner_distribution"] = chi2_stat
else:
drift_results["chi2_stat_ner_distribution"] = 0
return drift_results
# --- End of re-defined functions ---
client = Client()
def get_baseline_features(dataset_name: str) -> List[Dict[str, Any]]:
"""
从LangSmith数据集中读取基线特征。
"""
dataset = client.read_dataset(dataset_name=dataset_name)
examples = client.list_examples(dataset_id=dataset.id)
baseline_features = []
for example in examples:
if "features" in example.inputs:
baseline_features.append(example.inputs["features"])
return baseline_features
def get_production_runs_for_evaluation(
project_name: str,
start_time: datetime,
end_time: datetime,
run_type: RunTypeEnum = RunTypeEnum.LLM_APP,
max_runs: int = 1000
) -> List[Run]:
"""
从LangSmith项目中获取指定时间范围内的生产运行。
"""
runs = client.list_runs(
project_name=project_name,
start_time=start_time,
end_time=end_time,
run_type=run_type,
limit=max_runs
)
# 过滤掉没有输入的运行
filtered_runs = [run for run in runs if run.inputs and 'question' in run.inputs]
return filtered_runs
@traceable(run_type="evaluator", name="Data Drift Detector")
def run_drift_evaluation(
project_name: str,
baseline_dataset_name: str,
evaluation_interval_minutes: int = 60
) -> Dict[str, Any]:
"""
执行数据漂移评估。
"""
print(f"Starting data drift evaluation for project '{project_name}'...")
# 1. 获取基线特征
print("Fetching baseline features...")
baseline_features = get_baseline_features(baseline_dataset_name)
if not baseline_features:
print("Error: Baseline features not found in dataset.")
return {"status": "failed", "message": "Baseline features not found."}
print(f"Loaded {len(baseline_features)} baseline examples.")
# 2. 获取当前生产运行数据 (例如,过去1小时内的)
end_time = datetime.utcnow()
start_time = end_time - timedelta(minutes=evaluation_interval_minutes)
print(f"Fetching production runs from {start_time} to {end_time}...")
current_production_runs = get_production_runs_for_evaluation(
project_name=project_name,
start_time=start_time,
end_time=end_time
)
if not current_production_runs:
print("No production runs found in the specified interval. Skipping evaluation.")
return {"status": "skipped", "message": "No production runs found."}
print(f"Found {len(current_production_runs)} production runs.")
# 3. 从生产运行中提取输入文本和特征
current_texts = [run.inputs['question'] for run in current_production_runs]
print(f"Extracting features from {len(current_texts)} current production texts...")
current_features = batch_extract_features(current_texts)
# 4. 计算漂移指标
print("Calculating drift metrics...")
drift_metrics = calculate_drift_metrics(baseline_features, current_features)
# 5. 将漂移指标作为反馈发送到LangSmith
# 这里我们将所有指标聚合到一个Feedback对象中,附在评估器自身的run上
# 可以在LangSmith UI中查看这个Evaluator Run的Feedback
feedback_score = 0 # 综合漂移分数,需要根据具体指标加权计算
comments = "Data drift evaluation completed."
# 简单示例:如果任何KS统计量或NER卡方统计量超过阈值,则认为有漂移
is_drift_detected = False
drift_thresholds = {
"ks_stat_char_count": 0.2, # KS统计量阈值,0表示完全相同,1表示完全不同
"ks_stat_flesch_reading_ease": 0.2,
"chi2_stat_ner_distribution": 10.0, # 卡方统计量阈值,取决于自由度和显著性水平
"cosine_similarity_mean_embeddings": 0.85 # 平均嵌入余弦相似度,低于此值可能表示语义漂移
}
detailed_scores = {}
for metric, value in drift_metrics.items():
detailed_scores[metric] = value
if metric in drift_thresholds:
if "ks_stat" in metric or "chi2_stat" in metric: # 越大越漂移
if value > drift_thresholds[metric]:
is_drift_detected = True
comments += f" WARNING: {metric} ({value:.4f}) exceeded threshold ({drift_thresholds[metric]})."
elif "cosine_similarity" in metric: # 越小越漂移
if value < drift_thresholds[metric]:
is_drift_detected = True
comments += f" WARNING: {metric} ({value:.4f}) fell below threshold ({drift_thresholds[metric]})."
# 最终的漂移状态作为评分
feedback_score = 1.0 if is_drift_detected else 0.0
feedback_key = "data_drift_status"
# 将评估结果附加到当前的评估器运行上
client.create_feedback(
run_id=client.current_run_id(), # 获取当前评估器运行的ID
key=feedback_key,
score=feedback_score,
value="Drift Detected" if is_drift_detected else "No Drift",
comment=comments,
source_info={
"run_ids": [run.id for run in current_production_runs], # 关联到被评估的生产运行
"metadata": detailed_scores # 存储所有详细的漂移指标
}
)
print(f"Data drift evaluation finished. Drift detected: {is_drift_detected}")
return {"status": "completed", "drift_detected": is_drift_detected, "metrics": detailed_scores}
# 示例:手动运行评估器
if __name__ == "__main__":
# 确保 LangSmith 环境变量已设置
if not all(k in os.environ for k in ["LANGCHAIN_API_KEY", "LANGCHAIN_PROJECT"]):
print("Please set LANGCHAIN_API_KEY and LANGCHAIN_PROJECT environment variables.")
exit()
# 模拟一些生产流量以确保有数据可供评估
print("Simulating some production traffic for the last 5 minutes...")
for _ in range(5): # 模拟5次调用
invoke_llm_app("What is the capital of France?")
invoke_llm_app("Tell me a joke.")
invoke_llm_app("What's up with the stock market?")
time.sleep(1) # 间隔1秒
# 运行漂移评估
drift_eval_results = run_drift_evaluation(
project_name=os.environ["LANGCHAIN_PROJECT"],
baseline_dataset_name="Baseline-LLM-Inputs-with-Features",
evaluation_interval_minutes=5 # 评估过去5分钟的生产数据
)
print("nDrift Evaluation Results:", drift_eval_results)
# 模拟漂移情况
print("nSimulating drifted production traffic for the last 5 minutes...")
drifted_queries = [
"Yo, what's new with that celeb gossip?",
"Did you see the game last night, who won?",
"My dog ate my homework, help!",
"Tell me about the latest viral TikTok challenge.",
"Where can I find cheap flights to Cancun?"
]
for query in drifted_queries:
invoke_llm_app(query)
time.sleep(1)
print("nRunning drift evaluation with simulated drifted data...")
drift_eval_results_drifted = run_drift_evaluation(
project_name=os.environ["LANGCHAIN_PROJECT"],
baseline_dataset_name="Baseline-LLM-Inputs-with-Features",
evaluation_interval_minutes=5 # 评估过去5分钟的生产数据
)
print("nDrift Evaluation Results (with drift):", drift_eval_results_drifted)
这段代码执行了以下操作:
get_baseline_features:从LangSmith中读取之前上传的基线数据集,获取其特征。get_production_runs_for_evaluation:从指定LangSmith项目中获取特定时间窗口内的LLM应用运行记录。run_drift_evaluation:这是核心评估器。- 它获取基线特征和当前生产运行的输入。
- 对当前生产输入进行特征提取。
- 调用
calculate_drift_metrics计算各种漂移指标。 - 根据预设的阈值判断是否发生漂移。
- 使用
client.create_feedback将漂移状态(data_drift_status)和详细指标记录到LangSmith中,关联到当前的评估器运行。
运行这个脚本后,你可以在LangSmith UI中找到一个名为Data Drift Detector的Evaluator Run。在这个Run的详情页面,你会看到一个data_drift_status的Feedback,其score和value会指示是否检测到漂移,并且source_info中的metadata会包含所有详细的漂移指标。
步骤四:配置警报 (Alerting Mechanism)
LangSmith本身提供基于评估结果的警报功能。一旦你的漂移评估器运行并生成了data_drift_status这样的Feedback,你就可以在LangSmith的UI中配置警报。
- 导航到项目:在LangSmith UI中选择你的项目 (
llm-data-drift-monitor)。 - 进入“Alerts”页面:通常在左侧导航栏。
- 创建新警报:
- Alert Name:例如 "Critical Data Drift Detected"。
- Monitor Type:选择 "Feedback Score"。
- Metric:选择你自定义的Feedback Key,例如
data_drift_status。 - Threshold Type:选择 "Is greater than or equal to"。
- Threshold Value:设置为
1.0(因为我们的漂移检测到时分数为1.0)。 - Alert Frequency:根据你的评估器运行频率设置(例如,每小时检查一次)。
- Notification Channels:配置通过邮件、Slack等方式发送通知。
这样,每当Data Drift Detector评估器运行,并且其data_drift_status Feedback的score达到1.0时(即检测到漂移),LangSmith就会自动触发你配置的警报。
高级考虑与最佳实践
1. 语义漂移的深度检测
- 聚类分析:对基线和生产数据的嵌入分别进行聚类。新的簇或现有簇的合并/分裂都可能指示语义漂移。可以使用t-SNE或UMAP进行降维可视化。
- 概念漂移 (Concept Drift):数据分布变化导致模型对相同输入输出不同结果。这通常通过监测模型性能指标(如准确率、F1分数)来间接发现。数据漂移是概念漂移的一个原因。
- 对抗性漂移:恶意用户试图通过特定输入绕过安全防护或操纵模型行为。需要专门的对抗性攻击检测。
2. 基线维护与更新
- 动态基线:基线不应一成不变。随着用户行为的自然演进,基线也需要定期更新。可以使用滑动窗口平均或指数加权平均来构建动态基线。
- 人工审核:在更新基线之前,对新数据进行人工审核,确保其代表性、质量和无偏性。
3. 阈值的设定
- 经验法则:根据历史数据和领域知识设定初始阈值。
- 统计学方法:使用3σ原则、IQR(四分位距)等统计方法从基线数据中学习阈值。
- A/B测试:在小范围生产环境中测试不同阈值对警报数量和准确性的影响。
- 假阳性与假阴性:平衡警报的敏感度。过多的假阳性会导致“警报疲劳”,错过真正的危机;过多的假阴性则会使监测系统失效。
4. 集成到MLOps管道
- 自动化调度:使用Airflow, Kubeflow, GitHub Actions等工具定期调度漂移评估器。
- 版本控制:对特征提取代码、漂移检测逻辑和基线数据集进行版本控制。
- 回滚策略:当检测到严重漂移并确认是模型问题时,应有能力快速回滚到旧版本模型。
5. 成本与性能
- 计算开销:特征提取和距离计算,特别是嵌入生成,可能消耗大量计算资源。需要权衡监测的频率和粒度。
- 采样:对于海量生产数据,可以对输入进行采样,而不是对所有数据进行检测。
- 增量更新:设计增量式计算逻辑,避免重复处理历史数据。
6. 多维度漂移报告
- 提供一个包含所有漂移指标的综合报告,帮助团队快速定位问题源头。
- 可视化工具,如直方图、密度图、散点图等,直观展示数据分布的变化。
7. 漂移的归因
- 仅仅知道发生了漂移是不够的,还需要知道是“什么”漂移了,“为什么”漂移了。例如,是用户的主题变了?还是语言风格变了?是否有外部事件(如新闻热点、季节性变化)导致了漂移?
结语
数据漂移监测是LLM应用从原型走向生产,并长期保持其健康运行的基石。通过LangSmith提供的强大追踪、评估和监控能力,结合我们自定义的特征工程和统计检测逻辑,我们能够构建一个及时、有效的预警系统。这不仅能帮助我们维护模型的性能,更能加深我们对真实用户行为模式的理解,为LLM的持续改进提供宝贵的数据洞察。拥抱数据漂移,就是拥抱真实世界的复杂性,并为我们的AI系统打造一条更加稳健的未来之路。