各位同仁,各位对现代搜索技术充满热情的工程师们,
今天,我们齐聚一堂,共同探讨一个在信息爆炸时代至关重要的议题:如何构建一个既能理解用户意图,又能精确匹配关键词的智能搜索系统。传统上,我们依赖于关键词搜索,以其精确性和强大的过滤能力著称。然而,随着用户查询日益口语化、概念化,单纯的关键词匹配已显得力不从心。另一方面,语义搜索凭借其对文本深层含义的理解,能够捕获关键词搜索遗漏的相关结果,但有时又可能因为过于宽泛而牺牲精度。
我们所追求的,是一种将这两者优势完美结合的范式——混合搜索(Hybrid Search)。而本次讲座的核心,便是深入剖析如何实现“Hybrid Search Fusion”:在图中无缝整合 Elasticsearch 的关键词匹配能力与 Milvus 的语义向量检索能力,并精心设计其权重分配算法,以期在精度与召回之间取得最佳平衡。
这不仅仅是简单地将两个系统的结果合并,更是一门艺术,一门关于数据科学、算法设计与工程实践的艺术。我们将从基础概念出发,逐步深入到复杂的融合策略与代码实现细节。
一、 现代搜索的基石:Elasticsearch 与 Milvus
在深入探讨融合之前,我们首先需要对这两个核心组件有清晰的认识。它们各自代表了搜索领域的不同范式,拥有独特的优势和局限性。
1.1 Elasticsearch:关键词搜索的王者
Elasticsearch (ES) 是一个高度可扩展的开源全文搜索和分析引擎。它基于 Apache Lucene 构建,以其强大的分布式特性、实时搜索能力和丰富的查询语言而闻名。
核心机制:倒排索引 (Inverted Index)
ES 的核心是倒排索引。当文档被索引时,ES 会对文档内容进行分词处理,然后创建一个映射,将每个词项(term)映射到包含该词项的文档列表。
例如,文档 A 包含 "apple pie",文档 B 包含 "apple juice"。
倒排索引可能如下:
- "apple": [文档 A, 文档 B]
- "pie": [文档 A]
- "juice": [文档 B]
相关性评分:BM25 (Best Match 25)
ES 默认使用 BM25 算法来计算文档与查询的相关性分数。BM25 考虑了以下几个关键因素:
- 词频 (TF – Term Frequency):一个词在文档中出现的次数。出现次数越多,相关性越高。
- 逆文档频率 (IDF – Inverse Document Frequency):一个词在整个索引中出现的文档数量的倒数。词越稀有,其区分度越高,相关性也越高。
- 文档长度 (DL – Document Length):文档越短,包含某个词的重要性越高(相对于长文档)。
BM25 的数学公式较为复杂,但其核心思想是:一个词在文档中出现的频率越高,且在整个语料库中越稀有,那么该文档与包含该词的查询越相关。同时,它对文档长度进行了归一化,避免了长文档因词频高而得分虚高的问题。
Elasticsearch 的优势:
- 精确匹配:对于用户输入的精确关键词,ES 能迅速找到高度匹配的结果。
- 强大的过滤与聚合:能够根据结构化数据进行复杂的过滤和数据聚合,例如按价格范围、分类、日期等。
- 实时性:近实时地索引和搜索数据。
- 生态丰富:拥有 Kibana 等可视化工具,以及广泛的客户端库。
Elasticsearch 的局限性:
- 语义理解不足:无法理解查询背后的深层含义。例如,搜索 "car" 不会直接返回 "automobile" 的结果,除非两者都在文档中明确提及。
- 同义词处理:需要额外配置同义词表,且无法处理所有潜在的语义相似性。
- 拼写错误容忍度有限:对于轻微的拼写错误,需要模糊查询(fuzzy query)或建议(suggest)功能。
代码示例:Elasticsearch 基础查询
首先,我们需要一个 Elasticsearch 客户端并建立一个索引。
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
# 假设 ES 在本地运行,默认端口
es = Elasticsearch([{'host': 'localhost', 'port': 9200, 'scheme': 'http'}])
# 定义索引映射
index_name = 'products_index'
if es.indices.exists(index=index_name):
es.indices.delete(index=index_name)
mapping = {
"mappings": {
"properties": {
"id": {"type": "keyword"},
"title": {"type": "text"},
"description": {"type": "text"},
"category": {"type": "keyword"}
}
}
}
es.indices.create(index=index_name, body=mapping)
# 示例数据
documents = [
{"id": "p001", "title": "Apple MacBook Pro M1", "description": "Powerful laptop for professionals.", "category": "electronics"},
{"id": "p002", "title": "Organic Apple Juice", "description": "Freshly squeezed juice from organic apples.", "category": "food"},
{"id": "p003", "title": "Delicious Apple Pie", "description": "Homemade apple pie, perfect for dessert.", "category": "food"},
{"id": "p004", "title": "Samsung Galaxy S23 Ultra", "description": "Latest flagship smartphone from Samsung.", "category": "electronics"},
{"id": "p005", "title": "Banana Smoothie Recipe", "description": "Easy and healthy banana smoothie.", "category": "food"},
{"id": "p006", "title": "Data Science Handbook", "description": "Comprehensive guide to data science and machine learning.", "category": "books"},
{"id": "p007", "title": "Laptop Cooling Pad", "description": "Keep your laptop cool with this pad.", "category": "accessories"}
]
# 批量索引数据
actions = [
{
"_index": index_name,
"_id": doc["id"],
"_source": doc
}
for doc in documents
]
bulk(es, actions)
es.indices.refresh(index=index_name)
print(f"Indexed {len(documents)} documents into '{index_name}'")
# 定义一个获取 ES 结果的函数
def get_es_results(query_text: str, k: int = 5) -> list[dict]:
search_body = {
"query": {
"multi_match": {
"query": query_text,
"fields": ["title^3", "description"], # 提高 title 字段的权重
"fuzziness": "AUTO" # 允许一定的拼写错误
}
},
"size": k
}
response = es.search(index=index_name, body=search_body)
results = []
for hit in response['hits']['hits']:
results.append({
'id': hit['_id'],
'score': hit['_score'],
'source': hit['_source']
})
return results
# 示例查询
# es_query_results = get_es_results("apple laptop", k=3)
# print("nElasticsearch Results for 'apple laptop':")
# for res in es_query_results:
# print(f"ID: {res['id']}, Score: {res['score']:.4f}, Title: {res['source']['title']}")
1.2 Milvus:语义向量搜索的利器
Milvus 是一个专门为向量相似性搜索而设计的开源数据库。它能够存储海量特征向量,并对它们进行高效的相似性搜索。
核心机制:向量嵌入 (Vector Embeddings) 与 ANN (Approximate Nearest Neighbor)
- 向量嵌入:文本、图像、音频等非结构化数据,首先通过深度学习模型(如 BERT, Sentence-BERT, Word2Vec 等)转换为高维度的数值向量。这些向量捕获了数据的语义信息,在向量空间中,语义相似的数据点彼此靠近。
- ANN 搜索:Milvus 使用 ANN 算法(如 HNSW, IVFFlat, ANNOY 等)来快速查找与查询向量最相似的 K 个向量。与精确最近邻搜索(耗时巨大)不同,ANN 牺牲了极小的精度换取了巨大的性能提升。
相似性度量:余弦相似度 (Cosine Similarity)
在向量空间中,两个向量之间的夹角越小,它们的方向就越相似,代表的语义也越接近。余弦相似度是衡量这种相似性最常用的指标,它的值介于 -1 和 1 之间,1 表示完全相同,-1 表示完全相反,0 表示正交(不相关)。
Milvus 的优势:
- 语义理解:能够理解查询的深层含义,即使查询中不包含文档中的具体关键词,也能找到相关的结果。
- 处理同义词和近义词:通过向量表示,"car" 和 "automobile" 的向量会非常接近,从而实现语义匹配。
- 多模态搜索:不仅限于文本,还可用于图像、音频等数据的相似性搜索。
- 可扩展性:专为大规模向量数据设计,支持水平扩展。
Milvus 的局限性:
- 计算成本:生成向量嵌入需要计算资源(GPU 通常能加速)。
- 缺乏精确过滤:虽然 Milvus 3.0 及其后续版本加强了标量过滤能力,但其核心优势仍在向量相似性搜索,复杂的结构化数据过滤不如 ES 强大。
- “幻觉”问题:在某些情况下,语义搜索可能返回与查询表面不相关的结果,因为它关注的是概念而非精确匹配。
- 嵌入模型依赖:搜索质量高度依赖于所使用的嵌入模型的质量和训练数据。
代码示例:Milvus 基础查询
我们需要安装 pymilvus 和 sentence-transformers。
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
from sentence_transformers import SentenceTransformer
import numpy as np
import time
# 连接 Milvus
# 假设 Milvus 在本地运行,默认端口
connections.connect("default", host="localhost", port="19530")
# 初始化嵌入模型
model_name = 'all-MiniLM-L6-v2' # 一个轻量级但效果不错的模型
embedding_model = SentenceTransformer(model_name)
# 定义 Milvus Collection Schema
collection_name = 'product_embeddings'
dim = embedding_model.get_sentence_embedding_dimension() # 获取模型输出向量的维度
if utility.has_collection(collection_name):
utility.drop_collection(collection_name)
fields = [
FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=256),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=dim)
]
schema = CollectionSchema(fields, "Product embedding collection for hybrid search")
collection = Collection(collection_name, schema)
# 配置索引
index_params = {
"index_type": "IVF_FLAT",
"metric_type": "COSINE", # 使用余弦相似度
"params": {"nlist": 128}
}
collection.create_index(field_name="embedding", index_params=index_params)
collection.load()
# 示例数据(与 ES 的数据对应)
# 实际上,这里需要将 ES 中的 title/description 字段组合起来生成 embedding
# 为了简化,我们直接使用 title 作为生成 embedding 的文本
milvus_data_texts = [
{"id": "p001", "text": "Apple MacBook Pro M1"},
{"id": "p002", "text": "Organic Apple Juice"},
{"id": "p003", "text": "Delicious Apple Pie"},
{"id": "p004", "text": "Samsung Galaxy S23 Ultra"},
{"id": "p005", "text": "Banana Smoothie Recipe"},
{"id": "p006", "text": "Data Science Handbook"},
{"id": "p007", "text": "Laptop Cooling Pad"}
]
# 生成 embeddings 并插入 Milvus
ids = [d["id"] for d in milvus_data_texts]
texts_to_embed = [d["text"] for d in milvus_data_texts]
embeddings = embedding_model.encode(texts_to_embed, convert_to_tensor=False)
# 确保 embeddings 是列表的列表
embeddings_list = embeddings.tolist()
insert_data = [ids, embeddings_list]
collection.insert(insert_data)
collection.flush()
print(f"Inserted {len(ids)} vectors into Milvus collection '{collection_name}'")
# 定义一个获取 Milvus 结果的函数
def get_milvus_results(query_text: str, k: int = 5) -> list[dict]:
query_vector = embedding_model.encode([query_text], convert_to_tensor=False).tolist()
search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}}
# search 函数返回的是 list of list of Hit
results = collection.search(
data=query_vector,
anns_field="embedding",
param=search_params,
limit=k,
output_fields=["id"] # 我们可以指定返回的字段
)
formatted_results = []
# Milvus 的 COSINE 相似度通常是 0-1 之间,1 表示最相似。
# 如果索引 metric_type 是 IP (Inner Product),则值越大越相似。
# 如果是 L2 (Euclidean),则值越小越相似。
# 这里我们统一将其视为 0-1 范围,1为最佳。
for hit in results[0]: # results[0] 是第一个查询向量的结果
formatted_results.append({
'id': hit.id,
'score': hit.score, # Milvus 返回的相似度分数
'source': {'title': next(d['text'] for d in milvus_data_texts if d['id'] == hit.id)} # 模拟从原始数据获取 title
})
return formatted_results
# 示例查询
# milvus_query_results = get_milvus_results("computer for work", k=3)
# print("nMilvus Results for 'computer for work':")
# for res in milvus_query_results:
# print(f"ID: {res['id']}, Score: {res['score']:.4f}, Title: {res['source']['title']}")
二、 融合的挑战:Elasticsearch 与 Milvus 分数的鸿沟
现在我们已经了解了这两个强大的工具。然而,它们的搜索结果和相关性分数是基于完全不同的机制和度量标准生成的。直接将它们的分数相加或进行简单的比较,就好比将苹果和橘子混为一谈,这是不可取的。
2.1 分数不统一性 (Incommensurable Scores)
- Elasticsearch 的 BM25 分数:通常是一个正数,没有上限。其绝对值本身并不具备直接的业务含义,更多是用于在 ES 内部对结果进行排序。不同查询、不同索引、甚至不同时间点的相同查询,其 BM25 分数都可能存在巨大差异。
- Milvus 的余弦相似度分数:通常在 [-1, 1] 之间(如果使用 L2 距离,则是距离值,越小越相似)。1 表示完全相同,-1 表示完全相反。这是一个相对规范化的分数,但其范围和含义与 BM25 截然不同。
由于分数范围和含义的巨大差异,直接求和或平均会导致一个问题:哪个系统对最终结果的影响更大,将完全取决于其分数范围的偶然性,而非其固有的相关性强度。
2.2 冗余与互补性
- 冗余:同一个文档可能同时被 ES 和 Milvus 匹配到,并且它们的得分都可能很高。如何处理这些重复的文档,是简单取最高分,还是加权融合?
- 互补:ES 可能找到关键词精确匹配但语义稍远的结果,而 Milvus 可能找到语义高度相关但关键词不匹配的结果。融合的目标正是要充分利用这种互补性,而不是让一个系统的优势淹没另一个。
2.3 用户意图的复杂性
用户查询的意图是多变的。
- 当用户搜索 "iPhone 15 Pro Max 256GB" 时,他们期望的是精确的关键词匹配,此时 ES 的权重应该更高。
- 当用户搜索 "适合编程学习的电脑" 时,他们更期望语义上的理解,此时 Milvus 的权重应该更高。
- 如何动态地判断用户意图,并据此调整融合策略,是实现“无缝整合”的关键。
三、 核心算法:权重分配与融合策略
面对上述挑战,我们需要一套严谨的算法来对来自不同来源的搜索结果进行规范化、合并和重新排序。
3.1 规范化分数 (Score Normalization)
这是融合前的关键步骤,目的是将不同尺度的分数统一到相同的范围,通常是 [0, 1] 或 [0, Max_Value]。
3.1.1 Min-Max 缩放 (Min-Max Scaling)
将分数线性地缩放到指定范围 [min_new, max_new],通常是 [0, 1]。
公式:Score_norm = (Score - min_old) / (max_old - min_old) * (max_new - min_new) + min_new
若缩放到 [0, 1]:Score_norm = (Score - min_old) / (max_old - min_old)
- 优点:简单易懂,计算高效。
- 缺点:对异常值(outliers)非常敏感。如果结果集中有一个非常高的分数或非常低的分数,可能会压缩其他所有分数的范围。
3.1.2 Z-score 规范化 (Z-score Normalization)
将分数转换为标准正态分布,使其均值为 0,标准差为 1。
公式:Score_norm = (Score - mean) / std_dev
- 优点:对异常值不那么敏感,保留了数据的分布特征。
- 缺点:结果可能包含负数,且没有固定上限,不直接映射到 [0, 1] 范围,可能需要进一步处理。
3.1.3 Sigmoid 缩放 (Sigmoid Scaling)
使用 Sigmoid 函数将分数映射到 (0, 1) 之间。Sigmoid 函数的 S 形曲线意味着它对中间值敏感,而对极值有“饱和”效应。
公式:Score_norm = 1 / (1 + exp(-k * (Score - center)))
其中 k 控制曲线的陡峭程度,center 是曲线的中心点。
- 优点:输出范围固定,对异常值具有鲁棒性,能够处理非线性关系。
- 缺点:需要选择合适的
k和center参数,这可能需要经验或试错。
在实际应用中,对于搜索结果,我们通常在每个来源(ES 和 Milvus)的 Top-N 结果内部进行规范化。 这样可以避免整个数据集的极值影响,确保当前批次结果的相对排名得到保留和比较。
代码示例:分数规范化函数
import numpy as np
def normalize_scores(scores: list[float], method: str = 'minmax') -> list[float]:
"""
规范化分数列表。
Args:
scores: 待规范化的分数列表。
method: 规范化方法 ('minmax', 'zscore', 'sigmoid')。
Returns:
规范化后的分数列表。
"""
if not scores:
return []
np_scores = np.array(scores)
if method == 'minmax':
min_score = np_scores.min()
max_score = np_scores.max()
if max_score == min_score: # 避免除以零
return [0.5] * len(scores) # 所有分数相同,归一化为0.5
normalized_scores = (np_scores - min_score) / (max_score - min_score)
elif method == 'zscore':
mean_score = np_scores.mean()
std_score = np_scores.std()
if std_score == 0: # 避免除以零
return [0.0] * len(scores) # 所有分数相同,归一化为0
normalized_scores = (np_scores - mean_score) / std_score
# Z-score 结果不是0-1,如果需要,可以再做一次min-max或sigmoid
# 为方便融合,这里可以再做一次0-1缩放,但严格意义上不是纯zscore
# 简单处理:如果需要0-1,则需要额外步骤
# 这里我们假设 Z-score 之后会与其他规范化方法一同处理,或者在后续融合中自行处理
print("Warning: Z-score normalization does not guarantee 0-1 range.")
elif method == 'sigmoid':
# 假设 k=1,center=np_scores.mean(),这些参数可以调整
k = 1.0
center = np_scores.mean()
normalized_scores = 1 / (1 + np.exp(-k * (np_scores - center)))
else:
raise ValueError(f"Unknown normalization method: {method}")
return normalized_scores.tolist()
# 示例使用
# es_scores = [15.2, 12.8, 8.5, 7.1, 5.9]
# milvus_scores = [0.95, 0.88, 0.75, 0.62, 0.55]
#
# norm_es_scores = normalize_scores(es_scores, 'minmax')
# norm_milvus_scores = normalize_scores(milvus_scores, 'minmax')
# print(f"Normalized ES Scores (MinMax): {norm_es_scores}")
# print(f"Normalized Milvus Scores (MinMax): {norm_milvus_scores}")
3.2 融合策略 (Fusion Strategies)
在分数规范化之后,我们可以采用多种策略来融合来自不同来源的结果。
3.2.1 加权求和 (Weighted Sum)
这是最直观的融合方法。对于每个文档,我们计算其来自 ES 和 Milvus 的规范化分数,然后根据预设的权重进行加权求和。
Fusion_Score = w_es * Normalized_ES_Score + w_milvus * Normalized_Milvus_Score
其中 w_es 和 w_milvus 分别是 Elasticsearch 和 Milvus 的权重,且 w_es + w_milvus = 1。
- 优点:简单易实现,直观。
- 缺点:
- 静态权重:如果
w_es和w_milvus是固定的,那么它无法适应用户查询意图的变化。 - 未处理重叠:如果同一个文档同时出现在两个结果集中,需要先将其合并,然后应用加权求和。
- 静态权重:如果
代码示例:加权求和融合
def weighted_sum_fusion(es_results: list[dict], milvus_results: list[dict], w_es: float, w_milvus: float) -> list[dict]:
"""
使用加权求和方法融合搜索结果。
Args:
es_results: Elasticsearch 搜索结果列表,每个元素包含 'id', 'score', 'source'。
milvus_results: Milvus 搜索结果列表,每个元素包含 'id', 'score', 'source'。
w_es: Elasticsearch 结果的权重。
w_milvus: Milvus 结果的权重。
Returns:
融合后的结果列表,包含 'id', 'fusion_score', 'source'。
"""
# 步骤1: 规范化分数
es_scores = [res['score'] for res in es_results]
milvus_scores = [res['score'] for res in milvus_results]
norm_es_scores = normalize_scores(es_scores, 'minmax')
norm_milvus_scores = normalize_scores(milvus_scores, 'minmax')
# 将规范化分数映射回结果字典
for i, res in enumerate(es_results):
res['norm_score'] = norm_es_scores[i]
for i, res in enumerate(milvus_results):
res['norm_score'] = norm_milvus_scores[i]
# 步骤2: 合并结果并计算融合分数
merged_results = {}
for res in es_results:
doc_id = res['id']
if doc_id not in merged_results:
merged_results[doc_id] = {
'id': doc_id,
'es_score': res['norm_score'],
'milvus_score': 0.0, # 默认值为0
'source': res['source'] # 优先使用 ES 的源数据,或根据业务逻辑合并
}
else:
merged_results[doc_id]['es_score'] = res['norm_score']
for res in milvus_results:
doc_id = res['id']
if doc_id not in merged_results:
merged_results[doc_id] = {
'id': doc_id,
'es_score': 0.0, # 默认值为0
'milvus_score': res['norm_score'],
'source': res['source'] # 如果只有 Milvus 有,则使用 Milvus 的源数据
}
else:
merged_results[doc_id]['milvus_score'] = res['norm_score']
# 如果 ES 和 Milvus 都有,确保 source 字段存在
if 'source' not in merged_results[doc_id]:
merged_results[doc_id]['source'] = res['source']
final_results = []
for doc_id, data in merged_results.items():
fusion_score = (w_es * data['es_score']) + (w_milvus * data['milvus_score'])
final_results.append({
'id': doc_id,
'fusion_score': fusion_score,
'source': data['source'],
'es_norm_score': data['es_score'], # 方便调试和分析
'milvus_norm_score': data['milvus_score'] # 方便调试和分析
})
# 步骤3: 按融合分数降序排序
final_results.sort(key=lambda x: x['fusion_score'], reverse=True)
return final_results
3.2.2 倒数排序融合 (Reciprocal Rank Fusion – RRF)
RRF 是一种基于排名的融合算法,它不依赖于原始分数的大小,而是关注文档在各个搜索结果列表中的相对排名。这使得 RRF 对不同搜索系统之间的分数不一致性具有很强的鲁棒性。
公式:RRF_Score = sum(1 / (k + rank_i)) for each source i
其中 rank_i 是文档在第 i 个搜索结果列表中的排名(从 1 开始),k 是一个平滑因子(通常取 60)。平滑因子 k 的作用是降低高排名文档的优势,并提高低排名文档的影响力,从而避免仅依赖于少量高排名结果。
- 优点:
- 分数独立性:不受原始分数范围和分布的影响。
- 鲁棒性:对异常值和评分尺度的差异不敏感。
- 简单有效:实现相对简单,效果往往不错。
- 缺点:
- 失去分数细节:仅使用排名信息,可能会丢失原始分数中包含的细微相关性差异。
- 无法直接引入权重:虽然可以通过调整
k来间接影响,但不如加权求和那样直接控制每个来源的影响力。
代码示例:RRF 融合
def reciprocal_rank_fusion(es_results: list[dict], milvus_results: list[dict], k: int = 60) -> list[dict]:
"""
使用倒数排序融合 (RRF) 算法融合搜索结果。
Args:
es_results: Elasticsearch 搜索结果列表,每个元素包含 'id' 和 'score'。
milvus_results: Milvus 搜索结果列表,每个元素包含 'id' 和 'score'。
k: 平滑因子,通常取 60。
Returns:
融合后的结果列表,包含 'id', 'fusion_score', 'source'。
"""
# 步骤1: 构建每个文档的排名字典
rank_map = {}
for rank, res in enumerate(es_results):
doc_id = res['id']
if doc_id not in rank_map:
rank_map[doc_id] = {'es_rank': float('inf'), 'milvus_rank': float('inf'), 'source': res['source']}
rank_map[doc_id]['es_rank'] = rank + 1 # 排名从1开始
for rank, res in enumerate(milvus_results):
doc_id = res['id']
if doc_id not in rank_map:
rank_map[doc_id] = {'es_rank': float('inf'), 'milvus_rank': float('inf'), 'source': res['source']}
rank_map[doc_id]['milvus_rank'] = rank + 1 # 排名从1开始
# 如果 Milvus 结果有新的文档,确保 source 字段被填充
if 'source' not in rank_map[doc_id] or not rank_map[doc_id]['source']:
rank_map[doc_id]['source'] = res['source']
# 步骤2: 计算 RRF 分数
final_results = []
for doc_id, ranks in rank_map.items():
rrf_score = (1 / (k + ranks['es_rank'])) + (1 / (k + ranks['milvus_rank']))
final_results.append({
'id': doc_id,
'fusion_score': rrf_score,
'source': ranks['source'],
'es_rank': ranks['es_rank'], # 方便调试
'milvus_rank': ranks['milvus_rank'] # 方便调试
})
# 步骤3: 按融合分数降序排序
final_results.sort(key=lambda x: x['fusion_score'], reverse=True)
return final_results
3.2.3 动态权重分配 (Dynamic Weight Allocation)
静态权重无法满足所有查询场景。真正的“无缝整合”需要根据查询的特性动态调整 ES 和 Milvus 的权重。这通常涉及机器学习或启发式规则。
动态权重策略的核心思想:
根据查询的特征来判断其是更偏向于关键词匹配,还是更偏向于语义理解。
实现方式:
-
查询词特征分析:
- 稀有词检测:如果查询包含大量在语料库中出现频率极低的词(高 IDF 值),这通常表示用户寻求精确匹配,ES 权重应提高。
- 命名实体识别 (NER):如果查询包含产品名称、品牌、人名、地点等命名实体,ES 往往更擅长处理这些精确信息。
- 查询长度:短而精炼的查询可能更依赖关键词,长而描述性的查询可能更偏向语义。
- 词性标注 (POS Tagging):如果查询主要是名词、动词,可能更偏向语义;如果包含大量限定词、数字等,可能更偏向精确匹配。
- 是否存在引号或特定操作符:
"exact phrase"或AND,OR通常指示精确匹配意图。
-
查询分类:
- 训练一个文本分类模型,将用户查询分为“关键词主导型”、“语义主导型”或“混合型”。
- 基于分类结果,为 ES 和 Milvus 分配不同的权重。
-
用户行为反馈 (Implicit Feedback):
- 通过 A/B 测试,观察在不同权重下用户点击、停留、转化等行为数据。
- 利用强化学习 (Reinforcement Learning) 动态调整权重,以最大化用户满意度。例如,如果用户在某个查询下点击了 ES 高分结果,则下次类似查询时增加 ES 权重。
代码示例:动态权重策略(概念性)
# 假设我们有一个简单的查询分析器
def analyze_query_for_weights(query_text: str) -> tuple[float, float]:
"""
根据查询文本动态分配 ES 和 Milvus 的权重。
这是一个简化示例,实际应用中会更复杂。
Args:
query_text: 用户查询文本。
Returns:
(w_es, w_milvus) 权重元组。
"""
w_es = 0.5
w_milvus = 0.5
# 启发式规则 1: 查询长度
if len(query_text.split()) < 3: # 短查询可能更精确
w_es += 0.1
w_milvus -= 0.1
elif len(query_text.split()) > 7: # 长查询可能更语义化
w_es -= 0.1
w_milvus += 0.1
# 启发式规则 2: 特殊关键词(示例:包含品牌名或型号)
precise_keywords = ["macbook", "iphone", "samsung galaxy", "m1", "s23 ultra"]
if any(kw in query_text.lower() for kw in precise_keywords):
w_es += 0.2
w_milvus -= 0.2
# 启发式规则 3: 语义关键词(示例:描述性词语)
semantic_keywords = ["best for", "how to", "guide", "recipe", "powerful", "healthy"]
if any(kw in query_text.lower() for kw in semantic_keywords):
w_es -= 0.1
w_milvus += 0.1
# 确保权重在 [0, 1] 范围内且和为 1
w_es = max(0.1, min(0.9, w_es)) # 确保不会完全偏向一边
w_milvus = 1.0 - w_es
print(f"Query: '{query_text}' -> Dynamic Weights: ES={w_es:.2f}, Milvus={w_milvus:.2f}")
return w_es, w_milvus
# 示例使用
# w_es_dynamic, w_milvus_dynamic = analyze_query_for_weights("apple laptop for professionals")
# print(f"Dynamic weights: ES={w_es_dynamic}, Milvus={w_milvus_dynamic}")
3.3 混合融合策略 (Hybrid Fusion)
在实践中,我们通常会结合加权求和和 RRF 的优点。一个常见的模式是:
- 并行搜索:同时向 Elasticsearch 和 Milvus 发送查询。
- 初步规范化与加权:对两个系统返回的 Top-N 结果进行分数规范化,并计算一个初始的加权融合分数。
- RRF 辅助:将每个系统返回的原始排名(或基于初步加权分数的排名)输入到 RRF 算法中,生成一个 RRF 融合分数。
- 最终排序:可以结合加权分数和 RRF 分数进行最终排序,例如,使用一个超参数来平衡两者,或者根据 RRF 排名在前 N 个结果中,再使用加权分数进行微调。
这种多阶段的融合方法能够充分利用不同算法的优势,提供更稳定和准确的搜索结果。
四、 混合搜索融合的完整工作流与代码实现
现在,让我们把所有碎片拼凑起来,构建一个端到端的混合搜索系统。
4.1 整体架构概览
+----------------+ +-------------------+ +--------------------+
| User Query | ----> | Query Processor | ----> | Embedding Model |
+----------------+ | (Dynamic Weight) | +--------------------+
| +-------------------+ |
| |
v v
+------------------+ +--------------------+ +------------------+
| Elasticsearch | <---| Query to ES | | Milvus (Vector |
| (Keyword Search) | | | | Embedding Search) |
+------------------+ | | +------------------+
| | | |
| | Parallel Search | |
v | | v
+------------------+ | | +------------------+
| ES Results (Score,Doc)| <------------------ |----> | Milvus Results |
+------------------+ | | | (Score, VectorId)|
| +--------------------+ +------------------+
| |
v v
+----------------------------------------------------------------+
| Fusion Algorithm |
| (1. Normalize Scores |
| 2. Identify Overlap & Merge |
| 3. Calculate Fusion Score: Weighted Sum / RRF / Hybrid |
| 4. Sort by Fusion Score) |
+----------------------------------------------------------------+
|
v
+--------------------+
| Final Ranked Results |
+--------------------+
4.2 完整的 Python 代码实现
我们将定义一个 HybridSearchEngine 类来封装整个逻辑。
import numpy as np
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
from sentence_transformers import SentenceTransformer
import time
import json # 用于打印美观的JSON
class HybridSearchEngine:
def __init__(self, es_host='localhost', es_port=9200, milvus_host='localhost', milvus_port='19530',
embedding_model_name='all-MiniLM-L6-v2', es_index_name='products_index', milvus_collection_name='product_embeddings'):
self.es = Elasticsearch([{'host': es_host, 'port': es_port, 'scheme': 'http'}])
connections.connect("default", host=milvus_host, port=milvus_port)
self.embedding_model = SentenceTransformer(embedding_model_name)
self.es_index_name = es_index_name
self.milvus_collection_name = milvus_collection_name
self.milvus_collection = None # 稍后初始化
self.dim = self.embedding_model.get_sentence_embedding_dimension()
self.milvus_original_data_map = {} # 用于存储 Milvus 文档的原始数据,以便检索
print(f"HybridSearchEngine initialized with embedding model: {embedding_model_name}")
def _init_es_index(self):
if self.es.indices.exists(index=self.es_index_name):
self.es.indices.delete(index=self.es_index_name)
mapping = {
"mappings": {
"properties": {
"id": {"type": "keyword"},
"title": {"type": "text", "analyzer": "ik_smart"}, # 可以使用中文分词器
"description": {"type": "text", "analyzer": "ik_smart"},
"category": {"type": "keyword"}
}
}
}
self.es.indices.create(index=self.es_index_name, body=mapping)
print(f"Elasticsearch index '{self.es_index_name}' created.")
def _init_milvus_collection(self):
if utility.has_collection(self.milvus_collection_name):
utility.drop_collection(self.milvus_collection_name)
fields = [
FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=256),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=self.dim)
]
schema = CollectionSchema(fields, "Product embedding collection for hybrid search")
self.milvus_collection = Collection(self.milvus_collection_name, schema)
index_params = {
"index_type": "IVF_FLAT",
"metric_type": "COSINE",
"params": {"nlist": 128}
}
self.milvus_collection.create_index(field_name="embedding", index_params=index_params)
self.milvus_collection.load()
print(f"Milvus collection '{self.milvus_collection_name}' created and loaded.")
def index_documents(self, documents: list[dict]):
self._init_es_index()
self._init_milvus_collection()
es_actions = []
milvus_ids = []
milvus_texts = [] # 用于生成 embedding 的文本
for doc in documents:
# ES indexing
es_actions.append({
"_index": self.es_index_name,
"_id": doc["id"],
"_source": doc
})
# Milvus indexing
milvus_ids.append(doc["id"])
# 假设我们用 title 和 description 拼接作为 embedding 的输入
milvus_texts.append(f"{doc.get('title', '')} {doc.get('description', '')}")
self.milvus_original_data_map[doc["id"]] = doc # 存储原始数据,方便查询后获取
# Bulk index to ES
bulk(self.es, es_actions)
self.es.indices.refresh(index=self.es_index_name)
print(f"Indexed {len(es_actions)} documents into Elasticsearch.")
# Generate embeddings and insert into Milvus
embeddings = self.embedding_model.encode(milvus_texts, convert_to_tensor=False).tolist()
insert_data = [milvus_ids, embeddings]
self.milvus_collection.insert(insert_data)
self.milvus_collection.flush()
print(f"Inserted {len(milvus_ids)} vectors into Milvus.")
def _get_es_results(self, query_text: str, k: int = 10) -> list[dict]:
search_body = {
"query": {
"multi_match": {
"query": query_text,
"fields": ["title^3", "description"], # 提高 title 字段的权重
"fuzziness": "AUTO"
}
},
"size": k
}
response = self.es.search(index=self.es_index_name, body=search_body)
results = []
for hit in response['hits']['hits']:
results.append({
'id': hit['_id'],
'score': hit['_score'],
'source': hit['_source'] # 包含所有原始字段
})
return results
def _get_milvus_results(self, query_text: str, k: int = 10) -> list[dict]:
query_vector = self.embedding_model.encode([query_text], convert_to_tensor=False).tolist()
search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}}
results = self.milvus_collection.search(
data=query_vector,
anns_field="embedding",
param=search_params,
limit=k,
output_fields=["id"] # 仅返回 id
)
formatted_results = []
for hit in results[0]:
doc_id = hit.id
formatted_results.append({
'id': doc_id,
'score': hit.score,
'source': self.milvus_original_data_map.get(doc_id, {}) # 从存储的原始数据中获取
})
return formatted_results
def _normalize_scores(self, scores: list[float], method: str = 'minmax') -> list[float]:
if not scores:
return []
np_scores = np.array(scores)
if method == 'minmax':
min_score = np_scores.min()
max_score = np_scores.max()
if max_score == min_score:
return [0.5] * len(scores)
normalized_scores = (np_scores - min_score) / (max_score - min_score)
elif method == 'sigmoid':
k = 1.0 # 默认参数,可调
center = np_scores.mean() # 默认参数,可调
normalized_scores = 1 / (1 + np.exp(-k * (np_scores - center)))
else:
raise ValueError(f"Unsupported normalization method: {method}")
return normalized_scores.tolist()
def _analyze_query_for_weights(self, query_text: str) -> tuple[float, float]:
# 这是一个简化的动态权重分配示例,实际应用中可以更复杂
w_es = 0.5
w_milvus = 0.5
# 启发式规则 1: 查询长度
num_words = len(query_text.split())
if num_words < 3:
w_es += 0.1
w_milvus -= 0.1
elif num_words > 7:
w_es -= 0.1
w_milvus += 0.1
# 启发式规则 2: 特殊关键词(示例:包含品牌名、型号、精确短语)
precise_keywords = ["macbook", "iphone", "samsung galaxy", "m1", "s23 ultra", "pro max", "256gb"]
if any(kw in query_text.lower() for kw in precise_keywords):
w_es += 0.2
w_milvus -= 0.2
# 启发式规则 3: 语义关键词(示例:描述性词语,问答式)
semantic_keywords = ["best for", "how to", "guide", "recipe", "powerful", "healthy", "recommend", "looking for"]
if any(kw in query_text.lower() for kw in semantic_keywords):
w_es -= 0.15
w_milvus += 0.15
# 确保权重在合理范围内且和为 1
w_es = max(0.1, min(0.9, w_es))
w_milvus = 1.0 - w_es
return w_es, w_milvus
def search(self, query_text: str, k: int = 10, fusion_method: str = 'weighted_sum',
normalization_method: str = 'minmax', rrf_k: int = 60) -> list[dict]:
# 步骤1: 动态权重分配 (仅适用于 weighted_sum 融合)
w_es, w_milvus = 0.5, 0.5
if fusion_method == 'weighted_sum':
w_es, w_milvus = self._analyze_query_for_weights(query_text)
print(f"Dynamic weights for '{query_text}': ES={w_es:.2f}, Milvus={w_milvus:.2f}")
# 步骤2: 并行执行搜索
es_results = self._get_es_results(query_text, k)
milvus_results = self._get_milvus_results(query_text, k)
# 步骤3: 规范化分数 (仅适用于 weighted_sum 融合)
if fusion_method == 'weighted_sum':
es_scores = [res['score'] for res in es_results]
milvus_scores = [res['score'] for res in milvus_results]
norm_es_scores = self._normalize_scores(es_scores, normalization_method)
norm_milvus_scores = self._normalize_scores(milvus_scores, normalization_method)
for i, res in enumerate(es_results):
res['norm_score'] = norm_es_scores[i] if norm_es_scores else 0.0
for i, res in enumerate(milvus_results):
res['norm_score'] = norm_milvus_scores[i] if norm_milvus_scores else 0.0
# 步骤4: 合并和融合
merged_results = {} # key: doc_id, value: {data}
# 先处理 ES 结果
for res in es_results:
doc_id = res['id']
merged_results[doc_id] = {
'id': doc_id,
'es_score': res.get('norm_score', res['score']), # weighted_sum用norm_score, RRF用原始score
'milvus_score': 0.0,
'es_rank': float('inf'), # RRF 默认值
'milvus_rank': float('inf'), # RRF 默认值
'source': res['source'] # 存储原始文档信息
}
# 再处理 Milvus 结果,更新或添加
for res in milvus_results:
doc_id = res['id']
if doc_id not in merged_results:
merged_results[doc_id] = {
'id': doc_id,
'es_score': 0.0,
'milvus_score': res.get('norm_score', res['score']),
'es_rank': float('inf'),
'milvus_rank': float('inf'),
'source': res['source']
}
else:
merged_results[doc_id]['milvus_score'] = res.get('norm_score', res['score'])
# 如果 Milvus 的 source 更完整或需要合并,可以在这里处理
# For simplicity, we assume ES source is sufficient if overlap, else Milvus.
if not merged_results[doc_id]['source'] and res['source']:
merged_results[doc_id]['source'] = res['source']
# 步骤5: 计算融合分数
final_results = []
if fusion_method == 'weighted_sum':
for doc_id, data in merged_results.items():
fusion_score = (w_es * data['es_score']) + (w_milvus * data['milvus_score'])
final_results.append({
'id': doc_id,
'fusion_score': fusion_score,
'source': data['source'],
'es_norm_score': data['es_score'],
'milvus_norm_score': data['milvus_score']
})
elif fusion_method == 'rrf':
# RRF 需要排名,我们先为每个系统内的结果计算排名
es_rank_map = {res['id']: rank + 1 for rank, res in enumerate(es_results)}
milvus_rank_map = {res['id']: rank + 1 for rank, res in enumerate(milvus_results)}
for doc_id, data in merged_results.items():
es_rank = es_rank_map.get(doc_id, float('inf'))
milvus_rank = milvus_rank_map.get(doc_id, float('inf'))
rrf_score = (1 / (rrf_k + es_rank)) + (1 / (rrf_k + milvus_rank))
final_results.append({
'id': doc_id,
'fusion_score': rrf_score,
'source': data['source'],
'es_rank': es_rank,
'milvus_rank': milvus_rank
})
else:
raise ValueError(f"Unknown fusion method: {fusion_method}")
# 步骤6: 按融合分数降序排序并返回 Top K
final_results.sort(key=lambda x: x['fusion_score'], reverse=True)
return final_results[:k]
# --- 运行示例 ---
if __name__ == "__main__":
# 确保 Elasticsearch 和 Milvus 实例正在运行
# docker run -p 9200:9200 -p 9600:9600 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.17.6
# docker-compose -f milvus-standalone-docker-compose.yml up -d (Milvus 官方文档)
engine = HybridSearchEngine()
sample_documents = [
{"id": "p001", "title": "Apple MacBook Pro M1", "description": "Powerful laptop for professionals, great for software development.", "category": "electronics"},
{"id": "p002", "title": "Organic Apple Juice", "description": "Freshly squeezed juice from organic apples, healthy and refreshing.", "category": "food"},
{"id": "p003", "title": "Delicious Apple Pie", "description": "Homemade apple pie, perfect for dessert with a scoop of vanilla ice cream.", "category": "food"},
{"id": "p004", "title": "Samsung Galaxy S23 Ultra", "description": "Latest flagship smartphone from Samsung, with incredible camera features.", "category": "electronics"},
{"id": "p005", "title": "Banana Smoothie Recipe", "description": "Easy and healthy banana smoothie recipe for a quick breakfast.", "category": "food"},
{"id": "p006", "title": "Data Science Handbook", "description": "Comprehensive guide to data science and machine learning concepts.", "category": "books"},
{"id": "p007", "title": "Laptop Cooling Pad", "description": "Keep your laptop cool with this adjustable cooling pad.", "category": "accessories"},
{"id": "p008", "title": "Python Programming Book", "description": "Learn Python from scratch with this beginner-friendly guide.", "category": "books"},
{"id": "p009", "title": "Smartwatch for Fitness", "description": "Track your fitness and health with this advanced smartwatch.", "category": "electronics"},
{"id": "p010", "title": "Gaming Laptop High Performance", "description": "Experience immersive gaming with this high-performance gaming laptop.", "category": "electronics"}
]
print("--- Indexing Documents ---")
engine.index_documents(sample_documents)
time.sleep(2) # Give some time for indexing to complete
print("n--- Performing Hybrid Searches (Weighted Sum Fusion) ---")
queries_weighted_sum = [
"apple laptop for software development",
"healthy fruit drink",
"programming book",
"best smartphone with great camera",
"recipe for healthy breakfast"
]
for q in queries_weighted_sum:
print(f"nQuery: '{q}' (Weighted Sum Fusion)")
results = engine.search(q, k=5, fusion_method='weighted_sum', normalization_method='minmax')
for i, res in enumerate(results):
print(f" {i+1}. ID: {res['id']}, Score: {res['fusion_score']:.4f}, "
f"ES_Norm: {res['es_norm_score']:.4f}, Milvus_Norm: {res['milvus_norm_score']:.4f}, "
f"Title: {res['source']['title']}")
print("n--- Performing Hybrid Searches (RRF Fusion) ---")
queries_rrf = [
"macbook pro m1",
"book about data science",
"keep laptop cool"
]
for q in queries_rrf:
print(f"nQuery: '{q}' (RRF Fusion)")
results = engine.search(q, k=5, fusion_method='rrf', rrf_k=60)
for i, res in enumerate(results):
# RRF 结果没有 es_norm_score 或 milvus_norm_score,而是 es_rank 和 milvus_rank
print(f" {i+1}. ID: {res['id']}, Score: {res['fusion_score']:.4f}, "
f"ES_Rank: {res['es_rank']}, Milvus_Rank: {res['milvus_rank']}, "
f"Title: {res['source']['title']}")
表格:融合方法对比
| 特性 / 方法 | 加权求和 (Weighted Sum) | 倒数排序融合 (RRF) | 动态加权求和 (Dynamic Weighted Sum) |
|---|---|---|---|
| 基础 | 分数相加 | 排名相加 | 分数相加 (权重动态) |
| 分数规范化 | 必需 | 非必需 (但推荐进行内部排序) | 必需 |
| 对分数尺度敏感度 | 高 | 低 | 中 (取决于动态权重) |
| 对异常值鲁棒性 | 低 | 高 | 中 |
| 实现复杂度 | 低 | 中 | 高 (需查询分析) |
| 调优难度 | 中 (权重选择) | 低 (K 值) | 高 (权重函数、A/B测试) |
| 语义理解融入 | 需通过权重手动调整或动态分配 | 隐式 (通过 Milvus 排名) | 显式 (通过动态权重) |
| 推荐场景 | 初始实现、分数差异不大的场景 | 跨系统分数差异大、鲁棒性要求高 | 追求最佳用户体验、查询意图复杂 |
五、 高级考量与优化
混合搜索的旅程远不止于此,以下是一些值得探索的高级主题和优化策略:
5.1 性能优化
- 异步搜索:ES 和 Milvus 的搜索请求应并行发送,避免串行执行造成的延迟。Python 的
asyncio或线程池可以用于此。 - 缓存:对于热门查询或最近的查询结果进行缓存。
- 批处理:如果可能,将多个查询打包成批次发送。
- 资源调优:优化 ES 集群配置(JVM 内存、分片、副本),Milvus 实例配置(CPU/GPU 资源、索引参数)。
5.2 相关性调优与评估
- A/B 测试:这是评估不同融合策略和权重分配效果最有效的方法。通过向不同用户组提供不同版本的搜索结果,收集点击、停留时间、转化率等指标。
- 离线评估指标:使用标注数据集计算 NDCG (Normalized Discounted Cumulative Gain)、MAP (Mean Average Precision) 等指标,来衡量搜索结果的质量。
- 人工标注:定期进行人工标注,为搜索结果提供真实的相关性反馈,用于模型训练和评估。
5.3 混合过滤与排序
- 先过滤后搜索:可以利用 Elasticsearch 强大的过滤能力,先对文档进行精确过滤(如按商品类别、价格范围等),然后在过滤后的子集上进行语义搜索。
- 语义过滤:利用 Milvus 对向量空间进行过滤,例如,只搜索与某个概念向量距离在一定范围内的文档。
- 后处理排序:在融合结果之后,可以根据业务需求再进行一次排序,例如,优先显示有库存的商品,或优先显示最新发布的文章。
5.4 查询扩展与改写
- 语义查询扩展:使用 Milvus 查找与用户查询语义相似的词或短语,然后将这些扩展词添加到 Elasticsearch 查询中,以提高召回率。
- 关键词查询增强:当 Milvus 结果表现不佳时,可以分析查询中的关键词,尝试在 ES 中构建更复杂的查询(如使用
bool查询结合must,should,filter)。
5.5 故障与降级
- 优雅降级:如果一个搜索组件(ES 或 Milvus)出现故障或响应超时,系统应能优雅降级,仅使用另一个组件的结果。
- 超时机制:为每个搜索请求设置合理的超时时间,避免单个组件的性能问题拖垮整个系统。
5.6 多模态搜索的未来
虽然我们专注于文本,但 Milvus 的能力远不止于此。它可以轻松扩展到图像、音频等多模态搜索。未来,一个真正强大的混合搜索系统,可能需要融合文本关键词、文本语义、图像内容、音频特征等多模态信息。
通过今天的探讨,我们深入理解了将 Elasticsearch 的精确关键词匹配与 Milvus 的智能语义向量检索融合的复杂性与必要性。我们剖析了分数不统一的挑战,并探索了 Min-Max 缩放、Sigmoid 缩放等规范化方法。更重要的是,我们详细讲解了加权求和与倒数排序融合(RRF)这两种核心融合策略,并提出了根据用户查询意图进行动态权重分配的先进理念,并通过实际的代码示例展示了如何将这些理论付诸实践。
混合搜索的艺术在于对用户意图的深刻洞察,以及在精度与召回之间寻找动态平衡的持续优化。这是一个迭代的过程,需要不断地实验、评估和调整。希望今天的分享能为各位在构建下一代智能搜索系统时提供有益的思路和实践指导。