解析 ‘Human-in-the-loop’ 的双向交互:如何让用户在 Agent 执行过程中实时修改其工具参数?

各位同学,各位开发者,大家好!

今天,我们齐聚一堂,共同探讨一个在人工智能领域日益重要且充满挑战的话题:如何在 Agent 执行过程中实现“Human-in-the-loop”的双向交互,特别是如何让用户能够实时修改 Agent 所调用工具的参数。

AI Agent,作为我们数字世界的智能助理,正变得越来越强大。它们能够理解复杂的指令,自主规划任务,并调用各种工具来达成目标。然而,一个纯粹自主的 Agent 往往缺乏透明度、可控性和对动态变化的适应性。想象一下,一个 Agent 在执行一个重要任务时,它选择了一个工具,并根据其内部逻辑设定了一组参数。如果这些参数不符合用户的预期,或者外部环境发生了变化,导致这些参数不再最优,但用户却无法干预,那么最终的结果可能并不理想,甚至会造成损失。

这就是“Human-in-the-loop”(HITL,人在环路)概念的核心价值所在。HITL 旨在将人类的判断力、创造力和常识引入到自动化流程中,形成一个闭环协作系统。而我们今天要深入探讨的,是 HITL 中最具挑战性但也最具潜力的一个方面:在 Agent 执行的当口,精确地、实时地介入并修改其工具的调用参数。 这不仅仅是简单的审批或事后修正,而是一种深度、实时的协作,它将 Agent 的自主性与人类的精细控制完美结合,解锁了 Agent 在复杂、动态环境中应用的巨大潜力。

我们将从 Agent 系统的基础架构讲起,分析现有 HITL 模式的局限,然后深入探讨实现实时参数修改的多种技术路径,并辅以详尽的代码示例。最后,我们将讨论相关的架构设计、最佳实践以及未来的挑战。


Agent 系统基础与挑战

在深入实时交互之前,让我们快速回顾一下一个典型 Agent 系统的基本构成。

一个 Agent 通常包含以下核心组件:

  1. 大型语言模型 (LLM) / 大脑: 负责理解指令、进行推理、规划任务、选择工具以及生成响应。它是 Agent 的决策中心。
  2. 记忆 (Memory): 存储短期对话历史、长期知识、任务上下文等,帮助 Agent 维持连贯性和学习。
  3. 工具 (Tools): 外部功能接口,可以是 API 调用、数据库查询、代码执行器、文件操作等。Agent 通过调用这些工具来与外部世界交互并执行具体操作。
  4. 规划与执行循环 (Planning & Execution Loop): Agent 根据 LLM 的推理,生成一个或多个步骤的计划,然后按序执行,并在每一步根据结果进行反思和调整。

工具调用机制的固有特性

Agent 调用工具的基本流程通常是:

  1. LLM 根据当前任务和可用工具的描述,决定调用哪个工具。
  2. LLM 生成调用该工具所需的参数。
  3. Agent 执行器接收到工具名称和参数,然后调用相应的工具函数。
  4. 工具执行完毕,返回结果给 Agent。
  5. Agent 将结果反馈给 LLM 进行下一步的推理。

这里的关键在于,工具参数通常是由 LLM 一次性生成并直接传递给工具的。 在传统的 Agent 设计中,一旦参数生成并开始执行,人类用户很难在工具真正运行之前对其进行干预和修改。如果用户发现参数不合适,通常只能等待工具执行完毕(可能已经造成了不希望的结果),然后通过新的指令尝试引导 Agent 重新执行,或者手动中断整个流程。这显然不是“实时”的干预。

现有 HITL 模式的局限

目前,Agent 领域已经发展出多种 HITL 模式,但它们大多未能满足我们对“实时修改工具参数”的需求:

  • 审批模式 (Approval Loop): Agent 提出一个行动方案(例如,调用哪个工具,做什么),然后等待用户批准。如果用户不批准,Agent 则会重新思考。这种模式通常在较高层次进行干预,例如批准整个工具调用,但很少能深入到修改具体参数的层面,或者修改后 Agent 仍然需要重新生成新的参数。
  • 修正模式 (Correction Loop): Agent 执行了一个操作,用户发现结果不满意,然后提供反馈,Agent 根据反馈进行修正。这是一种事后补救,无法阻止错误的发生。
  • 引导模式 (Guidance Loop): 用户在任务开始前提供详细的指令和约束,引导 Agent 的行为。这是一种预先干预,无法应对执行过程中的突发情况或动态调整。
  • 人机协作模式 (Collaborative Mode): Agent 扮演辅助角色,在某些步骤上主动寻求人类帮助,或者人类可以随时介入并接管某些步骤。这种模式更接近我们的目标,但仍然需要更精细的机制来精确地拦截和修改工具参数。

以上模式,虽然提升了 Agent 的安全性与可靠性,但在“实时、精细化修改工具参数”这一核心需求上,都显得力不从心。它们往往涉及到 Agent 的暂停、重规划甚至重启,而非在工具即将执行的瞬间进行无缝的参数调整。


实时交互的必要性与场景

为什么我们需要在 Agent 执行过程中实时修改其工具参数?

  1. 动态环境变化: 现实世界是不断变化的。Agent 在规划时获取的信息可能在执行时已经过时。例如,一个自动化交易 Agent 在调用“下单”工具时,如果市场价格突然剧烈波动,用户可能需要实时调整交易数量或止损价格。
  2. 用户偏好与上下文的动态演进: 用户可能在 Agent 执行过程中产生了新的想法,或者意识到之前没有充分表达的偏好。例如,一个内容生成 Agent 在创作文章时,用户看到部分草稿后,可能希望实时调整后续段落的风格、关键词密度等参数。
  3. 复杂任务的逐步精炼: 对于多步骤的复杂任务,Agent 无法在开始时就完美预测所有参数。通过实时干预,用户可以逐步引导 Agent,根据中间结果调整后续步骤的参数,实现任务的迭代优化。
  4. 错误发现与即时纠正: Agent 的推理并非万无一失。它可能因理解偏差或信息不全而生成不当的工具参数。用户作为最终决策者,可以在错误发生前及时发现并纠正,避免不必要的损失。
  5. 提升用户信任与满意度: 当用户拥有实时控制权时,他们对 Agent 的信任度会显著提高。这种深度协作模式,让用户感觉自己是 Agent 的共同创造者,而非被动的旁观者。

具体应用场景举例:

场景 Agent 类型 待调用工具示例 需实时修改的参数示例 修改目的
数据分析 数据分析 Agent execute_sql_query SQL 语句中的 WHERE 条件、GROUP BY 字段、LIMIT 调整数据过滤范围,优化聚合粒度,限制结果集大小
plot_data_chart 图表类型、X/Y轴标签、颜色、标题、数据子集 改变可视化方式,突出特定数据,美化图表
代码生成 编程辅助 Agent generate_code_snippet 编程语言、目标框架、代码风格、变量命名约定、模块 适应不同项目规范,调整代码可读性,指定依赖库
refactor_code 重构策略(例如,提取函数、内联变量)、目标范围 精细化重构操作,避免误改,确保代码质量
内容创作 写作/营销 Agent draft_blog_post 目标受众、语气、关键词密度、段落长度、引用来源 调整内容调性,优化 SEO,符合品牌规范
summarize_document 摘要长度、侧重点、关键信息提取模式 获得不同粒度的摘要,突出特定主题
自动化运维 运维 Agent run_shell_command 命令参数、执行路径、目标服务器、超时时间 调整脚本执行细节,避免误操作,确保资源安全
provision_cloud_resource 虚拟机大小、区域、存储类型、网络配置 优化资源配置,控制成本,满足性能需求
智能家居/设备 智能控制 Agent set_device_status 亮度、温度、模式、定时器、联动设备 实时调整设备状态,响应环境变化,满足个性化需求

这些场景都清晰地表明,仅仅靠预设或事后修正,已经无法满足日益复杂的 Agent 应用需求。我们需要一套机制,让 Agent 能够“在执行前停下来,问问人”,并根据人的反馈来调整其行动。


实现实时修改工具参数的技术路径

要实现 Agent 在执行过程中实时修改工具参数,核心思想可以概括为:中断、暴露、修改、恢复。
Agent 在即将调用工具的“前一刻”提供一个可供人类干预的“暂停点”或“拦截点”。在这个点上,Agent 将其即将使用的工具名称和参数暴露给用户界面。用户可以查看、修改这些参数,然后 Agent 使用修改后的参数继续执行。

我们将探讨几种实现这一目标的技术路径。

路径一:基于回调函数/事件机制的拦截

这是最直观的实现方式之一。Agent 的执行循环中,在调用工具的逻辑前后注入回调函数(Callback Function)或触发事件(Event)。这些回调函数可以用来拦截工具调用的参数,并提供给外部进行修改。

概念:

  • 拦截器 (Interceptor): 一个抽象层,定义了在工具执行前、执行后等关键时刻要执行的逻辑。
  • 回调函数: 在特定事件发生时被调用的函数。

实现细节:

  1. 定义一个统一的 Tool 接口或基类,包含工具的名称、描述以及执行方法。
  2. 定义一个 Agent 类,其内部包含一个 LLM 模拟器和工具注册表。
  3. 引入一个 ToolExecutionInterceptor 接口或抽象类,它至少包含一个 before_tool_execution(tool_name, tool_args) 方法。
  4. Agent 的执行循环中,当 LLM 决定调用某个工具时,首先将工具名称和 LLM 推理出的参数传递给 interceptor.before_tool_execution() 方法。
  5. before_tool_execution() 方法负责与用户界面交互,展示参数并等待用户修改。它将返回用户修改后的参数,或者指示 Agent 继续使用原始参数。
  6. Agent 接收到返回的参数后,使用这些参数调用工具。

代码示例 (Python):

import json
import time
from typing import Dict, Any, Callable, List, Optional

# --- 1. 定义工具接口 ---
class Tool:
    def __init__(self, name: str, description: str, func: Callable):
        self.name = name
        self.description = description
        self._func = func

    def run(self, **kwargs) -> Any:
        print(f"[Tool] Executing '{self.name}' with params: {kwargs}")
        try:
            result = self._func(**kwargs)
            print(f"[Tool] '{self.name}' execution successful. Result: {result}")
            return result
        except Exception as e:
            print(f"[Tool] Error executing '{self.name}': {e}")
            raise

    def get_signature(self) -> Dict[str, Any]:
        """获取工具的调用签名,供LLM理解"""
        # 实际场景中,这里会解析 func 的参数,生成OpenAPI或JSON Schema
        # 为了简化,我们假设描述中包含了足够的信息
        return {
            "name": self.name,
            "description": self.description,
            # 真实场景中,这里会有参数的详细schema,例如:
            # "parameters": {
            #     "type": "object",
            #     "properties": {
            #         "query": {"type": "string", "description": "The search query"},
            #         "limit": {"type": "integer", "description": "Number of results"}
            #     },
            #     "required": ["query"]
            # }
        }

# 模拟一些具体工具
def search_web(query: str, num_results: int = 5) -> str:
    """在网络上搜索信息。
    参数:
    - query: 搜索关键词 (string)
    - num_results: 返回结果数量 (integer, 默认5)
    """
    time.sleep(1) # 模拟网络延迟
    if "Python" in query:
        return f"Found 10M results for '{query}'. Top 3: Python official site, PyPI, Real Python. (Limit: {num_results})"
    else:
        return f"Found 500K results for '{query}'. Top 3: Wiki, News, Blog. (Limit: {num_results})"

def write_file(filename: str, content: str, append: bool = False) -> str:
    """将内容写入文件。
    参数:
    - filename: 文件名 (string)
    - content: 要写入的内容 (string)
    - append: 是否以追加模式写入 (boolean, 默认False)
    """
    mode = 'a' if append else 'w'
    with open(filename, mode) as f:
        f.write(content + "n")
    return f"Content {'appended to' if append else 'written to'} '{filename}' successfully."

# --- 2. 定义拦截器接口 ---
class ToolExecutionInterceptor:
    def before_tool_execution(self, tool_name: str, tool_params: Dict[str, Any]) -> Dict[str, Any]:
        """
        在工具执行前被调用,允许修改工具参数。
        :param tool_name: 即将执行的工具名称。
        :param tool_params: LLM生成的原始工具参数。
        :return: 经过修改或确认后的工具参数。
        """
        raise NotImplementedError

# 实现一个命令行交互的拦截器
class ConsoleToolInterceptor(ToolExecutionInterceptor):
    def before_tool_execution(self, tool_name: str, tool_params: Dict[str, Any]) -> Dict[str, Any]:
        print(f"n--- INTERVENTION REQUIRED ---")
        print(f"Agent proposes to call tool: '{tool_name}'")
        print(f"Original parameters: {json.dumps(tool_params, indent=2)}")

        modified_params = tool_params.copy()
        while True:
            action = input("Do you want to (M)odify parameters, (C)ontinue with original, or (A)bort? [M/C/A]: ").strip().lower()

            if action == 'c':
                print("Continuing with original parameters.")
                return tool_params
            elif action == 'a':
                print("Tool execution aborted by user.")
                raise InterruptionException("User aborted tool execution.")
            elif action == 'm':
                print("Enter new parameters in JSON format. Press Enter to skip a parameter.")
                print("Example: {"query": "new search term", "num_results": 10}")
                user_input = input("New parameters (JSON, leave empty to confirm current): ")
                if not user_input.strip():
                    print("No modifications entered. Continuing with current parameters.")
                    return modified_params
                try:
                    new_values = json.loads(user_input)
                    modified_params.update(new_values)
                    print(f"Parameters updated. Current parameters: {json.dumps(modified_params, indent=2)}")
                    confirm = input("Confirm these parameters? (Y/N): ").strip().lower()
                    if confirm == 'y':
                        return modified_params
                    else:
                        # 允许用户再次修改
                        continue
                except json.JSONDecodeError:
                    print("Invalid JSON format. Please try again.")
                except Exception as e:
                    print(f"An error occurred during parameter modification: {e}")
            else:
                print("Invalid action. Please choose M, C, or A.")

class InterruptionException(Exception):
    """用于表示用户中断了Agent的执行"""
    pass

# --- 3. 定义 Agent ---
class SimpleAgent:
    def __init__(self, tools: List[Tool], interceptor: Optional[ToolExecutionInterceptor] = None):
        self.tools = {tool.name: tool for tool in tools}
        self.interceptor = interceptor
        self.tool_descriptions = "n".join([f"- {t.name}: {t.description}" for t in tools])
        self.llm_response_history = [] # 模拟LLM的思考过程

    def _mock_llm_response(self, prompt: str) -> Dict[str, Any]:
        """
        模拟LLM的推理过程,返回工具调用或最终回答。
        为了简化,这里硬编码一些逻辑。
        真实Agent会调用实际的LLM API。
        """
        print(f"n[Agent] LLM thinking based on prompt: '{prompt}'")
        self.llm_response_history.append(prompt)

        # 模拟LLM决定调用工具
        if "search about Python" in prompt:
            tool_name = "search_web"
            tool_params = {"query": "Python programming language", "num_results": 3}
            print(f"[Agent] LLM decides to call '{tool_name}' with {tool_params}")
            return {"action": "call_tool", "tool_name": tool_name, "tool_params": tool_params}
        elif "save the search results" in prompt:
            tool_name = "write_file"
            # 这里模拟LLM根据之前搜索结果生成的内容
            content_to_save = "Python is a high-level, interpreted programming language. It is popular for web development, data analysis, AI, and more."
            tool_params = {"filename": "python_info.txt", "content": content_to_save, "append": False}
            print(f"[Agent] LLM decides to call '{tool_name}' with {tool_params}")
            return {"action": "call_tool", "tool_name": tool_name, "tool_params": tool_params}
        else:
            print("[Agent] LLM decides to provide a final answer.")
            return {"action": "final_answer", "answer": f"I processed your request about: {prompt}. If you have more questions, let me know."}

    def run(self, initial_prompt: str):
        current_prompt = initial_prompt
        while True:
            llm_output = self._mock_llm_response(current_prompt)

            if llm_output["action"] == "call_tool":
                tool_name = llm_output["tool_name"]
                original_params = llm_output["tool_params"]

                if tool_name not in self.tools:
                    print(f"[Agent Error] Tool '{tool_name}' not found.")
                    current_prompt = f"Error: Tool '{tool_name}' not found. Please re-evaluate."
                    continue

                tool = self.tools[tool_name]
                executed_params = original_params

                if self.interceptor:
                    try:
                        # 重点:在这里调用拦截器,获取用户修改后的参数
                        executed_params = self.interceptor.before_tool_execution(tool_name, original_params)
                        print(f"--- INTERVENTION ENDED ---")
                        print(f"[Agent] Proceeding with parameters: {json.dumps(executed_params, indent=2)}")
                    except InterruptionException as e:
                        print(f"[Agent] Execution aborted: {e}")
                        break # 退出Agent循环
                    except Exception as e:
                        print(f"[Agent Error] Interceptor failed: {e}. Using original parameters.")
                        executed_params = original_params

                try:
                    tool_result = tool.run(**executed_params)
                    current_prompt = f"Tool '{tool_name}' returned: {tool_result}nnBased on this, what should I do next?"
                except Exception as e:
                    current_prompt = f"Tool '{tool_name}' failed with error: {e}. Please re-evaluate."
            elif llm_output["action"] == "final_answer":
                print(f"n[Agent] Final Answer: {llm_output['answer']}")
                break
            else:
                print(f"[Agent Error] Unknown LLM action: {llm_output['action']}")
                break

# --- 运行示例 ---
if __name__ == "__main__":
    web_search_tool = Tool("search_web", search_web.__doc__, search_web)
    file_writer_tool = Tool("write_file", write_file.__doc__, write_file)

    my_interceptor = ConsoleToolInterceptor()
    agent = SimpleAgent(tools=[web_search_tool, file_writer_tool], interceptor=my_interceptor)

    print("--- Agent Execution Start (Scenario 1: Search and Modify) ---")
    agent.run("Please search about Python programming language.")

    print("n" + "="*50 + "n")

    agent_no_interceptor = SimpleAgent(tools=[web_search_tool, file_writer_tool])
    print("--- Agent Execution Start (Scenario 2: Search without Interceptor) ---")
    agent_no_interceptor.run("Please search about Python programming language.")

    print("n" + "="*50 + "n")

    agent_with_interceptor_2 = SimpleAgent(tools=[web_search_tool, file_writer_tool], interceptor=my_interceptor)
    print("--- Agent Execution Start (Scenario 3: Write File and Modify) ---")
    agent_with_interceptor_2.run("Search about Python, then save the search results to a file.")

示例运行说明:

  1. agent.run("Please search about Python programming language.") 被调用时,模拟 LLM 会决定调用 search_web 工具,并生成参数 {"query": "Python programming language", "num_results": 3}
  2. SimpleAgent 在调用 search_web.run() 之前,会调用 my_interceptor.before_tool_execution("search_web", {"query": ..., "num_results": ...})
  3. ConsoleToolInterceptor 会在控制台打印这些参数,并提示用户选择(修改/继续/中止)。
  4. 如果用户选择 M 并输入 {"num_results": 1},那么 search_web 工具最终会以 num_results=1 的参数执行。
  5. 如果用户选择 A,则 Agent 会中断执行。

挑战与改进:

  • 实时性与等待输入: 在上述控制台示例中,input() 函数会阻塞整个 Agent 的执行。在真实的异步系统中,before_tool_execution 不应该阻塞,而是向前端发送一个事件,然后 Agent 自身进入等待状态,直到收到前端返回的修改参数(或确认)事件。
  • 用户界面 (UI) 集成: ConsoleToolInterceptor 显然无法用于图形界面。我们需要将这个拦截器与一个 Web 前端(例如使用 React/Vue/Angular)或桌面应用(PyQt/Tkinter)连接起来,通过 WebSocket 或 HTTP 长轮询等方式进行实时通信。
  • 参数校验: 用户可能输入无效的参数(例如,num_results 应该是整数但输入了字符串)。拦截器需要包含参数校验逻辑。

路径二:基于协程/异步编程的“暂停-恢复”

为了解决路径一中“实时性与等待输入”的挑战,异步编程(特别是 Python 中的 asyncio 和协程)提供了一个优雅的解决方案。它允许 Agent 的执行在等待用户输入时“挂起”而不阻塞整个应用程序,并在收到用户输入后“恢复”执行。

概念:

  • 协程 (Coroutine): 一种可以暂停和恢复执行的函数,允许在等待某个操作完成时切换到其他任务。
  • async/await: Python 中用于编写异步代码的关键字。
  • 事件循环 (Event Loop): 异步编程的核心,负责调度和执行协程。

实现细节:

  1. 将 Agent 的核心执行逻辑(包括工具调用)改为异步协程。
  2. 定义一个异步的 UserInteractionManager,它负责与前端(或模拟前端)进行通信。
  3. 当 Agent 需要用户干预时,它 await user_interaction_manager.request_tool_param_modification(...)
  4. request_tool_param_modification 方法会向前端发送参数,并等待前端通过异步通道(如 WebSocket)返回修改后的参数。在这个等待期间,Agent 的协程被挂起,但事件循环可以继续处理其他任务(例如其他 Agent 的执行、UI 事件等)。
  5. 一旦收到用户反馈,request_tool_param_modification 协程恢复执行,并返回修改后的参数给 Agent。

代码示例 (Python with asyncio):

import asyncio
import json
import time
from typing import Dict, Any, Callable, List, Optional, Tuple

# --- 1. 定义异步工具接口 ---
class AsyncTool:
    def __init__(self, name: str, description: str, func: Callable):
        self.name = name
        self.description = description
        self._func = func

    async def run(self, **kwargs) -> Any:
        print(f"[AsyncTool] Executing '{self.name}' with params: {kwargs}")
        try:
            # 假设工具本身可能是异步的,或者需要模拟异步操作
            if asyncio.iscoroutinefunction(self._func):
                result = await self._func(**kwargs)
            else:
                # 如果是同步函数,可以在线程池中运行以避免阻塞事件循环
                # 这里为了简化,直接调用,但真实场景应使用 run_in_executor
                result = self._func(**kwargs)
            print(f"[AsyncTool] '{self.name}' execution successful. Result: {result}")
            return result
        except Exception as e:
            print(f"[AsyncTool] Error executing '{self.name}': {e}")
            raise

    def get_signature(self) -> Dict[str, Any]:
        return {
            "name": self.name,
            "description": self.description,
        }

# 模拟异步工具函数
async def async_search_web(query: str, num_results: int = 5) -> str:
    """在网络上异步搜索信息。
    参数:
    - query: 搜索关键词 (string)
    - num_results: 返回结果数量 (integer, 默认5)
    """
    await asyncio.sleep(1) # 模拟异步I/O延迟
    if "Python" in query:
        return f"Found 10M results for '{query}'. Top 3: Python official site, PyPI, Real Python. (Limit: {num_results})"
    else:
        return f"Found 500K results for '{query}'. Top 3: Wiki, News, Blog. (Limit: {num_results})"

# 同步函数也可以被 AsyncTool 包装,但最好在 run_in_executor 中调用
def sync_write_file(filename: str, content: str, append: bool = False) -> str:
    """将内容写入文件。"""
    mode = 'a' if append else 'w'
    with open(filename, mode) as f:
        f.write(content + "n")
    return f"Content {'appended to' if append else 'written to'} '{filename}' successfully."

# --- 2. 定义异步的用户交互管理器 ---
class AsyncUserInteractionManager:
    def __init__(self):
        # 使用 asyncio.Queue 来模拟前端到后端的消息传递
        # 实际应用中,这里会是 WebSocket 或其他异步通信机制
        self._input_queue = asyncio.Queue()
        self._output_queue = asyncio.Queue() # 模拟后端到前端的输出

    async def request_tool_param_modification(self, tool_name: str, tool_params: Dict[str, Any]) -> Tuple[Dict[str, Any], bool]:
        """
        异步请求用户修改工具参数。
        :return: (修改后的参数, 是否中止)
        """
        request_id = time.time() # 模拟请求ID,用于匹配响应
        request_data = {
            "type": "tool_param_request",
            "request_id": request_id,
            "tool_name": tool_name,
            "original_params": tool_params
        }
        await self._output_queue.put(request_data) # 发送请求给前端模拟器

        print(f"n[UserInteractionManager] Sent request {request_id} for tool '{tool_name}'. Waiting for user input...")

        # 挂起,直到从输入队列中收到匹配的响应
        while True:
            response = await self._input_queue.get()
            if response.get("request_id") == request_id:
                if response.get("action") == "modify":
                    print(f"[UserInteractionManager] Received modified params for {tool_name}: {response['modified_params']}")
                    return response["modified_params"], False
                elif response.get("action") == "continue":
                    print(f"[UserInteractionManager] User chose to continue with original params for {tool_name}.")
                    return tool_params, False
                elif response.get("action") == "abort":
                    print(f"[UserInteractionManager] User chose to abort for {tool_name}.")
                    return {}, True # 返回空参数并指示中止
            else:
                # 将不匹配的响应重新放回队列,或处理为其他类型的消息
                await self._input_queue.put(response) # 简单处理,实际应有更健壮的机制

    async def _simulate_frontend_input(self):
        """
        模拟前端接收请求并发送响应的后台任务。
        在真实系统中,这部分是前端UI的逻辑。
        """
        while True:
            request = await self._output_queue.get()
            if request["type"] == "tool_param_request":
                tool_name = request["tool_name"]
                original_params = request["original_params"]
                request_id = request["request_id"]

                print(f"n--- FRONTEND UI (Simulated) ---")
                print(f"Agent wants to call tool: '{tool_name}'")
                print(f"Original parameters: {json.dumps(original_params, indent=2)}")

                modified_params = original_params.copy()
                response_action = "continue"

                action = input("UI: Do you want to (M)odify, (C)ontinue, (A)bort? [M/C/A]: ").strip().lower()
                if action == 'm':
                    user_input = input("UI: Enter new parameters (JSON): ")
                    try:
                        new_values = json.loads(user_input)
                        modified_params.update(new_values)
                        print(f"UI: Parameters updated to: {json.dumps(modified_params, indent=2)}")
                        response_action = "modify"
                    except json.JSONDecodeError:
                        print("UI: Invalid JSON. Will continue with original.")
                        response_action = "continue"
                elif action == 'a':
                    response_action = "abort"

                await self._input_queue.put({
                    "type": "tool_param_response",
                    "request_id": request_id,
                    "action": response_action,
                    "modified_params": modified_params
                })
            await asyncio.sleep(0.1) # 小暂停,防止忙循环

# --- 3. 定义异步 Agent ---
class AsyncAgent:
    def __init__(self, tools: List[AsyncTool], interaction_manager: Optional[AsyncUserInteractionManager] = None):
        self.tools = {tool.name: tool for tool in tools}
        self.interaction_manager = interaction_manager
        self.tool_descriptions = "n".join([f"- {t.name}: {t.description}" for t in tools])
        self.llm_response_history = [] # 模拟LLM的思考过程

    async def _mock_llm_response(self, prompt: str) -> Dict[str, Any]:
        """异步模拟LLM的推理过程"""
        print(f"n[AsyncAgent] LLM thinking based on prompt: '{prompt}'")
        self.llm_response_history.append(prompt)
        await asyncio.sleep(0.1) # 模拟LLM思考延迟

        if "search about Python" in prompt:
            tool_name = "async_search_web"
            tool_params = {"query": "Python programming language", "num_results": 3}
            print(f"[AsyncAgent] LLM decides to call '{tool_name}' with {tool_params}")
            return {"action": "call_tool", "tool_name": tool_name, "tool_params": tool_params}
        elif "save the search results" in prompt:
            tool_name = "sync_write_file" # 注意这里是同步函数包装的工具
            content_to_save = "Python is a high-level, interpreted programming language. It is popular for web development, data analysis, AI, and more."
            tool_params = {"filename": "python_info_async.txt", "content": content_to_save, "append": False}
            print(f"[AsyncAgent] LLM decides to call '{tool_name}' with {tool_params}")
            return {"action": "call_tool", "tool_name": tool_name, "tool_params": tool_params}
        else:
            print("[AsyncAgent] LLM decides to provide a final answer.")
            return {"action": "final_answer", "answer": f"I processed your request about: {prompt}. If you have more questions, let me know."}

    async def run(self, initial_prompt: str):
        current_prompt = initial_prompt
        while True:
            llm_output = await self._mock_llm_response(current_prompt)

            if llm_output["action"] == "call_tool":
                tool_name = llm_output["tool_name"]
                original_params = llm_output["tool_params"]

                if tool_name not in self.tools:
                    print(f"[AsyncAgent Error] Tool '{tool_name}' not found.")
                    current_prompt = f"Error: Tool '{tool_name}' not found. Please re-evaluate."
                    continue

                tool = self.tools[tool_name]
                executed_params = original_params

                if self.interaction_manager:
                    try:
                        # 重点:异步等待用户输入
                        modified_params, should_abort = await self.interaction_manager.request_tool_param_modification(tool_name, original_params)
                        if should_abort:
                            print(f"[AsyncAgent] Execution aborted by user for tool '{tool_name}'.")
                            break # 退出Agent循环
                        executed_params = modified_params
                        print(f"[AsyncAgent] Proceeding with parameters: {json.dumps(executed_params, indent=2)}")
                    except Exception as e:
                        print(f"[AsyncAgent Error] User interaction failed: {e}. Using original parameters.")
                        executed_params = original_params

                try:
                    tool_result = await tool.run(**executed_params) # 异步执行工具
                    current_prompt = f"Tool '{tool_name}' returned: {tool_result}nnBased on this, what should I do next?"
                except Exception as e:
                    current_prompt = f"Tool '{tool_name}' failed with error: {e}. Please re-evaluate."
            elif llm_output["action"] == "final_answer":
                print(f"n[AsyncAgent] Final Answer: {llm_output['answer']}")
                break
            else:
                print(f"[AsyncAgent Error] Unknown LLM action: {llm_output['action']}")
                break

# --- 运行示例 ---
async def main():
    web_search_tool = AsyncTool("async_search_web", async_search_web.__doc__, async_search_web)
    file_writer_tool = AsyncTool("sync_write_file", sync_write_file.__doc__, sync_write_file)

    interaction_manager = AsyncUserInteractionManager()
    agent = AsyncAgent(tools=[web_search_tool, file_writer_tool], interaction_manager=interaction_manager)

    # 启动模拟前端的后台任务
    frontend_task = asyncio.create_task(interaction_manager._simulate_frontend_input())

    print("--- Async Agent Execution Start (Scenario: Search and Modify) ---")
    await agent.run("Please search about Python programming language.")

    print("n" + "="*50 + "n")

    # 重新初始化Agent和InteractionManager,避免状态污染
    interaction_manager_2 = AsyncUserInteractionManager()
    agent_2 = AsyncAgent(tools=[web_search_tool, file_writer_tool], interaction_manager=interaction_manager_2)
    frontend_task_2 = asyncio.create_task(interaction_manager_2._simulate_frontend_input())

    print("--- Async Agent Execution Start (Scenario: Write File and Modify) ---")
    await agent_2.run("Search about Python, then save the search results to a file.")

    # 取消模拟前端任务
    frontend_task.cancel()
    frontend_task_2.cancel()
    try:
        await frontend_task
        await frontend_task_2
    except asyncio.CancelledError:
        pass # 正常取消

if __name__ == "__main__":
    asyncio.run(main())

示例运行说明:

  1. AsyncAgent.run() 是一个 async 函数,在其中调用 _mock_llm_responsetool.run 都是 await 操作。
  2. 关键在于 await self.interaction_manager.request_tool_param_modification(...)。当 Agent 调用此方法时,它会向 _output_queue 发送请求,然后 await_input_queue 接收响应。
  3. _simulate_frontend_input 任务在后台运行,不断从 _output_queue 接收请求。当它收到请求时,会模拟控制台交互,等待用户输入,然后将用户选择(修改后的参数、继续或中止)放入 _input_queue
  4. _input_queue 收到响应时,request_tool_param_modification 协程就会被唤醒,Agent 拿到用户修改后的参数继续执行。
  5. 整个过程中,事件循环保持运行,没有被 input() 阻塞,体现了异步的优势。

与前端集成:

在实际项目中,AsyncUserInteractionManager_input_queue_output_queue 将被替换为真正的异步通信机制:

  • WebSockets: 最常见的选择。前端通过 WebSocket 连接到后端,后端可以将工具参数实时推送到前端,前端用户修改后通过 WebSocket 将新参数发回。FastAPI 配合 websockets 库可以轻松实现。
  • Server-Sent Events (SSE): 如果主要是后端向前端推送信息(例如 Agent 状态、工具参数),而前端只需要偶尔回传少量数据,SSE 也是一个选项。
  • HTTP 长轮询/短轮询: 效率较低,但对于不需要极高实时性的场景,可以通过前端定时发送 HTTP 请求来获取 Agent 状态。

路径三:基于可序列化执行计划的修改

前两种路径主要是在“工具调用前”进行拦截。第三种路径则更进一步,允许 Agent 在生成执行计划后,但在执行计划中的每一步之前,都提供给用户修改的机会。这不仅可以修改当前步骤的工具参数,甚至可以修改、插入、删除整个计划中的步骤。

概念:

  • 执行计划 (Execution Plan): Agent 的 LLM 不仅仅生成一个工具调用,而是生成一个由多个 ToolCallAction 对象组成的序列。
  • 可序列化 (Serializable): 计划能够被转换成 JSON 或其他结构化格式,以便传输和存储。
  • 迭代执行: Agent 逐个执行计划中的步骤,并在每一步执行前暂停等待用户确认或修改。

实现细节:

  1. 定义一个 ActionToolCall 数据结构,包含工具名称、参数、预期结果等。
  2. Agent 的 LLM 阶段输出一个 List[Action] 作为执行计划。
  3. Agent 执行器不再是简单的循环推理,而是循环遍历这个执行计划。
  4. 在执行计划中的每一步之前,Agent 将当前 Action 对象及其参数暴露给用户。
  5. 用户可以:
    • 修改当前 Action 的参数。
    • 跳过当前 Action
    • 在当前 Action 之前/之后插入新的 Action
    • 删除后续的 Action
  6. Agent 根据用户的修改更新执行计划,然后继续执行。

代码示例 (Python):

import asyncio
import json
import uuid
from typing import Dict, Any, List, Literal, Optional, Callable

# 假设 AsyncTool 和 async_search_web, sync_write_file 已经定义

# --- 1. 定义可序列化的 Action / ToolCall 结构 ---
class ToolCall:
    def __init__(self, tool_name: str, params: Dict[str, Any], call_id: Optional[str] = None):
        self.call_id = call_id if call_id else str(uuid.uuid4())
        self.tool_name = tool_name
        self.params = params
        self.status: Literal["pending", "executing", "completed", "failed", "skipped"] = "pending"
        self.result: Optional[Any] = None
        self.error: Optional[str] = None

    def to_dict(self):
        return {
            "call_id": self.call_id,
            "tool_name": self.tool_name,
            "params": self.params,
            "status": self.status,
            "result": str(self.result) if self.result is not None else None,
            "error": self.error
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]):
        tc = cls(data["tool_name"], data["params"], data["call_id"])
        tc.status = data.get("status", "pending")
        tc.result = data.get("result")
        tc.error = data.get("error")
        return tc

# --- 2. 异步用户交互管理器 (与路径二类似,但处理更复杂的计划修改) ---
class AsyncPlanInteractionManager:
    def __init__(self):
        self._input_queue = asyncio.Queue()
        self._output_queue = asyncio.Queue()

    async def request_plan_step_modification(self, current_step_index: int, total_steps: int, current_call: ToolCall, full_plan_dicts: List[Dict[str, Any]]) -> Tuple[ToolCall, List[ToolCall], bool]:
        """
        异步请求用户修改当前计划步骤或整个计划。
        :param current_step_index: 当前步骤的索引。
        :param total_steps: 总步骤数。
        :param current_call: 当前即将执行的 ToolCall 对象。
        :param full_plan_dicts: 整个计划(ToolCall dicts列表),供用户查看。
        :return: (修改后的当前ToolCall, 修改后的整个计划列表, 是否中止)
        """
        request_id = str(uuid.uuid4())
        request_data = {
            "type": "plan_step_request",
            "request_id": request_id,
            "current_step_index": current_step_index,
            "total_steps": total_steps,
            "current_call": current_call.to_dict(),
            "full_plan": full_plan_dicts
        }
        await self._output_queue.put(request_data)

        print(f"n[PlanInteractionManager] Sent plan step request {request_id} for step {current_step_index+1}/{total_steps}. Waiting for user input...")

        while True:
            response = await self._input_queue.get()
            if response.get("request_id") == request_id:
                action = response.get("action")
                if action == "modify_current":
                    modified_call_data = response["modified_call"]
                    modified_call = ToolCall.from_dict(modified_call_data)
                    print(f"[PlanInteractionManager] Received modified current call: {modified_call.params}")
                    return modified_call, [ToolCall.from_dict(d) for d in response["modified_full_plan"]], False
                elif action == "continue":
                    print(f"[PlanInteractionManager] User chose to continue with original current step.")
                    return current_call, [ToolCall.from_dict(d) for d in response["modified_full_plan"]], False
                elif action == "abort":
                    print(f"[PlanInteractionManager] User chose to abort plan execution.")
                    return current_call, [], True
                elif action == "modify_plan_only": # 用户只修改了计划,但当前步骤参数不变
                    print(f"[PlanInteractionManager] User modified full plan, but current step params remain.")
                    return current_call, [ToolCall.from_dict(d) for d in response["modified_full_plan"]], False
            else:
                await self._input_queue.put(response) # Put back if not for this request

    async def _simulate_frontend_plan_input(self):
        """
        模拟前端接收计划步骤请求并发送响应的后台任务。
        """
        while True:
            request = await self._output_queue.get()
            if request["type"] == "plan_step_request":
                current_step_index = request["current_step_index"]
                total_steps = request["total_steps"]
                current_call_data = request["current_call"]
                full_plan_data = request["full_plan"]
                request_id = request["request_id"]

                print(f"n--- FRONTEND UI (Simulated Plan Editor) ---")
                print(f"Agent Plan Step {current_step_index+1}/{total_steps}:")
                print(f"Current Tool Call: {json.dumps(current_call_data, indent=2)}")
                print(f"nFull Plan:")
                for i, step in enumerate(full_plan_data):
                    status_marker = "->" if i == current_step_index else "  "
                    print(f"{status_marker} Step {i+1}: {step['tool_name']} with {json.dumps(step['params'])}")

                response_action = "continue"
                modified_call_data_to_send = current_call_data
                modified_full_plan_to_send = full_plan_data

                action = input(
                    "UI: Do you want to (M)odify CURRENT step parameters, "
                    "(P)lan edit (add/delete/reorder), (C)ontinue, (A)bort? [M/P/C/A]: "
                ).strip().lower()

                if action == 'm':
                    user_input = input("UI: Enter new parameters for current step (JSON): ")
                    try:
                        new_values = json.loads(user_input)
                        modified_call_data_to_send["params"].update(new_values)
                        print(f"UI: Current step parameters updated to: {json.dumps(modified_call_data_to_send['params'], indent=2)}")
                        response_action = "modify_current"
                        # 同时更新整个计划中的这一步
                        modified_full_plan_to_send[current_step_index] = modified_call_data_to_send
                    except json.JSONDecodeError:
                        print("UI: Invalid JSON. Will continue with original current step.")
                elif action == 'p':
                    print("UI: Plan editing not fully implemented in this simulation, but imagine an interface here.")
                    print("UI: For now, you can try to modify the full plan JSON directly.")
                    plan_input = input("UI: Enter FULL modified plan (JSON array of tool calls): ")
                    try:
                        new_plan_data = json.loads(plan_input)
                        if isinstance(new_plan_data, list):
                            modified_full_plan_to_send = new_plan_data
                            # If current step is still valid, update it from new plan
                            if current_step_index < len(new_plan_data):
                                modified_call_data_to_send = new_plan_data[current_step_index]
                            response_action = "modify_plan_only"
                        else:
                            print("UI: Invalid plan format (not a JSON array).")
                    except json.JSONDecodeError:
                        print("UI: Invalid JSON for plan.")
                elif action == 'a':
                    response_action = "abort"

                await self._input_queue.put({
                    "type": "plan_step_response",
                    "request_id": request_id,
                    "action": response_action,
                    "modified_call": modified_call_data_to_send,
                    "modified_full_plan": modified_full_plan_to_send
                })
            await asyncio.sleep(0.1)

# --- 3. 定义支持计划的异步 Agent ---
class AsyncPlanningAgent:
    def __init__(self, tools: List[AsyncTool], interaction_manager: Optional[AsyncPlanInteractionManager] = None):
        self.tools = {tool.name: tool for tool in tools}
        self.interaction_manager = interaction_manager
        self.tool_descriptions = "n".join([f"- {t.name}: {t.description}" for t in tools])
        self.llm_response_history = []

    async def _mock_llm_plan(self, prompt: str) -> List[ToolCall]:
        """
        模拟LLM生成一个多步骤的执行计划。
        真实Agent会调用实际的LLM API来生成结构化输出。
        """
        print(f"n[AsyncPlanningAgent] LLM planning based on prompt: '{prompt}'")
        await asyncio.sleep(0.1)

        if "search about Python and save results" in prompt:
            return [
                ToolCall("async_search_web", {"query": "Python programming language", "num_results": 3}),
                ToolCall("sync_write_file", {"filename": "python_summary.txt", "content": "PLACEHOLDER_FOR_SEARCH_RESULT", "append": False})
            ]
        elif "analyze market data" in prompt:
            return [
                ToolCall("get_market_data", {"symbol": "AAPL", "period": "1d"}),
                ToolCall("analyze_trends", {"data": "PLACEHOLDER_FOR_MARKET_DATA", "metric": "RSI"})
            ]
        else:
            print("[AsyncPlanningAgent] LLM generated an empty or simple plan.")
            return []

    async def run(self, initial_prompt: str):
        # 1. LLM生成初始计划
        current_plan = await self._mock_llm_plan(initial_prompt)

        # 模拟LLM将搜索结果填充到写文件工具的参数中
        if len(current_plan) > 1 and current_plan[0].tool_name == "async_search_web" and "PLACEHOLDER_FOR_SEARCH_RESULT" in current_plan[1].params.get("content", ""):
            # 简化处理,实际LLM会更智能
            temp_result = await self.tools[current_plan[0].tool_name].run(**current_plan[0].params)
            current_plan[0].status = "completed"
            current_plan[0].result = temp_result
            current_plan[1].params["content"] = f"Search results for {current_plan[0].params['query']}:n{temp_result}"

        step_index = 0
        while step_index < len(current_plan):
            current_call = current_plan[step_index]

            if current_call.status != "pending": # 如果已经执行过或被跳过,则直接到下一步
                step_index += 1
                continue

            executed_call = current_call

            if self.interaction_manager:
                try:
                    modified_call, new_full_plan, should_abort = await self.interaction_manager.request_plan_step_modification(
                        step_index, len(current_plan), current_call, [tc.to_dict() for tc in current_plan]
                    )
                    if should_abort:
                        print(f"[AsyncPlanningAgent] Execution aborted by user.")
                        break

                    # 更新当前步骤和整个计划
                    executed_call = modified_call
                    current_plan = new_full_plan # 用户可能修改了整个计划
                    if step_index >= len(current_plan): # 如果当前步骤被删除
                        print("[AsyncPlanningAgent] Current step was removed from plan. Advancing to next valid step.")
                        continue # 直接跳到下一个循环,重新检查 step_index

                    # 确保当前执行的 call 是 plan 中对应位置的 call
                    # (用户可能在UI中修改了整个plan,导致某个call的id不再匹配)
                    # 这里的逻辑需要根据UI如何返回修改进行调整
                    current_plan[step_index] = executed_call # 确保计划中的当前步骤是最新的

                    print(f"[AsyncPlanningAgent] Proceeding with current step (modified: {executed_call.params}) and potentially new plan.")

                except Exception as e:
                    print(f"[AsyncPlanningAgent Error] Plan interaction failed: {e}. Using original step.")
                    # 如果交互失败,保持原始计划和参数

            if executed_call.tool_name not in self.tools:
                print(f"[AsyncPlanningAgent Error] Tool '{executed_call.tool_name}' not found for step {step_index+1}.")
                executed_call.status = "failed"
                executed_call.error = "Tool not found"
                step_index += 1
                continue

            tool = self.tools[executed_call.tool_name]
            try:
                executed_call.status = "executing"
                tool_result = await tool.run(**executed_call.params)
                executed_call.result = tool_result
                executed_call.status = "completed"
                print(f"[AsyncPlanningAgent] Step {step_index+1} completed. Result: {tool_result}")

                # 可以在这里根据结果更新后续计划的PLACEHOLDER,或交由LLM重新规划
                # 例如,如果下一个步骤的参数依赖于当前步骤的结果
                if step_index + 1 < len(current_plan):
                    next_call = current_plan[step_index + 1]
                    if "PLACEHOLDER_FOR_SEARCH_RESULT" in next_call.params.get("content", ""):
                        next_call.params["content"] = f"Search results from previous step:n{tool_result}"

            except Exception as e:
                executed_call.error = str(e)
                executed_call.status = "failed"
                print(f"[AsyncPlanningAgent Error] Step {step_index+1} failed: {e}")
                # 错误处理:是否中断?是否让LLM重新规划?这里简单继续

            step_index += 1

        print(f"n[AsyncPlanningAgent] Plan execution finished.")
        print("Final Plan Status:")
        for tc in current_plan:
            print(f"- {tc.tool_name} (ID: {tc.call_id[:4]}...) Status: {tc.status}, Result: {tc.result[:50] if tc.result else 'N/A'}, Error: {tc.error if tc.error else 'N/A'}")

# --- 运行示例 ---
async def main_planning_agent():
    web_search_tool = AsyncTool("async_search_web", async_search_web.__doc__, async_search_web)
    file_writer_tool = AsyncTool("sync_write_file", sync_write_file.__doc__, sync_write_file)

    plan_interaction_manager = AsyncPlanInteractionManager()
    planning_agent = AsyncPlanningAgent(tools=[web_search_tool, file_writer_tool], interaction_manager=plan_interaction_manager)

    frontend_plan_task = asyncio.create_task(plan_interaction_manager._simulate_frontend_plan_input())

    print("--- Async Planning Agent Execution Start ---")
    await planning_agent.run("Please search about Python programming language and save the results to a file.")

    frontend_plan_task.cancel()
    try:
        await frontend_plan_task
    except asyncio.CancelledError:
        pass

if __name__ == "__main__":
    asyncio.run(main_planning_agent())

示例运行说明:

  1. AsyncPlanningAgent 首先调用 _mock_llm_plan 模拟 LLM 生成一个包含 ToolCall 对象的列表作为初始计划。
  2. run 方法会迭代执行这个计划。在执行每个 ToolCall 之前,它会调用 interaction_manager.request_plan_step_modification
  3. AsyncPlanInteractionManager 的模拟前端会展示当前步骤的详细信息以及整个计划的概览。用户可以选择修改当前步骤的参数,甚至(在更完整的实现中)添加、删除、重新排序计划中的步骤。
  4. 用户输入修改后,Agent 会更新 current_plan,然后使用修改后的参数执行当前步骤,并继续下一个步骤。

优势与挑战:

  • 更强的控制力: 用户不仅可以修改参数,还可以对整个执行流程进行精细化调整。
  • 可解释性: 计划的展示让用户更清楚 Agent 的意图。
  • 复杂性增加: LLM 需要生成结构化、可解析、可修改的计划。用户界面需要更复杂才能提供强大的计划编辑功能。Agent 需要更智能地处理用户对计划的修改(例如,如果用户删除了一个关键步骤,Agent 是否需要重新规划?)。

路径四:结合 LangChain/LlamaIndex 等框架的实现

主流的 Agent 开发框架,如 LangChain 和 LlamaIndex,都提供了强大的回调(Callback)或观察者(Observer)机制,天然支持我们在 Agent 运行时的干预。

LangChain 为例,它提供了 BaseCallbackHandler 接口,允许开发者在 Agent 链(Chain)的各个阶段注入自定义逻辑,包括工具的开始、结束、LLM 的生成等。

实现细节:

  1. 创建一个继承自 langchain_core.callbacks.BaseCallbackHandler 的自定义回调处理器。
  2. 重写 on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs: Any) -> Any 方法。这个方法会在 Agent 决定调用工具时被调用,input_str 通常包含 LLM 推理出的工具名称和参数(例如,以 ReAct 格式的字符串)。
  3. 在这个方法中,解析 input_str 提取工具名称和参数。
  4. 然后,同样地,通过异步通信机制与用户界面交互,等待用户修改参数。
  5. 关键在于,on_tool_start 方法的返回类型是 Any。如果返回一个值,它通常会被忽略或用于日志。为了实现参数修改,我们通常会:
    • on_tool_start触发一个异步事件,让 Agent 暂停(例如,通过 asyncio.Eventasyncio.Queue)。
    • 在 Agent 的执行逻辑中(例如,一个自定义的 Agent Executor),在调用 tool.run() 之前,检查是否有用户干预事件被触发,并等待其完成,获取修改后的参数。
    • 或者,更直接地,自定义 Agent Executor 的 _call 方法,在调用 self.tools[tool_name].run() 前,显式调用一个交互函数。

代码示例 (LangChain 风格的概念性代码):

import asyncio
import json
from typing import Any, Dict, List, Optional, Tuple, Callable

# 假设 AsyncTool, async_search_web, sync_write_file 已经定义
# 模拟 LangChain 的工具定义
class LangChainToolWrapper:
    def __init__(self, tool_instance: AsyncTool):
        self._tool = tool_instance
        self.name = tool_instance.name
        self.description = tool_instance.description

    async def _run(self, tool_input: str) -> str:
        # LangChain 的工具通常接收一个字符串作为输入,需要自行解析
        # 这里简化为直接调用
        try:
            params = json.loads(tool_input) # 假设输入是JSON字符串
            result = await self._tool.run(**params)
            return str(result)
        except json.JSONDecodeError:
            return f"Error: Invalid JSON input for tool {self.name}: {tool_input}"
        except Exception as e:
            return f"Error executing tool {self.name}: {e}"

# 模拟 LangChain AgentExecutor
class MockLangChainAgentExecutor:
    def __init__(self, tools: List[LangChainToolWrapper], callback_handler: Optional[Any] = None, user_interaction_manager: Optional[Any] = None):
        self.tools = {tool.name: tool for tool in tools}
        self.callback_handler = callback_handler
        self.user_interaction_manager = user_interaction_manager
        self.llm_mock_response_queue = asyncio.Queue() # 模拟LLM输出

    async def _mock_llm_step(self, intermediate_steps: List[Tuple[Dict[str, Any], str]]) -> str:
        """
        模拟LLM的思考步骤,生成工具调用或最终回答。
        为了简化,这里使用预设的LLM输出。
        """
        if not intermediate_steps:
            # 第一次调用,模拟LLM决定搜索
            llm_output = json.dumps({"tool": "async_search_web", "tool_input": json.dumps({"query": "Python programming language", "num_results": 3})})
            return f"Thought: I need to search about Python. Action: {llm_output}"
        else:
            # 模拟LLM根据之前结果进行下一步,例如保存文件
            last_tool_output = intermediate_steps[-1][1]
            if "Python official site" in last_tool_output:
                content_to_save = f"Search results summary: {last_tool_output[:100]}..."
                llm_output = json.dumps({"tool": "sync_write_file", "tool_input": json.dumps({"filename": "lc_python_info.txt", "content": content_to_save, "append": False})})
                return f"Thought: I have the search results, now I should save them. Action: {llm_output}"
            else:
                return "Thought: I have completed the task. Final Answer: Task finished."

    async def run(self, input_text: str) -> Dict[str, Any]:
        intermediate_steps = []
        while True:
            llm_response_str = await self._mock_llm_step(intermediate_steps)
            print(f"n[MockAgentExecutor] LLM Response: {llm_response_str}")

            if "Final Answer:" in llm_response_str:
                return {"output": llm_response_str.split("Final Answer:")[1].strip()}

            # 解析LLM的Action
            try:
                # 假设LLM输出的Action部分是 "Action: {"tool": "tool_name", "tool_input": "..."}"
                action_str = llm_response_str.split("Action:")[1].strip()
                action_data = json.loads(action_str)
                tool_name = action_data["tool"]
                tool_input_str = action_data["tool_input"] # 这是一个JSON字符串,需要再次解析
                original_params = json.loads(tool_input_str)
            except (IndexError, json.JSONDecodeError, KeyError) as e:
                print(f"[MockAgentExecutor Error] Failed to parse LLM action: {e}. LLM response: {llm_response_str}")
                intermediate_steps.append(({"tool": "parse_error"}, f"Error parsing LLM response: {e}"))
                continue

            tool = self.tools.get(tool_name)
            if not tool:
                error_msg = f"Tool '{tool_name}' not found."
                print(f"[MockAgentExecutor Error] {error_msg}")
                intermediate_steps.append(({"tool": tool_name, "tool_input": tool_input_str}, error_msg))
                continue

            # --- 关键的拦截点 ---
            modified_params = original_params
            should_abort = False
            if self.user_interaction_manager:
                try:
                    # 将工具名称和原始参数传递给交互管理器
                    modified_params, should_abort = await self.user_interaction_manager.request_tool_param_modification(tool_name, original_params)
                    if should_abort:
                        print("[MockAgentExecutor] User aborted tool execution.")
                        return {"output": "Task aborted by user."}
                except Exception as e:
                    print(f"[MockAgentExecutor Error] Interaction failed: {e}. Using original parameters.")

            # 重新打包参数为工具需要的字符串格式
            final_tool_input_str = json.dumps(modified_params)

            # 模拟回调处理器的 on_tool_start
            if self.callback_handler:
                # 这里的serialized通常是工具的元数据,input_str是工具的输入
                # LangChain回调通常不会直接返回修改后的参数,而是用于副作用 (logging, UI更新)
                # 所以我们依赖 user_interaction_manager 来修改参数
                await self.callback_handler.on_tool_start(
                    serialized={"name": tool_name, "description": tool.description}, 
                    input_str=final_tool_input_str, 
                    tool_name=tool_name, 
                    tool_input=final_tool_input_str # 传递修改后的参数
                )

            tool_output = await tool._run(final_tool_input_str)

            if self.callback_handler:
                await self.callback_handler.on_tool_end(tool_output, tool_name=tool_name, input_str=final_tool_input_str)

            intermediate_steps.append(({"tool": tool_name, "tool_input": final_tool_input_str}, tool_output))
            print(f"[MockAgentExecutor] Tool '{tool_name}' returned: {tool_output[:100]}...")

# 模拟 LangChain 的 BaseCallbackHandler
from langchain_core.callbacks import BaseCallbackHandler

class CustomLangChainCallbackHandler(BaseCallbackHandler):
    def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs: Any) -> Any:
        print(f"n[Callback] Tool '{serialized['name']}' started with input: {input_str}")
        # 这里可以向UI发送事件,但实际参数修改逻辑在 AgentExecutor 中处理
        pass

    def on_tool_end(self, output: str, **kwargs: Any) -> Any:
        print(f"[Callback] Tool '{kwargs.get('tool_name', 'Unknown')}' ended with output: {output[:50]}...")
        pass

# --- 运行示例 ---
async def main_langchain_style():
    # 异步工具
    web_search_tool_async = AsyncTool("async_search_web", async_search_web.__doc__, async_search_web)
    file_writer_tool_sync = AsyncTool("sync_write_file", sync_write_file.__doc__, sync_write_file)

    # 包装为 LangChain 风格的工具
    lc_web_search_tool = LangChainToolWrapper(web_search_tool_async)
    lc_file_writer_tool = LangChainToolWrapper(file_writer_tool_sync)

    # 异步用户交互管理器 (与路径二的 AsyncUserInteractionManager 相同)
    interaction_manager = AsyncUserInteractionManager()

    # 自定义回调处理器
    callback_handler = CustomLangChainCallbackHandler()

    agent_executor = MockLangChainAgentExecutor(
        tools=[lc_web_search_tool, lc_file_writer_tool], 
        callback_handler=callback_handler,
        user_interaction_manager=interaction_manager
    )

    frontend_task = asyncio.create_task(interaction_manager._simulate_frontend_input())

    print("--- LangChain Style Agent Execution Start ---")
    result = await agent_executor.run("Find information about Python and save it.")
    print(f"nFinal Agent Result: {result}")

    frontend_task.cancel()
    try:
        await frontend_task
    except asyncio.CancelledError:
        pass

if __name__ == "__main__":
    asyncio.run(main_langchain_style())

示例运行说明:

  1. MockLangChainAgentExecutor 模拟了 LangChain Agent 的执行循环,它通过 _mock_llm_step 模拟 LLM 的思维过程和工具调用决策。
  2. 在解析 LLM 的 Action 后,但在实际调用工具之前,MockLangChainAgentExecutor 会显式调用 self.user_interaction_manager.request_tool_param_modification
  3. 用户交互管理器 (AsyncUserInteractionManager) 会暂停 Agent 的执行,等待用户通过模拟前端修改参数。
  4. 修改后的参数会用于后续的 tool._run() 调用。
  5. CustomLangChainCallbackHandleron_tool_starton_tool_end 方法也会被调用,用于记录或触发其他副作用,但参数修改的核心逻辑由 user_interaction_manager 完成。

框架集成的优势:

  • 标准化接口: 框架提供了统一的回调接口,减少了从头开始编写 Agent 核心逻辑的复杂性。
  • 生态系统: 可以利用框架中已有的工具、记忆、LLM 封装等。
  • 可维护性: 将关注点分离,Agent 核心逻辑、工具管理、用户交互、日志记录等模块化。

总结:

无论采用哪种技术路径,核心都在于在 Agent 执行循环中找到合适的拦截点,并利用异步编程实现非阻塞的用户交互。 结合合适的后端(如 FastAPI)和前端(如 React/Vue)技术,通过 WebSocket 等实时通信协议,就能构建出强大的实时人机协作 Agent 系统。


架构设计与最佳实践

实现 Agent 实时工具参数修改的系统,需要仔细考虑整体架构和一系列最佳实践。

架构设计考量

一个支持实时 HITL 的 Agent 系统通常会采用以下分层架构:

  1. 用户界面 (Frontend):

    • 负责展示 Agent 的当前状态、推理过程、即将调用的工具及其参数。
    • 提供直观的界面供用户修改参数、批准、中止或编辑计划。
    • 通过 WebSocket 或 SSE 与后端进行实时通信。
    • 框架选择:React, Vue, Angular, Svelte, Streamlit, Gradio 等。
  2. API 网关 / 后端服务 (Backend Service):

    • 接收前端的用户指令和 Agent 的状态更新请求。
    • 管理 Agent 实例的生命周期(启动、停止、暂停、恢复)。
    • 充当 Agent 执行器与前端之间的桥梁,负责参数的来回传递。
    • 框架选择:FastAPI, Flask, Django Channels。
  3. Agent 核心 (Agent Core):

    • 包含 LLM 接口、工具注册表、记忆模块和 Agent 的执行循环。
    • 内置拦截点,当需要用户干预时,向后端服务发送请求并暂停执行。
    • 接收后端服务返回的用户修改后的参数,并恢复执行。
  4. 实时通信层 (Real-time Communication Layer):

    • 连接前端和后端,实现低延迟的双向数据流。
    • WebSocket: 最理想的选择,支持全双工通信,适合频繁的状态更新和用户干预。
    • Server-Sent Events (SSE): 适用于后端单向推送大量更新,前端少量回传的场景。
    • 消息队列 (如 RabbitMQ, Kafka): 在更复杂的分布式系统中,可用于解耦 Agent 实例与用户界面,实现异步消息传递。
  5. 状态管理 (State Management):

    • Agent 状态: Agent 当前处于哪个步骤?正在等待用户输入吗?哪个工具的参数被拦截了?这些状态需要在后端持久化,以便在 Agent 崩溃或重启后恢复。
    • 会话状态: 每个用户与 Agent 的交互是独立的会话。需要管理每个会话的上下文、历史记录和当前 Agent 实例。
    • 参数缓存: 可以在拦截点将原始参数缓存起来,方便用户在修改失败时回滚。

最佳实践

  1. 清晰的参数展示与编辑界面 (UX):

    • 可读性: 以清晰易懂的格式(如 JSON 或结构化表单)展示工具参数。
    • 默认值与建议: 显示 LLM 提出的原始默认参数,并提供修改建议(如果可能)。
    • 类型提示与校验: 根据工具的参数签名,在 UI 上提供输入类型提示和即时校验,防止用户输入无效数据。
    • 撤销/重做: 允许用户撤销对参数的修改或回滚到 Agent 原始提议。
    • 实时反馈: 告诉用户 Agent 正在等待输入,或者修改已成功应用。
  2. 细粒度的控制与权限管理:

    • 并非所有参数都适合用户

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注