解析 ‘Hybrid Search Fusion’:在图中无缝整合 ElasticSearch 关键词与 Milvus 语义向量的权重分配算法

各位开发者,各位对智能搜索充满热情的同仁们,大家好!

今天,我们将深入探讨一个在现代搜索系统中至关重要的议题:混合搜索融合(Hybrid Search Fusion)。具体来说,我们将聚焦于如何将强大的关键词搜索引擎ElasticSearch与前沿的语义向量数据库Milvus无缝整合,并通过精心设计的权重分配算法,实现搜索结果的智能排序和展示。这不仅仅是技术的堆叠,更是对用户意图深度理解与精准匹配的艺术。

在当今信息爆炸的时代,用户对搜索的期望已经远远超越了简单的关键词匹配。他们需要系统能够理解他们的意图,即使是模糊的、口语化的查询,也能返回高度相关的结果。然而,纯粹的关键词搜索往往受限于词汇的精确度,而纯粹的语义搜索又可能在某些特定场景下,如产品ID、精确名称匹配时显得力不从心。混合搜索正是为了解决这一矛盾而生。

ElasticSearch作为业界领先的关键词搜索和分析引擎,凭借其倒排索引、BM25等成熟的评分算法,在处理结构化和半结构化数据、实现精确匹配和复杂过滤方面表现卓越。而Milvus,作为一款为大规模向量相似度搜索而生的数据库,则能够存储和检索由深度学习模型生成的语义向量,从而实现对查询意图和文档内容的深层语义理解。

将这两者结合起来,就形成了一个既能捕捉关键词的精确性,又能理解语义的广度与深度的强大搜索范式。然而,真正的挑战在于,我们如何将来自两个完全不同系统、采用不同评分机制的结果进行有效整合,并根据业务需求和查询特性,智能地分配关键词与语义搜索结果的权重。这正是我们今天要详细剖析的“权重分配算法”的核心所在。

一、 奠基石:理解关键词与语义搜索的核心机制

在深入探讨融合策略之前,我们必须首先透彻理解ElasticSearch和Milvus各自的工作原理及其优势与局限性。这将为我们后续的融合设计提供坚实的基础。

1.1 ElasticSearch:关键词搜索的基石

ElasticSearch是一个基于Lucene的分布式、RESTful风格的搜索和分析引擎。它的核心在于倒排索引,这是一种将文档中的词项映射到包含这些词项的文档列表的数据结构。

工作原理简述:
当一个文档被索引时,ElasticSearch会对其进行文本分析(分词、词干提取、停用词移除等),然后将这些处理后的词项及其在文档中的位置、频率等信息存储在倒排索引中。
当用户发起查询时,ElasticSearch会:

  1. 查询分析: 对查询字符串进行与索引时类似的分析处理。
  2. 匹配: 在倒排索引中查找匹配的词项,获取包含这些词项的文档列表。
  3. 评分: 使用相关性算法(默认为BM25)为每个匹配的文档计算一个相关性分数。BM25算法考虑了词频(Term Frequency, TF)、逆文档频率(Inverse Document Frequency, IDF)以及文档长度等因素,旨在评估文档与查询的相关程度。
  4. 排序: 根据计算出的相关性分数对文档进行排序并返回。

代码示例:ElasticSearch基本查询

假设我们有一个索引名为products,其中包含产品信息。

from elasticsearch import Elasticsearch

# 连接ElasticSearch实例
es = Elasticsearch(
    hosts=[{'host': 'localhost', 'port': 9200, 'scheme': 'http'}],
    basic_auth=('elastic', 'your_password') # 根据实际情况修改
)

# 确保索引存在并有数据
# 如果索引不存在,可以先创建一个并添加一些数据
# es.indices.create(index='products', ignore=400)
# es.index(index='products', id=1, document={'name': 'Gaming Laptop i7 16GB RAM RTX3060', 'description': 'High-performance laptop for serious gamers.'})
# es.index(index='products', id=2, document={'name': 'Office Laptop i5 8GB RAM', 'description': 'Reliable laptop for daily office tasks.'})
# es.index(index='products', id=3, document={'name': 'Mechanical Keyboard RGB', 'description': 'Gaming keyboard with customizable RGB lighting.'})

def search_es_keyword(query_text: str, index_name: str = 'products', size: int = 10):
    """
    使用ElasticSearch进行关键词搜索。
    """
    search_body = {
        "query": {
            "match": {
                "name": { # 假设我们主要搜索产品名称字段
                    "query": query_text,
                    "fuzziness": "AUTO" # 允许一定程度的模糊匹配
                }
            }
        },
        "size": size
    }
    try:
        response = es.search(index=index_name, body=search_body)
        results = []
        for hit in response['hits']['hits']:
            results.append({
                'id': hit['_id'],
                'score': hit['_score'],
                'source': hit['_source']
            })
        return results
    except Exception as e:
        print(f"ElasticSearch search error: {e}")
        return []

# 示例调用
# keyword_results = search_es_keyword("laptop i7 gaming")
# print("ElasticSearch Keyword Results:")
# for res in keyword_results:
#     print(f"  ID: {res['id']}, Score: {res['score']:.2f}, Name: {res['source']['name']}")

ElasticSearch的优势与局限性:

特性 优势 局限性
精确匹配 擅长处理精确的关键词、短语、产品SKU、ID等。 对词汇的变体、同义词、近义词理解能力有限。
过滤与聚合 强大的过滤、聚合功能,支持复杂的数据分析和统计。 无法直接理解查询的深层语义或上下文,例如“最好的编程电脑”这种模糊查询。
可解释性 评分机制相对透明(BM25),易于理解结果的来源。 面对用户使用口语化、非标准词汇时,召回率可能较低。
实时性 近实时索引,查询速度快。 需要对文本进行严格的分析配置,否则可能影响相关性。
数据类型 擅长处理文本数据,支持结构化和半结构化数据。 不直接支持高维向量相似度搜索。

1.2 Milvus:语义搜索的利器

Milvus是一个开源的向量数据库,专门用于存储、索引和查询大规模的嵌入向量。它通过将文本、图像、音频等非结构化数据转换为高维向量(通常由深度学习模型生成),然后在向量空间中进行相似度搜索,从而实现语义理解。

工作原理简述:

  1. 向量嵌入(Embedding): 原始数据(如文本)首先通过预训练的深度学习模型(如Sentence-BERT、OpenAI Embeddings、Word2Vec等)转换为固定长度的数字向量。这些向量捕获了数据的语义信息,在向量空间中,语义相似的数据点彼此靠近。
  2. 向量索引: Milvus将这些向量存储起来,并构建高效的近似最近邻(ANN)索引(如HNSW, IVF_FLAT等)。ANN算法允许在海量向量中快速找到与查询向量最相似的向量,尽管不保证找到的总是“最近”的那个,但在实践中效率极高。
  3. 向量查询: 当用户发起查询时,查询字符串同样被转换为一个查询向量。Milvus使用ANN索引查找与查询向量最相似的K个向量,并返回这些向量对应的原始数据ID及其相似度分数(如余弦相似度、欧氏距离)。

代码示例:Milvus基本查询

这里我们使用一个简化的embedding_model来模拟生成向量,实际应用中会使用专业的NLP模型。

from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
from typing import List, Dict
import numpy as np
import random

# 模拟一个简单的embedding模型
class MockEmbeddingModel:
    def embed_text(self, text: str) -> List[float]:
        # 实际应用中,这里会调用Sentence-BERT, OpenAI Embeddings等模型
        # 为简化,这里返回一个随机生成的固定维度向量
        # 确保每次对相同文本返回相同向量 (在实际模型中是这样)
        random.seed(hash(text)) # 保持对同一文本的向量一致性
        return [random.uniform(-1, 1) for _ in range(128)] # 假设向量维度是128

embedding_model = MockEmbeddingModel()

# 连接Milvus实例
connections.connect("default", host="localhost", port="19530")

COLLECTION_NAME = "product_embeddings"
DIMENSION = 128 # 向量维度,与embedding模型输出一致

def create_milvus_collection():
    """创建Milvus collection"""
    if utility.has_collection(COLLECTION_NAME):
        utility.drop_collection(COLLECTION_NAME)

    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
        FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=512), # 存储原始文本,方便检索
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=DIMENSION)
    ]
    schema = CollectionSchema(fields, description="Product embeddings for semantic search")
    collection = Collection(COLLECTION_NAME, schema)

    # 创建索引
    index_params = {
        "metric_type": "COSINE", # 余弦相似度
        "index_type": "HNSW",
        "params": {"M": 8, "efConstruction": 64}
    }
    collection.create_index(field_name="embedding", index_params=index_params)
    collection.load() # 加载 collection 到内存

    return collection

collection = create_milvus_collection()

def insert_milvus_data(data: List[Dict]):
    """
    插入数据到Milvus。
    data 格式: [{"id": 1, "text": "Gaming Laptop i7 16GB RAM RTX3060"}, ...]
    """
    entities = []
    ids = []
    texts = []
    embeddings = []

    for item in data:
        ids.append(item['id'])
        texts.append(item['text'])
        embeddings.append(embedding_model.embed_text(item['text']))

    entities.append(ids)
    entities.append(texts)
    entities.append(embeddings)

    collection.insert(entities)
    collection.flush() # 确保数据写入

# 示例数据 (与ES示例数据对应)
# milvus_data = [
#     {"id": 1, "text": "Gaming Laptop i7 16GB RAM RTX3060"},
#     {"id": 2, "text": "Office Laptop i5 8GB RAM"},
#     {"id": 3, "text": "Mechanical Keyboard RGB"}
# ]
# insert_milvus_data(milvus_data)

def search_milvus_semantic(query_text: str, top_k: int = 10) -> List[Dict]:
    """
    使用Milvus进行语义搜索。
    """
    query_vector = embedding_model.embed_text(query_text)

    search_params = {
        "data": [query_vector],
        "anns_field": "embedding",
        "param": {"ef": 64}, # HNSW 搜索参数
        "limit": top_k,
        "expr": None, # 可以添加过滤条件
        "output_fields": ["id", "text"] # 返回原始ID和文本
    }

    try:
        results = collection.search(**search_params)
        formatted_results = []
        for hit in results[0]: # 假设只有一个查询向量
            formatted_results.append({
                'id': hit.id,
                'score': hit.distance, # Milvus返回的是距离,通常需要转换为相似度
                'source': {'name': hit.entity.get('text', 'N/A')} # 从entity获取原始文本
            })
        return formatted_results
    except Exception as e:
        print(f"Milvus search error: {e}")
        return []

# 示例调用 (在实际运行前需要先插入数据并加载collection)
# semantic_results = search_milvus_semantic("powerful computer for software development")
# print("nMilvus Semantic Results:")
# for res in semantic_results:
#     # 对于余弦距离,距离越小越相似,但通常我们会转换为相似度 (1 - distance)
#     # 如果 metric_type 是 COSINE,Milvus 返回的是余弦距离 (0 到 2),0表示完全相同,2表示完全相反
#     # 转换为相似度 (1 - distance/2) 或 (1 - distance) 如果距离是0到1
#     similarity_score = 1 - res['score'] # 假设这里返回的是0-1的余弦距离
#     print(f"  ID: {res['id']}, Similarity: {similarity_score:.2f}, Text: {res['source']['name']}")

Milvus的优势与局限性:

特性 优势 局限性
语义理解 能够理解查询的深层含义,处理同义词、近义词、意译等。 依赖于embedding模型的质量,如果模型训练不足,语义理解可能偏差。
召回率高 对模糊、口语化查询有更好的召回效果。 不擅长精确匹配,如产品ID、SKU等,可能会召回不相关的语义相似结果。
跨语言 通过多语言embedding模型,可以实现跨语言搜索。 向量生成和存储需要额外的计算和存储资源。
数据类型 擅长处理非结构化数据,如文本、图像、音频等。 无法直接进行复杂的关键词过滤和聚合。
可解释性 向量相似度结果不如关键词匹配直观,难以直接解释为何相关。 搜索结果的“Hallucination”(幻觉)风险,即返回表面相似但实际不相关的结果。

1.3 融合的必要性:取长补短

通过上述分析,我们可以清楚地看到ElasticSearch和Milvus各自的优势与劣势。

  • ElasticSearch: 擅长精准、基于关键词的匹配,在用户意图明确、关键词具体的场景下表现卓越,例如搜索“iPhone 15 Pro Max 256GB 蓝色”。
  • Milvus: 擅长语义理解和泛化,在用户意图模糊、表达多样化的场景下表现出色,例如搜索“适合程序员的轻薄笔记本”或“能提升工作效率的工具”。

在实际应用中,用户查询往往介于这两种极端之间,或者同时包含精确和模糊的元素。例如,“高性能游戏笔记本 RTX 4080”,既有精确的“RTX 4080”型号,也有相对模糊的“高性能游戏笔记本”描述。单一的搜索系统都无法完美处理这类查询。

因此,将ElasticSearch和Milvus的优势结合起来,通过智能的融合策略,我们能够构建一个既能满足精确匹配需求,又能提供深度语义理解,从而大幅提升用户搜索体验的混合搜索系统。

二、 融合策略的核心挑战:分值归一化与权重分配

当我们从ElasticSearch和Milvus各自获取到搜索结果后,面临的首要问题是:如何将它们结合起来?

2.1 朴素的融合尝试及其局限性

  1. 简单并集/交集:
    • 并集: 将两者的结果简单地合并。问题是,没有统一的排序标准,用户体验差。
    • 交集: 只返回两者都匹配的文档。问题是召回率过低,很多相关文档可能被遗漏。
  2. 顺序搜索:
    • 先用ElasticSearch搜索,如果结果不满意(如结果数量不足或相关性不高),再用Milvus搜索。这种方式无法真正实现优势互补,且可能增加延迟。
  3. 平行搜索后简单拼接:
    • 同时向ES和Milvus发送查询,然后将返回的两个结果列表简单拼接。同样缺乏统一的排序机制。

这些朴素的方法都无法解决核心问题:ES返回的BM25分数和Milvus返回的余弦相似度分数(或距离)是完全不同尺度和含义的。我们不能直接比较 ES_score = 5.2Milvus_score = 0.85,更不能简单相加。

2.2 核心挑战:分值归一化

要对来自不同来源的搜索结果进行融合和排序,首先必须解决分值归一化(Score Normalization)问题。目标是将不同系统的原始分数映射到一个统一的、可比较的尺度上,通常是[0, 1]

常见的分值归一化技术:

归一化方法 原理 优势 劣势
Min-Max 缩放 X_norm = (X - X_min) / (X_max - X_min) 简单易懂,将数据严格缩放到 [0, 1] 范围。 对异常值非常敏感,X_maxX_min 的波动会剧烈影响结果。适用于已知分数范围且无极端异常值的情况。
Z-score 标准化 X_norm = (X - μ) / σ 对异常值不敏感,适用于数据近似正态分布的情况。 结果不限于 [0, 1] 范围,可能需要进一步处理。
Sigmoid 函数 X_norm = 1 / (1 + exp(-k * (X - X_mid))) 将任意实数映射到 (0, 1) 范围,输出具有概率解释。对异常值有一定鲁棒性。 需要选择合适的 k (陡峭程度) 和 X_mid (中心点)。
Rank-based (RRF) 不对分数进行归一化,而是对排名进行融合。参见 2.4 节。 无需处理不同分数尺度,对异常值鲁棒。 忽略了原始分数的具体大小,只关注排名。

在搜索场景中,Sigmoid 函数是一个非常受欢迎的选择,因为它能将分数映射到 (0, 1) 范围,且对极端值有一定抑制作用,使得分数曲线更平滑,更符合相关性从低到高逐渐饱和的直觉。

代码示例:Sigmoid 归一化

import math

def sigmoid_normalize(score: float, k: float = 1.0, x_mid: float = 0.0) -> float:
    """
    使用Sigmoid函数进行分数归一化。
    Args:
        score: 原始分数。
        k: 曲线的陡峭程度。k越大,曲线越陡峭,分数变化越快。
        x_mid: 曲线的中心点,即 sigmoid(x_mid) = 0.5。
    Returns:
        归一化后的分数 (0, 1)。
    """
    return 1 / (1 + math.exp(-k * (score - x_mid)))

# 示例:
# 假设ES分数通常在 0-10 之间,平均在 2-3
# 假设Milvus相似度在 0-1 之间,平均在 0.7-0.85
# 我们需要根据实际数据分布来调整 k 和 x_mid
# 对于ES分数,假设我们希望 2 分左右对应 0.5,且分数越高越重要
# es_k = 0.5 # 调整陡峭度
# es_x_mid = 2.0 # 调整中心点

# 对于Milvus分数 (假设已转换为0-1的相似度),假设我们希望 0.8 左右对应 0.5
# milvus_k = 10.0 # 相似度变化较快,所以 k 可以大一些
# milvus_x_mid = 0.8 # 中心点

# print(f"ES Score 1.0 normalized: {sigmoid_normalize(1.0, k=es_k, x_mid=es_x_mid):.2f}")
# print(f"ES Score 2.0 normalized: {sigmoid_normalize(2.0, k=es_k, x_mid=es_x_mid):.2f}")
# print(f"ES Score 5.0 normalized: {sigmoid_normalize(5.0, k=es_k, x_mid=es_x_mid):.2f}")

# print(f"Milvus Score 0.6 normalized: {sigmoid_normalize(0.6, k=milvus_k, x_mid=milvus_x_mid):.2f}")
# print(f"Milvus Score 0.8 normalized: {sigmoid_normalize(0.8, k=milvus_k, x_mid=milvus_x_mid):.2f}")
# print(f"Milvus Score 0.9 normalized: {sigmoid_normalize(0.9, k=milvus_k, x_mid=milvus_x_mid):.2f}")

如何确定 kx_mid

这是一个经验性和实验性的过程:

  • x_mid 通常设置为你认为的“中等相关”或“及格线”分数。例如,如果ES平均相关性分数为2.5,你可以将x_mid设为2.5。如果Milvus平均相似度为0.75,可以设为0.75。
  • k 控制曲线的陡峭程度。k越大,分数对微小变化的敏感度越高。如果你的分数分布比较集中,可能需要较大的k来区分;如果分数分布很广,可能需要较小的k。这通常需要通过观察实际分数分布、结合业务需求和A/B测试来调优。

2.3 核心挑战:权重分配算法

当分数被归一化到统一尺度后,下一个关键问题就是如何分配关键词搜索和语义搜索的权重。

Final Score = W_keyword * Normalized_ES_Score + W_semantic * Normalized_Milvus_Score

其中 W_keyword + W_semantic = 1

这个权重分配可以有多种策略:

2.3.1 固定权重 (Fixed Weights)

最简单直接的方法是为关键词搜索和语义搜索设定一组固定的权重。例如,始终将关键词权重设为0.6,语义权重设为0.4。

优势: 实现简单,易于理解。
劣势: 缺乏灵活性,无法适应不同查询类型和用户意图的变化。对于精确查询,可能语义权重过高;对于模糊查询,可能关键词权重过高。

2.3.2 基于启发式的动态权重 (Heuristic-based Dynamic Weights)

这种方法根据查询本身的特性,动态调整关键词和语义搜索的权重。

启发式规则示例:

  • 查询长度: 短查询(如“iPhone 15”)可能更偏向关键词匹配;长查询(如“有什么适合旅行的轻便相机”)可能更偏向语义理解。
  • 关键词类型: 如果查询包含产品ID、SKU、品牌名称等强实体词,增加关键词权重。如果包含形容词、动词等描述性词语,增加语义权重。
  • 查询词性分析: 对查询进行词性标注,如果名词、专有名词居多,关键词权重高;如果动词、形容词居多,语义权重高。
  • 查询意图分类: 通过机器学习模型将查询分类为“精确匹配型”、“探索型”、“问答型”等,然后为每种类型预设不同的权重。

代码示例:简单启发式权重分配

def get_dynamic_weights(query_text: str) -> tuple[float, float]:
    """
    根据查询文本的特点动态分配ES和Milvus的权重。
    Args:
        query_text: 用户查询字符串。
    Returns:
        (keyword_weight, semantic_weight)
    """
    query_length = len(query_text.split()) # 按空格分隔计算词数

    keyword_indicators = ["型号", "SKU", "ID", "品牌", "规格", "尺寸"]
    semantic_indicators = ["最好", "推荐", "适合", "如何", "什么", "怎么样", "功能"]

    has_keyword_indicator = any(indicator in query_text for indicator in keyword_indicators)
    has_semantic_indicator = any(indicator in query_text for indicator in semantic_indicators)

    # 默认权重
    w_keyword = 0.5
    w_semantic = 0.5

    # 规则 1: 根据查询长度调整
    if query_length <= 3: # 短查询,可能更精确
        w_keyword += 0.1
        w_semantic -= 0.1
    elif query_length >= 7: # 长查询,可能更语义化
        w_keyword -= 0.1
        w_semantic += 0.1

    # 规则 2: 根据关键词/语义指示词调整
    if has_keyword_indicator and not has_semantic_indicator:
        w_keyword += 0.2
        w_semantic -= 0.2
    elif has_semantic_indicator and not has_keyword_indicator:
        w_keyword -= 0.2
        w_semantic += 0.2
    elif has_keyword_indicator and has_semantic_indicator:
        # 如果同时有,保持平衡或根据更复杂的逻辑处理
        pass

    # 确保权重在合理范围 [0, 1] 且和为 1
    w_keyword = max(0.1, min(0.9, w_keyword)) # 避免权重过低或过高
    w_semantic = 1.0 - w_keyword

    return w_keyword, w_semantic

# 示例调用
# w_k, w_s = get_dynamic_weights("iPhone 15 Pro Max 型号")
# print(f"Query: 'iPhone 15 Pro Max 型号' -> Keyword Weight: {w_k:.2f}, Semantic Weight: {w_s:.2f}")
# w_k, w_s = get_dynamic_weights("有哪些适合程序员的轻薄高性能笔记本推荐")
# print(f"Query: '有哪些适合程序员的轻薄高性能笔记本推荐' -> Keyword Weight: {w_k:.2f}, Semantic Weight: {w_s:.2f}")

2.3.3 基于机器学习的权重 (Learning-to-Rank, LTR)

这是最先进、最复杂的权重分配方法。它将融合问题视为一个排序问题,通过训练机器学习模型来学习如何根据各种特征(包括原始ES分数、原始Milvus分数、查询特征、文档特征等)来预测一个最优的融合分数。

核心思想:

  1. 特征工程: 从查询、文档、以及ES和Milvus的原始结果中提取大量特征。例如:
    • ES原始分数
    • Milvus原始分数
    • 查询词数、文档词数
    • 查询与文档的TF-IDF相似度
    • 查询与文档的embedding相似度(可能与Milvus原始分数有重叠,但可以有不同来源的embedding)
    • 查询是否包含数字、特殊字符等
    • 文档的点击率、购买率等行为特征
  2. 数据标注: 需要人工标注大量的查询-文档对的相对相关性。例如,对于查询“高性能笔记本”,文档A(“Gaming Laptop i7”)比文档B(“Office Laptop i5”)更相关。
  3. 模型训练: 使用标注数据训练一个排序模型(如LambdaMART, XGBoost, RankNet等)。模型会学习这些特征与文档相关性之间的复杂关系。
  4. 预测与排序: 在线查询时,提取特征,输入到训练好的模型中,模型输出一个最终的融合分数,然后根据这个分数进行排序。

优势: 能够捕捉特征之间复杂的非线性关系,实现高度优化的融合效果。
劣势: 需要大量的标注数据,模型训练和部署复杂,需要持续的维护和更新。

由于LTR的复杂性,在此处提供完整的代码实现会超出讲座的范围,但理解其原理对于构建高性能的混合搜索系统至关重要。

2.3.4 倒数排名融合 (Reciprocal Rank Fusion, RRF)

RRF是一种非常优雅且强大的融合算法,它的独特之处在于不需要对原始分数进行归一化。它仅依赖于各个搜索系统返回的文档排名。

工作原理:
对于每个文档,RRF计算一个融合分数,该分数是其在各个搜索系统中的倒数排名之和。
RRF_Score(d) = Σ [1 / (k + rank_i(d))]

  • d 是一个文档。
  • rank_i(d) 是文档 d 在第 i 个搜索系统中的排名(1-based)。
  • k 是一个常数,通常取值在 60 左右,用于平滑排名,避免排名靠前的文档分数过高,并给予排名靠后的文档一定的贡献。

优势:

  • 无需分数归一化: 这是RRF最大的优势,避免了调整 kx_mid 的繁琐过程。
  • 对异常值鲁棒: 排名本身就对原始分数的绝对值不敏感。
  • 实现简单: 逻辑清晰,易于实现。

劣势:

  • 忽略分数大小: RRF只关注排名,不关注原始分数的具体大小。如果一个文档在ES中以极高的分数排名第一,而另一个文档只是勉强排名第一,RRF会平等对待它们的“第一名”地位。
  • 可能不如LRT精确: 在有足够标注数据和精心特征工程的情况下,LRT通常能获得更好的效果。

代码示例:Reciprocal Rank Fusion (RRF)

from collections import defaultdict
from typing import List, Dict

def reciprocal_rank_fusion(
    search_results_lists: List[List[Dict]], k: int = 60
) -> List[Dict]:
    """
    使用倒数排名融合 (RRF) 算法合并多个搜索结果列表。
    Args:
        search_results_lists: 一个列表,每个元素是一个搜索系统的结果列表。
                              每个结果是一个字典,包含 'id' 和其他信息。
        k: RRF算法中的平滑常数。
    Returns:
        融合后的结果列表,按RRF分数降序排列。
    """
    fused_scores = defaultdict(float)
    document_data = {} # 存储每个文档的原始数据,用于最终返回

    for results_list in search_results_lists:
        for rank, item in enumerate(results_list):
            doc_id = item['id']
            # 排名从1开始,所以这里是 (rank + 1)
            fused_scores[doc_id] += 1 / (k + (rank + 1))

            # 存储文档的原始数据,确保最终结果包含完整信息
            if doc_id not in document_data:
                document_data[doc_id] = item['source'] # 假设 'source' 包含原始文档内容

    # 将融合分数与文档ID关联,并按分数降序排序
    sorted_fused_results = sorted(
        fused_scores.items(), key=lambda item: item[1], reverse=True
    )

    final_results = []
    for doc_id, score in sorted_fused_results:
        final_results.append({
            'id': doc_id,
            'fused_score': score,
            'source': document_data.get(doc_id, {})
        })
    return final_results

# 示例数据
# es_results_for_rrf = [
#     {'id': 1, 'score': 5.2, 'source': {'name': 'Gaming Laptop i7 16GB RAM RTX3060'}},
#     {'id': 3, 'score': 4.8, 'source': {'name': 'Mechanical Keyboard RGB'}},
#     {'id': 2, 'score': 3.1, 'source': {'name': 'Office Laptop i5 8GB RAM'}}
# ]

# milvus_results_for_rrf = [
#     {'id': 2, 'score': 0.92, 'source': {'name': 'Office Laptop i5 8GB RAM'}}, # Milvus可能认为办公本更符合模糊查询
#     {'id': 1, 'score': 0.88, 'source': {'name': 'Gaming Laptop i7 16GB RAM RTX3060'}},
#     {'id': 4, 'score': 0.85, 'source': {'name': 'Ergonomic Mouse'}} # 假设有新文档
# ]

# # 注意:RRF假设两个列表中的ID是文档的唯一标识。
# # 在实际应用中,需要确保文档ID在两个系统中是对应的。
# fused_rrf_results = reciprocal_rank_fusion([es_results_for_rrf, milvus_results_for_rrf])
# print("nReciprocal Rank Fusion Results:")
# for res in fused_rrf_results:
#     print(f"  ID: {res['id']}, Fused Score: {res['fused_score']:.4f}, Name: {res['source'].get('name', 'N/A')}")

RRF的强大之处在于它不需要了解各个搜索系统的内部评分机制,只需要它们的排名。这使得它在集成异构搜索系统时非常方便。

三、 混合搜索融合的实践架构与实现细节

现在我们已经理解了分值归一化和权重分配的各种策略,接下来我们将把这些组件整合到一个实际的混合搜索系统中。

3.1 架构设计

一个典型的混合搜索系统架构如下:

+-------------------+                                                                +----------------------+
|                   |                                                                |                      |
|  User Query       |                                                                | Embedding Service    |
|                   |                                                                | (e.g., Sentence-BERT)|
+--------+----------+                                                                +----------+-----------+
         |                                                                                     |
         | Query Request                                                                       | Embed Query
         v                                                                                     |
+--------------------------------+                                         +------------------+------------------+
|                                |                                         |                                     |
|  API Gateway / Orchestration   | <---------------------------------------|                                     |
|  (Python Flask/FastAPI Service)|                                         |  Milvus Vector Database             |
|                                |                                         |  (Product Embeddings)               |
+-----------+--------------------+                                         +------------------+------------------+
            |                                                                       ^
            |  1. Parse Query                                                       |
            |  2. Generate Query Embedding (via Embedding Service)                  |
            |  3. Parallel Search Requests                                          | Semantic Search Results (ID, Score)
            |                                                                       |
            +-----------------------------------------------------------------------------------------------------+
            |                                                                       |
            |  Keyword Search Request (e.g., "Gaming Laptop i7")                    |
            |                                                                       |
            v                                                                       |
+--------------------------------+                                         +------------------+------------------+
|                                |                                         |                                     |
|  ElasticSearch Keyword Engine  | <---------------------------------------|                                     |
|  (Product Text Data)           |                                         |  4. Merge & Fuse Results            |
|                                |                                         |     - Normalize Scores              |
+-----------+--------------------+                                         |     - Apply Weighting Algorithm     |
            |                                                                |     - Re-rank                       |
            | Keyword Search Results (ID, Score, Source)                     |                                     |
            |                                                                |  5. Return Fused Results            |
            v                                                                |                                     |
+-----------------------------------------------------------------------------------------------------------------+
|                                                                                                                   |
|                                         Final Fused Search Results                                                |
|                                                                                                                   |
+-----------------------------------------------------------------------------------------------------------------+

关键组件:

  1. API Gateway / Orchestration Service: 这是整个系统的入口,负责接收用户查询,协调后端搜索服务,并执行融合逻辑。通常使用Python (Flask/FastAPI) 或Node.js等框架构建。
  2. Embedding Service: 专门负责将文本(用户查询和文档内容)转换为向量。它可以是一个独立的微服务,使用预训练模型(如Sentence-BERT、OpenAI Embeddings API)来生成高质量的向量。
  3. ElasticSearch: 存储和管理产品文本数据,并提供关键词搜索功能。
  4. Milvus: 存储和管理产品语义向量,并提供向量相似度搜索功能。

3.2 数据同步与索引

在实现混合搜索之前,我们需要确保数据能够被正确地索引到ElasticSearch和Milvus中。

索引流程:

  1. 原始数据: 接收产品数据(如产品名称、描述、类别等)。
  2. ElasticSearch索引: 将产品文本信息直接发送给ElasticSearch进行索引。
  3. Milvus索引:
    • 将产品文本数据发送给Embedding Service
    • Embedding Service生成对应文本的向量。
    • 将生成的向量连同产品ID等元数据发送给Milvus进行索引。
# 假设我们有一个统一的文档结构
# { 'id': 1, 'name': 'Gaming Laptop i7 16GB RAM RTX3060', 'description': '...', 'category': '...' }

def index_document_hybrid(doc: Dict, es_client: Elasticsearch, milvus_collection: Collection, embed_model: MockEmbeddingModel):
    """
    将文档同时索引到ElasticSearch和Milvus。
    """
    doc_id = doc['id']
    text_to_embed = doc['name'] + " " + doc.get('description', '') # 组合文本用于Embedding

    # 1. 索引到ElasticSearch
    es_doc = {k: v for k, v in doc.items() if k != 'id'} # ES自动生成_id或使用doc_id
    es_client.index(index='products', id=doc_id, document=es_doc)
    print(f"Indexed to ES: ID {doc_id}")

    # 2. 索引到Milvus
    embedding = embed_model.embed_text(text_to_embed)
    milvus_collection.insert([[doc_id], [text_to_embed], [embedding]])
    milvus_collection.flush()
    print(f"Indexed to Milvus: ID {doc_id}")

# 示例数据
sample_docs = [
    {'id': 1, 'name': 'Gaming Laptop i7 16GB RAM RTX3060', 'description': 'High-performance laptop for serious gamers.'},
    {'id': 2, 'name': 'Office Laptop i5 8GB RAM', 'description': 'Reliable laptop for daily office tasks, lightweight.'},
    {'id': 3, 'name': 'Mechanical Keyboard RGB', 'description': 'Gaming keyboard with customizable RGB lighting and clicky switches.'},
    {'id': 4, 'name': 'Ergonomic Mouse Wireless', 'description': 'Comfortable wireless mouse for long working hours.'}
]

# 假设 es 和 collection 已经初始化
# for doc in sample_docs:
#     index_document_hybrid(doc, es, collection, embedding_model)

3.3 混合搜索查询与融合实现

现在,我们整合所有组件,实现一个完整的混合搜索功能。

import concurrent.futures # 用于并行执行ES和Milvus查询
import time

def hybrid_search_fusion(
    query_text: str,
    es_client: Elasticsearch,
    milvus_collection: Collection,
    embed_model: MockEmbeddingModel,
    top_k: int = 10,
    fusion_method: str = "weighted_sum", # "weighted_sum" or "rrf"
    es_k_sigmoid: float = 0.5, # Sigmoid parameter for ES
    es_x_mid_sigmoid: float = 2.0, # Sigmoid parameter for ES
    milvus_k_sigmoid: float = 10.0, # Sigmoid parameter for Milvus
    milvus_x_mid_sigmoid: float = 0.8 # Sigmoid parameter for Milvus
) -> List[Dict]:
    """
    执行混合搜索并融合结果。
    """
    start_time = time.time()

    # 1. 并行执行ES和Milvus查询
    es_results = []
    milvus_results = []
    query_vector = embed_model.embed_text(query_text)

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # ES查询
        es_future = executor.submit(search_es_keyword, query_text, 'products', top_k * 2) # 多取一些结果供融合
        # Milvus查询
        milvus_search_params = {
            "data": [query_vector],
            "anns_field": "embedding",
            "param": {"ef": 64},
            "limit": top_k * 2,
            "output_fields": ["id", "text"]
        }
        milvus_future = executor.submit(milvus_collection.search, **milvus_search_params)

        es_raw_results = es_future.result()
        milvus_raw_results = milvus_future.result()

        # 格式化 Milvus 结果
        if milvus_raw_results and milvus_raw_results[0]:
            for hit in milvus_raw_results[0]:
                # 对于余弦距离,Milvus返回的是距离 (0到2),0表示完全相同,2表示完全相反
                # 转换为相似度 (1 - distance/2),使其在0-1之间,1表示完全相似
                # 注意:如果 Milvus metric_type 是 COSINE_SIMIL,它可能直接返回相似度
                similarity_score = 1 - (hit.distance / 2.0) 
                milvus_results.append({
                    'id': hit.id,
                    'score': similarity_score, # 使用转换后的相似度
                    'source': {'name': hit.entity.get('text', 'N/A')}
                })

        # 格式化 ES 结果
        es_results = es_raw_results

    # 2. 准备文档集合
    # 将来自ES和Milvus的结果统一到一个字典中,以便按ID处理
    all_docs = {}
    for res in es_results:
        all_docs[res['id']] = {'es_score': res['score'], 'milvus_score': 0.0, 'source': res['source']}
    for res in milvus_results:
        if res['id'] in all_docs:
            all_docs[res['id']]['milvus_score'] = res['score']
        else:
            all_docs[res['id']] = {'es_score': 0.0, 'milvus_score': res['score'], 'source': res['source']}

    # 3. 融合逻辑
    final_fused_results = []
    if fusion_method == "weighted_sum":
        # 3.1. 分值归一化
        normalized_es_scores = {}
        normalized_milvus_scores = {}

        # 如果ES结果为空,则所有ES归一化分数为0
        if es_results:
            max_es_score = max(r['score'] for r in es_results) if es_results else 1.0
            min_es_score = min(r['score'] for r in es_results) if es_results else 0.0
            # 使用Sigmoid归一化
            for doc_id, doc_data in all_docs.items():
                if doc_data['es_score'] > 0: # 仅对有ES分数的文档进行归一化
                    normalized_es_scores[doc_id] = sigmoid_normalize(doc_data['es_score'], k=es_k_sigmoid, x_mid=es_x_mid_sigmoid)
                else:
                    normalized_es_scores[doc_id] = 0.0
        else:
            for doc_id in all_docs:
                normalized_es_scores[doc_id] = 0.0

        if milvus_results:
            # Milvus分数通常是0-1的相似度,我们假设它已经在这个范围内
            # 仍然使用sigmoid,可以进一步调整其分布,例如让0.85以上的分数更显著
            for doc_id, doc_data in all_docs.items():
                if doc_data['milvus_score'] > 0: # 仅对有Milvus分数的文档进行归一化
                    normalized_milvus_scores[doc_id] = sigmoid_normalize(doc_data['milvus_score'], k=milvus_k_sigmoid, x_mid=milvus_x_mid_sigmoid)
                else:
                    normalized_milvus_scores[doc_id] = 0.0
        else:
            for doc_id in all_docs:
                normalized_milvus_scores[doc_id] = 0.0

        # 3.2. 权重分配 (使用启发式动态权重)
        w_keyword, w_semantic = get_dynamic_weights(query_text)
        print(f"Dynamic Weights for '{query_text}': Keyword={w_keyword:.2f}, Semantic={w_semantic:.2f}")

        # 3.3. 计算最终融合分数并排序
        for doc_id, doc_data in all_docs.items():
            final_score = (w_keyword * normalized_es_scores.get(doc_id, 0.0) +
                           w_semantic * normalized_milvus_scores.get(doc_id, 0.0))
            final_fused_results.append({
                'id': doc_id,
                'fused_score': final_score,
                'es_norm_score': normalized_es_scores.get(doc_id, 0.0),
                'milvus_norm_score': normalized_milvus_scores.get(doc_id, 0.0),
                'source': doc_data['source']
            })

        final_fused_results = sorted(final_fused_results, key=lambda x: x['fused_score'], reverse=True)

    elif fusion_method == "rrf":
        # RRF不需要分数归一化,直接使用原始结果的排名
        # 需要将ES和Milvus的结果格式化成RRF函数期望的列表形式
        rrf_input_lists = []
        if es_results:
            rrf_input_lists.append(es_results)
        if milvus_results:
            rrf_input_lists.append(milvus_results)

        if rrf_input_lists:
            final_fused_results = reciprocal_rank_fusion(rrf_input_lists)
        else:
            final_fused_results = []
    else:
        raise ValueError(f"Unknown fusion_method: {fusion_method}")

    end_time = time.time()
    print(f"Hybrid search completed in {end_time - start_time:.2f} seconds.")

    return final_fused_results[:top_k] # 返回前 top_k 个结果

# --- 主程序入口(需要先运行ES和Milvus,并初始化连接和数据) ---
if __name__ == "__main__":
    # 初始化ES和Milvus连接、embedding模型
    es_client = Elasticsearch(
        hosts=[{'host': 'localhost', 'port': 9200, 'scheme': 'http'}],
        basic_auth=('elastic', 'your_password') # 根据实际情况修改
    )
    connections.connect("default", host="localhost", port="19530")
    milvus_collection = Collection(COLLECTION_NAME)
    milvus_collection.load() # 确保Milvus collection已加载
    embedding_model = MockEmbeddingModel()

    # 确保ES索引和Milvus collection有数据
    # for doc in sample_docs: # 运行一次此循环以索引数据
    #     index_document_hybrid(doc, es_client, milvus_collection, embedding_model)
    # time.sleep(2) # 等待数据刷新

    print("n--- Testing Weighted Sum Fusion ---")
    query1 = "gaming laptop powerful"
    fused_results1 = hybrid_search_fusion(query1, es_client, milvus_collection, embedding_model, top_k=5, fusion_method="weighted_sum")
    print(f"nHybrid Search Results for '{query1}':")
    for i, res in enumerate(fused_results1):
        print(f"{i+1}. ID: {res['id']}, Fused Score: {res['fused_score']:.4f}, "
              f"ES Norm: {res['es_norm_score']:.2f}, Milvus Norm: {res['milvus_norm_score']:.2f}, "
              f"Name: {res['source'].get('name', 'N/A')}")

    print("n--- Testing RRF Fusion ---")
    query2 = "office work reliable computer"
    fused_results2 = hybrid_search_fusion(query2, es_client, milvus_collection, embedding_model, top_k=5, fusion_method="rrf")
    print(f"nHybrid Search Results for '{query2}' (RRF):")
    for i, res in enumerate(fused_results2):
        print(f"{i+1}. ID: {res['id']}, Fused Score: {res['fused_score']:.4f}, "
              f"Name: {res['source'].get('name', 'N/A')}")

    print("n--- Testing Weighted Sum Fusion with precise query ---")
    query3 = "Mechanical Keyboard RGB"
    fused_results3 = hybrid_search_fusion(query3, es_client, milvus_collection, embedding_model, top_k=5, fusion_method="weighted_sum")
    print(f"nHybrid Search Results for '{query3}':")
    for i, res in enumerate(fused_results3):
        print(f"{i+1}. ID: {res['id']}, Fused Score: {res['fused_score']:.4f}, "
              f"ES Norm: {res['es_norm_score']:.2f}, Milvus Norm: {res['milvus_norm_score']:.2f}, "
              f"Name: {res['source'].get('name', 'N/A')}")

代码解析:

  1. 并行查询: 使用concurrent.futures.ThreadPoolExecutor同时向ElasticSearch和Milvus发送查询,以减少总延迟。
  2. 结果收集与统一: 将ES和Milvus返回的原始结果收集起来,并根据文档ID进行合并。对于在某个系统中未找到的文档,其对应分数设为0。Milvus返回的距离需要转换为相似度。
  3. 分值归一化:weighted_sum模式下,对ES和Milvus的原始分数分别进行Sigmoid归一化。es_k_sigmoid, es_x_mid_sigmoid, milvus_k_sigmoid, milvus_x_mid_sigmoid参数需要根据实际数据分布进行调优。
  4. 权重分配: 调用get_dynamic_weights函数,根据查询文本动态计算关键词和语义权重。
  5. 融合与排序:
    • Weighted Sum: 将归一化后的分数乘以各自的权重,然后相加得到最终融合分数,并根据此分数进行排序。
    • RRF: 直接将ES和Milvus的结果列表作为输入,调用reciprocal_rank_fusion函数获取融合后的结果。
  6. 返回结果: 截取前 top_k 个结果返回。

3.4 性能优化与评估

性能优化:

  • 并行化: 确保ES和Milvus的查询是并行执行的。
  • 缓存: 对热门查询的embedding和搜索结果进行缓存。
  • 高效Embedding: 选择推理速度快、效果好的embedding模型。Embedding Service的扩容性也很重要。
  • 索引优化: 保持ElasticSearch和Milvus的索引健康,定期优化。

评估指标:

  • 相关性指标:
    • Precision@K (P@K): 前K个结果中相关文档的比例。
    • Recall@K (R@K): 所有相关文档中,前K个结果召回的比例。
    • F1-score: Precision和Recall的调和平均。
    • Normalized Discounted Cumulative Gain (NDCG): 考虑了相关性等级和位置的指标。
    • Mean Reciprocal Rank (MRR): 衡量第一个相关结果出现的位置。
  • 用户体验指标:
    • 点击率 (CTR)
    • 转化率
    • 平均查询时长
    • 搜索结果满意度(通过用户反馈或A/B测试收集)

持续的A/B测试和用户反馈是优化融合算法和权重分配的关键。

四、 持续优化与未来展望

混合搜索融合是现代智能搜索系统的核心能力。通过精心设计的分值归一化和权重分配算法,我们能够充分发挥ElasticSearch在关键词匹配上的精确性和Milvus在语义理解上的广度,为用户提供更智能、更准确、更符合其意图的搜索体验。

从简单的固定权重到复杂的Learning-to-Rank模型,权重分配算法的选择应根据业务需求、数据可用性和团队资源进行权衡。RRF作为一种无需分数归一化的强大方法,在许多场景下提供了一个优秀的起点。

未来,混合搜索的演进将更加注重:多模态搜索(结合文本、图片、视频等多种信息)、个性化搜索(根据用户历史行为调整权重)、以及利用更先进的AI模型(如大型语言模型)进行更深层次的查询意图理解和结果重排。这是一个充满活力和创新的领域,值得我们持续探索和投入。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注