深入 ‘Short-term vs Long-term Potentiation’:利用 Redis 与向量库实现 Agent 的‘海马体’记忆巩固机制

各位同仁,下午好!

今天,我们将共同深入探索一个引人入胜的话题:如何借鉴生物大脑中记忆巩固的精妙机制——短期与长期记忆增强(Short-term vs Long-term Potentiation, STP vs LTP),来为我们的AI Agent构建一个高效、持久且富有上下文的“海马体”记忆系统。我们将聚焦于利用Redis的实时性与向量数据库的语义能力,实现Agent的记忆巩固与检索。

在人工智能领域,尤其是大语言模型(LLMs)驱动的Agent,其性能往往受限于上下文窗口的长度。Agent需要超越单次交互的限制,积累经验,学习成长,形成连贯的自我认知和世界模型。这就要求Agent具备一套复杂的记忆管理系统,能够区分瞬时信息与持久知识,并在需要时高效地检索相关记忆。这正是我们今天探讨的核心。

一、生物学启示:海马体、STP与LTP

在深入技术实现之前,让我们快速回顾一下生物学中关于记忆的几个核心概念。人脑的海马体被认为是记忆形成和巩固的关键区域。它扮演着一个“中央车站”的角色,将新信息从短期记忆转化为长期记忆。

  • 短期记忆增强(STP – Short-term Potentiation):这是一种神经元连接强度的暂时性增强,通常持续几秒到几分钟。它允许我们记住电话号码、刚刚听到的指令等瞬时信息。STP的特点是快速形成、易于衰退,且容量有限。可以将其理解为大脑的“工作内存”或“缓存”。

  • 长期记忆增强(LTP – Long-term Potentiation):这是一种神经元连接强度的持久性增强,可以持续数小时、数天乃至更长时间。LTP被认为是学习和记忆的细胞基础。它涉及神经元结构和功能的实际改变,如突触数量的增加、受体灵敏度的提高等。LTP的特点是形成较慢、但更为持久,且容量巨大。

生物大脑并非将所有输入都转化为长期记忆。海马体在其中扮演了“筛选者”和“整合者”的角色,它评估信息的重复性、重要性、情感关联等,决定哪些短期记忆值得被巩固为长期记忆。未被巩固的短期记忆会自然衰减和遗忘。

对于AI Agent而言,这意味着:

  1. 瞬时处理与工作记忆(STP):Agent需要一个快速、高吞吐量的区域来处理当前感知到的信息、对话历史和即时任务状态。
  2. 经验积累与知识存储(LTP):Agent需要一个持久、可检索的区域来存储重要的经验、学习到的知识和形成的概念。
  3. 记忆巩固机制:Agent需要一个“海马体”代理,负责将重要的短期记忆转化为长期记忆,并进行有效的组织和管理。

二、Agent记忆架构:计算类比

我们将Agent的记忆系统划分为几个层级,并尝试将其与生物学概念进行类比:

记忆层级 生物学类比 主要功能 特点 核心技术栈
感知与瞬时记忆 感觉记忆 接收并暂时持有原始输入(视觉、听觉、文本) 容量大,保留时间极短(毫秒-秒) Agent输入缓冲区、Redis Stream
工作记忆 (STP) 短期记忆/工作记忆 活跃处理当前任务,维持短期上下文 容量有限,保留时间短(秒-分钟),易于访问 Redis Hashes/Lists/Sets
情景记忆 情景记忆 存储带时间戳、上下文的特定事件和经验 时间顺序,高上下文,可回溯 向量数据库(语义检索),Redis(时间戳、元数据)
语义记忆 语义记忆 存储事实、概念、原理、一般性知识 抽象化,去情景化,高度关联性 向量数据库(语义检索),Redis(结构化知识)
巩固与检索层 海马体 协调记忆流转,决定记忆去留,实现高效检索 决策、调度、整合、优化 Agent逻辑层,LLM,Redis Pub/Sub

今天,我们的重点将放在如何利用Redis和向量数据库来实现工作记忆(STP)和长期记忆(LTP),以及如何构建“海马体”代理来管理它们的巩固过程。

三、Redis在短期记忆增强(STP)中的应用

Redis以其内存存储、高速读写和丰富的数据结构,成为实现Agent短期记忆(STP)的理想选择。短期记忆需要能够快速记录Agent的感知、交互和即时状态,并且能够自然地衰减或被更新。

3.1 Redis Streams:记录Agent的“感知流”与“事件历史”

Redis Streams是天然的时间序列数据结构,非常适合记录Agent的连续感知数据、对话消息、操作日志等。每个Stream条目可以包含多个字段,代表一个事件的详细信息。

场景:Agent持续接收用户消息、环境观测结果、内部思考步骤。

代码示例

import redis
import time
import json

# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

class AgentPerceptionBuffer:
    def __init__(self, agent_id: str, stream_key_prefix: str = "agent:perception:"):
        self.agent_id = agent_id
        self.stream_key = f"{stream_key_prefix}{agent_id}"
        print(f"Initialized Perception Buffer for Agent {agent_id}, Stream Key: {self.stream_key}")

    def add_perception(self, event_type: str, content: dict):
        """
        向Agent的感知流中添加一个新事件。
        event_type: 事件类型,如 "user_message", "env_observation", "internal_thought"
        content: 事件的具体内容,字典形式
        """
        event_data = {
            "timestamp": time.time(),
            "event_type": event_type,
            "content": json.dumps(content) # 将内容序列化为JSON字符串
        }
        try:
            message_id = r.xadd(self.stream_key, event_data)
            print(f"Agent {self.agent_id} added perception (ID: {message_id}): {event_type} - {content}")
            return message_id
        except Exception as e:
            print(f"Error adding perception: {e}")
            return None

    def get_recent_perceptions(self, count: int = 10, last_id: str = '$'):
        """
        获取Agent最近的感知事件。
        count: 获取的条目数量
        last_id: 从哪个ID开始读取,'$'表示最新的,'0'表示最早的
        """
        try:
            # XREVRANGE 从 Stream 的末尾向开头读取,即获取最新的 N 条
            raw_entries = r.xrevrange(self.stream_key, max='+', min='-', count=count)
            perceptions = []
            for entry_id, fields in raw_entries:
                try:
                    content_dict = json.loads(fields.get("content", "{}"))
                    perceptions.append({
                        "id": entry_id,
                        "timestamp": float(fields.get("timestamp")),
                        "event_type": fields.get("event_type"),
                        "content": content_dict
                    })
                except json.JSONDecodeError:
                    print(f"Warning: Could not decode content for entry {entry_id}")
                    continue
            return perceptions[::-1] # 反转列表,使最新的在末尾
        except Exception as e:
            print(f"Error getting recent perceptions: {e}")
            return []

    def trim_perception_stream(self, max_len: int = 1000):
        """
        修剪感知流,保持其最大长度,防止无限增长。
        max_len: Stream的最大长度
        """
        try:
            r.xtrim(self.stream_key, maxlen=max_len, approximate=True)
            print(f"Stream {self.stream_key} trimmed to max length {max_len}")
        except Exception as e:
            print(f"Error trimming stream: {e}")

# --- 演示Agent感知流 ---
if __name__ == "__main__":
    agent_alpha = AgentPerceptionBuffer(agent_id="AgentAlpha")

    # 模拟Agent的感知活动
    agent_alpha.add_perception("user_message", {"sender": "User", "text": "你好,AgentAlpha,你最近在忙什么?"})
    time.sleep(0.1)
    agent_alpha.add_perception("internal_thought", {"thought": "用户询问我的状态,需要生成一个友好的回应。"})
    time.sleep(0.1)
    agent_alpha.add_perception("env_observation", {"sensor": "camera", "object_detected": "cup", "location": "desk"})
    time.sleep(0.1)
    agent_alpha.add_perception("user_message", {"sender": "User", "text": "你有没有看到我的钥匙?"})
    time.sleep(0.1)
    agent_alpha.add_perception("internal_thought", {"thought": "用户在找钥匙,我需要回忆之前的环境观测。"})

    # 获取最近的感知
    print("n--- AgentAlpha's Recent Perceptions (Last 5) ---")
    recent_perceptions = agent_alpha.get_recent_perceptions(count=5)
    for p in recent_perceptions:
        print(f"[{time.strftime('%H:%M:%S', time.localtime(p['timestamp']))}] ID:{p['id']} Type:{p['event_type']} Content:{p['content']}")

    # 修剪流
    agent_alpha.trim_perception_stream(max_len=3)
    print("n--- AgentAlpha's Perceptions after trimming (should be max 3) ---")
    recent_perceptions_after_trim = agent_alpha.get_recent_perceptions(count=5) # 尝试获取5个,但只会返回最多3个
    for p in recent_perceptions_after_trim:
        print(f"[{time.strftime('%H:%M:%S', time.localtime(p['timestamp']))}] ID:{p['id']} Type:{p['event_type']} Content:{p['content']}")

解释

  • xadd 命令将新事件追加到Stream末尾,并返回一个唯一的消息ID。
  • xrevrange 命令用于从Stream中倒序读取消息,方便获取最新事件。
  • xtrim 命令允许我们设置Stream的最大长度,自动删除最旧的条目,实现记忆的自然衰减(类似短期记忆的有限容量)。approximate=True 允许Redis进行近似修剪,提高效率。

3.2 Redis Hashes:存储Agent的“工作记忆”与“当前状态”

Redis Hashes适合存储Agent当前关注的实体状态、对话的当前上下文、正在执行的任务参数等。它们可以被视为Agent的“工作记忆”,快速更新和访问。

场景:Agent需要跟踪当前对话对象的情绪、当前任务的进度、用户提供的临时信息。

代码示例

import redis
import json

# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

class AgentWorkingMemory:
    def __init__(self, agent_id: str, hash_key_prefix: str = "agent:working_memory:"):
        self.agent_id = agent_id
        self.hash_key = f"{hash_key_prefix}{agent_id}"
        print(f"Initialized Working Memory for Agent {agent_id}, Hash Key: {self.hash_key}")

    def update_context(self, key: str, value: any, ttl: int = None):
        """
        更新Agent工作记忆中的一个键值对。
        key: 记忆项的键
        value: 记忆项的值(会自动JSON序列化)
        ttl: 可选,设置键的过期时间(秒),实现记忆的自然衰减
        """
        try:
            serialized_value = json.dumps(value)
            r.hset(self.hash_key, key, serialized_value)
            if ttl is not None:
                r.expire(self.hash_key, ttl) # 设置整个Hash的过期时间
            print(f"Agent {self.agent_id} updated working memory: {key} = {value}")
        except Exception as e:
            print(f"Error updating working memory: {e}")

    def get_context(self, key: str):
        """
        获取Agent工作记忆中的某个键的值。
        """
        try:
            serialized_value = r.hget(self.hash_key, key)
            if serialized_value:
                return json.loads(serialized_value)
            return None
        except Exception as e:
            print(f"Error getting working memory: {e}")
            return None

    def get_all_context(self):
        """
        获取Agent工作记忆中的所有键值对。
        """
        try:
            all_fields = r.hgetall(self.hash_key)
            deserialized_context = {}
            for key, serialized_value in all_fields.items():
                try:
                    deserialized_context[key] = json.loads(serialized_value)
                except json.JSONDecodeError:
                    deserialized_context[key] = serialized_value # 如果不是JSON,保留原样
            return deserialized_context
        except Exception as e:
            print(f"Error getting all working memory: {e}")
            return {}

    def delete_context(self, key: str):
        """
        从工作记忆中删除某个键。
        """
        try:
            r.hdel(self.hash_key, key)
            print(f"Agent {self.agent_id} deleted from working memory: {key}")
        except Exception as e:
            print(f"Error deleting from working memory: {e}")

# --- 演示Agent工作记忆 ---
if __name__ == "__main__":
    agent_beta = AgentWorkingMemory(agent_id="AgentBeta")

    # 模拟Agent工作记忆更新
    agent_beta.update_context("user_intent", "schedule_meeting")
    agent_beta.update_context("meeting_topic", "Project X Review")
    agent_beta.update_context("participants", ["Alice", "Bob"], ttl=60) # 这个Hash 60秒后过期
    agent_beta.update_context("current_mood", "neutral")

    print("n--- AgentBeta's Current Working Memory ---")
    print(agent_beta.get_all_context())

    # 获取特定上下文
    print(f"nUser Intent: {agent_beta.get_context('user_intent')}")
    print(f"Meeting Topic: {agent_beta.get_context('meeting_topic')}")

    # 模拟一段时间后,某个临时上下文可能过期
    print("n--- Waiting for participants context to potentially expire ---")
    # time.sleep(61) # 如果需要测试TTL,可以取消注释
    print(f"Participants (after delay): {agent_beta.get_context('participants')}")

    # 删除某个上下文
    agent_beta.delete_context("current_mood")
    print("n--- AgentBeta's Working Memory after deleting 'current_mood' ---")
    print(agent_beta.get_all_context())

解释

  • hset 用于设置Hash字段。
  • hgethgetall 用于获取Hash字段。
  • expire 命令可以为整个Hash设置过期时间(TTL),这意味着如果Agent在一段时间内没有更新某个工作记忆,它将自动消失,模拟短期记忆的衰减。

3.3 Redis Lists & Sets:管理短期任务队列与临时集合

  • Lists:可以作为Agent的待办任务队列、对话历史的最近N条消息。LPUSH/RPUSHLTRIM 组合可以实现一个固定大小的循环缓冲区。
  • Sets:用于存储Agent在短期内关注的唯一实体ID、临时标签或分类。

这些Redis数据结构共同构成了Agent灵活、高效的短期记忆层,能够应对瞬息万变的交互环境。

四、向量数据库在长期记忆增强(LTP)中的应用

长期记忆(LTP)要求记忆是持久的、可扩展的,并且最重要的是,可以通过语义相关性进行检索。传统的关键词匹配难以捕捉Agent对世界的深层理解。向量数据库(Vector Database)的出现,通过将信息转换为高维向量(embeddings),并支持高效的近似最近邻(ANN)搜索,完美解决了这个问题。

4.1 向量嵌入:将记忆转化为语义表示

核心思想是将Agent的经验(文本、事件描述、知识片段)通过预训练的语言模型(如BERT、OpenAI Embeddings等)转换为高维浮点数向量。这些向量捕捉了原始信息的语义含义,语义相似的信息在向量空间中距离更近。

代码示例

from sentence_transformers import SentenceTransformer
import numpy as np

# 加载一个预训练的SBERT模型
# 第一次运行时可能需要下载模型
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

def get_embedding(text: str) -> list[float]:
    """
    将文本转换为向量嵌入。
    """
    if not text:
        return []
    embedding = embedding_model.encode(text, convert_to_tensor=False)
    return embedding.tolist()

# 演示
if __name__ == "__main__":
    text1 = "Agent今天和用户讨论了项目进展。"
    text2 = "用户询问了关于项目会议的安排。"
    text3 = "猫咪在沙发上睡着了。"
    text4 = "Agent回顾了昨天的用户反馈。"

    emb1 = get_embedding(text1)
    emb2 = get_embedding(text2)
    emb3 = get_embedding(text3)
    emb4 = get_embedding(text4)

    print(f"Embedding dimension: {len(emb1)}")

    # 计算余弦相似度(向量数据库内部会高效执行此操作)
    def cosine_similarity(v1, v2):
        v1_np = np.array(v1)
        v2_np = np.array(v2)
        dot_product = np.dot(v1_np, v2_np)
        norm_v1 = np.linalg.norm(v1_np)
        norm_v2 = np.linalg.norm(v2_np)
        return dot_product / (norm_v1 * norm_v2) if norm_v1 and norm_v2 else 0

    print(f"nSimilarity between text1 and text2: {cosine_similarity(emb1, emb2):.4f}") # 语义相关
    print(f"Similarity between text1 and text3: {cosine_similarity(emb1, emb3):.4f}") # 语义不相关
    print(f"Similarity between text1 and text4: {cosine_similarity(emb1, emb4):.4f}") # 语义相关

解释

  • 我们使用SentenceTransformer库来生成文本嵌入。实际应用中,可以选择更强大的模型,如OpenAI的text-embedding-ada-002
  • get_embedding函数将输入的文本转换为一个浮点数列表,这就是它的语义向量表示。
  • 余弦相似度(cosine similarity)是衡量两个向量之间相似度的常用指标,值越接近1,表示语义越相似。

4.2 向量数据库:存储与语义检索长期记忆

向量数据库(如Pinecone, Weaviate, Milvus, Qdrant,或者Redis Stack的RediSearch模块)负责高效地存储这些向量,并根据查询向量执行快速的相似度搜索。每个记忆条目除了向量外,还会附带元数据(metadata),如时间戳、来源、Agent ID、重要性评分等,以便进行过滤和上下文关联。

场景:Agent需要回忆过去关于某个主题的所有信息,或者在特定时间段内发生的重要事件。

抽象代码示例(由于不同的向量数据库API差异较大,我们这里提供一个抽象的接口,假设其背后是任何一种向量数据库):

import uuid
import time
from typing import List, Dict, Any

# 假设的向量数据库客户端接口
class AbstractVectorDBClient:
    def __init__(self, dimension: int, index_name: str = "agent_memories"):
        self.dimension = dimension
        self.index_name = index_name
        print(f"Initialized Abstract Vector DB Client for index: {index_name} (dim: {dimension})")

    def upsert(self, vectors: List[Dict[str, Any]]):
        """
        插入或更新向量及其元数据。
        vectors: 列表,每个元素是一个字典,包含 'id', 'vector', 'metadata'
        """
        raise NotImplementedError

    def query(self, query_vector: List[float], top_k: int = 5, filter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """
        根据查询向量进行语义搜索。
        query_vector: 查询的向量
        top_k: 返回最相似的记忆数量
        filter: 可选的元数据过滤条件
        返回:一个记忆列表,每个记忆包含 'id', 'score', 'metadata'
        """
        raise NotImplementedError

# 模拟一个简单的内存向量数据库实现(仅用于演示概念)
class InMemoryVectorDBClient(AbstractVectorDBClient):
    def __init__(self, dimension: int, index_name: str = "agent_memories"):
        super().__init__(dimension, index_name)
        self.memory_store = {} # {id: {'vector': [...], 'metadata': {...}}}

    def upsert(self, memories: List[Dict[str, Any]]):
        for mem in memories:
            mem_id = mem['id']
            # 验证向量维度
            if len(mem['vector']) != self.dimension:
                print(f"Warning: Vector ID {mem_id} has incorrect dimension. Expected {self.dimension}, got {len(mem['vector'])}")
                continue
            self.memory_store[mem_id] = {
                'vector': np.array(mem['vector']), # 存储为numpy数组方便计算
                'metadata': mem.get('metadata', {})
            }
            # print(f"Upserted memory: {mem_id} with metadata {mem.get('metadata', {})}")

    def query(self, query_vector: List[float], top_k: int = 5, filter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        query_vec_np = np.array(query_vector)
        results = []
        for mem_id, stored_mem in self.memory_store.items():
            metadata = stored_mem['metadata']
            # 应用过滤器
            if filter:
                match = True
                for k, v in filter.items():
                    if k not in metadata or metadata[k] != v:
                        match = False
                        break
                if not match:
                    continue

            # 计算相似度
            similarity_score = cosine_similarity(query_vec_np, stored_mem['vector'])
            results.append({
                'id': mem_id,
                'score': similarity_score,
                'metadata': metadata
            })

        results.sort(key=lambda x: x['score'], reverse=True)
        return results[:top_k]

# 将之前的get_embedding函数引入
from sentence_transformers import SentenceTransformer
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
def get_embedding(text: str) -> list[float]:
    if not text: return []
    return embedding_model.encode(text, convert_to_tensor=False).tolist()

# --- 演示Agent长期记忆 ---
if __name__ == "__main__":
    VECTOR_DIMENSION = len(get_embedding("test")) # 获取嵌入维度
    ltp_memory_db = InMemoryVectorDBClient(dimension=VECTOR_DIMENSION)

    class AgentLongTermMemory:
        def __init__(self, agent_id: str, vector_db_client: AbstractVectorDBClient):
            self.agent_id = agent_id
            self.db = vector_db_client
            print(f"Initialized Long-Term Memory for Agent {agent_id}")

        def store_memory(self, content: str, memory_type: str = "episodic", significance: float = 0.5):
            """
            存储一个长期记忆。
            content: 记忆的文本内容
            memory_type: 记忆类型,如 "episodic" (情景), "semantic" (语义)
            significance: 重要性评分
            """
            memory_id = str(uuid.uuid4())
            embedding = get_embedding(content)
            if not embedding:
                print(f"Could not generate embedding for content: {content}")
                return None

            memory_data = {
                "id": memory_id,
                "vector": embedding,
                "metadata": {
                    "agent_id": self.agent_id,
                    "content": content, # 存储原始内容,方便检索后展示
                    "timestamp": time.time(),
                    "memory_type": memory_type,
                    "significance": significance
                }
            }
            self.db.upsert([memory_data])
            print(f"Agent {self.agent_id} stored LTP memory (ID: {memory_id}): '{content[:50]}...'")
            return memory_id

        def retrieve_memories(self, query: str, top_k: int = 3, memory_type: str = None) -> List[Dict[str, Any]]:
            """
            根据查询检索相关的长期记忆。
            query: 查询的文本
            top_k: 返回数量
            memory_type: 可选,按记忆类型过滤
            """
            query_embedding = get_embedding(query)
            if not query_embedding:
                print(f"Could not generate embedding for query: {query}")
                return []

            filters = {"agent_id": self.agent_id}
            if memory_type:
                filters["memory_type"] = memory_type

            results = self.db.query(query_embedding, top_k=top_k, filter=filters)
            retrieved_memories = []
            for res in results:
                retrieved_memories.append({
                    "id": res['id'],
                    "score": res['score'],
                    "content": res['metadata']['content'],
                    "type": res['metadata']['memory_type'],
                    "timestamp": res['metadata']['timestamp'],
                    "significance": res['metadata']['significance']
                })
            return retrieved_memories

    # --- 演示Agent长期记忆存储与检索 ---
    agent_gamma = AgentLongTermMemory(agent_id="AgentGamma", vector_db_client=ltp_memory_db)

    # 存储一些长期记忆
    agent_gamma.store_memory("AgentGamma在2023年完成了与团队的首次项目启动会议。", "episodic", 0.8)
    agent_gamma.store_memory("编程时,优先考虑模块化和可重用性。", "semantic", 0.9)
    agent_gamma.store_memory("用户对新功能提出了改进建议,主要集中在界面友好度上。", "episodic", 0.7)
    agent_gamma.store_memory("Python的GIL限制了多线程的并行计算能力。", "semantic", 0.6)
    agent_gamma.store_memory("昨天我帮助用户解决了配置环境的问题,用户反馈很满意。", "episodic", 0.95)

    # 检索记忆
    print("n--- Retrieving memories about 'project meetings' ---")
    query_results = agent_gamma.retrieve_memories("关于项目会议的信息")
    for res in query_results:
        print(f"Score: {res['score']:.4f}, Type: {res['type']}, Content: {res['content'][:80]}...")

    print("n--- Retrieving semantic memories about 'programming best practices' ---")
    query_results_semantic = agent_gamma.retrieve_memories("关于编程最佳实践的知识", memory_type="semantic")
    for res in query_results_semantic:
        print(f"Score: {res['score']:.4f}, Type: {res['type']}, Content: {res['content'][:80]}...")

    print("n--- Retrieving memories about 'user feedback on new features' ---")
    query_results_feedback = agent_gamma.retrieve_memories("用户对新功能的反馈意见")
    for res in query_results_feedback:
        print(f"Score: {res['score']:.4f}, Type: {res['type']}, Content: {res['content'][:80]}...")

解释

  • store_memory函数接收文本内容、类型和重要性,生成嵌入,然后将其与元数据一起存储到向量数据库中。
  • retrieve_memories函数接收查询文本,生成查询嵌入,然后向向量数据库发送查询,获取语义最相似的记忆。
  • 元数据过滤(例如agent_id, memory_type)是向量数据库的关键功能,它允许我们在语义搜索的基础上进一步精确结果。

五、构建Agent的“海马体”:记忆巩固机制

现在,我们有了短期记忆(Redis)和长期记忆(向量数据库),核心挑战是如何构建一个“海马体”代理,来协调这两者之间的信息流,实现记忆的巩固、更新和高效检索。

这个“海马体”代理将是一个独立的逻辑组件,它持续监控Agent的短期记忆,根据一定的策略(例如重要性、重复性、与Agent目标的关联性)来决定哪些短期记忆应该被提升为长期记忆。

5.1 记忆巩固代理的架构

  1. 事件监听器:监控Redis Streams中Agent的感知流。
  2. 重要性评估器:对短期记忆事件进行评估,判断其是否重要到需要巩固。这可以通过LLM进行评估,或者基于预设规则(如包含关键词、被Agent多次提及等)。
  3. 记忆聚合器:将相关的短期事件聚合成一个连贯的“记忆块”或“经验摘要”,以减少长期记忆的冗余,并形成更高层次的理解。
  4. 嵌入生成器:将聚合后的记忆块转换为向量。
  5. LTP管理器:将向量及其元数据存储到向量数据库。
  6. STP清理器:对已巩固或已过时的短期记忆进行清理。

5.2 记忆巩固的流程与策略

  • 定时巩固:每隔一定时间(例如每小时),“海马体”代理扫描Agent的短期记忆。
  • 按需巩固:当Agent完成一个重要任务,或者与用户进行了长时间对话后,触发一次巩固。
  • 重复性检测:如果短期记忆中的某个信息反复出现,说明其可能具有重要性,应优先巩固。
  • LLM辅助评估:将短期记忆片段输入LLM,让其评估记忆的重要性、概括其核心内容,并生成一个简洁的记忆摘要。

代码示例:一个简化的“海马体”巩固代理

import redis
import time
import json
import uuid
from typing import List, Dict, Any

# 假设的Redis连接和向量数据库客户端
# from your_redis_setup import r
# from your_vector_db_setup import InMemoryVectorDBClient, get_embedding, VECTOR_DIMENSION

# 重新定义,确保代码独立可运行
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
from sentence_transformers import SentenceTransformer
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
def get_embedding(text: str) -> list[float]:
    if not text: return []
    return embedding_model.encode(text, convert_to_tensor=False).tolist()

VECTOR_DIMENSION = len(get_embedding("test"))

class AbstractVectorDBClient:
    # ... (与之前InMemVectorDBClient相同的定义,略去重复代码)
    def __init__(self, dimension: int, index_name: str = "agent_memories"):
        self.dimension = dimension
        self.index_name = index_name
    def upsert(self, vectors: List[Dict[str, Any]]): raise NotImplementedError
    def query(self, query_vector: List[float], top_k: int = 5, filter: Dict[str, Any] = None) -> List[Dict[str, Any]]: raise NotImplementedError

class InMemoryVectorDBClient(AbstractVectorDBClient):
    # ... (与之前InMemVectorDBClient相同的实现,略去重复代码)
    def __init__(self, dimension: int, index_name: str = "agent_memories"):
        super().__init__(dimension, index_name)
        self.memory_store = {}
    def upsert(self, memories: List[Dict[str, Any]]):
        for mem in memories:
            mem_id = mem['id']
            if len(mem['vector']) != self.dimension:
                print(f"Warning: Vector ID {mem_id} has incorrect dimension. Expected {self.dimension}, got {len(mem['vector'])}")
                continue
            self.memory_store[mem_id] = {'vector': np.array(mem['vector']), 'metadata': mem.get('metadata', {})}
    def query(self, query_vector: List[float], top_k: int = 5, filter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        query_vec_np = np.array(query_vector)
        results = []
        for mem_id, stored_mem in self.memory_store.items():
            metadata = stored_mem['metadata']
            if filter:
                match = True
                for k, v in filter.items():
                    if k not in metadata or metadata[k] != v:
                        match = False
                        break
                if not match: continue
            similarity_score = cosine_similarity(query_vec_np, stored_mem['vector'])
            results.append({'id': mem_id, 'score': similarity_score, 'metadata': metadata})
        results.sort(key=lambda x: x['score'], reverse=True)
        return results[:top_k]

# 引入之前的cosine_similarity函数
def cosine_similarity(v1, v2):
    v1_np = np.array(v1)
    v2_np = np.array(v2)
    dot_product = np.dot(v1_np, v2_np)
    norm_v1 = np.linalg.norm(v1_np)
    norm_v2 = np.linalg.norm(v2_np)
    return dot_product / (norm_v1 * norm_v2) if norm_v1 and norm_v2 else 0

class AgentHippocampus:
    def __init__(self, agent_id: str, redis_client: redis.Redis, vector_db_client: AbstractVectorDBClient,
                 perception_stream_key: str = "agent:perception:",
                 consolidation_interval_sec: int = 30): # 每30秒尝试巩固一次
        self.agent_id = agent_id
        self.r = redis_client
        self.vector_db = vector_db_client
        self.perception_stream_key = f"{perception_stream_key}{agent_id}"
        self.consolidation_interval_sec = consolidation_interval_sec
        self.last_consolidated_id = '0-0' # 记录上次巩固到的Stream ID

        print(f"Initialized Hippocampus for Agent {agent_id}. Monitoring stream: {self.perception_stream_key}")

    def _evaluate_significance(self, event_content: str) -> float:
        """
        模拟重要性评估。实际中可能用LLM或更复杂的规则。
        这里简单地根据长度和关键词判断。
        """
        if "重要" in event_content or "紧急" in event_content:
            return 0.9
        if len(event_content) > 50: # 较长的内容可能更重要
            return 0.7
        return 0.3 # 默认较低重要性

    def _summarize_events(self, events: List[Dict]) -> str:
        """
        模拟事件摘要。实际中会用LLM。
        这里简单地连接所有事件内容。
        """
        if not events:
            return ""
        # 提取所有事件内容,并拼接成一个长字符串
        contents = [event['content'].get('text', str(event['content'])) for event in events if 'content' in event]
        # 简单过滤重复,并用LLM总结会更好
        return " ".join(list(set(contents)))

    def consolidate_memories(self):
        """
        从短期记忆流中读取新事件,评估并巩固为长期记忆。
        """
        print(f"n--- Agent {self.agent_id} Hippocampus: Starting memory consolidation ---")
        # 从上次巩固的ID开始读取新的事件
        new_entries = self.r.xread({self.perception_stream_key: self.last_consolidated_id}, count=100, block=0)

        if not new_entries:
            print("No new perceptions to consolidate.")
            return

        # new_entries 是一个列表,每个元素是 (stream_key, [(id, {field:value}), ...])
        # 我们只关心当前Agent的stream
        agent_stream_data = []
        for stream_key, entries in new_entries:
            if stream_key == self.perception_stream_key:
                agent_stream_data = entries
                break

        if not agent_stream_data:
            print("No new perceptions for this agent to consolidate.")
            return

        memories_to_store = []
        for entry_id, fields in agent_stream_data:
            try:
                event_type = fields.get("event_type")
                content_json = fields.get("content", "{}")
                content_dict = json.loads(content_json)
                timestamp = float(fields.get("timestamp"))

                # 这里是一个简化的重要性评估和聚合逻辑
                # 在真实的Agent中,你可能会在这里调用LLM来评估和总结
                event_description = f"Agent {self.agent_id} perceived a {event_type} event: {content_dict}"
                significance = self._evaluate_significance(str(content_dict))

                # 只有达到一定重要性的事件才会被巩固
                if significance >= 0.6:
                    memory_id = str(uuid.uuid4())
                    embedding = get_embedding(event_description) # 对聚合后的描述进行嵌入
                    if not embedding:
                        print(f"Skipping consolidation for entry {entry_id} due to embedding failure.")
                        continue

                    memories_to_store.append({
                        "id": memory_id,
                        "vector": embedding,
                        "metadata": {
                            "agent_id": self.agent_id,
                            "original_event_id": entry_id, # 记录原始短期记忆ID
                            "content": event_description,
                            "timestamp": timestamp,
                            "memory_type": "episodic", # 默认情景记忆
                            "significance": significance
                        }
                    })
                    print(f"Consolidating event ID {entry_id} (Significance: {significance:.2f})")

            except json.JSONDecodeError:
                print(f"Error decoding content for entry {entry_id}: {fields.get('content')}")
            except Exception as e:
                print(f"Error processing entry {entry_id}: {e}")
            finally:
                self.last_consolidated_id = entry_id # 更新上次巩固的ID

        if memories_to_store:
            self.vector_db.upsert(memories_to_store)
            print(f"--- Successfully consolidated {len(memories_to_store)} memories into LTP. ---")
        else:
            print("No significant memories found for consolidation.")

    def run_consolidation_loop(self):
        """
        持续运行巩固循环。
        """
        print(f"Agent {self.agent_id} Hippocampus starting consolidation loop...")
        while True:
            self.consolidate_memories()
            time.sleep(self.consolidation_interval_sec)

# --- 演示海马体记忆巩固 ---
if __name__ == "__main__":
    # 模拟Redis和Vector DB
    agent_id_for_hippocampus = "AgentDelta"
    perception_buffer = AgentPerceptionBuffer(agent_id=agent_id_for_hippocampus)
    ltp_vector_db = InMemoryVectorDBClient(dimension=VECTOR_DIMENSION)

    hippocampus = AgentHippocampus(
        agent_id=agent_id_for_hippocampus,
        redis_client=r,
        vector_db_client=ltp_vector_db,
        consolidation_interval_sec=5 # 每5秒尝试巩固一次
    )

    # 在后台线程运行巩固循环
    import threading
    consolidation_thread = threading.Thread(target=hippocampus.run_consolidation_loop, daemon=True)
    consolidation_thread.start()

    # 模拟Agent持续产生感知事件
    print("n--- AgentDelta starts perceiving events (some significant, some not) ---")
    perception_buffer.add_perception("user_message", {"sender": "User", "text": "你好,AgentDelta。今天天气真好!"}) # 较低重要性
    time.sleep(1)
    perception_buffer.add_perception("internal_thought", {"thought": "我需要记住用户对我的期望,这很重要。"}) # 较高重要性
    time.sleep(1)
    perception_buffer.add_perception("env_observation", {"sensor": "lidar", "data": "no obstacle ahead"}) # 较低重要性
    time.sleep(1)
    perception_buffer.add_perception("user_message", {"sender": "User", "text": "请帮我安排一个明天上午的重要会议,主题是项目风险评估。"}) # 较高重要性
    time.sleep(1)
    perception_buffer.add_perception("internal_thought", {"thought": "用户要求安排重要会议,我应该立即处理。"}) # 较高重要性
    time.sleep(1)
    perception_buffer.add_perception("system_log", {"level": "info", "message": "Disk usage is 45%."}) # 较低重要性

    # 让巩固线程运行一段时间
    print("n--- Allowing hippocampus to consolidate for a short period ---")
    time.sleep(15) # 模拟Agent运行15秒,巩固线程会运行3次

    # 检查长期记忆中是否有了巩固的记忆
    class AgentLongTermMemoryReader: # 重新封装一个阅读器
        def __init__(self, agent_id: str, vector_db_client: AbstractVectorDBClient):
            self.agent_id = agent_id
            self.db = vector_db_client
        def retrieve_memories(self, query: str, top_k: int = 3) -> List[Dict[str, Any]]:
            query_embedding = get_embedding(query)
            if not query_embedding: return []
            filters = {"agent_id": self.agent_id}
            results = self.db.query(query_embedding, top_k=top_k, filter=filters)
            retrieved_memories = []
            for res in results:
                retrieved_memories.append({
                    "id": res['id'], "score": res['score'],
                    "content": res['metadata']['content'],
                    "timestamp": res['metadata']['timestamp'],
                    "significance": res['metadata']['significance']
                })
            return retrieved_memories

    ltp_reader = AgentLongTermMemoryReader(agent_id=agent_id_for_hippocampus, vector_db_client=ltp_vector_db)

    print("n--- Checking AgentDelta's Long-Term Memories ---")
    retrieved_ltp = ltp_reader.retrieve_memories("关于重要会议和用户期望的记忆", top_k=5)
    for mem in retrieved_ltp:
        print(f"Score: {mem['score']:.4f}, Sig: {mem['significance']:.2f}, Content: {mem['content'][:80]}...")

    print("n--- Stopping simulation ---")
    # 为了演示,这里不停止线程,实际应用中需要优雅地停止

解释

  • AgentHippocampus类模拟了海马体的功能。
  • 它通过xread命令持续监听Redis Stream中的新事件,但只读取自上次巩固点之后的数据,避免重复处理。
  • _evaluate_significance_summarize_events是简化版本,实际应用中会集成LLM来提供更智能的评估和摘要能力。
  • 只有重要性达到阈值的事件才会被生成嵌入并存储到向量数据库中。
  • last_consolidated_id确保了每次巩固只处理新的、未处理过的短期记忆。
  • if __name__ == "__main__":块中,我们展示了如何启动一个独立的线程来模拟海马体的后台巩固过程,并让Agent同时产生短期感知。

5.3 记忆检索的协同

当Agent需要回忆时,它不应仅仅查询长期记忆,也应考虑最近的短期记忆。

  • 即时上下文:从Redis Hashes/Lists中获取当前对话、任务的即时上下文。
  • 语义相关:使用Agent的查询嵌入,从向量数据库中检索语义相关的长期记忆。
  • 整合:将即时上下文与长期记忆结合起来,作为LLM的输入,生成更全面、更准确的回应或决策。

这种协同检索机制确保Agent在处理信息时既能顾及眼前,又能利用过去的经验。

六、高级概念与优化

  1. 遗忘机制:长期记忆并非永久不变。可以为向量数据库中的记忆设置“老化”分数,或者定期删除不重要的、长时间未被检索的记忆,模拟遗忘,防止记忆库无限增长。
  2. 记忆重构与强化:当一个长期记忆被检索并用于Agent的推理时,它应该被“激活”,并可能与新的短期信息结合,形成新的、更丰富的记忆,或强化原有记忆。这类似于生物学中的“记忆再巩固”过程。
  3. 层次化记忆:将记忆组织成不同的抽象层次。例如,原始事件是低层记忆,事件摘要是中层记忆,学习到的通用原则是高层记忆。这可以通过多层嵌入和多索引向量数据库实现。
  4. 多模态记忆:不仅限于文本,Agent还可以处理图像、音频等多种模态的数据,将其转换为统一的向量表示并存储。
  5. 分布式与扩展性:Redis Cluster和向量数据库(如Milvus, Pinecone)本身就支持分布式部署,能够处理大规模的Agent和海量的记忆数据。

七、总结

通过借鉴生物大脑中短期与长期记忆增强的原理,我们为AI Agent构建了一个强大的“海马体”记忆巩固机制。Redis提供了高速、灵活的短期记忆存储,处理Agent的实时感知与工作状态。向量数据库则为Agent提供了可扩展、语义化的长期记忆,实现高效的知识检索。通过一个智能的“巩固代理”,我们能够将短期经验转化为持久知识,让Agent真正从经验中学习,不断成长。这一架构为构建更智能、更自主的AI Agent奠定了坚实的基础,使其能够超越瞬时交互的限制,拥有更深远的“认知”。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注