各位编程专家和技术爱好者,大家好!
今天,我们齐聚一堂,探讨多智能体系统(Multi-Agent Systems, MAS)中一个核心且日益严峻的挑战——上下文爆炸(Context Explosion),以及我们如何通过巧妙的消息过滤机制,来构建一个高效、可扩展且智能的“联合消息缓冲区”(Joint Message Buffer, JMB)。
随着大型语言模型(LLMs)能力的飞速发展,我们正迈入一个由多个智能体协作完成复杂任务的新时代。想象一下,一个团队由产品经理智能体、开发智能体、测试智能体和客服智能体组成,它们共同协作开发和维护一款软件产品。在这个场景中,智能体之间需要频繁地交流信息,共享对当前任务的理解和进度。一个共享的对话历史,或者说一个联合消息缓冲区,是实现这种协作的关键。
联合消息缓冲区(Joint Message Buffer, JMB)的必要性
在一个多智能体系统中,JMB 扮演着中央信息枢纽的角色。它的核心功能是存储所有智能体之间发生过的对话、任务更新、观察结果以及其他任何形式的交流。
为什么我们需要 JMB?
- 全局一致性上下文: JMB 提供了一个所有智能体都能访问的、关于系统状态和历史事件的单一事实来源。这有助于确保每个智能体对当前任务和环境有共同的理解。
- 协作与协调: 智能体可以通过 JMB 了解其他智能体的行动、意图和进度,从而更好地协调自己的行为,避免重复工作,并识别潜在的冲突。
- 决策支持: 智能体可以查询 JMB 来回顾过去的决策、吸取经验教训,或者基于历史数据做出更明智的未来决策。
- 可解释性与审计: JMB 记录了系统的整个运行轨迹,对于调试、性能分析以及理解智能体行为至关重要。
- 状态持久化: 在智能体重启或系统崩溃后,JMB 可以用于恢复之前的会话状态。
我们可以将 JMB 想象成一个巨大的会议室白板,所有参会者(智能体)都可以在上面写下自己的想法、问题、解决方案,并且查看其他人的贡献。
一个简单的 JMB 结构设想:
在代码层面,一个 JMB 可以是一个存储消息对象的列表或队列,每个消息对象都包含发送者、接收者、内容、时间戳等信息。
import datetime
import uuid
from typing import List, Dict, Any, Optional
class Message:
"""
表示一个在多智能体系统中传递的消息。
"""
def __init__(self,
sender_id: str,
receiver_id: str, # 可以是单个ID,也可以是'all',或者一个ID列表
content: str,
message_type: str = "text", # 例如 'text', 'code', 'status_update', 'question'
timestamp: Optional[datetime.datetime] = None,
task_id: Optional[str] = None, # 消息所属的任务ID
priority: int = 5, # 1-10,1最高
metadata: Optional[Dict[str, Any]] = None):
self.message_id = str(uuid.uuid4())
self.sender_id = sender_id
self.receiver_id = receiver_id
self.content = content
self.message_type = message_type
self.timestamp = timestamp if timestamp is not None else datetime.datetime.now()
self.task_id = task_id
self.priority = priority
self.metadata = metadata if metadata is not None else {}
def __str__(self):
return (f"[{self.timestamp.strftime('%Y-%m-%d %H:%M:%S')}] "
f"From {self.sender_id} to {self.receiver_id} "
f"(Type: {self.message_type}, Task: {self.task_id if self.task_id else 'N/A'}): "
f"{self.content}")
def to_dict(self):
return {
"message_id": self.message_id,
"sender_id": self.sender_id,
"receiver_id": self.receiver_id,
"content": self.content,
"message_type": self.message_type,
"timestamp": self.timestamp.isoformat(),
"task_id": self.task_id,
"priority": self.priority,
"metadata": self.metadata
}
class JointMessageBuffer:
"""
联合消息缓冲区,存储所有智能体之间的消息。
"""
def __init__(self):
self._messages: List[Message] = []
self._next_message_index: int = 0 # 用于按顺序迭代消息
def add_message(self, message: Message):
"""添加一条消息到缓冲区。"""
self._messages.append(message)
# 保持消息按时间顺序排列,尽管通常添加时已经是时间顺序
self._messages.sort(key=lambda m: m.timestamp)
def get_all_messages(self) -> List[Message]:
"""获取所有消息(慎用,可能导致上下文爆炸)。"""
return list(self._messages)
def get_messages_for_agent(self, agent_id: str, limit: Optional[int] = None) -> List[Message]:
"""
获取与特定智能体相关的所有消息。
这只是一个基础方法,尚未包含过滤逻辑。
"""
relevant_messages = []
for message in self._messages:
if message.sender_id == agent_id or message.receiver_id == agent_id or message.receiver_id == 'all':
relevant_messages.append(message)
if limit is not None and len(relevant_messages) > limit:
return relevant_messages[-limit:] # 获取最新的N条
return relevant_messages
def __len__(self):
return len(self._messages)
def __iter__(self):
self._next_message_index = 0
return self
def __next__(self):
if self._next_message_index < len(self._messages):
message = self._messages[self._next_message_index]
self._next_message_index += 1
return message
raise StopIteration
# 示例用法
if __name__ == "__main__":
jmb = JointMessageBuffer()
# 模拟智能体发送消息
jmb.add_message(Message("AgentA", "AgentB", "AgentB, 你能帮我审查一下这个需求文档吗?", task_id="REQ-001"))
jmb.add_message(Message("AgentB", "AgentA", "好的,AgentA,我正在看。有什么特别需要注意的地方吗?", task_id="REQ-001"))
jmb.add_message(Message("AgentC", "all", "团队注意,今天下午3点有紧急会议,讨论BUG-FIX-005。", message_type="status_update", priority=1))
jmb.add_message(Message("AgentA", "AgentC", "AgentC,收到。我将准备好需求变更对BUG-FIX-005的影响分析。", task_id="BUG-FIX-005"))
jmb.add_message(Message("AgentB", "AgentA", "AgentA, 需求文档初审完成,有一些关于用户体验的建议我已经标注。", task_id="REQ-001"))
print("--- 所有消息 ---")
for msg in jmb.get_all_messages():
print(msg)
print("n--- AgentA 相关消息 (基础获取,未过滤) ---")
for msg in jmb.get_messages_for_agent("AgentA"):
print(msg)
print("n--- AgentB 相关消息 (基础获取,最新2条) ---")
for msg in jmb.get_messages_for_agent("AgentB", limit=2):
print(msg)
上下文爆炸:沉默的杀手
JMB 的强大之处在于其能够汇集所有信息,但这也带来了其最大的挑战:上下文爆炸(Context Explosion)。
随着多智能体系统运行时间的增长,JMB 中存储的消息数量会迅速膨胀。当一个智能体需要访问上下文来做决策时,如果我们将整个 JMB 的内容,或者其中大部分内容,都作为输入提供给底层的 LLM,就会出现以下严重问题:
- 成本急剧增加: LLM 的API调用通常按Token数量计费。一个冗长的上下文意味着更高的Token消耗,从而导致高昂的运营成本。
- 延迟增加: 处理更长的上下文需要更多计算资源和时间,导致智能体响应速度变慢,影响用户体验和系统实时性。
- 性能下降: LLM 在处理超长上下文时,其理解和推理能力会下降。它们可能会“迷失”在大量的无关信息中,无法识别出真正重要的信息,甚至产生幻觉(hallucination)。
- 注意力分散与主题漂移: 智能体可能会被历史中不相关的细节分散注意力,导致在当前任务上无法聚焦,甚至偏离主旨。
- 内存与计算资源消耗: 在智能体内部维护和处理大量上下文需要巨大的内存和计算开销,尤其是在边缘设备或资源受限的环境中。
简而言之,如果我们不对 JMB 进行有效的管理和过滤,它将从一个协作的“白板”变成一个堆满废纸的“垃圾场”,严重阻碍多智能体系统的效率和智能。
消息过滤:防止上下文爆炸的利器
为了解决上下文爆炸问题,我们必须引入智能的消息过滤(Message Filtering)机制。消息过滤的核心思想是:对于每个智能体在特定时间点和特定任务下,只提供其真正需要和最相关的上下文信息。
这就像给每个参会者一个个性化的“摘要”,而不是让他们阅读整个会议记录。摘要的内容会根据他们的角色、当前议题和个人关注点而定制。
消息过滤可以发生在多个层面:
- JMB 层面: 在从 JMB 中检索消息时进行过滤。
- 智能体内部: 智能体在接收到过滤后的消息后,可能还会进行二次筛选或处理。
- LLM 封装层: 在将上下文传递给 LLM 之前,进行最终的裁剪和格式化。
接下来,我们将深入探讨各种消息过滤策略,并提供代码实现示例。
核心过滤策略
消息过滤并非一刀切,而是多种策略的组合。我们可以根据不同的维度来设计过滤规则。
1. 时间基础过滤 (Time-Based Filtering)
这是最直接也最常用的过滤方法。它假设最近的消息通常比很久以前的消息更相关。
策略:
- 滑动时间窗口 (Sliding Time Window): 只保留最近 N 分钟、N 小时或 N 天内的消息。
- 固定消息数量 (Fixed Message Count): 只保留最新的 N 条消息。
- 衰减模型 (Decay Model): 消息的权重或相关性随时间衰减,过滤时优先保留权重高的消息。
适用场景:
大多数实时协作任务,如日常对话、短期项目更新等。
优点:
简单易实现,效果直观。
缺点:
可能丢失历史中重要的长期信息或关键上下文。例如,一个早期的设计决策可能在很久之后才被提及并变得相关。
代码示例:
我们为 JointMessageBuffer 添加一个方法,用于根据时间窗口和消息数量进行过滤。
class JointMessageBuffer:
# ... (previous JMB code) ...
def filter_by_time(self,
agent_id: str,
time_window_seconds: Optional[int] = None,
max_messages: Optional[int] = None) -> List[Message]:
"""
根据时间窗口和/或最大消息数量过滤消息。
只返回与 agent_id 相关 (发送给/收到/发送给all) 的消息。
"""
relevant_messages = []
for message in self._messages:
if message.sender_id == agent_id or message.receiver_id == agent_id or message.receiver_id == 'all':
relevant_messages.append(message)
# 1. 应用时间窗口过滤
if time_window_seconds is not None:
cutoff_time = datetime.datetime.now() - datetime.timedelta(seconds=time_window_seconds)
relevant_messages = [msg for msg in relevant_messages if msg.timestamp >= cutoff_time]
# 2. 确保消息按时间倒序排列,以便获取最新的 N 条
relevant_messages.sort(key=lambda m: m.timestamp)
# 3. 应用最大消息数量过滤
if max_messages is not None and len(relevant_messages) > max_messages:
relevant_messages = relevant_messages[-max_messages:] # 获取最新的 N 条
return relevant_messages
# 示例用法
if __name__ == "__main__":
jmb = JointMessageBuffer()
# ... (add messages as before) ...
jmb.add_message(Message("AgentD", "AgentA", "AgentA, 这是一个5小时前的旧消息,关于项目规划。",
timestamp=datetime.datetime.now() - datetime.timedelta(hours=5), task_id="PROJ-001"))
jmb.add_message(Message("AgentE", "all", "团队,今天的每日站会已结束。",
timestamp=datetime.datetime.now() - datetime.timedelta(minutes=30), priority=7))
jmb.add_message(Message("AgentA", "AgentB", "AgentB, 请确认我们关于新功能A的最新讨论。",
timestamp=datetime.datetime.now() - datetime.timedelta(minutes=5), task_id="FEAT-A"))
print("n--- AgentA 相关消息 (最近1小时内,最多3条) ---")
filtered_msgs = jmb.filter_by_time("AgentA", time_window_seconds=3600, max_messages=3)
for msg in filtered_msgs:
print(msg)
# 预期输出将不包含5小时前的消息,且只包含最新的3条
2. 内容基础过滤 (Content-Based Filtering)
这种策略根据消息的文本内容来判断其相关性。
策略:
- 关键词匹配 (Keyword Matching): 智能体维护一个关键词列表,只接收包含这些关键词的消息。例如,一个“测试智能体”可能只关心包含“bug”、“测试用例”、“缺陷”等关键词的消息。
- 主题模型 (Topic Modeling): 使用 LDA (Latent Dirichlet Allocation) 或其他更复杂的模型来识别消息的主题,并根据智能体关注的主题进行过滤。
- 语义相似度 (Semantic Similarity): 使用词嵌入 (Word Embeddings) 或 Sentence Embeddings 计算消息与智能体关注焦点之间的语义相似度。
- 正则表达式 (Regex Matching): 匹配特定的模式,例如错误代码、URL等。
适用场景:
当智能体有明确的职责范围和关注领域时。
优点:
精确度高,能够捕获与智能体任务直接相关的上下文。
缺点:
需要维护关键词或主题模型,可能遗漏相关但未明确提及关键词的消息。语义相似度计算成本较高。
代码示例:
在 JointMessageBuffer 中添加基于关键词的过滤方法。智能体可以提供一个关键词列表。
import re
class JointMessageBuffer:
# ... (previous JMB code) ...
def filter_by_content(self,
agent_id: str,
keywords: Optional[List[str]] = None,
exclude_keywords: Optional[List[str]] = None,
task_id: Optional[str] = None,
message_types: Optional[List[str]] = None) -> List[Message]:
"""
根据内容关键词、任务ID和消息类型过滤消息。
只返回与 agent_id 相关 (发送给/收到/发送给all) 的消息。
"""
relevant_messages = []
for message in self._messages:
# 首先,确保消息与当前智能体相关
if not (message.sender_id == agent_id or message.receiver_id == agent_id or message.receiver_id == 'all'):
continue
# 过滤任务ID
if task_id is not None and message.task_id != task_id:
continue
# 过滤消息类型
if message_types is not None and message.message_type not in message_types:
continue
# 关键词包含过滤
if keywords:
content_lower = message.content.lower()
if not any(keyword.lower() in content_lower for keyword in keywords):
continue
# 关键词排除过滤
if exclude_keywords:
content_lower = message.content.lower()
if any(exclude_keyword.lower() in content_lower for exclude_keyword in exclude_keywords):
continue
relevant_messages.append(message)
relevant_messages.sort(key=lambda m: m.timestamp)
return relevant_messages
# 示例用法
if __name__ == "__main__":
jmb = JointMessageBuffer()
jmb.add_message(Message("AgentA", "AgentB", "AgentB, 请审查一下我们关于新功能A的需求文档。", task_id="FEAT-A", message_type="request"))
jmb.add_message(Message("AgentB", "AgentA", "AgentA, 我发现新功能A在用户体验上可能存在一个潜在的bug。", task_id="FEAT-A", message_type="feedback"))
jmb.add_message(Message("AgentC", "all", "团队,今天的每日站会已结束。", message_type="status_update"))
jmb.add_message(Message("AgentD", "AgentE", "AgentE, 请确认一下环境配置。", message_type="config_request"))
jmb.add_message(Message("AgentA", "AgentC", "AgentC, 我需要一个关于用户反馈的报告。", task_id="REPORT-001", message_type="request"))
print("n--- AgentA 相关消息 (关键词 '功能A' 或 '需求', 任务ID 'FEAT-A') ---")
filtered_msgs = jmb.filter_by_content("AgentA", keywords=["功能A", "需求"], task_id="FEAT-A")
for msg in filtered_msgs:
print(msg)
# 预期会包含 AgentA 和 AgentB 之间关于 FEAT-A 的消息
print("n--- AgentB 相关消息 (关键词 'bug', 消息类型 'feedback') ---")
filtered_msgs = jmb.filter_by_content("AgentB", keywords=["bug"], message_types=["feedback"])
for msg in filtered_msgs:
print(msg)
# 预期会包含 AgentB 发现 bug 的消息
3. 角色基础过滤 (Role-Based Filtering)
在多智能体系统中,每个智能体通常都有一个明确的“角色”或“职责”。我们可以根据这些角色来定义哪些消息是相关的。
策略:
- 预定义角色关注点: 每个智能体在初始化时被赋予一个角色(例如“产品经理”、“开发工程师”、“测试人员”),每个角色都有一个预定义的过滤规则集(例如,产品经理关注“需求”、“市场”,开发关注“代码”、“实现”,测试关注“bug”、“测试用例”)。
- 智能体间信任关系: 只有来自特定智能体或特定角色的消息才会被考虑。
适用场景:
具有明确职责分工的团队协作场景。
优点:
结构化、可管理性强,易于理解和配置。
缺点:
不够灵活,如果任务跨越多个角色边界,可能需要更复杂的规则。
代码示例:
我们可以将角色与关键词列表关联起来,并在智能体请求上下文时应用。
class AgentRole:
"""定义智能体的角色及其关注点。"""
PRODUCT_MANAGER = {"keywords": ["需求", "用户", "市场", "发布", "计划"], "exclude_keywords": ["代码", "编译", "测试用例"]}
DEVELOPER = {"keywords": ["代码", "实现", "bug", "架构", "部署"], "exclude_keywords": ["市场", "客户反馈"]}
TESTER = {"keywords": ["测试", "缺陷", "bug", "测试用例", "回归"], "exclude_keywords": ["市场分析", "产品路线图"]}
CUSTOMER_SUPPORT = {"keywords": ["用户反馈", "问题", "故障", "客户", "帮助"], "exclude_keywords": ["代码实现", "架构设计"]}
class JointMessageBuffer:
# ... (previous JMB code) ...
def filter_by_role(self,
agent_id: str,
agent_role: Dict[str, Any], # 期望是AgentRole类中的一个字典
time_window_seconds: Optional[int] = None,
max_messages: Optional[int] = None) -> List[Message]:
"""
根据智能体的角色及其关注点过滤消息。
结合了内容过滤和时间过滤。
"""
keywords = agent_role.get("keywords")
exclude_keywords = agent_role.get("exclude_keywords")
message_types = agent_role.get("message_types") # 角色也可以定义关注的消息类型
# 先进行内容过滤
filtered_by_content = self.filter_by_content(
agent_id=agent_id,
keywords=keywords,
exclude_keywords=exclude_keywords,
message_types=message_types
)
# 在内容过滤的基础上,再进行时间过滤
if time_window_seconds is not None:
cutoff_time = datetime.datetime.now() - datetime.timedelta(seconds=time_window_seconds)
filtered_by_content = [msg for msg in filtered_by_content if msg.timestamp >= cutoff_time]
if max_messages is not None and len(filtered_by_content) > max_messages:
filtered_by_content = filtered_by_content[-max_messages:]
return filtered_by_content
# 示例用法
if __name__ == "__main__":
jmb = JointMessageBuffer()
jmb.add_message(Message("AgentPM", "AgentDev", "AgentDev, 用户对新功能A的需求反馈很多,需要优先处理。", task_id="FEAT-A", message_type="request"))
jmb.add_message(Message("AgentDev", "AgentPM", "AgentPM, 我发现新功能A在实现上有一个潜在的bug,需要架构调整。", task_id="FEAT-A", message_type="feedback"))
jmb.add_message(Message("AgentTester", "AgentDev", "AgentDev, 新功能A的测试用例已编写完成,发现一个严重缺陷。", task_id="FEAT-A", message_type="bug_report"))
jmb.add_message(Message("AgentCS", "all", "客户报告了系统B的一个高优先级故障,请求支持。", message_type="urgent_alert", priority=1))
jmb.add_message(Message("AgentDev", "AgentTester", "AgentTester, 我已经修复了你报告的bug,请复测。", task_id="FEAT-A", message_type="status_update"))
print("n--- AgentPM (产品经理) 相关消息 (最近24小时) ---")
filtered_msgs = jmb.filter_by_role("AgentPM", AgentRole.PRODUCT_MANAGER, time_window_seconds=86400)
for msg in filtered_msgs:
print(msg)
# 预期包含与“需求”、“用户反馈”相关的消息
print("n--- AgentTester (测试人员) 相关消息 (最近24小时) ---")
filtered_msgs = jmb.filter_by_role("AgentTester", AgentRole.TESTER, time_window_seconds=86400)
for msg in filtered_msgs:
print(msg)
# 预期包含与“测试”、“缺陷”、“bug”相关的消息
4. 意图/任务基础过滤 (Intent/Task-Based Filtering)
智能体通常是在执行某个特定的任务或达成某个目标。消息的相关性应与当前任务的意图高度匹配。
策略:
- 任务ID关联: 所有与特定任务(例如,
task_id="BUG-FIX-005")相关的消息都被视为相关。 - 意图识别: 使用自然语言处理(NLP)技术识别消息的意图(例如,“提问”、“请求帮助”、“提供信息”、“确认”),并根据智能体当前意图来筛选。
- 目标导向过滤: 智能体明确其当前目标,并只拉取有助于达成该目标的消息。
适用场景:
项目管理、问题解决、复杂的流程自动化等。
优点:
高度聚焦,能够提供智能体完成当前任务所需的最精确上下文。
缺点:
需要维护任务状态,意图识别可能不总是准确,增加了复杂度。
代码示例:
Message 类中已包含 task_id 字段,filter_by_content 方法也支持按 task_id 过滤。这里我们再展示一个更直接的任务过滤。
class JointMessageBuffer:
# ... (previous JMB code) ...
def filter_by_task(self,
agent_id: str,
current_task_id: str,
include_general_messages: bool = False, # 是否包含非特定任务但可能相关的消息 (如 'all' 消息)
max_messages: Optional[int] = None) -> List[Message]:
"""
根据当前任务ID过滤消息。
"""
relevant_messages = []
for message in self._messages:
# 确保消息与当前智能体相关
if not (message.sender_id == agent_id or message.receiver_id == agent_id or message.receiver_id == 'all'):
continue
# 过滤与当前任务ID匹配的消息
if message.task_id == current_task_id:
relevant_messages.append(message)
# 或者包含通用消息
elif include_general_messages and (message.task_id is None or message.receiver_id == 'all'):
relevant_messages.append(message)
relevant_messages.sort(key=lambda m: m.timestamp)
if max_messages is not None and len(relevant_messages) > max_messages:
relevant_messages = relevant_messages[-max_messages:]
return relevant_messages
# 示例用法
if __name__ == "__main__":
jmb = JointMessageBuffer()
jmb.add_message(Message("AgentA", "AgentB", "AgentB, 我们需要尽快完成新功能A的开发。", task_id="FEAT-A"))
jmb.add_message(Message("AgentB", "AgentA", "AgentA, 新功能A的后端API已完成。", task_id="FEAT-A"))
jmb.add_message(Message("AgentC", "all", "团队,今天的每日站会已结束。", message_type="status_update"))
jmb.add_message(Message("AgentB", "AgentD", "AgentD, 请部署新功能A的API到测试环境。", task_id="FEAT-A"))
jmb.add_message(Message("AgentA", "AgentE", "AgentE, 关于旧系统的一个bug,请查看日志。", task_id="BUG-001"))
print("n--- AgentB 相关消息 (当前任务 'FEAT-A', 包含通用消息) ---")
filtered_msgs = jmb.filter_by_task("AgentB", "FEAT-A", include_general_messages=True)
for msg in filtered_msgs:
print(msg)
# 预期包含所有关于FEAT-A的消息,以及AgentC发送的通用站会消息
5. 优先级基础过滤 (Priority-Based Filtering)
某些消息,如紧急警报、关键决策或高优先级任务更新,无论其时间或内容如何,都应该被优先考虑。
策略:
- 消息优先级标签: 在
Message对象中添加一个priority字段(例如,1-10,1最高)。 - 阈值过滤: 只保留高于某个优先级阈值的消息。
- 优先级提升: 即使消息较旧,如果其优先级高,也会被包含在内。
适用场景:
需要处理紧急情况、风险管理、关键决策支持的系统。
优点:
确保关键信息不会被遗漏,即使在上下文受限的情况下。
缺点:
需要智能体或系统能够准确地为消息分配优先级。过多的高优先级消息会稀释其效果。
代码示例:
Message 类中已包含 priority 字段。我们可以在其他过滤方法的基础上,再添加优先级筛选。
class JointMessageBuffer:
# ... (previous JMB code) ...
def filter_by_priority(self,
agent_id: str,
min_priority: int = 10, # 最小优先级,例如1表示最高优先级,10表示最低
time_window_seconds: Optional[int] = None,
max_messages: Optional[int] = None) -> List[Message]:
"""
根据消息优先级过滤。优先级数值越小表示优先级越高。
结合了优先级过滤和时间过滤。
"""
relevant_messages = []
for message in self._messages:
if not (message.sender_id == agent_id or message.receiver_id == agent_id or message.receiver_id == 'all'):
continue
if message.priority <= min_priority:
relevant_messages.append(message)
# 在优先级过滤的基础上,再进行时间过滤
if time_window_seconds is not None:
cutoff_time = datetime.datetime.now() - datetime.timedelta(seconds=time_window_seconds)
relevant_messages = [msg for msg in relevant_messages if msg.timestamp >= cutoff_time]
relevant_messages.sort(key=lambda m: m.timestamp) # 保持时间顺序
if max_messages is not None and len(relevant_messages) > max_messages:
relevant_messages = relevant_messages[-max_messages:]
return relevant_messages
# 示例用法
if __name__ == "__main__":
jmb = JointMessageBuffer()
jmb.add_message(Message("AgentA", "AgentB", "AgentB, 请尽快处理这个高优先级任务。", task_id="URGENT-001", priority=1))
jmb.add_message(Message("AgentB", "AgentA", "AgentA, 已收到紧急任务,正在处理。", task_id="URGENT-001", priority=2))
jmb.add_message(Message("AgentC", "all", "团队,今天的每日站会已结束。", message_type="status_update", priority=7))
jmb.add_message(Message("AgentD", "AgentA", "AgentA, 关于功能A的一个普通反馈。", task_id="FEAT-A", priority=8))
print("n--- AgentA 相关消息 (优先级 <= 2,最近24小时) ---")
filtered_msgs = jmb.filter_by_priority("AgentA", min_priority=2, time_window_seconds=86400)
for msg in filtered_msgs:
print(msg)
# 预期只包含与URGENT-001任务相关的消息
6. 摘要与抽象 (Summarization/Abstraction)
严格来说,这并非“过滤”而是“压缩”或“转换”。当原始消息过于冗长或包含大量细节时,可以将其提炼成一个简短的摘要或高层次的抽象。
策略:
- LLM 摘要: 使用另一个 LLM 对一系列消息进行摘要。
- 结构化表示: 将非结构化的对话内容转换为结构化的知识图谱、状态更新或关键信息列表。
- 主题句提取: 从长消息中提取最能代表其核心内容的一两句话。
适用场景:
需要长期记忆、回顾复杂讨论、生成报告的场景。
优点:
极大地减少上下文长度,同时保留核心信息,提高 LLM 理解效率。
缺点:
引入额外的计算开销和延迟;摘要可能会丢失细微之处或重要细节;摘要质量依赖于摘要模型的性能。
代码示例 (概念性,因为涉及LLM调用):
# 这是一个概念性的示例,实际需要与LLM API集成
class MessageSummarizer:
def __init__(self, llm_client):
self.llm_client = llm_client # 假定是一个与LLM交互的客户端
def summarize_messages(self, messages: List[Message], agent_id: str, context_focus: Optional[str] = None) -> str:
"""
使用LLM对一系列消息进行摘要。
context_focus 可以是智能体的当前任务或角色,以指导摘要方向。
"""
if not messages:
return "无相关消息。"
# 将消息格式化为LLM输入
formatted_messages = "n".join([f"{msg.sender_id}: {msg.content}" for msg in messages])
prompt = f"以下是多智能体系统中的一段对话历史,请为智能体 {agent_id} 针对 '{context_focus if context_focus else '其当前关注点'}' 生成一个简洁的摘要:nn{formatted_messages}nn摘要:"
try:
# 实际调用LLM API (这里仅为占位符)
# response = self.llm_client.generate(prompt, max_tokens=200)
# summary = response.text
summary = f"【自动摘要 for {agent_id}】关于 {context_focus if context_focus else '一般情况'},对话主要涉及...n(原始消息数量: {len(messages)})"
return summary
except Exception as e:
print(f"摘要生成失败: {e}")
return "摘要生成失败。"
# 示例用法 (假设JMB中已经有 summarizer 实例)
if __name__ == "__main__":
jmb = JointMessageBuffer()
jmb.add_message(Message("AgentA", "AgentB", "AgentB, 我们需要尽快完成新功能A的开发。", task_id="FEAT-A"))
jmb.add_message(Message("AgentB", "AgentA", "AgentA, 新功能A的后端API已完成。", task_id="FEAT-A"))
jmb.add_message(Message("AgentB", "AgentD", "AgentD, 请部署新功能A的API到测试环境。", task_id="FEAT-A"))
jmb.add_message(Message("AgentA", "AgentB", "AgentB, 请确认我们关于新功能A的最新讨论。", task_id="FEAT-A"))
# 模拟LLM客户端
class MockLLMClient:
def generate(self, prompt, max_tokens):
return type('obj', (object,), {'text': '模拟摘要内容'})()
mock_llm_client = MockLLMClient()
summarizer = MessageSummarizer(mock_llm_client)
# 首先过滤出相关消息
filtered_msgs = jmb.filter_by_task("AgentA", "FEAT-A", include_general_messages=False)
# 对过滤后的消息进行摘要
summary_for_agent_a = summarizer.summarize_messages(filtered_msgs, "AgentA", "新功能A的开发进度")
print(f"n--- AgentA 的摘要 ---")
print(summary_for_agent_a)
7. 混合过滤策略 (Hybrid Filtering)
在实际应用中,很少只使用单一的过滤策略。通常我们会结合多种策略,以达到最佳效果。例如:
- 先时间后内容: 首先筛选出最近一段时间内的消息,然后在这个子集中再进行关键词或任务ID匹配。
- 优先级结合: 无论其他过滤规则如何,高优先级消息总是被包含在内。
- 动态调整: 根据智能体的当前状态、任务紧迫性、LLM的Token限制等,动态调整过滤参数。
实现方式:
通过链式调用不同的过滤方法,或者在一个统一的 get_filtered_context 方法中集成所有逻辑。
优点:
灵活性高,效果全面,能够应对复杂多变的需求。
缺点:
实现复杂,参数调优困难,容易引入新的逻辑错误。
一个混合过滤的函数原型:
class JointMessageBuffer:
# ... (previous JMB code) ...
def get_filtered_context(self,
agent_id: str,
current_task_id: Optional[str] = None,
agent_role: Optional[Dict[str, Any]] = None,
min_priority: int = 10,
time_window_seconds: Optional[int] = 3600, # 默认最近1小时
max_messages: Optional[int] = 100, # 默认最多100条
keywords: Optional[List[str]] = None,
exclude_keywords: Optional[List[str]] = None,
message_types: Optional[List[str]] = None,
summarize_long_history: bool = False, # 是否对旧消息进行摘要
summarizer=None # 外部摘要工具
) -> List[Message]:
"""
一个综合性的消息过滤方法,结合多种策略。
"""
# 1. 首先获取与智能体直接相关的消息
all_relevant_msgs = []
for message in self._messages:
if message.sender_id == agent_id or message.receiver_id == agent_id or message.receiver_id == 'all':
all_relevant_msgs.append(message)
all_relevant_msgs.sort(key=lambda m: m.timestamp) # 确保按时间排序
filtered_msgs = []
high_priority_msgs = []
other_msgs = []
for msg in all_relevant_msgs:
# 优先级过滤:高优先级消息始终保留
if msg.priority <= min_priority:
high_priority_msgs.append(msg)
continue # 不再进行后续过滤,直接进入高优先级队列
# 任务ID过滤
if current_task_id and msg.task_id != current_task_id:
continue
# 角色/内容关键词过滤
if agent_role:
role_keywords = agent_role.get("keywords", [])
role_exclude_keywords = agent_role.get("exclude_keywords", [])
role_message_types = agent_role.get("message_types", [])
content_lower = msg.content.lower()
# 关键词包含
if role_keywords and not any(k.lower() in content_lower for k in role_keywords):
continue
# 关键词排除
if role_exclude_keywords and any(k.lower() in content_lower for k in role_exclude_keywords):
continue
# 消息类型
if role_message_types and msg.message_type not in role_message_types:
continue
# 自定义关键词过滤
if keywords:
content_lower = msg.content.lower()
if not any(k.lower() in content_lower for k in keywords):
continue
if exclude_keywords:
content_lower = msg.content.lower()
if any(k.lower() in content_lower for k in exclude_keywords):
continue
# 消息类型过滤
if message_types and msg.message_type not in message_types:
continue
other_msgs.append(msg)
# 将高优先级消息放在前面
final_msgs = high_priority_msgs + other_msgs
final_msgs.sort(key=lambda m: m.timestamp) # 重新排序以保持时间一致性
# 时间窗口过滤(在其他过滤之后应用,确保只对相关消息进行时间限制)
if time_window_seconds is not None:
cutoff_time = datetime.datetime.now() - datetime.timedelta(seconds=time_window_seconds)
final_msgs = [msg for msg in final_msgs if msg.timestamp >= cutoff_time]
# 总结旧消息 (如果需要)
# 这部分逻辑会更复杂,可能需要将一部分超出max_messages的旧消息先进行摘要,
# 然后将摘要作为一条新消息插入到列表开头
if summarize_long_history and summarizer and max_messages and len(final_msgs) > max_messages:
messages_to_summarize = final_msgs[:-max_messages]
recent_messages = final_msgs[-max_messages:]
if messages_to_summarize:
long_history_summary = summarizer.summarize_messages(
messages_to_summarize, agent_id, current_task_id or agent_role.get("name", "general_context") if agent_role else "general_context"
)
summary_msg = Message(
sender_id="System",
receiver_id=agent_id,
content=long_history_summary,
message_type="summary",
priority=9 # 摘要优先级较低,但仍保留
)
final_msgs = [summary_msg] + recent_messages
else:
final_msgs = recent_messages
elif max_messages is not None and len(final_msgs) > max_messages:
final_msgs = final_msgs[-max_messages:] # 简单截断
return final_msgs
# 示例用法
if __name__ == "__main__":
jmb = JointMessageBuffer()
# ... (添加各种消息,包括高优先级、不同任务、不同角色相关的消息) ...
jmb.add_message(Message("AgentPM", "AgentDev", "AgentDev, 用户对新功能A的需求反馈很多,需要优先处理。", task_id="FEAT-A", message_type="request"))
jmb.add_message(Message("AgentDev", "AgentPM", "AgentPM, 我发现新功能A在实现上有一个潜在的bug,需要架构调整。", task_id="FEAT-A", message_type="feedback"))
jmb.add_message(Message("AgentTester", "AgentDev", "AgentDev, 新功能A的测试用例已编写完成,发现一个严重缺陷。", task_id="FEAT-A", message_type="bug_report"))
jmb.add_message(Message("AgentCS", "all", "客户报告了系统B的一个高优先级故障,请求支持。", message_type="urgent_alert", priority=1))
jmb.add_message(Message("AgentDev", "AgentTester", "AgentTester, 我已经修复了你报告的bug,请复测。", task_id="FEAT-A", message_type="status_update"))
jmb.add_message(Message("AgentPM", "AgentDev", "AgentDev, 旧的功能C的市场反馈非常好,考虑推广。", task_id="FEAT-C", message_type="feedback",
timestamp=datetime.datetime.now() - datetime.timedelta(days=2))) # 2天前的消息
mock_llm_client = MockLLMClient()
summarizer_instance = MessageSummarizer(mock_llm_client)
print("n--- AgentDev 获取的混合过滤上下文 (当前任务FEAT-A, 角色为DEV) ---")
filtered_context = jmb.get_filtered_context(
agent_id="AgentDev",
current_task_id="FEAT-A",
agent_role=AgentRole.DEVELOPER,
min_priority=5, # 优先级高于5的消息会被包含 (即1-5)
time_window_seconds=86400, # 最近24小时
max_messages=5,
summarize_long_history=True,
summarizer=summarizer_instance
)
for msg in filtered_context:
print(msg)
# 预期:包含AgentCS的紧急警报(优先级1),以及AgentDev与FEAT-A任务相关的最新消息。
# 如果消息数量超过5条,会有一个摘要消息。
过滤策略对比
| 策略名称 | 优点 | 缺点 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| 时间基础 | 简单易实现,直观高效 | 易丢失旧但重要的上下文 | 低 | 大多数实时协作任务 |
| 内容基础 | 针对性强,可捕获关键词 | 需维护关键词,可能错过语义相关但无关键词信息 | 中 | 智能体有明确关注点 |
| 角色基础 | 结构化,易于管理,符合组织分工 | 灵活性差,跨角色任务处理困难 | 中 | 职责明确的多智能体系统 |
| 意图/任务基础 | 高度聚焦,提供完成任务所需精确上下文 | 依赖任务状态管理,意图识别成本高 | 中高 | 项目管理、流程自动化 |
| 优先级基础 | 确保关键信息不被遗漏 | 依赖准确的优先级分配,可能被滥用 | 低 | 紧急情况、风险管理 |
| 摘要/抽象 | 大幅减少上下文长度,保留核心信息 | 计算开销大,可能丢失细节,摘要质量有风险 | 高 | 长期记忆、复杂讨论回顾 |
| 混合策略 | 灵活性强,效果全面 | 实现复杂,参数调优困难 | 高 | 复杂、动态变化的智能体协作环境 |
挑战与考量
尽管消息过滤是解决上下文爆炸的关键,但在实际应用中仍面临一些挑战:
- 过度过滤 (Over-filtering): 移除过多信息,导致智能体失去必要的上下文,无法做出正确决策或理解全局情况。这可能比上下文爆炸更糟糕。
- 欠过滤 (Under-filtering): 过滤效果不佳,仍然向 LLM 提供了冗余信息,未能有效解决上下文爆炸问题。
- 动态性: 智能体在不同阶段或面对不同问题时,其相关性标准会发生变化。静态过滤规则难以适应这种动态性。
- 计算开销: 复杂的过滤逻辑,尤其是涉及语义分析或 LLM 摘要时,本身会消耗计算资源,可能引入新的延迟。
- 一致性维护: 当每个智能体看到的是一个高度定制的上下文视图时,如何确保它们对共享状态保持足够的一致性理解,避免“信息茧房”。
- 人类可读性: 如果需要人类介入或审计,过滤后的上下文可能不够完整,难以理解。
- 迭代与优化: 找到最佳的过滤策略和参数是一个持续的迭代过程,需要监控智能体性能和LLM成本,并进行A/B测试。
联合消息缓冲区的持久化与扩展
为了提高系统的鲁棒性和可扩展性,JMB 不仅仅是一个内存中的列表。
- 持久化: 消息应该被持久化到数据库(如 PostgreSQL, MongoDB, Redis)中,以防止数据丢失,并支持历史查询。
- 异步处理: 消息的添加和过滤可以异步进行,避免阻塞智能体的主逻辑。
- 分布式: 在大规模多智能体系统中,JMB 可能需要部署为分布式服务,支持高并发读写。
- 索引: 对
sender_id,receiver_id,task_id,timestamp等字段建立索引,可以显著提高过滤查询的效率。
展望未来
未来的消息过滤技术将更加智能和自适应:
- 学习型过滤器: 使用强化学习或元学习,让智能体自己学习哪些消息是相关的,动态调整过滤策略。
- 图谱化上下文: 将对话历史构建成知识图谱,智能体可以根据其当前查询,在图谱中进行路径遍历,提取最相关的子图作为上下文。
- 多模态过滤: 不仅仅是文本,还包括图像、音频、视频等多模态信息的过滤和摘要。
- 个性化 LLM 微调: 结合过滤机制,对特定智能体进行小规模的 LLM 微调,使其对特定类型的上下文有更好的理解和偏好。
结语
联合消息缓冲区是多智能体协作的基石,而消息过滤则是其高效运行的保障。通过精心设计和实施时间、内容、角色、意图和优先级等多种过滤策略,我们能够有效应对上下文爆炸的挑战,为智能体提供精准、高效且成本可控的上下文信息。这是一个复杂但充满潜力的领域,需要我们不断探索和创新,以构建更加智能、健壮的多智能体系统。