深入 ‘Handoffs’ 协议:当一个 Agent 无法处理当前问题时,如何优雅地将上下文‘接力’给另一个 Agent?

各位同仁,各位对多智能体系统充满热情的开发者们:

欢迎来到今天的技术讲座。今天,我们将深入探讨多智能体系统中的一个核心且至关重要的机制——“Handoffs”协议。在日益复杂的AI应用场景中,单一智能体(Agent)往往难以应对所有挑战。它们可能面临知识边界、能力限制、资源瓶颈,甚至单纯的决策失误。此时,如何优雅、高效、无缝地将一个智能体无法处理的问题及其所有相关上下文,安全地“接力”给另一个更具专长或更合适的智能体,就成为了构建健壮、智能、用户友好型系统的关键。

我们将从Handoffs的必要性出发,逐步解构上下文的构成、触发机制、目标选择、协议设计,直至其健壮性与高级策略。我将通过具体的代码示例,帮助大家将抽象概念落地,理解如何在实际项目中构建一个可靠的Handoffs系统。


第一章:Handoffs 协议的诞生背景与核心挑战

在人工智能领域,特别是大型语言模型(LLM)驱动的智能体浪潮中,我们常常构建复杂的系统来解决现实世界的问题。这些系统通常由多个智能体协作完成任务。例如,一个客服系统可能包含一个处理常见问题的“初级客服Agent”,一个处理技术故障的“技术支持Agent”,以及一个处理退款和投诉的“高级调解Agent”。

1.1 单一 Agent 的局限性

任何一个Agent,无论其底层模型多么强大,都存在固有的局限性:

  • 知识边界: Agent 训练数据是有限的,特定领域的专业知识可能不足。
  • 能力限制: Agent 可能被设计为执行特定类型的任务(例如,只回答问题,不执行操作)。
  • 资源限制: 某些任务可能需要耗费大量计算资源或访问特定外部工具,而当前Agent可能不具备。
  • 复杂性管理: 当问题过于复杂,需要多步骤推理和信息整合时,单一Agent可能陷入困境。
  • 用户意图误解: Agent 可能无法准确理解用户意图,导致无法提供有效帮助。
  • 状态管理: 在长时间的对话或任务中,Agent可能难以有效管理和利用全部历史状态。

当一个Agent遇到上述挑战,无法继续推进任务时,它不能简单地报错或停止响应。这会导致糟糕的用户体验,甚至任务失败。

1.2 Handoffs 的定义与重要性

Handoffs (接力) 协议,正是为了解决上述问题而生。它定义了一套规范的流程和数据结构,使得一个Agent在判断自身无法处理当前任务时,能够将其所有必要信息(即“上下文”)封装起来,并以一种可被另一个Agent理解和恢复的方式,传递给后者。新的Agent接收上下文后,能够无缝地接管任务,仿佛从未中断过。

Handoffs 的重要性体现在:

  • 提升用户体验: 避免中断和重复提问,提供流畅、连贯的服务。
  • 提高系统鲁棒性: 即使部分Agent失败,系统整体仍能通过Handoffs继续运行。
  • 实现 Agent 专长化: 允许我们构建专注于特定任务的Agent,并通过Handoffs将它们有机地组合起来。
  • 优化资源利用: 将复杂任务分配给更合适的Agent,避免资源浪费。
  • 促进协作与扩展: 为未来增加更多 Agent 类型和更复杂的协作模式打下基础。

1.3 Handoffs 的核心挑战

设计一个高效的Handoffs协议面临以下挑战:

  • 上下文的完整性与一致性: 如何确保所有必要信息都被传递,并且在传递过程中不丢失、不损坏?
  • 上下文的结构化与通用性: 如何将不同Agent内部可能以不同形式表示的信息,转化为一种通用的、可被所有潜在接收者理解的结构?
  • 触发机制的准确性: Agent何时应该决定Handoff?过早可能浪费资源,过晚可能损害用户体验。
  • 目标Agent的选择: 如何智能地选择最适合接管任务的Agent?
  • Handoff 的原子性与健壮性: Handoff过程本身也可能失败,如何处理这些失败?
  • 效率与延迟: Handoff过程应尽可能快速,避免引入显著延迟。

在接下来的章节中,我们将逐一攻克这些挑战。


第二章:上下文的构成与封装

Handoffs 的核心是上下文的传递。一个“优雅”的接力,首先要确保接力的“棒子”——即上下文——是完整且易于抓取的。

2.1 何为“上下文”?

在一个多Agent系统中,上下文远不止是用户输入的最新一句话。它是一个多维度、动态变化的复合体,通常包括:

  1. 用户输入历史: 完整的对话历史,包括用户的所有查询和Agent的所有响应。
  2. 当前问题描述: Agent当前正在尝试解决的具体问题,可能经过Agent内部的解析和提炼。
  3. Agent 内部状态:
    • 思考链 (Chain of Thought / Reasoning Trace): Agent 尝试解决问题时的内部推理过程,包括它尝试过哪些方法、为什么失败了。这对于接手的Agent来说是宝贵的诊断信息。
    • 工具使用记录: Agent 调用了哪些外部工具(API、数据库查询等),以及这些工具的输入和输出。
    • 提取的关键实体/信息: 从对话中识别出的重要信息,如用户ID、订单号、产品名称、时间、地点等。
    • Agent 自身的内部假设或决策树路径。
  4. 系统级信息:
    • 会话ID (Session ID): 唯一标识一次交互会话。
    • 用户ID (User ID): 标识发起请求的用户。
    • 时间戳: Handoff发生的时间。
    • 环境信息: 如语言、地域设置等。
  5. Handoff 特定信息:
    • Handoff 原因: 当前Agent决定Handoff的具体理由(如“知识不足”、“需要人工介入”、“工具调用失败”)。
    • 建议的下一步行动: 当前Agent可能对下一个Agent给出建议,例如“请查询用户订单历史”。

2.2 上下文的结构化表示

为了确保上下文的完整性、一致性和可解析性,我们必须对其进行结构化表示。JSON是首选,因为它通用、易读、跨语言兼容。我们可以定义一个标准化的Context数据结构。

这里,我们将使用 Python 的 pydantic 库来定义上下文模型,它能提供类型检查和序列化/反序列化能力,非常适合构建健壮的数据协议。

import uuid
from datetime import datetime
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field

# 定义对话消息的结构
class Message(BaseModel):
    role: str  # "user", "agent", "system", "tool"
    content: str
    timestamp: datetime = Field(default_factory=datetime.now)
    tool_calls: Optional[List[Dict[str, Any]]] = None # 如果是agent消息,可能包含工具调用
    tool_outputs: Optional[List[Dict[str, Any]]] = None # 工具调用的结果

# 定义 Agent 内部思考链的步骤
class ReasoningStep(BaseModel):
    step_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    action: str  # 尝试的动作,如 "analyze_query", "call_tool", "generate_response"
    details: Dict[str, Any] # 动作的具体细节,如工具名称、参数、LLM提示等
    outcome: str # 动作结果,如 "success", "failure", "inconclusive"
    reasoning: Optional[str] = None # Agent 对此步骤的额外思考

# 定义 Handoff 原因的枚举或字符串常量
class HandoffReason(str):
    KNOWLEDGE_GAP = "知识不足"
    OUT_OF_SCOPE = "超出能力范围"
    TOOL_FAILURE = "工具调用失败"
    USER_ESCALATION = "用户要求升级"
    COMPLEXITY_EXCEEDED = "问题复杂度超限"
    NO_MATCH_AGENT = "无匹配Agent" # 这是一个特殊的内部原因,表示找不到下一个Agent
    OTHER = "其他原因"

# 核心上下文模型
class AgentContext(BaseModel):
    session_id: str
    user_id: Optional[str] = None
    initial_query: str # 用户最初的问题
    current_problem_description: str # Agent 提炼出的当前问题
    conversation_history: List[Message] # 完整的对话历史

    # Agent 内部状态,方便接手Agent理解前因后果
    internal_state: Dict[str, Any] = Field(default_factory=dict) # 存储 Agent 特定的一些键值对状态
    reasoning_trace: List[ReasoningStep] = Field(default_factory=list) # Agent 的思考链

    # Handoff 特定信息
    handoff_reason: HandoffReason # Handoff 的具体原因
    source_agent_id: str # 发起 Handoff 的 Agent ID
    suggested_next_action: Optional[str] = None # 对下一个 Agent 的建议

    # 额外元数据
    metadata: Dict[str, Any] = Field(default_factory=dict) # 存储其他扩展信息,如语言、优先级等

    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }
        # 允许额外字段,方便未来扩展
        extra = "allow"

# 示例:创建一个上下文对象
if __name__ == "__main__":
    session_id = str(uuid.uuid4())
    user_id = "user_123"

    history = [
        Message(role="user", content="我想查询我的订单状态,订单号是 ABC-123。"),
        Message(role="agent", content="好的,请稍等,我正在查询。", 
                tool_calls=[{"name": "lookup_order", "args": {"order_id": "ABC-123"}}]),
        Message(role="tool", content='{"status": "查询失败", "error": "订单不存在或已取消"}', 
                tool_outputs=[{"tool_call_id": "call_123", "output": {"status": "查询失败", "error": "订单不存在或已取消"}}]),
        Message(role="agent", content="抱歉,订单ABC-123查询失败,系统显示订单不存在或已取消。您是否能提供更多信息?", timestamp=datetime.now())
    ]

    reasoning_steps = [
        ReasoningStep(
            action="analyze_query",
            details={"query": "我想查询我的订单状态,订单号是 ABC-123。"},
            outcome="success",
            reasoning="识别到用户意图是查询订单,提取订单号。"
        ),
        ReasoningStep(
            action="call_tool",
            details={"tool_name": "lookup_order", "args": {"order_id": "ABC-123"}},
            outcome="failure",
            reasoning="订单查询工具返回失败,提示订单不存在。"
        ),
        ReasoningStep(
            action="evaluate_failure",
            details={"error": "订单不存在或已取消"},
            outcome="failure",
            reasoning="当前Agent无法处理订单不存在的情况,需要更专业的Agent或人工介入。"
        )
    ]

    context = AgentContext(
        session_id=session_id,
        user_id=user_id,
        initial_query="我想查询我的订单状态,订单号是 ABC-123。",
        current_problem_description="用户订单查询失败,订单号不存在或已取消。",
        conversation_history=history,
        internal_state={"order_id_attempted": "ABC-123", "retry_count": 0},
        reasoning_trace=reasoning_steps,
        handoff_reason=HandoffReason.TOOL_FAILURE,
        source_agent_id="InitialOrderAgent",
        suggested_next_action="请核实用户身份并尝试通过其他信息(如手机号)查询订单,或转交人工客服。"
    )

    # 序列化为 JSON 字符串
    json_context = context.model_dump_json(indent=2)
    print("序列化后的上下文 JSON:n", json_context)

    # 从 JSON 字符串反序列化
    reconstructed_context = AgentContext.model_validate_json(json_context)
    print("n反序列化后的上下文对象:n", reconstructed_context)
    print(f"nHandoff 原因: {reconstructed_context.handoff_reason}")
    print(f"原始 Agent: {reconstructed_context.source_agent_id}")
    print(f"建议操作: {reconstructed_context.suggested_next_action}")

代码说明:

  • Message 类定义了对话中的基本信息单位,包括角色、内容、时间戳以及可能的工具调用和结果。
  • ReasoningStep 类记录了Agent的内部思考和行动轨迹,这对于接手的Agent理解失败原因至关重要。
  • HandoffReason 类定义了Handoff的标准化原因,有助于分类和后续处理。
  • AgentContext 是核心模型,聚合了所有关键信息。它包含了用户对话、Agent内部状态和推理过程,以及Handoff的元数据。
  • model_dump_json()model_validate_json() 方法(Pydantic v2)提供了方便的JSON序列化和反序列化功能,确保数据在网络传输中的完整性。

通过这种结构化的上下文表示,我们为Agent之间的无缝接力奠定了坚实的基础。


第三章:Handoffs 的触发机制

何时进行Handoff是Agent智能决策的关键一环。过早的Handoff可能导致不必要的Agent切换,增加系统开销;过晚则可能拖延问题解决,损害用户体验。Handoff触发机制可以是显式的,也可以是隐式的。

3.1 显式触发:Agent 自我评估

这是最常见也最直接的方式,Agent通过对自身状态和任务进展的评估,主动决定进行Handoff。

  • 知识边界判断: 当Agent识别出用户的问题超出了其预设的知识范围时。
    • 示例: 一个“商品查询Agent”收到一个关于“退款政策”的问题。
  • 能力范围限制: 当Agent发现解决问题需要执行它不具备的能力(如调用特定API、进行复杂计算、需要人工干预)时。
    • 示例: 一个“信息查询Agent”被要求“取消订单”,但它只具备查询能力。
  • 工具调用失败: 当Agent依赖的外部工具(如数据库、API)返回错误或无法提供所需信息时,且Agent自身无法通过重试或替代方案解决。
    • 示例: “订单查询Agent”多次调用订单系统API失败。
  • 推理陷入循环或死锁: Agent的内部推理过程长时间无法收敛,或者陷入了重复的尝试中。
  • 用户明确要求: 用户直接要求“转人工”、“换个客服”等。
  • 置信度不足: Agent对其生成的回答或解决方案的置信度低于预设阈值。

3.2 隐式触发:系统监控与规则

这些触发机制通常由外部系统或Agent管理层监控并触发。

  • 超时: Agent 在预设时间内未能给出有效响应或解决问题。
    • 示例: Agent 连续3分钟没有进展。
  • 错误率阈值: Agent 在短时间内连续产生错误响应,或工具调用失败次数达到阈值。
    • 示例: Agent 尝试了3次查询,每次都失败。
  • 用户重复提问: 用户多次以相似方式重复提问,暗示Agent未能理解或解决问题。
  • 特定关键词/短语: 系统检测到用户输入包含某些触发Handoff的关键词,如“投诉”、“紧急”、“无法解决”。
  • 人工干预: 监控人员发现Agent表现不佳,手动触发Handoff。

3.3 实现 Handoff 触发的逻辑

在Agent的决策循环中,通常会有一个 should_handoff() 方法来封装这些判断逻辑。

import time
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field
# 假设 AgentContext 和 HandoffReason 已经定义

class AgentConfig(BaseModel):
    agent_id: str
    known_capabilities: List[str] # Agent 具备的能力,如 ["query_order", "update_profile"]
    known_knowledge_domains: List[str] # Agent 擅长的知识领域,如 ["订单管理", "账户信息"]
    max_tool_retries: int = 2
    max_dialog_turns_without_progress: int = 3

class BaseAgent:
    def __init__(self, config: AgentConfig):
        self.config = config
        self.current_context: Optional[AgentContext] = None
        self.last_progress_timestamp: float = time.time()
        self.tool_failure_count: int = 0
        self.dialog_turns_since_last_progress: int = 0

    def load_context(self, context: AgentContext):
        self.current_context = context
        self.last_progress_timestamp = time.time() # 重新加载上下文时重置计时器
        self.tool_failure_count = 0
        self.dialog_turns_since_last_progress = 0

    def process_query(self, query: str) -> str:
        # 这是一个模拟的方法,实际中会进行复杂的LLM调用、工具使用等
        print(f"Agent {self.config.agent_id} 正在处理查询: {query}")
        # 模拟处理逻辑,可能更新 self.current_context.reasoning_trace
        # ...

        # 模拟工具调用失败
        if "特殊订单" in query and self.tool_failure_count < self.config.max_tool_retries:
            self.tool_failure_count += 1
            print(f"模拟工具调用失败,尝试次数: {self.tool_failure_count}")
            self.current_context.reasoning_trace.append(
                ReasoningStep(
                    action="call_tool_special_order",
                    details={"query_part": "特殊订单"},
                    outcome="failure",
                    reasoning=f"尝试处理特殊订单失败,第 {self.tool_failure_count} 次。"
                )
            )
            return "我似乎无法处理特殊订单,请您再详细描述一下吗?"

        # 模拟知识边界
        if "退款政策" in query:
            self.current_context.reasoning_trace.append(
                ReasoningStep(
                    action="evaluate_knowledge_domain",
                    details={"query_topic": "退款政策"},
                    outcome="failure",
                    reasoning="问题超出当前Agent的知识领域。"
                )
            )
            return "关于退款政策,我需要将您转接给专门的Agent处理。"

        # 模拟长时间无进展
        if time.time() - self.last_progress_timestamp > 10: # 10秒无进展
            self.current_context.reasoning_trace.append(
                ReasoningStep(
                    action="evaluate_progress",
                    details={"time_elapsed": time.time() - self.last_progress_timestamp},
                    outcome="failure",
                    reasoning="长时间未取得有效进展。"
                )
            )
            return "我们似乎陷入了僵局,请稍等,我为您寻找更专业的帮助。"

        # 模拟成功处理
        self.last_progress_timestamp = time.time() # 模拟取得进展
        self.tool_failure_count = 0 # 成功处理后重置失败计数
        self.dialog_turns_since_last_progress = 0
        self.current_context.reasoning_trace.append(
                ReasoningStep(
                    action="generate_response",
                    details={"response_content": f"已处理查询: {query}"},
                    outcome="success",
                    reasoning="成功生成响应。"
                )
            )
        return f"Agent {self.config.agent_id} 已成功处理: {query}"

    def should_handoff(self, current_query: str) -> Optional[HandoffReason]:
        """
        判断是否需要 Handoff,如果需要,返回 Handoff 原因。
        """
        if not self.current_context:
            return None

        # 显式触发:用户明确要求
        if "转人工" in current_query or "换个客服" in current_query:
            return HandoffReason.USER_ESCALATION

        # 显式触发:知识边界判断
        if "退款政策" in current_query and "退款管理" not in self.config.known_knowledge_domains:
            return HandoffReason.KNOWLEDGE_GAP

        # 显式触发:能力范围限制
        if "取消订单" in current_query and "cancel_order" not in self.config.known_capabilities:
            return HandoffReason.OUT_OF_SCOPE

        # 显式触发:工具调用失败次数过多
        if self.tool_failure_count >= self.config.max_tool_retries:
            return HandoffReason.TOOL_FAILURE

        # 隐式触发:长时间无进展 (假设每轮对话都会更新时间戳)
        if (time.time() - self.last_progress_timestamp) > 15: # 15秒无进展则Handoff
            return HandoffReason.COMPLEXITY_EXCEEDED

        # 隐式触发:对话轮次无进展
        if self.dialog_turns_since_last_progress >= self.config.max_dialog_turns_without_progress:
             return HandoffReason.COMPLEXITY_EXCEEDED

        # 更多判断逻辑...

        return None # 不需要 Handoff

    def prepare_handoff_context(self, reason: HandoffReason, suggested_action: Optional[str] = None) -> AgentContext:
        """
        在 Handoff 之前,更新并返回最新的上下文。
        """
        if not self.current_context:
            raise ValueError("No context loaded to prepare for handoff.")

        self.current_context.handoff_reason = reason
        self.current_context.source_agent_id = self.config.agent_id
        self.current_context.suggested_next_action = suggested_action

        # 确保 latest user query 在 conversation_history 中
        if self.current_context.conversation_history and 
           self.current_context.conversation_history[-1].role != "user":
            # 如果最后一条不是用户消息,说明还需要把导致 handoff 的用户消息加进去
            # 这里简单处理,实际应根据 Agent 的设计确保消息完整
            pass # 假设在 process_query 阶段已经添加了

        return self.current_context

# 示例使用
if __name__ == "__main__":
    initial_context = AgentContext(
        session_id="sess_001",
        user_id="user_A",
        initial_query="我想查询一个特殊订单,订单号是 XYZ-789。",
        current_problem_description="查询特殊订单",
        conversation_history=[Message(role="user", content="我想查询一个特殊订单,订单号是 XYZ-789。")],
        handoff_reason=HandoffReason.OTHER, # 初始状态
        source_agent_id="None"
    )

    agent_config_order = AgentConfig(
        agent_id="OrderQueryAgent",
        known_capabilities=["query_order", "track_shipping"],
        known_knowledge_domains=["订单管理", "物流信息"],
        max_tool_retries=2,
        max_dialog_turns_without_progress=2
    )

    order_agent = BaseAgent(agent_config_order)
    order_agent.load_context(initial_context)

    print(f"n--- {order_agent.config.agent_id} 处理第一轮 ---")
    response = order_agent.process_query("我想查询一个特殊订单,订单号是 XYZ-789。")
    print(f"Agent Response: {response}")
    reason = order_agent.should_handoff("我想查询一个特殊订单,订单号是 XYZ-789。")
    print(f"Should Handoff? {reason}")
    if reason:
        handoff_context = order_agent.prepare_handoff_context(reason, "请寻找能处理特殊订单的Agent。")
        print("Handoff Context prepared:", handoff_context.model_dump_json(indent=2))

    print(f"n--- {order_agent.config.agent_id} 处理第二轮 (模拟重试) ---")
    order_agent.dialog_turns_since_last_progress += 1 # 模拟无进展轮次增加
    response = order_agent.process_query("我又试了一次,还是查不到特殊订单 XYZ-789。")
    print(f"Agent Response: {response}")
    reason = order_agent.should_handoff("我又试了一次,还是查不到特殊订单 XYZ-789。")
    print(f"Should Handoff? {reason}")
    if reason:
        handoff_context = order_agent.prepare_handoff_context(reason, "工具调用失败,需要更高级的订单处理Agent。")
        print("Handoff Context prepared:", handoff_context.model_dump_json(indent=2))

    print(f"n--- {order_agent.config.agent_id} 处理第三轮 (模拟知识边界) ---")
    order_agent.load_context(
        AgentContext(
            session_id="sess_002",
            user_id="user_B",
            initial_query="请问你们的退款政策是什么?",
            current_problem_description="用户询问退款政策",
            conversation_history=[Message(role="user", content="请问你们的退款政策是什么?")],
            handoff_reason=HandoffReason.OTHER,
            source_agent_id="None"
        )
    )
    response = order_agent.process_query("请问你们的退款政策是什么?")
    print(f"Agent Response: {response}")
    reason = order_agent.should_handoff("请问你们的退款政策是什么?")
    print(f"Should Handoff? {reason}")
    if reason:
        handoff_context = order_agent.prepare_handoff_context(reason, "需要转接至了解退款政策的Agent。")
        print("Handoff Context prepared:", handoff_context.model_dump_json(indent=2))

代码说明:

  • AgentConfig 包含了Agent的基本配置,如能力和知识领域,这些是Handoff判断的重要依据。
  • BaseAgent 类模拟了一个Agent,其中包含了处理查询、判断是否Handoff以及准备Handoff上下文的方法。
  • should_handoff() 方法是核心,它根据当前Agent的状态、用户输入和预设规则来决定是否进行Handoff,并返回相应的HandoffReason
  • prepare_handoff_context() 方法用于在Handoff前更新上下文,填充handoff_reasonsource_agent_idsuggested_next_action等关键信息。
  • 示例演示了工具调用失败重试后Handoff,以及知识边界Handoff的场景。

第四章:选择合适的接力对象

当一个Agent决定Handoff时,接下来的关键问题是:将任务交给谁?选择一个不合适的Agent,可能导致任务在多个Agent之间反复Handoff,形成“Handoff循环”,严重影响效率和用户体验。

4.1 目标 Agent 选择策略

选择策略通常基于以下几个维度:

  1. 基于能力图谱 (Skill Graph / Capability Matching):

    • 每个Agent都注册其擅长的能力(known_capabilities)和知识领域(known_knowledge_domains)。
    • Handoff发起Agent或中央调度器根据当前问题的current_problem_descriptionhandoff_reason以及suggested_next_action,匹配最符合的Agent。
    • 这通常涉及一个“能力匹配算法”,可能是简单的关键词匹配,也可能是基于嵌入(embeddings)的语义相似度匹配。
  2. 基于负载均衡 (Load Balancing):

    • 当有多个Agent都具备处理该问题的能力时,选择当前负载最低的Agent。
    • 这要求Agent注册其当前处理的任务数量或计算资源占用情况。
  3. 基于历史成功率 / 绩效 (Performance-based):

    • 记录每个Agent处理特定类型问题的历史成功率。
    • 优先选择历史成功率更高的Agent。这需要一个监控和数据收集系统。
  4. 基于优先级 / 等级 (Priority / Tiering):

    • 为Agent设置不同的优先级或等级。例如,“初级Agent”无法解决的问题,优先Handoff给“高级Agent”或“专家Agent”。
    • 这与Handoff原因和问题复杂度相关联。
  5. 基于用户偏好 / 历史记录:

    • 如果用户之前与某个特定Agent成功解决过类似问题,可以优先尝试将其Handoff给该Agent。
  6. 人工Agent作为最终Fallback:

    • 在所有自动化Agent都无法处理时,最终Handoff给人类操作员。

4.2 Agent 注册与发现机制

为了实现上述策略,我们需要一个Agent注册中心(AgentRegistry)来管理所有可用Agent的信息。

import random
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field

# 假设 AgentConfig 和 AgentContext 已经定义

class AgentRegistryEntry(BaseModel):
    agent_id: str
    capabilities: List[str]
    knowledge_domains: List[str]
    load: int = 0 # 模拟当前 Agent 的负载
    performance_score: float = 1.0 # 模拟 Agent 的历史表现分数 (0.0 - 1.0)
    tier: int = 1 # Agent 等级,数字越大等级越高

class AgentRegistry:
    def __init__(self):
        self._agents: Dict[str, AgentRegistryEntry] = {}

    def register_agent(self, agent_entry: AgentRegistryEntry):
        self._agents[agent_entry.agent_id] = agent_entry
        print(f"Agent {agent_entry.agent_id} 已注册。")

    def update_agent_load(self, agent_id: str, load_change: int):
        if agent_id in self._agents:
            self._agents[agent_id].load = max(0, self._agents[agent_id].load + load_change)
            # print(f"Agent {agent_id} 负载更新为: {self._agents[agent_id].load}")

    def update_agent_performance(self, agent_id: str, success: bool):
        if agent_id in self._agents:
            current_score = self._agents[agent_id].performance_score
            if success:
                self._agents[agent_id].performance_score = min(1.0, current_score * 1.05 + 0.01) # 成功略微提升
            else:
                self._agents[agent_id].performance_score = max(0.0, current_score * 0.95 - 0.01) # 失败略微降低
            # print(f"Agent {agent_id} 表现分数更新为: {self._agents[agent_id].performance_score:.2f}")

    def find_best_agent(self, context: AgentContext) -> Optional[str]:
        """
        根据上下文信息,找到最合适的接力 Agent。
        这是一个示例的匹配逻辑,实际会更复杂。
        """
        potential_agents: List[AgentRegistryEntry] = []

        # 1. 根据 Handoff 原因和建议筛选初选 Agent
        required_capabilities = []
        required_knowledge_domains = []

        if context.handoff_reason == HandoffReason.KNOWLEDGE_GAP:
            # 尝试从 problem description 或 suggested_next_action 中推断所需知识领域
            if "退款政策" in context.current_problem_description or 
               (context.suggested_next_action and "退款" in context.suggested_next_action):
                required_knowledge_domains.append("退款管理")
            # 更多根据 reason 填充所需能力和知识的逻辑

        elif context.handoff_reason == HandoffReason.OUT_OF_SCOPE:
            if context.suggested_next_action and "取消订单" in context.suggested_next_action:
                 required_capabilities.append("cancel_order")
            elif context.suggested_next_action and "修改个人信息" in context.suggested_next_action:
                 required_capabilities.append("update_profile")

        elif context.handoff_reason == HandoffReason.TOOL_FAILURE:
            # 如果是工具失败,可能需要一个更专业的工具Agent,或者能处理异常的Agent
            # 假设一个 "AdvancedOrderAgent" 具备更强的订单处理能力
            if "订单" in context.current_problem_description:
                required_knowledge_domains.append("高级订单管理")

        elif context.handoff_reason == HandoffReason.USER_ESCALATION:
            # 用户要求升级,通常需要转接给高等级Agent或人工
            potential_agents.extend([
                entry for entry in self._agents.values() if entry.tier >= 2
            ])

        # 匹配能力和知识领域
        for agent_id, agent_entry in self._agents.items():
            # 确保 Agent 至少具备一项所需能力或知识领域
            if (not required_capabilities or any(cap in agent_entry.capabilities for cap in required_capabilities)) and 
               (not required_knowledge_domains or any(domain in agent_entry.knowledge_domains for domain in required_knowledge_domains)):
                potential_agents.append(agent_entry)

        if not potential_agents:
            print("未能找到匹配能力的 Agent,尝试寻找通用高等级 Agent。")
            # 如果没有匹配的,尝试找通用高等级 Agent 或人工
            potential_agents.extend([
                entry for entry in self._agents.values() if entry.tier >= 2 # 假设 tier 2 及以上是通用型或人工
            ])

        if not potential_agents:
            print("未能找到任何合适的 Agent。")
            return None

        # 2. 根据负载、性能和等级进行排序和选择
        # 优先选择:更高等级 -> 更高表现分 -> 更低负载
        potential_agents.sort(key=lambda x: (x.tier, x.performance_score, -x.load), reverse=True) # 等级和分数降序,负载升序

        # 排除当前发起 Handoff 的 Agent (防止循环)
        if context.source_agent_id:
            potential_agents = [a for a in potential_agents if a.agent_id != context.source_agent_id]

        if potential_agents:
            selected_agent = potential_agents[0]
            print(f"为会话 {context.session_id} 选定 Agent: {selected_agent.agent_id} "
                  f"(Tier: {selected_agent.tier}, Score: {selected_agent.performance_score:.2f}, Load: {selected_agent.load})")
            return selected_agent.agent_id

        return None # 无法找到合适的 Agent

# 示例使用
if __name__ == "__main__":
    registry = AgentRegistry()

    # 注册 Agent
    registry.register_agent(AgentRegistryEntry(agent_id="OrderAgent", capabilities=["query_order", "track_shipping"], knowledge_domains=["订单管理"], tier=1))
    registry.register_agent(AgentRegistryEntry(agent_id="RefundAgent", capabilities=["process_refund", "query_policy"], knowledge_domains=["退款管理", "政策咨询"], tier=2))
    registry.register_agent(AgentRegistryEntry(agent_id="TechSupportAgent", capabilities=["troubleshoot_tech", "reset_account"], knowledge_domains=["技术支持", "账户管理"], tier=2))
    registry.register_agent(AgentRegistryEntry(agent_id="HumanAgent_Tier3", capabilities=["all_human_interaction"], knowledge_domains=["通用客服"], tier=3))
    registry.register_agent(AgentRegistryEntry(agent_id="AdvancedOrderAgent", capabilities=["query_special_order", "cancel_order"], knowledge_domains=["高级订单管理"], tier=2))

    # 模拟 Handoff 场景 1: 知识边界 (退款政策)
    context_kg = AgentContext(
        session_id="sess_kg_001",
        user_id="user_A",
        initial_query="请问你们的退款政策是什么?",
        current_problem_description="用户询问退款政策",
        conversation_history=[],
        handoff_reason=HandoffReason.KNOWLEDGE_GAP,
        source_agent_id="OrderAgent",
        suggested_next_action="需要转接至了解退款政策的Agent。"
    )
    target_agent_id_kg = registry.find_best_agent(context_kg)
    print(f"场景 1: 知识边界 Handoff,目标 Agent: {target_agent_id_kg}n") # 期望是 RefundAgent

    # 模拟 Handoff 场景 2: 工具调用失败 (特殊订单)
    context_tf = AgentContext(
        session_id="sess_tf_001",
        user_id="user_B",
        initial_query="我查不到我的特殊订单 XYZ-789。",
        current_problem_description="用户特殊订单查询失败",
        conversation_history=[],
        handoff_reason=HandoffReason.TOOL_FAILURE,
        source_agent_id="OrderAgent",
        suggested_next_action="工具调用失败,需要更高级的订单处理Agent。"
    )
    target_agent_id_tf = registry.find_best_agent(context_tf)
    print(f"场景 2: 工具失败 Handoff,目标 Agent: {target_agent_id_tf}n") # 期望是 AdvancedOrderAgent

    # 模拟 Handoff 场景 3: 用户升级
    context_ue = AgentContext(
        session_id="sess_ue_001",
        user_id="user_C",
        initial_query="我要求转人工客服!",
        current_problem_description="用户要求转人工",
        conversation_history=[],
        handoff_reason=HandoffReason.USER_ESCALATION,
        source_agent_id="OrderAgent",
        suggested_next_action="转接人工客服。"
    )
    target_agent_id_ue = registry.find_best_agent(context_ue)
    print(f"场景 3: 用户升级 Handoff,目标 Agent: {target_agent_id_ue}n") # 期望是 HumanAgent_Tier3 或 RefundAgent (tier 2)

    # 模拟 Handoff 场景 4: 无匹配 Agent
    context_no_match = AgentContext(
        session_id="sess_nm_001",
        user_id="user_D",
        initial_query="请帮我修理我的智能冰箱。",
        current_problem_description="用户请求维修智能冰箱",
        conversation_history=[],
        handoff_reason=HandoffReason.OUT_OF_SCOPE,
        source_agent_id="OrderAgent",
        suggested_next_action="需要维修智能冰箱的Agent。"
    )
    target_agent_id_nm = registry.find_best_agent(context_no_match)
    print(f"场景 4: 无匹配 Handoff,目标 Agent: {target_agent_id_nm}n") # 期望是 HumanAgent_Tier3 或 TechSupportAgent

代码说明:

  • AgentRegistryEntry 记录了Agent的元数据,包括能力、知识领域、负载、表现分数和等级。
  • AgentRegistry 类是一个中央注册中心,负责Agent的注册、状态更新以及最关键的——目标Agent的发现。
  • find_best_agent() 方法实现了多维度的Agent选择逻辑:
    • 首先根据handoff_reasonsuggested_next_action来推断所需的能力和知识领域,进行初步过滤。
    • 然后,对符合条件的Agent,根据tier(等级)、performance_score(表现)和load(负载)进行加权排序。
    • tierperformance_score 越高越优先,load 越低越优先。
    • 最后,排除了发起Handoff的Agent,以避免立即循环。
  • 这个示例中的匹配逻辑是启发式的,实际应用中可能需要更复杂的语义匹配模型或基于规则引擎的推理。
  • update_agent_loadupdate_agent_performance 方法展示了如何动态更新Agent的状态,以便进行更智能的决策。

第五章:Handoffs 协议的设计与实现

有了上下文结构和目标Agent选择机制,我们现在需要定义实际的Handoff消息协议,确保Agent之间能可靠地进行通信。

5.1 Handoff 协议栈的思考

Handoff 本质上是一种 Agent 间的特殊消息传递。它需要遵循一定的规范,就像网络通信中的协议栈一样。

  • 传输层: 如何发送 Handoff 消息?可以是 HTTP/REST API 调用、消息队列(如 Kafka、RabbitMQ)、WebSocket 等。消息队列是异步 Handoff 的理想选择,因为它提供了解耦、削峰填谷和持久化能力。
  • 会话层: 如何管理 Handoff 过程中的状态?例如,Handoff 请求是否被接收、是否被处理、是否成功。
  • 应用层: Handoff 消息的具体结构和语义。

这里我们专注于应用层协议的设计。

5.2 Handoff 消息结构

Handoff 消息需要包含足以描述其意图和内容的字段。

import uuid
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field

# 假设 AgentContext 和 HandoffReason 已经定义

class HandoffStatus(str):
    PENDING = "待处理"
    ACCEPTED = "已接受"
    REJECTED = "已拒绝"
    COMPLETED = "已完成"
    FAILED = "失败"
    CANCELLED = "已取消"

class HandoffMessage(BaseModel):
    handoff_id: str = Field(default_factory=lambda: str(uuid.uuid4())) # 唯一 Handoff 标识符
    session_id: str # 关联的会话 ID
    source_agent_id: str # 发起 Handoff 的 Agent ID
    target_agent_id: str # 目标 Agent ID
    timestamp: datetime = Field(default_factory=datetime.now) # Handoff 发起时间
    context: AgentContext # 完整的上下文信息
    status: HandoffStatus = HandoffStatus.PENDING # Handoff 状态

    # Handoff 过程中的反馈信息
    rejection_reason: Optional[str] = None # 如果被拒绝,拒绝原因
    completion_details: Optional[Dict[str, Any]] = None # 完成 Handoff 后的额外信息

    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }
        extra = "allow"

# 示例:创建 Handoff 消息
if __name__ == "__main__":
    # 假设我们有一个准备好的 AgentContext
    sample_context = AgentContext(
        session_id="sess_001",
        user_id="user_A",
        initial_query="我想查询一个特殊订单,订单号是 XYZ-789。",
        current_problem_description="用户特殊订单查询失败",
        conversation_history=[],
        handoff_reason=HandoffReason.TOOL_FAILURE,
        source_agent_id="OrderAgent",
        suggested_next_action="工具调用失败,需要更高级的订单处理Agent。"
    )

    handoff_msg = HandoffMessage(
        session_id=sample_context.session_id,
        source_agent_id=sample_context.source_agent_id,
        target_agent_id="AdvancedOrderAgent",
        context=sample_context,
        status=HandoffStatus.PENDING
    )

    print("Handoff 消息 JSON:n", handoff_msg.model_dump_json(indent=2))

    # 模拟 Handoff 接受
    accepted_msg = handoff_msg.model_copy(update={"status": HandoffStatus.ACCEPTED})
    print("nHandoff 接受消息 JSON:n", accepted_msg.model_dump_json(indent=2))

    # 模拟 Handoff 拒绝
    rejected_msg = handoff_msg.model_copy(update={"status": HandoffStatus.REJECTED, "rejection_reason": "当前负载过高,无法处理。"})
    print("nHandoff 拒绝消息 JSON:n", rejected_msg.model_dump_json(indent=2))

代码说明:

  • HandoffStatus 定义了Handoff请求的生命周期状态。
  • HandoffMessage 封装了Handoff的元数据(ID、源/目标Agent、时间戳、状态)以及最重要的AgentContext
  • rejection_reasoncompletion_details 字段允许在Handoff生命周期中传递额外的反馈信息。

5.3 异步与同步 Handoffs

  • 同步 Handoff: 发起Agent等待目标Agent的响应(接受/拒绝)。简单直接,但可能阻塞发起Agent,且目标Agent必须实时在线。适用于Agent数量少、实时性要求高的场景。
  • 异步 Handoff: 发起Agent发送Handoff请求后立即继续自己的任务(或释放资源),不等待立即响应。目标Agent在收到请求后异步处理。这通常通过消息队列实现,提供了更高的解耦性、可扩展性和容错性。适用于大规模、高并发的Agent系统。

在大多数复杂的Agent系统中,异步Handoff是更优的选择,因为它避免了紧耦合,并能更好地处理Agent的暂时性不可用。

Handoff 流程表:

阶段 发起 Agent (Source Agent) 传输机制 目标 Agent (Target Agent) 管理/调度器 (Orchestrator/Registry)
1. 触发 1.1 评估无法处理,决定 Handoff。 N/A N/A N/A
2. 准备上下文 2.1 更新 AgentContext,设置 handoff_reason 等。 N/A N/A N/A
3. 选择目标 3.1 请求管理/调度器 find_best_agent() API 调用 N/A 3.2 根据策略选择目标 Agent ID。
4. 发送 Handoff 4.1 构建 HandoffMessage (状态 PENDING)。 消息队列 (e.g., Kafka) 或 REST API N/A N/A
5. 接收 Handoff N/A 消息队列监听器 或 REST Endpoint 5.1 收到 HandoffMessage N/A
6. 评估与响应 N/A N/A 6.1 评估 AgentContext,判断是否能处理。 N/A
7. 接受/拒绝 N/A 消息队列 或 REST API (回调) 7.1 发送 HandoffMessage (状态 ACCEPTEDREJECTED)。 N/A
8. 更新状态 8.1 (如为同步) 收到响应,更新会话状态。 N/A N/A 8.2 接收到响应,更新 Handoff 状态。
9. 接管处理 N/A N/A 9.1 (如接受) 加载 AgentContext,开始处理。 N/A
10. Handoff 完成 N/A 消息队列 或 REST API (回调) 10.1 处理完成后,发送 HandoffMessage (状态 COMPLETED)。 10.2 接收到完成消息,记录 Handoff 结果。

第六章:上下文的无缝传递与恢复

Handoff 协议的关键在于“无缝”。这意味着目标 Agent 在接收到上下文后,能够像从一开始就在处理这个问题一样,没有任何障碍。

6.1 序列化与反序列化

我们已经使用了 Pydantic 来定义 AgentContextHandoffMessage,这使得序列化(Python 对象转 JSON 字符串)和反序列化(JSON 字符串转 Python 对象)变得非常简单和可靠。

  • 序列化: 当发起 Agent 准备 Handoff 时,它会将 AgentContext 对象转换为 JSON 字符串进行传输。
  • 反序列化: 当目标 Agent 收到 Handoff 消息时,它会将 JSON 字符串反序列化回 AgentContext 对象。

这种强类型定义的好处在于,如果在传输或解析过程中出现数据不匹配,Pydantic 会立即报错,防止不完整或错误的数据被 Agent 误用。

6.2 目标 Agent 的上下文加载与状态重建

目标 Agent 接收到反序列化的 AgentContext 对象后,需要执行以下步骤来恢复会话状态:

  1. 加载核心信息:session_iduser_idinitial_querycurrent_problem_descriptionconversation_history 加载到其内部会话管理模块。
  2. 理解 Handoff 原因: 读取 handoff_reasonsuggested_next_action,理解前一个 Agent 为什么 Handoff,并获取潜在的下一步指导。
  3. 分析推理轨迹: 仔细检查 reasoning_trace,理解前一个 Agent 尝试了什么、在哪里失败了。这对于避免重复工作和快速定位问题至关重要。
  4. 恢复内部状态: 如果 internal_state 包含 Agent 特定的关键信息(如已提取的实体、用户偏好设置),目标 Agent 需要将其解析并集成到自己的工作流程中。
  5. 生成初始响应或行动: 基于恢复的上下文,目标 Agent 可以立即开始推理,生成一个针对性的响应,或者执行第一个行动。

代码示例:目标 Agent 接收并处理 Handoff

import time
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field

# 假设 HandoffMessage, AgentContext, AgentConfig, HandoffReason, Message, ReasoningStep 已经定义

class AdvancedOrderAgent(BaseAgent): # 继承自 BaseAgent
    def __init__(self, config: AgentConfig):
        super().__init__(config)
        self.special_order_db: Dict[str, Dict[str, Any]] = {
            "XYZ-789": {"status": "处理中", "details": "正在与供应商确认", "customer_info": "user_B"},
            "SPO-001": {"status": "已发货", "details": "快递单号: SF123456", "customer_info": "user_C"}
        }

    def process_query(self, query: str) -> str:
        # 这是一个模拟的方法,处理更复杂的订单查询
        print(f"Agent {self.config.agent_id} 正在处理查询: {query}")

        # 模拟处理特殊订单
        if "特殊订单" in query or "XYZ-789" in query:
            order_id = "XYZ-789" # 简化处理,直接假设是这个订单
            if order_id in self.special_order_db:
                order_info = self.special_order_db[order_id]
                self.current_context.reasoning_trace.append(
                    ReasoningStep(
                        action="query_special_order_db",
                        details={"order_id": order_id},
                        outcome="success",
                        reasoning=f"成功从特殊订单数据库查询到 {order_id}。"
                    )
                )
                return f"您的特殊订单 {order_id} 状态是:{order_info['status']},详情:{order_info['details']}。"
            else:
                self.current_context.reasoning_trace.append(
                    ReasoningStep(
                        action="query_special_order_db",
                        details={"order_id": order_id},
                        outcome="failure",
                        reasoning=f"在特殊订单数据库中未能找到 {order_id}。"
                    )
                )
                return "抱歉,即使是特殊订单数据库也未能找到该订单。"

        # 模拟退款政策查询 (假设 AdvancedOrderAgent 也能处理部分退款问题)
        if "退款政策" in query and "退款管理" in self.config.known_knowledge_domains:
            self.current_context.reasoning_trace.append(
                ReasoningStep(
                    action="provide_refund_policy",
                    details={},
                    outcome="success",
                    reasoning="提供退款政策信息。"
                )
            )
            return "我们的退款政策是:商品在签收后7天内,如不影响二次销售,可无理由退货。"

        return super().process_query(query) # 调用基类的处理逻辑作为兜底

    def handle_handoff(self, handoff_msg: HandoffMessage) -> HandoffMessage:
        """
        接收并处理 Handoff 请求。
        """
        print(f"nAgent {self.config.agent_id} 收到 Handoff 请求 (ID: {handoff_msg.handoff_id})")

        # 1. 评估是否接受 Handoff
        # 检查自身负载、能力和 Handoff 原因
        if self.config.agent_id == handoff_msg.source_agent_id: # 避免自己 Handoff 给自己
            print(f"警告:Handoff 目标是自身 ({self.config.agent_id}),拒绝 Handoff。")
            return handoff_msg.model_copy(update={
                "status": HandoffStatus.REJECTED,
                "rejection_reason": "目标Agent与发起Agent相同。"
            })

        if self.config.load > 5: # 模拟负载过高
            print(f"Agent {self.config.agent_id} 负载过高,拒绝 Handoff。")
            return handoff_msg.model_copy(update={
                "status": HandoffStatus.REJECTED,
                "rejection_reason": "Agent 负载过高。"
            })

        # 简单能力匹配,实际应更复杂
        if handoff_msg.context.handoff_reason == HandoffReason.TOOL_FAILURE and 
           "高级订单管理" not in self.config.known_knowledge_domains:
            print(f"Agent {self.config.agent_id} 不具备处理高级订单的能力,拒绝 Handoff。")
            return handoff_msg.model_copy(update={
                "status": HandoffStatus.REJECTED,
                "rejection_reason": "Agent 不具备所需知识领域。"
            })

        # 接受 Handoff
        self.config.load += 1 # 增加负载
        handoff_msg.status = HandoffStatus.ACCEPTED
        print(f"Agent {self.config.agent_id} 接受 Handoff 请求,当前负载: {self.config.load}")

        # 2. 加载上下文并重建状态
        self.load_context(handoff_msg.context)
        print(f"已加载上下文,会话 ID: {self.current_context.session_id}")
        print(f"Handoff 原因: {self.current_context.handoff_reason}")
        print(f"前一个 Agent 的思考链:")
        for step in self.current_context.reasoning_trace:
            print(f"  - {step.action} ({step.outcome}): {step.reasoning}")
        print(f"建议下一步行动: {self.current_context.suggested_next_action}")

        # 3. 根据上下文开始处理
        # 模拟立即对用户进行响应
        last_user_query = self.current_context.conversation_history[-1].content if self.current_context.conversation_history else self.current_context.initial_query
        initial_response = f"您好,我是 {self.config.agent_id},已从 {self.current_context.source_agent_id} 接手。我了解到您的问题是:'{self.current_context.current_problem_description}'。"

        # 根据 Handoff 原因和建议,决定如何开始处理
        if self.current_context.handoff_reason == HandoffReason.TOOL_FAILURE:
            initial_response += f"根据您提供的订单号 '{self.current_context.internal_state.get('order_id_attempted', '未知')}',我将尝试进行更深入的查询。"
            # 立即尝试处理
            response_content = self.process_query(last_user_query)
            initial_response += f"n我的初步处理结果是:{response_content}"
        elif self.current_context.handoff_reason == HandoffReason.KNOWLEDGE_GAP:
            initial_response += f"关于'{self.current_context.current_problem_description}',我将为您提供详细信息。"
            response_content = self.process_query(last_user_query)
            initial_response += f"n我的初步处理结果是:{response_content}"
        else:
            initial_response += "请您详细描述一下您需要我做什么。"

        # 模拟将响应添加到历史中
        self.current_context.conversation_history.append(
            Message(role="agent", content=initial_response, timestamp=datetime.now())
        )

        handoff_msg.completion_details = {"initial_agent_response": initial_response}
        # Handoff 消息状态在 Agent 成功接管并处理后,可以更新为 COMPLETED
        handoff_msg.status = HandoffStatus.COMPLETED 

        return handoff_msg # 返回更新后的 Handoff 消息

# 示例使用
if __name__ == "__main__":
    registry = AgentRegistry()

    order_agent_config = AgentConfig(
        agent_id="OrderAgent",
        known_capabilities=["query_order", "track_shipping"],
        known_knowledge_domains=["订单管理"],
        max_tool_retries=2,
        max_dialog_turns_without_progress=2
    )
    order_agent = BaseAgent(order_agent_config)
    registry.register_agent(AgentRegistryEntry(agent_id="OrderAgent", capabilities=["query_order", "track_shipping"], knowledge_domains=["订单管理"], tier=1))

    adv_order_agent_config = AgentConfig(
        agent_id="AdvancedOrderAgent",
        known_capabilities=["query_special_order", "cancel_order", "process_refund"], # 假设也能处理退款
        known_knowledge_domains=["高级订单管理", "退款管理"],
        max_tool_retries=5,
        max_dialog_turns_without_progress=5
    )
    adv_order_agent = AdvancedOrderAgent(adv_order_agent_config)
    registry.register_agent(AgentRegistryEntry(agent_id="AdvancedOrderAgent", capabilities=["query_special_order", "cancel_order", "process_refund"], knowledge_domains=["高级订单管理", "退款管理"], tier=2))

    # --- 模拟 Handoff 流程 ---

    # 1. 初始 Agent 遇到问题,准备 Handoff
    initial_context = AgentContext(
        session_id="sess_tf_001",
        user_id="user_B",
        initial_query="我查不到我的特殊订单 XYZ-789。",
        current_problem_description="用户特殊订单查询失败",
        conversation_history=[Message(role="user", content="我查不到我的特殊订单 XYZ-789。")],
        internal_state={"order_id_attempted": "XYZ-789"},
        reasoning_trace=[
            ReasoningStep(action="query_order_api", details={"order_id": "XYZ-789"}, outcome="failure", reasoning="API 返回订单不存在。")
        ],
        handoff_reason=HandoffReason.TOOL_FAILURE,
        source_agent_id="OrderAgent",
        suggested_next_action="工具调用失败,需要更高级的订单处理Agent。"
    )
    order_agent.load_context(initial_context)

    # 2. 找到目标 Agent
    target_agent_id = registry.find_best_agent(order_agent.prepare_handoff_context(initial_context.handoff_reason, initial_context.suggested_next_action))
    print(f"n调度器选定目标 Agent: {target_agent_id}")

    if target_agent_id:
        # 3. 构建 Handoff 消息
        handoff_msg_to_send = HandoffMessage(
            session_id=initial_context.session_id,
            source_agent_id=order_agent.config.agent_id,
            target_agent_id=target_agent_id,
            context=order_agent.prepare_handoff_context(initial_context.handoff_reason, initial_context.suggested_next_action)
        )
        print(f"n发送 Handoff 消息给 {handoff_msg_to_send.target_agent_id}...")

        # 4. 目标 Agent 接收并处理 Handoff
        if target_agent_id == adv_order_agent.config.agent_id:
            returned_handoff_msg = adv_order_agent.handle_handoff(handoff_msg_to_send)
            print(f"nHandoff 处理结果: {returned_handoff_msg.status}")
            if returned_handoff_msg.status == HandoffStatus.COMPLETED:
                print(f"目标 Agent 初始响应: {returned_handoff_msg.completion_details.get('initial_agent_response')}")
                # 此时,adv_order_agent 已经加载了上下文,可以继续与用户交互
                print(f"n用户再次提问:'那我的订单现在到底是什么情况?'")
                final_response = adv_order_agent.process_query("那我的订单现在到底是什么情况?")
                print(f"AdvancedOrderAgent 回复: {final_response}")
        else:
            print("目标 Agent 不在模拟范围内。")
    else:
        print("未找到合适的 Agent 进行 Handoff。")

代码说明:

  • AdvancedOrderAgent 继承自 BaseAgent,并重写了 process_query 以模拟其特殊能力。
  • handle_handoff() 是目标 Agent 接收和处理 Handoff 的核心方法。
    • 它首先进行初步评估(负载、能力匹配),决定是否接受 Handoff。
    • 如果接受,它会调用 self.load_context() 加载完整的 AgentContext,从而恢复会话状态。
    • 接着,它分析 handoff_reasonreasoning_trace,理解前因后果。
    • 最后,它根据恢复的上下文生成一个针对性的初始响应,并可以立即开始解决问题。
    • Handoff 消息的状态会在处理过程中更新,从 PENDINGACCEPTED,最终到 COMPLETED
  • 示例展示了从 OrderAgent Handoff 到 AdvancedOrderAgent 的完整过程,以及 AdvancedOrderAgent 如何无缝接管并继续处理用户查询。

第七章:健壮性与错误处理

Handoff 协议的健壮性至关重要。Handoff 过程本身也可能失败,我们需要设计相应的错误处理和回退机制。

7.1 Handoff 请求的失败处理

失败类型 描述 应对策略
目标 Agent 不可用 目标 Agent 下线、崩溃或网络不可达。 重试: 在一定次数内重试 Handoff 请求。
重新选择: 如果重试失败,请求调度器重新选择一个备用 Agent。
Handoff 到人工: 作为最终回退方案。
目标 Agent 拒绝 Handoff 目标 Agent 因负载过高、能力不匹配、内部错误等原因明确拒绝。 分析拒绝原因: 根据 rejection_reason 判断。
重新选择: 如果是负载或临时问题,重新选择 Agent。
Handoff 到人工: 如果是能力不匹配且无其他匹配 Agent,转人工。
Handoff 消息丢失 消息队列或网络传输问题导致 Handoff 消息未送达。 消息队列的持久化: 确保消息持久化存储。
ACK 机制: 目标 Agent 接收到消息后发送确认。
超时重发: 发起 Agent 在规定时间内未收到 ACK 则重发。
上下文损坏/不完整 传输过程中数据损坏,或原始 Agent 提供的上下文信息不足。 数据校验: Pydantic 自动提供。
版本控制: 上下文结构应有版本号,确保兼容性。
回退: 如果上下文无法解析或关键信息缺失,Handoff 到人工,并记录错误。

7.2 监控与日志

一个完善的 Handoff 系统必须配备详细的监控和日志:

  • Handoff 成功率: 监控 Handoff 请求的成功率。
  • Handoff 延迟: 从发起 Handoff 到目标 Agent 成功接管的平均时间。
  • Handoff 循环检测: 识别一个会话在多个 Agent 之间反复 Handoff 的情况。这通常通过记录会话的 Handoff 历史和限制 Handoff 次数来实现。
  • Agent 负载与性能: 监控每个 Agent 的实时负载和历史成功率,为调度提供数据。
  • 详细日志: 记录每次 Handoff 的完整 HandoffMessagehandoff_reason、目标 Agent 选择过程、处理结果和任何错误信息。

7.3 Handoff 失败的最终回退:人工 Agent

在所有自动化 Handoff 机制都失败时,将问题升级到人类操作员 (Human-in-the-Loop Agent) 应该是最终的兜底方案。人工 Agent 应该能够接收与自动化 Agent 相同的 AgentContext,并有一个用户友好的界面来查看对话历史、Agent 思考链和 Handoff 原因。


第八章:Handoffs 的高级策略与未来展望

Handoffs 协议不仅是解决当前问题的工具,也是构建更智能、更自适应多 Agent 系统的基石。

8.1 人类在环 (Human-in-the-Loop) Handoffs

将人类智能无缝集成到 Agent 工作流中,是提升系统智能和可靠性的重要手段。

  • 主动 Handoff 给人工: 当 Agent 确定自身无法处理或风险过高时,主动将任务 Handoff 给人工。
  • 被动 Handoff 给人工: 当系统检测到 Agent 陷入困境(如长时间无响应、用户情绪负面、Handoff 循环)时,自动触发 Handoff 给人工。
  • 人机协作模式: 人工 Agent 在接管任务后,仍可以利用其他自动化 Agent 来执行子任务,如查询信息、生成草稿,实现人机协同。
  • 人工反馈循环: 人工操作员可以对 Agent 的 Handoff 决策、处理结果提供反馈,这些反馈可以用于优化 Agent 的 Handoff 策略和能力。

8.2 基于学习的 Handoff 优化

Handoff 决策和目标 Agent 选择都可以通过机器学习进行优化:

  • 强化学习 (Reinforcement Learning): 将 Handoff 视为 Agent 的一个“行动”,通过奖励机制(如问题解决时长、用户满意度、资源消耗)来训练 Agent 何时 Handoff、Handoff 给谁。
  • 预测模型: 利用历史数据训练模型,预测特定问题由哪个 Agent 处理的成功率最高。
  • 上下文嵌入 (Context Embeddings):AgentContext 编码为向量,利用相似度搜索来匹配最合适的 Agent。

8.3 Handoffs 的链式反应与循环检测

  • 链式 Handoff: 一个 Agent 将任务 Handoff 给 B,B 又 Handoff 给 C。这本身是允许的,但需要限制链的长度,避免无限 Handoff。
  • 循环 Handoff 检测: 严格防止 A -> B -> A 或 A -> B -> C -> A 这种循环。可以在 AgentContext 中维护一个 handoff_path 列表,记录任务经过的 Agent ID。当发现目标 Agent ID 已在路径中时,立即 Handoff 到人工或最高等级 Agent。
# 扩展 AgentContext 以支持 Handoff 路径
class AgentContext(BaseModel):
    # ... 其他字段不变
    handoff_path: List[str] = Field(default_factory=list) # 记录 Handoff 路径

    # ... 其他方法

# 在 BaseAgent 的 prepare_handoff_context 方法中更新 handoff_path
class BaseAgent:
    # ...
    def prepare_handoff_context(self, reason: HandoffReason, suggested_action: Optional[str] = None) -> AgentContext:
        if not self.current_context:
            raise ValueError("No context loaded to prepare for handoff.")

        self.current_context.handoff_reason = reason
        self.current_context.source_agent_id = self.config.agent_id
        self.current_context.suggested_next_action = suggested_action

        # 更新 Handoff 路径
        if self.config.agent_id not in self.current_context.handoff_path:
            self.current_context.handoff_path.append(self.config.agent_id)

        return self.current_context

# 在 AgentRegistry 的 find_best_agent 方法中检测循环
class AgentRegistry:
    # ...
    def find_best_agent(self, context: AgentContext) -> Optional[str]:
        # ... 初步筛选和排序逻辑

        # 排除当前发起 Handoff 的 Agent (防止立即循环)
        # 还要排除已经在 Handoff 路径中的 Agent
        if context.source_agent_id:
            potential_agents = [a for a in potential_agents if a.agent_id != context.source_agent_id and a.agent_id not in context.handoff_path]

        if not potential_agents:
            print(f"警告:检测到 Handoff 循环风险或无可用 Agent,将 Handoff 到最高等级人工 Agent。")
            # 查找最高等级人工 Agent 作为回退
            human_agents = [entry for entry in self._agents.values() if "human_interaction" in entry.capabilities and entry.tier == max([a.tier for a in self._agents.values()])]
            if human_agents:
                return human_agents[0].agent_id
            return None # 实在找不到就返回 None

        # ... 选择逻辑

第九章:实战案例:一个简单的客户服务 Handoff 系统

让我们将上述概念整合到一个简化的客户服务 Handoff 系统中。

场景描述:

一个在线商店的客户服务系统,包含:

  1. GeneralEnquiryAgent (通用咨询 Agent): 处理常见问题,如商品信息、营业时间。
  2. OrderManagementAgent (订单管理 Agent): 处理订单查询、修改等。
  3. RefundAgent (退款处理 Agent): 专门处理退款申请和退款政策咨询。
  4. HumanSupportAgent (人工客服 Agent): 作为最终回退。

Handoff 流程演示:

用户提出问题 -> GeneralEnquiryAgent 尝试处理 -> 如果涉及订单 -> Handoff 给 OrderManagementAgent -> 如果涉及退款 -> Handoff 给 RefundAgent -> 如果所有自动化 Agent 都无法处理 -> Handoff 给 HumanSupportAgent


import uuid
import time
from datetime import datetime
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field

# --- 基础数据模型 (重复定义以便代码独立运行,实际项目中只定义一次) ---
class Message(BaseModel):
    role: str
    content: str
    timestamp: datetime = Field(default_factory=datetime.now)
    tool_calls: Optional[List[Dict[str, Any]]] = None
    tool_outputs: Optional[List[Dict[str, Any]]] = None

class ReasoningStep(BaseModel):
    step_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    action: str
    details: Dict[str, Any]
    outcome: str
    reasoning: Optional[str] = None

class HandoffReason(str):
    KNOWLEDGE_GAP = "知识不足"
    OUT_OF_SCOPE = "超出能力范围"
    TOOL_FAILURE = "工具调用失败"
    USER_ESCALATION = "用户要求升级"
    COMPLEXITY_EXCEEDED = "问题复杂度超限"
    NO_MATCH_AGENT = "无匹配Agent"
    OTHER = "其他原因"

class AgentContext(BaseModel):
    session_id: str
    user_id: Optional[str] = None
    initial_query: str
    current_problem_description: str
    conversation_history: List[Message]
    internal_state: Dict[str, Any] = Field(default_factory=dict)
    reasoning_trace: List[ReasoningStep] = Field(default_factory=list)
    handoff_reason: HandoffReason
    source_agent_id: str
    suggested_next_action: Optional[str] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)
    handoff_path: List[str] = Field(default_factory=list) # 增加 Handoff 路径

    class Config:
        json_encoders = {datetime: lambda v: v.isoformat()}
        extra = "allow"

class HandoffStatus(str):
    PENDING = "待处理"
    ACCEPTED = "已接受"
    REJECTED = "已拒绝"
    COMPLETED = "已完成"
    FAILED = "失败"
    CANCELLED = "已取消"

class HandoffMessage(BaseModel):
    handoff_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    session_id: str
    source_agent_id: str
    target_agent_id: str
    timestamp: datetime = Field(default_factory=datetime.now)
    context: AgentContext
    status: HandoffStatus = HandoffStatus.PENDING
    rejection_reason: Optional[str] = None
    completion_details: Optional[Dict[str, Any]] = None

class AgentConfig(BaseModel):
    agent_id: str
    known_capabilities: List[str]
    known_knowledge_domains: List[str]
    max_tool_retries: int = 2
    max_dialog_turns_without_progress: int = 3
    tier: int = 1 # 新增 Agent 等级

class AgentRegistryEntry(BaseModel):
    agent_id: str
    capabilities: List[str]
    knowledge_domains: List[str]
    load: int = 0
    performance_score: float = 1.0
    tier: int = 1

# --- AgentRegistry (模拟调度中心) ---
class AgentRegistry:
    def __init__(self):
        self._agents: Dict[str, AgentRegistryEntry] = {}

    def register_agent(self, agent_entry: AgentRegistryEntry):
        self._agents[agent_entry.agent_id] = agent_entry
        # print(f"Agent {agent_entry.agent_id} 已注册。")

    def update_agent_load(self, agent_id: str, load_change: int):
        if agent_id in self._agents:
            self._agents[agent_id].load = max(0, self._agents[agent_id].load + load_change)

    def find_best_agent(self, context: AgentContext) -> Optional[str]:
        potential_agents: List[AgentRegistryEntry] = []

        required_capabilities = []
        required_knowledge_domains = []

        # 根据 Handoff 原因推断所需能力/知识
        if context.handoff_reason == HandoffReason.KNOWLEDGE_GAP:
            if "退款政策" in context.current_problem_description or (context.suggested_next_action and "退款" in context.suggested_next_action):
                required_knowledge_domains.append("退款管理")
            elif "订单" in context.current_problem_description or (context.suggested_next_action and "订单" in context.suggested_next_action):
                required_knowledge_domains.append("订单管理")
        elif context.handoff_reason == HandoffReason.OUT_OF_SCOPE:
            if "订单" in context.current_problem_description or (context.suggested_next_action and "订单" in context.suggested_next_action):
                required_capabilities.append("query_order")
                required_capabilities.append("modify_order")
            elif "退款" in context.current_problem_description or (context.suggested_next_action and "退款" in context.

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注