各位同仁,各位对多智能体系统充满热情的开发者们:
欢迎来到今天的技术讲座。今天,我们将深入探讨多智能体系统中的一个核心且至关重要的机制——“Handoffs”协议。在日益复杂的AI应用场景中,单一智能体(Agent)往往难以应对所有挑战。它们可能面临知识边界、能力限制、资源瓶颈,甚至单纯的决策失误。此时,如何优雅、高效、无缝地将一个智能体无法处理的问题及其所有相关上下文,安全地“接力”给另一个更具专长或更合适的智能体,就成为了构建健壮、智能、用户友好型系统的关键。
我们将从Handoffs的必要性出发,逐步解构上下文的构成、触发机制、目标选择、协议设计,直至其健壮性与高级策略。我将通过具体的代码示例,帮助大家将抽象概念落地,理解如何在实际项目中构建一个可靠的Handoffs系统。
第一章:Handoffs 协议的诞生背景与核心挑战
在人工智能领域,特别是大型语言模型(LLM)驱动的智能体浪潮中,我们常常构建复杂的系统来解决现实世界的问题。这些系统通常由多个智能体协作完成任务。例如,一个客服系统可能包含一个处理常见问题的“初级客服Agent”,一个处理技术故障的“技术支持Agent”,以及一个处理退款和投诉的“高级调解Agent”。
1.1 单一 Agent 的局限性
任何一个Agent,无论其底层模型多么强大,都存在固有的局限性:
- 知识边界: Agent 训练数据是有限的,特定领域的专业知识可能不足。
- 能力限制: Agent 可能被设计为执行特定类型的任务(例如,只回答问题,不执行操作)。
- 资源限制: 某些任务可能需要耗费大量计算资源或访问特定外部工具,而当前Agent可能不具备。
- 复杂性管理: 当问题过于复杂,需要多步骤推理和信息整合时,单一Agent可能陷入困境。
- 用户意图误解: Agent 可能无法准确理解用户意图,导致无法提供有效帮助。
- 状态管理: 在长时间的对话或任务中,Agent可能难以有效管理和利用全部历史状态。
当一个Agent遇到上述挑战,无法继续推进任务时,它不能简单地报错或停止响应。这会导致糟糕的用户体验,甚至任务失败。
1.2 Handoffs 的定义与重要性
Handoffs (接力) 协议,正是为了解决上述问题而生。它定义了一套规范的流程和数据结构,使得一个Agent在判断自身无法处理当前任务时,能够将其所有必要信息(即“上下文”)封装起来,并以一种可被另一个Agent理解和恢复的方式,传递给后者。新的Agent接收上下文后,能够无缝地接管任务,仿佛从未中断过。
Handoffs 的重要性体现在:
- 提升用户体验: 避免中断和重复提问,提供流畅、连贯的服务。
- 提高系统鲁棒性: 即使部分Agent失败,系统整体仍能通过Handoffs继续运行。
- 实现 Agent 专长化: 允许我们构建专注于特定任务的Agent,并通过Handoffs将它们有机地组合起来。
- 优化资源利用: 将复杂任务分配给更合适的Agent,避免资源浪费。
- 促进协作与扩展: 为未来增加更多 Agent 类型和更复杂的协作模式打下基础。
1.3 Handoffs 的核心挑战
设计一个高效的Handoffs协议面临以下挑战:
- 上下文的完整性与一致性: 如何确保所有必要信息都被传递,并且在传递过程中不丢失、不损坏?
- 上下文的结构化与通用性: 如何将不同Agent内部可能以不同形式表示的信息,转化为一种通用的、可被所有潜在接收者理解的结构?
- 触发机制的准确性: Agent何时应该决定Handoff?过早可能浪费资源,过晚可能损害用户体验。
- 目标Agent的选择: 如何智能地选择最适合接管任务的Agent?
- Handoff 的原子性与健壮性: Handoff过程本身也可能失败,如何处理这些失败?
- 效率与延迟: Handoff过程应尽可能快速,避免引入显著延迟。
在接下来的章节中,我们将逐一攻克这些挑战。
第二章:上下文的构成与封装
Handoffs 的核心是上下文的传递。一个“优雅”的接力,首先要确保接力的“棒子”——即上下文——是完整且易于抓取的。
2.1 何为“上下文”?
在一个多Agent系统中,上下文远不止是用户输入的最新一句话。它是一个多维度、动态变化的复合体,通常包括:
- 用户输入历史: 完整的对话历史,包括用户的所有查询和Agent的所有响应。
- 当前问题描述: Agent当前正在尝试解决的具体问题,可能经过Agent内部的解析和提炼。
- Agent 内部状态:
- 思考链 (Chain of Thought / Reasoning Trace): Agent 尝试解决问题时的内部推理过程,包括它尝试过哪些方法、为什么失败了。这对于接手的Agent来说是宝贵的诊断信息。
- 工具使用记录: Agent 调用了哪些外部工具(API、数据库查询等),以及这些工具的输入和输出。
- 提取的关键实体/信息: 从对话中识别出的重要信息,如用户ID、订单号、产品名称、时间、地点等。
- Agent 自身的内部假设或决策树路径。
- 系统级信息:
- 会话ID (Session ID): 唯一标识一次交互会话。
- 用户ID (User ID): 标识发起请求的用户。
- 时间戳: Handoff发生的时间。
- 环境信息: 如语言、地域设置等。
- Handoff 特定信息:
- Handoff 原因: 当前Agent决定Handoff的具体理由(如“知识不足”、“需要人工介入”、“工具调用失败”)。
- 建议的下一步行动: 当前Agent可能对下一个Agent给出建议,例如“请查询用户订单历史”。
2.2 上下文的结构化表示
为了确保上下文的完整性、一致性和可解析性,我们必须对其进行结构化表示。JSON是首选,因为它通用、易读、跨语言兼容。我们可以定义一个标准化的Context数据结构。
这里,我们将使用 Python 的 pydantic 库来定义上下文模型,它能提供类型检查和序列化/反序列化能力,非常适合构建健壮的数据协议。
import uuid
from datetime import datetime
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field
# 定义对话消息的结构
class Message(BaseModel):
role: str # "user", "agent", "system", "tool"
content: str
timestamp: datetime = Field(default_factory=datetime.now)
tool_calls: Optional[List[Dict[str, Any]]] = None # 如果是agent消息,可能包含工具调用
tool_outputs: Optional[List[Dict[str, Any]]] = None # 工具调用的结果
# 定义 Agent 内部思考链的步骤
class ReasoningStep(BaseModel):
step_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
action: str # 尝试的动作,如 "analyze_query", "call_tool", "generate_response"
details: Dict[str, Any] # 动作的具体细节,如工具名称、参数、LLM提示等
outcome: str # 动作结果,如 "success", "failure", "inconclusive"
reasoning: Optional[str] = None # Agent 对此步骤的额外思考
# 定义 Handoff 原因的枚举或字符串常量
class HandoffReason(str):
KNOWLEDGE_GAP = "知识不足"
OUT_OF_SCOPE = "超出能力范围"
TOOL_FAILURE = "工具调用失败"
USER_ESCALATION = "用户要求升级"
COMPLEXITY_EXCEEDED = "问题复杂度超限"
NO_MATCH_AGENT = "无匹配Agent" # 这是一个特殊的内部原因,表示找不到下一个Agent
OTHER = "其他原因"
# 核心上下文模型
class AgentContext(BaseModel):
session_id: str
user_id: Optional[str] = None
initial_query: str # 用户最初的问题
current_problem_description: str # Agent 提炼出的当前问题
conversation_history: List[Message] # 完整的对话历史
# Agent 内部状态,方便接手Agent理解前因后果
internal_state: Dict[str, Any] = Field(default_factory=dict) # 存储 Agent 特定的一些键值对状态
reasoning_trace: List[ReasoningStep] = Field(default_factory=list) # Agent 的思考链
# Handoff 特定信息
handoff_reason: HandoffReason # Handoff 的具体原因
source_agent_id: str # 发起 Handoff 的 Agent ID
suggested_next_action: Optional[str] = None # 对下一个 Agent 的建议
# 额外元数据
metadata: Dict[str, Any] = Field(default_factory=dict) # 存储其他扩展信息,如语言、优先级等
class Config:
json_encoders = {
datetime: lambda v: v.isoformat()
}
# 允许额外字段,方便未来扩展
extra = "allow"
# 示例:创建一个上下文对象
if __name__ == "__main__":
session_id = str(uuid.uuid4())
user_id = "user_123"
history = [
Message(role="user", content="我想查询我的订单状态,订单号是 ABC-123。"),
Message(role="agent", content="好的,请稍等,我正在查询。",
tool_calls=[{"name": "lookup_order", "args": {"order_id": "ABC-123"}}]),
Message(role="tool", content='{"status": "查询失败", "error": "订单不存在或已取消"}',
tool_outputs=[{"tool_call_id": "call_123", "output": {"status": "查询失败", "error": "订单不存在或已取消"}}]),
Message(role="agent", content="抱歉,订单ABC-123查询失败,系统显示订单不存在或已取消。您是否能提供更多信息?", timestamp=datetime.now())
]
reasoning_steps = [
ReasoningStep(
action="analyze_query",
details={"query": "我想查询我的订单状态,订单号是 ABC-123。"},
outcome="success",
reasoning="识别到用户意图是查询订单,提取订单号。"
),
ReasoningStep(
action="call_tool",
details={"tool_name": "lookup_order", "args": {"order_id": "ABC-123"}},
outcome="failure",
reasoning="订单查询工具返回失败,提示订单不存在。"
),
ReasoningStep(
action="evaluate_failure",
details={"error": "订单不存在或已取消"},
outcome="failure",
reasoning="当前Agent无法处理订单不存在的情况,需要更专业的Agent或人工介入。"
)
]
context = AgentContext(
session_id=session_id,
user_id=user_id,
initial_query="我想查询我的订单状态,订单号是 ABC-123。",
current_problem_description="用户订单查询失败,订单号不存在或已取消。",
conversation_history=history,
internal_state={"order_id_attempted": "ABC-123", "retry_count": 0},
reasoning_trace=reasoning_steps,
handoff_reason=HandoffReason.TOOL_FAILURE,
source_agent_id="InitialOrderAgent",
suggested_next_action="请核实用户身份并尝试通过其他信息(如手机号)查询订单,或转交人工客服。"
)
# 序列化为 JSON 字符串
json_context = context.model_dump_json(indent=2)
print("序列化后的上下文 JSON:n", json_context)
# 从 JSON 字符串反序列化
reconstructed_context = AgentContext.model_validate_json(json_context)
print("n反序列化后的上下文对象:n", reconstructed_context)
print(f"nHandoff 原因: {reconstructed_context.handoff_reason}")
print(f"原始 Agent: {reconstructed_context.source_agent_id}")
print(f"建议操作: {reconstructed_context.suggested_next_action}")
代码说明:
Message类定义了对话中的基本信息单位,包括角色、内容、时间戳以及可能的工具调用和结果。ReasoningStep类记录了Agent的内部思考和行动轨迹,这对于接手的Agent理解失败原因至关重要。HandoffReason类定义了Handoff的标准化原因,有助于分类和后续处理。AgentContext是核心模型,聚合了所有关键信息。它包含了用户对话、Agent内部状态和推理过程,以及Handoff的元数据。model_dump_json()和model_validate_json()方法(Pydantic v2)提供了方便的JSON序列化和反序列化功能,确保数据在网络传输中的完整性。
通过这种结构化的上下文表示,我们为Agent之间的无缝接力奠定了坚实的基础。
第三章:Handoffs 的触发机制
何时进行Handoff是Agent智能决策的关键一环。过早的Handoff可能导致不必要的Agent切换,增加系统开销;过晚则可能拖延问题解决,损害用户体验。Handoff触发机制可以是显式的,也可以是隐式的。
3.1 显式触发:Agent 自我评估
这是最常见也最直接的方式,Agent通过对自身状态和任务进展的评估,主动决定进行Handoff。
- 知识边界判断: 当Agent识别出用户的问题超出了其预设的知识范围时。
- 示例: 一个“商品查询Agent”收到一个关于“退款政策”的问题。
- 能力范围限制: 当Agent发现解决问题需要执行它不具备的能力(如调用特定API、进行复杂计算、需要人工干预)时。
- 示例: 一个“信息查询Agent”被要求“取消订单”,但它只具备查询能力。
- 工具调用失败: 当Agent依赖的外部工具(如数据库、API)返回错误或无法提供所需信息时,且Agent自身无法通过重试或替代方案解决。
- 示例: “订单查询Agent”多次调用订单系统API失败。
- 推理陷入循环或死锁: Agent的内部推理过程长时间无法收敛,或者陷入了重复的尝试中。
- 用户明确要求: 用户直接要求“转人工”、“换个客服”等。
- 置信度不足: Agent对其生成的回答或解决方案的置信度低于预设阈值。
3.2 隐式触发:系统监控与规则
这些触发机制通常由外部系统或Agent管理层监控并触发。
- 超时: Agent 在预设时间内未能给出有效响应或解决问题。
- 示例: Agent 连续3分钟没有进展。
- 错误率阈值: Agent 在短时间内连续产生错误响应,或工具调用失败次数达到阈值。
- 示例: Agent 尝试了3次查询,每次都失败。
- 用户重复提问: 用户多次以相似方式重复提问,暗示Agent未能理解或解决问题。
- 特定关键词/短语: 系统检测到用户输入包含某些触发Handoff的关键词,如“投诉”、“紧急”、“无法解决”。
- 人工干预: 监控人员发现Agent表现不佳,手动触发Handoff。
3.3 实现 Handoff 触发的逻辑
在Agent的决策循环中,通常会有一个 should_handoff() 方法来封装这些判断逻辑。
import time
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field
# 假设 AgentContext 和 HandoffReason 已经定义
class AgentConfig(BaseModel):
agent_id: str
known_capabilities: List[str] # Agent 具备的能力,如 ["query_order", "update_profile"]
known_knowledge_domains: List[str] # Agent 擅长的知识领域,如 ["订单管理", "账户信息"]
max_tool_retries: int = 2
max_dialog_turns_without_progress: int = 3
class BaseAgent:
def __init__(self, config: AgentConfig):
self.config = config
self.current_context: Optional[AgentContext] = None
self.last_progress_timestamp: float = time.time()
self.tool_failure_count: int = 0
self.dialog_turns_since_last_progress: int = 0
def load_context(self, context: AgentContext):
self.current_context = context
self.last_progress_timestamp = time.time() # 重新加载上下文时重置计时器
self.tool_failure_count = 0
self.dialog_turns_since_last_progress = 0
def process_query(self, query: str) -> str:
# 这是一个模拟的方法,实际中会进行复杂的LLM调用、工具使用等
print(f"Agent {self.config.agent_id} 正在处理查询: {query}")
# 模拟处理逻辑,可能更新 self.current_context.reasoning_trace
# ...
# 模拟工具调用失败
if "特殊订单" in query and self.tool_failure_count < self.config.max_tool_retries:
self.tool_failure_count += 1
print(f"模拟工具调用失败,尝试次数: {self.tool_failure_count}")
self.current_context.reasoning_trace.append(
ReasoningStep(
action="call_tool_special_order",
details={"query_part": "特殊订单"},
outcome="failure",
reasoning=f"尝试处理特殊订单失败,第 {self.tool_failure_count} 次。"
)
)
return "我似乎无法处理特殊订单,请您再详细描述一下吗?"
# 模拟知识边界
if "退款政策" in query:
self.current_context.reasoning_trace.append(
ReasoningStep(
action="evaluate_knowledge_domain",
details={"query_topic": "退款政策"},
outcome="failure",
reasoning="问题超出当前Agent的知识领域。"
)
)
return "关于退款政策,我需要将您转接给专门的Agent处理。"
# 模拟长时间无进展
if time.time() - self.last_progress_timestamp > 10: # 10秒无进展
self.current_context.reasoning_trace.append(
ReasoningStep(
action="evaluate_progress",
details={"time_elapsed": time.time() - self.last_progress_timestamp},
outcome="failure",
reasoning="长时间未取得有效进展。"
)
)
return "我们似乎陷入了僵局,请稍等,我为您寻找更专业的帮助。"
# 模拟成功处理
self.last_progress_timestamp = time.time() # 模拟取得进展
self.tool_failure_count = 0 # 成功处理后重置失败计数
self.dialog_turns_since_last_progress = 0
self.current_context.reasoning_trace.append(
ReasoningStep(
action="generate_response",
details={"response_content": f"已处理查询: {query}"},
outcome="success",
reasoning="成功生成响应。"
)
)
return f"Agent {self.config.agent_id} 已成功处理: {query}"
def should_handoff(self, current_query: str) -> Optional[HandoffReason]:
"""
判断是否需要 Handoff,如果需要,返回 Handoff 原因。
"""
if not self.current_context:
return None
# 显式触发:用户明确要求
if "转人工" in current_query or "换个客服" in current_query:
return HandoffReason.USER_ESCALATION
# 显式触发:知识边界判断
if "退款政策" in current_query and "退款管理" not in self.config.known_knowledge_domains:
return HandoffReason.KNOWLEDGE_GAP
# 显式触发:能力范围限制
if "取消订单" in current_query and "cancel_order" not in self.config.known_capabilities:
return HandoffReason.OUT_OF_SCOPE
# 显式触发:工具调用失败次数过多
if self.tool_failure_count >= self.config.max_tool_retries:
return HandoffReason.TOOL_FAILURE
# 隐式触发:长时间无进展 (假设每轮对话都会更新时间戳)
if (time.time() - self.last_progress_timestamp) > 15: # 15秒无进展则Handoff
return HandoffReason.COMPLEXITY_EXCEEDED
# 隐式触发:对话轮次无进展
if self.dialog_turns_since_last_progress >= self.config.max_dialog_turns_without_progress:
return HandoffReason.COMPLEXITY_EXCEEDED
# 更多判断逻辑...
return None # 不需要 Handoff
def prepare_handoff_context(self, reason: HandoffReason, suggested_action: Optional[str] = None) -> AgentContext:
"""
在 Handoff 之前,更新并返回最新的上下文。
"""
if not self.current_context:
raise ValueError("No context loaded to prepare for handoff.")
self.current_context.handoff_reason = reason
self.current_context.source_agent_id = self.config.agent_id
self.current_context.suggested_next_action = suggested_action
# 确保 latest user query 在 conversation_history 中
if self.current_context.conversation_history and
self.current_context.conversation_history[-1].role != "user":
# 如果最后一条不是用户消息,说明还需要把导致 handoff 的用户消息加进去
# 这里简单处理,实际应根据 Agent 的设计确保消息完整
pass # 假设在 process_query 阶段已经添加了
return self.current_context
# 示例使用
if __name__ == "__main__":
initial_context = AgentContext(
session_id="sess_001",
user_id="user_A",
initial_query="我想查询一个特殊订单,订单号是 XYZ-789。",
current_problem_description="查询特殊订单",
conversation_history=[Message(role="user", content="我想查询一个特殊订单,订单号是 XYZ-789。")],
handoff_reason=HandoffReason.OTHER, # 初始状态
source_agent_id="None"
)
agent_config_order = AgentConfig(
agent_id="OrderQueryAgent",
known_capabilities=["query_order", "track_shipping"],
known_knowledge_domains=["订单管理", "物流信息"],
max_tool_retries=2,
max_dialog_turns_without_progress=2
)
order_agent = BaseAgent(agent_config_order)
order_agent.load_context(initial_context)
print(f"n--- {order_agent.config.agent_id} 处理第一轮 ---")
response = order_agent.process_query("我想查询一个特殊订单,订单号是 XYZ-789。")
print(f"Agent Response: {response}")
reason = order_agent.should_handoff("我想查询一个特殊订单,订单号是 XYZ-789。")
print(f"Should Handoff? {reason}")
if reason:
handoff_context = order_agent.prepare_handoff_context(reason, "请寻找能处理特殊订单的Agent。")
print("Handoff Context prepared:", handoff_context.model_dump_json(indent=2))
print(f"n--- {order_agent.config.agent_id} 处理第二轮 (模拟重试) ---")
order_agent.dialog_turns_since_last_progress += 1 # 模拟无进展轮次增加
response = order_agent.process_query("我又试了一次,还是查不到特殊订单 XYZ-789。")
print(f"Agent Response: {response}")
reason = order_agent.should_handoff("我又试了一次,还是查不到特殊订单 XYZ-789。")
print(f"Should Handoff? {reason}")
if reason:
handoff_context = order_agent.prepare_handoff_context(reason, "工具调用失败,需要更高级的订单处理Agent。")
print("Handoff Context prepared:", handoff_context.model_dump_json(indent=2))
print(f"n--- {order_agent.config.agent_id} 处理第三轮 (模拟知识边界) ---")
order_agent.load_context(
AgentContext(
session_id="sess_002",
user_id="user_B",
initial_query="请问你们的退款政策是什么?",
current_problem_description="用户询问退款政策",
conversation_history=[Message(role="user", content="请问你们的退款政策是什么?")],
handoff_reason=HandoffReason.OTHER,
source_agent_id="None"
)
)
response = order_agent.process_query("请问你们的退款政策是什么?")
print(f"Agent Response: {response}")
reason = order_agent.should_handoff("请问你们的退款政策是什么?")
print(f"Should Handoff? {reason}")
if reason:
handoff_context = order_agent.prepare_handoff_context(reason, "需要转接至了解退款政策的Agent。")
print("Handoff Context prepared:", handoff_context.model_dump_json(indent=2))
代码说明:
AgentConfig包含了Agent的基本配置,如能力和知识领域,这些是Handoff判断的重要依据。BaseAgent类模拟了一个Agent,其中包含了处理查询、判断是否Handoff以及准备Handoff上下文的方法。should_handoff()方法是核心,它根据当前Agent的状态、用户输入和预设规则来决定是否进行Handoff,并返回相应的HandoffReason。prepare_handoff_context()方法用于在Handoff前更新上下文,填充handoff_reason、source_agent_id和suggested_next_action等关键信息。- 示例演示了工具调用失败重试后Handoff,以及知识边界Handoff的场景。
第四章:选择合适的接力对象
当一个Agent决定Handoff时,接下来的关键问题是:将任务交给谁?选择一个不合适的Agent,可能导致任务在多个Agent之间反复Handoff,形成“Handoff循环”,严重影响效率和用户体验。
4.1 目标 Agent 选择策略
选择策略通常基于以下几个维度:
-
基于能力图谱 (Skill Graph / Capability Matching):
- 每个Agent都注册其擅长的能力(
known_capabilities)和知识领域(known_knowledge_domains)。 - Handoff发起Agent或中央调度器根据当前问题的
current_problem_description、handoff_reason以及suggested_next_action,匹配最符合的Agent。 - 这通常涉及一个“能力匹配算法”,可能是简单的关键词匹配,也可能是基于嵌入(embeddings)的语义相似度匹配。
- 每个Agent都注册其擅长的能力(
-
基于负载均衡 (Load Balancing):
- 当有多个Agent都具备处理该问题的能力时,选择当前负载最低的Agent。
- 这要求Agent注册其当前处理的任务数量或计算资源占用情况。
-
基于历史成功率 / 绩效 (Performance-based):
- 记录每个Agent处理特定类型问题的历史成功率。
- 优先选择历史成功率更高的Agent。这需要一个监控和数据收集系统。
-
基于优先级 / 等级 (Priority / Tiering):
- 为Agent设置不同的优先级或等级。例如,“初级Agent”无法解决的问题,优先Handoff给“高级Agent”或“专家Agent”。
- 这与Handoff原因和问题复杂度相关联。
-
基于用户偏好 / 历史记录:
- 如果用户之前与某个特定Agent成功解决过类似问题,可以优先尝试将其Handoff给该Agent。
-
人工Agent作为最终Fallback:
- 在所有自动化Agent都无法处理时,最终Handoff给人类操作员。
4.2 Agent 注册与发现机制
为了实现上述策略,我们需要一个Agent注册中心(AgentRegistry)来管理所有可用Agent的信息。
import random
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field
# 假设 AgentConfig 和 AgentContext 已经定义
class AgentRegistryEntry(BaseModel):
agent_id: str
capabilities: List[str]
knowledge_domains: List[str]
load: int = 0 # 模拟当前 Agent 的负载
performance_score: float = 1.0 # 模拟 Agent 的历史表现分数 (0.0 - 1.0)
tier: int = 1 # Agent 等级,数字越大等级越高
class AgentRegistry:
def __init__(self):
self._agents: Dict[str, AgentRegistryEntry] = {}
def register_agent(self, agent_entry: AgentRegistryEntry):
self._agents[agent_entry.agent_id] = agent_entry
print(f"Agent {agent_entry.agent_id} 已注册。")
def update_agent_load(self, agent_id: str, load_change: int):
if agent_id in self._agents:
self._agents[agent_id].load = max(0, self._agents[agent_id].load + load_change)
# print(f"Agent {agent_id} 负载更新为: {self._agents[agent_id].load}")
def update_agent_performance(self, agent_id: str, success: bool):
if agent_id in self._agents:
current_score = self._agents[agent_id].performance_score
if success:
self._agents[agent_id].performance_score = min(1.0, current_score * 1.05 + 0.01) # 成功略微提升
else:
self._agents[agent_id].performance_score = max(0.0, current_score * 0.95 - 0.01) # 失败略微降低
# print(f"Agent {agent_id} 表现分数更新为: {self._agents[agent_id].performance_score:.2f}")
def find_best_agent(self, context: AgentContext) -> Optional[str]:
"""
根据上下文信息,找到最合适的接力 Agent。
这是一个示例的匹配逻辑,实际会更复杂。
"""
potential_agents: List[AgentRegistryEntry] = []
# 1. 根据 Handoff 原因和建议筛选初选 Agent
required_capabilities = []
required_knowledge_domains = []
if context.handoff_reason == HandoffReason.KNOWLEDGE_GAP:
# 尝试从 problem description 或 suggested_next_action 中推断所需知识领域
if "退款政策" in context.current_problem_description or
(context.suggested_next_action and "退款" in context.suggested_next_action):
required_knowledge_domains.append("退款管理")
# 更多根据 reason 填充所需能力和知识的逻辑
elif context.handoff_reason == HandoffReason.OUT_OF_SCOPE:
if context.suggested_next_action and "取消订单" in context.suggested_next_action:
required_capabilities.append("cancel_order")
elif context.suggested_next_action and "修改个人信息" in context.suggested_next_action:
required_capabilities.append("update_profile")
elif context.handoff_reason == HandoffReason.TOOL_FAILURE:
# 如果是工具失败,可能需要一个更专业的工具Agent,或者能处理异常的Agent
# 假设一个 "AdvancedOrderAgent" 具备更强的订单处理能力
if "订单" in context.current_problem_description:
required_knowledge_domains.append("高级订单管理")
elif context.handoff_reason == HandoffReason.USER_ESCALATION:
# 用户要求升级,通常需要转接给高等级Agent或人工
potential_agents.extend([
entry for entry in self._agents.values() if entry.tier >= 2
])
# 匹配能力和知识领域
for agent_id, agent_entry in self._agents.items():
# 确保 Agent 至少具备一项所需能力或知识领域
if (not required_capabilities or any(cap in agent_entry.capabilities for cap in required_capabilities)) and
(not required_knowledge_domains or any(domain in agent_entry.knowledge_domains for domain in required_knowledge_domains)):
potential_agents.append(agent_entry)
if not potential_agents:
print("未能找到匹配能力的 Agent,尝试寻找通用高等级 Agent。")
# 如果没有匹配的,尝试找通用高等级 Agent 或人工
potential_agents.extend([
entry for entry in self._agents.values() if entry.tier >= 2 # 假设 tier 2 及以上是通用型或人工
])
if not potential_agents:
print("未能找到任何合适的 Agent。")
return None
# 2. 根据负载、性能和等级进行排序和选择
# 优先选择:更高等级 -> 更高表现分 -> 更低负载
potential_agents.sort(key=lambda x: (x.tier, x.performance_score, -x.load), reverse=True) # 等级和分数降序,负载升序
# 排除当前发起 Handoff 的 Agent (防止循环)
if context.source_agent_id:
potential_agents = [a for a in potential_agents if a.agent_id != context.source_agent_id]
if potential_agents:
selected_agent = potential_agents[0]
print(f"为会话 {context.session_id} 选定 Agent: {selected_agent.agent_id} "
f"(Tier: {selected_agent.tier}, Score: {selected_agent.performance_score:.2f}, Load: {selected_agent.load})")
return selected_agent.agent_id
return None # 无法找到合适的 Agent
# 示例使用
if __name__ == "__main__":
registry = AgentRegistry()
# 注册 Agent
registry.register_agent(AgentRegistryEntry(agent_id="OrderAgent", capabilities=["query_order", "track_shipping"], knowledge_domains=["订单管理"], tier=1))
registry.register_agent(AgentRegistryEntry(agent_id="RefundAgent", capabilities=["process_refund", "query_policy"], knowledge_domains=["退款管理", "政策咨询"], tier=2))
registry.register_agent(AgentRegistryEntry(agent_id="TechSupportAgent", capabilities=["troubleshoot_tech", "reset_account"], knowledge_domains=["技术支持", "账户管理"], tier=2))
registry.register_agent(AgentRegistryEntry(agent_id="HumanAgent_Tier3", capabilities=["all_human_interaction"], knowledge_domains=["通用客服"], tier=3))
registry.register_agent(AgentRegistryEntry(agent_id="AdvancedOrderAgent", capabilities=["query_special_order", "cancel_order"], knowledge_domains=["高级订单管理"], tier=2))
# 模拟 Handoff 场景 1: 知识边界 (退款政策)
context_kg = AgentContext(
session_id="sess_kg_001",
user_id="user_A",
initial_query="请问你们的退款政策是什么?",
current_problem_description="用户询问退款政策",
conversation_history=[],
handoff_reason=HandoffReason.KNOWLEDGE_GAP,
source_agent_id="OrderAgent",
suggested_next_action="需要转接至了解退款政策的Agent。"
)
target_agent_id_kg = registry.find_best_agent(context_kg)
print(f"场景 1: 知识边界 Handoff,目标 Agent: {target_agent_id_kg}n") # 期望是 RefundAgent
# 模拟 Handoff 场景 2: 工具调用失败 (特殊订单)
context_tf = AgentContext(
session_id="sess_tf_001",
user_id="user_B",
initial_query="我查不到我的特殊订单 XYZ-789。",
current_problem_description="用户特殊订单查询失败",
conversation_history=[],
handoff_reason=HandoffReason.TOOL_FAILURE,
source_agent_id="OrderAgent",
suggested_next_action="工具调用失败,需要更高级的订单处理Agent。"
)
target_agent_id_tf = registry.find_best_agent(context_tf)
print(f"场景 2: 工具失败 Handoff,目标 Agent: {target_agent_id_tf}n") # 期望是 AdvancedOrderAgent
# 模拟 Handoff 场景 3: 用户升级
context_ue = AgentContext(
session_id="sess_ue_001",
user_id="user_C",
initial_query="我要求转人工客服!",
current_problem_description="用户要求转人工",
conversation_history=[],
handoff_reason=HandoffReason.USER_ESCALATION,
source_agent_id="OrderAgent",
suggested_next_action="转接人工客服。"
)
target_agent_id_ue = registry.find_best_agent(context_ue)
print(f"场景 3: 用户升级 Handoff,目标 Agent: {target_agent_id_ue}n") # 期望是 HumanAgent_Tier3 或 RefundAgent (tier 2)
# 模拟 Handoff 场景 4: 无匹配 Agent
context_no_match = AgentContext(
session_id="sess_nm_001",
user_id="user_D",
initial_query="请帮我修理我的智能冰箱。",
current_problem_description="用户请求维修智能冰箱",
conversation_history=[],
handoff_reason=HandoffReason.OUT_OF_SCOPE,
source_agent_id="OrderAgent",
suggested_next_action="需要维修智能冰箱的Agent。"
)
target_agent_id_nm = registry.find_best_agent(context_no_match)
print(f"场景 4: 无匹配 Handoff,目标 Agent: {target_agent_id_nm}n") # 期望是 HumanAgent_Tier3 或 TechSupportAgent
代码说明:
AgentRegistryEntry记录了Agent的元数据,包括能力、知识领域、负载、表现分数和等级。AgentRegistry类是一个中央注册中心,负责Agent的注册、状态更新以及最关键的——目标Agent的发现。find_best_agent()方法实现了多维度的Agent选择逻辑:- 首先根据
handoff_reason和suggested_next_action来推断所需的能力和知识领域,进行初步过滤。 - 然后,对符合条件的Agent,根据
tier(等级)、performance_score(表现)和load(负载)进行加权排序。 tier和performance_score越高越优先,load越低越优先。- 最后,排除了发起Handoff的Agent,以避免立即循环。
- 首先根据
- 这个示例中的匹配逻辑是启发式的,实际应用中可能需要更复杂的语义匹配模型或基于规则引擎的推理。
update_agent_load和update_agent_performance方法展示了如何动态更新Agent的状态,以便进行更智能的决策。
第五章:Handoffs 协议的设计与实现
有了上下文结构和目标Agent选择机制,我们现在需要定义实际的Handoff消息协议,确保Agent之间能可靠地进行通信。
5.1 Handoff 协议栈的思考
Handoff 本质上是一种 Agent 间的特殊消息传递。它需要遵循一定的规范,就像网络通信中的协议栈一样。
- 传输层: 如何发送 Handoff 消息?可以是 HTTP/REST API 调用、消息队列(如 Kafka、RabbitMQ)、WebSocket 等。消息队列是异步 Handoff 的理想选择,因为它提供了解耦、削峰填谷和持久化能力。
- 会话层: 如何管理 Handoff 过程中的状态?例如,Handoff 请求是否被接收、是否被处理、是否成功。
- 应用层: Handoff 消息的具体结构和语义。
这里我们专注于应用层协议的设计。
5.2 Handoff 消息结构
Handoff 消息需要包含足以描述其意图和内容的字段。
import uuid
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field
# 假设 AgentContext 和 HandoffReason 已经定义
class HandoffStatus(str):
PENDING = "待处理"
ACCEPTED = "已接受"
REJECTED = "已拒绝"
COMPLETED = "已完成"
FAILED = "失败"
CANCELLED = "已取消"
class HandoffMessage(BaseModel):
handoff_id: str = Field(default_factory=lambda: str(uuid.uuid4())) # 唯一 Handoff 标识符
session_id: str # 关联的会话 ID
source_agent_id: str # 发起 Handoff 的 Agent ID
target_agent_id: str # 目标 Agent ID
timestamp: datetime = Field(default_factory=datetime.now) # Handoff 发起时间
context: AgentContext # 完整的上下文信息
status: HandoffStatus = HandoffStatus.PENDING # Handoff 状态
# Handoff 过程中的反馈信息
rejection_reason: Optional[str] = None # 如果被拒绝,拒绝原因
completion_details: Optional[Dict[str, Any]] = None # 完成 Handoff 后的额外信息
class Config:
json_encoders = {
datetime: lambda v: v.isoformat()
}
extra = "allow"
# 示例:创建 Handoff 消息
if __name__ == "__main__":
# 假设我们有一个准备好的 AgentContext
sample_context = AgentContext(
session_id="sess_001",
user_id="user_A",
initial_query="我想查询一个特殊订单,订单号是 XYZ-789。",
current_problem_description="用户特殊订单查询失败",
conversation_history=[],
handoff_reason=HandoffReason.TOOL_FAILURE,
source_agent_id="OrderAgent",
suggested_next_action="工具调用失败,需要更高级的订单处理Agent。"
)
handoff_msg = HandoffMessage(
session_id=sample_context.session_id,
source_agent_id=sample_context.source_agent_id,
target_agent_id="AdvancedOrderAgent",
context=sample_context,
status=HandoffStatus.PENDING
)
print("Handoff 消息 JSON:n", handoff_msg.model_dump_json(indent=2))
# 模拟 Handoff 接受
accepted_msg = handoff_msg.model_copy(update={"status": HandoffStatus.ACCEPTED})
print("nHandoff 接受消息 JSON:n", accepted_msg.model_dump_json(indent=2))
# 模拟 Handoff 拒绝
rejected_msg = handoff_msg.model_copy(update={"status": HandoffStatus.REJECTED, "rejection_reason": "当前负载过高,无法处理。"})
print("nHandoff 拒绝消息 JSON:n", rejected_msg.model_dump_json(indent=2))
代码说明:
HandoffStatus定义了Handoff请求的生命周期状态。HandoffMessage封装了Handoff的元数据(ID、源/目标Agent、时间戳、状态)以及最重要的AgentContext。rejection_reason和completion_details字段允许在Handoff生命周期中传递额外的反馈信息。
5.3 异步与同步 Handoffs
- 同步 Handoff: 发起Agent等待目标Agent的响应(接受/拒绝)。简单直接,但可能阻塞发起Agent,且目标Agent必须实时在线。适用于Agent数量少、实时性要求高的场景。
- 异步 Handoff: 发起Agent发送Handoff请求后立即继续自己的任务(或释放资源),不等待立即响应。目标Agent在收到请求后异步处理。这通常通过消息队列实现,提供了更高的解耦性、可扩展性和容错性。适用于大规模、高并发的Agent系统。
在大多数复杂的Agent系统中,异步Handoff是更优的选择,因为它避免了紧耦合,并能更好地处理Agent的暂时性不可用。
Handoff 流程表:
| 阶段 | 发起 Agent (Source Agent) | 传输机制 | 目标 Agent (Target Agent) | 管理/调度器 (Orchestrator/Registry) |
|---|---|---|---|---|
| 1. 触发 | 1.1 评估无法处理,决定 Handoff。 | N/A | N/A | N/A |
| 2. 准备上下文 | 2.1 更新 AgentContext,设置 handoff_reason 等。 |
N/A | N/A | N/A |
| 3. 选择目标 | 3.1 请求管理/调度器 find_best_agent()。 |
API 调用 | N/A | 3.2 根据策略选择目标 Agent ID。 |
| 4. 发送 Handoff | 4.1 构建 HandoffMessage (状态 PENDING)。 |
消息队列 (e.g., Kafka) 或 REST API | N/A | N/A |
| 5. 接收 Handoff | N/A | 消息队列监听器 或 REST Endpoint | 5.1 收到 HandoffMessage。 |
N/A |
| 6. 评估与响应 | N/A | N/A | 6.1 评估 AgentContext,判断是否能处理。 |
N/A |
| 7. 接受/拒绝 | N/A | 消息队列 或 REST API (回调) | 7.1 发送 HandoffMessage (状态 ACCEPTED 或 REJECTED)。 |
N/A |
| 8. 更新状态 | 8.1 (如为同步) 收到响应,更新会话状态。 | N/A | N/A | 8.2 接收到响应,更新 Handoff 状态。 |
| 9. 接管处理 | N/A | N/A | 9.1 (如接受) 加载 AgentContext,开始处理。 |
N/A |
| 10. Handoff 完成 | N/A | 消息队列 或 REST API (回调) | 10.1 处理完成后,发送 HandoffMessage (状态 COMPLETED)。 |
10.2 接收到完成消息,记录 Handoff 结果。 |
第六章:上下文的无缝传递与恢复
Handoff 协议的关键在于“无缝”。这意味着目标 Agent 在接收到上下文后,能够像从一开始就在处理这个问题一样,没有任何障碍。
6.1 序列化与反序列化
我们已经使用了 Pydantic 来定义 AgentContext 和 HandoffMessage,这使得序列化(Python 对象转 JSON 字符串)和反序列化(JSON 字符串转 Python 对象)变得非常简单和可靠。
- 序列化: 当发起 Agent 准备 Handoff 时,它会将
AgentContext对象转换为 JSON 字符串进行传输。 - 反序列化: 当目标 Agent 收到 Handoff 消息时,它会将 JSON 字符串反序列化回
AgentContext对象。
这种强类型定义的好处在于,如果在传输或解析过程中出现数据不匹配,Pydantic 会立即报错,防止不完整或错误的数据被 Agent 误用。
6.2 目标 Agent 的上下文加载与状态重建
目标 Agent 接收到反序列化的 AgentContext 对象后,需要执行以下步骤来恢复会话状态:
- 加载核心信息: 将
session_id、user_id、initial_query、current_problem_description和conversation_history加载到其内部会话管理模块。 - 理解 Handoff 原因: 读取
handoff_reason和suggested_next_action,理解前一个 Agent 为什么 Handoff,并获取潜在的下一步指导。 - 分析推理轨迹: 仔细检查
reasoning_trace,理解前一个 Agent 尝试了什么、在哪里失败了。这对于避免重复工作和快速定位问题至关重要。 - 恢复内部状态: 如果
internal_state包含 Agent 特定的关键信息(如已提取的实体、用户偏好设置),目标 Agent 需要将其解析并集成到自己的工作流程中。 - 生成初始响应或行动: 基于恢复的上下文,目标 Agent 可以立即开始推理,生成一个针对性的响应,或者执行第一个行动。
代码示例:目标 Agent 接收并处理 Handoff
import time
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field
# 假设 HandoffMessage, AgentContext, AgentConfig, HandoffReason, Message, ReasoningStep 已经定义
class AdvancedOrderAgent(BaseAgent): # 继承自 BaseAgent
def __init__(self, config: AgentConfig):
super().__init__(config)
self.special_order_db: Dict[str, Dict[str, Any]] = {
"XYZ-789": {"status": "处理中", "details": "正在与供应商确认", "customer_info": "user_B"},
"SPO-001": {"status": "已发货", "details": "快递单号: SF123456", "customer_info": "user_C"}
}
def process_query(self, query: str) -> str:
# 这是一个模拟的方法,处理更复杂的订单查询
print(f"Agent {self.config.agent_id} 正在处理查询: {query}")
# 模拟处理特殊订单
if "特殊订单" in query or "XYZ-789" in query:
order_id = "XYZ-789" # 简化处理,直接假设是这个订单
if order_id in self.special_order_db:
order_info = self.special_order_db[order_id]
self.current_context.reasoning_trace.append(
ReasoningStep(
action="query_special_order_db",
details={"order_id": order_id},
outcome="success",
reasoning=f"成功从特殊订单数据库查询到 {order_id}。"
)
)
return f"您的特殊订单 {order_id} 状态是:{order_info['status']},详情:{order_info['details']}。"
else:
self.current_context.reasoning_trace.append(
ReasoningStep(
action="query_special_order_db",
details={"order_id": order_id},
outcome="failure",
reasoning=f"在特殊订单数据库中未能找到 {order_id}。"
)
)
return "抱歉,即使是特殊订单数据库也未能找到该订单。"
# 模拟退款政策查询 (假设 AdvancedOrderAgent 也能处理部分退款问题)
if "退款政策" in query and "退款管理" in self.config.known_knowledge_domains:
self.current_context.reasoning_trace.append(
ReasoningStep(
action="provide_refund_policy",
details={},
outcome="success",
reasoning="提供退款政策信息。"
)
)
return "我们的退款政策是:商品在签收后7天内,如不影响二次销售,可无理由退货。"
return super().process_query(query) # 调用基类的处理逻辑作为兜底
def handle_handoff(self, handoff_msg: HandoffMessage) -> HandoffMessage:
"""
接收并处理 Handoff 请求。
"""
print(f"nAgent {self.config.agent_id} 收到 Handoff 请求 (ID: {handoff_msg.handoff_id})")
# 1. 评估是否接受 Handoff
# 检查自身负载、能力和 Handoff 原因
if self.config.agent_id == handoff_msg.source_agent_id: # 避免自己 Handoff 给自己
print(f"警告:Handoff 目标是自身 ({self.config.agent_id}),拒绝 Handoff。")
return handoff_msg.model_copy(update={
"status": HandoffStatus.REJECTED,
"rejection_reason": "目标Agent与发起Agent相同。"
})
if self.config.load > 5: # 模拟负载过高
print(f"Agent {self.config.agent_id} 负载过高,拒绝 Handoff。")
return handoff_msg.model_copy(update={
"status": HandoffStatus.REJECTED,
"rejection_reason": "Agent 负载过高。"
})
# 简单能力匹配,实际应更复杂
if handoff_msg.context.handoff_reason == HandoffReason.TOOL_FAILURE and
"高级订单管理" not in self.config.known_knowledge_domains:
print(f"Agent {self.config.agent_id} 不具备处理高级订单的能力,拒绝 Handoff。")
return handoff_msg.model_copy(update={
"status": HandoffStatus.REJECTED,
"rejection_reason": "Agent 不具备所需知识领域。"
})
# 接受 Handoff
self.config.load += 1 # 增加负载
handoff_msg.status = HandoffStatus.ACCEPTED
print(f"Agent {self.config.agent_id} 接受 Handoff 请求,当前负载: {self.config.load}")
# 2. 加载上下文并重建状态
self.load_context(handoff_msg.context)
print(f"已加载上下文,会话 ID: {self.current_context.session_id}")
print(f"Handoff 原因: {self.current_context.handoff_reason}")
print(f"前一个 Agent 的思考链:")
for step in self.current_context.reasoning_trace:
print(f" - {step.action} ({step.outcome}): {step.reasoning}")
print(f"建议下一步行动: {self.current_context.suggested_next_action}")
# 3. 根据上下文开始处理
# 模拟立即对用户进行响应
last_user_query = self.current_context.conversation_history[-1].content if self.current_context.conversation_history else self.current_context.initial_query
initial_response = f"您好,我是 {self.config.agent_id},已从 {self.current_context.source_agent_id} 接手。我了解到您的问题是:'{self.current_context.current_problem_description}'。"
# 根据 Handoff 原因和建议,决定如何开始处理
if self.current_context.handoff_reason == HandoffReason.TOOL_FAILURE:
initial_response += f"根据您提供的订单号 '{self.current_context.internal_state.get('order_id_attempted', '未知')}',我将尝试进行更深入的查询。"
# 立即尝试处理
response_content = self.process_query(last_user_query)
initial_response += f"n我的初步处理结果是:{response_content}"
elif self.current_context.handoff_reason == HandoffReason.KNOWLEDGE_GAP:
initial_response += f"关于'{self.current_context.current_problem_description}',我将为您提供详细信息。"
response_content = self.process_query(last_user_query)
initial_response += f"n我的初步处理结果是:{response_content}"
else:
initial_response += "请您详细描述一下您需要我做什么。"
# 模拟将响应添加到历史中
self.current_context.conversation_history.append(
Message(role="agent", content=initial_response, timestamp=datetime.now())
)
handoff_msg.completion_details = {"initial_agent_response": initial_response}
# Handoff 消息状态在 Agent 成功接管并处理后,可以更新为 COMPLETED
handoff_msg.status = HandoffStatus.COMPLETED
return handoff_msg # 返回更新后的 Handoff 消息
# 示例使用
if __name__ == "__main__":
registry = AgentRegistry()
order_agent_config = AgentConfig(
agent_id="OrderAgent",
known_capabilities=["query_order", "track_shipping"],
known_knowledge_domains=["订单管理"],
max_tool_retries=2,
max_dialog_turns_without_progress=2
)
order_agent = BaseAgent(order_agent_config)
registry.register_agent(AgentRegistryEntry(agent_id="OrderAgent", capabilities=["query_order", "track_shipping"], knowledge_domains=["订单管理"], tier=1))
adv_order_agent_config = AgentConfig(
agent_id="AdvancedOrderAgent",
known_capabilities=["query_special_order", "cancel_order", "process_refund"], # 假设也能处理退款
known_knowledge_domains=["高级订单管理", "退款管理"],
max_tool_retries=5,
max_dialog_turns_without_progress=5
)
adv_order_agent = AdvancedOrderAgent(adv_order_agent_config)
registry.register_agent(AgentRegistryEntry(agent_id="AdvancedOrderAgent", capabilities=["query_special_order", "cancel_order", "process_refund"], knowledge_domains=["高级订单管理", "退款管理"], tier=2))
# --- 模拟 Handoff 流程 ---
# 1. 初始 Agent 遇到问题,准备 Handoff
initial_context = AgentContext(
session_id="sess_tf_001",
user_id="user_B",
initial_query="我查不到我的特殊订单 XYZ-789。",
current_problem_description="用户特殊订单查询失败",
conversation_history=[Message(role="user", content="我查不到我的特殊订单 XYZ-789。")],
internal_state={"order_id_attempted": "XYZ-789"},
reasoning_trace=[
ReasoningStep(action="query_order_api", details={"order_id": "XYZ-789"}, outcome="failure", reasoning="API 返回订单不存在。")
],
handoff_reason=HandoffReason.TOOL_FAILURE,
source_agent_id="OrderAgent",
suggested_next_action="工具调用失败,需要更高级的订单处理Agent。"
)
order_agent.load_context(initial_context)
# 2. 找到目标 Agent
target_agent_id = registry.find_best_agent(order_agent.prepare_handoff_context(initial_context.handoff_reason, initial_context.suggested_next_action))
print(f"n调度器选定目标 Agent: {target_agent_id}")
if target_agent_id:
# 3. 构建 Handoff 消息
handoff_msg_to_send = HandoffMessage(
session_id=initial_context.session_id,
source_agent_id=order_agent.config.agent_id,
target_agent_id=target_agent_id,
context=order_agent.prepare_handoff_context(initial_context.handoff_reason, initial_context.suggested_next_action)
)
print(f"n发送 Handoff 消息给 {handoff_msg_to_send.target_agent_id}...")
# 4. 目标 Agent 接收并处理 Handoff
if target_agent_id == adv_order_agent.config.agent_id:
returned_handoff_msg = adv_order_agent.handle_handoff(handoff_msg_to_send)
print(f"nHandoff 处理结果: {returned_handoff_msg.status}")
if returned_handoff_msg.status == HandoffStatus.COMPLETED:
print(f"目标 Agent 初始响应: {returned_handoff_msg.completion_details.get('initial_agent_response')}")
# 此时,adv_order_agent 已经加载了上下文,可以继续与用户交互
print(f"n用户再次提问:'那我的订单现在到底是什么情况?'")
final_response = adv_order_agent.process_query("那我的订单现在到底是什么情况?")
print(f"AdvancedOrderAgent 回复: {final_response}")
else:
print("目标 Agent 不在模拟范围内。")
else:
print("未找到合适的 Agent 进行 Handoff。")
代码说明:
AdvancedOrderAgent继承自BaseAgent,并重写了process_query以模拟其特殊能力。handle_handoff()是目标 Agent 接收和处理 Handoff 的核心方法。- 它首先进行初步评估(负载、能力匹配),决定是否接受 Handoff。
- 如果接受,它会调用
self.load_context()加载完整的AgentContext,从而恢复会话状态。 - 接着,它分析
handoff_reason和reasoning_trace,理解前因后果。 - 最后,它根据恢复的上下文生成一个针对性的初始响应,并可以立即开始解决问题。
- Handoff 消息的状态会在处理过程中更新,从
PENDING到ACCEPTED,最终到COMPLETED。
- 示例展示了从
OrderAgentHandoff 到AdvancedOrderAgent的完整过程,以及AdvancedOrderAgent如何无缝接管并继续处理用户查询。
第七章:健壮性与错误处理
Handoff 协议的健壮性至关重要。Handoff 过程本身也可能失败,我们需要设计相应的错误处理和回退机制。
7.1 Handoff 请求的失败处理
| 失败类型 | 描述 | 应对策略 |
|---|---|---|
| 目标 Agent 不可用 | 目标 Agent 下线、崩溃或网络不可达。 | 重试: 在一定次数内重试 Handoff 请求。 重新选择: 如果重试失败,请求调度器重新选择一个备用 Agent。 Handoff 到人工: 作为最终回退方案。 |
| 目标 Agent 拒绝 Handoff | 目标 Agent 因负载过高、能力不匹配、内部错误等原因明确拒绝。 | 分析拒绝原因: 根据 rejection_reason 判断。重新选择: 如果是负载或临时问题,重新选择 Agent。 Handoff 到人工: 如果是能力不匹配且无其他匹配 Agent,转人工。 |
| Handoff 消息丢失 | 消息队列或网络传输问题导致 Handoff 消息未送达。 | 消息队列的持久化: 确保消息持久化存储。 ACK 机制: 目标 Agent 接收到消息后发送确认。 超时重发: 发起 Agent 在规定时间内未收到 ACK 则重发。 |
| 上下文损坏/不完整 | 传输过程中数据损坏,或原始 Agent 提供的上下文信息不足。 | 数据校验: Pydantic 自动提供。 版本控制: 上下文结构应有版本号,确保兼容性。 回退: 如果上下文无法解析或关键信息缺失,Handoff 到人工,并记录错误。 |
7.2 监控与日志
一个完善的 Handoff 系统必须配备详细的监控和日志:
- Handoff 成功率: 监控 Handoff 请求的成功率。
- Handoff 延迟: 从发起 Handoff 到目标 Agent 成功接管的平均时间。
- Handoff 循环检测: 识别一个会话在多个 Agent 之间反复 Handoff 的情况。这通常通过记录会话的 Handoff 历史和限制 Handoff 次数来实现。
- Agent 负载与性能: 监控每个 Agent 的实时负载和历史成功率,为调度提供数据。
- 详细日志: 记录每次 Handoff 的完整
HandoffMessage、handoff_reason、目标 Agent 选择过程、处理结果和任何错误信息。
7.3 Handoff 失败的最终回退:人工 Agent
在所有自动化 Handoff 机制都失败时,将问题升级到人类操作员 (Human-in-the-Loop Agent) 应该是最终的兜底方案。人工 Agent 应该能够接收与自动化 Agent 相同的 AgentContext,并有一个用户友好的界面来查看对话历史、Agent 思考链和 Handoff 原因。
第八章:Handoffs 的高级策略与未来展望
Handoffs 协议不仅是解决当前问题的工具,也是构建更智能、更自适应多 Agent 系统的基石。
8.1 人类在环 (Human-in-the-Loop) Handoffs
将人类智能无缝集成到 Agent 工作流中,是提升系统智能和可靠性的重要手段。
- 主动 Handoff 给人工: 当 Agent 确定自身无法处理或风险过高时,主动将任务 Handoff 给人工。
- 被动 Handoff 给人工: 当系统检测到 Agent 陷入困境(如长时间无响应、用户情绪负面、Handoff 循环)时,自动触发 Handoff 给人工。
- 人机协作模式: 人工 Agent 在接管任务后,仍可以利用其他自动化 Agent 来执行子任务,如查询信息、生成草稿,实现人机协同。
- 人工反馈循环: 人工操作员可以对 Agent 的 Handoff 决策、处理结果提供反馈,这些反馈可以用于优化 Agent 的 Handoff 策略和能力。
8.2 基于学习的 Handoff 优化
Handoff 决策和目标 Agent 选择都可以通过机器学习进行优化:
- 强化学习 (Reinforcement Learning): 将 Handoff 视为 Agent 的一个“行动”,通过奖励机制(如问题解决时长、用户满意度、资源消耗)来训练 Agent 何时 Handoff、Handoff 给谁。
- 预测模型: 利用历史数据训练模型,预测特定问题由哪个 Agent 处理的成功率最高。
- 上下文嵌入 (Context Embeddings): 将
AgentContext编码为向量,利用相似度搜索来匹配最合适的 Agent。
8.3 Handoffs 的链式反应与循环检测
- 链式 Handoff: 一个 Agent 将任务 Handoff 给 B,B 又 Handoff 给 C。这本身是允许的,但需要限制链的长度,避免无限 Handoff。
- 循环 Handoff 检测: 严格防止 A -> B -> A 或 A -> B -> C -> A 这种循环。可以在
AgentContext中维护一个handoff_path列表,记录任务经过的 Agent ID。当发现目标 Agent ID 已在路径中时,立即 Handoff 到人工或最高等级 Agent。
# 扩展 AgentContext 以支持 Handoff 路径
class AgentContext(BaseModel):
# ... 其他字段不变
handoff_path: List[str] = Field(default_factory=list) # 记录 Handoff 路径
# ... 其他方法
# 在 BaseAgent 的 prepare_handoff_context 方法中更新 handoff_path
class BaseAgent:
# ...
def prepare_handoff_context(self, reason: HandoffReason, suggested_action: Optional[str] = None) -> AgentContext:
if not self.current_context:
raise ValueError("No context loaded to prepare for handoff.")
self.current_context.handoff_reason = reason
self.current_context.source_agent_id = self.config.agent_id
self.current_context.suggested_next_action = suggested_action
# 更新 Handoff 路径
if self.config.agent_id not in self.current_context.handoff_path:
self.current_context.handoff_path.append(self.config.agent_id)
return self.current_context
# 在 AgentRegistry 的 find_best_agent 方法中检测循环
class AgentRegistry:
# ...
def find_best_agent(self, context: AgentContext) -> Optional[str]:
# ... 初步筛选和排序逻辑
# 排除当前发起 Handoff 的 Agent (防止立即循环)
# 还要排除已经在 Handoff 路径中的 Agent
if context.source_agent_id:
potential_agents = [a for a in potential_agents if a.agent_id != context.source_agent_id and a.agent_id not in context.handoff_path]
if not potential_agents:
print(f"警告:检测到 Handoff 循环风险或无可用 Agent,将 Handoff 到最高等级人工 Agent。")
# 查找最高等级人工 Agent 作为回退
human_agents = [entry for entry in self._agents.values() if "human_interaction" in entry.capabilities and entry.tier == max([a.tier for a in self._agents.values()])]
if human_agents:
return human_agents[0].agent_id
return None # 实在找不到就返回 None
# ... 选择逻辑
第九章:实战案例:一个简单的客户服务 Handoff 系统
让我们将上述概念整合到一个简化的客户服务 Handoff 系统中。
场景描述:
一个在线商店的客户服务系统,包含:
GeneralEnquiryAgent(通用咨询 Agent): 处理常见问题,如商品信息、营业时间。OrderManagementAgent(订单管理 Agent): 处理订单查询、修改等。RefundAgent(退款处理 Agent): 专门处理退款申请和退款政策咨询。HumanSupportAgent(人工客服 Agent): 作为最终回退。
Handoff 流程演示:
用户提出问题 -> GeneralEnquiryAgent 尝试处理 -> 如果涉及订单 -> Handoff 给 OrderManagementAgent -> 如果涉及退款 -> Handoff 给 RefundAgent -> 如果所有自动化 Agent 都无法处理 -> Handoff 给 HumanSupportAgent。
import uuid
import time
from datetime import datetime
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field
# --- 基础数据模型 (重复定义以便代码独立运行,实际项目中只定义一次) ---
class Message(BaseModel):
role: str
content: str
timestamp: datetime = Field(default_factory=datetime.now)
tool_calls: Optional[List[Dict[str, Any]]] = None
tool_outputs: Optional[List[Dict[str, Any]]] = None
class ReasoningStep(BaseModel):
step_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
action: str
details: Dict[str, Any]
outcome: str
reasoning: Optional[str] = None
class HandoffReason(str):
KNOWLEDGE_GAP = "知识不足"
OUT_OF_SCOPE = "超出能力范围"
TOOL_FAILURE = "工具调用失败"
USER_ESCALATION = "用户要求升级"
COMPLEXITY_EXCEEDED = "问题复杂度超限"
NO_MATCH_AGENT = "无匹配Agent"
OTHER = "其他原因"
class AgentContext(BaseModel):
session_id: str
user_id: Optional[str] = None
initial_query: str
current_problem_description: str
conversation_history: List[Message]
internal_state: Dict[str, Any] = Field(default_factory=dict)
reasoning_trace: List[ReasoningStep] = Field(default_factory=list)
handoff_reason: HandoffReason
source_agent_id: str
suggested_next_action: Optional[str] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
handoff_path: List[str] = Field(default_factory=list) # 增加 Handoff 路径
class Config:
json_encoders = {datetime: lambda v: v.isoformat()}
extra = "allow"
class HandoffStatus(str):
PENDING = "待处理"
ACCEPTED = "已接受"
REJECTED = "已拒绝"
COMPLETED = "已完成"
FAILED = "失败"
CANCELLED = "已取消"
class HandoffMessage(BaseModel):
handoff_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
session_id: str
source_agent_id: str
target_agent_id: str
timestamp: datetime = Field(default_factory=datetime.now)
context: AgentContext
status: HandoffStatus = HandoffStatus.PENDING
rejection_reason: Optional[str] = None
completion_details: Optional[Dict[str, Any]] = None
class AgentConfig(BaseModel):
agent_id: str
known_capabilities: List[str]
known_knowledge_domains: List[str]
max_tool_retries: int = 2
max_dialog_turns_without_progress: int = 3
tier: int = 1 # 新增 Agent 等级
class AgentRegistryEntry(BaseModel):
agent_id: str
capabilities: List[str]
knowledge_domains: List[str]
load: int = 0
performance_score: float = 1.0
tier: int = 1
# --- AgentRegistry (模拟调度中心) ---
class AgentRegistry:
def __init__(self):
self._agents: Dict[str, AgentRegistryEntry] = {}
def register_agent(self, agent_entry: AgentRegistryEntry):
self._agents[agent_entry.agent_id] = agent_entry
# print(f"Agent {agent_entry.agent_id} 已注册。")
def update_agent_load(self, agent_id: str, load_change: int):
if agent_id in self._agents:
self._agents[agent_id].load = max(0, self._agents[agent_id].load + load_change)
def find_best_agent(self, context: AgentContext) -> Optional[str]:
potential_agents: List[AgentRegistryEntry] = []
required_capabilities = []
required_knowledge_domains = []
# 根据 Handoff 原因推断所需能力/知识
if context.handoff_reason == HandoffReason.KNOWLEDGE_GAP:
if "退款政策" in context.current_problem_description or (context.suggested_next_action and "退款" in context.suggested_next_action):
required_knowledge_domains.append("退款管理")
elif "订单" in context.current_problem_description or (context.suggested_next_action and "订单" in context.suggested_next_action):
required_knowledge_domains.append("订单管理")
elif context.handoff_reason == HandoffReason.OUT_OF_SCOPE:
if "订单" in context.current_problem_description or (context.suggested_next_action and "订单" in context.suggested_next_action):
required_capabilities.append("query_order")
required_capabilities.append("modify_order")
elif "退款" in context.current_problem_description or (context.suggested_next_action and "退款" in context.