各位同仁,下午好!
今天,我们将共同深入探索一个引人入胜的话题:如何借鉴生物大脑中记忆巩固的精妙机制——短期与长期记忆增强(Short-term vs Long-term Potentiation, STP vs LTP),来为我们的AI Agent构建一个高效、持久且富有上下文的“海马体”记忆系统。我们将聚焦于利用Redis的实时性与向量数据库的语义能力,实现Agent的记忆巩固与检索。
在人工智能领域,尤其是大语言模型(LLMs)驱动的Agent,其性能往往受限于上下文窗口的长度。Agent需要超越单次交互的限制,积累经验,学习成长,形成连贯的自我认知和世界模型。这就要求Agent具备一套复杂的记忆管理系统,能够区分瞬时信息与持久知识,并在需要时高效地检索相关记忆。这正是我们今天探讨的核心。
一、生物学启示:海马体、STP与LTP
在深入技术实现之前,让我们快速回顾一下生物学中关于记忆的几个核心概念。人脑的海马体被认为是记忆形成和巩固的关键区域。它扮演着一个“中央车站”的角色,将新信息从短期记忆转化为长期记忆。
-
短期记忆增强(STP – Short-term Potentiation):这是一种神经元连接强度的暂时性增强,通常持续几秒到几分钟。它允许我们记住电话号码、刚刚听到的指令等瞬时信息。STP的特点是快速形成、易于衰退,且容量有限。可以将其理解为大脑的“工作内存”或“缓存”。
-
长期记忆增强(LTP – Long-term Potentiation):这是一种神经元连接强度的持久性增强,可以持续数小时、数天乃至更长时间。LTP被认为是学习和记忆的细胞基础。它涉及神经元结构和功能的实际改变,如突触数量的增加、受体灵敏度的提高等。LTP的特点是形成较慢、但更为持久,且容量巨大。
生物大脑并非将所有输入都转化为长期记忆。海马体在其中扮演了“筛选者”和“整合者”的角色,它评估信息的重复性、重要性、情感关联等,决定哪些短期记忆值得被巩固为长期记忆。未被巩固的短期记忆会自然衰减和遗忘。
对于AI Agent而言,这意味着:
- 瞬时处理与工作记忆(STP):Agent需要一个快速、高吞吐量的区域来处理当前感知到的信息、对话历史和即时任务状态。
- 经验积累与知识存储(LTP):Agent需要一个持久、可检索的区域来存储重要的经验、学习到的知识和形成的概念。
- 记忆巩固机制:Agent需要一个“海马体”代理,负责将重要的短期记忆转化为长期记忆,并进行有效的组织和管理。
二、Agent记忆架构:计算类比
我们将Agent的记忆系统划分为几个层级,并尝试将其与生物学概念进行类比:
| 记忆层级 | 生物学类比 | 主要功能 | 特点 | 核心技术栈 |
|---|---|---|---|---|
| 感知与瞬时记忆 | 感觉记忆 | 接收并暂时持有原始输入(视觉、听觉、文本) | 容量大,保留时间极短(毫秒-秒) | Agent输入缓冲区、Redis Stream |
| 工作记忆 (STP) | 短期记忆/工作记忆 | 活跃处理当前任务,维持短期上下文 | 容量有限,保留时间短(秒-分钟),易于访问 | Redis Hashes/Lists/Sets |
| 情景记忆 | 情景记忆 | 存储带时间戳、上下文的特定事件和经验 | 时间顺序,高上下文,可回溯 | 向量数据库(语义检索),Redis(时间戳、元数据) |
| 语义记忆 | 语义记忆 | 存储事实、概念、原理、一般性知识 | 抽象化,去情景化,高度关联性 | 向量数据库(语义检索),Redis(结构化知识) |
| 巩固与检索层 | 海马体 | 协调记忆流转,决定记忆去留,实现高效检索 | 决策、调度、整合、优化 | Agent逻辑层,LLM,Redis Pub/Sub |
今天,我们的重点将放在如何利用Redis和向量数据库来实现工作记忆(STP)和长期记忆(LTP),以及如何构建“海马体”代理来管理它们的巩固过程。
三、Redis在短期记忆增强(STP)中的应用
Redis以其内存存储、高速读写和丰富的数据结构,成为实现Agent短期记忆(STP)的理想选择。短期记忆需要能够快速记录Agent的感知、交互和即时状态,并且能够自然地衰减或被更新。
3.1 Redis Streams:记录Agent的“感知流”与“事件历史”
Redis Streams是天然的时间序列数据结构,非常适合记录Agent的连续感知数据、对话消息、操作日志等。每个Stream条目可以包含多个字段,代表一个事件的详细信息。
场景:Agent持续接收用户消息、环境观测结果、内部思考步骤。
代码示例:
import redis
import time
import json
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
class AgentPerceptionBuffer:
def __init__(self, agent_id: str, stream_key_prefix: str = "agent:perception:"):
self.agent_id = agent_id
self.stream_key = f"{stream_key_prefix}{agent_id}"
print(f"Initialized Perception Buffer for Agent {agent_id}, Stream Key: {self.stream_key}")
def add_perception(self, event_type: str, content: dict):
"""
向Agent的感知流中添加一个新事件。
event_type: 事件类型,如 "user_message", "env_observation", "internal_thought"
content: 事件的具体内容,字典形式
"""
event_data = {
"timestamp": time.time(),
"event_type": event_type,
"content": json.dumps(content) # 将内容序列化为JSON字符串
}
try:
message_id = r.xadd(self.stream_key, event_data)
print(f"Agent {self.agent_id} added perception (ID: {message_id}): {event_type} - {content}")
return message_id
except Exception as e:
print(f"Error adding perception: {e}")
return None
def get_recent_perceptions(self, count: int = 10, last_id: str = '$'):
"""
获取Agent最近的感知事件。
count: 获取的条目数量
last_id: 从哪个ID开始读取,'$'表示最新的,'0'表示最早的
"""
try:
# XREVRANGE 从 Stream 的末尾向开头读取,即获取最新的 N 条
raw_entries = r.xrevrange(self.stream_key, max='+', min='-', count=count)
perceptions = []
for entry_id, fields in raw_entries:
try:
content_dict = json.loads(fields.get("content", "{}"))
perceptions.append({
"id": entry_id,
"timestamp": float(fields.get("timestamp")),
"event_type": fields.get("event_type"),
"content": content_dict
})
except json.JSONDecodeError:
print(f"Warning: Could not decode content for entry {entry_id}")
continue
return perceptions[::-1] # 反转列表,使最新的在末尾
except Exception as e:
print(f"Error getting recent perceptions: {e}")
return []
def trim_perception_stream(self, max_len: int = 1000):
"""
修剪感知流,保持其最大长度,防止无限增长。
max_len: Stream的最大长度
"""
try:
r.xtrim(self.stream_key, maxlen=max_len, approximate=True)
print(f"Stream {self.stream_key} trimmed to max length {max_len}")
except Exception as e:
print(f"Error trimming stream: {e}")
# --- 演示Agent感知流 ---
if __name__ == "__main__":
agent_alpha = AgentPerceptionBuffer(agent_id="AgentAlpha")
# 模拟Agent的感知活动
agent_alpha.add_perception("user_message", {"sender": "User", "text": "你好,AgentAlpha,你最近在忙什么?"})
time.sleep(0.1)
agent_alpha.add_perception("internal_thought", {"thought": "用户询问我的状态,需要生成一个友好的回应。"})
time.sleep(0.1)
agent_alpha.add_perception("env_observation", {"sensor": "camera", "object_detected": "cup", "location": "desk"})
time.sleep(0.1)
agent_alpha.add_perception("user_message", {"sender": "User", "text": "你有没有看到我的钥匙?"})
time.sleep(0.1)
agent_alpha.add_perception("internal_thought", {"thought": "用户在找钥匙,我需要回忆之前的环境观测。"})
# 获取最近的感知
print("n--- AgentAlpha's Recent Perceptions (Last 5) ---")
recent_perceptions = agent_alpha.get_recent_perceptions(count=5)
for p in recent_perceptions:
print(f"[{time.strftime('%H:%M:%S', time.localtime(p['timestamp']))}] ID:{p['id']} Type:{p['event_type']} Content:{p['content']}")
# 修剪流
agent_alpha.trim_perception_stream(max_len=3)
print("n--- AgentAlpha's Perceptions after trimming (should be max 3) ---")
recent_perceptions_after_trim = agent_alpha.get_recent_perceptions(count=5) # 尝试获取5个,但只会返回最多3个
for p in recent_perceptions_after_trim:
print(f"[{time.strftime('%H:%M:%S', time.localtime(p['timestamp']))}] ID:{p['id']} Type:{p['event_type']} Content:{p['content']}")
解释:
xadd命令将新事件追加到Stream末尾,并返回一个唯一的消息ID。xrevrange命令用于从Stream中倒序读取消息,方便获取最新事件。xtrim命令允许我们设置Stream的最大长度,自动删除最旧的条目,实现记忆的自然衰减(类似短期记忆的有限容量)。approximate=True允许Redis进行近似修剪,提高效率。
3.2 Redis Hashes:存储Agent的“工作记忆”与“当前状态”
Redis Hashes适合存储Agent当前关注的实体状态、对话的当前上下文、正在执行的任务参数等。它们可以被视为Agent的“工作记忆”,快速更新和访问。
场景:Agent需要跟踪当前对话对象的情绪、当前任务的进度、用户提供的临时信息。
代码示例:
import redis
import json
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
class AgentWorkingMemory:
def __init__(self, agent_id: str, hash_key_prefix: str = "agent:working_memory:"):
self.agent_id = agent_id
self.hash_key = f"{hash_key_prefix}{agent_id}"
print(f"Initialized Working Memory for Agent {agent_id}, Hash Key: {self.hash_key}")
def update_context(self, key: str, value: any, ttl: int = None):
"""
更新Agent工作记忆中的一个键值对。
key: 记忆项的键
value: 记忆项的值(会自动JSON序列化)
ttl: 可选,设置键的过期时间(秒),实现记忆的自然衰减
"""
try:
serialized_value = json.dumps(value)
r.hset(self.hash_key, key, serialized_value)
if ttl is not None:
r.expire(self.hash_key, ttl) # 设置整个Hash的过期时间
print(f"Agent {self.agent_id} updated working memory: {key} = {value}")
except Exception as e:
print(f"Error updating working memory: {e}")
def get_context(self, key: str):
"""
获取Agent工作记忆中的某个键的值。
"""
try:
serialized_value = r.hget(self.hash_key, key)
if serialized_value:
return json.loads(serialized_value)
return None
except Exception as e:
print(f"Error getting working memory: {e}")
return None
def get_all_context(self):
"""
获取Agent工作记忆中的所有键值对。
"""
try:
all_fields = r.hgetall(self.hash_key)
deserialized_context = {}
for key, serialized_value in all_fields.items():
try:
deserialized_context[key] = json.loads(serialized_value)
except json.JSONDecodeError:
deserialized_context[key] = serialized_value # 如果不是JSON,保留原样
return deserialized_context
except Exception as e:
print(f"Error getting all working memory: {e}")
return {}
def delete_context(self, key: str):
"""
从工作记忆中删除某个键。
"""
try:
r.hdel(self.hash_key, key)
print(f"Agent {self.agent_id} deleted from working memory: {key}")
except Exception as e:
print(f"Error deleting from working memory: {e}")
# --- 演示Agent工作记忆 ---
if __name__ == "__main__":
agent_beta = AgentWorkingMemory(agent_id="AgentBeta")
# 模拟Agent工作记忆更新
agent_beta.update_context("user_intent", "schedule_meeting")
agent_beta.update_context("meeting_topic", "Project X Review")
agent_beta.update_context("participants", ["Alice", "Bob"], ttl=60) # 这个Hash 60秒后过期
agent_beta.update_context("current_mood", "neutral")
print("n--- AgentBeta's Current Working Memory ---")
print(agent_beta.get_all_context())
# 获取特定上下文
print(f"nUser Intent: {agent_beta.get_context('user_intent')}")
print(f"Meeting Topic: {agent_beta.get_context('meeting_topic')}")
# 模拟一段时间后,某个临时上下文可能过期
print("n--- Waiting for participants context to potentially expire ---")
# time.sleep(61) # 如果需要测试TTL,可以取消注释
print(f"Participants (after delay): {agent_beta.get_context('participants')}")
# 删除某个上下文
agent_beta.delete_context("current_mood")
print("n--- AgentBeta's Working Memory after deleting 'current_mood' ---")
print(agent_beta.get_all_context())
解释:
hset用于设置Hash字段。hget和hgetall用于获取Hash字段。expire命令可以为整个Hash设置过期时间(TTL),这意味着如果Agent在一段时间内没有更新某个工作记忆,它将自动消失,模拟短期记忆的衰减。
3.3 Redis Lists & Sets:管理短期任务队列与临时集合
- Lists:可以作为Agent的待办任务队列、对话历史的最近N条消息。
LPUSH/RPUSH和LTRIM组合可以实现一个固定大小的循环缓冲区。 - Sets:用于存储Agent在短期内关注的唯一实体ID、临时标签或分类。
这些Redis数据结构共同构成了Agent灵活、高效的短期记忆层,能够应对瞬息万变的交互环境。
四、向量数据库在长期记忆增强(LTP)中的应用
长期记忆(LTP)要求记忆是持久的、可扩展的,并且最重要的是,可以通过语义相关性进行检索。传统的关键词匹配难以捕捉Agent对世界的深层理解。向量数据库(Vector Database)的出现,通过将信息转换为高维向量(embeddings),并支持高效的近似最近邻(ANN)搜索,完美解决了这个问题。
4.1 向量嵌入:将记忆转化为语义表示
核心思想是将Agent的经验(文本、事件描述、知识片段)通过预训练的语言模型(如BERT、OpenAI Embeddings等)转换为高维浮点数向量。这些向量捕捉了原始信息的语义含义,语义相似的信息在向量空间中距离更近。
代码示例:
from sentence_transformers import SentenceTransformer
import numpy as np
# 加载一个预训练的SBERT模型
# 第一次运行时可能需要下载模型
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
def get_embedding(text: str) -> list[float]:
"""
将文本转换为向量嵌入。
"""
if not text:
return []
embedding = embedding_model.encode(text, convert_to_tensor=False)
return embedding.tolist()
# 演示
if __name__ == "__main__":
text1 = "Agent今天和用户讨论了项目进展。"
text2 = "用户询问了关于项目会议的安排。"
text3 = "猫咪在沙发上睡着了。"
text4 = "Agent回顾了昨天的用户反馈。"
emb1 = get_embedding(text1)
emb2 = get_embedding(text2)
emb3 = get_embedding(text3)
emb4 = get_embedding(text4)
print(f"Embedding dimension: {len(emb1)}")
# 计算余弦相似度(向量数据库内部会高效执行此操作)
def cosine_similarity(v1, v2):
v1_np = np.array(v1)
v2_np = np.array(v2)
dot_product = np.dot(v1_np, v2_np)
norm_v1 = np.linalg.norm(v1_np)
norm_v2 = np.linalg.norm(v2_np)
return dot_product / (norm_v1 * norm_v2) if norm_v1 and norm_v2 else 0
print(f"nSimilarity between text1 and text2: {cosine_similarity(emb1, emb2):.4f}") # 语义相关
print(f"Similarity between text1 and text3: {cosine_similarity(emb1, emb3):.4f}") # 语义不相关
print(f"Similarity between text1 and text4: {cosine_similarity(emb1, emb4):.4f}") # 语义相关
解释:
- 我们使用
SentenceTransformer库来生成文本嵌入。实际应用中,可以选择更强大的模型,如OpenAI的text-embedding-ada-002。 get_embedding函数将输入的文本转换为一个浮点数列表,这就是它的语义向量表示。- 余弦相似度(cosine similarity)是衡量两个向量之间相似度的常用指标,值越接近1,表示语义越相似。
4.2 向量数据库:存储与语义检索长期记忆
向量数据库(如Pinecone, Weaviate, Milvus, Qdrant,或者Redis Stack的RediSearch模块)负责高效地存储这些向量,并根据查询向量执行快速的相似度搜索。每个记忆条目除了向量外,还会附带元数据(metadata),如时间戳、来源、Agent ID、重要性评分等,以便进行过滤和上下文关联。
场景:Agent需要回忆过去关于某个主题的所有信息,或者在特定时间段内发生的重要事件。
抽象代码示例(由于不同的向量数据库API差异较大,我们这里提供一个抽象的接口,假设其背后是任何一种向量数据库):
import uuid
import time
from typing import List, Dict, Any
# 假设的向量数据库客户端接口
class AbstractVectorDBClient:
def __init__(self, dimension: int, index_name: str = "agent_memories"):
self.dimension = dimension
self.index_name = index_name
print(f"Initialized Abstract Vector DB Client for index: {index_name} (dim: {dimension})")
def upsert(self, vectors: List[Dict[str, Any]]):
"""
插入或更新向量及其元数据。
vectors: 列表,每个元素是一个字典,包含 'id', 'vector', 'metadata'
"""
raise NotImplementedError
def query(self, query_vector: List[float], top_k: int = 5, filter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""
根据查询向量进行语义搜索。
query_vector: 查询的向量
top_k: 返回最相似的记忆数量
filter: 可选的元数据过滤条件
返回:一个记忆列表,每个记忆包含 'id', 'score', 'metadata'
"""
raise NotImplementedError
# 模拟一个简单的内存向量数据库实现(仅用于演示概念)
class InMemoryVectorDBClient(AbstractVectorDBClient):
def __init__(self, dimension: int, index_name: str = "agent_memories"):
super().__init__(dimension, index_name)
self.memory_store = {} # {id: {'vector': [...], 'metadata': {...}}}
def upsert(self, memories: List[Dict[str, Any]]):
for mem in memories:
mem_id = mem['id']
# 验证向量维度
if len(mem['vector']) != self.dimension:
print(f"Warning: Vector ID {mem_id} has incorrect dimension. Expected {self.dimension}, got {len(mem['vector'])}")
continue
self.memory_store[mem_id] = {
'vector': np.array(mem['vector']), # 存储为numpy数组方便计算
'metadata': mem.get('metadata', {})
}
# print(f"Upserted memory: {mem_id} with metadata {mem.get('metadata', {})}")
def query(self, query_vector: List[float], top_k: int = 5, filter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
query_vec_np = np.array(query_vector)
results = []
for mem_id, stored_mem in self.memory_store.items():
metadata = stored_mem['metadata']
# 应用过滤器
if filter:
match = True
for k, v in filter.items():
if k not in metadata or metadata[k] != v:
match = False
break
if not match:
continue
# 计算相似度
similarity_score = cosine_similarity(query_vec_np, stored_mem['vector'])
results.append({
'id': mem_id,
'score': similarity_score,
'metadata': metadata
})
results.sort(key=lambda x: x['score'], reverse=True)
return results[:top_k]
# 将之前的get_embedding函数引入
from sentence_transformers import SentenceTransformer
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
def get_embedding(text: str) -> list[float]:
if not text: return []
return embedding_model.encode(text, convert_to_tensor=False).tolist()
# --- 演示Agent长期记忆 ---
if __name__ == "__main__":
VECTOR_DIMENSION = len(get_embedding("test")) # 获取嵌入维度
ltp_memory_db = InMemoryVectorDBClient(dimension=VECTOR_DIMENSION)
class AgentLongTermMemory:
def __init__(self, agent_id: str, vector_db_client: AbstractVectorDBClient):
self.agent_id = agent_id
self.db = vector_db_client
print(f"Initialized Long-Term Memory for Agent {agent_id}")
def store_memory(self, content: str, memory_type: str = "episodic", significance: float = 0.5):
"""
存储一个长期记忆。
content: 记忆的文本内容
memory_type: 记忆类型,如 "episodic" (情景), "semantic" (语义)
significance: 重要性评分
"""
memory_id = str(uuid.uuid4())
embedding = get_embedding(content)
if not embedding:
print(f"Could not generate embedding for content: {content}")
return None
memory_data = {
"id": memory_id,
"vector": embedding,
"metadata": {
"agent_id": self.agent_id,
"content": content, # 存储原始内容,方便检索后展示
"timestamp": time.time(),
"memory_type": memory_type,
"significance": significance
}
}
self.db.upsert([memory_data])
print(f"Agent {self.agent_id} stored LTP memory (ID: {memory_id}): '{content[:50]}...'")
return memory_id
def retrieve_memories(self, query: str, top_k: int = 3, memory_type: str = None) -> List[Dict[str, Any]]:
"""
根据查询检索相关的长期记忆。
query: 查询的文本
top_k: 返回数量
memory_type: 可选,按记忆类型过滤
"""
query_embedding = get_embedding(query)
if not query_embedding:
print(f"Could not generate embedding for query: {query}")
return []
filters = {"agent_id": self.agent_id}
if memory_type:
filters["memory_type"] = memory_type
results = self.db.query(query_embedding, top_k=top_k, filter=filters)
retrieved_memories = []
for res in results:
retrieved_memories.append({
"id": res['id'],
"score": res['score'],
"content": res['metadata']['content'],
"type": res['metadata']['memory_type'],
"timestamp": res['metadata']['timestamp'],
"significance": res['metadata']['significance']
})
return retrieved_memories
# --- 演示Agent长期记忆存储与检索 ---
agent_gamma = AgentLongTermMemory(agent_id="AgentGamma", vector_db_client=ltp_memory_db)
# 存储一些长期记忆
agent_gamma.store_memory("AgentGamma在2023年完成了与团队的首次项目启动会议。", "episodic", 0.8)
agent_gamma.store_memory("编程时,优先考虑模块化和可重用性。", "semantic", 0.9)
agent_gamma.store_memory("用户对新功能提出了改进建议,主要集中在界面友好度上。", "episodic", 0.7)
agent_gamma.store_memory("Python的GIL限制了多线程的并行计算能力。", "semantic", 0.6)
agent_gamma.store_memory("昨天我帮助用户解决了配置环境的问题,用户反馈很满意。", "episodic", 0.95)
# 检索记忆
print("n--- Retrieving memories about 'project meetings' ---")
query_results = agent_gamma.retrieve_memories("关于项目会议的信息")
for res in query_results:
print(f"Score: {res['score']:.4f}, Type: {res['type']}, Content: {res['content'][:80]}...")
print("n--- Retrieving semantic memories about 'programming best practices' ---")
query_results_semantic = agent_gamma.retrieve_memories("关于编程最佳实践的知识", memory_type="semantic")
for res in query_results_semantic:
print(f"Score: {res['score']:.4f}, Type: {res['type']}, Content: {res['content'][:80]}...")
print("n--- Retrieving memories about 'user feedback on new features' ---")
query_results_feedback = agent_gamma.retrieve_memories("用户对新功能的反馈意见")
for res in query_results_feedback:
print(f"Score: {res['score']:.4f}, Type: {res['type']}, Content: {res['content'][:80]}...")
解释:
store_memory函数接收文本内容、类型和重要性,生成嵌入,然后将其与元数据一起存储到向量数据库中。retrieve_memories函数接收查询文本,生成查询嵌入,然后向向量数据库发送查询,获取语义最相似的记忆。- 元数据过滤(例如
agent_id,memory_type)是向量数据库的关键功能,它允许我们在语义搜索的基础上进一步精确结果。
五、构建Agent的“海马体”:记忆巩固机制
现在,我们有了短期记忆(Redis)和长期记忆(向量数据库),核心挑战是如何构建一个“海马体”代理,来协调这两者之间的信息流,实现记忆的巩固、更新和高效检索。
这个“海马体”代理将是一个独立的逻辑组件,它持续监控Agent的短期记忆,根据一定的策略(例如重要性、重复性、与Agent目标的关联性)来决定哪些短期记忆应该被提升为长期记忆。
5.1 记忆巩固代理的架构
- 事件监听器:监控Redis Streams中Agent的感知流。
- 重要性评估器:对短期记忆事件进行评估,判断其是否重要到需要巩固。这可以通过LLM进行评估,或者基于预设规则(如包含关键词、被Agent多次提及等)。
- 记忆聚合器:将相关的短期事件聚合成一个连贯的“记忆块”或“经验摘要”,以减少长期记忆的冗余,并形成更高层次的理解。
- 嵌入生成器:将聚合后的记忆块转换为向量。
- LTP管理器:将向量及其元数据存储到向量数据库。
- STP清理器:对已巩固或已过时的短期记忆进行清理。
5.2 记忆巩固的流程与策略
- 定时巩固:每隔一定时间(例如每小时),“海马体”代理扫描Agent的短期记忆。
- 按需巩固:当Agent完成一个重要任务,或者与用户进行了长时间对话后,触发一次巩固。
- 重复性检测:如果短期记忆中的某个信息反复出现,说明其可能具有重要性,应优先巩固。
- LLM辅助评估:将短期记忆片段输入LLM,让其评估记忆的重要性、概括其核心内容,并生成一个简洁的记忆摘要。
代码示例:一个简化的“海马体”巩固代理
import redis
import time
import json
import uuid
from typing import List, Dict, Any
# 假设的Redis连接和向量数据库客户端
# from your_redis_setup import r
# from your_vector_db_setup import InMemoryVectorDBClient, get_embedding, VECTOR_DIMENSION
# 重新定义,确保代码独立可运行
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
from sentence_transformers import SentenceTransformer
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
def get_embedding(text: str) -> list[float]:
if not text: return []
return embedding_model.encode(text, convert_to_tensor=False).tolist()
VECTOR_DIMENSION = len(get_embedding("test"))
class AbstractVectorDBClient:
# ... (与之前InMemVectorDBClient相同的定义,略去重复代码)
def __init__(self, dimension: int, index_name: str = "agent_memories"):
self.dimension = dimension
self.index_name = index_name
def upsert(self, vectors: List[Dict[str, Any]]): raise NotImplementedError
def query(self, query_vector: List[float], top_k: int = 5, filter: Dict[str, Any] = None) -> List[Dict[str, Any]]: raise NotImplementedError
class InMemoryVectorDBClient(AbstractVectorDBClient):
# ... (与之前InMemVectorDBClient相同的实现,略去重复代码)
def __init__(self, dimension: int, index_name: str = "agent_memories"):
super().__init__(dimension, index_name)
self.memory_store = {}
def upsert(self, memories: List[Dict[str, Any]]):
for mem in memories:
mem_id = mem['id']
if len(mem['vector']) != self.dimension:
print(f"Warning: Vector ID {mem_id} has incorrect dimension. Expected {self.dimension}, got {len(mem['vector'])}")
continue
self.memory_store[mem_id] = {'vector': np.array(mem['vector']), 'metadata': mem.get('metadata', {})}
def query(self, query_vector: List[float], top_k: int = 5, filter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
query_vec_np = np.array(query_vector)
results = []
for mem_id, stored_mem in self.memory_store.items():
metadata = stored_mem['metadata']
if filter:
match = True
for k, v in filter.items():
if k not in metadata or metadata[k] != v:
match = False
break
if not match: continue
similarity_score = cosine_similarity(query_vec_np, stored_mem['vector'])
results.append({'id': mem_id, 'score': similarity_score, 'metadata': metadata})
results.sort(key=lambda x: x['score'], reverse=True)
return results[:top_k]
# 引入之前的cosine_similarity函数
def cosine_similarity(v1, v2):
v1_np = np.array(v1)
v2_np = np.array(v2)
dot_product = np.dot(v1_np, v2_np)
norm_v1 = np.linalg.norm(v1_np)
norm_v2 = np.linalg.norm(v2_np)
return dot_product / (norm_v1 * norm_v2) if norm_v1 and norm_v2 else 0
class AgentHippocampus:
def __init__(self, agent_id: str, redis_client: redis.Redis, vector_db_client: AbstractVectorDBClient,
perception_stream_key: str = "agent:perception:",
consolidation_interval_sec: int = 30): # 每30秒尝试巩固一次
self.agent_id = agent_id
self.r = redis_client
self.vector_db = vector_db_client
self.perception_stream_key = f"{perception_stream_key}{agent_id}"
self.consolidation_interval_sec = consolidation_interval_sec
self.last_consolidated_id = '0-0' # 记录上次巩固到的Stream ID
print(f"Initialized Hippocampus for Agent {agent_id}. Monitoring stream: {self.perception_stream_key}")
def _evaluate_significance(self, event_content: str) -> float:
"""
模拟重要性评估。实际中可能用LLM或更复杂的规则。
这里简单地根据长度和关键词判断。
"""
if "重要" in event_content or "紧急" in event_content:
return 0.9
if len(event_content) > 50: # 较长的内容可能更重要
return 0.7
return 0.3 # 默认较低重要性
def _summarize_events(self, events: List[Dict]) -> str:
"""
模拟事件摘要。实际中会用LLM。
这里简单地连接所有事件内容。
"""
if not events:
return ""
# 提取所有事件内容,并拼接成一个长字符串
contents = [event['content'].get('text', str(event['content'])) for event in events if 'content' in event]
# 简单过滤重复,并用LLM总结会更好
return " ".join(list(set(contents)))
def consolidate_memories(self):
"""
从短期记忆流中读取新事件,评估并巩固为长期记忆。
"""
print(f"n--- Agent {self.agent_id} Hippocampus: Starting memory consolidation ---")
# 从上次巩固的ID开始读取新的事件
new_entries = self.r.xread({self.perception_stream_key: self.last_consolidated_id}, count=100, block=0)
if not new_entries:
print("No new perceptions to consolidate.")
return
# new_entries 是一个列表,每个元素是 (stream_key, [(id, {field:value}), ...])
# 我们只关心当前Agent的stream
agent_stream_data = []
for stream_key, entries in new_entries:
if stream_key == self.perception_stream_key:
agent_stream_data = entries
break
if not agent_stream_data:
print("No new perceptions for this agent to consolidate.")
return
memories_to_store = []
for entry_id, fields in agent_stream_data:
try:
event_type = fields.get("event_type")
content_json = fields.get("content", "{}")
content_dict = json.loads(content_json)
timestamp = float(fields.get("timestamp"))
# 这里是一个简化的重要性评估和聚合逻辑
# 在真实的Agent中,你可能会在这里调用LLM来评估和总结
event_description = f"Agent {self.agent_id} perceived a {event_type} event: {content_dict}"
significance = self._evaluate_significance(str(content_dict))
# 只有达到一定重要性的事件才会被巩固
if significance >= 0.6:
memory_id = str(uuid.uuid4())
embedding = get_embedding(event_description) # 对聚合后的描述进行嵌入
if not embedding:
print(f"Skipping consolidation for entry {entry_id} due to embedding failure.")
continue
memories_to_store.append({
"id": memory_id,
"vector": embedding,
"metadata": {
"agent_id": self.agent_id,
"original_event_id": entry_id, # 记录原始短期记忆ID
"content": event_description,
"timestamp": timestamp,
"memory_type": "episodic", # 默认情景记忆
"significance": significance
}
})
print(f"Consolidating event ID {entry_id} (Significance: {significance:.2f})")
except json.JSONDecodeError:
print(f"Error decoding content for entry {entry_id}: {fields.get('content')}")
except Exception as e:
print(f"Error processing entry {entry_id}: {e}")
finally:
self.last_consolidated_id = entry_id # 更新上次巩固的ID
if memories_to_store:
self.vector_db.upsert(memories_to_store)
print(f"--- Successfully consolidated {len(memories_to_store)} memories into LTP. ---")
else:
print("No significant memories found for consolidation.")
def run_consolidation_loop(self):
"""
持续运行巩固循环。
"""
print(f"Agent {self.agent_id} Hippocampus starting consolidation loop...")
while True:
self.consolidate_memories()
time.sleep(self.consolidation_interval_sec)
# --- 演示海马体记忆巩固 ---
if __name__ == "__main__":
# 模拟Redis和Vector DB
agent_id_for_hippocampus = "AgentDelta"
perception_buffer = AgentPerceptionBuffer(agent_id=agent_id_for_hippocampus)
ltp_vector_db = InMemoryVectorDBClient(dimension=VECTOR_DIMENSION)
hippocampus = AgentHippocampus(
agent_id=agent_id_for_hippocampus,
redis_client=r,
vector_db_client=ltp_vector_db,
consolidation_interval_sec=5 # 每5秒尝试巩固一次
)
# 在后台线程运行巩固循环
import threading
consolidation_thread = threading.Thread(target=hippocampus.run_consolidation_loop, daemon=True)
consolidation_thread.start()
# 模拟Agent持续产生感知事件
print("n--- AgentDelta starts perceiving events (some significant, some not) ---")
perception_buffer.add_perception("user_message", {"sender": "User", "text": "你好,AgentDelta。今天天气真好!"}) # 较低重要性
time.sleep(1)
perception_buffer.add_perception("internal_thought", {"thought": "我需要记住用户对我的期望,这很重要。"}) # 较高重要性
time.sleep(1)
perception_buffer.add_perception("env_observation", {"sensor": "lidar", "data": "no obstacle ahead"}) # 较低重要性
time.sleep(1)
perception_buffer.add_perception("user_message", {"sender": "User", "text": "请帮我安排一个明天上午的重要会议,主题是项目风险评估。"}) # 较高重要性
time.sleep(1)
perception_buffer.add_perception("internal_thought", {"thought": "用户要求安排重要会议,我应该立即处理。"}) # 较高重要性
time.sleep(1)
perception_buffer.add_perception("system_log", {"level": "info", "message": "Disk usage is 45%."}) # 较低重要性
# 让巩固线程运行一段时间
print("n--- Allowing hippocampus to consolidate for a short period ---")
time.sleep(15) # 模拟Agent运行15秒,巩固线程会运行3次
# 检查长期记忆中是否有了巩固的记忆
class AgentLongTermMemoryReader: # 重新封装一个阅读器
def __init__(self, agent_id: str, vector_db_client: AbstractVectorDBClient):
self.agent_id = agent_id
self.db = vector_db_client
def retrieve_memories(self, query: str, top_k: int = 3) -> List[Dict[str, Any]]:
query_embedding = get_embedding(query)
if not query_embedding: return []
filters = {"agent_id": self.agent_id}
results = self.db.query(query_embedding, top_k=top_k, filter=filters)
retrieved_memories = []
for res in results:
retrieved_memories.append({
"id": res['id'], "score": res['score'],
"content": res['metadata']['content'],
"timestamp": res['metadata']['timestamp'],
"significance": res['metadata']['significance']
})
return retrieved_memories
ltp_reader = AgentLongTermMemoryReader(agent_id=agent_id_for_hippocampus, vector_db_client=ltp_vector_db)
print("n--- Checking AgentDelta's Long-Term Memories ---")
retrieved_ltp = ltp_reader.retrieve_memories("关于重要会议和用户期望的记忆", top_k=5)
for mem in retrieved_ltp:
print(f"Score: {mem['score']:.4f}, Sig: {mem['significance']:.2f}, Content: {mem['content'][:80]}...")
print("n--- Stopping simulation ---")
# 为了演示,这里不停止线程,实际应用中需要优雅地停止
解释:
AgentHippocampus类模拟了海马体的功能。- 它通过
xread命令持续监听Redis Stream中的新事件,但只读取自上次巩固点之后的数据,避免重复处理。 _evaluate_significance和_summarize_events是简化版本,实际应用中会集成LLM来提供更智能的评估和摘要能力。- 只有重要性达到阈值的事件才会被生成嵌入并存储到向量数据库中。
last_consolidated_id确保了每次巩固只处理新的、未处理过的短期记忆。- 在
if __name__ == "__main__":块中,我们展示了如何启动一个独立的线程来模拟海马体的后台巩固过程,并让Agent同时产生短期感知。
5.3 记忆检索的协同
当Agent需要回忆时,它不应仅仅查询长期记忆,也应考虑最近的短期记忆。
- 即时上下文:从Redis Hashes/Lists中获取当前对话、任务的即时上下文。
- 语义相关:使用Agent的查询嵌入,从向量数据库中检索语义相关的长期记忆。
- 整合:将即时上下文与长期记忆结合起来,作为LLM的输入,生成更全面、更准确的回应或决策。
这种协同检索机制确保Agent在处理信息时既能顾及眼前,又能利用过去的经验。
六、高级概念与优化
- 遗忘机制:长期记忆并非永久不变。可以为向量数据库中的记忆设置“老化”分数,或者定期删除不重要的、长时间未被检索的记忆,模拟遗忘,防止记忆库无限增长。
- 记忆重构与强化:当一个长期记忆被检索并用于Agent的推理时,它应该被“激活”,并可能与新的短期信息结合,形成新的、更丰富的记忆,或强化原有记忆。这类似于生物学中的“记忆再巩固”过程。
- 层次化记忆:将记忆组织成不同的抽象层次。例如,原始事件是低层记忆,事件摘要是中层记忆,学习到的通用原则是高层记忆。这可以通过多层嵌入和多索引向量数据库实现。
- 多模态记忆:不仅限于文本,Agent还可以处理图像、音频等多种模态的数据,将其转换为统一的向量表示并存储。
- 分布式与扩展性:Redis Cluster和向量数据库(如Milvus, Pinecone)本身就支持分布式部署,能够处理大规模的Agent和海量的记忆数据。
七、总结
通过借鉴生物大脑中短期与长期记忆增强的原理,我们为AI Agent构建了一个强大的“海马体”记忆巩固机制。Redis提供了高速、灵活的短期记忆存储,处理Agent的实时感知与工作状态。向量数据库则为Agent提供了可扩展、语义化的长期记忆,实现高效的知识检索。通过一个智能的“巩固代理”,我们能够将短期经验转化为持久知识,让Agent真正从经验中学习,不断成长。这一架构为构建更智能、更自主的AI Agent奠定了坚实的基础,使其能够超越瞬时交互的限制,拥有更深远的“认知”。