各位同学,各位开发者,大家好!
今天,我们齐聚一堂,共同探讨一个在人工智能领域日益重要且充满挑战的话题:如何在 Agent 执行过程中实现“Human-in-the-loop”的双向交互,特别是如何让用户能够实时修改 Agent 所调用工具的参数。
AI Agent,作为我们数字世界的智能助理,正变得越来越强大。它们能够理解复杂的指令,自主规划任务,并调用各种工具来达成目标。然而,一个纯粹自主的 Agent 往往缺乏透明度、可控性和对动态变化的适应性。想象一下,一个 Agent 在执行一个重要任务时,它选择了一个工具,并根据其内部逻辑设定了一组参数。如果这些参数不符合用户的预期,或者外部环境发生了变化,导致这些参数不再最优,但用户却无法干预,那么最终的结果可能并不理想,甚至会造成损失。
这就是“Human-in-the-loop”(HITL,人在环路)概念的核心价值所在。HITL 旨在将人类的判断力、创造力和常识引入到自动化流程中,形成一个闭环协作系统。而我们今天要深入探讨的,是 HITL 中最具挑战性但也最具潜力的一个方面:在 Agent 执行的当口,精确地、实时地介入并修改其工具的调用参数。 这不仅仅是简单的审批或事后修正,而是一种深度、实时的协作,它将 Agent 的自主性与人类的精细控制完美结合,解锁了 Agent 在复杂、动态环境中应用的巨大潜力。
我们将从 Agent 系统的基础架构讲起,分析现有 HITL 模式的局限,然后深入探讨实现实时参数修改的多种技术路径,并辅以详尽的代码示例。最后,我们将讨论相关的架构设计、最佳实践以及未来的挑战。
Agent 系统基础与挑战
在深入实时交互之前,让我们快速回顾一下一个典型 Agent 系统的基本构成。
一个 Agent 通常包含以下核心组件:
- 大型语言模型 (LLM) / 大脑: 负责理解指令、进行推理、规划任务、选择工具以及生成响应。它是 Agent 的决策中心。
- 记忆 (Memory): 存储短期对话历史、长期知识、任务上下文等,帮助 Agent 维持连贯性和学习。
- 工具 (Tools): 外部功能接口,可以是 API 调用、数据库查询、代码执行器、文件操作等。Agent 通过调用这些工具来与外部世界交互并执行具体操作。
- 规划与执行循环 (Planning & Execution Loop): Agent 根据 LLM 的推理,生成一个或多个步骤的计划,然后按序执行,并在每一步根据结果进行反思和调整。
工具调用机制的固有特性
Agent 调用工具的基本流程通常是:
- LLM 根据当前任务和可用工具的描述,决定调用哪个工具。
- LLM 生成调用该工具所需的参数。
- Agent 执行器接收到工具名称和参数,然后调用相应的工具函数。
- 工具执行完毕,返回结果给 Agent。
- Agent 将结果反馈给 LLM 进行下一步的推理。
这里的关键在于,工具参数通常是由 LLM 一次性生成并直接传递给工具的。 在传统的 Agent 设计中,一旦参数生成并开始执行,人类用户很难在工具真正运行之前对其进行干预和修改。如果用户发现参数不合适,通常只能等待工具执行完毕(可能已经造成了不希望的结果),然后通过新的指令尝试引导 Agent 重新执行,或者手动中断整个流程。这显然不是“实时”的干预。
现有 HITL 模式的局限
目前,Agent 领域已经发展出多种 HITL 模式,但它们大多未能满足我们对“实时修改工具参数”的需求:
- 审批模式 (Approval Loop): Agent 提出一个行动方案(例如,调用哪个工具,做什么),然后等待用户批准。如果用户不批准,Agent 则会重新思考。这种模式通常在较高层次进行干预,例如批准整个工具调用,但很少能深入到修改具体参数的层面,或者修改后 Agent 仍然需要重新生成新的参数。
- 修正模式 (Correction Loop): Agent 执行了一个操作,用户发现结果不满意,然后提供反馈,Agent 根据反馈进行修正。这是一种事后补救,无法阻止错误的发生。
- 引导模式 (Guidance Loop): 用户在任务开始前提供详细的指令和约束,引导 Agent 的行为。这是一种预先干预,无法应对执行过程中的突发情况或动态调整。
- 人机协作模式 (Collaborative Mode): Agent 扮演辅助角色,在某些步骤上主动寻求人类帮助,或者人类可以随时介入并接管某些步骤。这种模式更接近我们的目标,但仍然需要更精细的机制来精确地拦截和修改工具参数。
以上模式,虽然提升了 Agent 的安全性与可靠性,但在“实时、精细化修改工具参数”这一核心需求上,都显得力不从心。它们往往涉及到 Agent 的暂停、重规划甚至重启,而非在工具即将执行的瞬间进行无缝的参数调整。
实时交互的必要性与场景
为什么我们需要在 Agent 执行过程中实时修改其工具参数?
- 动态环境变化: 现实世界是不断变化的。Agent 在规划时获取的信息可能在执行时已经过时。例如,一个自动化交易 Agent 在调用“下单”工具时,如果市场价格突然剧烈波动,用户可能需要实时调整交易数量或止损价格。
- 用户偏好与上下文的动态演进: 用户可能在 Agent 执行过程中产生了新的想法,或者意识到之前没有充分表达的偏好。例如,一个内容生成 Agent 在创作文章时,用户看到部分草稿后,可能希望实时调整后续段落的风格、关键词密度等参数。
- 复杂任务的逐步精炼: 对于多步骤的复杂任务,Agent 无法在开始时就完美预测所有参数。通过实时干预,用户可以逐步引导 Agent,根据中间结果调整后续步骤的参数,实现任务的迭代优化。
- 错误发现与即时纠正: Agent 的推理并非万无一失。它可能因理解偏差或信息不全而生成不当的工具参数。用户作为最终决策者,可以在错误发生前及时发现并纠正,避免不必要的损失。
- 提升用户信任与满意度: 当用户拥有实时控制权时,他们对 Agent 的信任度会显著提高。这种深度协作模式,让用户感觉自己是 Agent 的共同创造者,而非被动的旁观者。
具体应用场景举例:
| 场景 | Agent 类型 | 待调用工具示例 | 需实时修改的参数示例 | 修改目的 |
|---|---|---|---|---|
| 数据分析 | 数据分析 Agent | execute_sql_query |
SQL 语句中的 WHERE 条件、GROUP BY 字段、LIMIT |
调整数据过滤范围,优化聚合粒度,限制结果集大小 |
plot_data_chart |
图表类型、X/Y轴标签、颜色、标题、数据子集 | 改变可视化方式,突出特定数据,美化图表 | ||
| 代码生成 | 编程辅助 Agent | generate_code_snippet |
编程语言、目标框架、代码风格、变量命名约定、模块 | 适应不同项目规范,调整代码可读性,指定依赖库 |
refactor_code |
重构策略(例如,提取函数、内联变量)、目标范围 | 精细化重构操作,避免误改,确保代码质量 | ||
| 内容创作 | 写作/营销 Agent | draft_blog_post |
目标受众、语气、关键词密度、段落长度、引用来源 | 调整内容调性,优化 SEO,符合品牌规范 |
summarize_document |
摘要长度、侧重点、关键信息提取模式 | 获得不同粒度的摘要,突出特定主题 | ||
| 自动化运维 | 运维 Agent | run_shell_command |
命令参数、执行路径、目标服务器、超时时间 | 调整脚本执行细节,避免误操作,确保资源安全 |
provision_cloud_resource |
虚拟机大小、区域、存储类型、网络配置 | 优化资源配置,控制成本,满足性能需求 | ||
| 智能家居/设备 | 智能控制 Agent | set_device_status |
亮度、温度、模式、定时器、联动设备 | 实时调整设备状态,响应环境变化,满足个性化需求 |
这些场景都清晰地表明,仅仅靠预设或事后修正,已经无法满足日益复杂的 Agent 应用需求。我们需要一套机制,让 Agent 能够“在执行前停下来,问问人”,并根据人的反馈来调整其行动。
实现实时修改工具参数的技术路径
要实现 Agent 在执行过程中实时修改工具参数,核心思想可以概括为:中断、暴露、修改、恢复。
Agent 在即将调用工具的“前一刻”提供一个可供人类干预的“暂停点”或“拦截点”。在这个点上,Agent 将其即将使用的工具名称和参数暴露给用户界面。用户可以查看、修改这些参数,然后 Agent 使用修改后的参数继续执行。
我们将探讨几种实现这一目标的技术路径。
路径一:基于回调函数/事件机制的拦截
这是最直观的实现方式之一。Agent 的执行循环中,在调用工具的逻辑前后注入回调函数(Callback Function)或触发事件(Event)。这些回调函数可以用来拦截工具调用的参数,并提供给外部进行修改。
概念:
- 拦截器 (Interceptor): 一个抽象层,定义了在工具执行前、执行后等关键时刻要执行的逻辑。
- 回调函数: 在特定事件发生时被调用的函数。
实现细节:
- 定义一个统一的
Tool接口或基类,包含工具的名称、描述以及执行方法。 - 定义一个
Agent类,其内部包含一个 LLM 模拟器和工具注册表。 - 引入一个
ToolExecutionInterceptor接口或抽象类,它至少包含一个before_tool_execution(tool_name, tool_args)方法。 - 在
Agent的执行循环中,当 LLM 决定调用某个工具时,首先将工具名称和 LLM 推理出的参数传递给interceptor.before_tool_execution()方法。 before_tool_execution()方法负责与用户界面交互,展示参数并等待用户修改。它将返回用户修改后的参数,或者指示 Agent 继续使用原始参数。- Agent 接收到返回的参数后,使用这些参数调用工具。
代码示例 (Python):
import json
import time
from typing import Dict, Any, Callable, List, Optional
# --- 1. 定义工具接口 ---
class Tool:
def __init__(self, name: str, description: str, func: Callable):
self.name = name
self.description = description
self._func = func
def run(self, **kwargs) -> Any:
print(f"[Tool] Executing '{self.name}' with params: {kwargs}")
try:
result = self._func(**kwargs)
print(f"[Tool] '{self.name}' execution successful. Result: {result}")
return result
except Exception as e:
print(f"[Tool] Error executing '{self.name}': {e}")
raise
def get_signature(self) -> Dict[str, Any]:
"""获取工具的调用签名,供LLM理解"""
# 实际场景中,这里会解析 func 的参数,生成OpenAPI或JSON Schema
# 为了简化,我们假设描述中包含了足够的信息
return {
"name": self.name,
"description": self.description,
# 真实场景中,这里会有参数的详细schema,例如:
# "parameters": {
# "type": "object",
# "properties": {
# "query": {"type": "string", "description": "The search query"},
# "limit": {"type": "integer", "description": "Number of results"}
# },
# "required": ["query"]
# }
}
# 模拟一些具体工具
def search_web(query: str, num_results: int = 5) -> str:
"""在网络上搜索信息。
参数:
- query: 搜索关键词 (string)
- num_results: 返回结果数量 (integer, 默认5)
"""
time.sleep(1) # 模拟网络延迟
if "Python" in query:
return f"Found 10M results for '{query}'. Top 3: Python official site, PyPI, Real Python. (Limit: {num_results})"
else:
return f"Found 500K results for '{query}'. Top 3: Wiki, News, Blog. (Limit: {num_results})"
def write_file(filename: str, content: str, append: bool = False) -> str:
"""将内容写入文件。
参数:
- filename: 文件名 (string)
- content: 要写入的内容 (string)
- append: 是否以追加模式写入 (boolean, 默认False)
"""
mode = 'a' if append else 'w'
with open(filename, mode) as f:
f.write(content + "n")
return f"Content {'appended to' if append else 'written to'} '{filename}' successfully."
# --- 2. 定义拦截器接口 ---
class ToolExecutionInterceptor:
def before_tool_execution(self, tool_name: str, tool_params: Dict[str, Any]) -> Dict[str, Any]:
"""
在工具执行前被调用,允许修改工具参数。
:param tool_name: 即将执行的工具名称。
:param tool_params: LLM生成的原始工具参数。
:return: 经过修改或确认后的工具参数。
"""
raise NotImplementedError
# 实现一个命令行交互的拦截器
class ConsoleToolInterceptor(ToolExecutionInterceptor):
def before_tool_execution(self, tool_name: str, tool_params: Dict[str, Any]) -> Dict[str, Any]:
print(f"n--- INTERVENTION REQUIRED ---")
print(f"Agent proposes to call tool: '{tool_name}'")
print(f"Original parameters: {json.dumps(tool_params, indent=2)}")
modified_params = tool_params.copy()
while True:
action = input("Do you want to (M)odify parameters, (C)ontinue with original, or (A)bort? [M/C/A]: ").strip().lower()
if action == 'c':
print("Continuing with original parameters.")
return tool_params
elif action == 'a':
print("Tool execution aborted by user.")
raise InterruptionException("User aborted tool execution.")
elif action == 'm':
print("Enter new parameters in JSON format. Press Enter to skip a parameter.")
print("Example: {"query": "new search term", "num_results": 10}")
user_input = input("New parameters (JSON, leave empty to confirm current): ")
if not user_input.strip():
print("No modifications entered. Continuing with current parameters.")
return modified_params
try:
new_values = json.loads(user_input)
modified_params.update(new_values)
print(f"Parameters updated. Current parameters: {json.dumps(modified_params, indent=2)}")
confirm = input("Confirm these parameters? (Y/N): ").strip().lower()
if confirm == 'y':
return modified_params
else:
# 允许用户再次修改
continue
except json.JSONDecodeError:
print("Invalid JSON format. Please try again.")
except Exception as e:
print(f"An error occurred during parameter modification: {e}")
else:
print("Invalid action. Please choose M, C, or A.")
class InterruptionException(Exception):
"""用于表示用户中断了Agent的执行"""
pass
# --- 3. 定义 Agent ---
class SimpleAgent:
def __init__(self, tools: List[Tool], interceptor: Optional[ToolExecutionInterceptor] = None):
self.tools = {tool.name: tool for tool in tools}
self.interceptor = interceptor
self.tool_descriptions = "n".join([f"- {t.name}: {t.description}" for t in tools])
self.llm_response_history = [] # 模拟LLM的思考过程
def _mock_llm_response(self, prompt: str) -> Dict[str, Any]:
"""
模拟LLM的推理过程,返回工具调用或最终回答。
为了简化,这里硬编码一些逻辑。
真实Agent会调用实际的LLM API。
"""
print(f"n[Agent] LLM thinking based on prompt: '{prompt}'")
self.llm_response_history.append(prompt)
# 模拟LLM决定调用工具
if "search about Python" in prompt:
tool_name = "search_web"
tool_params = {"query": "Python programming language", "num_results": 3}
print(f"[Agent] LLM decides to call '{tool_name}' with {tool_params}")
return {"action": "call_tool", "tool_name": tool_name, "tool_params": tool_params}
elif "save the search results" in prompt:
tool_name = "write_file"
# 这里模拟LLM根据之前搜索结果生成的内容
content_to_save = "Python is a high-level, interpreted programming language. It is popular for web development, data analysis, AI, and more."
tool_params = {"filename": "python_info.txt", "content": content_to_save, "append": False}
print(f"[Agent] LLM decides to call '{tool_name}' with {tool_params}")
return {"action": "call_tool", "tool_name": tool_name, "tool_params": tool_params}
else:
print("[Agent] LLM decides to provide a final answer.")
return {"action": "final_answer", "answer": f"I processed your request about: {prompt}. If you have more questions, let me know."}
def run(self, initial_prompt: str):
current_prompt = initial_prompt
while True:
llm_output = self._mock_llm_response(current_prompt)
if llm_output["action"] == "call_tool":
tool_name = llm_output["tool_name"]
original_params = llm_output["tool_params"]
if tool_name not in self.tools:
print(f"[Agent Error] Tool '{tool_name}' not found.")
current_prompt = f"Error: Tool '{tool_name}' not found. Please re-evaluate."
continue
tool = self.tools[tool_name]
executed_params = original_params
if self.interceptor:
try:
# 重点:在这里调用拦截器,获取用户修改后的参数
executed_params = self.interceptor.before_tool_execution(tool_name, original_params)
print(f"--- INTERVENTION ENDED ---")
print(f"[Agent] Proceeding with parameters: {json.dumps(executed_params, indent=2)}")
except InterruptionException as e:
print(f"[Agent] Execution aborted: {e}")
break # 退出Agent循环
except Exception as e:
print(f"[Agent Error] Interceptor failed: {e}. Using original parameters.")
executed_params = original_params
try:
tool_result = tool.run(**executed_params)
current_prompt = f"Tool '{tool_name}' returned: {tool_result}nnBased on this, what should I do next?"
except Exception as e:
current_prompt = f"Tool '{tool_name}' failed with error: {e}. Please re-evaluate."
elif llm_output["action"] == "final_answer":
print(f"n[Agent] Final Answer: {llm_output['answer']}")
break
else:
print(f"[Agent Error] Unknown LLM action: {llm_output['action']}")
break
# --- 运行示例 ---
if __name__ == "__main__":
web_search_tool = Tool("search_web", search_web.__doc__, search_web)
file_writer_tool = Tool("write_file", write_file.__doc__, write_file)
my_interceptor = ConsoleToolInterceptor()
agent = SimpleAgent(tools=[web_search_tool, file_writer_tool], interceptor=my_interceptor)
print("--- Agent Execution Start (Scenario 1: Search and Modify) ---")
agent.run("Please search about Python programming language.")
print("n" + "="*50 + "n")
agent_no_interceptor = SimpleAgent(tools=[web_search_tool, file_writer_tool])
print("--- Agent Execution Start (Scenario 2: Search without Interceptor) ---")
agent_no_interceptor.run("Please search about Python programming language.")
print("n" + "="*50 + "n")
agent_with_interceptor_2 = SimpleAgent(tools=[web_search_tool, file_writer_tool], interceptor=my_interceptor)
print("--- Agent Execution Start (Scenario 3: Write File and Modify) ---")
agent_with_interceptor_2.run("Search about Python, then save the search results to a file.")
示例运行说明:
- 当
agent.run("Please search about Python programming language.")被调用时,模拟 LLM 会决定调用search_web工具,并生成参数{"query": "Python programming language", "num_results": 3}。 SimpleAgent在调用search_web.run()之前,会调用my_interceptor.before_tool_execution("search_web", {"query": ..., "num_results": ...})。ConsoleToolInterceptor会在控制台打印这些参数,并提示用户选择(修改/继续/中止)。- 如果用户选择
M并输入{"num_results": 1},那么search_web工具最终会以num_results=1的参数执行。 - 如果用户选择
A,则 Agent 会中断执行。
挑战与改进:
- 实时性与等待输入: 在上述控制台示例中,
input()函数会阻塞整个 Agent 的执行。在真实的异步系统中,before_tool_execution不应该阻塞,而是向前端发送一个事件,然后 Agent 自身进入等待状态,直到收到前端返回的修改参数(或确认)事件。 - 用户界面 (UI) 集成:
ConsoleToolInterceptor显然无法用于图形界面。我们需要将这个拦截器与一个 Web 前端(例如使用 React/Vue/Angular)或桌面应用(PyQt/Tkinter)连接起来,通过 WebSocket 或 HTTP 长轮询等方式进行实时通信。 - 参数校验: 用户可能输入无效的参数(例如,
num_results应该是整数但输入了字符串)。拦截器需要包含参数校验逻辑。
路径二:基于协程/异步编程的“暂停-恢复”
为了解决路径一中“实时性与等待输入”的挑战,异步编程(特别是 Python 中的 asyncio 和协程)提供了一个优雅的解决方案。它允许 Agent 的执行在等待用户输入时“挂起”而不阻塞整个应用程序,并在收到用户输入后“恢复”执行。
概念:
- 协程 (Coroutine): 一种可以暂停和恢复执行的函数,允许在等待某个操作完成时切换到其他任务。
async/await: Python 中用于编写异步代码的关键字。- 事件循环 (Event Loop): 异步编程的核心,负责调度和执行协程。
实现细节:
- 将 Agent 的核心执行逻辑(包括工具调用)改为异步协程。
- 定义一个异步的
UserInteractionManager,它负责与前端(或模拟前端)进行通信。 - 当 Agent 需要用户干预时,它
await user_interaction_manager.request_tool_param_modification(...)。 request_tool_param_modification方法会向前端发送参数,并等待前端通过异步通道(如 WebSocket)返回修改后的参数。在这个等待期间,Agent 的协程被挂起,但事件循环可以继续处理其他任务(例如其他 Agent 的执行、UI 事件等)。- 一旦收到用户反馈,
request_tool_param_modification协程恢复执行,并返回修改后的参数给 Agent。
代码示例 (Python with asyncio):
import asyncio
import json
import time
from typing import Dict, Any, Callable, List, Optional, Tuple
# --- 1. 定义异步工具接口 ---
class AsyncTool:
def __init__(self, name: str, description: str, func: Callable):
self.name = name
self.description = description
self._func = func
async def run(self, **kwargs) -> Any:
print(f"[AsyncTool] Executing '{self.name}' with params: {kwargs}")
try:
# 假设工具本身可能是异步的,或者需要模拟异步操作
if asyncio.iscoroutinefunction(self._func):
result = await self._func(**kwargs)
else:
# 如果是同步函数,可以在线程池中运行以避免阻塞事件循环
# 这里为了简化,直接调用,但真实场景应使用 run_in_executor
result = self._func(**kwargs)
print(f"[AsyncTool] '{self.name}' execution successful. Result: {result}")
return result
except Exception as e:
print(f"[AsyncTool] Error executing '{self.name}': {e}")
raise
def get_signature(self) -> Dict[str, Any]:
return {
"name": self.name,
"description": self.description,
}
# 模拟异步工具函数
async def async_search_web(query: str, num_results: int = 5) -> str:
"""在网络上异步搜索信息。
参数:
- query: 搜索关键词 (string)
- num_results: 返回结果数量 (integer, 默认5)
"""
await asyncio.sleep(1) # 模拟异步I/O延迟
if "Python" in query:
return f"Found 10M results for '{query}'. Top 3: Python official site, PyPI, Real Python. (Limit: {num_results})"
else:
return f"Found 500K results for '{query}'. Top 3: Wiki, News, Blog. (Limit: {num_results})"
# 同步函数也可以被 AsyncTool 包装,但最好在 run_in_executor 中调用
def sync_write_file(filename: str, content: str, append: bool = False) -> str:
"""将内容写入文件。"""
mode = 'a' if append else 'w'
with open(filename, mode) as f:
f.write(content + "n")
return f"Content {'appended to' if append else 'written to'} '{filename}' successfully."
# --- 2. 定义异步的用户交互管理器 ---
class AsyncUserInteractionManager:
def __init__(self):
# 使用 asyncio.Queue 来模拟前端到后端的消息传递
# 实际应用中,这里会是 WebSocket 或其他异步通信机制
self._input_queue = asyncio.Queue()
self._output_queue = asyncio.Queue() # 模拟后端到前端的输出
async def request_tool_param_modification(self, tool_name: str, tool_params: Dict[str, Any]) -> Tuple[Dict[str, Any], bool]:
"""
异步请求用户修改工具参数。
:return: (修改后的参数, 是否中止)
"""
request_id = time.time() # 模拟请求ID,用于匹配响应
request_data = {
"type": "tool_param_request",
"request_id": request_id,
"tool_name": tool_name,
"original_params": tool_params
}
await self._output_queue.put(request_data) # 发送请求给前端模拟器
print(f"n[UserInteractionManager] Sent request {request_id} for tool '{tool_name}'. Waiting for user input...")
# 挂起,直到从输入队列中收到匹配的响应
while True:
response = await self._input_queue.get()
if response.get("request_id") == request_id:
if response.get("action") == "modify":
print(f"[UserInteractionManager] Received modified params for {tool_name}: {response['modified_params']}")
return response["modified_params"], False
elif response.get("action") == "continue":
print(f"[UserInteractionManager] User chose to continue with original params for {tool_name}.")
return tool_params, False
elif response.get("action") == "abort":
print(f"[UserInteractionManager] User chose to abort for {tool_name}.")
return {}, True # 返回空参数并指示中止
else:
# 将不匹配的响应重新放回队列,或处理为其他类型的消息
await self._input_queue.put(response) # 简单处理,实际应有更健壮的机制
async def _simulate_frontend_input(self):
"""
模拟前端接收请求并发送响应的后台任务。
在真实系统中,这部分是前端UI的逻辑。
"""
while True:
request = await self._output_queue.get()
if request["type"] == "tool_param_request":
tool_name = request["tool_name"]
original_params = request["original_params"]
request_id = request["request_id"]
print(f"n--- FRONTEND UI (Simulated) ---")
print(f"Agent wants to call tool: '{tool_name}'")
print(f"Original parameters: {json.dumps(original_params, indent=2)}")
modified_params = original_params.copy()
response_action = "continue"
action = input("UI: Do you want to (M)odify, (C)ontinue, (A)bort? [M/C/A]: ").strip().lower()
if action == 'm':
user_input = input("UI: Enter new parameters (JSON): ")
try:
new_values = json.loads(user_input)
modified_params.update(new_values)
print(f"UI: Parameters updated to: {json.dumps(modified_params, indent=2)}")
response_action = "modify"
except json.JSONDecodeError:
print("UI: Invalid JSON. Will continue with original.")
response_action = "continue"
elif action == 'a':
response_action = "abort"
await self._input_queue.put({
"type": "tool_param_response",
"request_id": request_id,
"action": response_action,
"modified_params": modified_params
})
await asyncio.sleep(0.1) # 小暂停,防止忙循环
# --- 3. 定义异步 Agent ---
class AsyncAgent:
def __init__(self, tools: List[AsyncTool], interaction_manager: Optional[AsyncUserInteractionManager] = None):
self.tools = {tool.name: tool for tool in tools}
self.interaction_manager = interaction_manager
self.tool_descriptions = "n".join([f"- {t.name}: {t.description}" for t in tools])
self.llm_response_history = [] # 模拟LLM的思考过程
async def _mock_llm_response(self, prompt: str) -> Dict[str, Any]:
"""异步模拟LLM的推理过程"""
print(f"n[AsyncAgent] LLM thinking based on prompt: '{prompt}'")
self.llm_response_history.append(prompt)
await asyncio.sleep(0.1) # 模拟LLM思考延迟
if "search about Python" in prompt:
tool_name = "async_search_web"
tool_params = {"query": "Python programming language", "num_results": 3}
print(f"[AsyncAgent] LLM decides to call '{tool_name}' with {tool_params}")
return {"action": "call_tool", "tool_name": tool_name, "tool_params": tool_params}
elif "save the search results" in prompt:
tool_name = "sync_write_file" # 注意这里是同步函数包装的工具
content_to_save = "Python is a high-level, interpreted programming language. It is popular for web development, data analysis, AI, and more."
tool_params = {"filename": "python_info_async.txt", "content": content_to_save, "append": False}
print(f"[AsyncAgent] LLM decides to call '{tool_name}' with {tool_params}")
return {"action": "call_tool", "tool_name": tool_name, "tool_params": tool_params}
else:
print("[AsyncAgent] LLM decides to provide a final answer.")
return {"action": "final_answer", "answer": f"I processed your request about: {prompt}. If you have more questions, let me know."}
async def run(self, initial_prompt: str):
current_prompt = initial_prompt
while True:
llm_output = await self._mock_llm_response(current_prompt)
if llm_output["action"] == "call_tool":
tool_name = llm_output["tool_name"]
original_params = llm_output["tool_params"]
if tool_name not in self.tools:
print(f"[AsyncAgent Error] Tool '{tool_name}' not found.")
current_prompt = f"Error: Tool '{tool_name}' not found. Please re-evaluate."
continue
tool = self.tools[tool_name]
executed_params = original_params
if self.interaction_manager:
try:
# 重点:异步等待用户输入
modified_params, should_abort = await self.interaction_manager.request_tool_param_modification(tool_name, original_params)
if should_abort:
print(f"[AsyncAgent] Execution aborted by user for tool '{tool_name}'.")
break # 退出Agent循环
executed_params = modified_params
print(f"[AsyncAgent] Proceeding with parameters: {json.dumps(executed_params, indent=2)}")
except Exception as e:
print(f"[AsyncAgent Error] User interaction failed: {e}. Using original parameters.")
executed_params = original_params
try:
tool_result = await tool.run(**executed_params) # 异步执行工具
current_prompt = f"Tool '{tool_name}' returned: {tool_result}nnBased on this, what should I do next?"
except Exception as e:
current_prompt = f"Tool '{tool_name}' failed with error: {e}. Please re-evaluate."
elif llm_output["action"] == "final_answer":
print(f"n[AsyncAgent] Final Answer: {llm_output['answer']}")
break
else:
print(f"[AsyncAgent Error] Unknown LLM action: {llm_output['action']}")
break
# --- 运行示例 ---
async def main():
web_search_tool = AsyncTool("async_search_web", async_search_web.__doc__, async_search_web)
file_writer_tool = AsyncTool("sync_write_file", sync_write_file.__doc__, sync_write_file)
interaction_manager = AsyncUserInteractionManager()
agent = AsyncAgent(tools=[web_search_tool, file_writer_tool], interaction_manager=interaction_manager)
# 启动模拟前端的后台任务
frontend_task = asyncio.create_task(interaction_manager._simulate_frontend_input())
print("--- Async Agent Execution Start (Scenario: Search and Modify) ---")
await agent.run("Please search about Python programming language.")
print("n" + "="*50 + "n")
# 重新初始化Agent和InteractionManager,避免状态污染
interaction_manager_2 = AsyncUserInteractionManager()
agent_2 = AsyncAgent(tools=[web_search_tool, file_writer_tool], interaction_manager=interaction_manager_2)
frontend_task_2 = asyncio.create_task(interaction_manager_2._simulate_frontend_input())
print("--- Async Agent Execution Start (Scenario: Write File and Modify) ---")
await agent_2.run("Search about Python, then save the search results to a file.")
# 取消模拟前端任务
frontend_task.cancel()
frontend_task_2.cancel()
try:
await frontend_task
await frontend_task_2
except asyncio.CancelledError:
pass # 正常取消
if __name__ == "__main__":
asyncio.run(main())
示例运行说明:
AsyncAgent.run()是一个async函数,在其中调用_mock_llm_response和tool.run都是await操作。- 关键在于
await self.interaction_manager.request_tool_param_modification(...)。当 Agent 调用此方法时,它会向_output_queue发送请求,然后await从_input_queue接收响应。 _simulate_frontend_input任务在后台运行,不断从_output_queue接收请求。当它收到请求时,会模拟控制台交互,等待用户输入,然后将用户选择(修改后的参数、继续或中止)放入_input_queue。- 当
_input_queue收到响应时,request_tool_param_modification协程就会被唤醒,Agent 拿到用户修改后的参数继续执行。 - 整个过程中,事件循环保持运行,没有被
input()阻塞,体现了异步的优势。
与前端集成:
在实际项目中,AsyncUserInteractionManager 的 _input_queue 和 _output_queue 将被替换为真正的异步通信机制:
- WebSockets: 最常见的选择。前端通过 WebSocket 连接到后端,后端可以将工具参数实时推送到前端,前端用户修改后通过 WebSocket 将新参数发回。FastAPI 配合
websockets库可以轻松实现。 - Server-Sent Events (SSE): 如果主要是后端向前端推送信息(例如 Agent 状态、工具参数),而前端只需要偶尔回传少量数据,SSE 也是一个选项。
- HTTP 长轮询/短轮询: 效率较低,但对于不需要极高实时性的场景,可以通过前端定时发送 HTTP 请求来获取 Agent 状态。
路径三:基于可序列化执行计划的修改
前两种路径主要是在“工具调用前”进行拦截。第三种路径则更进一步,允许 Agent 在生成执行计划后,但在执行计划中的每一步之前,都提供给用户修改的机会。这不仅可以修改当前步骤的工具参数,甚至可以修改、插入、删除整个计划中的步骤。
概念:
- 执行计划 (Execution Plan): Agent 的 LLM 不仅仅生成一个工具调用,而是生成一个由多个
ToolCall或Action对象组成的序列。 - 可序列化 (Serializable): 计划能够被转换成 JSON 或其他结构化格式,以便传输和存储。
- 迭代执行: Agent 逐个执行计划中的步骤,并在每一步执行前暂停等待用户确认或修改。
实现细节:
- 定义一个
Action或ToolCall数据结构,包含工具名称、参数、预期结果等。 - Agent 的 LLM 阶段输出一个
List[Action]作为执行计划。 - Agent 执行器不再是简单的循环推理,而是循环遍历这个执行计划。
- 在执行计划中的每一步之前,Agent 将当前
Action对象及其参数暴露给用户。 - 用户可以:
- 修改当前
Action的参数。 - 跳过当前
Action。 - 在当前
Action之前/之后插入新的Action。 - 删除后续的
Action。
- 修改当前
- Agent 根据用户的修改更新执行计划,然后继续执行。
代码示例 (Python):
import asyncio
import json
import uuid
from typing import Dict, Any, List, Literal, Optional, Callable
# 假设 AsyncTool 和 async_search_web, sync_write_file 已经定义
# --- 1. 定义可序列化的 Action / ToolCall 结构 ---
class ToolCall:
def __init__(self, tool_name: str, params: Dict[str, Any], call_id: Optional[str] = None):
self.call_id = call_id if call_id else str(uuid.uuid4())
self.tool_name = tool_name
self.params = params
self.status: Literal["pending", "executing", "completed", "failed", "skipped"] = "pending"
self.result: Optional[Any] = None
self.error: Optional[str] = None
def to_dict(self):
return {
"call_id": self.call_id,
"tool_name": self.tool_name,
"params": self.params,
"status": self.status,
"result": str(self.result) if self.result is not None else None,
"error": self.error
}
@classmethod
def from_dict(cls, data: Dict[str, Any]):
tc = cls(data["tool_name"], data["params"], data["call_id"])
tc.status = data.get("status", "pending")
tc.result = data.get("result")
tc.error = data.get("error")
return tc
# --- 2. 异步用户交互管理器 (与路径二类似,但处理更复杂的计划修改) ---
class AsyncPlanInteractionManager:
def __init__(self):
self._input_queue = asyncio.Queue()
self._output_queue = asyncio.Queue()
async def request_plan_step_modification(self, current_step_index: int, total_steps: int, current_call: ToolCall, full_plan_dicts: List[Dict[str, Any]]) -> Tuple[ToolCall, List[ToolCall], bool]:
"""
异步请求用户修改当前计划步骤或整个计划。
:param current_step_index: 当前步骤的索引。
:param total_steps: 总步骤数。
:param current_call: 当前即将执行的 ToolCall 对象。
:param full_plan_dicts: 整个计划(ToolCall dicts列表),供用户查看。
:return: (修改后的当前ToolCall, 修改后的整个计划列表, 是否中止)
"""
request_id = str(uuid.uuid4())
request_data = {
"type": "plan_step_request",
"request_id": request_id,
"current_step_index": current_step_index,
"total_steps": total_steps,
"current_call": current_call.to_dict(),
"full_plan": full_plan_dicts
}
await self._output_queue.put(request_data)
print(f"n[PlanInteractionManager] Sent plan step request {request_id} for step {current_step_index+1}/{total_steps}. Waiting for user input...")
while True:
response = await self._input_queue.get()
if response.get("request_id") == request_id:
action = response.get("action")
if action == "modify_current":
modified_call_data = response["modified_call"]
modified_call = ToolCall.from_dict(modified_call_data)
print(f"[PlanInteractionManager] Received modified current call: {modified_call.params}")
return modified_call, [ToolCall.from_dict(d) for d in response["modified_full_plan"]], False
elif action == "continue":
print(f"[PlanInteractionManager] User chose to continue with original current step.")
return current_call, [ToolCall.from_dict(d) for d in response["modified_full_plan"]], False
elif action == "abort":
print(f"[PlanInteractionManager] User chose to abort plan execution.")
return current_call, [], True
elif action == "modify_plan_only": # 用户只修改了计划,但当前步骤参数不变
print(f"[PlanInteractionManager] User modified full plan, but current step params remain.")
return current_call, [ToolCall.from_dict(d) for d in response["modified_full_plan"]], False
else:
await self._input_queue.put(response) # Put back if not for this request
async def _simulate_frontend_plan_input(self):
"""
模拟前端接收计划步骤请求并发送响应的后台任务。
"""
while True:
request = await self._output_queue.get()
if request["type"] == "plan_step_request":
current_step_index = request["current_step_index"]
total_steps = request["total_steps"]
current_call_data = request["current_call"]
full_plan_data = request["full_plan"]
request_id = request["request_id"]
print(f"n--- FRONTEND UI (Simulated Plan Editor) ---")
print(f"Agent Plan Step {current_step_index+1}/{total_steps}:")
print(f"Current Tool Call: {json.dumps(current_call_data, indent=2)}")
print(f"nFull Plan:")
for i, step in enumerate(full_plan_data):
status_marker = "->" if i == current_step_index else " "
print(f"{status_marker} Step {i+1}: {step['tool_name']} with {json.dumps(step['params'])}")
response_action = "continue"
modified_call_data_to_send = current_call_data
modified_full_plan_to_send = full_plan_data
action = input(
"UI: Do you want to (M)odify CURRENT step parameters, "
"(P)lan edit (add/delete/reorder), (C)ontinue, (A)bort? [M/P/C/A]: "
).strip().lower()
if action == 'm':
user_input = input("UI: Enter new parameters for current step (JSON): ")
try:
new_values = json.loads(user_input)
modified_call_data_to_send["params"].update(new_values)
print(f"UI: Current step parameters updated to: {json.dumps(modified_call_data_to_send['params'], indent=2)}")
response_action = "modify_current"
# 同时更新整个计划中的这一步
modified_full_plan_to_send[current_step_index] = modified_call_data_to_send
except json.JSONDecodeError:
print("UI: Invalid JSON. Will continue with original current step.")
elif action == 'p':
print("UI: Plan editing not fully implemented in this simulation, but imagine an interface here.")
print("UI: For now, you can try to modify the full plan JSON directly.")
plan_input = input("UI: Enter FULL modified plan (JSON array of tool calls): ")
try:
new_plan_data = json.loads(plan_input)
if isinstance(new_plan_data, list):
modified_full_plan_to_send = new_plan_data
# If current step is still valid, update it from new plan
if current_step_index < len(new_plan_data):
modified_call_data_to_send = new_plan_data[current_step_index]
response_action = "modify_plan_only"
else:
print("UI: Invalid plan format (not a JSON array).")
except json.JSONDecodeError:
print("UI: Invalid JSON for plan.")
elif action == 'a':
response_action = "abort"
await self._input_queue.put({
"type": "plan_step_response",
"request_id": request_id,
"action": response_action,
"modified_call": modified_call_data_to_send,
"modified_full_plan": modified_full_plan_to_send
})
await asyncio.sleep(0.1)
# --- 3. 定义支持计划的异步 Agent ---
class AsyncPlanningAgent:
def __init__(self, tools: List[AsyncTool], interaction_manager: Optional[AsyncPlanInteractionManager] = None):
self.tools = {tool.name: tool for tool in tools}
self.interaction_manager = interaction_manager
self.tool_descriptions = "n".join([f"- {t.name}: {t.description}" for t in tools])
self.llm_response_history = []
async def _mock_llm_plan(self, prompt: str) -> List[ToolCall]:
"""
模拟LLM生成一个多步骤的执行计划。
真实Agent会调用实际的LLM API来生成结构化输出。
"""
print(f"n[AsyncPlanningAgent] LLM planning based on prompt: '{prompt}'")
await asyncio.sleep(0.1)
if "search about Python and save results" in prompt:
return [
ToolCall("async_search_web", {"query": "Python programming language", "num_results": 3}),
ToolCall("sync_write_file", {"filename": "python_summary.txt", "content": "PLACEHOLDER_FOR_SEARCH_RESULT", "append": False})
]
elif "analyze market data" in prompt:
return [
ToolCall("get_market_data", {"symbol": "AAPL", "period": "1d"}),
ToolCall("analyze_trends", {"data": "PLACEHOLDER_FOR_MARKET_DATA", "metric": "RSI"})
]
else:
print("[AsyncPlanningAgent] LLM generated an empty or simple plan.")
return []
async def run(self, initial_prompt: str):
# 1. LLM生成初始计划
current_plan = await self._mock_llm_plan(initial_prompt)
# 模拟LLM将搜索结果填充到写文件工具的参数中
if len(current_plan) > 1 and current_plan[0].tool_name == "async_search_web" and "PLACEHOLDER_FOR_SEARCH_RESULT" in current_plan[1].params.get("content", ""):
# 简化处理,实际LLM会更智能
temp_result = await self.tools[current_plan[0].tool_name].run(**current_plan[0].params)
current_plan[0].status = "completed"
current_plan[0].result = temp_result
current_plan[1].params["content"] = f"Search results for {current_plan[0].params['query']}:n{temp_result}"
step_index = 0
while step_index < len(current_plan):
current_call = current_plan[step_index]
if current_call.status != "pending": # 如果已经执行过或被跳过,则直接到下一步
step_index += 1
continue
executed_call = current_call
if self.interaction_manager:
try:
modified_call, new_full_plan, should_abort = await self.interaction_manager.request_plan_step_modification(
step_index, len(current_plan), current_call, [tc.to_dict() for tc in current_plan]
)
if should_abort:
print(f"[AsyncPlanningAgent] Execution aborted by user.")
break
# 更新当前步骤和整个计划
executed_call = modified_call
current_plan = new_full_plan # 用户可能修改了整个计划
if step_index >= len(current_plan): # 如果当前步骤被删除
print("[AsyncPlanningAgent] Current step was removed from plan. Advancing to next valid step.")
continue # 直接跳到下一个循环,重新检查 step_index
# 确保当前执行的 call 是 plan 中对应位置的 call
# (用户可能在UI中修改了整个plan,导致某个call的id不再匹配)
# 这里的逻辑需要根据UI如何返回修改进行调整
current_plan[step_index] = executed_call # 确保计划中的当前步骤是最新的
print(f"[AsyncPlanningAgent] Proceeding with current step (modified: {executed_call.params}) and potentially new plan.")
except Exception as e:
print(f"[AsyncPlanningAgent Error] Plan interaction failed: {e}. Using original step.")
# 如果交互失败,保持原始计划和参数
if executed_call.tool_name not in self.tools:
print(f"[AsyncPlanningAgent Error] Tool '{executed_call.tool_name}' not found for step {step_index+1}.")
executed_call.status = "failed"
executed_call.error = "Tool not found"
step_index += 1
continue
tool = self.tools[executed_call.tool_name]
try:
executed_call.status = "executing"
tool_result = await tool.run(**executed_call.params)
executed_call.result = tool_result
executed_call.status = "completed"
print(f"[AsyncPlanningAgent] Step {step_index+1} completed. Result: {tool_result}")
# 可以在这里根据结果更新后续计划的PLACEHOLDER,或交由LLM重新规划
# 例如,如果下一个步骤的参数依赖于当前步骤的结果
if step_index + 1 < len(current_plan):
next_call = current_plan[step_index + 1]
if "PLACEHOLDER_FOR_SEARCH_RESULT" in next_call.params.get("content", ""):
next_call.params["content"] = f"Search results from previous step:n{tool_result}"
except Exception as e:
executed_call.error = str(e)
executed_call.status = "failed"
print(f"[AsyncPlanningAgent Error] Step {step_index+1} failed: {e}")
# 错误处理:是否中断?是否让LLM重新规划?这里简单继续
step_index += 1
print(f"n[AsyncPlanningAgent] Plan execution finished.")
print("Final Plan Status:")
for tc in current_plan:
print(f"- {tc.tool_name} (ID: {tc.call_id[:4]}...) Status: {tc.status}, Result: {tc.result[:50] if tc.result else 'N/A'}, Error: {tc.error if tc.error else 'N/A'}")
# --- 运行示例 ---
async def main_planning_agent():
web_search_tool = AsyncTool("async_search_web", async_search_web.__doc__, async_search_web)
file_writer_tool = AsyncTool("sync_write_file", sync_write_file.__doc__, sync_write_file)
plan_interaction_manager = AsyncPlanInteractionManager()
planning_agent = AsyncPlanningAgent(tools=[web_search_tool, file_writer_tool], interaction_manager=plan_interaction_manager)
frontend_plan_task = asyncio.create_task(plan_interaction_manager._simulate_frontend_plan_input())
print("--- Async Planning Agent Execution Start ---")
await planning_agent.run("Please search about Python programming language and save the results to a file.")
frontend_plan_task.cancel()
try:
await frontend_plan_task
except asyncio.CancelledError:
pass
if __name__ == "__main__":
asyncio.run(main_planning_agent())
示例运行说明:
AsyncPlanningAgent首先调用_mock_llm_plan模拟 LLM 生成一个包含ToolCall对象的列表作为初始计划。run方法会迭代执行这个计划。在执行每个ToolCall之前,它会调用interaction_manager.request_plan_step_modification。AsyncPlanInteractionManager的模拟前端会展示当前步骤的详细信息以及整个计划的概览。用户可以选择修改当前步骤的参数,甚至(在更完整的实现中)添加、删除、重新排序计划中的步骤。- 用户输入修改后,Agent 会更新
current_plan,然后使用修改后的参数执行当前步骤,并继续下一个步骤。
优势与挑战:
- 更强的控制力: 用户不仅可以修改参数,还可以对整个执行流程进行精细化调整。
- 可解释性: 计划的展示让用户更清楚 Agent 的意图。
- 复杂性增加: LLM 需要生成结构化、可解析、可修改的计划。用户界面需要更复杂才能提供强大的计划编辑功能。Agent 需要更智能地处理用户对计划的修改(例如,如果用户删除了一个关键步骤,Agent 是否需要重新规划?)。
路径四:结合 LangChain/LlamaIndex 等框架的实现
主流的 Agent 开发框架,如 LangChain 和 LlamaIndex,都提供了强大的回调(Callback)或观察者(Observer)机制,天然支持我们在 Agent 运行时的干预。
以 LangChain 为例,它提供了 BaseCallbackHandler 接口,允许开发者在 Agent 链(Chain)的各个阶段注入自定义逻辑,包括工具的开始、结束、LLM 的生成等。
实现细节:
- 创建一个继承自
langchain_core.callbacks.BaseCallbackHandler的自定义回调处理器。 - 重写
on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs: Any) -> Any方法。这个方法会在 Agent 决定调用工具时被调用,input_str通常包含 LLM 推理出的工具名称和参数(例如,以 ReAct 格式的字符串)。 - 在这个方法中,解析
input_str提取工具名称和参数。 - 然后,同样地,通过异步通信机制与用户界面交互,等待用户修改参数。
- 关键在于,
on_tool_start方法的返回类型是Any。如果返回一个值,它通常会被忽略或用于日志。为了实现参数修改,我们通常会:- 在
on_tool_start中触发一个异步事件,让 Agent 暂停(例如,通过asyncio.Event或asyncio.Queue)。 - 在 Agent 的执行逻辑中(例如,一个自定义的 Agent Executor),在调用
tool.run()之前,检查是否有用户干预事件被触发,并等待其完成,获取修改后的参数。 - 或者,更直接地,自定义 Agent Executor 的
_call方法,在调用self.tools[tool_name].run()前,显式调用一个交互函数。
- 在
代码示例 (LangChain 风格的概念性代码):
import asyncio
import json
from typing import Any, Dict, List, Optional, Tuple, Callable
# 假设 AsyncTool, async_search_web, sync_write_file 已经定义
# 模拟 LangChain 的工具定义
class LangChainToolWrapper:
def __init__(self, tool_instance: AsyncTool):
self._tool = tool_instance
self.name = tool_instance.name
self.description = tool_instance.description
async def _run(self, tool_input: str) -> str:
# LangChain 的工具通常接收一个字符串作为输入,需要自行解析
# 这里简化为直接调用
try:
params = json.loads(tool_input) # 假设输入是JSON字符串
result = await self._tool.run(**params)
return str(result)
except json.JSONDecodeError:
return f"Error: Invalid JSON input for tool {self.name}: {tool_input}"
except Exception as e:
return f"Error executing tool {self.name}: {e}"
# 模拟 LangChain AgentExecutor
class MockLangChainAgentExecutor:
def __init__(self, tools: List[LangChainToolWrapper], callback_handler: Optional[Any] = None, user_interaction_manager: Optional[Any] = None):
self.tools = {tool.name: tool for tool in tools}
self.callback_handler = callback_handler
self.user_interaction_manager = user_interaction_manager
self.llm_mock_response_queue = asyncio.Queue() # 模拟LLM输出
async def _mock_llm_step(self, intermediate_steps: List[Tuple[Dict[str, Any], str]]) -> str:
"""
模拟LLM的思考步骤,生成工具调用或最终回答。
为了简化,这里使用预设的LLM输出。
"""
if not intermediate_steps:
# 第一次调用,模拟LLM决定搜索
llm_output = json.dumps({"tool": "async_search_web", "tool_input": json.dumps({"query": "Python programming language", "num_results": 3})})
return f"Thought: I need to search about Python. Action: {llm_output}"
else:
# 模拟LLM根据之前结果进行下一步,例如保存文件
last_tool_output = intermediate_steps[-1][1]
if "Python official site" in last_tool_output:
content_to_save = f"Search results summary: {last_tool_output[:100]}..."
llm_output = json.dumps({"tool": "sync_write_file", "tool_input": json.dumps({"filename": "lc_python_info.txt", "content": content_to_save, "append": False})})
return f"Thought: I have the search results, now I should save them. Action: {llm_output}"
else:
return "Thought: I have completed the task. Final Answer: Task finished."
async def run(self, input_text: str) -> Dict[str, Any]:
intermediate_steps = []
while True:
llm_response_str = await self._mock_llm_step(intermediate_steps)
print(f"n[MockAgentExecutor] LLM Response: {llm_response_str}")
if "Final Answer:" in llm_response_str:
return {"output": llm_response_str.split("Final Answer:")[1].strip()}
# 解析LLM的Action
try:
# 假设LLM输出的Action部分是 "Action: {"tool": "tool_name", "tool_input": "..."}"
action_str = llm_response_str.split("Action:")[1].strip()
action_data = json.loads(action_str)
tool_name = action_data["tool"]
tool_input_str = action_data["tool_input"] # 这是一个JSON字符串,需要再次解析
original_params = json.loads(tool_input_str)
except (IndexError, json.JSONDecodeError, KeyError) as e:
print(f"[MockAgentExecutor Error] Failed to parse LLM action: {e}. LLM response: {llm_response_str}")
intermediate_steps.append(({"tool": "parse_error"}, f"Error parsing LLM response: {e}"))
continue
tool = self.tools.get(tool_name)
if not tool:
error_msg = f"Tool '{tool_name}' not found."
print(f"[MockAgentExecutor Error] {error_msg}")
intermediate_steps.append(({"tool": tool_name, "tool_input": tool_input_str}, error_msg))
continue
# --- 关键的拦截点 ---
modified_params = original_params
should_abort = False
if self.user_interaction_manager:
try:
# 将工具名称和原始参数传递给交互管理器
modified_params, should_abort = await self.user_interaction_manager.request_tool_param_modification(tool_name, original_params)
if should_abort:
print("[MockAgentExecutor] User aborted tool execution.")
return {"output": "Task aborted by user."}
except Exception as e:
print(f"[MockAgentExecutor Error] Interaction failed: {e}. Using original parameters.")
# 重新打包参数为工具需要的字符串格式
final_tool_input_str = json.dumps(modified_params)
# 模拟回调处理器的 on_tool_start
if self.callback_handler:
# 这里的serialized通常是工具的元数据,input_str是工具的输入
# LangChain回调通常不会直接返回修改后的参数,而是用于副作用 (logging, UI更新)
# 所以我们依赖 user_interaction_manager 来修改参数
await self.callback_handler.on_tool_start(
serialized={"name": tool_name, "description": tool.description},
input_str=final_tool_input_str,
tool_name=tool_name,
tool_input=final_tool_input_str # 传递修改后的参数
)
tool_output = await tool._run(final_tool_input_str)
if self.callback_handler:
await self.callback_handler.on_tool_end(tool_output, tool_name=tool_name, input_str=final_tool_input_str)
intermediate_steps.append(({"tool": tool_name, "tool_input": final_tool_input_str}, tool_output))
print(f"[MockAgentExecutor] Tool '{tool_name}' returned: {tool_output[:100]}...")
# 模拟 LangChain 的 BaseCallbackHandler
from langchain_core.callbacks import BaseCallbackHandler
class CustomLangChainCallbackHandler(BaseCallbackHandler):
def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs: Any) -> Any:
print(f"n[Callback] Tool '{serialized['name']}' started with input: {input_str}")
# 这里可以向UI发送事件,但实际参数修改逻辑在 AgentExecutor 中处理
pass
def on_tool_end(self, output: str, **kwargs: Any) -> Any:
print(f"[Callback] Tool '{kwargs.get('tool_name', 'Unknown')}' ended with output: {output[:50]}...")
pass
# --- 运行示例 ---
async def main_langchain_style():
# 异步工具
web_search_tool_async = AsyncTool("async_search_web", async_search_web.__doc__, async_search_web)
file_writer_tool_sync = AsyncTool("sync_write_file", sync_write_file.__doc__, sync_write_file)
# 包装为 LangChain 风格的工具
lc_web_search_tool = LangChainToolWrapper(web_search_tool_async)
lc_file_writer_tool = LangChainToolWrapper(file_writer_tool_sync)
# 异步用户交互管理器 (与路径二的 AsyncUserInteractionManager 相同)
interaction_manager = AsyncUserInteractionManager()
# 自定义回调处理器
callback_handler = CustomLangChainCallbackHandler()
agent_executor = MockLangChainAgentExecutor(
tools=[lc_web_search_tool, lc_file_writer_tool],
callback_handler=callback_handler,
user_interaction_manager=interaction_manager
)
frontend_task = asyncio.create_task(interaction_manager._simulate_frontend_input())
print("--- LangChain Style Agent Execution Start ---")
result = await agent_executor.run("Find information about Python and save it.")
print(f"nFinal Agent Result: {result}")
frontend_task.cancel()
try:
await frontend_task
except asyncio.CancelledError:
pass
if __name__ == "__main__":
asyncio.run(main_langchain_style())
示例运行说明:
MockLangChainAgentExecutor模拟了 LangChain Agent 的执行循环,它通过_mock_llm_step模拟 LLM 的思维过程和工具调用决策。- 在解析 LLM 的
Action后,但在实际调用工具之前,MockLangChainAgentExecutor会显式调用self.user_interaction_manager.request_tool_param_modification。 - 用户交互管理器 (
AsyncUserInteractionManager) 会暂停 Agent 的执行,等待用户通过模拟前端修改参数。 - 修改后的参数会用于后续的
tool._run()调用。 CustomLangChainCallbackHandler的on_tool_start和on_tool_end方法也会被调用,用于记录或触发其他副作用,但参数修改的核心逻辑由user_interaction_manager完成。
框架集成的优势:
- 标准化接口: 框架提供了统一的回调接口,减少了从头开始编写 Agent 核心逻辑的复杂性。
- 生态系统: 可以利用框架中已有的工具、记忆、LLM 封装等。
- 可维护性: 将关注点分离,Agent 核心逻辑、工具管理、用户交互、日志记录等模块化。
总结:
无论采用哪种技术路径,核心都在于在 Agent 执行循环中找到合适的拦截点,并利用异步编程实现非阻塞的用户交互。 结合合适的后端(如 FastAPI)和前端(如 React/Vue)技术,通过 WebSocket 等实时通信协议,就能构建出强大的实时人机协作 Agent 系统。
架构设计与最佳实践
实现 Agent 实时工具参数修改的系统,需要仔细考虑整体架构和一系列最佳实践。
架构设计考量
一个支持实时 HITL 的 Agent 系统通常会采用以下分层架构:
-
用户界面 (Frontend):
- 负责展示 Agent 的当前状态、推理过程、即将调用的工具及其参数。
- 提供直观的界面供用户修改参数、批准、中止或编辑计划。
- 通过 WebSocket 或 SSE 与后端进行实时通信。
- 框架选择:React, Vue, Angular, Svelte, Streamlit, Gradio 等。
-
API 网关 / 后端服务 (Backend Service):
- 接收前端的用户指令和 Agent 的状态更新请求。
- 管理 Agent 实例的生命周期(启动、停止、暂停、恢复)。
- 充当 Agent 执行器与前端之间的桥梁,负责参数的来回传递。
- 框架选择:FastAPI, Flask, Django Channels。
-
Agent 核心 (Agent Core):
- 包含 LLM 接口、工具注册表、记忆模块和 Agent 的执行循环。
- 内置拦截点,当需要用户干预时,向后端服务发送请求并暂停执行。
- 接收后端服务返回的用户修改后的参数,并恢复执行。
-
实时通信层 (Real-time Communication Layer):
- 连接前端和后端,实现低延迟的双向数据流。
- WebSocket: 最理想的选择,支持全双工通信,适合频繁的状态更新和用户干预。
- Server-Sent Events (SSE): 适用于后端单向推送大量更新,前端少量回传的场景。
- 消息队列 (如 RabbitMQ, Kafka): 在更复杂的分布式系统中,可用于解耦 Agent 实例与用户界面,实现异步消息传递。
-
状态管理 (State Management):
- Agent 状态: Agent 当前处于哪个步骤?正在等待用户输入吗?哪个工具的参数被拦截了?这些状态需要在后端持久化,以便在 Agent 崩溃或重启后恢复。
- 会话状态: 每个用户与 Agent 的交互是独立的会话。需要管理每个会话的上下文、历史记录和当前 Agent 实例。
- 参数缓存: 可以在拦截点将原始参数缓存起来,方便用户在修改失败时回滚。
最佳实践
-
清晰的参数展示与编辑界面 (UX):
- 可读性: 以清晰易懂的格式(如 JSON 或结构化表单)展示工具参数。
- 默认值与建议: 显示 LLM 提出的原始默认参数,并提供修改建议(如果可能)。
- 类型提示与校验: 根据工具的参数签名,在 UI 上提供输入类型提示和即时校验,防止用户输入无效数据。
- 撤销/重做: 允许用户撤销对参数的修改或回滚到 Agent 原始提议。
- 实时反馈: 告诉用户 Agent 正在等待输入,或者修改已成功应用。
-
细粒度的控制与权限管理:
- 并非所有参数都适合用户