解析 ‘Joint Message Buffer’:多智能体共享对话历史时,如何通过消息过滤防止上下文爆炸?

各位编程专家和技术爱好者,大家好!

今天,我们齐聚一堂,探讨多智能体系统(Multi-Agent Systems, MAS)中一个核心且日益严峻的挑战——上下文爆炸(Context Explosion),以及我们如何通过巧妙的消息过滤机制,来构建一个高效、可扩展且智能的“联合消息缓冲区”(Joint Message Buffer, JMB)。

随着大型语言模型(LLMs)能力的飞速发展,我们正迈入一个由多个智能体协作完成复杂任务的新时代。想象一下,一个团队由产品经理智能体、开发智能体、测试智能体和客服智能体组成,它们共同协作开发和维护一款软件产品。在这个场景中,智能体之间需要频繁地交流信息,共享对当前任务的理解和进度。一个共享的对话历史,或者说一个联合消息缓冲区,是实现这种协作的关键。

联合消息缓冲区(Joint Message Buffer, JMB)的必要性

在一个多智能体系统中,JMB 扮演着中央信息枢纽的角色。它的核心功能是存储所有智能体之间发生过的对话、任务更新、观察结果以及其他任何形式的交流。

为什么我们需要 JMB?

  1. 全局一致性上下文: JMB 提供了一个所有智能体都能访问的、关于系统状态和历史事件的单一事实来源。这有助于确保每个智能体对当前任务和环境有共同的理解。
  2. 协作与协调: 智能体可以通过 JMB 了解其他智能体的行动、意图和进度,从而更好地协调自己的行为,避免重复工作,并识别潜在的冲突。
  3. 决策支持: 智能体可以查询 JMB 来回顾过去的决策、吸取经验教训,或者基于历史数据做出更明智的未来决策。
  4. 可解释性与审计: JMB 记录了系统的整个运行轨迹,对于调试、性能分析以及理解智能体行为至关重要。
  5. 状态持久化: 在智能体重启或系统崩溃后,JMB 可以用于恢复之前的会话状态。

我们可以将 JMB 想象成一个巨大的会议室白板,所有参会者(智能体)都可以在上面写下自己的想法、问题、解决方案,并且查看其他人的贡献。

一个简单的 JMB 结构设想:

在代码层面,一个 JMB 可以是一个存储消息对象的列表或队列,每个消息对象都包含发送者、接收者、内容、时间戳等信息。

import datetime
import uuid
from typing import List, Dict, Any, Optional

class Message:
    """
    表示一个在多智能体系统中传递的消息。
    """
    def __init__(self,
                 sender_id: str,
                 receiver_id: str, # 可以是单个ID,也可以是'all',或者一个ID列表
                 content: str,
                 message_type: str = "text", # 例如 'text', 'code', 'status_update', 'question'
                 timestamp: Optional[datetime.datetime] = None,
                 task_id: Optional[str] = None, # 消息所属的任务ID
                 priority: int = 5, # 1-10,1最高
                 metadata: Optional[Dict[str, Any]] = None):
        self.message_id = str(uuid.uuid4())
        self.sender_id = sender_id
        self.receiver_id = receiver_id
        self.content = content
        self.message_type = message_type
        self.timestamp = timestamp if timestamp is not None else datetime.datetime.now()
        self.task_id = task_id
        self.priority = priority
        self.metadata = metadata if metadata is not None else {}

    def __str__(self):
        return (f"[{self.timestamp.strftime('%Y-%m-%d %H:%M:%S')}] "
                f"From {self.sender_id} to {self.receiver_id} "
                f"(Type: {self.message_type}, Task: {self.task_id if self.task_id else 'N/A'}): "
                f"{self.content}")

    def to_dict(self):
        return {
            "message_id": self.message_id,
            "sender_id": self.sender_id,
            "receiver_id": self.receiver_id,
            "content": self.content,
            "message_type": self.message_type,
            "timestamp": self.timestamp.isoformat(),
            "task_id": self.task_id,
            "priority": self.priority,
            "metadata": self.metadata
        }

class JointMessageBuffer:
    """
    联合消息缓冲区,存储所有智能体之间的消息。
    """
    def __init__(self):
        self._messages: List[Message] = []
        self._next_message_index: int = 0 # 用于按顺序迭代消息

    def add_message(self, message: Message):
        """添加一条消息到缓冲区。"""
        self._messages.append(message)
        # 保持消息按时间顺序排列,尽管通常添加时已经是时间顺序
        self._messages.sort(key=lambda m: m.timestamp)

    def get_all_messages(self) -> List[Message]:
        """获取所有消息(慎用,可能导致上下文爆炸)。"""
        return list(self._messages)

    def get_messages_for_agent(self, agent_id: str, limit: Optional[int] = None) -> List[Message]:
        """
        获取与特定智能体相关的所有消息。
        这只是一个基础方法,尚未包含过滤逻辑。
        """
        relevant_messages = []
        for message in self._messages:
            if message.sender_id == agent_id or message.receiver_id == agent_id or message.receiver_id == 'all':
                relevant_messages.append(message)

        if limit is not None and len(relevant_messages) > limit:
            return relevant_messages[-limit:] # 获取最新的N条
        return relevant_messages

    def __len__(self):
        return len(self._messages)

    def __iter__(self):
        self._next_message_index = 0
        return self

    def __next__(self):
        if self._next_message_index < len(self._messages):
            message = self._messages[self._next_message_index]
            self._next_message_index += 1
            return message
        raise StopIteration

# 示例用法
if __name__ == "__main__":
    jmb = JointMessageBuffer()

    # 模拟智能体发送消息
    jmb.add_message(Message("AgentA", "AgentB", "AgentB, 你能帮我审查一下这个需求文档吗?", task_id="REQ-001"))
    jmb.add_message(Message("AgentB", "AgentA", "好的,AgentA,我正在看。有什么特别需要注意的地方吗?", task_id="REQ-001"))
    jmb.add_message(Message("AgentC", "all", "团队注意,今天下午3点有紧急会议,讨论BUG-FIX-005。", message_type="status_update", priority=1))
    jmb.add_message(Message("AgentA", "AgentC", "AgentC,收到。我将准备好需求变更对BUG-FIX-005的影响分析。", task_id="BUG-FIX-005"))
    jmb.add_message(Message("AgentB", "AgentA", "AgentA, 需求文档初审完成,有一些关于用户体验的建议我已经标注。", task_id="REQ-001"))

    print("--- 所有消息 ---")
    for msg in jmb.get_all_messages():
        print(msg)

    print("n--- AgentA 相关消息 (基础获取,未过滤) ---")
    for msg in jmb.get_messages_for_agent("AgentA"):
        print(msg)

    print("n--- AgentB 相关消息 (基础获取,最新2条) ---")
    for msg in jmb.get_messages_for_agent("AgentB", limit=2):
        print(msg)

上下文爆炸:沉默的杀手

JMB 的强大之处在于其能够汇集所有信息,但这也带来了其最大的挑战:上下文爆炸(Context Explosion)

随着多智能体系统运行时间的增长,JMB 中存储的消息数量会迅速膨胀。当一个智能体需要访问上下文来做决策时,如果我们将整个 JMB 的内容,或者其中大部分内容,都作为输入提供给底层的 LLM,就会出现以下严重问题:

  1. 成本急剧增加: LLM 的API调用通常按Token数量计费。一个冗长的上下文意味着更高的Token消耗,从而导致高昂的运营成本。
  2. 延迟增加: 处理更长的上下文需要更多计算资源和时间,导致智能体响应速度变慢,影响用户体验和系统实时性。
  3. 性能下降: LLM 在处理超长上下文时,其理解和推理能力会下降。它们可能会“迷失”在大量的无关信息中,无法识别出真正重要的信息,甚至产生幻觉(hallucination)。
  4. 注意力分散与主题漂移: 智能体可能会被历史中不相关的细节分散注意力,导致在当前任务上无法聚焦,甚至偏离主旨。
  5. 内存与计算资源消耗: 在智能体内部维护和处理大量上下文需要巨大的内存和计算开销,尤其是在边缘设备或资源受限的环境中。

简而言之,如果我们不对 JMB 进行有效的管理和过滤,它将从一个协作的“白板”变成一个堆满废纸的“垃圾场”,严重阻碍多智能体系统的效率和智能。

消息过滤:防止上下文爆炸的利器

为了解决上下文爆炸问题,我们必须引入智能的消息过滤(Message Filtering)机制。消息过滤的核心思想是:对于每个智能体在特定时间点和特定任务下,只提供其真正需要和最相关的上下文信息。

这就像给每个参会者一个个性化的“摘要”,而不是让他们阅读整个会议记录。摘要的内容会根据他们的角色、当前议题和个人关注点而定制。

消息过滤可以发生在多个层面:

  • JMB 层面: 在从 JMB 中检索消息时进行过滤。
  • 智能体内部: 智能体在接收到过滤后的消息后,可能还会进行二次筛选或处理。
  • LLM 封装层: 在将上下文传递给 LLM 之前,进行最终的裁剪和格式化。

接下来,我们将深入探讨各种消息过滤策略,并提供代码实现示例。

核心过滤策略

消息过滤并非一刀切,而是多种策略的组合。我们可以根据不同的维度来设计过滤规则。

1. 时间基础过滤 (Time-Based Filtering)

这是最直接也最常用的过滤方法。它假设最近的消息通常比很久以前的消息更相关。

策略:

  • 滑动时间窗口 (Sliding Time Window): 只保留最近 N 分钟、N 小时或 N 天内的消息。
  • 固定消息数量 (Fixed Message Count): 只保留最新的 N 条消息。
  • 衰减模型 (Decay Model): 消息的权重或相关性随时间衰减,过滤时优先保留权重高的消息。

适用场景:
大多数实时协作任务,如日常对话、短期项目更新等。

优点:
简单易实现,效果直观。

缺点:
可能丢失历史中重要的长期信息或关键上下文。例如,一个早期的设计决策可能在很久之后才被提及并变得相关。

代码示例:

我们为 JointMessageBuffer 添加一个方法,用于根据时间窗口和消息数量进行过滤。

class JointMessageBuffer:
    # ... (previous JMB code) ...

    def filter_by_time(self,
                       agent_id: str,
                       time_window_seconds: Optional[int] = None,
                       max_messages: Optional[int] = None) -> List[Message]:
        """
        根据时间窗口和/或最大消息数量过滤消息。
        只返回与 agent_id 相关 (发送给/收到/发送给all) 的消息。
        """
        relevant_messages = []
        for message in self._messages:
            if message.sender_id == agent_id or message.receiver_id == agent_id or message.receiver_id == 'all':
                relevant_messages.append(message)

        # 1. 应用时间窗口过滤
        if time_window_seconds is not None:
            cutoff_time = datetime.datetime.now() - datetime.timedelta(seconds=time_window_seconds)
            relevant_messages = [msg for msg in relevant_messages if msg.timestamp >= cutoff_time]

        # 2. 确保消息按时间倒序排列,以便获取最新的 N 条
        relevant_messages.sort(key=lambda m: m.timestamp)

        # 3. 应用最大消息数量过滤
        if max_messages is not None and len(relevant_messages) > max_messages:
            relevant_messages = relevant_messages[-max_messages:] # 获取最新的 N 条

        return relevant_messages

# 示例用法
if __name__ == "__main__":
    jmb = JointMessageBuffer()
    # ... (add messages as before) ...
    jmb.add_message(Message("AgentD", "AgentA", "AgentA, 这是一个5小时前的旧消息,关于项目规划。",
                            timestamp=datetime.datetime.now() - datetime.timedelta(hours=5), task_id="PROJ-001"))
    jmb.add_message(Message("AgentE", "all", "团队,今天的每日站会已结束。",
                            timestamp=datetime.datetime.now() - datetime.timedelta(minutes=30), priority=7))
    jmb.add_message(Message("AgentA", "AgentB", "AgentB, 请确认我们关于新功能A的最新讨论。",
                            timestamp=datetime.datetime.now() - datetime.timedelta(minutes=5), task_id="FEAT-A"))

    print("n--- AgentA 相关消息 (最近1小时内,最多3条) ---")
    filtered_msgs = jmb.filter_by_time("AgentA", time_window_seconds=3600, max_messages=3)
    for msg in filtered_msgs:
        print(msg)
    # 预期输出将不包含5小时前的消息,且只包含最新的3条

2. 内容基础过滤 (Content-Based Filtering)

这种策略根据消息的文本内容来判断其相关性。

策略:

  • 关键词匹配 (Keyword Matching): 智能体维护一个关键词列表,只接收包含这些关键词的消息。例如,一个“测试智能体”可能只关心包含“bug”、“测试用例”、“缺陷”等关键词的消息。
  • 主题模型 (Topic Modeling): 使用 LDA (Latent Dirichlet Allocation) 或其他更复杂的模型来识别消息的主题,并根据智能体关注的主题进行过滤。
  • 语义相似度 (Semantic Similarity): 使用词嵌入 (Word Embeddings) 或 Sentence Embeddings 计算消息与智能体关注焦点之间的语义相似度。
  • 正则表达式 (Regex Matching): 匹配特定的模式,例如错误代码、URL等。

适用场景:
当智能体有明确的职责范围和关注领域时。

优点:
精确度高,能够捕获与智能体任务直接相关的上下文。

缺点:
需要维护关键词或主题模型,可能遗漏相关但未明确提及关键词的消息。语义相似度计算成本较高。

代码示例:

JointMessageBuffer 中添加基于关键词的过滤方法。智能体可以提供一个关键词列表。

import re

class JointMessageBuffer:
    # ... (previous JMB code) ...

    def filter_by_content(self,
                          agent_id: str,
                          keywords: Optional[List[str]] = None,
                          exclude_keywords: Optional[List[str]] = None,
                          task_id: Optional[str] = None,
                          message_types: Optional[List[str]] = None) -> List[Message]:
        """
        根据内容关键词、任务ID和消息类型过滤消息。
        只返回与 agent_id 相关 (发送给/收到/发送给all) 的消息。
        """
        relevant_messages = []
        for message in self._messages:
            # 首先,确保消息与当前智能体相关
            if not (message.sender_id == agent_id or message.receiver_id == agent_id or message.receiver_id == 'all'):
                continue

            # 过滤任务ID
            if task_id is not None and message.task_id != task_id:
                continue

            # 过滤消息类型
            if message_types is not None and message.message_type not in message_types:
                continue

            # 关键词包含过滤
            if keywords:
                content_lower = message.content.lower()
                if not any(keyword.lower() in content_lower for keyword in keywords):
                    continue

            # 关键词排除过滤
            if exclude_keywords:
                content_lower = message.content.lower()
                if any(exclude_keyword.lower() in content_lower for exclude_keyword in exclude_keywords):
                    continue

            relevant_messages.append(message)

        relevant_messages.sort(key=lambda m: m.timestamp)
        return relevant_messages

# 示例用法
if __name__ == "__main__":
    jmb = JointMessageBuffer()
    jmb.add_message(Message("AgentA", "AgentB", "AgentB, 请审查一下我们关于新功能A的需求文档。", task_id="FEAT-A", message_type="request"))
    jmb.add_message(Message("AgentB", "AgentA", "AgentA, 我发现新功能A在用户体验上可能存在一个潜在的bug。", task_id="FEAT-A", message_type="feedback"))
    jmb.add_message(Message("AgentC", "all", "团队,今天的每日站会已结束。", message_type="status_update"))
    jmb.add_message(Message("AgentD", "AgentE", "AgentE, 请确认一下环境配置。", message_type="config_request"))
    jmb.add_message(Message("AgentA", "AgentC", "AgentC, 我需要一个关于用户反馈的报告。", task_id="REPORT-001", message_type="request"))

    print("n--- AgentA 相关消息 (关键词 '功能A' 或 '需求', 任务ID 'FEAT-A') ---")
    filtered_msgs = jmb.filter_by_content("AgentA", keywords=["功能A", "需求"], task_id="FEAT-A")
    for msg in filtered_msgs:
        print(msg)
    # 预期会包含 AgentA 和 AgentB 之间关于 FEAT-A 的消息

    print("n--- AgentB 相关消息 (关键词 'bug', 消息类型 'feedback') ---")
    filtered_msgs = jmb.filter_by_content("AgentB", keywords=["bug"], message_types=["feedback"])
    for msg in filtered_msgs:
        print(msg)
    # 预期会包含 AgentB 发现 bug 的消息

3. 角色基础过滤 (Role-Based Filtering)

在多智能体系统中,每个智能体通常都有一个明确的“角色”或“职责”。我们可以根据这些角色来定义哪些消息是相关的。

策略:

  • 预定义角色关注点: 每个智能体在初始化时被赋予一个角色(例如“产品经理”、“开发工程师”、“测试人员”),每个角色都有一个预定义的过滤规则集(例如,产品经理关注“需求”、“市场”,开发关注“代码”、“实现”,测试关注“bug”、“测试用例”)。
  • 智能体间信任关系: 只有来自特定智能体或特定角色的消息才会被考虑。

适用场景:
具有明确职责分工的团队协作场景。

优点:
结构化、可管理性强,易于理解和配置。

缺点:
不够灵活,如果任务跨越多个角色边界,可能需要更复杂的规则。

代码示例:

我们可以将角色与关键词列表关联起来,并在智能体请求上下文时应用。

class AgentRole:
    """定义智能体的角色及其关注点。"""
    PRODUCT_MANAGER = {"keywords": ["需求", "用户", "市场", "发布", "计划"], "exclude_keywords": ["代码", "编译", "测试用例"]}
    DEVELOPER = {"keywords": ["代码", "实现", "bug", "架构", "部署"], "exclude_keywords": ["市场", "客户反馈"]}
    TESTER = {"keywords": ["测试", "缺陷", "bug", "测试用例", "回归"], "exclude_keywords": ["市场分析", "产品路线图"]}
    CUSTOMER_SUPPORT = {"keywords": ["用户反馈", "问题", "故障", "客户", "帮助"], "exclude_keywords": ["代码实现", "架构设计"]}

class JointMessageBuffer:
    # ... (previous JMB code) ...

    def filter_by_role(self,
                       agent_id: str,
                       agent_role: Dict[str, Any], # 期望是AgentRole类中的一个字典
                       time_window_seconds: Optional[int] = None,
                       max_messages: Optional[int] = None) -> List[Message]:
        """
        根据智能体的角色及其关注点过滤消息。
        结合了内容过滤和时间过滤。
        """
        keywords = agent_role.get("keywords")
        exclude_keywords = agent_role.get("exclude_keywords")
        message_types = agent_role.get("message_types") # 角色也可以定义关注的消息类型

        # 先进行内容过滤
        filtered_by_content = self.filter_by_content(
            agent_id=agent_id,
            keywords=keywords,
            exclude_keywords=exclude_keywords,
            message_types=message_types
        )

        # 在内容过滤的基础上,再进行时间过滤
        if time_window_seconds is not None:
            cutoff_time = datetime.datetime.now() - datetime.timedelta(seconds=time_window_seconds)
            filtered_by_content = [msg for msg in filtered_by_content if msg.timestamp >= cutoff_time]

        if max_messages is not None and len(filtered_by_content) > max_messages:
            filtered_by_content = filtered_by_content[-max_messages:]

        return filtered_by_content

# 示例用法
if __name__ == "__main__":
    jmb = JointMessageBuffer()
    jmb.add_message(Message("AgentPM", "AgentDev", "AgentDev, 用户对新功能A的需求反馈很多,需要优先处理。", task_id="FEAT-A", message_type="request"))
    jmb.add_message(Message("AgentDev", "AgentPM", "AgentPM, 我发现新功能A在实现上有一个潜在的bug,需要架构调整。", task_id="FEAT-A", message_type="feedback"))
    jmb.add_message(Message("AgentTester", "AgentDev", "AgentDev, 新功能A的测试用例已编写完成,发现一个严重缺陷。", task_id="FEAT-A", message_type="bug_report"))
    jmb.add_message(Message("AgentCS", "all", "客户报告了系统B的一个高优先级故障,请求支持。", message_type="urgent_alert", priority=1))
    jmb.add_message(Message("AgentDev", "AgentTester", "AgentTester, 我已经修复了你报告的bug,请复测。", task_id="FEAT-A", message_type="status_update"))

    print("n--- AgentPM (产品经理) 相关消息 (最近24小时) ---")
    filtered_msgs = jmb.filter_by_role("AgentPM", AgentRole.PRODUCT_MANAGER, time_window_seconds=86400)
    for msg in filtered_msgs:
        print(msg)
    # 预期包含与“需求”、“用户反馈”相关的消息

    print("n--- AgentTester (测试人员) 相关消息 (最近24小时) ---")
    filtered_msgs = jmb.filter_by_role("AgentTester", AgentRole.TESTER, time_window_seconds=86400)
    for msg in filtered_msgs:
        print(msg)
    # 预期包含与“测试”、“缺陷”、“bug”相关的消息

4. 意图/任务基础过滤 (Intent/Task-Based Filtering)

智能体通常是在执行某个特定的任务或达成某个目标。消息的相关性应与当前任务的意图高度匹配。

策略:

  • 任务ID关联: 所有与特定任务(例如,task_id="BUG-FIX-005")相关的消息都被视为相关。
  • 意图识别: 使用自然语言处理(NLP)技术识别消息的意图(例如,“提问”、“请求帮助”、“提供信息”、“确认”),并根据智能体当前意图来筛选。
  • 目标导向过滤: 智能体明确其当前目标,并只拉取有助于达成该目标的消息。

适用场景:
项目管理、问题解决、复杂的流程自动化等。

优点:
高度聚焦,能够提供智能体完成当前任务所需的最精确上下文。

缺点:
需要维护任务状态,意图识别可能不总是准确,增加了复杂度。

代码示例:

Message 类中已包含 task_id 字段,filter_by_content 方法也支持按 task_id 过滤。这里我们再展示一个更直接的任务过滤。

class JointMessageBuffer:
    # ... (previous JMB code) ...

    def filter_by_task(self,
                       agent_id: str,
                       current_task_id: str,
                       include_general_messages: bool = False, # 是否包含非特定任务但可能相关的消息 (如 'all' 消息)
                       max_messages: Optional[int] = None) -> List[Message]:
        """
        根据当前任务ID过滤消息。
        """
        relevant_messages = []
        for message in self._messages:
            # 确保消息与当前智能体相关
            if not (message.sender_id == agent_id or message.receiver_id == agent_id or message.receiver_id == 'all'):
                continue

            # 过滤与当前任务ID匹配的消息
            if message.task_id == current_task_id:
                relevant_messages.append(message)
            # 或者包含通用消息
            elif include_general_messages and (message.task_id is None or message.receiver_id == 'all'):
                relevant_messages.append(message)

        relevant_messages.sort(key=lambda m: m.timestamp)
        if max_messages is not None and len(relevant_messages) > max_messages:
            relevant_messages = relevant_messages[-max_messages:]
        return relevant_messages

# 示例用法
if __name__ == "__main__":
    jmb = JointMessageBuffer()
    jmb.add_message(Message("AgentA", "AgentB", "AgentB, 我们需要尽快完成新功能A的开发。", task_id="FEAT-A"))
    jmb.add_message(Message("AgentB", "AgentA", "AgentA, 新功能A的后端API已完成。", task_id="FEAT-A"))
    jmb.add_message(Message("AgentC", "all", "团队,今天的每日站会已结束。", message_type="status_update"))
    jmb.add_message(Message("AgentB", "AgentD", "AgentD, 请部署新功能A的API到测试环境。", task_id="FEAT-A"))
    jmb.add_message(Message("AgentA", "AgentE", "AgentE, 关于旧系统的一个bug,请查看日志。", task_id="BUG-001"))

    print("n--- AgentB 相关消息 (当前任务 'FEAT-A', 包含通用消息) ---")
    filtered_msgs = jmb.filter_by_task("AgentB", "FEAT-A", include_general_messages=True)
    for msg in filtered_msgs:
        print(msg)
    # 预期包含所有关于FEAT-A的消息,以及AgentC发送的通用站会消息

5. 优先级基础过滤 (Priority-Based Filtering)

某些消息,如紧急警报、关键决策或高优先级任务更新,无论其时间或内容如何,都应该被优先考虑。

策略:

  • 消息优先级标签:Message 对象中添加一个 priority 字段(例如,1-10,1最高)。
  • 阈值过滤: 只保留高于某个优先级阈值的消息。
  • 优先级提升: 即使消息较旧,如果其优先级高,也会被包含在内。

适用场景:
需要处理紧急情况、风险管理、关键决策支持的系统。

优点:
确保关键信息不会被遗漏,即使在上下文受限的情况下。

缺点:
需要智能体或系统能够准确地为消息分配优先级。过多的高优先级消息会稀释其效果。

代码示例:

Message 类中已包含 priority 字段。我们可以在其他过滤方法的基础上,再添加优先级筛选。

class JointMessageBuffer:
    # ... (previous JMB code) ...

    def filter_by_priority(self,
                           agent_id: str,
                           min_priority: int = 10, # 最小优先级,例如1表示最高优先级,10表示最低
                           time_window_seconds: Optional[int] = None,
                           max_messages: Optional[int] = None) -> List[Message]:
        """
        根据消息优先级过滤。优先级数值越小表示优先级越高。
        结合了优先级过滤和时间过滤。
        """
        relevant_messages = []
        for message in self._messages:
            if not (message.sender_id == agent_id or message.receiver_id == agent_id or message.receiver_id == 'all'):
                continue

            if message.priority <= min_priority:
                relevant_messages.append(message)

        # 在优先级过滤的基础上,再进行时间过滤
        if time_window_seconds is not None:
            cutoff_time = datetime.datetime.now() - datetime.timedelta(seconds=time_window_seconds)
            relevant_messages = [msg for msg in relevant_messages if msg.timestamp >= cutoff_time]

        relevant_messages.sort(key=lambda m: m.timestamp) # 保持时间顺序
        if max_messages is not None and len(relevant_messages) > max_messages:
            relevant_messages = relevant_messages[-max_messages:]

        return relevant_messages

# 示例用法
if __name__ == "__main__":
    jmb = JointMessageBuffer()
    jmb.add_message(Message("AgentA", "AgentB", "AgentB, 请尽快处理这个高优先级任务。", task_id="URGENT-001", priority=1))
    jmb.add_message(Message("AgentB", "AgentA", "AgentA, 已收到紧急任务,正在处理。", task_id="URGENT-001", priority=2))
    jmb.add_message(Message("AgentC", "all", "团队,今天的每日站会已结束。", message_type="status_update", priority=7))
    jmb.add_message(Message("AgentD", "AgentA", "AgentA, 关于功能A的一个普通反馈。", task_id="FEAT-A", priority=8))

    print("n--- AgentA 相关消息 (优先级 <= 2,最近24小时) ---")
    filtered_msgs = jmb.filter_by_priority("AgentA", min_priority=2, time_window_seconds=86400)
    for msg in filtered_msgs:
        print(msg)
    # 预期只包含与URGENT-001任务相关的消息

6. 摘要与抽象 (Summarization/Abstraction)

严格来说,这并非“过滤”而是“压缩”或“转换”。当原始消息过于冗长或包含大量细节时,可以将其提炼成一个简短的摘要或高层次的抽象。

策略:

  • LLM 摘要: 使用另一个 LLM 对一系列消息进行摘要。
  • 结构化表示: 将非结构化的对话内容转换为结构化的知识图谱、状态更新或关键信息列表。
  • 主题句提取: 从长消息中提取最能代表其核心内容的一两句话。

适用场景:
需要长期记忆、回顾复杂讨论、生成报告的场景。

优点:
极大地减少上下文长度,同时保留核心信息,提高 LLM 理解效率。

缺点:
引入额外的计算开销和延迟;摘要可能会丢失细微之处或重要细节;摘要质量依赖于摘要模型的性能。

代码示例 (概念性,因为涉及LLM调用):

# 这是一个概念性的示例,实际需要与LLM API集成
class MessageSummarizer:
    def __init__(self, llm_client):
        self.llm_client = llm_client # 假定是一个与LLM交互的客户端

    def summarize_messages(self, messages: List[Message], agent_id: str, context_focus: Optional[str] = None) -> str:
        """
        使用LLM对一系列消息进行摘要。
        context_focus 可以是智能体的当前任务或角色,以指导摘要方向。
        """
        if not messages:
            return "无相关消息。"

        # 将消息格式化为LLM输入
        formatted_messages = "n".join([f"{msg.sender_id}: {msg.content}" for msg in messages])

        prompt = f"以下是多智能体系统中的一段对话历史,请为智能体 {agent_id} 针对 '{context_focus if context_focus else '其当前关注点'}' 生成一个简洁的摘要:nn{formatted_messages}nn摘要:"

        try:
            # 实际调用LLM API (这里仅为占位符)
            # response = self.llm_client.generate(prompt, max_tokens=200)
            # summary = response.text
            summary = f"【自动摘要 for {agent_id}】关于 {context_focus if context_focus else '一般情况'},对话主要涉及...n(原始消息数量: {len(messages)})"
            return summary
        except Exception as e:
            print(f"摘要生成失败: {e}")
            return "摘要生成失败。"

# 示例用法 (假设JMB中已经有 summarizer 实例)
if __name__ == "__main__":
    jmb = JointMessageBuffer()
    jmb.add_message(Message("AgentA", "AgentB", "AgentB, 我们需要尽快完成新功能A的开发。", task_id="FEAT-A"))
    jmb.add_message(Message("AgentB", "AgentA", "AgentA, 新功能A的后端API已完成。", task_id="FEAT-A"))
    jmb.add_message(Message("AgentB", "AgentD", "AgentD, 请部署新功能A的API到测试环境。", task_id="FEAT-A"))
    jmb.add_message(Message("AgentA", "AgentB", "AgentB, 请确认我们关于新功能A的最新讨论。", task_id="FEAT-A"))

    # 模拟LLM客户端
    class MockLLMClient:
        def generate(self, prompt, max_tokens):
            return type('obj', (object,), {'text': '模拟摘要内容'})()

    mock_llm_client = MockLLMClient()
    summarizer = MessageSummarizer(mock_llm_client)

    # 首先过滤出相关消息
    filtered_msgs = jmb.filter_by_task("AgentA", "FEAT-A", include_general_messages=False)

    # 对过滤后的消息进行摘要
    summary_for_agent_a = summarizer.summarize_messages(filtered_msgs, "AgentA", "新功能A的开发进度")
    print(f"n--- AgentA 的摘要 ---")
    print(summary_for_agent_a)

7. 混合过滤策略 (Hybrid Filtering)

在实际应用中,很少只使用单一的过滤策略。通常我们会结合多种策略,以达到最佳效果。例如:

  • 先时间后内容: 首先筛选出最近一段时间内的消息,然后在这个子集中再进行关键词或任务ID匹配。
  • 优先级结合: 无论其他过滤规则如何,高优先级消息总是被包含在内。
  • 动态调整: 根据智能体的当前状态、任务紧迫性、LLM的Token限制等,动态调整过滤参数。

实现方式:
通过链式调用不同的过滤方法,或者在一个统一的 get_filtered_context 方法中集成所有逻辑。

优点:
灵活性高,效果全面,能够应对复杂多变的需求。

缺点:
实现复杂,参数调优困难,容易引入新的逻辑错误。

一个混合过滤的函数原型:

class JointMessageBuffer:
    # ... (previous JMB code) ...

    def get_filtered_context(self,
                             agent_id: str,
                             current_task_id: Optional[str] = None,
                             agent_role: Optional[Dict[str, Any]] = None,
                             min_priority: int = 10,
                             time_window_seconds: Optional[int] = 3600, # 默认最近1小时
                             max_messages: Optional[int] = 100, # 默认最多100条
                             keywords: Optional[List[str]] = None,
                             exclude_keywords: Optional[List[str]] = None,
                             message_types: Optional[List[str]] = None,
                             summarize_long_history: bool = False, # 是否对旧消息进行摘要
                             summarizer=None # 外部摘要工具
                             ) -> List[Message]:
        """
        一个综合性的消息过滤方法,结合多种策略。
        """
        # 1. 首先获取与智能体直接相关的消息
        all_relevant_msgs = []
        for message in self._messages:
            if message.sender_id == agent_id or message.receiver_id == agent_id or message.receiver_id == 'all':
                all_relevant_msgs.append(message)
        all_relevant_msgs.sort(key=lambda m: m.timestamp) # 确保按时间排序

        filtered_msgs = []
        high_priority_msgs = []
        other_msgs = []

        for msg in all_relevant_msgs:
            # 优先级过滤:高优先级消息始终保留
            if msg.priority <= min_priority:
                high_priority_msgs.append(msg)
                continue # 不再进行后续过滤,直接进入高优先级队列

            # 任务ID过滤
            if current_task_id and msg.task_id != current_task_id:
                continue

            # 角色/内容关键词过滤
            if agent_role:
                role_keywords = agent_role.get("keywords", [])
                role_exclude_keywords = agent_role.get("exclude_keywords", [])
                role_message_types = agent_role.get("message_types", [])

                content_lower = msg.content.lower()

                # 关键词包含
                if role_keywords and not any(k.lower() in content_lower for k in role_keywords):
                    continue
                # 关键词排除
                if role_exclude_keywords and any(k.lower() in content_lower for k in role_exclude_keywords):
                    continue
                # 消息类型
                if role_message_types and msg.message_type not in role_message_types:
                    continue

            # 自定义关键词过滤
            if keywords:
                content_lower = msg.content.lower()
                if not any(k.lower() in content_lower for k in keywords):
                    continue
            if exclude_keywords:
                content_lower = msg.content.lower()
                if any(k.lower() in content_lower for k in exclude_keywords):
                    continue

            # 消息类型过滤
            if message_types and msg.message_type not in message_types:
                continue

            other_msgs.append(msg)

        # 将高优先级消息放在前面
        final_msgs = high_priority_msgs + other_msgs
        final_msgs.sort(key=lambda m: m.timestamp) # 重新排序以保持时间一致性

        # 时间窗口过滤(在其他过滤之后应用,确保只对相关消息进行时间限制)
        if time_window_seconds is not None:
            cutoff_time = datetime.datetime.now() - datetime.timedelta(seconds=time_window_seconds)
            final_msgs = [msg for msg in final_msgs if msg.timestamp >= cutoff_time]

        # 总结旧消息 (如果需要)
        # 这部分逻辑会更复杂,可能需要将一部分超出max_messages的旧消息先进行摘要,
        # 然后将摘要作为一条新消息插入到列表开头
        if summarize_long_history and summarizer and max_messages and len(final_msgs) > max_messages:
            messages_to_summarize = final_msgs[:-max_messages]
            recent_messages = final_msgs[-max_messages:]
            if messages_to_summarize:
                long_history_summary = summarizer.summarize_messages(
                    messages_to_summarize, agent_id, current_task_id or agent_role.get("name", "general_context") if agent_role else "general_context"
                )
                summary_msg = Message(
                    sender_id="System",
                    receiver_id=agent_id,
                    content=long_history_summary,
                    message_type="summary",
                    priority=9 # 摘要优先级较低,但仍保留
                )
                final_msgs = [summary_msg] + recent_messages
            else:
                final_msgs = recent_messages
        elif max_messages is not None and len(final_msgs) > max_messages:
            final_msgs = final_msgs[-max_messages:] # 简单截断

        return final_msgs

# 示例用法
if __name__ == "__main__":
    jmb = JointMessageBuffer()
    # ... (添加各种消息,包括高优先级、不同任务、不同角色相关的消息) ...
    jmb.add_message(Message("AgentPM", "AgentDev", "AgentDev, 用户对新功能A的需求反馈很多,需要优先处理。", task_id="FEAT-A", message_type="request"))
    jmb.add_message(Message("AgentDev", "AgentPM", "AgentPM, 我发现新功能A在实现上有一个潜在的bug,需要架构调整。", task_id="FEAT-A", message_type="feedback"))
    jmb.add_message(Message("AgentTester", "AgentDev", "AgentDev, 新功能A的测试用例已编写完成,发现一个严重缺陷。", task_id="FEAT-A", message_type="bug_report"))
    jmb.add_message(Message("AgentCS", "all", "客户报告了系统B的一个高优先级故障,请求支持。", message_type="urgent_alert", priority=1))
    jmb.add_message(Message("AgentDev", "AgentTester", "AgentTester, 我已经修复了你报告的bug,请复测。", task_id="FEAT-A", message_type="status_update"))
    jmb.add_message(Message("AgentPM", "AgentDev", "AgentDev, 旧的功能C的市场反馈非常好,考虑推广。", task_id="FEAT-C", message_type="feedback",
                            timestamp=datetime.datetime.now() - datetime.timedelta(days=2))) # 2天前的消息

    mock_llm_client = MockLLMClient()
    summarizer_instance = MessageSummarizer(mock_llm_client)

    print("n--- AgentDev 获取的混合过滤上下文 (当前任务FEAT-A, 角色为DEV) ---")
    filtered_context = jmb.get_filtered_context(
        agent_id="AgentDev",
        current_task_id="FEAT-A",
        agent_role=AgentRole.DEVELOPER,
        min_priority=5, # 优先级高于5的消息会被包含 (即1-5)
        time_window_seconds=86400, # 最近24小时
        max_messages=5,
        summarize_long_history=True,
        summarizer=summarizer_instance
    )
    for msg in filtered_context:
        print(msg)
    # 预期:包含AgentCS的紧急警报(优先级1),以及AgentDev与FEAT-A任务相关的最新消息。
    # 如果消息数量超过5条,会有一个摘要消息。

过滤策略对比

策略名称 优点 缺点 复杂度 适用场景
时间基础 简单易实现,直观高效 易丢失旧但重要的上下文 大多数实时协作任务
内容基础 针对性强,可捕获关键词 需维护关键词,可能错过语义相关但无关键词信息 智能体有明确关注点
角色基础 结构化,易于管理,符合组织分工 灵活性差,跨角色任务处理困难 职责明确的多智能体系统
意图/任务基础 高度聚焦,提供完成任务所需精确上下文 依赖任务状态管理,意图识别成本高 中高 项目管理、流程自动化
优先级基础 确保关键信息不被遗漏 依赖准确的优先级分配,可能被滥用 紧急情况、风险管理
摘要/抽象 大幅减少上下文长度,保留核心信息 计算开销大,可能丢失细节,摘要质量有风险 长期记忆、复杂讨论回顾
混合策略 灵活性强,效果全面 实现复杂,参数调优困难 复杂、动态变化的智能体协作环境

挑战与考量

尽管消息过滤是解决上下文爆炸的关键,但在实际应用中仍面临一些挑战:

  1. 过度过滤 (Over-filtering): 移除过多信息,导致智能体失去必要的上下文,无法做出正确决策或理解全局情况。这可能比上下文爆炸更糟糕。
  2. 欠过滤 (Under-filtering): 过滤效果不佳,仍然向 LLM 提供了冗余信息,未能有效解决上下文爆炸问题。
  3. 动态性: 智能体在不同阶段或面对不同问题时,其相关性标准会发生变化。静态过滤规则难以适应这种动态性。
  4. 计算开销: 复杂的过滤逻辑,尤其是涉及语义分析或 LLM 摘要时,本身会消耗计算资源,可能引入新的延迟。
  5. 一致性维护: 当每个智能体看到的是一个高度定制的上下文视图时,如何确保它们对共享状态保持足够的一致性理解,避免“信息茧房”。
  6. 人类可读性: 如果需要人类介入或审计,过滤后的上下文可能不够完整,难以理解。
  7. 迭代与优化: 找到最佳的过滤策略和参数是一个持续的迭代过程,需要监控智能体性能和LLM成本,并进行A/B测试。

联合消息缓冲区的持久化与扩展

为了提高系统的鲁棒性和可扩展性,JMB 不仅仅是一个内存中的列表。

  • 持久化: 消息应该被持久化到数据库(如 PostgreSQL, MongoDB, Redis)中,以防止数据丢失,并支持历史查询。
  • 异步处理: 消息的添加和过滤可以异步进行,避免阻塞智能体的主逻辑。
  • 分布式: 在大规模多智能体系统中,JMB 可能需要部署为分布式服务,支持高并发读写。
  • 索引:sender_id, receiver_id, task_id, timestamp 等字段建立索引,可以显著提高过滤查询的效率。

展望未来

未来的消息过滤技术将更加智能和自适应:

  • 学习型过滤器: 使用强化学习或元学习,让智能体自己学习哪些消息是相关的,动态调整过滤策略。
  • 图谱化上下文: 将对话历史构建成知识图谱,智能体可以根据其当前查询,在图谱中进行路径遍历,提取最相关的子图作为上下文。
  • 多模态过滤: 不仅仅是文本,还包括图像、音频、视频等多模态信息的过滤和摘要。
  • 个性化 LLM 微调: 结合过滤机制,对特定智能体进行小规模的 LLM 微调,使其对特定类型的上下文有更好的理解和偏好。

结语

联合消息缓冲区是多智能体协作的基石,而消息过滤则是其高效运行的保障。通过精心设计和实施时间、内容、角色、意图和优先级等多种过滤策略,我们能够有效应对上下文爆炸的挑战,为智能体提供精准、高效且成本可控的上下文信息。这是一个复杂但充满潜力的领域,需要我们不断探索和创新,以构建更加智能、健壮的多智能体系统。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注