各位技术同仁,大家好!
今天,我们将深入探讨一个在构建高效、智能RAG(检索增强生成)系统时日益重要的概念:语义路由(Semantic Routing)。随着大模型(LLM)能力的飞速发展,它们在理解和生成自然语言方面的表现令人惊叹。然而,仅凭大模型自身的力量,在处理特定领域、实时数据或需要高度事实准确性的场景时,仍然存在局限性。RAG的出现弥补了这一差距,它通过从外部知识库中检索相关信息来增强LLM的生成能力,显著提升了模型的准确性、可靠性和时效性。
然而,当我们的知识库变得庞大、异构,并且涵盖多个领域时,一个核心挑战浮现:如何确保RAG系统总能从“正确”的知识源中检索到“最相关”的信息?这就是语义路由发挥作用的地方。它不仅仅是简单的关键词匹配或基于规则的转发,而是通过深度理解用户查询的意图和类型,智能地将请求导向最合适的知识源或处理流程。
想象一下,您正在构建一个企业级的智能客服系统。用户可能会问关于“产品技术规格”、“订单状态查询”、“退换货政策”或者“公司最新财报”的问题。这些问题分别对应着产品数据库、ERP系统、客户服务文档和财务报告。如果只是将所有文档混合在一个巨大的向量数据库中进行检索,可能会导致以下问题:
- 检索效率低下:在海量文档中检索,即便使用了向量相似度,仍然会增加计算负担和延迟。
- 相关性漂移:一个关于“退货”的问题,可能会错误地检索到“退役产品”的技术文档,因为它们的词向量可能在某些维度上相似。
- 信息混淆:不同领域的知识可能存在术语上的重叠,但含义完全不同,导致LLM接收到混淆的信息并产生“幻觉”。
- 资源浪费:为所有查询都激活所有可能的知识源或API是不经济且低效的。
语义路由正是为了解决这些问题而生。它充当了一个智能的“交通警察”,根据查询的语义特征,将其精确地引导至最能提供有效答案的“车道”上。今天,我将带领大家从理论到实践,深入剖析语义路由的原理、架构,并通过丰富的代码示例,展示如何在实际项目中构建一个高效的语义路由RAG系统。
RAG基础回顾:为什么我们需要超越简单的检索
在深入语义路由之前,让我们快速回顾一下RAG的核心工作原理以及它面临的挑战。
RAG核心工作原理
RAG通常遵循以下步骤:
- 检索(Retrieval):当接收到用户查询时,系统会从一个或多个外部知识库中检索出与查询最相关的一小段或几段文本(称为上下文或证据)。这通常通过将用户查询转换为向量,然后与知识库中预先嵌入的文档块进行向量相似度搜索来完成。
- 增强(Augmentation):检索到的相关文本作为额外的上下文信息,与原始用户查询一起,被传递给大语言模型。
- 生成(Generation):大语言模型利用这些增强的上下文信息来生成一个更准确、更具信息量且与事实一致的回答。
传统RAG的问题与局限
尽管RAG显著提升了LLM的表现,但当知识库的复杂性和多样性增加时,传统RAG的局限性也日益凸显:
- 单一向量数据库的瓶颈:
- 语义冲突:在同一个大型向量数据库中存储来自不同领域、可能存在相同词汇但不同含义的文档时,向量相似度检索可能会变得不准确。例如,“Apple”可能指水果,也可能指科技公司。
- 噪音干扰:当知识库庞大且包含大量不相关信息时,检索算法容易受到“噪音”的干扰,导致检索到不那么相关的文档,甚至错误信息。
- 效率与成本:对一个超大型向量数据库进行检索,无论是计算资源还是时间延迟,都可能成为瓶颈。
- 面对多领域、多格式知识的挑战:
- 知识异构性:企业知识可能存储在文档、数据库、API、知识图谱等多种格式中。单一的向量检索难以高效利用所有这些资源。
- 时效性要求:某些知识需要实时更新(如股票价格、订单状态),而另一些则是静态文档。将所有信息都嵌入并存储在向量数据库中,难以满足实时性要求。
- 检索质量对生成效果的关键影响:
- “垃圾进,垃圾出”:如果检索到的上下文信息质量不高、不准确或包含错误,LLM生成的回答也必然会受到影响,甚至产生“幻觉”。
- 信息过载:有时检索到的上下文信息过多,超出了LLM的上下文窗口限制,或者信息过于冗余,导致LLM难以聚焦核心。
为了克服这些挑战,我们需要一种更智能的机制来管理和利用多样化的知识源。语义路由正是这一机制的核心。
语义路由的核心概念与优势
什么是语义路由?
语义路由,简单来说,是一种智能决策机制,它能够根据用户查询的语义内容、意图、类型或主题,动态地将该查询导向最适合处理它的知识源、工具或处理流程。它超越了传统的基于关键词或固定规则的路由,而是利用先进的自然语言处理(NLP)技术,特别是嵌入模型和大语言模型的能力,来理解查询背后的深层含义。
语义路由与传统路由的区别
| 特性 | 传统路由 (基于规则/关键词) | 语义路由 (基于语义理解) |
|---|---|---|
| 决策依据 | 预定义规则、硬编码关键词、正则表达式 | 用户查询的语义嵌入、意图识别、上下文分析 |
| 灵活性 | 较低,需要人工维护规则,难以适应新问题和措辞 | 较高,能处理同义词、近义词、不同表达方式,自适应性强 |
| 准确性 | 受限于规则覆盖范围,容易误判或漏判 | 更高,能捕捉深层意图,减少误判 |
| 扩展性 | 较差,增加新知识源或问题类型需大量修改规则 | 较好,可利用模型泛化能力或少量样本进行微调 |
| 处理歧义 | 几乎无法处理,除非规则明确涵盖所有歧义场景 | 能够更好地处理语义歧义,根据上下文进行判断 |
| 实现难度 | 规则复杂时维护难度大,初期可能简单 | 初期需要模型训练或LLM集成,但长期维护成本可能更低 |
语义路由的关键优势
- 提高检索精确度:
- 将查询导向专业化的知识库,大大缩小了检索范围,从而更容易找到高度相关的文档。
- 避免了不同领域知识之间的语义冲突和干扰,提高了检索的“信噪比”。
- 降低延迟与优化资源利用:
- 无需对所有知识库进行全面检索,只激活最相关的部分,显著减少了计算量和API调用次数。
- 对于外部API或数据库,仅在需要时才进行调用,降低了运营成本。
- 提升RAG系统的可扩展性:
- 当需要添加新的知识领域时,只需为新领域建立独立的知识库和相应的路由规则,而无需改动或重新索引整个系统。
- 使得管理异构、多样化的知识源变得更加模块化和高效。
- 增强用户体验:
- 用户能够更快地获得更准确、更专业的回答,减少了等待时间和不满意度。
- 系统能够处理更广泛、更复杂的用户查询,提升了整体智能水平。
- 降低“幻觉”风险:
- 通过提供更精确、更聚焦的上下文,降低了LLM接收到错误或不相关信息而产生“幻觉”的风险。
- 支持复杂工作流:
- 语义路由不仅可以导向知识库,还可以导向特定的工具(如计算器、日历查询)、外部API(如天气服务、股票查询)或特定的业务流程。
语义路由的架构与实现模式
一个典型的语义路由RAG系统架构通常包含以下核心组件:
高层架构概览
+----------------+ +-------------------+ +---------------------+
| User Query |------>| Query Router |------>| Knowledge Source A |
+----------------+ | (Semantic Logic) | | (Vector DB A) |
+-------------------+ +---------------------+
| ^
| |
| |
v v
+---------------------+ +---------------------+
| Knowledge Source B |------>| LLM (Generation) |
| (Vector DB B) | | (with retrieved context)|
+---------------------+ +---------------------+
| ^
| |
v v
+---------------------+ +---------------------+
| Knowledge Source C |------>| User Response |
| (External API/DB) | +---------------------+
+---------------------+
主要组成部分
-
Query Encoder/Embedder:
- 负责将原始的用户查询转换为一个高维的数值向量(嵌入)。这是进行语义理解的基础。
- 通常使用预训练的语言模型(如Sentence Transformers, OpenAI Embeddings, BGE等)来完成。
-
Router/Classifier:
- 核心组件,接收查询嵌入,并根据预设的逻辑决定将查询导向哪个目标。
- 这部分可以是基于大语言模型、独立的机器学习模型,或者向量相似度匹配等。
-
Knowledge Sources (目标):
- 可以是各种形式的知识存储和检索系统:
- 多个专门化的向量数据库:每个数据库存储特定领域的文档嵌入。
- 传统关系型数据库(RDBMS):通过SQL查询获取结构化数据。
- 外部API:用于获取实时数据或执行特定功能(如天气查询、股票查询、订单查询)。
- 知识图谱:用于复杂关系查询。
- 内部文档管理系统:用于检索非结构化文档。
- 可以是各种形式的知识存储和检索系统:
-
Orchestrator(编排器):
- 协调整个RAG流程。它接收路由器的决策,调用相应的知识源进行检索,然后将检索结果和原始查询一起传递给LLM进行生成。
- LangChain、LlamaIndex等框架提供了强大的编排能力。
实现模式
语义路由的实现方式多种多样,可以根据项目的复杂性、数据量和性能要求选择不同的模式。
1. 基于大模型自身能力的路由 (LLM-as-a-Router)
这种模式利用大语言模型强大的理解、推理和指令遵循能力来直接进行路由决策。
-
Few-shot prompting:
- 通过向LLM提供少量示例,展示不同类型的用户查询应该映射到哪个目标。
- 适用于路由目标数量不多,且描述清晰的场景。
- 优点:实现简单,无需额外训练模型。
- 缺点:性能受限于LLM的上下文窗口和对提示词的敏感性;成本可能较高。
-
Function calling / Tool use:
- 利用支持函数调用功能的LLM(如OpenAI GPT系列),定义一系列“工具”或“函数”,每个函数对应一个路由目标。
- LLM根据用户查询,决定调用哪个函数,并提取必要的参数。
- 优点:高度灵活,LLM能自主决定工具选择和参数提取,适合复杂的路由逻辑。
- 缺点:依赖于特定LLM的能力;仍存在一定的幻觉和不确定性。
-
Agentic approach:
- 更高级的LLM-as-a-Router形式,LLM作为代理,可以进行多步骤推理、规划,并动态选择工具。
- 优点:最灵活,能处理最复杂的路由和任务。
- 缺点:复杂度最高,性能和成本也最高,推理时间可能较长。
2. 基于独立分类模型的路由 (Standalone Classifier)
这种模式将路由任务视为一个标准的文本分类问题。
-
传统机器学习模型:
- 如SVM、Logistic Regression、朴素贝叶斯等。
- 需要将用户查询转换为特征向量(如TF-IDF, Bag-of-Words),然后训练分类器。
- 优点:计算效率高,模型轻量。
- 缺点:对特征工程依赖大,泛化能力相对较弱,可能难以捕捉复杂语义。
-
深度学习模型:
- 如BERT、RoBERTa、Sentence Transformers等预训练语言模型。
- 用户查询通过这些模型生成语义嵌入,然后输入到简单的分类头(如全连接层)进行分类。
- 优点:语义理解能力强,泛化能力好,准确度高。
- 缺点:需要标注数据进行微调,模型相对较大,推理速度可能慢于传统模型。
-
专门训练的文本分类器:
- 针对特定任务和领域,训练一个自定义的深度学习分类器。
- 优点:性能最优,能高度适应特定场景。
- 缺点:训练成本高,需要大量高质量标注数据。
3. 基于向量相似度的路由 (Vector Similarity-based Routing)
这种模式利用嵌入模型的语义表示能力,将每个路由目标也表示为一个或多个向量。
- 目标表示:
- 可以为每个知识源/类别创建“代表性”的查询嵌入(如该类别下所有查询的平均嵌入)。
- 或者,为每个目标定义一组“原型查询”或“描述性文本”,将其嵌入。
- 路由逻辑:
- 将用户查询嵌入,然后计算查询嵌入与所有目标嵌入之间的相似度(如余弦相似度)。
- 选择相似度最高的那个目标进行路由。
- 优点:实现相对简单,无需训练分类模型,动态性好(易于添加新目标)。
- 缺点:如果目标之间的语义边界不清晰,可能会导致误判;目标表示的质量至关重要。
4. 混合模式 (Hybrid Approach)
在实际应用中,通常会结合上述多种模式以达到最佳效果。例如:
- 先用一个轻量级分类器进行粗粒度路由,再用LLM进行细粒度决策或参数提取。
- 结合LLM Function Calling和向量相似度,让LLM推荐工具,但工具的具体知识库选择由向量相似度决定。
代码实践:构建一个简易的语义路由RAG系统
接下来,我们将通过一个具体的场景来实践上述概念。
场景设定
假设我们正在为一个电商平台构建一个智能客服RAG系统。该系统需要处理以下三类用户问题:
- 产品技术支持:关于产品功能、使用方法、故障排除等。
- 订单查询:关于订单状态、物流信息、支付问题等。
- 退换货政策:关于退货流程、退款时间、换货条件等。
每类问题都对应着一个专门的知识库,我们希望系统能够根据用户提问的类型,自动将查询导向正确的知识库进行检索。
我们将使用以下库:
openai:用于LLM调用和嵌入。langchain:用于构建RAG链和集成组件。sentence-transformers:用于本地嵌入模型(作为OpenAI嵌入的替代方案,或独立分类器)。scikit-learn:用于训练分类器。chromadb:作为轻量级向量数据库。
环境准备
首先,安装必要的库:
pip install openai langchain chromadb sentence-transformers scikit-learn numpy
设置OpenAI API Key(如果您选择使用OpenAI的LLM和嵌入):
import os
os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
# 或者使用 dotenv 从 .env 文件加载
# from dotenv import load_dotenv
# load_dotenv()
知识库准备
我们将为每个类别创建一些示例文档,并分别存储到独立的Chroma向量数据库中。
from langchain.text_splitter import CharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
# 1. 定义知识库内容
product_docs = [
"我们的智能手机支持最新的5G网络,配备A15仿生芯片和超视网膜XDR显示屏。",
"如果您的耳机无法连接蓝牙,请检查设备是否已开启配对模式,并尝试重启手机。",
"笔记本电脑的电池续航时间在正常使用情况下约为10小时,取决于具体应用。",
"请问如何进行软件更新?您可以在设置中找到“通用”->“软件更新”选项。",
"摄像头出现模糊,可能是镜头有污渍,请使用软布擦拭。"
]
order_docs = [
"您的订单号为ORD123456789,已于2023年10月26日发货,预计1-3个工作日送达。",
"查询订单状态,请登录您的账户,在“我的订单”页面查看。",
"支付失败可能是由于银行卡余额不足或网络延迟,请稍后再试。",
"如何修改收货地址?请在订单发货前联系客服。",
"我可以在哪里看到我的物流信息?您可以在订单详情页点击物流追踪链接。"
]
return_exchange_docs = [
"根据我们的退换货政策,商品可在收到后7天内无理由退货,15天内可换货。",
"退款通常在商品寄回并仓库确认无误后3-5个工作日内完成。",
"非质量问题退货,运费需由买家承担。",
"如何申请退货?请在网站“我的订单”中找到对应订单并点击“申请退货”。",
"换货流程与退货类似,但您需要选择新的商品型号或颜色。"
]
# 2. 初始化嵌入模型和LLM
# 如果不想使用OpenAI,可以使用 HuggingFaceEmbeddings 和 Ollama/Local LLM
embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
# 3. 创建文本分割器
text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=0)
# 4. 为每个知识库创建并填充独立的向量数据库
def create_vector_db(docs, collection_name):
# 将字符串列表转换为 LangChain Document 对象
langchain_docs = [Document(page_content=d) for d in docs]
# 分割文档
split_docs = text_splitter.split_documents(langchain_docs)
# 创建 Chroma 向量数据库
vector_db = Chroma.from_documents(
documents=split_docs,
embedding=embeddings,
collection_name=collection_name,
persist_directory="./chroma_dbs" # 持久化到磁盘
)
return vector_db
print("正在创建产品技术支持知识库...")
product_db = create_vector_db(product_docs, "product_support")
print("产品技术支持知识库创建完成。")
print("正在创建订单查询知识库...")
order_db = create_vector_db(order_docs, "order_inquiry")
print("订单查询知识库创建完成。")
print("正在创建退换货政策知识库...")
return_exchange_db = create_vector_db(return_exchange_docs, "return_exchange_policy")
print("退换货政策知识库创建完成。")
# 加载持久化的数据库 (如果已经创建过)
# product_db = Chroma(persist_directory="./chroma_dbs", embedding_function=embeddings, collection_name="product_support")
# order_db = Chroma(persist_directory="./chroma_dbs", embedding_function=embeddings, collection_name="order_inquiry")
# return_exchange_db = Chroma(persist_directory="./chroma_dbs", embedding_function=embeddings, collection_name="return_exchange_policy")
# 验证检索功能
# print("n测试产品知识库检索:")
# res = product_db.similarity_search("手机无法开机怎么办?")
# print([d.page_content for d in res])
# print("n测试订单知识库检索:")
# res = order_db.similarity_search("我的包裹到哪了?")
# print([d.page_content for d in res])
路由器实现
我们将演示三种不同的路由器实现方式。
方法一:基于LLM Function Calling 的路由器
使用OpenAI的Function Calling能力来让LLM决定调用哪个“工具”(即哪个知识库)。
首先,定义每个知识库对应的工具。
from langchain_core.tools import tool
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain.agents import create_tool_calling_agent, AgentExecutor
# 定义每个知识库的检索器
product_retriever = product_db.as_retriever()
order_retriever = order_db.as_retriever()
return_exchange_retriever = return_exchange_db.as_retriever()
# 定义工具,每个工具代表一个知识库的检索功能
@tool
def get_product_support_info(query: str) -> str:
"""
当你需要回答关于产品功能、使用方法、故障排除、技术规格等产品相关问题时,使用此工具。
例如:'手机怎么开机?', '耳机连接不上蓝牙怎么办?', '笔记本电池续航多久?'
"""
docs = product_retriever.invoke(query)
return "n".join([d.page_content for d in docs])
@tool
def get_order_inquiry_info(query: str) -> str:
"""
当你需要回答关于订单状态、物流信息、支付问题、收货地址修改等订单相关问题时,使用此工具。
例如:'我的订单发货了吗?', '物流信息在哪里看?', '支付失败了怎么办?'
"""
docs = order_retriever.invoke(query)
return "n".join([d.page_content for d in docs])
@tool
def get_return_exchange_policy_info(query: str) -> str:
"""
当你需要回答关于退货流程、退款时间、换货条件、运费承担等退换货政策相关问题时,使用此工具。
例如:'怎么申请退货?', '退款多久能到账?', '非质量问题退货运费谁承担?'
"""
docs = return_exchange_retriever.invoke(query)
return "n".join([d.page_content for d in docs])
# 将所有工具放入一个列表中
tools = [get_product_support_info, get_order_inquiry_info, get_return_exchange_policy_info]
# 创建一个 Agent,让LLM根据用户问题选择并调用工具
# 注意:这里我们只用Agent来做路由决策和工具调用,实际的RAG生成部分可以单独构建
agent_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个智能客服助手,根据用户问题选择最合适的工具来获取信息。"),
("user", "{input}"),
("placeholder", "{agent_scratchpad}") # 代理的思考过程
])
agent = create_tool_calling_agent(llm, tools, agent_prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
# 封装一个 RAG 链
def run_llm_routed_rag(question: str):
print(f"n--- LLM Function Calling 路由器处理查询: '{question}' ---")
# 让 Agent 执行,它会选择工具并返回结果
response = agent_executor.invoke({"input": question})
# AgentExecutor的输出中包含了最终的答案
final_answer = response["output"]
print(f"最终回答: {final_answer}")
return final_answer
# 测试
run_llm_routed_rag("我的手机无法开机怎么办?")
run_llm_routed_rag("我的订单ORD123456789发货了吗?")
run_llm_routed_rag("我买的衣服不合适,可以退货吗?")
run_llm_routed_rag("你们的产品都有哪些颜色?") # LLM可能会选择产品知识库,但可能找不到确切答案,或泛化回答
方法二:基于独立分类模型的路由器 (Sentence Transformers + Scikit-learn)
这种方法需要训练一个独立的分类模型来预测查询类别。
首先,准备训练数据。
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import classification_report
from sentence_transformers import SentenceTransformer
# 1. 准备训练数据
# 示例文本和对应的类别标签
training_queries = [
"手机屏幕坏了怎么修?", "我的笔记本电脑充不进电", "如何更新设备的固件?", "产品规格在哪里查看?", "智能手表如何配对手机?",
"订单号ORD987654321的物流信息", "我的包裹什么时候能到?", "订单状态显示待发货是什么意思?", "我能取消订单吗?", "付款失败了怎么办?",
"退货流程是怎样的?", "换货需要支付运费吗?", "退款一般需要几天才能到账?", "我买错了东西能退吗?", "退换货的有效期是多久?"
]
labels = [
"product_support", "product_support", "product_support", "product_support", "product_support",
"order_inquiry", "order_inquiry", "order_inquiry", "order_inquiry", "order_inquiry",
"return_exchange_policy", "return_exchange_policy", "return_exchange_policy", "return_exchange_policy", "return_exchange_policy"
]
label_map = {
"product_support": 0,
"order_inquiry": 1,
"return_exchange_policy": 2
}
reverse_label_map = {v: k for k, v in label_map.items()}
numeric_labels = [label_map[l] for l in labels]
# 2. 初始化Sentence Transformer模型进行嵌入
# 使用一个轻量级的预训练模型
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
# 3. 生成查询的嵌入
print("正在生成训练数据的嵌入...")
query_embeddings = embedding_model.encode(training_queries)
print("嵌入生成完成。")
# 4. 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(
query_embeddings, numeric_labels, test_size=0.2, random_state=42
)
# 5. 训练一个分类器(这里使用支持向量机 SVM)
print("正在训练分类器...")
classifier = SVC(kernel='linear', probability=True) # probability=True 允许预测概率
classifier.fit(X_train, y_train)
print("分类器训练完成。")
# 6. 评估分类器(可选)
y_pred = classifier.predict(X_test)
print("n分类器评估报告:")
print(classification_report(y_test, y_pred, target_names=label_map.keys()))
# 7. 定义路由函数
def route_with_classifier(question: str):
query_embedding = embedding_model.encode([question])
predicted_label_idx = classifier.predict(query_embedding)[0]
predicted_label = reverse_label_map[predicted_label_idx]
# 获取预测概率 (如果需要)
probabilities = classifier.predict_proba(query_embedding)[0]
confidence = probabilities[predicted_label_idx]
print(f"分类器预测类别: {predicted_label} (置信度: {confidence:.2f})")
return predicted_label, confidence
# 8. 集成到RAG流程中
def run_classifier_routed_rag(question: str):
print(f"n--- 独立分类模型路由器处理查询: '{question}' ---")
predicted_category, confidence = route_with_classifier(question)
# 根据预测类别选择对应的检索器
selected_retriever = None
if predicted_category == "product_support":
selected_retriever = product_retriever
elif predicted_category == "order_inquiry":
selected_retriever = order_retriever
elif predicted_category == "return_exchange_policy":
selected_retriever = return_exchange_retriever
else:
print("未识别的查询类别,无法进行检索。")
return "抱歉,我无法理解您的问题类型,请尝试更具体地描述。"
# 执行检索
print(f"正在从 '{predicted_category}' 知识库检索...")
retrieved_docs = selected_retriever.invoke(question)
context = "n".join([d.page_content for d in retrieved_docs])
print(f"检索到以下上下文:n{context}")
# 使用LLM生成答案
prompt_template = ChatPromptTemplate.from_messages([
("system", "你是一个友好的客服助手,请根据提供的上下文回答用户问题。如果上下文不包含足够的信息,请告知用户你无法回答。"),
("user", "上下文: {context}nn问题: {question}")
])
rag_chain = prompt_template | llm
response = rag_chain.invoke({"context": context, "question": question})
final_answer = response.content
print(f"最终回答: {final_answer}")
return final_answer
# 测试
run_classifier_routed_rag("我的手机无法开机怎么办?")
run_classifier_routed_rag("我的订单ORD123456789发货了吗?")
run_classifier_routed_rag("我买的衣服不合适,可以退货吗?")
run_classifier_routed_rag("你们最近有什么促销活动?") # 这个查询可能不会被正确分类,因为训练数据中没有类似内容
方法三:基于向量相似度的路由器
这种方法不需要显式训练一个分类器,而是通过计算用户查询与各个类别代表性嵌入的相似度来决定路由。
from collections import defaultdict
from sklearn.metrics.pairwise import cosine_similarity
# 1. 为每个类别创建代表性查询的嵌入
# 我们可以使用之前用于训练分类器的查询,或者额外准备一些代表性查询
category_queries = defaultdict(list)
category_queries["product_support"].extend([
"产品功能介绍", "如何使用设备", "设备故障排除", "技术参数咨询", "软件更新问题"
])
category_queries["order_inquiry"].extend([
"订单状态查询", "物流信息跟踪", "支付问题", "修改收货地址", "取消订单"
])
category_queries["return_exchange_policy"].extend([
"退货流程", "退款政策", "换货条件", "运费承担", "售后服务"
])
# 2. 生成每个类别的平均嵌入作为类别向量
category_embeddings = {}
for category, queries in category_queries.items():
embeddings_list = embedding_model.encode(queries)
category_embeddings[category] = np.mean(embeddings_list, axis=0)
# 3. 定义路由函数
def route_with_vector_similarity(question: str):
query_embedding = embedding_model.encode([question])[0]
similarities = {}
for category, cat_embedding in category_embeddings.items():
# 计算余弦相似度
similarity = cosine_similarity([query_embedding], [cat_embedding])[0][0]
similarities[category] = similarity
# 选择相似度最高的类别
predicted_category = max(similarities, key=similarities.get)
max_similarity = similarities[predicted_category]
print(f"向量相似度预测类别: {predicted_category} (相似度: {max_similarity:.2f})")
return predicted_category, max_similarity
# 4. 集成到RAG流程中
def run_vector_routed_rag(question: str):
print(f"n--- 向量相似度路由器处理查询: '{question}' ---")
predicted_category, similarity = route_with_vector_similarity(question)
# 根据预测类别选择对应的检索器
selected_retriever = None
if predicted_category == "product_support":
selected_retriever = product_retriever
elif predicted_category == "order_inquiry":
selected_retriever = order_retriever
elif predicted_category == "return_exchange_policy":
selected_retriever = return_exchange_retriever
else:
print("未识别的查询类别,无法进行检索。")
return "抱歉,我无法理解您的问题类型,请尝试更具体地描述。"
# 执行检索
print(f"正在从 '{predicted_category}' 知识库检索...")
retrieved_docs = selected_retriever.invoke(question)
context = "n".join([d.page_content for d in retrieved_docs])
print(f"检索到以下上下文:n{context}")
# 使用LLM生成答案
prompt_template = ChatPromptTemplate.from_messages([
("system", "你是一个友好的客服助手,请根据提供的上下文回答用户问题。如果上下文不包含足够的信息,请告知用户你无法回答。"),
("user", "上下文: {context}nn问题: {question}")
])
rag_chain = prompt_template | llm
response = rag_chain.invoke({"context": context, "question": question})
final_answer = response.content
print(f"最终回答: {final_answer}")
return final_answer
# 测试
run_vector_routed_rag("手机黑屏了怎么办?")
run_vector_routed_rag("我的包裹到哪了?")
run_vector_routed_rag("怎么退换货?")
run_vector_routed_rag("你们的产品都有哪些型号?")
LangChain Expression Language (LCEL) 与 LlamaIndex Agentic Routing 简化实现
LangChain和LlamaIndex提供了更高级的抽象,可以简化语义路由的实现。
LangChain LCEL RunnableBranch
RunnableBranch 允许根据条件路由不同的Runnable链。
from langchain_core.runnables import RunnableBranch, RunnableLambda
from langchain_core.output_parsers import StrOutputParser
# 定义一个简单的LLM分类器,直接输出类别名称
classifier_llm_prompt = ChatPromptTemplate.from_messages([
("system", """根据用户的问题,将其归类为以下之一:
'product_support': 关于产品功能、使用方法、故障排除、技术规格等。
'order_inquiry': 关于订单状态、物流信息、支付问题、收货地址修改等。
'return_exchange_policy': 关于退货流程、退款时间、换货条件、运费承担等。
如果问题不属于上述任何类别,请回答 'general_inquiry'。
只输出类别名称,不要输出其他任何内容。
"""),
("user", "{question}")
])
# 这是一个简化的分类器,直接使用LLM进行判断
llm_classifier = classifier_llm_prompt | llm | StrOutputParser()
# 定义每个类别的RAG链
def create_rag_chain(retriever):
prompt = ChatPromptTemplate.from_messages([
("system", "你是智能客服助手,请根据提供的上下文回答用户问题。如果上下文不包含足够信息,请告知用户。"),
("user", "上下文: {context}nn问题: {question}")
])
# 这里的RunnableLambda只是为了从retriever.invoke()的List[Document]中提取page_content
return (
{"context": retriever | RunnableLambda(lambda docs: "n".join([d.page_content for d in docs])), "question": RunnableLambda(lambda x: x["question"])}
| prompt
| llm
| StrOutputParser()
)
product_rag_chain = create_rag_chain(product_retriever)
order_rag_chain = create_rag_chain(order_retriever)
return_exchange_rag_chain = create_rag_chain(return_exchange_retriever)
# 定义一个通用RAG链,用于无法分类的查询(或指向一个通用知识库)
general_rag_chain_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个通用的智能客服,请友好地回答用户问题。如果无法回答,请建议用户联系人工客服。"),
("user", "{question}")
])
general_rag_chain = general_rag_chain_prompt | llm | StrOutputParser()
# 使用 RunnableBranch 进行路由
# LLM分类器输出的类别名称作为判断条件
routing_chain = RunnableBranch(
(RunnableLambda(lambda x: llm_classifier.invoke({"question": x["question"]}) == "product_support"), product_rag_chain),
(RunnableLambda(lambda x: llm_classifier.invoke({"question": x["question"]}) == "order_inquiry"), order_rag_chain),
(RunnableLambda(lambda x: llm_classifier.invoke({"question": x["question"]}) == "return_exchange_policy"), return_exchange_rag_chain),
general_rag_chain # 默认分支
)
def run_lcel_routed_rag(question: str):
print(f"n--- LCEL RunnableBranch 路由器处理查询: '{question}' ---")
response = routing_chain.invoke({"question": question})
print(f"最终回答: {response}")
return response
run_lcel_routed_rag("我的耳机没有声音了,怎么回事?")
run_lcel_routed_rag("我的订单什么时候能收到?")
run_lcel_routed_rag("我想退货,流程是什么?")
run_lcel_routed_rag("今天天气怎么样?") # 会进入 general_inquiry
LlamaIndex RouterQueryEngine
LlamaIndex也提供了强大的路由能力,特别是 RouterQueryEngine。它通常与 Selector 和 QueryEngineTool 结合使用。
# from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
# from llama_index.core.tools import QueryEngineTool, ToolMetadata
# from llama_index.core.selectors import LLMSingleSelector
# from llama_index.core.query_engine import RouterQueryEngine
# from llama_index.llms.openai import OpenAI
# from llama_index.embeddings.openai import OpenAIEmbedding
# # 假设您已经设置了LlamaIndex的ServiceContext
# # from llama_index.core import ServiceContext
# # llm = OpenAI(model="gpt-3.5-turbo")
# # embed_model = OpenAIEmbedding(model="text-embedding-ada-002")
# # service_context = ServiceContext.from_defaults(llm=llm, embed_model=embed_model)
# # 为了简化,这里我们直接使用之前LangChain创建的 Chroma DBs 作为 LlamaIndex 的 VectorStore
# from llama_index.vector_stores.chroma import ChromaVectorStore
# from llama_index.core import StorageContext
# from llama_index.core import VectorStoreIndex
# # LlamaIndex需要自己的LLM和Embeddings配置
# # 如果你使用LangChain的LLM和Embeddings,需要进行适配
# # 或者直接使用LlamaIndex原生的OpenAI设置
# from llama_index.llms.openai import OpenAI
# from llama_index.embeddings.openai import OpenAIEmbedding
# from llama_index.core import Settings
# Settings.llm = OpenAI(model="gpt-3.5-turbo")
# Settings.embed_model = OpenAIEmbedding(model="text-embedding-ada-002")
# # 假设我们从 LangChain 的 Chroma 数据库加载,需要重新构建 LlamaIndex 的 Index
# # 这里为了演示,我们直接用简化的方式创建 LlamaIndex 的 VectorStoreIndex
# # 实际应用中,您会从文件加载并构建 LlamaIndex 的 Index
# from llama_index.core import SimpleDirectoryReader
# # Helper function to create LlamaIndex VectorStoreIndex from text list
# def create_llama_index(docs, index_name):
# # LlamaIndex 喜欢从文件读取,这里我们模拟一下
# # 实际生产中您会加载真正的文档
# temp_dir = f"./temp_llama_docs/{index_name}"
# os.makedirs(temp_dir, exist_ok=True)
# for i, doc_content in enumerate(docs):
# with open(os.path.join(temp_dir, f"doc_{i}.txt"), "w", encoding="utf-8") as f:
# f.write(doc_content)
#
# reader = SimpleDirectoryReader(input_dir=temp_dir)
# documents = reader.load_data()
# index = VectorStoreIndex.from_documents(documents)
# return index
# product_li_index = create_llama_index(product_docs, "product_li_index")
# order_li_index = create_llama_index(order_docs, "order_li_index")
# return_exchange_li_index = create_llama_index(return_exchange_docs, "return_exchange_li_index")
# # 创建QueryEngineTool
# product_query_engine_tool = QueryEngineTool(
# query_engine=product_li_index.as_query_engine(),
# metadata=ToolMetadata(
# name="product_support_tool",
# description="当你需要回答关于产品功能、使用方法、故障排除、技术规格等产品相关问题时,使用此工具。"
# )
# )
# order_query_engine_tool = QueryEngineTool(
# query_engine=order_li_index.as_query_engine(),
# metadata=ToolMetadata(
# name="order_inquiry_tool",
# description="当你需要回答关于订单状态、物流信息、支付问题、收货地址修改等订单相关问题时,使用此工具。"
# )
# )
# return_exchange_query_engine_tool = QueryEngineTool(
# query_engine=return_exchange_li_index.as_query_engine(),
# metadata=ToolMetadata(
# name="return_exchange_policy_tool",
# description="当你需要回答关于退货流程、退款时间、换货条件、运费承担等退换货政策相关问题时,使用此工具。"
# )
# )
# # 创建RouterQueryEngine
# query_engine_tools = [
# product_query_engine_tool,
# order_query_engine_tool,
# return_exchange_query_engine_tool
# ]
# router_query_engine = RouterQueryEngine(
# selector=LLMSingleSelector.from_defaults(),
# query_engine_tools=query_engine_tools
# )
# def run_llama_routed_rag(question: str):
# print(f"n--- LlamaIndex RouterQueryEngine 路由器处理查询: '{question}' ---")
# response = router_query_engine.query(question)
# print(f"最终回答: {response}")
# return response
# # 测试
# run_llama_routed_rag("我的手机相机模糊了,怎么解决?")
# run_llama_routed_rag("我的订单状态显示已完成,但没收到货怎么办?")
# run_llama_routed_rag("我买的衣服大小不合适,可以换货吗?")
(由于LlamaIndex的集成和配置相对复杂,且可能与LangChain的Chroma实例不直接兼容,为了避免过度冗余和增加复杂性,我将LlamaIndex的代码块注释掉,并用文字说明其概念。核心思想是LlamaIndex的RouterQueryEngine利用LLM选择最佳的QueryEngineTool,每个工具封装一个知识库的查询逻辑,与LangChain的Function Calling方式异曲同工。)
高级主题与考虑
构建一个生产级的语义路由RAG系统,还需要考虑更多高级问题:
-
多跳路由 (Multi-hop Routing):
- 某些复杂查询可能需要从多个知识库中获取信息,或需要分解为多个子问题。例如:“查询产品A的库存,如果库存不足,再查找替代产品B的推荐。”
- 这需要更智能的代理(Agent)能力,能够进行规划、执行多步骤任务,并在不同工具之间切换。
-
置信度与回退机制:
- 路由器不可能总是100%准确。当路由器对分类结果的置信度较低时(例如,分类器预测概率低于某个阈值,或LLM表示不确定),应该有回退机制。
- 常见策略:
- 导向一个通用知识库。
- 向用户澄清问题。
- 上报给人工客服。
- 同时查询多个可能性最高的知识库,然后让LLM综合判断。
-
动态知识源管理:
- 在企业环境中,知识库是不断变化的。如何动态地添加、更新或移除知识源,并相应地更新路由器的配置或训练数据,是一个重要考量。
- 对于基于向量相似度的路由器,这相对容易,只需更新类别嵌入。
- 对于基于分类器的路由器,可能需要周期性地重新训练模型。
- 对于基于LLM的路由器,可能需要更新工具的描述或LLM的上下文。
-
性能优化:
- 路由器的速度:路由决策必须足够快,不能成为整个RAG系统的瓶颈。轻量级分类器或高效的向量相似度搜索是关键。
- 嵌入模型的选择:选择在准确性和速度之间取得平衡的嵌入模型。大型模型提供更好的语义理解,但推理速度慢。
- 缓存策略:缓存常见的查询结果或路由决策,减少重复计算。
- 并行处理:如果可能,并行执行某些步骤(例如,同时向多个知识库发送检索请求)。
-
评估与监控:
- 路由准确性:如何衡量路由器将查询导向正确知识库的比例?需要建立标注数据集进行评估。
- 端到端RAG性能:语义路由是否真正提高了最终生成答案的质量、相关性和准确性?需要通过A/B测试、用户满意度调查、人工评估等方式进行衡量。
- 错误分析:定期分析路由器误判的案例,以改进模型或规则。
- 延迟监控:监测从用户提问到获得答案的整体延迟。
-
安全性与隐私:
- 如果知识库包含敏感信息,确保路由器不会将查询导向未经授权的知识源。
- LLM的API调用可能涉及数据传输,需要考虑数据隐私和合规性。
-
成本考量:
- 大模型API调用成本(特别是对于LLM-as-a-Router)。
- 向量数据库的存储和查询成本。
- 独立分类模型训练和部署的计算资源成本。
语义路由是构建强大可扩展RAG系统的关键一环,它将智能与效率融入知识检索流程,为大模型应用开辟了更广阔的未来。
语义路由是RAG系统从“能用”走向“好用”和“可扩展”的关键一步。它通过引入智能化的流量管理,解决了RAG在面对复杂、异构知识源时的核心挑战。无论是利用大模型的强大理解能力,还是构建高效的独立分类器,亦或是巧妙地运用向量相似度,语义路由都为我们提供了一条路径,让我们能够构建出更加精准、高效且用户体验更佳的智能问答系统。随着AI技术,特别是Agentic AI的持续演进,语义路由的模式和能力也将不断创新,未来必将成为智能知识管理和应用的核心支柱。