各位技术同仁,大家好!
今天我们深入探讨一个在构建高级AI Agent时至关重要且极具挑战性的话题——当用户修改了 Agent 的中间思考后,图如何从断点处继续推理? 这不仅仅是技术实现的问题,更是人机协作范式下,如何赋予AI Agent韧性与可控性的核心议题。我们将从Agent的思考范式、计算图的构建、状态管理、以及具体的代码实现策略等多个维度,系统性地剖析这一复杂机制。
引言:AI Agent 的崛起与人机协作的必然性
近年来,以大型语言模型(LLM)为核心的AI Agent展现出惊人的能力,它们不再局限于单一任务,而是能够通过规划、工具使用、记忆和反思,自主完成一系列复杂任务。然而,这些Agent的推理过程,特别是涉及多步骤、多工具调用的链式思考,并非总是完美无瑕。它们可能陷入逻辑循环、选择错误的工具、生成不准确的事实,或仅仅是未能理解用户意图的细微差别。
在这种情况下,人机协作(Human-in-the-Loop, HITL) 变得不可或缺。用户介入Agent的中间思考,纠正其规划、修改其假设、甚至直接提供下一步的行动指令,能够显著提高Agent的可靠性、效率和安全性。然而,这种介入带来了一个核心技术挑战:Agent的推理过程是一个有向无环图(DAG)或更广义的计算图,当图中的某个节点被修改时,如何有效地、逻辑严谨地从这个“断点”处继续推理,而不是简单地从头开始? 这正是我们今天讲座的焦点——“Resume with Action”。
第一章:AI Agent 的思考范式与计算图的抽象
要理解如何从断点处恢复,我们首先需要理解Agent的思考是如何被结构化的。
1.1 Agent 的核心组件与思考流程
一个典型的AI Agent通常包含以下核心组件:
- 大型语言模型(LLM): 作为Agent的大脑,负责理解指令、生成思考、规划行动。
- 规划器(Planner): 根据LLM的推理,将复杂任务分解为一系列子任务或步骤。
- 工具集(Tools): 供Agent调用的外部功能,如搜索引擎、代码解释器、API接口等。
- 记忆(Memory): 存储历史交互、思考过程和结果,用于上下文维持和学习。
- 反思器(Reflector): 对已完成的步骤或结果进行评估,识别错误并进行修正。
其思考流程通常遵循“观察-思考-行动-观察”的循环:
- 观察 (Observe): Agent接收用户指令,或从环境中获取信息。
- 思考 (Think): LLM根据观察到的信息,结合记忆和工具集,生成一个思考(Thought)和下一步的行动计划(Action)。这通常表现为内部的思维链(Chain of Thought, CoT)或ReAct模式(Reasoning and Acting)。
- 行动 (Act): Agent执行计划中的行动,这通常是调用某个工具。
- 观察 (Observe): Agent接收工具的执行结果(Observation),并将其作为新的输入,再次进入“思考”阶段,直到任务完成。
1.2 计算图:Agent 思考的结构化表示
我们可以将Agent的整个推理过程抽象为一个有向无环图(DAG)。图中的每一个节点代表一个离散的思考步骤、一个工具调用、一个观察结果,或者一个最终答案。边则表示这些步骤之间的逻辑依赖或数据流。
表 1: 计算图中的节点类型示例
| 节点类型 | 描述 | 示例输出 |
|---|---|---|
Thought |
LLM产生的内部思考,用于规划或推理。 | "我需要先查找股票的当前价格,然后分析其财务报告。" |
Action |
LLM决定执行的某个工具调用,包含工具名和参数。 | {"tool": "search_stock_price", "input": {"symbol": "AAPL"}} |
Observation |
工具执行后返回的结果。 | {"AAPL_price": 175.25} |
FinalAnswer |
Agent得出的最终结论或结果。 | "根据最新数据,苹果公司股票(AAPL)当前价格为175.25美元,建议买入。" |
UserIntervention |
用户提供的修改或指令。 | "Agent,你的分析有误,AAPL的市盈率已经过高,不建议买入。" |
图的结构示例:
用户指令 -> Thought_1 -> Action_1 (SearchTool) -> Observation_1 -> Thought_2 -> Action_2 (AnalysisTool) -> Observation_2 -> FinalAnswer
在这个图中,每个节点都有其输入(来自前驱节点或用户指令),并产生输出(供后继节点使用)。
Python 代码示例:抽象计算图的节点和边
import uuid
from datetime import datetime
from enum import Enum
from typing import Dict, Any, Optional, List, Callable, Union
class NodeType(str, Enum):
"""定义计算图中的节点类型。"""
USER_PROMPT = "user_prompt"
THOUGHT = "thought"
ACTION = "action"
OBSERVATION = "observation"
FINAL_ANSWER = "final_answer"
USER_INTERVENTION = "user_intervention" # 用户干预节点
class NodeStatus(str, Enum):
"""定义节点在执行过程中的状态。"""
PENDING = "pending" # 等待执行
EXECUTING = "executing" # 正在执行
COMPLETED = "completed" # 执行完成
FAILED = "failed" # 执行失败
SKIPPED = "skipped" # 跳过执行(例如,被下游修改影响)
MODIFIED = "modified" # 被用户修改,需要重新评估下游
class AgentNode:
"""计算图中的一个节点,代表Agent思考或行动的一个步骤。"""
def __init__(self,
node_id: Optional[str] = None,
node_type: NodeType = NodeType.THOUGHT,
input_data: Any = None,
output_data: Any = None,
status: NodeStatus = NodeStatus.PENDING,
timestamp: Optional[datetime] = None,
description: str = "",
# 对于ACTION节点,可能需要存储工具信息
tool_name: Optional[str] = None,
tool_args: Optional[Dict[str, Any]] = None):
self.node_id = node_id if node_id else str(uuid.uuid4())
self.node_type = node_type
self.input_data = input_data
self.output_data = output_data
self.status = status
self.timestamp = timestamp if timestamp else datetime.now()
self.description = description
self.tool_name = tool_name
self.tool_args = tool_args
self.predecessors: List[str] = [] # 前驱节点ID
self.successors: List[str] = [] # 后继节点ID
def to_dict(self) -> Dict[str, Any]:
"""将节点转换为字典,便于存储和序列化。"""
return {
"node_id": self.node_id,
"node_type": self.node_type.value,
"input_data": self.input_data,
"output_data": self.output_data,
"status": self.status.value,
"timestamp": self.timestamp.isoformat(),
"description": self.description,
"tool_name": self.tool_name,
"tool_args": self.tool_args,
"predecessors": self.predecessors,
"successors": self.successors,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'AgentNode':
"""从字典创建节点实例。"""
node = cls(
node_id=data["node_id"],
node_type=NodeType(data["node_type"]),
input_data=data["input_data"],
output_data=data["output_data"],
status=NodeStatus(data["status"]),
timestamp=datetime.fromisoformat(data["timestamp"]),
description=data.get("description", ""),
tool_name=data.get("tool_name"),
tool_args=data.get("tool_args")
)
node.predecessors = data.get("predecessors", [])
node.successors = data.get("successors", [])
return node
def __repr__(self):
return (f"AgentNode(id='{self.node_id[:8]}...', type={self.node_type.value}, "
f"status={self.status.value}, desc='{self.description[:30]}...')")
class AgentGraph:
"""表示Agent的整个推理过程的计算图。"""
def __init__(self):
self.nodes: Dict[str, AgentNode] = {}
self.current_execution_path: List[str] = [] # 记录当前执行路径,用于回溯
def add_node(self, node: AgentNode):
"""向图中添加一个节点。"""
if node.node_id in self.nodes:
raise ValueError(f"Node with ID {node.node_id} already exists.")
self.nodes[node.node_id] = node
def add_edge(self, predecessor_id: str, successor_id: str):
"""在两个节点之间添加一条边,表示逻辑依赖。"""
if predecessor_id not in self.nodes:
raise ValueError(f"Predecessor node {predecessor_id} not found.")
if successor_id not in self.nodes:
raise ValueError(f"Successor node {successor_id} not found.")
if successor_id not in self.nodes[predecessor_id].successors:
self.nodes[predecessor_id].successors.append(successor_id)
if predecessor_id not in self.nodes[successor_id].predecessors:
self.nodes[successor_id].predecessors.append(predecessor_id)
def get_node(self, node_id: str) -> Optional[AgentNode]:
"""根据ID获取节点。"""
return self.nodes.get(node_id)
def get_ready_nodes(self) -> List[AgentNode]:
"""获取所有已就绪(PENDING且所有前驱已完成)的节点。"""
ready_nodes = []
for node_id, node in self.nodes.items():
if node.status == NodeStatus.PENDING:
all_predecessors_completed = True
for pred_id in node.predecessors:
if self.nodes[pred_id].status not in [NodeStatus.COMPLETED, NodeStatus.MODIFIED, NodeStatus.SKIPPED]:
all_predecessors_completed = False
break
if all_predecessors_completed:
ready_nodes.append(node)
return ready_nodes
def topological_sort(self) -> List[str]:
"""
对图进行拓扑排序,返回节点的执行顺序。
只考虑未完成的节点,但会包含已完成的节点作为依赖。
"""
in_degree = {node_id: len(node.predecessors) for node_id, node in self.nodes.items()}
queue = [node_id for node_id, degree in in_degree.items() if degree == 0]
sorted_nodes = []
while queue:
node_id = queue.pop(0)
sorted_nodes.append(node_id)
for successor_id in self.nodes[node_id].successors:
in_degree[successor_id] -= 1
if in_degree[successor_id] == 0:
queue.append(successor_id)
if len(sorted_nodes) != len(self.nodes):
# 这表示图中存在循环,对于Agent的推理图来说通常是不期望的
# 或者意味着我们只排序了部分可达节点
pass # 实际应用中可能需要更严格的循环检测
return sorted_nodes
def to_dict(self) -> Dict[str, Any]:
"""将整个图转换为字典,便于序列化。"""
return {
"nodes": {node_id: node.to_dict() for node_id, node in self.nodes.items()},
"current_execution_path": self.current_execution_path,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'AgentGraph':
"""从字典创建图实例。"""
graph = cls()
for node_id, node_data in data["nodes"].items():
graph.add_node(AgentNode.from_dict(node_data))
# 重新建立边(因为节点创建时没有处理前驱后继列表)
for node_id, node_data in data["nodes"].items():
node = graph.nodes[node_id]
for pred_id in node_data.get("predecessors", []):
if pred_id not in node.predecessors: # 避免重复添加
node.predecessors.append(pred_id)
for succ_id in node_data.get("successors", []):
if succ_id not in node.successors: # 避免重复添加
node.successors.append(succ_id)
graph.current_execution_path = data.get("current_execution_path", [])
return graph
def __repr__(self):
return f"AgentGraph(nodes={len(self.nodes)}, ready={len(self.get_ready_nodes())})"
第二章:人机协作的必要性与“断点”的引入
AI Agent的自动推理能力强大,但并非万能。在复杂、高风险或需要精确控制的场景中,人类的介入是必不可少的。
2.1 用户介入的动机
- 纠正错误: Agent可能产生错误的思考、错误的工具参数或不正确的结论。
- 引导方向: 当Agent偏离用户意图或陷入僵局时,用户可以提供明确的指导。
- 注入新信息: 用户可能拥有Agent无法通过工具获取的私有或最新信息。
- 优化效率: 用户可以识别冗余步骤或提供更直接的解决方案。
- 确保安全与合规: 在敏感任务中,人类可以作为最终的守门员,防止Agent产生有害或不合规的行为。
2.2 定义“断点”与用户修改
当用户介入时,实际上是在Agent的计算图中设置了一个“断点”。这个断点通常对应于图中的某个特定节点,用户希望修改该节点的输出,或者直接改变Agent后续的推理路径。
用户修改的两种主要形式:
- 修改已有节点的内容: 用户指出某个已完成的
Thought或Action节点存在问题,并提供新的output_data。例如,Agent错误地规划了“查询历史股价”,用户将其修改为“查询最新财报”。 - 插入新节点: 用户可能希望在某个现有节点之后强制插入一个新的
Thought或Action,从而改变原有的执行流。例如,在Agent执行某个搜索操作后,用户要求它立即执行一个特定的计算,而不是等待LLM生成下一个思考。
无论是哪种形式,核心挑战都是如何将用户的修改无缝地整合回计算图,并让Agent从新的起点或修正后的状态继续其推理。
第三章:核心机制:状态管理与上下文重构
为了能够从断点处继续推理,Agent系统必须具备强大的状态管理能力,能够精确地记录和恢复其执行上下文。
3.1 Agent 状态的捕获
在Agent执行的每一步,我们都需要捕获足够的信息,以便在需要时进行回溯或恢复。这包括:
- 计算图的完整结构: 所有的
AgentNode及其predecessors和successors。 - 每个节点的状态:
status(PENDING, COMPLETED, FAILED, MODIFIED等),input_data,output_data,timestamp。 - Agent的全局上下文: 例如,共享的记忆(Memory)内容,已经调用的工具列表等。
- 当前执行路径: 在多分支或并行执行的场景中,记录Agent是如何到达当前节点的路径信息。
表 2: Agent 状态的关键组成部分
| 状态组件 | 描述 | 示例 |
|---|---|---|
graph |
AgentGraph 实例,包含所有节点及其连接。 |
{'nodes': {...}, 'edges': {...}} |
current_prompt |
驱动当前推理的用户初始指令。 | "分析AAPL股票并给出投资建议" |
memory_context |
Agent的记忆模块,包含历史对话、关键事实等。 | ["用户想了解AAPL股票。", "上次查询的AAPL价格是175.25。"] |
tool_registry |
Agent可用的工具列表及其定义。 | {'search_tool': <SearchTool obj>, 'analysis_tool': <AnalysisTool obj>} |
execution_log |
详细的执行日志,可能包含LLM的原始输出、工具的详细日志等。 | [{"node_id": "...", "event": "LLM_CALL_START", "prompt": "..."}, ...] |
3.2 上下文重构与“世界状态”
当用户修改一个节点时,我们不仅仅是改变了该节点本身,更重要的是改变了Agent对“世界状态”的理解。例如,如果Agent认为股票价格是100美元,而用户将其纠正为200美元,那么所有依赖于这个价格进行决策的后续步骤都可能变得无效。
上下文重构的核心思想:
- 加载历史状态: 从持久化存储中加载Agent在用户干预前的完整计算图和全局上下文。
- 应用用户修改: 将用户的修改应用到计算图的特定节点上,更新其
output_data和status。 - 识别受影响的下游节点: 找出所有直接或间接依赖于被修改节点的后续节点。这些节点需要被标记为“无效”或“待重新评估”。
- 重新规划/执行: Agent需要从被修改的节点或其紧邻的未完成(且现在已无效)的下游节点开始,重新进行推理和执行。
这个过程需要Agent框架能够:
- 遍历图: 有效地识别一个节点的所有下游依赖。
- 状态更新: 准确地更新节点和图的状态。
- LLM调用策略: 在重新执行时,如何向LLM提供正确的上下文,使其能够基于新的信息继续思考。
第四章:从断点处续航推理的策略与实现
这是本次讲座的核心技术部分。我们将聚焦于如何具体实现“从断点处继续推理”的逻辑。
4.1 核心挑战:下游节点的失效与重评估
当节点 N 的输出 output_N 被用户修改为 output_N' 后:
- 直接后继节点:
N的所有直接后继节点S1, S2, ...的输入都可能发生了变化。它们需要被重新评估。 - 间接后继节点:
S1, S2, ...的后继节点,以及更深层的下游节点,也可能受到影响。
因此,最关键的策略是:在用户修改某个节点后,递归地将该节点及其所有下游节点标记为“待重新执行”状态,并清除它们的旧输出。
4.2 续航推理的步骤与算法
假设用户修改了 node_id_to_modify 节点,并提供了 new_output_data。
-
定位并更新被修改节点:
- 获取
AgentGraph实例。 - 找到
node_id_to_modify对应的AgentNode。 - 更新其
output_data为new_output_data。 - 将其
status设置为NodeStatus.MODIFIED。
- 获取
-
失效下游节点:
- 从
node_id_to_modify开始,进行深度优先搜索(DFS)或广度优先搜索(BFS),遍历所有可达的下游节点。 - 对于每一个下游节点
D:- 将其
status设置为NodeStatus.PENDING(或INVALIDATED,如果需要区分)。 - 清除其
output_data。 - 更新
timestamp。
- 将其
- 从
-
重新规划起始点:
- 在失效所有下游节点后,Agent需要从某个点开始重新执行。这个点可能是被修改的节点本身(如果它不是最终答案,且需要LLM基于新输出进行下一步思考),或者是被修改节点下游的第一个
PENDING节点。 - 通常,我们希望Agent能够从被修改节点“之后”的逻辑起点重新开始。
- 在失效所有下游节点后,Agent需要从某个点开始重新执行。这个点可能是被修改的节点本身(如果它不是最终答案,且需要LLM基于新输出进行下一步思考),或者是被修改节点下游的第一个
-
执行图:
- 从新的起始点开始,按照拓扑顺序(或动态就绪队列)执行图中的
PENDING节点。 - 执行过程中,LLM将接收包含用户修改后上下文的Prompt。
- 从新的起始点开始,按照拓扑顺序(或动态就绪队列)执行图中的
Python 代码示例:实现用户修改与图的续航
首先,我们需要一个模拟LLM和工具的接口:
class MockLLM:
"""模拟一个LLM,根据上下文生成Thought或Action。"""
def __init__(self):
self.call_count = 0
def generate_response(self, prompt: str) -> Dict[str, Any]:
self.call_count += 1
print(f"n--- LLM Call {self.call_count} ---")
print(f"Prompt:n{prompt}n---")
# 模拟LLM根据Prompt生成Thought或Action
if "股票价格" in prompt and "分析" not在 prompt:
return {"type": "action", "tool_name": "search_tool", "tool_args": {"query": "AAPL current stock price"}}
elif "current stock price is" in prompt and "分析" in prompt:
return {"type": "thought", "content": "好的,我已经获取了股票价格。现在我需要使用分析工具来评估其投资价值。"}
elif "评估投资价值" in prompt:
return {"type": "action", "tool_name": "analysis_tool", "tool_args": {"symbol": "AAPL", "price": "175.25"}}
elif "投资建议" in prompt and "不建议买入" in prompt:
return {"type": "final_answer", "content": "综合分析,苹果公司股票(AAPL)当前市盈率过高,不建议买入。"}
elif "投资建议" in prompt: # 默认的正确路径
return {"type": "final_answer", "content": "综合分析,苹果公司股票(AAPL)具有投资潜力,建议买入。"}
else:
return {"type": "thought", "content": f"基于当前信息,我将继续思考下一步:{prompt[:50]}..."}
class MockTool:
"""模拟一个工具。"""
def __init__(self, name: str):
self.name = name
def run(self, **kwargs) -> Any:
print(f"--- Tool '{self.name}' called with args: {kwargs} ---")
if self.name == "search_tool":
query = kwargs.get("query", "")
if "AAPL current stock price" in query:
return {"result": "The current stock price of Apple (AAPL) is 175.25 USD."}
return {"result": f"Search results for '{query}': No specific data found."}
elif self.name == "analysis_tool":
symbol = kwargs.get("symbol")
price = kwargs.get("price")
if symbol == "AAPL" and price == "175.25":
return {"result": "AAPL analysis: P/E Ratio is 30x, historical growth strong, but current valuation high."}
return {"result": f"Analysis for {symbol} at {price}: Insufficient data."}
else:
return {"result": f"Unknown tool '{self.name}'."}
# 将执行逻辑封装到 AgentGraph 中
class AgentGraphWithExecution(AgentGraph):
def __init__(self, llm: MockLLM, tools: Dict[str, MockTool]):
super().__init__()
self.llm = llm
self.tools = tools
self.scratchpad: List[str] = [] # 记录Agent的思考过程,用于LLM上下文
def _build_llm_prompt(self, current_node: AgentNode) -> str:
"""根据当前图的状态和执行路径构建LLM的Prompt。"""
prompt_parts = [
"你是一个专业的AI Agent,你的任务是完成用户指令。",
"请根据以下信息,思考下一步应该做什么(Thought),或者决定调用哪个工具(Action),或者给出最终答案(FinalAnswer)。",
"你的输出必须是JSON格式,包含'type'('thought', 'action', 'final_answer')和对应的内容。",
"如果type是'action',则需要包含'tool_name'和'tool_args'。",
"如果type是'final_answer',则需要包含'content'。",
"n--- 历史思考和行动 ---"
]
# 遍历所有已完成或已修改的节点,构建历史上下文
for node_id in self.topological_sort(): # 按照拓扑顺序提供上下文
node = self.nodes[node_id]
if node.status in [NodeStatus.COMPLETED, NodeStatus.MODIFIED]:
if node.node_type == NodeType.USER_PROMPT:
prompt_parts.append(f"用户指令: {node.output_data}")
elif node.node_type == NodeType.THOUGHT:
prompt_parts.append(f"Thought: {node.output_data}")
elif node.node_type == NodeType.ACTION:
prompt_parts.append(f"Action: Tool '{node.tool_name}' with args {node.tool_args}")
elif node.node_type == NodeType.OBSERVATION:
prompt_parts.append(f"Observation: {node.output_data}")
elif node.node_type == NodeType.USER_INTERVENTION:
prompt_parts.append(f"User Intervention: {node.output_data}")
elif node.status == NodeStatus.FAILED:
prompt_parts.append(f"Failed Node: {node.description} (Input: {node.input_data})")
prompt_parts.append(f"n--- 当前任务 ---")
prompt_parts.append(f"当前节点期望输入: {current_node.input_data}")
prompt_parts.append("请给出你的下一步:")
return "n".join(prompt_parts)
def _execute_node(self, node_id: str):
"""执行图中一个节点。"""
node = self.nodes[node_id]
if node.status != NodeStatus.PENDING:
print(f"Skipping node {node.node_id[:8]}... (Status: {node.status.value})")
return
node.status = NodeStatus.EXECUTING
self.current_execution_path.append(node_id)
print(f"n--- Executing Node: {node.node_type.value} - {node.node_id[:8]}... ---")
try:
if node.node_type == NodeType.USER_PROMPT:
# 用户提示节点通常是起点,其输出就是提示本身
node.output_data = node.input_data
node.status = NodeStatus.COMPLETED
elif node.node_type == NodeType.THOUGHT:
llm_prompt = self._build_llm_prompt(node)
llm_response = self.llm.generate_response(llm_prompt)
node.output_data = llm_response.get("content", "No specific thought generated.")
node.status = NodeStatus.COMPLETED
# 如果LLM在Thought中直接生成了Action或FinalAnswer,需要特殊处理
if llm_response.get("type") == "action":
# 动态创建并链接Action节点
action_node = AgentNode(
node_type=NodeType.ACTION,
input_data=node.output_data, # Thought的输出作为Action的输入
description="Action from LLM in Thought step",
tool_name=llm_response["tool_name"],
tool_args=llm_response["tool_args"]
)
self.add_node(action_node)
self.add_edge(node.node_id, action_node.node_id)
# 标记当前Thought节点为完成,并触发Action节点执行
# 实际中可能需要调整图的执行顺序,确保新生成的节点能被执行
elif llm_response.get("type") == "final_answer":
final_node = AgentNode(
node_type=NodeType.FINAL_ANSWER,
input_data=node.output_data,
output_data=llm_response["content"],
status=NodeStatus.COMPLETED,
description="Final Answer from LLM in Thought step"
)
self.add_node(final_node)
self.add_edge(node.node_id, final_node.node_id)
# 终止执行
self.nodes[final_node.node_id] = final_node
self.nodes[node.node_id] = node # 确保node状态更新
return # 提前返回,因为已得到最终答案
elif node.node_type == NodeType.ACTION:
tool = self.tools.get(node.tool_name)
if not tool:
raise ValueError(f"Tool '{node.tool_name}' not found.")
# 确保输入数据是dict或者可以转换为dict
tool_input = node.tool_args if isinstance(node.tool_args, dict) else {}
if not tool_input and node.input_data and isinstance(node.input_data, dict):
tool_input = node.input_data # 如果没有tool_args,尝试用input_data作为参数
tool_output = tool.run(**tool_input)
node.output_data = tool_output
node.status = NodeStatus.COMPLETED
# 动态创建并链接Observation节点
obs_node = AgentNode(
node_type=NodeType.OBSERVATION,
input_data=node.output_data,
output_data=node.output_data,
status=NodeStatus.COMPLETED,
description=f"Observation from tool '{node.tool_name}'"
)
self.add_node(obs_node)
self.add_edge(node.node_id, obs_node.node_id)
elif node.node_type == NodeType.OBSERVATION:
# Observation节点通常是工具调用的结果,其输出已经预设
# 如果需要LLM基于Observation生成新的Thought,则需要在这里触发
node.status = NodeStatus.COMPLETED
# 在Observation之后,通常会生成一个新的Thought节点
# 动态创建并链接Thought节点
next_thought_node = AgentNode(
node_type=NodeType.THOUGHT,
input_data=node.output_data, # Observation的输出作为Thought的输入
description="Thinking after observation"
)
self.add_node(next_thought_node)
self.add_edge(node.node_id, next_thought_node.node_id)
elif node.node_type == NodeType.FINAL_ANSWER:
node.status = NodeStatus.COMPLETED
# 最终答案节点,标记任务完成
elif node.node_type == NodeType.USER_INTERVENTION:
# 用户干预节点,其输出就是用户提供的修改内容
node.output_data = node.input_data
node.status = NodeStatus.COMPLETED
except Exception as e:
node.status = NodeStatus.FAILED
node.output_data = {"error": str(e)}
print(f"Node {node.node_id[:8]}... FAILED: {e}")
def execute(self, start_node_id: Optional[str] = None):
"""从指定节点开始执行图。如果没有指定,则从第一个就绪节点开始。"""
if not self.nodes:
print("Graph is empty, nothing to execute.")
return
if start_node_id and start_node_id not in self.nodes:
raise ValueError(f"Start node {start_node_id} not found in graph.")
# 如果没有指定起始节点,从拓扑排序的第一个未完成节点开始
if not start_node_id:
all_sorted_nodes = self.topological_sort()
for node_id in all_sorted_nodes:
if self.nodes[node_id].status not in [NodeStatus.COMPLETED, NodeStatus.MODIFIED, NodeStatus.SKIPPED]:
start_node_id = node_id
break
if not start_node_id:
print("All nodes already completed or skipped. Nothing to execute.")
return
# 执行循环
current_node_id = start_node_id
while current_node_id:
node = self.nodes[current_node_id]
if node.status in [NodeStatus.COMPLETED, NodeStatus.MODIFIED, NodeStatus.SKIPPED, NodeStatus.FAILED]:
# 如果当前节点已经完成、修改、跳过或失败,尝试寻找下一个可执行节点
next_ready_nodes = self.get_ready_nodes()
if next_ready_nodes:
# 找一个在拓扑序上靠前的作为下一个
sorted_ready_ids = [n.node_id for n in next_ready_nodes]
sorted_graph_ids = self.topological_sort()
next_node_id = None
for _id in sorted_graph_ids:
if _id in sorted_ready_ids:
next_node_id = _id
break
current_node_id = next_node_id
else:
current_node_id = None # 没有更多就绪节点
continue
self._execute_node(current_node_id)
if any(n.node_type == NodeType.FINAL_ANSWER and n.status == NodeStatus.COMPLETED for n in self.nodes.values()):
print("n--- Final Answer Reached. Stopping Execution. ---")
break # 任务完成
# 找到下一个就绪节点
next_ready_nodes = self.get_ready_nodes()
if next_ready_nodes:
# 寻找在拓扑排序中最早的就绪节点作为下一个执行点
sorted_ready_ids = [n.node_id for n in next_ready_nodes]
sorted_graph_ids = self.topological_sort()
next_node_id = None
for _id in sorted_graph_ids:
if _id in sorted_ready_ids:
next_node_id = _id
break
current_node_id = next_node_id
else:
current_node_id = None # 没有更多就绪节点
def invalidate_downstream_nodes(self, node_id: str):
"""
递归地将指定节点及其所有下游节点标记为PENDING,并清除其输出。
这模拟了用户修改对后续推理的影响。
"""
if node_id not in self.nodes:
return
q = [node_id]
visited = {node_id}
nodes_to_invalidate = []
while q:
current_id = q.pop(0)
nodes_to_invalidate.append(current_id)
for successor_id in self.nodes[current_id].successors:
if successor_id not in visited:
visited.add(successor_id)
q.append(successor_id)
# 按照拓扑逆序处理,确保依赖关系正确
# 实际操作中,直接将所有受影响的设置为 PENDING 即可,不需要复杂的逆序,
# 因为后续的execute会从 PENDING 节点开始按拓扑顺序执行
for n_id in nodes_to_invalidate:
node = self.nodes[n_id]
if n_id == node_id: # 被修改的节点,状态设为 MODIFIED
node.status = NodeStatus.MODIFIED
else: # 下游节点,状态设为 PENDING
node.status = NodeStatus.PENDING
node.output_data = None # 清除旧输出
node.timestamp = datetime.now()
print(f"Invalidated/Cleared node: {node.node_id[:8]}... (Type: {node.node_type.value}, New Status: {node.status.value})")
def apply_user_modification(self, node_id: str, new_output_data: Any):
"""
用户修改指定节点的内容。
这将触发下游节点的失效和重新执行。
"""
if node_id not in self.nodes:
raise ValueError(f"Node {node_id} not found for modification.")
node = self.nodes[node_id]
print(f"n--- User modifying node {node.node_id[:8]}... (Type: {node.node_type.value}) ---")
print(f"Old output: {node.output_data}")
print(f"New output: {new_output_data}")
# 直接更新被修改节点的输出
node.output_data = new_output_data
node.status = NodeStatus.MODIFIED # 标记为已修改
node.timestamp = datetime.now()
# 使所有下游节点失效
self.invalidate_downstream_nodes(node_id)
print(f"--- User modification applied to node {node.node_id[:8]}... ---")
def get_final_answer(self) -> Optional[Any]:
"""获取最终答案节点。"""
for node in self.nodes.values():
if node.node_type == NodeType.FINAL_ANSWER and node.status == NodeStatus.COMPLETED:
return node.output_data
return None
4.3 完整示例流程
我们来模拟一个Agent查询股票并给出投资建议的场景。
场景: Agent被要求分析AAPL股票。它首先查找价格,然后进行分析,最后给出建议。
第一次运行(Agent犯错): Agent可能会错误地认为AAPL值得买入。
# 初始化LLM和工具
mock_llm = MockLLM()
mock_tools = {
"search_tool": MockTool("search_tool"),
"analysis_tool": MockTool("analysis_tool")
}
# 创建Agent图实例
agent_graph = AgentGraphWithExecution(llm=mock_llm, tools=mock_tools)
# 定义初始用户Prompt节点
initial_prompt = "分析AAPL股票,给出投资建议。"
prompt_node = AgentNode(node_type=NodeType.USER_PROMPT, input_data=initial_prompt, description="Initial user prompt")
agent_graph.add_node(prompt_node)
# 创建第一个Thought节点,通常会从这里开始LLM的推理
first_thought_node = AgentNode(node_type=NodeType.THOUGHT, input_data=initial_prompt, description="Initial planning thought")
agent_graph.add_node(first_thought_node)
agent_graph.add_edge(prompt_node.node_id, first_thought_node.node_id)
print("--- Initial Agent Execution ---")
agent_graph.execute(start_node_id=first_thought_node.node_id) # 从第一个Thought节点开始执行
print("n--- Agent's First Attempt Result ---")
for node_id, node in agent_graph.nodes.items():
print(f"Node {node.node_id[:8]}... [{node.node_type.value}] - Status: {node.status.value} - Output: {str(node.output_data)[:50]}")
final_answer = agent_graph.get_final_answer()
print(f"nAgent's initial final answer: {final_answer}")
# 假设Agent的最终答案是“建议买入”
# 但用户发现Agent的分析有误,AAPL的市盈率已经过高,不建议买入。
# 用户决定干预,修改一个中间的Thought节点或Observation节点。
用户干预与续航: 用户发现Agent在某个Observation或Thought节点后的决策有误。
假设在第一次执行中,Agent通过search_tool获得了价格,并生成了一个Observation节点,然后基于此生成了Thought,最终得出“建议买入”。现在用户要修改这个“建议买入”的判断。为了简化,我们假设用户决定修改导致最终FinalAnswer的那个Thought节点。
我们需要找到对应的Thought节点ID。
# 找到一个关键的Thought节点,例如负责最终判断的Thought
target_thought_node_id: Optional[str] = None
for node_id, node in agent_graph.nodes.items():
if node.node_type == NodeType.THOUGHT and "投资价值" in str(node.output_data):
target_thought_node_id = node_id
break
if target_thought_node_id:
print(f"n--- User Intervention: Modifying Thought node {target_thought_node_id[:8]}... ---")
# 假设用户干预:Agent在分析完股价后,认为应该买入,但用户觉得错了。
# 用户直接修改了这个“思考”的结果,强制Agent认为“市盈率过高,不建议买入”
user_corrected_thought = "我已经获取了股票价格和分析结果。但我发现当前市盈率过高,因此不建议买入。"
agent_graph.apply_user_modification(target_thought_node_id, user_corrected_thought)
print("n--- Agent Resuming Execution after User Intervention ---")
# 从被修改的节点(或其紧邻的下游PENDING节点)开始重新执行
agent_graph.execute(start_node_id=target_thought_node_id)
print("n--- Agent's Final Result After Intervention ---")
for node_id, node in agent_graph.nodes.items():
print(f"Node {node.node_id[:8]}... [{node.node_type.value}] - Status: {node.status.value} - Output: {str(node.output_data)[:50]}")
final_answer_after_intervention = agent_graph.get_final_answer()
print(f"nAgent's final answer after user intervention: {final_answer_after_intervention}")
else:
print("nCould not find a suitable Thought node to intervene. Please check the initial execution flow.")
在这个续航执行中:
apply_user_modification会更新目标Thought节点的output_data和status。- 它会递归地遍历所有依赖于这个
Thought节点的下游节点,将它们的status重置为PENDING并清除output_data。 execute方法被再次调用,并被告知从被修改的Thought节点开始。- LLM在生成后续
Action或FinalAnswer时,将能够从_build_llm_prompt中获取到用户修改后的Thought作为上下文。因此,它会基于“市盈率过高,不建议买入”这个新的思考,生成一个符合用户意图的最终答案。
第五章:高级考量与未来展望
5.1 幂等性(Idempotency)与副作用
当Agent从断点处恢复执行时,一个关键问题是:如果被失效的下游节点中包含已经执行过的具有副作用的工具调用,我们该如何处理?例如,如果Agent已经发送了一封邮件,或者更新了一个数据库记录。
- 解决方案:
- 工具设计: 尽可能设计幂等的工具,即多次调用产生相同的结果,且不会有额外副作用。
- 状态检查: 在重新执行工具前,检查外部系统的状态,判断是否需要再次执行。
- 用户确认: 对于高风险操作,在重新执行前寻求用户确认。
- 事务管理: 对于一系列操作,采用事务机制,在出现问题时可以回滚。
5.2 复杂图结构与动态重构
我们当前的图是一个简单的链式结构。在更复杂的Agent中,图可能包含并行分支、条件分支甚至循环(虽然Agent推理图通常是DAG)。
- 并行分支: 如果修改的节点只影响一个分支,其他已完成的分支可以保持不变。
- 条件分支: 用户修改可能会改变决策条件,导致Agent选择不同的分支。这需要更智能的图遍历和分支管理逻辑。
- 动态图重构: 在某些情况下,用户修改可能非常根本,以至于Agent需要完全重新规划其后续步骤,甚至动态地添加或删除节点和边。这通常涉及将用户指令重新输入LLM,让其从修改点开始生成一个全新的子图。
5.3 持久化与版本控制
为了支持断点续航,Agent的状态必须能够被持久化。这意味着计算图、节点状态、LLM交互历史等都需要存储。
- 数据库/文件系统: 将
AgentGraph对象序列化为JSON或Protocol Buffers,存储到数据库或文件系统中。 - 版本控制: 每次用户修改或Agent重要步骤完成后,保存一个“快照”,形成历史版本,方便追溯和比较。
5.4 安全性与权限
用户干预能力强大,但也带来了安全风险。
- 权限管理: 确保只有授权用户才能修改Agent的中间思考。
- 审计日志: 记录所有用户干预行为,以便审计和追溯。
- 输入验证: 对用户提供的修改内容进行严格验证,防止注入恶意指令。
结语
“Resume with Action”机制是构建健壮、可控AI Agent的关键一环。通过将Agent的思考抽象为计算图,并结合精细的状态管理和智能的图遍历算法,我们能够有效地应对用户干预带来的挑战。这不仅提升了Agent的实用性和可靠性,也开启了人机协作在复杂任务处理中的新范式,使得AI Agent能够更好地服务于人类,共同解决现实世界的难题。随着AI技术的不断演进,对这种灵活、可干预的Agent架构的需求将愈发强烈,它将是未来智能系统不可或缺的能力。