局部嵌入缓存:利用 Redis 优化向量计算
各位同仁,下午好!今天我们来深入探讨一个在现代 AI 应用中至关重要的性能优化策略:局部嵌入缓存 (Local Embedding Caching)。随着自然语言处理(NLP)技术,特别是大型语言模型(LLM)的飞速发展,文本嵌入(Text Embeddings)已成为构建语义搜索、推荐系统、问答系统、RAG (Retrieval-Augmented Generation) 等应用的核心。然而,生成这些高维向量的过程往往资源密集且耗时。本讲座将聚焦如何利用高性能内存数据库 Redis 缓存已生成的向量,从而避免对同一段文本进行重复计算,显著提升系统性能和用户体验。
1. 嵌入(Embeddings)的基石与其计算成本
在深入缓存策略之前,我们必须首先理解什么是文本嵌入,以及为什么它们的计算成本如此之高。
1.1 什么是文本嵌入?
文本嵌入是一种将文本(如单词、句子、段落甚至文档)映射到低维或高维实数向量空间的技术。这些向量捕捉了文本的语义信息和上下文关系,使得语义上相似的文本在向量空间中距离更近。例如,“猫”和“小猫”的嵌入向量会比“猫”和“汽车”的向量更接近。
常用的嵌入模型包括:
- 经典模型: Word2Vec, GloVe, FastText (基于统计和浅层神经网络)。
- 上下文感知模型: BERT, RoBERTa, XLNet (基于 Transformer 架构,能理解词语在不同上下文中的含义)。
- 句子/段落嵌入模型: Sentence-BERT (SBERT), OpenAI Embeddings API, Cohere Embeddings (专门用于生成高质量的句子或段落级嵌入)。
这些嵌入向量是许多现代 NLP 应用的基石,例如:
- 语义搜索: 用户查询的嵌入与文档库中所有文本的嵌入进行相似度比较,找出最相关的文档。
- 推荐系统: 用户兴趣描述或已消费内容的嵌入与商品/内容的嵌入进行匹配。
- 聚类与分类: 基于文本嵌入的相似性进行无监督聚类或有监督分类。
- RAG 系统: 从知识库中检索相关上下文,然后输入给 LLM 生成更准确和相关的回答。
1.2 嵌入计算的成本分析
尽管嵌入带来了巨大的价值,但其计算并非没有代价。
1.2.1 计算资源消耗:
生成嵌入通常需要深度学习模型进行推理。这些模型,尤其是基于 Transformer 的模型,参数量巨大,需要大量的浮点运算。
- CPU 密集型: 对于中小型模型或低吞吐量场景,CPU 可以胜任。但当请求量增加时,CPU 很快会成为瓶颈。
- GPU 密集型: 对于大型模型和高吞吐量场景,GPU 是首选,因为其并行计算能力能显著加速推理。然而,GPU 资源昂贵且稀缺。
1.2.2 延迟(Latency):
模型推理需要一定时间。对于实时应用(如交互式搜索、聊天机器人),即使是几十毫秒的延迟也可能影响用户体验。如果每次请求都需要重新计算嵌入,累计的延迟将不可接受。
1.2.3 API 调用成本:
许多高质量的嵌入服务(如 OpenAI Embeddings API, Cohere Embeddings API)是按使用量收费的。每次 API 调用都会产生费用。在没有缓存的情况下,对相同文本的重复请求将导致重复计费,迅速推高运营成本。
1.2.4 扩展性挑战:
随着用户量和数据量的增长,如果没有有效的优化策略,后端服务需要不断增加计算资源来处理嵌入请求,这不仅成本高昂,而且扩展过程复杂。
综上所述,虽然嵌入是强大的工具,但其计算成本、延迟和资源消耗是真实存在的。为了构建高效、可扩展且经济的 AI 应用,我们必须寻找到一种机制来缓解这些问题,而“缓存”正是解决之道。
2. 局部嵌入缓存:核心理念与优势
“局部嵌入缓存”的核心思想是:当一段文本的嵌入向量被首次计算出来后,将其存储起来。当后续对同一段文本再次请求其嵌入时,直接从存储中读取,而不是重新计算。这里的“局部”指的是缓存服务通常与应用服务部署在同一网络区域,以最小化访问延迟。
2.1 缓存的工作原理
一个典型的嵌入缓存流程如下:
- 请求: 应用服务收到一段文本
T,需要获取其嵌入E(T)。 - 缓存查询: 应用首先检查缓存中是否已存在文本
T的嵌入。- 缓存命中 (Cache Hit): 如果缓存中找到了
E(T),直接返回给应用。这个过程非常快,避免了昂贵的计算。 - 缓存未命中 (Cache Miss): 如果缓存中没有
E(T),应用会调用嵌入模型(或外部 API)来计算E(T)。
- 缓存命中 (Cache Hit): 如果缓存中找到了
- 缓存存储: 计算出
E(T)后,将其存储到缓存中,并返回给应用。下次再有对T的请求时,就能直接命中缓存。
2.2 缓存的显著优势
采用局部嵌入缓存能带来多方面的好处:
- 降低延迟: 缓存命中时,响应时间从数百毫秒(模型推理时间)降低到几毫秒(缓存读取时间),极大地提升了用户体验。
- 节省计算资源: 减少了 GPU/CPU 的负载,使得现有资源能够处理更多的请求,或允许使用更小的实例规模。
- 降低 API 成本: 对外部嵌入 API 的调用次数大幅减少,直接转化为运营成本的节约。
- 提高系统吞吐量: 缓存减轻了后端服务的压力,使得系统能够处理更高的并发请求量。
- 提升稳定性与可靠性: 减少对外部嵌入服务的依赖,即使外部服务暂时不可用或限流,缓存也能提供一定程度的服务。
2.3 适用场景
局部嵌入缓存尤其适用于以下场景:
- 重复性高的文本: 如果应用中存在大量重复出现的文本(如商品名称、FAQ 问题、文档标题),缓存收益最大。
- 高并发/高吞吐量系统: 任何需要快速响应和处理大量请求的系统都将受益于缓存。
- 成本敏感型应用: 对于依赖付费嵌入 API 的应用,缓存是降低成本的关键策略。
- 离线数据预处理: 即使是离线任务,缓存也能加速处理流程,尤其是在调试或重新运行任务时。
3. Redis:嵌入缓存的理想选择
选择合适的缓存存储是实现高效局部嵌入缓存的关键。Redis,作为一款开源、高性能的内存数据结构存储,是这一任务的绝佳选择。
3.1 为什么选择 Redis?
Redis 具备多项特性,使其成为嵌入缓存的理想方案:
- 极致的速度: Redis 是一个内存数据库,所有数据都存储在 RAM 中,读写速度极快,通常在微秒级别。这对于需要低延迟的缓存至关重要。
- 丰富的数据结构: Redis 不仅仅是一个简单的键值存储,它支持多种高级数据结构,如字符串 (Strings)、哈希 (Hashes)、列表 (Lists)、集合 (Sets)、有序集合 (Sorted Sets) 等。这为我们存储嵌入数据提供了灵活性。
- 持久化选项: 尽管是内存数据库,Redis 提供了 RDB (快照) 和 AOF (Append-Only File) 两种持久化机制,可以在 Redis 重启后恢复数据,避免缓存数据的完全丢失。
- 内置过期策略 (TTL): Redis 允许为键设置生存时间 (Time-To-Live, TTL)。键在 TTL 到期后会自动被删除,这对于管理缓存数据的生命周期非常有用,可以防止缓存过时。
- 灵活的内存管理与淘汰策略: 当内存达到上限时,Redis 可以根据配置的淘汰策略(如 LRU, LFU, Random 等)自动删除旧数据,确保缓存的平稳运行。
- 原子操作: Redis 的所有操作都是原子性的,这意味着在多线程或分布式环境中,无需额外的锁机制就能保证数据的一致性。
- 高可用与可扩展性: Redis 支持主从复制 (Replication) 和集群 (Cluster),可以实现高可用和水平扩展,满足大规模应用的需求。
- 广泛的客户端支持: 几乎所有主流编程语言都有成熟的 Redis 客户端库,方便集成。
3.2 Redis 数据结构的选择
针对嵌入缓存,我们主要关注 Redis 的 String 和 Hash 数据结构。
3.2.1 String (字符串)
- 描述: 最简单的数据结构,一个键对应一个值。值可以是任何二进制安全的数据,最大 512MB。
- 用于嵌入缓存:
- 键 (Key): 文本的唯一标识符(通常是文本内容的哈希值)。
- 值 (Value): 序列化后的嵌入向量。
- 优点:
- 简单直观,易于实现。
GET和SET操作非常快。
- 缺点:
- 每个嵌入向量都需要一个独立的 Redis 键。
- 无法直接存储与嵌入相关的元数据(如模型版本、生成时间)而不在键名中编码或使用额外的键。
3.2.2 Hash (哈希)
- 描述: 存储字段-值对的无序字典。适合存储对象。
- 用于嵌入缓存:
- 键 (Key): 文本的唯一标识符(与 String 相同)。
- 字段 (Field): 例如
embedding、model_name、model_version、timestamp等。 - 值 (Value): 对应字段的具体数据,例如序列化后的嵌入向量、模型名称字符串等。
- 优点:
- 可以将相关数据(嵌入向量及其元数据)聚合在一个 Redis 键下,节省键空间。
- 通过
HGETALL可以一次性获取所有元数据,或通过HGET获取特定字段。
- 缺点:
- 操作稍微复杂于
String。 HGET和HSET的性能略低于GET和SET,但在实际应用中差异通常可忽略不计。
- 操作稍微复杂于
数据结构选择对比表:
| 特性 | Redis String | Redis Hash |
|---|---|---|
| 存储方式 | key -> serialized_embedding |
key -> {field1: value1, field2: value2, ...} |
| 键空间 | 每个嵌入占用一个键 | 多个元数据字段共享一个键 |
| 元数据 | 难以直接存储,需编码到键名或额外键 | 可清晰地存储为哈希字段,如 model_version, timestamp |
| 读写操作 | GET, SET (最快) |
HGET, HSET, HGETALL (稍慢,但功能更丰富) |
| 复杂性 | 简单 | 稍高,但结构化程度更好 |
| 推荐场景 | 仅需缓存嵌入向量,无需额外元数据时 | 需要缓存嵌入向量并附带模型版本、生成时间等元数据时 |
在大多数实际场景中,推荐使用 Hash 结构。它提供了更好的结构化和可扩展性,使得在未来需要添加更多元数据时更加方便,也便于维护和调试。例如,当模型版本更新时,我们可以通过检查 model_version 字段来判断缓存是否过期。
4. 局部嵌入缓存的设计策略
一个健壮的嵌入缓存系统需要精心设计以下几个关键方面。
4.1 缓存键的生成
缓存键必须能够唯一且确定性地标识一段文本的嵌入。这意味着对于相同的输入文本和相同的嵌入模型,必须生成相同的缓存键。
4.1.1 文本归一化 (Text Normalization):
这是至关重要的一步。不同的文本表示形式(如大小写、标点符号、多余空格)可能会导致相同的语义内容生成不同的哈希值,从而导致缓存未命中。
归一化步骤通常包括:
- 转小写: 将所有文本转换为小写,例如 "Hello World" 和 "hello world" 将视为相同。
- 去除标点符号: 移除逗号、句号、问号等。
- 去除多余空格: 将多个连续空格替换为单个空格,并修剪文本两端的空格。
- Unicode 规范化: 处理不同 Unicode 表示形式下的字符(例如
é和é)。 - 排序: 对于无序的元素列表(如关键词),可以对其进行排序以确保一致性。
4.1.2 哈希算法:
对归一化后的文本进行哈希计算,生成一个固定长度的字符串作为缓存键。常用的哈希算法有 MD5、SHA1、SHA256 等。SHA256 提供更好的碰撞抗性,是更稳健的选择。
4.1.3 模型版本和名称:
一个极其重要的考量是,不同的嵌入模型或同一模型的不同版本会生成不同的嵌入向量。因此,缓存键必须包含模型名称和版本信息,以避免混淆。
推荐的缓存键格式:
f"{model_name}:{model_version}:{sha256_hash(normalized_text)}"
例如:"openai-ada-002:v1:a1b2c3d4e5f6..." 或 "sbert-all-mpnet-base-v2:v1.0.0:x1y2z3a4b5c6..."。
4.2 嵌入向量的序列化与反序列化
嵌入向量本质上是浮点数的列表或 NumPy 数组。Redis String 或 Hash 的值只能存储字符串或二进制数据,因此需要将向量进行序列化。
4.2.1 序列化方法:
- JSON:
- 优点: 语言无关,人类可读,广泛支持。
- 缺点: 字符串表示,相比二进制数据占用更多空间,解析略慢。对于大量浮点数,精度可能需要注意(但通常不是问题)。
- 示例:
json.dumps([0.1, 0.2, -0.3, ...])
- Pickle (Python 专属):
- 优点: Python 原生,能序列化几乎所有 Python 对象,效率通常高于 JSON (对于 NumPy 数组)。
- 缺点: 仅限 Python,不跨语言。存在安全风险(反序列化恶意数据)。
- 示例:
pickle.dumps(np.array([0.1, 0.2, ...]))
- NumPy
tobytes()/frombuffer():- 优点: 对于 NumPy 数组来说,这是最紧凑、最高效的序列化方式,直接将数组转换为字节串。
- 缺点: 仅限 NumPy 数组,需要知道数据类型 (dtype) 和维度 (shape) 进行反序列化。
- 示例:
embedding_array.astype(np.float32).tobytes()
- Base64 编码 (结合上述方法):
- 如果选择 NumPy
tobytes(),结果是二进制数据。如果 RedisString的值是二进制安全的,可以直接存储。但如果需要通过某些中间层传输或存储在非二进制安全的字段中(例如某些 HTTP Header),可以使用 Base64 对二进制数据进行编码成 ASCII 字符串。 - 优点: 将二进制数据转换为可打印的 ASCII 字符串。
- 缺点: 增加约 33% 的数据大小。
- 如果选择 NumPy
推荐策略:
对于 Python 环境,结合效率和安全性,推荐使用 NumPy tobytes() 配合 np.float32 数据类型。它既紧凑又高效。在存储到 Redis 前,如果 Redis 客户端或环境对二进制数据有顾虑,可以再进行 Base64 编码,反之则直接存储。
import numpy as np
import json
import hashlib
import base64
import pickle
# 假设这是一个嵌入向量 (通常是浮点数数组)
sample_embedding = np.random.rand(768).astype(np.float32)
# 1. JSON 序列化
json_serialized = json.dumps(sample_embedding.tolist())
json_deserialized = np.array(json.loads(json_serialized), dtype=np.float32)
# 2. Pickle 序列化 (Python 专属,注意安全)
pickle_serialized = pickle.dumps(sample_embedding)
pickle_deserialized = pickle.loads(pickle_serialized)
# 3. NumPy tobytes() 序列化 (最推荐)
# 存储时需要记录 dtype 和 shape,反序列化时使用
bytes_serialized = sample_embedding.tobytes()
# 反序列化时需要知道原始的 dtype 和 shape
bytes_deserialized = np.frombuffer(bytes_serialized, dtype=np.float32).reshape(sample_embedding.shape)
# 4. NumPy tobytes() + Base64 编码 (如果需要转换为字符串)
base64_encoded = base64.b64encode(sample_embedding.tobytes()).decode('utf-8')
base64_decoded_bytes = base64.b64decode(base64_encoded.encode('utf-8'))
base64_deserialized = np.frombuffer(base64_decoded_bytes, dtype=np.float32).reshape(sample_embedding.shape)
# 验证反序列化是否正确
assert np.allclose(sample_embedding, json_deserialized)
assert np.allclose(sample_embedding, pickle_deserialized)
assert np.allclose(sample_embedding, bytes_deserialized)
assert np.allclose(sample_embedding, base64_deserialized)
print("各种序列化/反序列化方法验证通过。")
4.3 缓存过期与淘汰策略
缓存不是无限的,我们需要管理其生命周期。
4.3.1 TTL (Time-To-Live):
为缓存项设置过期时间。这对于以下情况非常有用:
- 数据新鲜度要求: 某些文本的语义可能会随时间变化(尽管不常见),或者我们希望定期更新缓存以反映模型可能的变化。
- 内存管理: 确保旧的、不再经常访问的数据最终会被自动清除,释放内存。
Redis 的 EXPIRE 命令可以为键设置 TTL。例如,r.set(key, value, ex=3600) 将键的过期时间设置为 1 小时。
4.3.2 内存淘汰策略 (Eviction Policies):
当 Redis 实例的内存使用达到 maxmemory 配置的上限时,Redis 会根据 maxmemory-policy 配置的策略来删除键以释放内存。
常见的淘汰策略:
noeviction:不删除任何键,所有写操作都会报错 (默认策略)。allkeys-lru:从所有键中选择最近最少使用的 (Least Recently Used) 键进行删除。volatile-lru:从设置了过期时间的键中选择最近最少使用的键进行删除。allkeys-random:从所有键中随机删除。volatile-ttl:从设置了过期时间的键中选择剩余生存时间最短的键进行删除。allkeys-lfu:从所有键中选择使用频率最低的 (Least Frequently Used) 键进行删除。volatile-lfu:从设置了过期时间的键中选择使用频率最低的键进行删除。
选择建议:
对于嵌入缓存,allkeys-lru 或 allkeys-lfu 是最常见的选择,它们会优先淘汰那些不经常访问的嵌入,保留热点数据。如果所有缓存项都设置了 TTL,那么 volatile-lru 或 volatile-lfu 也是合理的。
4.4 缓存失效 (Invalidation)
除了自动过期和淘汰,有时我们需要主动让缓存失效。
- 模型更新: 当嵌入模型升级到新版本时,旧版本生成的嵌入是过时的。此时,所有使用旧模型版本生成的缓存项都应该被视为无效。最简单的做法是清空缓存,或者在缓存键中包含模型版本,这样新模型版本自然会生成新的缓存项,旧的会逐渐过期。
- 文本内容变更: 如果原始文本内容发生变化,其对应的嵌入也应该被重新计算。这通常需要应用层感知文本变化并主动删除对应的缓存键。
策略:
将模型版本纳入缓存键是处理模型更新的最佳实践。当模型版本变化时,新请求会生成不同的缓存键,从而强制重新计算。旧版本的缓存项会随着时间或淘汰策略自然清除。
5. 实践:利用 Python 和 Redis 实现嵌入缓存
接下来,我们将通过 Python 代码演示如何构建一个实用的嵌入缓存。
5.1 环境准备
首先,确保你已安装 redis-py 和 numpy (用于处理嵌入向量) 以及一个嵌入模型库(这里以 sentence_transformers 为例,或使用一个模拟函数)。
pip install redis numpy sentence-transformers
5.2 模拟嵌入模型
为了不依赖实际的 GPU 或外部 API,我们先创建一个模拟的嵌入模型。
import time
import numpy as np
from sentence_transformers import SentenceTransformer # 真实模型示例
# 模拟的嵌入模型,耗时操作
class MockEmbeddingModel:
def __init__(self, vector_size=768, sleep_time=0.1):
self.vector_size = vector_size
self.sleep_time = sleep_time
self.model_name = "mock-embedding-model"
self.model_version = "v1.0.0"
def encode(self, texts, batch_size=32):
if isinstance(texts, str):
texts = [texts]
print(f"--- 模拟计算 {len(texts)} 段文本的嵌入,耗时约 {len(texts) * self.sleep_time:.2f} 秒 ---")
time.sleep(len(texts) * self.sleep_time) # 模拟计算耗时
embeddings = []
for text in texts:
# 简单地为每个文本生成一个随机向量作为模拟嵌入
# 在实际应用中,这里会调用真正的模型推理
# np.random.seed(hash(text) % (2**32 - 1)) # 确保相同文本生成相同随机向量
embedding = np.random.rand(self.vector_size).astype(np.float32)
embeddings.append(embedding)
return np.array(embeddings)
# 真实 Sentence-BERT 模型
# try:
# real_model = SentenceTransformer('all-MiniLM-L6-v2')
# print(f"已加载真实嵌入模型: {real_model.model_name_or_path}")
# except Exception as e:
# print(f"加载 SentenceTransformer 失败: {e}. 将使用 MockEmbeddingModel。")
# real_model = MockEmbeddingModel()
# real_model.model_name = 'all-MiniLM-L6-v2' # 假设模型有这个属性
# real_model.model_version = '1.0.0' # 假设模型有版本信息
# 我们将使用 MockEmbeddingModel 进行演示
embedding_model = MockEmbeddingModel(vector_size=768, sleep_time=0.05)
5.3 辅助函数:文本处理与序列化
import hashlib
import re
import json
import base64
# 定义一个模型名称和版本,用于缓存键
CURRENT_MODEL_NAME = embedding_model.model_name # 'mock-embedding-model'
CURRENT_MODEL_VERSION = embedding_model.model_version # 'v1.0.0'
def normalize_text(text: str) -> str:
"""
对文本进行归一化处理,用于生成缓存键。
- 转小写
- 移除标点符号
- 替换多个空格为单个空格
- 去除首尾空格
"""
text = text.lower()
text = re.sub(r'[^ws]', '', text) # 移除所有非字母数字和非空白字符
text = re.sub(r's+', ' ', text).strip() # 替换多余空格并去除首尾空格
return text
def generate_cache_key(text: str, model_name: str, model_version: str) -> str:
"""
根据归一化文本、模型名称和版本生成唯一的缓存键。
"""
normalized_text = normalize_text(text)
combined_string = f"{model_name}:{model_version}:{normalized_text}"
# 使用 SHA256 哈希确保键的固定长度和唯一性
return hashlib.sha256(combined_string.encode('utf-8')).hexdigest()
def serialize_embedding(embedding: np.ndarray) -> bytes:
"""
将 NumPy 数组嵌入向量序列化为字节串。
存储为 np.float32 格式,然后转换为字节串。
"""
if not isinstance(embedding, np.ndarray):
embedding = np.array(embedding, dtype=np.float32)
# 使用 Base64 编码,确保在 Redis 存储中是可打印的字符串
return base64.b64encode(embedding.astype(np.float32).tobytes())
def deserialize_embedding(serialized_data: bytes, vector_size: int = 768) -> np.ndarray:
"""
将字节串反序列化为 NumPy 数组嵌入向量。
需要知道原始向量的大小。
"""
if serialized_data is None:
return None
# 先进行 Base64 解码
decoded_bytes = base64.b64decode(serialized_data)
# 然后从字节串恢复 NumPy 数组
return np.frombuffer(decoded_bytes, dtype=np.float32).reshape(vector_size)
5.4 EmbeddingCache 类实现
现在,我们创建一个 EmbeddingCache 类,它将封装与 Redis 和嵌入模型交互的逻辑。
import redis
import datetime
class EmbeddingCache:
def __init__(self,
redis_client: redis.Redis,
embedding_model,
model_name: str,
model_version: str,
vector_size: int = 768,
ttl_seconds: int = 60 * 60 * 24 * 7 # 默认缓存7天
):
self.redis_client = redis_client
self.embedding_model = embedding_model
self.model_name = model_name
self.model_version = model_version
self.vector_size = vector_size
self.ttl_seconds = ttl_seconds
print(f"EmbeddingCache initialized for model: {self.model_name} {self.model_version}")
def get_embedding(self, text: str) -> np.ndarray:
"""
获取给定文本的嵌入向量。优先从缓存中获取,若无则计算并缓存。
"""
cache_key = generate_cache_key(text, self.model_name, self.model_version)
# 尝试从 Redis Hash 中获取嵌入和元数据
# 使用 HGETALL 获取所有字段,或者 HGET 'embedding' 和 HGET 'timestamp'
cached_data = self.redis_client.hgetall(cache_key)
if cached_data:
# 缓存命中
serialized_embedding = cached_data.get(b'embedding')
if serialized_embedding:
try:
embedding = deserialize_embedding(serialized_embedding, self.vector_size)
# 可以在这里更新缓存项的 TTL (可选,如果配置了 volatile-lru/lfu)
self.redis_client.expire(cache_key, self.ttl_seconds)
return embedding
except Exception as e:
print(f"Error deserializing embedding for key {cache_key}: {e}. Recalculating.")
# 反序列化失败,可能是数据损坏,删除旧缓存并重新计算
self.redis_client.delete(cache_key)
else:
print(f"Cache hit for key {cache_key}, but 'embedding' field is missing. Recalculating.")
self.redis_client.delete(cache_key) # 数据不完整,删除并重新计算
# 缓存未命中或数据无效,计算嵌入
print(f"Cache miss for text: '{text[:50]}...' (key: {cache_key}). Computing embedding...")
# 实际调用嵌入模型进行计算
# 注意:embedding_model.encode 通常接受列表,返回列表
new_embedding_array = self.embedding_model.encode([text])[0]
# 序列化新计算的嵌入
serialized_new_embedding = serialize_embedding(new_embedding_array)
# 准备存储到 Redis Hash 的数据
# 注意:Redis 的 HSET 命令的字段和值必须是字符串或字节
data_to_cache = {
'embedding': serialized_new_embedding,
'model_name': self.model_name.encode('utf-8'),
'model_version': self.model_version.encode('utf-8'),
'timestamp': str(datetime.datetime.now()).encode('utf-8')
}
# 将嵌入和元数据存储到 Redis Hash
self.redis_client.hmset(cache_key, data_to_cache)
# 设置缓存过期时间
self.redis_client.expire(cache_key, self.ttl_seconds)
return new_embedding_array
def invalidate_cache_for_text(self, text: str):
"""
主动使特定文本的缓存失效。
"""
cache_key = generate_cache_key(text, self.model_name, self.model_version)
self.redis_client.delete(cache_key)
print(f"Invalidated cache for text: '{text[:50]}...' (key: {cache_key})")
def clear_all_cache_for_model(self):
"""
清空当前模型版本的所有相关缓存。
这需要遍历所有键,效率可能不高,更推荐通过 `FLUSHDB` 或 `FLUSHALL`
如果 Redis 实例专门用于此缓存。
或者,如果键名设计合理,可以使用 SCAN 和 DEL 匹配前缀。
"""
print(f"Clearing all cache for model: {self.model_name} {self.model_version} (This might be slow for large caches)...")
# 简单粗暴的方式 (如果 Redis 专门用于此):
# self.redis_client.flushdb()
# 更精细的方式: 遍历并删除匹配前缀的键 (如果键设计包含模型名称和版本)
# 例如,如果键是 "model_name:model_version:hash"
# 假设我们的键是哈希值,不包含模型名称和版本前缀,所以只能清空所有或依赖TTL
# 如果键是 "prefix:model_name:model_version:hash",则可以:
# for key in self.redis_client.scan_iter(match=f"prefix:{self.model_name}:{self.model_version}:*"):
# self.redis_client.delete(key)
# 由于我们的 generate_cache_key 只是一个哈希值,没有前缀,
# 无法高效地只删除特定模型的缓存。
# 实际应用中,如果需要按模型清空,需要在 generate_cache_key 中加入前缀。
# 例如:return f"{model_name}:{model_version}:{hashlib.sha256(...).hexdigest()}"
# 然后使用 r.scan_iter(match=f"{model_name}:{model_version}:*")
# 目前的实现,我们只能删除特定的文本,或者依赖TTL/maxmemory-policy。
# 如果要清空某个模型的全部缓存,需要修改 generate_cache_key 包含模型前缀。
print("Note: Current cache key design does not allow efficient clearing by model. Relying on TTL/eviction.")
5.5 实际使用示例
# 连接到 Redis
# 确保你的 Redis 服务器正在运行,且在默认端口 6379 上
try:
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=False)
r.ping()
print("成功连接到 Redis 服务器!")
except redis.exceptions.ConnectionError as e:
print(f"无法连接到 Redis 服务器: {e}. 请确保 Redis 正在运行。")
exit()
# 初始化嵌入缓存
embedding_cache = EmbeddingCache(
redis_client=r,
embedding_model=embedding_model, # 使用我们定义的 MockEmbeddingModel
model_name=CURRENT_MODEL_NAME,
model_version=CURRENT_MODEL_VERSION,
vector_size=768,
ttl_seconds=3600 # 缓存 1 小时
)
# 示例文本
text1 = "How do I get started with Python programming?"
text2 = "How do I get started with python programming?" # 大小写不同,但归一化后相同
text3 = "What is the best way to learn machine learning?"
text4 = "How to start Python programming?" # 语义相似但文本不同
print("n--- 第一次请求 text1 ---")
start_time = time.time()
emb1_1 = embedding_cache.get_embedding(text1)
end_time = time.time()
print(f"第一次获取 text1 嵌入耗时: {end_time - start_time:.4f} 秒")
print(f"text1 嵌入的前5个维度: {emb1_1[:5]}")
print("n--- 第二次请求 text1 (应命中缓存) ---")
start_time = time.time()
emb1_2 = embedding_cache.get_embedding(text1)
end_time = time.time()
print(f"第二次获取 text1 嵌入耗时: {end_time - start_time:.4f} 秒")
print(f"text1 嵌入的前5个维度: {emb1_2[:5]}")
assert np.allclose(emb1_1, emb1_2) # 验证两次获取的嵌入是否相同
print("n--- 请求 text2 (归一化后与 text1 相同,应命中缓存) ---")
start_time = time.time()
emb2_1 = embedding_cache.get_embedding(text2)
end_time = time.time()
print(f"获取 text2 嵌入耗时: {end_time - start_time:.4f} 秒")
print(f"text2 嵌入的前5个维度: {emb2_1[:5]}")
assert np.allclose(emb1_1, emb2_1) # 验证归一化后的文本命中缓存
print("n--- 请求 text3 (新文本,应未命中缓存) ---")
start_time = time.time()
emb3_1 = embedding_cache.get_embedding(text3)
end_time = time.time()
print(f"获取 text3 嵌入耗时: {end_time - start_time:.4f} 秒")
print(f"text3 嵌入的前5个维度: {emb3_1[:5]}")
print("n--- 请求 text4 (新文本,应未命中缓存) ---")
start_time = time.time()
emb4_1 = embedding_cache.get_embedding(text4)
end_time = time.time()
print(f"获取 text4 嵌入耗时: {end_time - start_time:.4f} 秒")
print(f"text4 嵌入的前5个维度: {emb4_1[:5]}")
print("n--- 主动使 text1 缓存失效并重新请求 ---")
embedding_cache.invalidate_cache_for_text(text1)
start_time = time.time()
emb1_3 = embedding_cache.get_embedding(text1) # 应该重新计算
end_time = time.time()
print(f"失效后重新获取 text1 嵌入耗时: {end_time - start_time:.4f} 秒")
assert not np.allclose(emb1_1, emb1_3) # 模拟模型每次生成随机向量,所以这里不应该相同
# 清理 Redis (谨慎操作,如果 Redis 实例用于其他用途)
# r.flushdb()
# print("nRedis 数据库已清空。")
# 模拟模型版本更新
print("n--- 模拟模型版本更新 ---")
OLD_MODEL_VERSION = CURRENT_MODEL_VERSION
embedding_model.model_version = "v1.1.0" # 模拟模型版本更新
CURRENT_MODEL_VERSION = embedding_model.model_version
# 创建新的缓存实例,它将使用新的模型版本
embedding_cache_v2 = EmbeddingCache(
redis_client=r,
embedding_model=embedding_model,
model_name=CURRENT_MODEL_NAME,
model_version=CURRENT_MODEL_VERSION,
vector_size=768,
ttl_seconds=3600
)
# 使用新模型版本请求 text1,应该会重新计算 (因为缓存键不同)
print("n--- 使用新模型版本请求 text1 (应未命中缓存) ---")
start_time = time.time()
emb1_v2 = embedding_cache_v2.get_embedding(text1)
end_time = time.time()
print(f"使用新模型版本获取 text1 嵌入耗时: {end_time - start_time:.4f} 秒")
print(f"text1 (v2) 嵌入的前5个维度: {emb1_v2[:5]}")
# 再次请求 text1 (新模型版本),应命中缓存
print("n--- 再次使用新模型版本请求 text1 (应命中缓存) ---")
start_time = time.time()
emb1_v2_2 = embedding_cache_v2.get_embedding(text1)
end_time = time.time()
print(f"再次使用新模型版本获取 text1 嵌入耗时: {end_time - start_time:.4f} 秒")
assert np.allclose(emb1_v2, emb1_v2_2)
运行上述代码,你将清晰地看到缓存命中和未命中时的性能差异。当缓存命中时,get_embedding 方法几乎瞬间返回;而未命中时,则需要模拟模型计算的耗时。
6. 进阶考量
6.1 异步缓存
在现代 Web 服务和高并发应用中,阻塞 I/O 操作(如 Redis 同步调用)可能会成为瓶颈。使用异步 I/O (如 Python 的 asyncio 和 aioredis 库) 可以显著提高服务的吞吐量和响应性。
import aioredis
import asyncio
class AsyncEmbeddingCache:
def __init__(self,
redis_pool: aioredis.Redis, # 传入连接池
embedding_model,
model_name: str,
model_version: str,
vector_size: int = 768,
ttl_seconds: int = 60 * 60 * 24 * 7
):
self.redis_pool = redis_pool
self.embedding_model = embedding_model
self.model_name = model_name
self.model_version = model_version
self.vector_size = vector_size
self.ttl_seconds = ttl_seconds
print(f"AsyncEmbeddingCache initialized for model: {self.model_name} {self.model_version}")
async def get_embedding(self, text: str) -> np.ndarray:
cache_key = generate_cache_key(text, self.model_name, self.model_version)
async with self.redis_pool.client() as conn: # 从连接池获取连接
cached_data = await conn.hgetall(cache_key)
if cached_data:
serialized_embedding = cached_data.get(b'embedding')
if serialized_embedding:
try:
embedding = deserialize_embedding(serialized_embedding, self.vector_size)
await conn.expire(cache_key, self.ttl_seconds)
return embedding
except Exception as e:
print(f"Error deserializing embedding for key {cache_key}: {e}. Recalculating.")
await conn.delete(cache_key)
else:
print(f"Async Cache hit for key {cache_key}, but 'embedding' field is missing. Recalculating.")
await conn.delete(cache_key)
print(f"Async Cache miss for text: '{text[:50]}...' (key: {cache_key}). Computing embedding...")
# 真实模型可能需要异步封装,这里 MockEmbeddingModel 仍然是同步的
# 考虑使用 run_in_executor 将同步模型放入线程池运行,避免阻塞事件循环
loop = asyncio.get_running_loop()
new_embedding_array = await loop.run_in_executor(
None, # 使用默认的线程池
lambda: self.embedding_model.encode([text])[0]
)
serialized_new_embedding = serialize_embedding(new_embedding_array)
data_to_cache = {
'embedding': serialized_new_embedding,
'model_name': self.model_name.encode('utf-8'),
'model_version': self.model_version.encode('utf-8'),
'timestamp': str(datetime.datetime.now()).encode('utf-8')
}
await conn.hmset(cache_key, data_to_cache)
await conn.expire(cache_key, self.ttl_seconds)
return new_embedding_array
# 示例使用
async def main_async():
redis_pool = aioredis.Redis(host='localhost', port=6379, db=0)
await redis_pool.ping()
print("成功连接到 Redis 服务器 (异步)!")
async_embedding_cache = AsyncEmbeddingCache(
redis_pool=redis_pool,
embedding_model=embedding_model, # 仍然使用 MockEmbeddingModel
model_name=CURRENT_MODEL_NAME,
model_version=CURRENT_MODEL_VERSION,
vector_size=768,
ttl_seconds=3600
)
text_async = "Asynchronous programming with Python."
print("n--- 第一次异步请求 text_async ---")
start_time = time.time()
emb_async_1 = await async_embedding_cache.get_embedding(text_async)
end_time = time.time()
print(f"第一次异步获取 text_async 嵌入耗时: {end_time - start_time:.4f} 秒")
print("n--- 第二次异步请求 text_async (应命中缓存) ---")
start_time = time.time()
emb_async_2 = await async_embedding_cache.get_embedding(text_async)
end_time = time.time()
print(f"第二次异步获取 text_async 嵌入耗时: {end_time - start_time:.4f} 秒")
assert np.allclose(emb_async_1, emb_async_2)
# asyncio.run(main_async())
请注意,为了运行 asyncio 代码,你需要将 main_async() 函数通过 asyncio.run() 调用。
6.2 缓存击穿 (Cache Stampede / Thundering Herd)
当一个热点键失效时(例如,TTL 到期),大量的并发请求同时涌入,发现缓存未命中,然后它们都会去后端计算嵌入。这可能导致后端服务瞬时过载,甚至崩溃。
解决方案:
- 分布式锁: 在计算嵌入之前,获取一个分布式锁。只有持有锁的请求才能去计算,其他请求等待锁释放后从缓存中获取。
# 伪代码 # lock_key = f"{cache_key}:lock" # if not self.redis_client.set(lock_key, "1", nx=True, ex=30): # 尝试获取锁,30秒过期 # # 未获取到锁,等待一小段时间后重试或直接从缓存获取 # time.sleep(0.1) # return self.get_embedding(text) # 重试 # else: # try: # # 计算嵌入并存入缓存 # ... # finally: # self.redis_client.delete(lock_key) # 释放锁 - 提前续期 (Proactive Refresh): 在缓存即将过期前,启动一个后台任务异步地刷新缓存。
6.3 监控与指标
有效监控缓存的性能至关重要。
- 缓存命中率:
(命中次数 / (命中次数 + 未命中次数))。高命中率是缓存成功的标志。 - 缓存延迟: 命中和未命中时的平均响应时间。
- Redis 内存使用: 确保缓存不会耗尽 Redis 实例的内存。
- 嵌入模型调用次数: 验证缓存确实减少了模型推理的负担。
这些指标可以通过 Redis 的 INFO 命令、Redis Monitor 工具,或者集成到 Prometheus、Grafana 等监控系统来收集。
6.4 安全性
- Redis 认证: 生产环境中务必为 Redis 实例设置密码 (
requirepass)。 - 网络隔离: 将 Redis 部署在私有网络中,限制对其的访问。
- 数据加密: 如果缓存中包含敏感信息,考虑在序列化前进行加密。
7. 总结与展望
局部嵌入缓存是现代 AI 应用中一项不可或缺的性能优化技术。通过利用 Redis 的高速内存访问和丰富的数据结构,我们能够高效地存储和检索已计算的文本嵌入,从而显著降低延迟、节省计算资源和 API 调用成本,并提升系统的整体吞吐量和稳定性。
本讲座详细探讨了嵌入的成本、缓存的原理、Redis 的优势、缓存键设计、序列化、过期策略以及异步和缓存击穿等高级考量。一个设计良好、实现健壮的局部嵌入缓存,将成为你 AI 应用架构中的强大基石,助你构建更具竞争力、更经济高效的智能系统。在实践中,请务必根据你的具体业务场景、数据特性和性能要求,灵活调整和优化缓存策略。