RAG 检索链路加入多路召回后如何通过工程化权重融合提升准确率

RAG 检索链路多路召回的权重融合工程实践

各位朋友,大家好!今天我们来聊聊如何通过工程化的权重融合来提升 RAG (Retrieval-Augmented Generation) 检索链路的准确率,尤其是在引入多路召回策略之后。

RAG 已经成为构建基于大型语言模型 (LLM) 应用的重要技术。它通过检索外部知识库,然后将检索到的内容与用户查询一起传递给 LLM,从而增强 LLM 的知识覆盖面和生成内容的准确性。而多路召回则是进一步提升 RAG 性能的关键手段。

1. 多路召回:拓宽知识检索的维度

传统的 RAG 系统通常依赖单一的检索方法,例如基于关键词的检索或基于向量相似度的检索。然而,单一方法往往难以覆盖所有相关的知识。多路召回的核心思想是利用多种不同的检索策略,从不同的角度检索知识,从而提高召回率。

常见的多路召回策略包括:

  • 关键词检索 (Keyword Search): 基于关键词匹配的传统检索方法,例如使用 TF-IDF 或 BM25 算法。
  • 向量检索 (Vector Search): 将用户查询和知识库文档嵌入到同一向量空间,然后根据向量相似度进行检索。常用的嵌入模型包括 Sentence Transformers、OpenAI Embeddings 等。
  • 语义检索 (Semantic Search): 利用语义理解模型,捕捉用户查询和文档的深层语义关系,进行检索。
  • 知识图谱检索 (Knowledge Graph Retrieval): 如果知识库以知识图谱的形式存在,则可以利用图谱查询语言 (例如 SPARQL) 进行检索。
  • 混合检索 (Hybrid Search): 结合多种检索策略,例如先进行关键词检索,然后对检索结果进行向量相似度排序。

2. 多路召回带来的新问题:权重融合的必要性

引入多路召回后,我们得到的是来自不同检索策略的结果列表。这些结果列表通常具有不同的排序和评分标准。直接将它们合并可能会导致以下问题:

  • 不同策略的评分尺度不一致: 例如,关键词检索的 TF-IDF 分数可能远大于向量检索的余弦相似度分数。
  • 不同策略的准确率不同: 某些检索策略可能更擅长处理特定类型的查询。
  • 冗余信息: 多个检索策略可能召回相同或相似的文档。

为了解决这些问题,我们需要对来自不同召回策略的结果进行权重融合。权重融合的目标是根据不同策略的可靠性和相关性,对它们的结果进行加权组合,从而生成一个更准确的排序列表。

3. 权重融合的工程化方法:由简至繁

权重融合的方法有很多种,从简单到复杂,可以根据实际情况选择合适的方案。

3.1 基于规则的权重融合 (Rule-Based Weighting)

这是最简单的权重融合方法。它基于预定义的规则,为不同的召回策略分配固定的权重。

def rule_based_fusion(keyword_results, vector_results, keyword_weight=0.6, vector_weight=0.4):
  """
  基于规则的权重融合。

  Args:
    keyword_results: 关键词检索结果列表,每个元素是一个包含 document_id 和 score 的字典。
    vector_results: 向量检索结果列表,每个元素是一个包含 document_id 和 score 的字典。
    keyword_weight: 关键词检索结果的权重。
    vector_weight: 向量检索结果的权重。

  Returns:
    融合后的结果列表,按照 score 降序排列。
  """

  fused_results = {}

  for result in keyword_results:
    doc_id = result['document_id']
    score = result['score'] * keyword_weight
    fused_results[doc_id] = fused_results.get(doc_id, 0) + score

  for result in vector_results:
    doc_id = result['document_id']
    score = result['score'] * vector_weight
    fused_results[doc_id] = fused_results.get(doc_id, 0) + score

  # 将字典转换为列表并按照 score 排序
  fused_results_list = [{'document_id': doc_id, 'score': score} for doc_id, score in fused_results.items()]
  fused_results_list = sorted(fused_results_list, key=lambda x: x['score'], reverse=True)

  return fused_results_list

# 示例
keyword_results = [
    {'document_id': 'doc1', 'score': 0.8},
    {'document_id': 'doc2', 'score': 0.5},
    {'document_id': 'doc3', 'score': 0.3}
]

vector_results = [
    {'document_id': 'doc1', 'score': 0.9},
    {'document_id': 'doc4', 'score': 0.7},
    {'document_id': 'doc2', 'score': 0.4}
]

fused_results = rule_based_fusion(keyword_results, vector_results)
print(fused_results)

优点: 简单易实现,无需训练数据。

缺点: 权重是固定的,无法根据查询的特性进行调整。对不同查询场景的适应性差。需要人工调整权重,耗时耗力。

3.2 基于排序归一化的权重融合 (Rank Normalization)

这种方法首先将每个召回策略的结果列表进行排序归一化,然后根据预定义的权重进行加权组合。常见的排序归一化方法包括:

  • Min-Max 归一化: 将分数缩放到 [0, 1] 范围。
  • Z-Score 归一化: 将分数转换为标准正态分布。
  • Reciprocal Rank Fusion (RRF): RRF 是一种常用的排序融合算法,它根据文档在每个列表中的排名来计算其最终得分。
import numpy as np

def reciprocal_rank_fusion(results_list, k=60):
    """
    使用 Reciprocal Rank Fusion (RRF) 融合多个结果列表。

    Args:
        results_list: 一个列表,包含多个结果列表。每个结果列表是一个包含 document_id 和 score 的字典的列表。
        k: 用于 RRF 的常数。

    Returns:
        融合后的结果列表,按照 score 降序排列。
    """
    fused_scores = {}
    for results in results_list:
        for rank, result in enumerate(results):
            doc_id = result['document_id']
            fused_scores[doc_id] = fused_scores.get(doc_id, 0) + 1 / (rank + k)

    fused_results = [{'document_id': doc_id, 'score': score} for doc_id, score in fused_scores.items()]
    fused_results = sorted(fused_results, key=lambda x: x['score'], reverse=True)

    return fused_results

# 示例
keyword_results = [
    {'document_id': 'doc1', 'score': 0.8},
    {'document_id': 'doc2', 'score': 0.5},
    {'document_id': 'doc3', 'score': 0.3}
]

vector_results = [
    {'document_id': 'doc1', 'score': 0.9},
    {'document_id': 'doc4', 'score': 0.7},
    {'document_id': 'doc2', 'score': 0.4}
]

results_list = [keyword_results, vector_results]
fused_results = reciprocal_rank_fusion(results_list)
print(fused_results)

优点: 可以缓解不同策略评分尺度不一致的问题。

缺点: 仍然需要人工调整权重。无法充分利用原始分数信息。

3.3 基于机器学习的权重融合 (Learning to Rank)

这种方法将权重融合问题视为一个排序问题,利用机器学习模型来学习最佳的权重组合。常用的 Learning to Rank 模型包括:

  • LambdaMART: 一种基于梯度提升树 (Gradient Boosting Tree) 的排序模型。
  • RankNet: 一种基于神经网络的排序模型。
  • ListNet: 一种直接优化列表排序指标的排序模型。

3.3.1 数据准备

首先,需要准备训练数据。训练数据包含以下信息:

  • 查询 (Query): 用户输入的查询。
  • 文档 (Document): 从知识库中检索到的文档。
  • 特征 (Features): 从查询和文档中提取的特征,例如:
    • 关键词检索分数 (TF-IDF, BM25)。
    • 向量检索相似度 (Cosine Similarity)。
    • 查询和文档的长度。
    • 查询和文档的关键词匹配度。
    • 文档的 PageRank 值 (如果知识库是图结构)。
  • 标签 (Label): 文档与查询的相关性标签,例如:
    • 0: 不相关。
    • 1: 相关。
    • 2: 非常相关。

标签可以通过人工标注或利用点击日志等数据进行生成。

3.3.2 模型训练

然后,使用准备好的训练数据训练 Learning to Rank 模型。

import lightgbm as lgb
import numpy as np

def train_lgbm_ranker(train_data, train_labels, feature_names):
    """
    使用 LightGBM 训练 Learning to Rank 模型。

    Args:
        train_data: 训练数据,是一个 numpy 数组,每一行代表一个文档的特征向量。
        train_labels: 训练标签,是一个 numpy 数组,每一行代表一个文档的相关性标签。
        feature_names: 特征名称列表。

    Returns:
        训练好的 LightGBM 模型。
    """

    # 创建 LightGBM 数据集
    lgb_train = lgb.Dataset(train_data, train_labels)

    # 设置 LightGBM 参数
    params = {
        'objective': 'lambdarank',  # 使用 LambdaRank 损失函数
        'metric': 'ndcg',          # 使用 NDCG 作为评估指标
        'boosting_type': 'gbdt',     # 使用梯度提升树
        'num_leaves': 31,           # 叶子节点数量
        'learning_rate': 0.05,      # 学习率
        'feature_fraction': 0.9,    # 特征采样比例
        'bagging_fraction': 0.8,     # 数据采样比例
        'bagging_freq': 5,           # 数据采样频率
        'verbose': -1                # 静默模式
    }

    # 训练模型
    model = lgb.train(params, lgb_train, num_boost_round=100, feature_name=feature_names)

    return model

# 示例
# 假设我们有 4 个文档的训练数据
train_data = np.array([
    [0.8, 0.9, 0.7, 0.5],  # doc1
    [0.5, 0.4, 0.3, 0.2],  # doc2
    [0.3, 0.7, 0.2, 0.8],  # doc3
    [0.1, 0.2, 0.9, 0.6]   # doc4
])

# 对应的相关性标签
train_labels = np.array([2, 0, 1, 0])

# 特征名称
feature_names = ['keyword_score', 'vector_score', 'length_ratio', 'keyword_match']

# 训练模型
model = train_lgbm_ranker(train_data, train_labels, feature_names)

# 保存模型 (可选)
# model.save_model('lgbm_ranker.txt', num_iteration=model.best_iteration)

3.3.3 模型预测

最后,使用训练好的模型对新的查询和文档进行预测,得到融合后的排序列表。

def predict_lgbm_ranker(model, test_data):
    """
    使用训练好的 LightGBM 模型进行预测。

    Args:
        model: 训练好的 LightGBM 模型。
        test_data: 测试数据,是一个 numpy 数组,每一行代表一个文档的特征向量。

    Returns:
        预测的分数,是一个 numpy 数组。
    """

    # 使用模型进行预测
    predictions = model.predict(test_data)

    return predictions

def rerank_results(results_list, model, feature_names):
    """
    使用 Learning to Rank 模型对多路召回的结果进行重排序。

    Args:
        results_list: 一个列表,包含多个结果列表。每个结果列表是一个包含 document_id 和 原始特征 的字典的列表。
        model: 训练好的 Learning to Rank 模型。
        feature_names: 特征名称列表。

    Returns:
        重排序后的结果列表,按照 Learning to Rank 模型预测的分数降序排列。
    """
    all_features = []
    doc_ids = []

    for results in results_list:
        for result in results:
            features = [result[feature] for feature in feature_names] # 提取特征
            all_features.append(features)
            doc_ids.append(result['document_id'])

    all_features = np.array(all_features) # 转换为 numpy 数组

    # 使用模型预测分数
    scores = predict_lgbm_ranker(model, all_features)

    # 创建包含 document_id 和 score 的结果列表
    reranked_results = [{'document_id': doc_id, 'score': score} for doc_id, score in zip(doc_ids, scores)]

    # 按照分数降序排列
    reranked_results = sorted(reranked_results, key=lambda x: x['score'], reverse=True)

    return reranked_results

# 示例 (使用之前训练的模型)
# 假设我们有新的测试数据
test_data = np.array([
    [0.7, 0.8, 0.6, 0.4],  # doc5
    [0.6, 0.5, 0.4, 0.3],  # doc6
    [0.4, 0.6, 0.3, 0.7]   # doc7
])

# 假设我们从多路召回得到的结果列表(包含特征)
results_list = [
    [
        {'document_id': 'doc5', 'keyword_score': 0.7, 'vector_score': 0.8, 'length_ratio': 0.6, 'keyword_match': 0.4},
        {'document_id': 'doc6', 'keyword_score': 0.6, 'vector_score': 0.5, 'length_ratio': 0.4, 'keyword_match': 0.3}
    ],
    [
        {'document_id': 'doc7', 'keyword_score': 0.4, 'vector_score': 0.6, 'length_ratio': 0.3, 'keyword_match': 0.7}
    ]
]

# 使用 Learning to Rank 模型进行重排序
reranked_results = rerank_results(results_list, model, feature_names)
print(reranked_results)

优点: 可以根据查询的特性动态调整权重。可以充分利用各种特征信息。

缺点: 需要大量的训练数据。模型训练和部署的复杂度较高。

4. 工程实践中的注意事项

  • 特征工程: 特征工程是 Learning to Rank 的关键步骤。选择合适的特征可以显著提升模型的性能。
  • 数据质量: 训练数据的质量直接影响模型的性能。需要保证训练数据的准确性和多样性。
  • 模型选择: 选择合适的 Learning to Rank 模型需要根据实际情况进行实验和评估。
  • 在线学习: 可以利用在线学习技术,不断更新模型,以适应用户行为的变化。
  • AB 测试: 在将权重融合策略部署到生产环境之前,需要进行 AB 测试,评估其效果。

5. 案例分析:电商搜索

假设我们正在构建一个电商搜索系统,用户可以输入关键词搜索商品。我们可以采用以下多路召回策略:

  • 关键词检索: 基于商品标题和描述进行关键词检索。
  • 类目检索: 根据用户输入的关键词匹配商品类目。
  • 属性检索: 根据用户输入的关键词匹配商品属性。
  • 向量检索: 基于商品标题和描述的向量嵌入进行相似度检索。

对于权重融合,我们可以首先尝试基于规则的权重融合,然后逐步升级到基于 Learning to Rank 的权重融合。

6. 优化RAG检索效果:不仅仅是权重

除了权重融合之外,还有一些其他的工程技巧可以用来提升 RAG 检索链路的准确率:

  • 查询改写 (Query Rewriting): 利用 LLM 对用户查询进行改写,例如扩展关键词、纠正拼写错误、添加上下文信息。
  • 文档预处理 (Document Preprocessing): 对知识库文档进行清洗、去重、分段等预处理操作。
  • 相关性过滤 (Relevance Filtering): 利用 LLM 对检索结果进行相关性过滤,去除不相关的文档。

7. 总结:权重融合是提升RAG性能的关键

多路召回是提升 RAG 系统召回率的有效手段,而权重融合则是解决多路召回带来的结果融合问题的关键。选择合适的权重融合方法,并结合其他的工程技巧,可以显著提升 RAG 系统的准确率和用户体验。

检索策略选择与融合方法:需要依据场景特点

多路召回策略的选择和权重融合方法的选择需要根据具体的应用场景进行调整。没有一种通用的解决方案可以适用于所有情况。需要进行大量的实验和评估,才能找到最佳的方案。

不断迭代优化:持续提升RAG系统性能

RAG 系统的构建是一个持续迭代的过程。需要不断地收集用户反馈,分析系统性能,并进行相应的优化。通过持续的迭代和优化,才能构建出高质量的 RAG 系统。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注