尊敬的各位同仁、技术爱好者们:
大家好!
今天,我们聚焦一个在构建智能体(Agent)系统时至关重要,却又常被忽视的议题——渐进式揭示(Progressive Revelation)。特别是在复杂的图执行(Graph Execution)过程中,如何分阶段地向用户展示 Agent 的思考进度,这不仅是技术挑战,更是用户体验设计的艺术。
想象一下,你正在与一个 AI 助手交互,它需要完成一个复杂的多步骤任务:搜索信息、分析数据、调用工具、生成报告。如果它在处理过程中没有任何反馈,只是长时间的“思考中……”,你会感到焦虑、困惑,甚至开始怀疑它是否真的在工作,或者它卡住了。这就是“黑盒”问题。而渐进式揭示,正是为了打破这个黑盒,让 Agent 的内部运作变得透明、可追踪、可理解。
本次讲座,我将作为一名编程专家,带领大家深入探讨如何在图执行框架下,设计并实现一套高效、实时的渐进式揭示系统。我们将从核心原理出发,逐步深入到架构设计、通信机制、代码实现,并探讨其在提升用户体验、增强系统可信度、简化调试流程方面的巨大价值。
1. Agent 的思考流程与图执行:揭开复杂性的面纱
在深入渐进式揭示之前,我们首先需要理解 Agent 的“思考”是如何发生的,以及图执行在其中扮演的角色。
1.1 什么是 Agent 的“思考”?
对于一个基于大型语言模型(LLM)的 Agent 而言,其“思考”并非人类意义上的意识活动,而是一系列结构化的决策、规划、执行和反思过程。它通常包含以下核心阶段:
- 目标理解与分解(Goal Understanding & Planning): Agent 接收用户请求,理解其意图,并将其分解为一系列可执行的子任务。
- 工具选择(Tool Selection): 根据当前子任务的需求,从可用的工具集中选择最合适的工具。工具可以是搜索、代码解释器、API 调用等。
- 参数生成与工具调用(Parameter Generation & Tool Invocation): 为选定的工具生成正确的输入参数,并执行工具。
- 结果观察与分析(Observation & Analysis): 接收工具执行结果,并对其进行分析,判断是否达到预期,或是否需要进一步行动。
- 反思与修正(Reflection & Refinement): 基于观察结果,评估当前进度,调整后续计划,或修正错误。
- 最终答案生成(Final Answer Generation): 综合所有中间结果,生成最终的用户响应。
这些阶段并非线性,而是可能形成复杂的循环和分支。
1.2 图执行:结构化 Agent 复杂逻辑的利器
为了管理 Agent 复杂的、非线性的思考流程,图执行(Graph Execution)成为了一种非常有效且流行的范式。它将 Agent 的每一个思考步骤或状态建模为图中的一个节点(Node),而节点之间的依赖关系和数据流则通过边(Edge)来表示。
一个典型的 Agent 图执行流程可能包含以下节点类型:
| 节点类型 | 描述 | 示例输入 | 示例输出 |
|---|---|---|---|
| Start | 任务的起始点,接收用户初始请求 | 用户请求文本 | 初始 Agent 状态,包含目标 |
| Plan | Agent 基于目标制定初步计划 | 目标,历史对话 | 规划的步骤列表 |
| Tool_Select | 根据当前计划和上下文,选择合适的工具 | 当前步骤,可用工具列表 | 选定的工具名称 |
| Tool_Execute | 执行选定的工具,并获取结果 | 工具名称,工具参数 | 工具执行的原始输出 |
| Observe | 解析工具执行结果,提取关键信息 | 工具原始输出 | 结构化的观察结果 |
| Reflect | 对观察结果进行反思,决定下一步行动(继续、修正、完成) | 观察结果,当前计划,历史记录 | 下一步决策(如“继续规划”、“调用工具”、“完成”) |
| Generate_Answer | 综合所有信息,生成最终答案 | 所有中间结果,历史对话 | 最终用户响应 |
| End | 任务的结束点 | 最终用户响应 | 完成状态 |
通过图执行,Agent 的复杂逻辑被解构为一系列离散、可管理的步骤。这为我们提供了天然的观测点,正是渐进式揭示的基础。
2. 渐进式揭示的核心原理与挑战
渐进式揭示并非简单地打印日志,它要求我们深思熟虑,以一种结构化、实时、可理解的方式呈现信息。
2.1 核心原则
- 粒度适中(Appropriate Granularity): 信息的揭示应有不同的粒度层级。太粗糙,用户依然茫然;太细致,则信息过载。我们需要找到一个平衡点,通常是节点级别、关键思考步骤级别。
- 实时性(Real-time): 用户期望在 Agent 思考的同时看到进度,而不是等待所有任务完成后一次性展示。这意味着需要使用实时通信技术。
- 可理解性(Understandability): 揭示的信息必须是用户能够理解的,避免过多的技术细节或内部状态。可以适当进行抽象和解释。
- 可追溯性(Traceability): 用户应该能够看到 Agent 的决策路径,理解它为何采取某个行动,以及在何处可能出现问题。
- 可控性(Controllability): 高级的渐进式揭示甚至可以允许用户在特定阶段介入,提供反馈或修正。
2.2 挑战
实现渐进式揭示并非没有挑战:
- 性能开销: 频繁地收集、处理和传输状态更新会带来额外的计算和网络开销。
- 数据量管理: 复杂的 Agent 流程可能产生大量中间数据和事件,如何有效地过滤、聚合和传输这些数据是关键。
- 并发与异步: 图执行通常是异步的,可能涉及并行处理。如何准确地追踪和报告并发执行的进度需要精巧的设计。
- 错误处理: 当 Agent 流程中出现错误时,如何清晰地报告错误发生的位置、原因,并提示可能的解决方案,是提升用户体验的重要一环。
- 前端展示复杂性: 后端发送的数据需要前端能够有效地渲染成用户友好的界面,这本身也是一个复杂的前端工程问题。
- 安全与隐私: 某些中间数据可能包含敏感信息,在揭示过程中需要进行过滤或脱敏。
3. 构建可观测的图执行框架
为了实现渐进式揭示,我们首先需要一个能够发出(emit)状态更新的图执行框架。这通常通过事件驱动的架构来实现。
3.1 事件驱动架构的核心思想
在图的每个关键节点执行前、执行中、执行后,以及发生重要状态转换时,Agent 应该发出一个事件(Event)。这些事件包含了当前 Agent 状态的快照或关键信息。一个中央的事件发射器(Event Emitter)负责收集这些事件,并将其转发给所有注册的事件监听器(Event Listener)。
3.2 定义事件数据结构
首先,我们定义一个通用的数据结构来封装所有状态更新事件。
import time
from enum import Enum
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
# 定义事件类型
class GraphEventType(str, Enum):
NODE_START = "node_start"
NODE_PROGRESS = "node_progress" # 例如,LLM流式输出
NODE_END = "node_end"
GRAPH_START = "graph_start"
GRAPH_END = "graph_end"
TOOL_CALL = "tool_call"
TOOL_RESULT = "tool_result"
LLM_INPUT = "llm_input"
LLM_OUTPUT = "llm_output"
ERROR = "error"
# 定义事件的通用载荷
class EventPayload(BaseModel):
# 可以在这里定义一个通用的字段,例如,一个字典来存储动态数据
data: Dict[str, Any] = Field(default_factory=dict)
# 核心事件模型
class GraphExecutionEvent(BaseModel):
event_id: str = Field(default_factory=lambda: str(time.time_ns()))
timestamp: float = Field(default_factory=time.time)
event_type: GraphEventType
graph_id: str
node_id: Optional[str] = None # 如果是节点相关的事件,则有节点ID
node_name: Optional[str] = None # 节点名称,更友好
status: Optional[str] = None # 例如 'running', 'success', 'failed'
message: Optional[str] = None # 简短的用户友好信息
payload: Optional[EventPayload] = None # 详细的事件数据
class Config:
use_enum_values = True
3.3 实现一个简单的事件发射器
我们将创建一个 EventEmitter 类,它能够注册监听器并广播事件。
import asyncio
from typing import Callable, List, Coroutine
class EventEmitter:
def __init__(self):
self._listeners: List[Callable[[GraphExecutionEvent], Coroutine[Any, Any, None]]] = []
def on(self, listener: Callable[[GraphExecutionEvent], Coroutine[Any, Any, None]]):
"""注册一个异步监听器"""
self._listeners.append(listener)
async def emit(self, event: GraphExecutionEvent):
"""异步广播事件给所有监听器"""
# 注意:这里我们使用 asyncio.gather 来并发执行所有监听器
# 如果任何一个监听器失败,它不会阻止其他监听器
await asyncio.gather(*[listener(event) for listener in self._listeners], return_exceptions=True)
# 全局或单例事件发射器
global_event_emitter = EventEmitter()
3.4 可观测的 Agent 节点和图执行器
现在,我们来设计一个可观测的 BaseNode 和 GraphExecutor。
import uuid
class BaseNode:
def __init__(self, node_id: str, node_name: str, emitter: EventEmitter):
self.node_id = node_id
self.node_name = node_name
self.emitter = emitter
async def _emit_event(self, event_type: GraphEventType, graph_id: str, status: Optional[str] = None,
message: Optional[str] = None, payload_data: Optional[Dict[str, Any]] = None):
payload = EventPayload(data=payload_data) if payload_data else None
event = GraphExecutionEvent(
event_type=event_type,
graph_id=graph_id,
node_id=self.node_id,
node_name=self.node_name,
status=status,
message=message,
payload=payload
)
await self.emitter.emit(event)
async def execute(self, graph_id: str, input_data: Any) -> Any:
"""
抽象方法,子类需实现具体的节点逻辑。
此方法将作为事件发出点。
"""
raise NotImplementedError
# 示例:一个简单的规划节点
class PlanningNode(BaseNode):
def __init__(self, emitter: EventEmitter):
super().__init__(node_id="node_plan", node_name="规划任务", emitter=emitter)
async def execute(self, graph_id: str, input_data: Dict[str, Any]) -> Dict[str, Any]:
user_query = input_data.get("user_query", "")
await self._emit_event(GraphEventType.NODE_START, graph_id, status="running", message=f"开始规划:'{user_query}'")
# 模拟LLM思考时间
await asyncio.sleep(1)
plan = f"根据查询 '{user_query}',我将执行以下步骤:1. 搜索相关信息;2. 总结信息;3. 生成最终报告。"
await self._emit_event(GraphEventType.LLM_INPUT, graph_id, payload_data={"prompt": f"规划任务:{user_query}"})
await self._emit_event(GraphEventType.LLM_OUTPUT, graph_id, payload_data={"response": plan})
await self._emit_event(GraphEventType.NODE_END, graph_id, status="success", message="规划完成")
return {"plan": plan, "next_step": "search_info"}
# 示例:一个简单的工具执行节点
class ToolExecutionNode(BaseNode):
def __init__(self, emitter: EventEmitter):
super().__init__(node_id="node_tool_exec", node_name="执行工具", emitter=emitter)
async def execute(self, graph_id: str, input_data: Dict[str, Any]) -> Dict[str, Any]:
tool_name = input_data.get("tool_name", "unknown_tool")
tool_params = input_data.get("tool_params", {})
await self._emit_event(GraphEventType.NODE_START, graph_id, status="running", message=f"开始执行工具:'{tool_name}'")
await self._emit_event(GraphEventType.TOOL_CALL, graph_id, payload_data={"tool": tool_name, "params": tool_params})
# 模拟工具执行
if tool_name == "search_web":
await asyncio.sleep(2) # 模拟网络延迟
result = f"搜索 '{tool_params.get('query', '')}' 的结果:发现3篇文章..."
else:
result = f"未知工具 '{tool_name}'"
await self._emit_event(GraphEventType.TOOL_RESULT, graph_id, payload_data={"tool": tool_name, "result": result})
await self._emit_event(GraphEventType.NODE_END, graph_id, status="success", message="工具执行完成")
return {"tool_output": result, "next_step": "summarize_result"}
# 示例:一个最终答案生成节点
class GenerateAnswerNode(BaseNode):
def __init__(self, emitter: EventEmitter):
super().__init__(node_id="node_answer", node_name="生成最终答案", emitter=emitter)
async def execute(self, graph_id: str, input_data: Dict[str, Any]) -> Dict[str, Any]:
summary = input_data.get("summary", "")
await self._emit_event(GraphEventType.NODE_START, graph_id, status="running", message="开始生成最终答案")
final_answer_prompt = f"根据以下信息生成最终答案:{summary}"
await self._emit_event(GraphEventType.LLM_INPUT, graph_id, payload_data={"prompt": final_answer_prompt})
await asyncio.sleep(1.5)
final_answer = f"根据您的查询,Agent 经过搜索和总结,最终得出结论:{summary[:50]}..." # 简化处理
await self._emit_event(GraphEventType.LLM_OUTPUT, graph_id, payload_data={"response": final_answer})
await self._emit_event(GraphEventType.NODE_END, graph_id, status="success", message="最终答案生成完成")
return {"final_answer": final_answer}
# 简单的图执行器
class SimpleGraphExecutor:
def __init__(self, emitter: EventEmitter):
self.emitter = emitter
self.nodes: Dict[str, BaseNode] = {
"plan": PlanningNode(emitter),
"tool_search": ToolExecutionNode(emitter),
"generate_answer": GenerateAnswerNode(emitter),
# 可以添加更多节点
}
# 简化图结构:直接定义执行顺序
self.workflow = [
"plan",
"tool_search",
"generate_answer"
]
async def execute_graph(self, user_query: str) -> Dict[str, Any]:
graph_id = str(uuid.uuid4())
current_data = {"user_query": user_query}
await self.emitter._emit_event(GraphExecutionEvent(
event_type=GraphEventType.GRAPH_START,
graph_id=graph_id,
message=f"Agent 任务开始,目标:'{user_query}'"
))
try:
for node_name in self.workflow:
node = self.nodes.get(node_name)
if not node:
raise ValueError(f"Unknown node: {node_name}")
# 模拟数据传递和决策
if node_name == "tool_search":
# 从plan节点获取的plan,这里简化为直接使用查询
current_data["tool_name"] = "search_web"
current_data["tool_params"] = {"query": user_query}
elif node_name == "generate_answer":
# 从tool_search节点获取的output
current_data["summary"] = current_data.get("tool_output", "未获取到搜索结果")
node_output = await node.execute(graph_id, current_data)
current_data.update(node_output) # 更新当前数据流
await self.emitter._emit_event(GraphExecutionEvent(
event_type=GraphEventType.GRAPH_END,
graph_id=graph_id,
status="success",
message=f"Agent 任务完成,最终答案:{current_data.get('final_answer', '无')}"
))
return current_data
except Exception as e:
await self.emitter._emit_event(GraphExecutionEvent(
event_type=GraphEventType.ERROR,
graph_id=graph_id,
message=f"Agent 任务执行失败:{str(e)}",
status="failed",
payload_data={"error_type": type(e).__name__, "error_message": str(e)}
))
raise
通过这种方式,我们的 Agent 框架就具备了发出实时进度更新的能力。下一步是讨论如何将这些事件实时传输给用户。
4. 实时通信机制的选择与实现
将后端生成的事件实时推送到前端是渐进式揭示的关键。有几种主流的实时通信技术,每种都有其适用场景。
4.1 通信机制对比
| 特性 | HTTP Polling (轮询) | Server-Sent Events (SSE) | WebSockets |
|---|---|---|---|
| 连接方式 | 短连接,多次请求 | 长连接,单向(服务器->客户端) | 长连接,双向 |
| 协议 | HTTP/1.1 | HTTP/1.1 (基于 HTTP) | WebSocket Protocol |
| 实时性 | 较低(取决于轮询间隔) | 较高 | 极高(全双工) |
| 实现难度 | 简单 | 中等 | 中等偏上 |
| 浏览器支持 | 普遍支持 | 良好 | 良好 |
| 使用场景 | 简单、低实时性要求 | 纯粹的服务器推送,如通知、日志流 | 实时聊天、游戏、协同编辑、Agent 进度 |
| 开销 | 每次请求/响应都有 HTTP 头开销 | 保持连接有少量开销 | 协议升级和保持连接有少量开销 |
对于 Agent 思考进度的渐进式揭示,我们通常需要高实时性且纯粹的服务器推送(如果用户不需要主动发送控制命令),或者双向通信(如果用户可能需要暂停、干预 Agent)。因此,SSE 和 WebSockets 是最佳选择。
4.2 使用 FastAPI 实现 SSE
Server-Sent Events 允许服务器通过一个持久的 HTTP 连接,向客户端发送事件流。它的实现相对简单,非常适合纯粹的单向进度推送。
# main.py (FastAPI 应用)
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
# 导入之前定义的 EventEmitter 和 GraphExecutionEvent
# from your_module import global_event_emitter, GraphExecutionEvent, GraphEventType, SimpleGraphExecutor
# 初始化 Agent 执行器
agent_executor = SimpleGraphExecutor(global_event_emitter)
@app.get("/events")
async def sse_events(request: Request):
"""
SSE 端点,用于向客户端推送 Agent 进度事件。
"""
async def event_generator():
# 用于存储待发送事件的队列
event_queue = asyncio.Queue()
# 注册一个监听器,将事件放入队列
async def listener(event: GraphExecutionEvent):
await event_queue.put(event.json()) # 将事件转换为 JSON 字符串
global_event_emitter.on(listener)
try:
while True:
# 检查客户端是否断开连接
if await request.is_disconnected():
print("Client disconnected from SSE.")
break
event_data = await event_queue.get()
# SSE 规范要求数据格式为 "data: [json_string]nn"
yield f"data: {event_data}nn"
await asyncio.sleep(0.01) # 小幅延迟,避免CPU空转
except asyncio.CancelledError:
print("SSE generator cancelled.")
finally:
# 在连接断开或取消时,移除监听器
# 注意:如果有很多客户端,每个客户端都需要一个独立的监听器
# 这里的实现是简化的,实际生产中可能需要更复杂的管理
print("SSE listener cleaned up.")
# global_event_emitter._listeners.remove(listener) # 移除监听器,这里需要考虑并发和唯一性
return StreamingResponse(event_generator(), media_type="text/event-stream")
@app.post("/run_agent")
async def run_agent(query: str):
"""
启动 Agent 任务的 HTTP 端点。
"""
try:
# 在后台启动 Agent 任务,不阻塞当前请求
asyncio.create_task(agent_executor.execute_graph(query))
return {"message": "Agent 任务已启动,请连接 /events 端点获取进度。"}
except Exception as e:
return {"error": str(e)}, 500
# 启动 FastAPI: uvicorn main:app --reload
前端可以使用 JavaScript 的 EventSource API 连接 /events 端点:
// 简单的前端示例(HTML/JS)
const eventSource = new EventSource('/events');
const logDiv = document.getElementById('log');
eventSource.onmessage = function(event) {
const eventData = JSON.parse(event.data);
// 在这里处理并展示 eventData
logDiv.innerHTML += `<p><strong>[${eventData.node_name || eventData.event_type}]</strong> ${eventData.message || ''} - ${JSON.stringify(eventData.payload?.data || {})}</p>`;
logDiv.scrollTop = logDiv.scrollHeight; // 滚动到底部
};
eventSource.onerror = function(err) {
console.error("EventSource failed:", err);
eventSource.close();
};
function runAgent() {
const query = document.getElementById('queryInput').value;
fetch(`/run_agent?query=${encodeURIComponent(query)}`, { method: 'POST' })
.then(response => response.json())
.then(data => console.log(data))
.catch(error => console.error('Error:', error));
}
4.3 使用 FastAPI 实现 WebSockets
WebSockets 提供了全双工的通信通道,这意味着客户端和服务器可以同时发送和接收消息。这对于需要用户干预 Agent 流程的场景非常有用,或者仅仅是为了更强大的实时性。
# main.py (FastAPI 应用 - WebSockets 部分)
from fastapi import WebSocket, WebSocketDisconnect
# ... (之前定义的 EventEmitter, GraphExecutionEvent, SimpleGraphExecutor) ...
# 存储所有活跃的 WebSocket 连接
active_websocket_connections: List[WebSocket] = []
async def websocket_listener(event: GraphExecutionEvent):
"""
WebSocket 专用的事件监听器,将事件广播给所有连接的客户端。
"""
# 广播事件给所有活跃的 WebSocket 连接
for connection in active_websocket_connections:
try:
await connection.send_text(event.json())
except RuntimeError as e:
# 如果连接已经关闭,可能会抛出 RuntimeError
print(f"Failed to send to WebSocket, connection might be closed: {e}")
# 可以在这里移除失效的连接,但需要注意并发问题
except Exception as e:
print(f"Error sending to WebSocket: {e}")
global_event_emitter.on(websocket_listener) # 注册 WebSocket 监听器
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
active_websocket_connections.append(websocket)
print(f"WebSocket client connected: {websocket.client}")
try:
while True:
# 客户端可以发送消息来控制 Agent,例如暂停、取消
message = await websocket.receive_text()
print(f"Received message from WebSocket client: {message}")
# 这里可以解析 message,并触发 Agent 的控制逻辑
# 例如:if message == "cancel": agent_executor.cancel_current_task()
await websocket.send_text(f"Server received: {message}") # 简单回复
except WebSocketDisconnect:
print(f"WebSocket client disconnected: {websocket.client}")
except Exception as e:
print(f"WebSocket error: {e}")
finally:
active_websocket_connections.remove(websocket)
print(f"WebSocket client removed: {websocket.client}")
# run_agent 保持不变,它会触发事件,然后由 websocket_listener 广播出去
前端可以使用 JavaScript 的 WebSocket API 连接 /ws 端点:
// 简单的前端示例(HTML/JS)
const websocket = new WebSocket('ws://localhost:8000/ws'); // 替换为你的服务器地址
const wsLogDiv = document.getElementById('ws-log');
websocket.onopen = function(event) {
console.log("WebSocket connection opened.");
wsLogDiv.innerHTML += `<p><em>WebSocket 连接已建立。</em></p>`;
};
websocket.onmessage = function(event) {
const eventData = JSON.parse(event.data);
wsLogDiv.innerHTML += `<p><strong>[${eventData.node_name || eventData.event_type}]</strong> ${eventData.message || ''} - ${JSON.stringify(eventData.payload?.data || {})}</p>`;
wsLogDiv.scrollTop = wsLogDiv.scrollHeight;
};
websocket.onclose = function(event) {
console.log("WebSocket connection closed:", event);
wsLogDiv.innerHTML += `<p><em>WebSocket 连接已关闭。</em></p>`;
};
websocket.onerror = function(error) {
console.error("WebSocket Error:", error);
wsLogDiv.innerHTML += `<p style="color:red;"><em>WebSocket 错误: ${error.message}</em></p>`;
};
function runAgentWS() {
const query = document.getElementById('queryInputWS').value;
fetch(`/run_agent?query=${encodeURIComponent(query)}`, { method: 'POST' })
.then(response => response.json())
.then(data => console.log(data))
.catch(error => console.error('Error:', error));
}
// 客户端也可以发送消息给服务器
function sendToAgent(message) {
if (websocket.readyState === WebSocket.OPEN) {
websocket.send(message);
} else {
console.warn("WebSocket is not open.");
}
}
在实际生产环境中,WebSocket 的连接管理会更加复杂,需要处理重连、心跳包、认证授权等问题。
5. Agent 思考进度的粒度控制与数据呈现
渐进式揭示的质量,很大程度上取决于我们如何控制事件的粒度以及事件中包含的数据。
5.1 粒度控制
-
粗粒度(Node-level):
- 何时发出: 节点开始执行(
NODE_START)、节点完成执行(NODE_END)、节点失败(ERROR)。 - 内容: 节点名称、状态、耗时、简要结果或错误信息。
- 优点: 信息量适中,易于理解整体流程。
- 适用场景: 概览 Agent 进度条,高亮当前活动节点。
- 何时发出: 节点开始执行(
-
中粒度(Tool-level / LLM Interaction-level):
- 何时发出: Agent 决定调用某个工具(
TOOL_CALL)、工具执行完成并返回结果(TOOL_RESULT)、LLM 收到输入(LLM_INPUT)、LLM 生成输出(LLM_OUTPUT)。 - 内容: 工具名称、参数、返回结果;LLM 提示词、响应。
- 优点: 揭示 Agent 的具体行动和思考过程。
- 适用场景: 展示 Agent 的“思考气泡”,工具调用日志。
- 何时发出: Agent 决定调用某个工具(
-
细粒度(Token-level LLM Streaming):
- 何时发出: LLM 在生成响应时,每生成一个 token 就发出一个事件(
NODE_PROGRESS或专门的LLM_TOKEN)。 - 内容: 当前生成的 token。
- 优点: 提供极致的实时性,用户几乎可以同步看到 Agent 的“打字”过程。
- 适用场景: 聊天界面中 Agent 响应的流式显示。
- 何时发出: LLM 在生成响应时,每生成一个 token 就发出一个事件(
在我们的 GraphExecutionEvent 中,event_type 和 payload 字段就是为了支持这种多粒度设计。例如,LLM_INPUT 和 LLM_OUTPUT 提供了 LLM 交互的中间粒度信息。
实现 LLM Token 流式输出的示例:
假设你正在使用一个支持流式输出的 LLM 客户端(如 OpenAI 的 client.chat.completions.create(stream=True))。你可以在 LLMNode 内部这样处理:
# LLMNode 示例,假设 LLM 客户端支持流式输出
class LLMGenerationNode(BaseNode):
def __init__(self, emitter: EventEmitter, llm_client):
super().__init__(node_id="node_llm_gen", node_name="LLM生成响应", emitter=emitter)
self.llm_client = llm_client
async def execute(self, graph_id: str, input_data: Dict[str, Any]) -> Dict[str, Any]:
prompt = input_data.get("prompt", "请生成一个通用响应。")
await self._emit_event(GraphEventType.NODE_START, graph_id, status="running", message="开始调用LLM生成响应")
await self._emit_event(GraphEventType.LLM_INPUT, graph_id, payload_data={"prompt": prompt})
full_response_content = []
try:
# 假设 llm_client.chat_stream 是一个异步生成器,返回 token
async for chunk in self.llm_client.chat_stream(prompt): # 模拟一个流式LLM客户端
token = chunk.get("token", "") # 假设 chunk 包含 token
if token:
full_response_content.append(token)
await self._emit_event(
GraphEventType.NODE_PROGRESS, # 或者定义一个 LLM_TOKEN 事件类型
graph_id,
message="LLM 正在生成...",
payload_data={"token": token}
)
# 模拟 LLM 思考
await asyncio.sleep(0.05)
final_response = "".join(full_response_content)
await self._emit_event(GraphEventType.LLM_OUTPUT, graph_id, payload_data={"response": final_response})
await self._emit_event(GraphEventType.NODE_END, graph_id, status="success", message="LLM响应生成完成")
return {"llm_response": final_response}
except Exception as e:
await self._emit_event(GraphEventType.ERROR, graph_id, status="failed", message=f"LLM调用失败: {str(e)}")
raise
5.2 数据呈现策略 (前端视角)
虽然这是后端讲座,但了解数据如何被消费有助于我们设计更好的事件。
- 实时日志流: 最简单直观的方式,按照时间顺序显示所有事件。用户可以看到每个步骤的详细信息。
- 动态图可视化: 可以在前端渲染一个 Agent 图,并根据
NODE_START和NODE_END事件高亮显示当前正在执行的节点,或者用不同颜色表示节点状态(进行中、成功、失败)。 - “思考气泡”/“草稿纸”: 专门的区域显示 Agent 的 LLM 提示词、内部思考过程、工具调用及结果,模拟 Agent 的“内心独白”。
- 进度条/状态指示: 整体任务的进度条,或者每个节点旁边的加载动画,提供即时视觉反馈。
- 结构化输出区: 当 Agent 最终生成答案时,将其清晰地展示在指定区域。
6. 高级主题与最佳实践
6.1 错误处理与重试策略
在渐进式揭示中,错误是不可避免的。当节点执行失败时:
- 发出
ERROR事件: 包含详细的错误信息、堆栈追踪(如果安全允许)、发生错误的节点 ID。 - 图的健壮性: 可以设计图执行器,在某些节点失败时,尝试回退、重试,或跳过,并发出相应的事件。
- 用户反馈: 前端应清晰地展示错误信息,并引导用户进行修正或重试。
6.2 持久化与回放
为了调试、审计和用户体验,将事件流持久化非常重要。
- 存储: 将
GraphExecutionEvent存储到数据库(如 PostgreSQL, MongoDB)或日志系统(如 Elasticsearch)。 - 回放: 允许用户或开发者加载历史事件流,并在前端“回放”Agent 的思考过程,这对于理解复杂 Bug 或展示 Agent 能力非常有帮助。
6.3 安全性与敏感信息过滤
Agent 的中间思考过程可能包含用户数据、API 密钥片段(如果处理不当)、内部系统路径等敏感信息。
- 数据过滤: 在事件发出前,对
payload中的数据进行严格过滤和脱敏。 - 权限控制: 只有授权用户才能查看完整的、未经脱敏的事件流。
- 传输加密: 使用 HTTPS 和 WSS 确保数据在传输过程中的安全。
6.4 可扩展性与事件总线
在大型分布式 Agent 系统中,单个 EventEmitter 可能会成为瓶颈。
- 消息队列: 引入 Kafka、RabbitMQ 等消息队列作为事件总线。Agent 的各个服务将事件发布到队列,监听服务从队列消费事件并推送给客户端。这提供了高度解耦和可伸缩性。
- 服务注册与发现: 动态管理事件监听器和 Agent 服务。
6.5 用户干预与交互
渐进式揭示的终极目标之一是允许用户在 Agent 思考的关键节点进行干预。
- 暂停/继续: 用户可以通过前端发送命令,暂停 Agent 的执行,检查当前状态,然后决定继续或修改。
- 修改计划: 在 Agent 规划阶段,用户可以审查计划并提出修改建议。
- 工具选择确认: 在 Agent 决定调用某个工具前,征求用户确认。
- 错误纠正: 当 Agent 遇到错误时,用户可以直接提供修正方案。
这需要双向通信(如 WebSocket)和 Agent 内部状态管理机制,使其能够响应外部指令并调整行为。
总结
渐进式揭示不仅仅是一种技术实现,它更是一种设计哲学,旨在将智能体从“黑盒”转变为“透明盒”。通过在 Agent 图执行的关键节点注入可观测性,并利用实时通信机制将这些洞察传递给用户,我们不仅极大地提升了用户体验,减少了等待焦虑,更建立了用户对 Agent 的信任。同时,它也为开发者提供了强大的调试和审计工具,让复杂 Agent 系统的开发和维护变得更加可控。掌握并实践渐进式揭示,是构建下一代智能、透明、用户友好型 AI 系统的必由之路。
感谢大家的聆听!