深入 ‘Resume with Action’:当用户修改了 Agent 的中间思考后,图如何从断点处继续推理?

各位技术同仁,大家好!

今天我们深入探讨一个在构建高级AI Agent时至关重要且极具挑战性的话题——当用户修改了 Agent 的中间思考后,图如何从断点处继续推理? 这不仅仅是技术实现的问题,更是人机协作范式下,如何赋予AI Agent韧性与可控性的核心议题。我们将从Agent的思考范式、计算图的构建、状态管理、以及具体的代码实现策略等多个维度,系统性地剖析这一复杂机制。

引言:AI Agent 的崛起与人机协作的必然性

近年来,以大型语言模型(LLM)为核心的AI Agent展现出惊人的能力,它们不再局限于单一任务,而是能够通过规划、工具使用、记忆和反思,自主完成一系列复杂任务。然而,这些Agent的推理过程,特别是涉及多步骤、多工具调用的链式思考,并非总是完美无瑕。它们可能陷入逻辑循环、选择错误的工具、生成不准确的事实,或仅仅是未能理解用户意图的细微差别。

在这种情况下,人机协作(Human-in-the-Loop, HITL) 变得不可或缺。用户介入Agent的中间思考,纠正其规划、修改其假设、甚至直接提供下一步的行动指令,能够显著提高Agent的可靠性、效率和安全性。然而,这种介入带来了一个核心技术挑战:Agent的推理过程是一个有向无环图(DAG)或更广义的计算图,当图中的某个节点被修改时,如何有效地、逻辑严谨地从这个“断点”处继续推理,而不是简单地从头开始? 这正是我们今天讲座的焦点——“Resume with Action”。

第一章:AI Agent 的思考范式与计算图的抽象

要理解如何从断点处恢复,我们首先需要理解Agent的思考是如何被结构化的。

1.1 Agent 的核心组件与思考流程

一个典型的AI Agent通常包含以下核心组件:

  • 大型语言模型(LLM): 作为Agent的大脑,负责理解指令、生成思考、规划行动。
  • 规划器(Planner): 根据LLM的推理,将复杂任务分解为一系列子任务或步骤。
  • 工具集(Tools): 供Agent调用的外部功能,如搜索引擎、代码解释器、API接口等。
  • 记忆(Memory): 存储历史交互、思考过程和结果,用于上下文维持和学习。
  • 反思器(Reflector): 对已完成的步骤或结果进行评估,识别错误并进行修正。

其思考流程通常遵循“观察-思考-行动-观察”的循环:

  1. 观察 (Observe): Agent接收用户指令,或从环境中获取信息。
  2. 思考 (Think): LLM根据观察到的信息,结合记忆和工具集,生成一个思考(Thought)和下一步的行动计划(Action)。这通常表现为内部的思维链(Chain of Thought, CoT)或ReAct模式(Reasoning and Acting)。
  3. 行动 (Act): Agent执行计划中的行动,这通常是调用某个工具。
  4. 观察 (Observe): Agent接收工具的执行结果(Observation),并将其作为新的输入,再次进入“思考”阶段,直到任务完成。

1.2 计算图:Agent 思考的结构化表示

我们可以将Agent的整个推理过程抽象为一个有向无环图(DAG)。图中的每一个节点代表一个离散的思考步骤、一个工具调用、一个观察结果,或者一个最终答案。边则表示这些步骤之间的逻辑依赖或数据流。

表 1: 计算图中的节点类型示例

节点类型 描述 示例输出
Thought LLM产生的内部思考,用于规划或推理。 "我需要先查找股票的当前价格,然后分析其财务报告。"
Action LLM决定执行的某个工具调用,包含工具名和参数。 {"tool": "search_stock_price", "input": {"symbol": "AAPL"}}
Observation 工具执行后返回的结果。 {"AAPL_price": 175.25}
FinalAnswer Agent得出的最终结论或结果。 "根据最新数据,苹果公司股票(AAPL)当前价格为175.25美元,建议买入。"
UserIntervention 用户提供的修改或指令。 "Agent,你的分析有误,AAPL的市盈率已经过高,不建议买入。"

图的结构示例:

用户指令 -> Thought_1 -> Action_1 (SearchTool) -> Observation_1 -> Thought_2 -> Action_2 (AnalysisTool) -> Observation_2 -> FinalAnswer

在这个图中,每个节点都有其输入(来自前驱节点或用户指令),并产生输出(供后继节点使用)。

Python 代码示例:抽象计算图的节点和边

import uuid
from datetime import datetime
from enum import Enum
from typing import Dict, Any, Optional, List, Callable, Union

class NodeType(str, Enum):
    """定义计算图中的节点类型。"""
    USER_PROMPT = "user_prompt"
    THOUGHT = "thought"
    ACTION = "action"
    OBSERVATION = "observation"
    FINAL_ANSWER = "final_answer"
    USER_INTERVENTION = "user_intervention" # 用户干预节点

class NodeStatus(str, Enum):
    """定义节点在执行过程中的状态。"""
    PENDING = "pending"       # 等待执行
    EXECUTING = "executing"   # 正在执行
    COMPLETED = "completed"   # 执行完成
    FAILED = "failed"         # 执行失败
    SKIPPED = "skipped"       # 跳过执行(例如,被下游修改影响)
    MODIFIED = "modified"     # 被用户修改,需要重新评估下游

class AgentNode:
    """计算图中的一个节点,代表Agent思考或行动的一个步骤。"""
    def __init__(self,
                 node_id: Optional[str] = None,
                 node_type: NodeType = NodeType.THOUGHT,
                 input_data: Any = None,
                 output_data: Any = None,
                 status: NodeStatus = NodeStatus.PENDING,
                 timestamp: Optional[datetime] = None,
                 description: str = "",
                 # 对于ACTION节点,可能需要存储工具信息
                 tool_name: Optional[str] = None,
                 tool_args: Optional[Dict[str, Any]] = None):
        self.node_id = node_id if node_id else str(uuid.uuid4())
        self.node_type = node_type
        self.input_data = input_data
        self.output_data = output_data
        self.status = status
        self.timestamp = timestamp if timestamp else datetime.now()
        self.description = description
        self.tool_name = tool_name
        self.tool_args = tool_args
        self.predecessors: List[str] = [] # 前驱节点ID
        self.successors: List[str] = []   # 后继节点ID

    def to_dict(self) -> Dict[str, Any]:
        """将节点转换为字典,便于存储和序列化。"""
        return {
            "node_id": self.node_id,
            "node_type": self.node_type.value,
            "input_data": self.input_data,
            "output_data": self.output_data,
            "status": self.status.value,
            "timestamp": self.timestamp.isoformat(),
            "description": self.description,
            "tool_name": self.tool_name,
            "tool_args": self.tool_args,
            "predecessors": self.predecessors,
            "successors": self.successors,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'AgentNode':
        """从字典创建节点实例。"""
        node = cls(
            node_id=data["node_id"],
            node_type=NodeType(data["node_type"]),
            input_data=data["input_data"],
            output_data=data["output_data"],
            status=NodeStatus(data["status"]),
            timestamp=datetime.fromisoformat(data["timestamp"]),
            description=data.get("description", ""),
            tool_name=data.get("tool_name"),
            tool_args=data.get("tool_args")
        )
        node.predecessors = data.get("predecessors", [])
        node.successors = data.get("successors", [])
        return node

    def __repr__(self):
        return (f"AgentNode(id='{self.node_id[:8]}...', type={self.node_type.value}, "
                f"status={self.status.value}, desc='{self.description[:30]}...')")

class AgentGraph:
    """表示Agent的整个推理过程的计算图。"""
    def __init__(self):
        self.nodes: Dict[str, AgentNode] = {}
        self.current_execution_path: List[str] = [] # 记录当前执行路径,用于回溯

    def add_node(self, node: AgentNode):
        """向图中添加一个节点。"""
        if node.node_id in self.nodes:
            raise ValueError(f"Node with ID {node.node_id} already exists.")
        self.nodes[node.node_id] = node

    def add_edge(self, predecessor_id: str, successor_id: str):
        """在两个节点之间添加一条边,表示逻辑依赖。"""
        if predecessor_id not in self.nodes:
            raise ValueError(f"Predecessor node {predecessor_id} not found.")
        if successor_id not in self.nodes:
            raise ValueError(f"Successor node {successor_id} not found.")

        if successor_id not in self.nodes[predecessor_id].successors:
            self.nodes[predecessor_id].successors.append(successor_id)
        if predecessor_id not in self.nodes[successor_id].predecessors:
            self.nodes[successor_id].predecessors.append(predecessor_id)

    def get_node(self, node_id: str) -> Optional[AgentNode]:
        """根据ID获取节点。"""
        return self.nodes.get(node_id)

    def get_ready_nodes(self) -> List[AgentNode]:
        """获取所有已就绪(PENDING且所有前驱已完成)的节点。"""
        ready_nodes = []
        for node_id, node in self.nodes.items():
            if node.status == NodeStatus.PENDING:
                all_predecessors_completed = True
                for pred_id in node.predecessors:
                    if self.nodes[pred_id].status not in [NodeStatus.COMPLETED, NodeStatus.MODIFIED, NodeStatus.SKIPPED]:
                        all_predecessors_completed = False
                        break
                if all_predecessors_completed:
                    ready_nodes.append(node)
        return ready_nodes

    def topological_sort(self) -> List[str]:
        """
        对图进行拓扑排序,返回节点的执行顺序。
        只考虑未完成的节点,但会包含已完成的节点作为依赖。
        """
        in_degree = {node_id: len(node.predecessors) for node_id, node in self.nodes.items()}
        queue = [node_id for node_id, degree in in_degree.items() if degree == 0]
        sorted_nodes = []

        while queue:
            node_id = queue.pop(0)
            sorted_nodes.append(node_id)

            for successor_id in self.nodes[node_id].successors:
                in_degree[successor_id] -= 1
                if in_degree[successor_id] == 0:
                    queue.append(successor_id)

        if len(sorted_nodes) != len(self.nodes):
            # 这表示图中存在循环,对于Agent的推理图来说通常是不期望的
            # 或者意味着我们只排序了部分可达节点
            pass # 实际应用中可能需要更严格的循环检测

        return sorted_nodes

    def to_dict(self) -> Dict[str, Any]:
        """将整个图转换为字典,便于序列化。"""
        return {
            "nodes": {node_id: node.to_dict() for node_id, node in self.nodes.items()},
            "current_execution_path": self.current_execution_path,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'AgentGraph':
        """从字典创建图实例。"""
        graph = cls()
        for node_id, node_data in data["nodes"].items():
            graph.add_node(AgentNode.from_dict(node_data))
        # 重新建立边(因为节点创建时没有处理前驱后继列表)
        for node_id, node_data in data["nodes"].items():
            node = graph.nodes[node_id]
            for pred_id in node_data.get("predecessors", []):
                if pred_id not in node.predecessors: # 避免重复添加
                    node.predecessors.append(pred_id)
            for succ_id in node_data.get("successors", []):
                if succ_id not in node.successors: # 避免重复添加
                    node.successors.append(succ_id)
        graph.current_execution_path = data.get("current_execution_path", [])
        return graph

    def __repr__(self):
        return f"AgentGraph(nodes={len(self.nodes)}, ready={len(self.get_ready_nodes())})"

第二章:人机协作的必要性与“断点”的引入

AI Agent的自动推理能力强大,但并非万能。在复杂、高风险或需要精确控制的场景中,人类的介入是必不可少的。

2.1 用户介入的动机

  • 纠正错误: Agent可能产生错误的思考、错误的工具参数或不正确的结论。
  • 引导方向: 当Agent偏离用户意图或陷入僵局时,用户可以提供明确的指导。
  • 注入新信息: 用户可能拥有Agent无法通过工具获取的私有或最新信息。
  • 优化效率: 用户可以识别冗余步骤或提供更直接的解决方案。
  • 确保安全与合规: 在敏感任务中,人类可以作为最终的守门员,防止Agent产生有害或不合规的行为。

2.2 定义“断点”与用户修改

当用户介入时,实际上是在Agent的计算图中设置了一个“断点”。这个断点通常对应于图中的某个特定节点,用户希望修改该节点的输出,或者直接改变Agent后续的推理路径。

用户修改的两种主要形式:

  1. 修改已有节点的内容: 用户指出某个已完成的 ThoughtAction 节点存在问题,并提供新的 output_data。例如,Agent错误地规划了“查询历史股价”,用户将其修改为“查询最新财报”。
  2. 插入新节点: 用户可能希望在某个现有节点之后强制插入一个新的 ThoughtAction,从而改变原有的执行流。例如,在Agent执行某个搜索操作后,用户要求它立即执行一个特定的计算,而不是等待LLM生成下一个思考。

无论是哪种形式,核心挑战都是如何将用户的修改无缝地整合回计算图,并让Agent从新的起点或修正后的状态继续其推理。

第三章:核心机制:状态管理与上下文重构

为了能够从断点处继续推理,Agent系统必须具备强大的状态管理能力,能够精确地记录和恢复其执行上下文。

3.1 Agent 状态的捕获

在Agent执行的每一步,我们都需要捕获足够的信息,以便在需要时进行回溯或恢复。这包括:

  • 计算图的完整结构: 所有的 AgentNode 及其 predecessorssuccessors
  • 每个节点的状态: status (PENDING, COMPLETED, FAILED, MODIFIED等), input_data, output_data, timestamp
  • Agent的全局上下文: 例如,共享的记忆(Memory)内容,已经调用的工具列表等。
  • 当前执行路径: 在多分支或并行执行的场景中,记录Agent是如何到达当前节点的路径信息。

表 2: Agent 状态的关键组成部分

状态组件 描述 示例
graph AgentGraph 实例,包含所有节点及其连接。 {'nodes': {...}, 'edges': {...}}
current_prompt 驱动当前推理的用户初始指令。 "分析AAPL股票并给出投资建议"
memory_context Agent的记忆模块,包含历史对话、关键事实等。 ["用户想了解AAPL股票。", "上次查询的AAPL价格是175.25。"]
tool_registry Agent可用的工具列表及其定义。 {'search_tool': <SearchTool obj>, 'analysis_tool': <AnalysisTool obj>}
execution_log 详细的执行日志,可能包含LLM的原始输出、工具的详细日志等。 [{"node_id": "...", "event": "LLM_CALL_START", "prompt": "..."}, ...]

3.2 上下文重构与“世界状态”

当用户修改一个节点时,我们不仅仅是改变了该节点本身,更重要的是改变了Agent对“世界状态”的理解。例如,如果Agent认为股票价格是100美元,而用户将其纠正为200美元,那么所有依赖于这个价格进行决策的后续步骤都可能变得无效。

上下文重构的核心思想:

  1. 加载历史状态: 从持久化存储中加载Agent在用户干预前的完整计算图和全局上下文。
  2. 应用用户修改: 将用户的修改应用到计算图的特定节点上,更新其 output_datastatus
  3. 识别受影响的下游节点: 找出所有直接或间接依赖于被修改节点的后续节点。这些节点需要被标记为“无效”或“待重新评估”。
  4. 重新规划/执行: Agent需要从被修改的节点或其紧邻的未完成(且现在已无效)的下游节点开始,重新进行推理和执行。

这个过程需要Agent框架能够:

  • 遍历图: 有效地识别一个节点的所有下游依赖。
  • 状态更新: 准确地更新节点和图的状态。
  • LLM调用策略: 在重新执行时,如何向LLM提供正确的上下文,使其能够基于新的信息继续思考。

第四章:从断点处续航推理的策略与实现

这是本次讲座的核心技术部分。我们将聚焦于如何具体实现“从断点处继续推理”的逻辑。

4.1 核心挑战:下游节点的失效与重评估

当节点 N 的输出 output_N 被用户修改为 output_N' 后:

  1. 直接后继节点: N 的所有直接后继节点 S1, S2, ... 的输入都可能发生了变化。它们需要被重新评估。
  2. 间接后继节点: S1, S2, ... 的后继节点,以及更深层的下游节点,也可能受到影响。

因此,最关键的策略是:在用户修改某个节点后,递归地将该节点及其所有下游节点标记为“待重新执行”状态,并清除它们的旧输出。

4.2 续航推理的步骤与算法

假设用户修改了 node_id_to_modify 节点,并提供了 new_output_data

  1. 定位并更新被修改节点:

    • 获取 AgentGraph 实例。
    • 找到 node_id_to_modify 对应的 AgentNode
    • 更新其 output_datanew_output_data
    • 将其 status 设置为 NodeStatus.MODIFIED
  2. 失效下游节点:

    • node_id_to_modify 开始,进行深度优先搜索(DFS)或广度优先搜索(BFS),遍历所有可达的下游节点。
    • 对于每一个下游节点 D
      • 将其 status 设置为 NodeStatus.PENDING(或 INVALIDATED,如果需要区分)。
      • 清除其 output_data
      • 更新 timestamp
  3. 重新规划起始点:

    • 在失效所有下游节点后,Agent需要从某个点开始重新执行。这个点可能是被修改的节点本身(如果它不是最终答案,且需要LLM基于新输出进行下一步思考),或者是被修改节点下游的第一个 PENDING 节点。
    • 通常,我们希望Agent能够从被修改节点“之后”的逻辑起点重新开始。
  4. 执行图:

    • 从新的起始点开始,按照拓扑顺序(或动态就绪队列)执行图中的 PENDING 节点。
    • 执行过程中,LLM将接收包含用户修改后上下文的Prompt。

Python 代码示例:实现用户修改与图的续航

首先,我们需要一个模拟LLM和工具的接口:

class MockLLM:
    """模拟一个LLM,根据上下文生成Thought或Action。"""
    def __init__(self):
        self.call_count = 0

    def generate_response(self, prompt: str) -> Dict[str, Any]:
        self.call_count += 1
        print(f"n--- LLM Call {self.call_count} ---")
        print(f"Prompt:n{prompt}n---")

        # 模拟LLM根据Prompt生成Thought或Action
        if "股票价格" in prompt and "分析" not在 prompt:
            return {"type": "action", "tool_name": "search_tool", "tool_args": {"query": "AAPL current stock price"}}
        elif "current stock price is" in prompt and "分析" in prompt:
            return {"type": "thought", "content": "好的,我已经获取了股票价格。现在我需要使用分析工具来评估其投资价值。"}
        elif "评估投资价值" in prompt:
            return {"type": "action", "tool_name": "analysis_tool", "tool_args": {"symbol": "AAPL", "price": "175.25"}}
        elif "投资建议" in prompt and "不建议买入" in prompt:
            return {"type": "final_answer", "content": "综合分析,苹果公司股票(AAPL)当前市盈率过高,不建议买入。"}
        elif "投资建议" in prompt: # 默认的正确路径
            return {"type": "final_answer", "content": "综合分析,苹果公司股票(AAPL)具有投资潜力,建议买入。"}
        else:
            return {"type": "thought", "content": f"基于当前信息,我将继续思考下一步:{prompt[:50]}..."}

class MockTool:
    """模拟一个工具。"""
    def __init__(self, name: str):
        self.name = name

    def run(self, **kwargs) -> Any:
        print(f"--- Tool '{self.name}' called with args: {kwargs} ---")
        if self.name == "search_tool":
            query = kwargs.get("query", "")
            if "AAPL current stock price" in query:
                return {"result": "The current stock price of Apple (AAPL) is 175.25 USD."}
            return {"result": f"Search results for '{query}': No specific data found."}
        elif self.name == "analysis_tool":
            symbol = kwargs.get("symbol")
            price = kwargs.get("price")
            if symbol == "AAPL" and price == "175.25":
                return {"result": "AAPL analysis: P/E Ratio is 30x, historical growth strong, but current valuation high."}
            return {"result": f"Analysis for {symbol} at {price}: Insufficient data."}
        else:
            return {"result": f"Unknown tool '{self.name}'."}

# 将执行逻辑封装到 AgentGraph 中
class AgentGraphWithExecution(AgentGraph):
    def __init__(self, llm: MockLLM, tools: Dict[str, MockTool]):
        super().__init__()
        self.llm = llm
        self.tools = tools
        self.scratchpad: List[str] = [] # 记录Agent的思考过程,用于LLM上下文

    def _build_llm_prompt(self, current_node: AgentNode) -> str:
        """根据当前图的状态和执行路径构建LLM的Prompt。"""
        prompt_parts = [
            "你是一个专业的AI Agent,你的任务是完成用户指令。",
            "请根据以下信息,思考下一步应该做什么(Thought),或者决定调用哪个工具(Action),或者给出最终答案(FinalAnswer)。",
            "你的输出必须是JSON格式,包含'type'('thought', 'action', 'final_answer')和对应的内容。",
            "如果type是'action',则需要包含'tool_name'和'tool_args'。",
            "如果type是'final_answer',则需要包含'content'。",
            "n--- 历史思考和行动 ---"
        ]

        # 遍历所有已完成或已修改的节点,构建历史上下文
        for node_id in self.topological_sort(): # 按照拓扑顺序提供上下文
            node = self.nodes[node_id]
            if node.status in [NodeStatus.COMPLETED, NodeStatus.MODIFIED]:
                if node.node_type == NodeType.USER_PROMPT:
                    prompt_parts.append(f"用户指令: {node.output_data}")
                elif node.node_type == NodeType.THOUGHT:
                    prompt_parts.append(f"Thought: {node.output_data}")
                elif node.node_type == NodeType.ACTION:
                    prompt_parts.append(f"Action: Tool '{node.tool_name}' with args {node.tool_args}")
                elif node.node_type == NodeType.OBSERVATION:
                    prompt_parts.append(f"Observation: {node.output_data}")
                elif node.node_type == NodeType.USER_INTERVENTION:
                    prompt_parts.append(f"User Intervention: {node.output_data}")
            elif node.status == NodeStatus.FAILED:
                prompt_parts.append(f"Failed Node: {node.description} (Input: {node.input_data})")

        prompt_parts.append(f"n--- 当前任务 ---")
        prompt_parts.append(f"当前节点期望输入: {current_node.input_data}")
        prompt_parts.append("请给出你的下一步:")

        return "n".join(prompt_parts)

    def _execute_node(self, node_id: str):
        """执行图中一个节点。"""
        node = self.nodes[node_id]
        if node.status != NodeStatus.PENDING:
            print(f"Skipping node {node.node_id[:8]}... (Status: {node.status.value})")
            return

        node.status = NodeStatus.EXECUTING
        self.current_execution_path.append(node_id)
        print(f"n--- Executing Node: {node.node_type.value} - {node.node_id[:8]}... ---")

        try:
            if node.node_type == NodeType.USER_PROMPT:
                # 用户提示节点通常是起点,其输出就是提示本身
                node.output_data = node.input_data
                node.status = NodeStatus.COMPLETED
            elif node.node_type == NodeType.THOUGHT:
                llm_prompt = self._build_llm_prompt(node)
                llm_response = self.llm.generate_response(llm_prompt)
                node.output_data = llm_response.get("content", "No specific thought generated.")
                node.status = NodeStatus.COMPLETED
                # 如果LLM在Thought中直接生成了Action或FinalAnswer,需要特殊处理
                if llm_response.get("type") == "action":
                    # 动态创建并链接Action节点
                    action_node = AgentNode(
                        node_type=NodeType.ACTION,
                        input_data=node.output_data, # Thought的输出作为Action的输入
                        description="Action from LLM in Thought step",
                        tool_name=llm_response["tool_name"],
                        tool_args=llm_response["tool_args"]
                    )
                    self.add_node(action_node)
                    self.add_edge(node.node_id, action_node.node_id)
                    # 标记当前Thought节点为完成,并触发Action节点执行
                    # 实际中可能需要调整图的执行顺序,确保新生成的节点能被执行
                elif llm_response.get("type") == "final_answer":
                    final_node = AgentNode(
                        node_type=NodeType.FINAL_ANSWER,
                        input_data=node.output_data,
                        output_data=llm_response["content"],
                        status=NodeStatus.COMPLETED,
                        description="Final Answer from LLM in Thought step"
                    )
                    self.add_node(final_node)
                    self.add_edge(node.node_id, final_node.node_id)
                    # 终止执行
                    self.nodes[final_node.node_id] = final_node
                    self.nodes[node.node_id] = node # 确保node状态更新
                    return # 提前返回,因为已得到最终答案
            elif node.node_type == NodeType.ACTION:
                tool = self.tools.get(node.tool_name)
                if not tool:
                    raise ValueError(f"Tool '{node.tool_name}' not found.")

                # 确保输入数据是dict或者可以转换为dict
                tool_input = node.tool_args if isinstance(node.tool_args, dict) else {}
                if not tool_input and node.input_data and isinstance(node.input_data, dict):
                    tool_input = node.input_data # 如果没有tool_args,尝试用input_data作为参数

                tool_output = tool.run(**tool_input)
                node.output_data = tool_output
                node.status = NodeStatus.COMPLETED

                # 动态创建并链接Observation节点
                obs_node = AgentNode(
                    node_type=NodeType.OBSERVATION,
                    input_data=node.output_data,
                    output_data=node.output_data,
                    status=NodeStatus.COMPLETED,
                    description=f"Observation from tool '{node.tool_name}'"
                )
                self.add_node(obs_node)
                self.add_edge(node.node_id, obs_node.node_id)

            elif node.node_type == NodeType.OBSERVATION:
                # Observation节点通常是工具调用的结果,其输出已经预设
                # 如果需要LLM基于Observation生成新的Thought,则需要在这里触发
                node.status = NodeStatus.COMPLETED

                # 在Observation之后,通常会生成一个新的Thought节点
                # 动态创建并链接Thought节点
                next_thought_node = AgentNode(
                    node_type=NodeType.THOUGHT,
                    input_data=node.output_data, # Observation的输出作为Thought的输入
                    description="Thinking after observation"
                )
                self.add_node(next_thought_node)
                self.add_edge(node.node_id, next_thought_node.node_id)

            elif node.node_type == NodeType.FINAL_ANSWER:
                node.status = NodeStatus.COMPLETED
                # 最终答案节点,标记任务完成

            elif node.node_type == NodeType.USER_INTERVENTION:
                # 用户干预节点,其输出就是用户提供的修改内容
                node.output_data = node.input_data
                node.status = NodeStatus.COMPLETED

        except Exception as e:
            node.status = NodeStatus.FAILED
            node.output_data = {"error": str(e)}
            print(f"Node {node.node_id[:8]}... FAILED: {e}")

    def execute(self, start_node_id: Optional[str] = None):
        """从指定节点开始执行图。如果没有指定,则从第一个就绪节点开始。"""
        if not self.nodes:
            print("Graph is empty, nothing to execute.")
            return

        if start_node_id and start_node_id not in self.nodes:
            raise ValueError(f"Start node {start_node_id} not found in graph.")

        # 如果没有指定起始节点,从拓扑排序的第一个未完成节点开始
        if not start_node_id:
            all_sorted_nodes = self.topological_sort()
            for node_id in all_sorted_nodes:
                if self.nodes[node_id].status not in [NodeStatus.COMPLETED, NodeStatus.MODIFIED, NodeStatus.SKIPPED]:
                    start_node_id = node_id
                    break
            if not start_node_id:
                print("All nodes already completed or skipped. Nothing to execute.")
                return

        # 执行循环
        current_node_id = start_node_id
        while current_node_id:
            node = self.nodes[current_node_id]
            if node.status in [NodeStatus.COMPLETED, NodeStatus.MODIFIED, NodeStatus.SKIPPED, NodeStatus.FAILED]:
                # 如果当前节点已经完成、修改、跳过或失败,尝试寻找下一个可执行节点
                next_ready_nodes = self.get_ready_nodes()
                if next_ready_nodes:
                    # 找一个在拓扑序上靠前的作为下一个
                    sorted_ready_ids = [n.node_id for n in next_ready_nodes]
                    sorted_graph_ids = self.topological_sort()
                    next_node_id = None
                    for _id in sorted_graph_ids:
                        if _id in sorted_ready_ids:
                            next_node_id = _id
                            break
                    current_node_id = next_node_id
                else:
                    current_node_id = None # 没有更多就绪节点
                continue

            self._execute_node(current_node_id)

            if any(n.node_type == NodeType.FINAL_ANSWER and n.status == NodeStatus.COMPLETED for n in self.nodes.values()):
                print("n--- Final Answer Reached. Stopping Execution. ---")
                break # 任务完成

            # 找到下一个就绪节点
            next_ready_nodes = self.get_ready_nodes()
            if next_ready_nodes:
                # 寻找在拓扑排序中最早的就绪节点作为下一个执行点
                sorted_ready_ids = [n.node_id for n in next_ready_nodes]
                sorted_graph_ids = self.topological_sort()
                next_node_id = None
                for _id in sorted_graph_ids:
                    if _id in sorted_ready_ids:
                        next_node_id = _id
                        break
                current_node_id = next_node_id
            else:
                current_node_id = None # 没有更多就绪节点

    def invalidate_downstream_nodes(self, node_id: str):
        """
        递归地将指定节点及其所有下游节点标记为PENDING,并清除其输出。
        这模拟了用户修改对后续推理的影响。
        """
        if node_id not in self.nodes:
            return

        q = [node_id]
        visited = {node_id}

        nodes_to_invalidate = []
        while q:
            current_id = q.pop(0)
            nodes_to_invalidate.append(current_id)

            for successor_id in self.nodes[current_id].successors:
                if successor_id not in visited:
                    visited.add(successor_id)
                    q.append(successor_id)

        # 按照拓扑逆序处理,确保依赖关系正确
        # 实际操作中,直接将所有受影响的设置为 PENDING 即可,不需要复杂的逆序,
        # 因为后续的execute会从 PENDING 节点开始按拓扑顺序执行
        for n_id in nodes_to_invalidate:
            node = self.nodes[n_id]
            if n_id == node_id: # 被修改的节点,状态设为 MODIFIED
                node.status = NodeStatus.MODIFIED
            else: # 下游节点,状态设为 PENDING
                node.status = NodeStatus.PENDING
            node.output_data = None # 清除旧输出
            node.timestamp = datetime.now()
            print(f"Invalidated/Cleared node: {node.node_id[:8]}... (Type: {node.node_type.value}, New Status: {node.status.value})")

    def apply_user_modification(self, node_id: str, new_output_data: Any):
        """
        用户修改指定节点的内容。
        这将触发下游节点的失效和重新执行。
        """
        if node_id not in self.nodes:
            raise ValueError(f"Node {node_id} not found for modification.")

        node = self.nodes[node_id]
        print(f"n--- User modifying node {node.node_id[:8]}... (Type: {node.node_type.value}) ---")
        print(f"Old output: {node.output_data}")
        print(f"New output: {new_output_data}")

        # 直接更新被修改节点的输出
        node.output_data = new_output_data
        node.status = NodeStatus.MODIFIED # 标记为已修改
        node.timestamp = datetime.now()

        # 使所有下游节点失效
        self.invalidate_downstream_nodes(node_id)
        print(f"--- User modification applied to node {node.node_id[:8]}... ---")

    def get_final_answer(self) -> Optional[Any]:
        """获取最终答案节点。"""
        for node in self.nodes.values():
            if node.node_type == NodeType.FINAL_ANSWER and node.status == NodeStatus.COMPLETED:
                return node.output_data
        return None

4.3 完整示例流程

我们来模拟一个Agent查询股票并给出投资建议的场景。

场景: Agent被要求分析AAPL股票。它首先查找价格,然后进行分析,最后给出建议。

第一次运行(Agent犯错): Agent可能会错误地认为AAPL值得买入。

# 初始化LLM和工具
mock_llm = MockLLM()
mock_tools = {
    "search_tool": MockTool("search_tool"),
    "analysis_tool": MockTool("analysis_tool")
}

# 创建Agent图实例
agent_graph = AgentGraphWithExecution(llm=mock_llm, tools=mock_tools)

# 定义初始用户Prompt节点
initial_prompt = "分析AAPL股票,给出投资建议。"
prompt_node = AgentNode(node_type=NodeType.USER_PROMPT, input_data=initial_prompt, description="Initial user prompt")
agent_graph.add_node(prompt_node)

# 创建第一个Thought节点,通常会从这里开始LLM的推理
first_thought_node = AgentNode(node_type=NodeType.THOUGHT, input_data=initial_prompt, description="Initial planning thought")
agent_graph.add_node(first_thought_node)
agent_graph.add_edge(prompt_node.node_id, first_thought_node.node_id)

print("--- Initial Agent Execution ---")
agent_graph.execute(start_node_id=first_thought_node.node_id) # 从第一个Thought节点开始执行

print("n--- Agent's First Attempt Result ---")
for node_id, node in agent_graph.nodes.items():
    print(f"Node {node.node_id[:8]}... [{node.node_type.value}] - Status: {node.status.value} - Output: {str(node.output_data)[:50]}")

final_answer = agent_graph.get_final_answer()
print(f"nAgent's initial final answer: {final_answer}")

# 假设Agent的最终答案是“建议买入”
# 但用户发现Agent的分析有误,AAPL的市盈率已经过高,不建议买入。
# 用户决定干预,修改一个中间的Thought节点或Observation节点。

用户干预与续航: 用户发现Agent在某个ObservationThought节点后的决策有误。

假设在第一次执行中,Agent通过search_tool获得了价格,并生成了一个Observation节点,然后基于此生成了Thought,最终得出“建议买入”。现在用户要修改这个“建议买入”的判断。为了简化,我们假设用户决定修改导致最终FinalAnswer的那个Thought节点。

我们需要找到对应的Thought节点ID。

# 找到一个关键的Thought节点,例如负责最终判断的Thought
target_thought_node_id: Optional[str] = None
for node_id, node in agent_graph.nodes.items():
    if node.node_type == NodeType.THOUGHT and "投资价值" in str(node.output_data):
        target_thought_node_id = node_id
        break

if target_thought_node_id:
    print(f"n--- User Intervention: Modifying Thought node {target_thought_node_id[:8]}... ---")

    # 假设用户干预:Agent在分析完股价后,认为应该买入,但用户觉得错了。
    # 用户直接修改了这个“思考”的结果,强制Agent认为“市盈率过高,不建议买入”
    user_corrected_thought = "我已经获取了股票价格和分析结果。但我发现当前市盈率过高,因此不建议买入。"

    agent_graph.apply_user_modification(target_thought_node_id, user_corrected_thought)

    print("n--- Agent Resuming Execution after User Intervention ---")
    # 从被修改的节点(或其紧邻的下游PENDING节点)开始重新执行
    agent_graph.execute(start_node_id=target_thought_node_id) 

    print("n--- Agent's Final Result After Intervention ---")
    for node_id, node in agent_graph.nodes.items():
        print(f"Node {node.node_id[:8]}... [{node.node_type.value}] - Status: {node.status.value} - Output: {str(node.output_data)[:50]}")

    final_answer_after_intervention = agent_graph.get_final_answer()
    print(f"nAgent's final answer after user intervention: {final_answer_after_intervention}")
else:
    print("nCould not find a suitable Thought node to intervene. Please check the initial execution flow.")

在这个续航执行中:

  1. apply_user_modification会更新目标Thought节点的output_datastatus
  2. 它会递归地遍历所有依赖于这个Thought节点的下游节点,将它们的status重置为PENDING并清除output_data
  3. execute方法被再次调用,并被告知从被修改的Thought节点开始。
  4. LLM在生成后续ActionFinalAnswer时,将能够从_build_llm_prompt中获取到用户修改后的Thought作为上下文。因此,它会基于“市盈率过高,不建议买入”这个新的思考,生成一个符合用户意图的最终答案。

第五章:高级考量与未来展望

5.1 幂等性(Idempotency)与副作用

当Agent从断点处恢复执行时,一个关键问题是:如果被失效的下游节点中包含已经执行过的具有副作用的工具调用,我们该如何处理?例如,如果Agent已经发送了一封邮件,或者更新了一个数据库记录。

  • 解决方案:
    • 工具设计: 尽可能设计幂等的工具,即多次调用产生相同的结果,且不会有额外副作用。
    • 状态检查: 在重新执行工具前,检查外部系统的状态,判断是否需要再次执行。
    • 用户确认: 对于高风险操作,在重新执行前寻求用户确认。
    • 事务管理: 对于一系列操作,采用事务机制,在出现问题时可以回滚。

5.2 复杂图结构与动态重构

我们当前的图是一个简单的链式结构。在更复杂的Agent中,图可能包含并行分支、条件分支甚至循环(虽然Agent推理图通常是DAG)。

  • 并行分支: 如果修改的节点只影响一个分支,其他已完成的分支可以保持不变。
  • 条件分支: 用户修改可能会改变决策条件,导致Agent选择不同的分支。这需要更智能的图遍历和分支管理逻辑。
  • 动态图重构: 在某些情况下,用户修改可能非常根本,以至于Agent需要完全重新规划其后续步骤,甚至动态地添加或删除节点和边。这通常涉及将用户指令重新输入LLM,让其从修改点开始生成一个全新的子图。

5.3 持久化与版本控制

为了支持断点续航,Agent的状态必须能够被持久化。这意味着计算图、节点状态、LLM交互历史等都需要存储。

  • 数据库/文件系统:AgentGraph对象序列化为JSON或Protocol Buffers,存储到数据库或文件系统中。
  • 版本控制: 每次用户修改或Agent重要步骤完成后,保存一个“快照”,形成历史版本,方便追溯和比较。

5.4 安全性与权限

用户干预能力强大,但也带来了安全风险。

  • 权限管理: 确保只有授权用户才能修改Agent的中间思考。
  • 审计日志: 记录所有用户干预行为,以便审计和追溯。
  • 输入验证: 对用户提供的修改内容进行严格验证,防止注入恶意指令。

结语

“Resume with Action”机制是构建健壮、可控AI Agent的关键一环。通过将Agent的思考抽象为计算图,并结合精细的状态管理和智能的图遍历算法,我们能够有效地应对用户干预带来的挑战。这不仅提升了Agent的实用性和可靠性,也开启了人机协作在复杂任务处理中的新范式,使得AI Agent能够更好地服务于人类,共同解决现实世界的难题。随着AI技术的不断演进,对这种灵活、可干预的Agent架构的需求将愈发强烈,它将是未来智能系统不可或缺的能力。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注