端到端 RAG 工程化:从训练到评估的实践指南
大家好,今天我们来深入探讨端到端检索增强生成(RAG)的工程化实践。RAG 作为 LLM 应用的关键技术,能显著提升 LLM 在特定领域内的知识覆盖和回答准确性。但是,要真正落地一个高效、可靠的 RAG 系统,需要一套完整的工程化体系,涵盖数据准备、模型训练、部署以及持续评估等多个环节。 本次分享将围绕这些环节,提供具体的技术方案和代码示例。
1. 数据准备与索引构建
RAG 系统的基石在于高质量的数据。数据准备的目标是将原始数据转化为 LLM 能够有效理解和利用的向量表示。
1.1 数据清洗与预处理:
原始数据通常包含噪声、格式不一致等问题。清洗和预处理的目标是使数据更加干净、结构化。
- 数据清洗: 移除无效字符、纠正拼写错误、处理缺失值。
- 文本分割: 将长文本分割成更小的块,以便进行向量化。常用的分割方法包括:
- 固定大小分割:将文本按照固定长度分割。
- 语义分割:根据句子或段落的语义进行分割。
- 递归分割:先按照大的语义单元分割,再递归地分割成更小的单元。
import nltk
from nltk.tokenize import sent_tokenize
def split_text_into_chunks(text, chunk_size=512, chunk_overlap=50):
"""
将文本分割成指定大小的块,并允许块之间重叠。
Args:
text: 要分割的文本。
chunk_size: 每个块的最大长度。
chunk_overlap: 块之间的重叠长度。
Returns:
一个包含文本块的列表。
"""
sentences = sent_tokenize(text) # 使用nltk按句子分割
chunks = []
current_chunk = ""
for sentence in sentences:
if len(current_chunk) + len(sentence) + 1 <= chunk_size:
current_chunk += sentence + " "
else:
chunks.append(current_chunk.strip())
current_chunk = sentence + " "
if current_chunk:
chunks.append(current_chunk.strip())
# 处理重叠
overlapped_chunks = []
for i in range(len(chunks)):
overlapped_chunks.append(chunks[i])
if i < len(chunks) - 1:
overlap = " ".join(chunks[i].split()[-chunk_overlap:]) + " " + " ".join(chunks[i+1].split()[:chunk_overlap])
if overlap.strip(): # 避免空overlap
overlapped_chunks.append(overlap.strip())
return overlapped_chunks
# 示例
text = "这是一段示例文本。包含多个句子。我们需要将其分割成块。以便进行后续处理。"
chunks = split_text_into_chunks(text, chunk_size=20, chunk_overlap=5)
print(chunks)
1.2 向量化 Embedding:
将文本块转换为向量表示,以便进行相似性搜索。常用的 Embedding 模型包括:
- Sentence Transformers: 提供预训练的句子 Embedding 模型,适用于各种文本任务。
- OpenAI Embedding API: 提供高质量的 Embedding 服务,但需要付费。
- FastText: 快速且轻量级的文本 Embedding 模型。
from sentence_transformers import SentenceTransformer
# 选择 Embedding 模型
model_name = 'all-mpnet-base-v2'
model = SentenceTransformer(model_name)
def embed_text(text):
"""
将文本转换为向量表示。
Args:
text: 要嵌入的文本。
Returns:
文本的向量表示。
"""
embedding = model.encode(text)
return embedding
# 示例
text = "这是一段示例文本。"
embedding = embed_text(text)
print(embedding.shape) #输出 (768,) all-mpnet-base-v2是768维向量
1.3 索引构建:
将向量化的文本块存储到向量数据库中,以便进行快速相似性搜索。常用的向量数据库包括:
- FAISS: Facebook AI Similarity Search,高性能的向量相似性搜索库。
- Milvus: 开源的向量数据库,支持海量向量数据的存储和检索。
- Pinecone: 云原生的向量数据库,提供高可用性和可扩展性。
- ChromaDB: 轻量级的嵌入式向量数据库,适合本地开发和测试。
import faiss
import numpy as np
class VectorDB:
def __init__(self, dimension):
self.dimension = dimension
self.index = faiss.IndexFlatL2(dimension) # 使用L2距离作为相似度度量
self.id_map = {} # 存储文本块ID和对应文本的映射
def add(self, vectors, texts):
"""
向向量数据库中添加向量和文本。
Args:
vectors: 要添加的向量列表,numpy array.
texts: 与向量对应的文本列表。
"""
start_id = len(self.id_map)
for i, vector in enumerate(vectors):
self.index.add(np.expand_dims(vector, axis=0)) # FAISS需要二维数组
self.id_map[start_id + i] = texts[i]
def search(self, query_vector, top_k=5):
"""
在向量数据库中搜索与查询向量最相似的文本。
Args:
query_vector: 查询向量。
top_k: 返回最相似的文本数量。
Returns:
一个包含最相似文本的列表。
"""
D, I = self.index.search(np.expand_dims(query_vector, axis=0), top_k) # D是距离,I是索引
results = []
for idx in I[0]: # I 是二维数组,取第一个(也是唯一的)查询结果的索引
results.append(self.id_map[idx])
return results
# 示例
# 假设我们已经有了 embedding 和 chunks
dimension = embedding.shape[0] # 获取向量维度
db = VectorDB(dimension)
db.add([embedding], [text]) # 这里传递的是列表,因为可以一次添加多个向量和文本
query = "示例文本"
query_embedding = embed_text(query)
results = db.search(query_embedding, top_k=1)
print(results)
2. 模型训练与微调
虽然 RAG 的核心是检索,但 LLM 的生成能力同样至关重要。在某些情况下,需要对 LLM 进行微调,以提升其在特定任务上的表现。
2.1 数据集准备:
微调 LLM 需要准备包含问题和对应答案的数据集。数据集的质量直接影响微调效果。数据可以手动构建,也可以通过数据增强技术生成。
2.2 微调方法:
常用的 LLM 微调方法包括:
- 全参数微调: 更新 LLM 的所有参数,计算成本高昂。
- LoRA (Low-Rank Adaptation): 冻结 LLM 的大部分参数,只训练少量新增的低秩矩阵,显著降低计算成本。
- Prefix Tuning: 在 LLM 的输入前添加可训练的 Prefix,引导 LLM 生成期望的答案。
# 使用 LoRA 微调 Hugging Face 模型
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer
from peft import LoraConfig, get_peft_model
import torch
# 1. 加载模型和 tokenizer
model_name = "facebook/opt-350m" #选择一个较小的模型
model = AutoModelForCausalLM.from_pretrained(model_name, device_map='auto')
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token # 确保有pad token
# 2. 准备数据集 (这里简化处理,使用一个虚拟数据集)
train_data = [
{"question": "What is the capital of France?", "answer": "The capital of France is Paris."},
{"question": "What is the largest planet in our solar system?", "answer": "The largest planet in our solar system is Jupiter."}
]
def preprocess_function(examples):
inputs = [f"Question: {q} Answer: {a}" for q, a in zip(examples["question"], examples["answer"])]
model_inputs = tokenizer(inputs, truncation=True, padding=True, max_length=128, return_tensors="pt")
model_inputs["labels"] = model_inputs["input_ids"].clone() # labels和input_ids一样,用于自回归训练
return model_inputs
# 将数据转换为 Dataset 对象 (需要先安装 datasets 库)
from datasets import Dataset
train_dataset = Dataset.from_dict({"question": [d["question"] for d in train_data], "answer": [d["answer"] for d in train_data]})
tokenized_train_dataset = train_dataset.map(preprocess_function, batched=True)
# 3. 配置 LoRA
lora_config = LoraConfig(
r=8, # LoRA秩
lora_alpha=32,
lora_dropout=0.05,
bias="none",
task_type="CAUSAL_LM"
)
model = get_peft_model(model, lora_config)
model.print_trainable_parameters() # 打印可训练参数数量
# 4. 配置 Trainer
training_args = TrainingArguments(
output_dir="./lora-tuned-model",
per_device_train_batch_size=4,
gradient_accumulation_steps=4,
learning_rate=2e-4,
logging_steps=10,
max_steps=50, #减少训练步数,快速演示
save_strategy="steps",
save_steps=50,
push_to_hub=False, # 禁用push to hub
optim="paged_adamw_32bit",
fp16=True # 使用混合精度训练
)
trainer = Trainer(
model=model,
train_dataset=tokenized_train_dataset,
args=training_args,
data_collator=lambda data: {'input_ids': torch.stack([f['input_ids'] for f in data]),
'attention_mask': torch.stack([f['attention_mask'] for f in data]),
'labels': torch.stack([f['labels'] for f in data])}
)
# 5. 训练
trainer.train()
# 6. 保存模型
model.save_pretrained("./lora-tuned-model")
tokenizer.save_pretrained("./lora-tuned-model")
# 推理示例(需要加载LoRA适配器)
from peft import PeftModel
tuned_model = PeftModel.from_pretrained(AutoModelForCausalLM.from_pretrained(model_name, device_map='auto'), "./lora-tuned-model")
input_text = "Question: What is the capital of Germany? Answer:"
input_ids = tokenizer(input_text, return_tensors="pt").to('cuda')
with torch.no_grad():
output = tuned_model.generate(**input_ids, max_length=100)
print(tokenizer.decode(output[0], skip_special_tokens=True))
2.3 评估微调效果:
使用评估指标(如 BLEU、ROUGE、Exact Match 等)评估微调后的 LLM 在生成任务上的表现。如果效果不理想,可以调整微调参数或增加训练数据。
3. RAG 流程实现
RAG 流程将检索和生成两个步骤结合起来,实现知识增强的 LLM 应用。
3.1 构建 RAG Pipeline:
RAG Pipeline 通常包含以下几个步骤:
- 问题编码: 将用户问题转换为向量表示。
- 知识检索: 在向量数据库中搜索与问题最相关的文本块。
- 上下文构建: 将检索到的文本块作为上下文,与问题一起输入 LLM。
- 答案生成: LLM 根据问题和上下文生成答案。
def rag_pipeline(query, vector_db, llm, prompt_template):
"""
实现 RAG Pipeline。
Args:
query: 用户问题。
vector_db: 向量数据库。
llm: LLM 模型。
prompt_template: prompt 模板。
Returns:
生成的答案。
"""
# 1. 问题编码
query_embedding = embed_text(query)
# 2. 知识检索
context = vector_db.search(query_embedding, top_k=3) #检索top3
# 3. 上下文构建
formatted_context = "n".join(context) #将检索到的文本块用换行符连接
prompt = prompt_template.format(context=formatted_context, question=query)
# 4. 答案生成
answer = llm(prompt)
return answer
# 示例
# 假设我们已经有了 vector_db 和 llm
# 简单的LLM mock,实际应用中替换为真正的LLM模型
def mock_llm(prompt):
return f"LLM的回答: {prompt}"
llm = mock_llm
prompt_template = """请根据以下上下文回答问题:
上下文:
{context}
问题:{question}
"""
query = "法国的首都是哪里?"
answer = rag_pipeline(query, db, llm, prompt_template)
print(answer)
3.2 Prompt 工程:
Prompt 的设计对 LLM 的生成质量至关重要。一个好的 Prompt 应该清晰地指示 LLM 的任务,并提供足够的上下文信息。
常用的 Prompt 工程技巧包括:
- 明确指令: 明确告诉 LLM 需要做什么,例如 "请根据以下上下文回答问题"。
- 提供上下文: 将检索到的文本块作为上下文提供给 LLM。
- 限制输出格式: 指定 LLM 的输出格式,例如 "请用简洁的语言回答"。
- Few-shot learning: 在 Prompt 中提供几个示例,引导 LLM 生成期望的答案。
4. 系统部署与优化
RAG 系统的部署需要考虑性能、可用性和可扩展性等因素。
4.1 API 封装:
将 RAG Pipeline 封装成 API,以便其他应用调用。常用的 API 框架包括:
- FastAPI: 高性能的 Python Web 框架,易于使用和部署。
- Flask: 轻量级的 Python Web 框架,适合小型应用。
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()
class Query(BaseModel):
text: str
@app.post("/rag")
async def rag_endpoint(query: Query):
"""
RAG API 接口。
"""
try:
answer = rag_pipeline(query.text, db, llm, prompt_template)
return {"answer": answer}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# 运行 FastAPI 应用
# 需要安装 uvicorn: pip install uvicorn
# 运行命令: uvicorn main:app --reload (假设代码保存在 main.py 中)
4.2 性能优化:
RAG 系统的性能瓶颈通常在于向量检索和 LLM 推理。可以采取以下措施进行优化:
- 向量索引优化: 选择合适的向量索引算法,例如 HNSW、IVF 等。
- 缓存: 缓存检索结果和 LLM 的输出,避免重复计算。
- 模型量化: 降低 LLM 的精度,减少内存占用和计算量。
- 异步处理: 使用异步任务处理耗时的操作,提高并发能力。
4.3 监控与告警:
建立完善的监控体系,实时监控 RAG 系统的性能指标(如响应时间、吞吐量、错误率等)。设置告警规则,及时发现和解决问题。
5. 持续评估与迭代
RAG 系统的效果不是一成不变的。需要定期进行评估,并根据评估结果进行迭代优化。
5.1 评估指标:
常用的 RAG 评估指标包括:
- 答案相关性: 评估生成的答案与问题的相关程度。
- 答案准确性: 评估生成的答案是否正确。
- 上下文相关性: 评估检索到的上下文与问题的相关程度。
- 上下文利用率: 评估 LLM 是否有效利用了检索到的上下文。
5.2 评估方法:
- 人工评估: 由人工评估员对 RAG 系统的输出进行评估。
- 自动化评估: 使用自动化指标(如 ROUGE、BLEU、QA 评估模型等)对 RAG 系统的输出进行评估。
5.3 迭代优化:
根据评估结果,可以对 RAG 系统的各个环节进行迭代优化,包括:
- 数据更新: 更新向量数据库中的数据,增加新的知识。
- 模型微调: 使用新的数据微调 LLM 模型。
- Prompt 优化: 优化 Prompt 的设计,提升 LLM 的生成质量。
- 检索策略调整: 调整向量检索的参数和策略,提高检索效果。
# 自动化评估示例 (使用 ROUGE 指标)
from rouge import Rouge
def evaluate_rag(predictions, references):
"""
使用 ROUGE 指标评估 RAG 系统。
Args:
predictions: RAG 系统生成的答案列表。
references: 标准答案列表。
Returns:
一个包含 ROUGE 指标的字典。
"""
rouge = Rouge()
scores = rouge.get_scores(predictions, references, avg=True)
return scores
# 示例
predictions = ["The capital of France is Paris.", "The largest planet is Jupiter."]
references = ["Paris is the capital of France.", "Jupiter is the largest planet in the solar system."]
scores = evaluate_rag(predictions, references)
print(scores)
数据准备是关键,向量化要选择合适的模型
数据清洗预处理是基础,向量化embedding是核心。选择适合场景的embedding模型,保证向量的质量。文本分割方法也需要根据数据特点进行调整。
模型微调提升生成能力,LoRA是性价比之选
在特定领域,对LLM进行微调可以显著提升其生成能力。LoRA是一种高效的微调方法,能有效降低计算成本。微调数据集的质量直接影响微调效果,需要精心准备。
RAG Pipeline连接检索和生成,Prompt工程至关重要
RAG Pipeline将检索和生成两个步骤连接起来,实现知识增强的LLM应用。Prompt工程是RAG Pipeline的关键环节,好的prompt能引导LLM生成高质量的答案。
系统部署需要考虑性能和可用性,持续评估和迭代是常态
RAG系统的部署需要考虑性能、可用性和可扩展性等因素。API封装方便其他应用调用。持续评估RAG系统的效果,并根据评估结果进行迭代优化,是保证RAG系统长期有效性的关键。