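Good afternoon, colleagues!
Today we will dig into an exciting and challenging area: Inter-thought Streaming. As the name suggests, the focus is on showing an AI agent's chain of thought in real time and transparently, and on giving the user the ability to intervene and correct it at every step. This is not technical showmanship; it is a key step toward building AI systems that are trustworthy, controllable, and efficient.
Imagine an AI working on a complex problem that no longer just emits a final answer but, like a person, thinks, plans, acts, and observes step by step. Better still, when it goes off track at some step, we can spot it immediately and tell it directly: "Wait, you got this wrong, do it this way instead." This mode of human-AI collaboration is the core value of Inter-thought Streaming.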
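1. Inter-thought Streaming: Core Concepts and Value
What is Inter-thought Streaming?
It means sending an AI agent's internal thinking process (its decisions, planning, reasoning, tool calls, and other intermediate steps) to the frontend as a stream, in real time, and rendering it there step by step. It also includes a key interaction layer: the user can "correct" or "steer" any step that has already been displayed, which changes what the AI does next.
Why does Inter-thought Streaming matter?
- Transparency & Explainability:
  - We are no longer facing a black box. Users can see how the AI reached its conclusion, which is essential for debugging and for understanding its behavior.
  - In critical business domains such as finance and healthcare, knowing the AI's decision path is the basis for compliance and trust.
- Controllability & Guidance:
  - Users can spot a mistake or an inefficient path early and intervene before the problem grows.
  - The AI can learn from user corrections, or explore better solutions under user guidance, which gives us human-in-the-loop augmentation.
- Debugging & Optimization:
  - Developers can observe how the model behaves on a specific input and quickly locate reasoning errors or model bias.
  - Collected user corrections can be used to fine-tune the Agent's behavior or to improve the base model.
- User Experience:
  - The experience is dynamic and interactive, so users feel they are working with a partner that actually "thinks" rather than with a simple tool.
Core challenges:
- Latency: how do we deliver thinking steps from the backend to the frontend fast enough that the delay is barely noticeable?
- State management: how do we keep the AI's internal state while it is thinking, and "roll back" or "reset" it after a user correction so that it continues from the corrected point?
- Concurrency and synchronization: multiple users, or one user issuing several corrections, add complexity.
- Frontend rendering: how do we render a steady stream of incoming steps smoothly while also handling user interaction?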
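2. Architecture Overview: Building an Inter-thought Streaming System
To implement Inter-thought Streaming we need a typical client-server architecture plus a suitable communication protocol.
Core components:
- AI Agent (backend): simulates or executes the AI's thinking process and produces a series of structured "thinking steps".
- API Server (backend): the bridge between the AI Agent and the frontend. It is responsible for:
  - Receiving requests from the frontend.
  - Interacting with the AI Agent to obtain thinking steps.
  - Streaming the thinking steps to the frontend.
  - Receiving user correction requests from the frontend and passing them on to the AI Agent.
- Frontend Client (browser): responsible for:
  - Opening a streaming connection to the API Server.
  - Receiving and rendering thinking steps in real time.
  - Showing a "Correct" button on every step and handling user input.
  - Sending the correction back to the API Server.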
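Choosing a communication protocol:
For real-time streaming we mainly have two options:
| Feature | Server-Sent Events (SSE) | WebSockets |
|---|---|---|
| Direction | One-way (server -> client) | Two-way (server <-> client) |
| Protocol | Plain HTTP, using the text/event-stream MIME type | A separate protocol over TCP, established by upgrading an HTTP handshake |
| Overhead | Small; it is ordinary HTTP, so it is easy to proxy and load-balance | Larger; servers and proxies must support the protocol upgrade |
| Reconnection | Built in (the browser reconnects automatically) | Must be implemented manually |
| Data format | Text, data: [payload]\n\n | Any data (text or binary), typically JSON |
| Browser support | Good | Good |
| Use cases | The server pushes data and the client passively receives it, e.g. news feeds, chat notifications | Highly interactive scenarios with frequent two-way traffic, e.g. online games, instant messaging |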
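Why SSE:
For Inter-thought Streaming, the core requirement is that the server pushes the AI's thinking steps to the client. The client does need to send correction requests back, but each of those is an independent, non-streaming request. SSE's one-way streaming fits the step push well, and because it is plain HTTP it is simpler to deploy and scale. Corrections can go through an ordinary HTTP POST. The combination is both simple and efficient.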
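To make the wire format concrete, here is a minimal sketch of how one message could be serialized for a text/event-stream response. The helper name formatSseMessage and the SseMessage shape are hypothetical and not part of the lecture code; the field names id, event, and data and the blank-line terminator come from the SSE format itself.

```typescript
// Hypothetical helper (not from the lecture code): serializes one message
// into the text/event-stream wire format.
interface SseMessage {
  data: unknown;   // payload, JSON-encoded below
  event?: string;  // optional event name; browsers default to "message"
  id?: string;     // optional id; browsers send it back as Last-Event-ID on reconnect
}

function formatSseMessage(msg: SseMessage): string {
  const lines: string[] = [];
  if (msg.id !== undefined) lines.push(`id: ${msg.id}`);
  if (msg.event !== undefined) lines.push(`event: ${msg.event}`);
  // JSON.stringify without an indent argument never emits raw newlines,
  // so a single "data:" line is enough for JSON payloads.
  lines.push(`data: ${JSON.stringify(msg.data)}`);
  // A blank line terminates the message.
  return lines.join('\n') + '\n\n';
}
```

A server handler would pass the returned string to res.write. If a payload were sent as raw multi-line text instead of JSON, each line would need its own data: prefix.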
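Data flow overview:
graph TD
A[Frontend Client] -- 1. Open SSE connection --> B(API Server)
B -- 2. Ask the AI Agent to think --> C(AI Agent)
C -- 3. Produce thinking steps --> B
B -- 4. Stream thinking steps over SSE --> A
A -- 5. Render steps and show Correct buttons --> A
A -- 6. User clicks Correct, POST request is sent --> B
B -- 7. Receive correction, notify the AI Agent --> C
C -- 8. AI Agent resets or adjusts its thinking, produces new steps --> B
B -- 9. Stream the new steps over SSE --> A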
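3. Backend Implementation: Simulated AI Agent and SSE Streaming (Node.js/Express)
On the backend we need a module that simulates the AI Agent, and an Express server that handles the streaming request and the correction request.
3.1 Simulating the AI Agent
Our AI Agent produces a series of "thinking steps". Each step carries its type (for example thought, action, observation), its content, a unique ID, a timestamp, and a status.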
// src/agent/thoughtAgent.ts
import { v4 as uuidv4 } from 'uuid';
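// 'system' is used for control events such as correction_applied (see applyCorrection below)
export type ThoughtStepType = 'thought' | 'action' | 'observation' | 'tool_call' | 'final_answer' | 'system';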
export interface ThoughtStep {
stepId: string;
type: ThoughtStepType;
content: string;
timestamp: string;
status: 'pending' | 'completed' | 'corrected' | 'invalidated'; // New status for correction flow
correctionPrompt?: string; // Stored user correction
}
// In a real scenario, this would involve LLM calls, RAG, tool execution etc.
// For this lecture, we simulate it with a predefined sequence and delays.
const initialThoughtSequence: ThoughtStep[] = [
{ stepId: uuidv4(), type: 'thought', content: 'The user wants to know how to optimize website performance; I first need to analyze their current tech stack.', timestamp: new Date().toISOString(), status: 'pending' },
{ stepId: uuidv4(), type: 'action', content: 'Call the `analyzeWebsite(url)` tool to fetch information about the site.', timestamp: new Date().toISOString(), status: 'pending' },
{ stepId: uuidv4(), type: 'observation', content: 'The site uses a React frontend, a Node.js backend, and MongoDB as the database. Static assets are not served from a CDN.', timestamp: new Date().toISOString(), status: 'pending' },
{ stepId: uuidv4(), type: 'thought', content: 'Based on what is known so far, initial optimization directions: image optimization, CDN deployment, server-side rendering.', timestamp: new Date().toISOString(), status: 'pending' },
{ stepId: uuidv4(), type: 'action', content: 'Generate an optimization recommendations report.', timestamp: new Date().toISOString(), status: 'pending' },
{ stepId: uuidv4(), type: 'final_answer', content: 'Report generated, with recommendations covering image optimization, CDN deployment, and server-side rendering.', timestamp: new Date().toISOString(), status: 'pending' },
];
class AgentSession {
private sessionId: string;
private currentThoughtChain: ThoughtStep[];
private currentStepIndex: number;
private isPaused: boolean;
private listeners: ((step: ThoughtStep) => void)[] = [];
constructor(sessionId: string, initialChain: ThoughtStep[] = initialThoughtSequence) {
this.sessionId = sessionId;
this.currentThoughtChain = JSON.parse(JSON.stringify(initialChain)); // Deep copy
this.currentStepIndex = 0;
this.isPaused = false;
console.log(`Agent session ${sessionId} created.`);
}
// Register a listener for new thought steps
public addListener(listener: (step: ThoughtStep) => void) {
this.listeners.push(listener);
}
public removeListener(listener: (step: ThoughtStep) => void) {
this.listeners = this.listeners.filter(l => l !== listener);
}
private notifyListeners(step: ThoughtStep) {
this.listeners.forEach(listener => listener(step));
}
// Incremented on every start, so a loop that is still sleeping in its delay
// can tell it has been superseded (for example after a correction) and stop.
private runToken = 0;
public async startThinking() {
if (this.currentStepIndex >= this.currentThoughtChain.length) {
console.log(`Agent session ${this.sessionId} finished thinking.`);
return;
}
this.isPaused = false;
const token = ++this.runToken;
console.log(`Agent session ${this.sessionId} started/resumed thinking from index ${this.currentStepIndex}.`);
// Simulate thinking step by step; stop when paused or when a newer run has taken over
while (this.currentStepIndex < this.currentThoughtChain.length && !this.isPaused && token === this.runToken) {
const step = this.currentThoughtChain[this.currentStepIndex];
step.timestamp = new Date().toISOString(); // Update timestamp on generation
step.status = 'completed'; // Mark as completed when generated
this.notifyListeners(step); // Push to listeners (SSE)
console.log(`[${this.sessionId}] Emitted step ${this.currentStepIndex + 1}: ${step.type} - ${step.content.substring(0, 30)}...`);
this.currentStepIndex++;
if (this.currentStepIndex < this.currentThoughtChain.length) {
await new Promise(resolve => setTimeout(resolve, Math.random() * 2000 + 1000)); // Simulate delay
}
}
console.log(`Agent session ${this.sessionId} paused or finished.`);
}
public pauseThinking() {
this.isPaused = true;
console.log(`Agent session ${this.sessionId} paused.`);
}
// Handles user correction
public async applyCorrection(stepId: string, newContent: string): Promise<boolean> {
this.pauseThinking(); // Pause current generation
console.log(`Applying correction for stepId: ${stepId}, newContent: ${newContent}`);
const correctedIndex = this.currentThoughtChain.findIndex(step => step.stepId === stepId);
if (correctedIndex === -1) {
console.error(`Correction failed: Step ID ${stepId} not found.`);
return false;
}
// 1. Update the corrected step
const correctedStep = this.currentThoughtChain[correctedIndex];
correctedStep.content = newContent;
correctedStep.status = 'corrected';
correctedStep.correctionPrompt = newContent;
correctedStep.timestamp = new Date().toISOString();
// 2. Invalidate all subsequent steps (they are now based on potentially wrong premise)
for (let i = correctedIndex + 1; i < this.currentThoughtChain.length; i++) {
this.currentThoughtChain[i].status = 'invalidated';
// Optionally, you might want to clear content or modify it to reflect invalidation
}
// 3. Re-generate from the corrected point (simplified simulation)
// In a real LLM Agent, this would involve re-prompting the LLM with the modified context
// For simulation, we'll just cut the chain and generate a new sequence from here.
this.currentThoughtChain = this.currentThoughtChain.slice(0, correctedIndex + 1);
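// Annotated as ThoughtStep[] so the string literals keep their union types
const newSequence: ThoughtStep[] = [
{ stepId: uuidv4(), type: 'thought', content: `Based on the correction '${newContent}', I need to re-evaluate the optimization strategy.`, timestamp: new Date().toISOString(), status: 'pending' },
{ stepId: uuidv4(), type: 'action', content: 'Consider new optimization directions, for example database index tuning.', timestamp: new Date().toISOString(), status: 'pending' },
{ stepId: uuidv4(), type: 'final_answer', content: 'The new optimization report will include database tuning recommendations.', timestamp: new Date().toISOString(), status: 'pending' },
];
this.currentThoughtChain = [...this.currentThoughtChain, ...newSequence];
// Resume with the first regenerated step. Restarting at correctedIndex would re-emit
// the corrected step and overwrite its 'corrected' status with 'completed'.
this.currentStepIndex = correctedIndex + 1;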
// Notify frontend that correction was applied and stream will resume
this.notifyListeners({
stepId: 'correction-event',
type: 'system',
content: JSON.stringify({
action: 'correction_applied',
correctedStepId: stepId,
newChainLength: this.currentThoughtChain.length,
resumeFromIndex: this.currentStepIndex
}),
timestamp: new Date().toISOString(),
status: 'completed'
});
// Immediately restart thinking from the corrected point
// A small delay ensures the correction event is processed first by frontend
setTimeout(() => this.startThinking(), 100);
return true;
}
public getCurrentChain(): ThoughtStep[] {
return this.currentThoughtChain;
}
}
// Global map to hold active agent sessions
const agentSessions = new Map<string, AgentSession>();
export function getAgentSession(sessionId: string): AgentSession {
if (!agentSessions.has(sessionId)) {
const newSession = new AgentSession(sessionId);
agentSessions.set(sessionId, newSession);
}
return agentSessions.get(sessionId)!;
}
export function cleanupAgentSession(sessionId: string) {
if (agentSessions.has(sessionId)) {
agentSessions.get(sessionId)?.pauseThinking();
agentSessions.delete(sessionId);
console.log(`Agent session ${sessionId} cleaned up.`);
}
}
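Key logic of AgentSession:
- currentThoughtChain: stores the chain of thought for the current session.
- currentStepIndex: marks the step currently being processed.
- isPaused: controls streaming generation.
- addListener/notifyListeners: implement a publish-subscribe pattern so the SSE controller can receive new steps.
- applyCorrection: this is the core. It pauses the current thinking, updates the corrected step, invalidates every step after it, then simulates regenerating a new chain and resumes from the correction point. It also sends a special system event to tell the frontend that the correction has been applied.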
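The truncate-and-regenerate idea behind applyCorrection can also be written as a pure function, which makes the rollback easy to reason about and test in isolation. This is a sketch under assumptions: the names correctChain, Step, and CorrectionResult are hypothetical, and the step shape is reduced to the fields the rollback needs.

```typescript
type StepStatus = 'pending' | 'completed' | 'corrected' | 'invalidated';

interface Step {
  stepId: string;
  content: string;
  status: StepStatus;
}

interface CorrectionResult {
  chain: Step[];           // kept prefix, ending with the corrected step
  discarded: Step[];       // steps that depended on the old premise
  resumeFromIndex: number; // index where regenerated steps will be appended
}

// Returns null when the step is unknown; never mutates the input chain.
function correctChain(
  chain: readonly Step[],
  stepId: string,
  newContent: string,
): CorrectionResult | null {
  const index = chain.findIndex(s => s.stepId === stepId);
  const target = chain[index];
  if (index === -1 || target === undefined) return null;

  const corrected: Step = { ...target, content: newContent, status: 'corrected' };
  const discarded = chain
    .slice(index + 1)
    .map(s => ({ ...s, status: 'invalidated' as const }));

  return {
    chain: [...chain.slice(0, index), corrected],
    discarded,
    resumeFromIndex: index + 1,
  };
}
```

Keeping the discarded steps in the result, rather than dropping them silently, leaves room to log them or to show them greyed out in the UI.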
3.2 API Server (Express)
// src/server.ts
import express from 'express';
import cors from 'cors';
import { getAgentSession, ThoughtStep, cleanupAgentSession } from './agent/thoughtAgent';
import { v4 as uuidv4 } from 'uuid';
const app = express();
const PORT = 3001;
app.use(cors({
origin: 'http://localhost:3000', // Allow frontend to connect
credentials: true,
}));
app.use(express.json()); // For parsing correction POST requests
// SSE Endpoint for streaming thoughts
app.get('/api/stream/thoughts/:sessionId', (req, res) => {
const sessionId = req.params.sessionId;
const agentSession = getAgentSession(sessionId);
// Set SSE headers. CORS headers are already added by the cors() middleware above.
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders(); // Send the headers right away so the client sees the stream open
// Replay only the steps that were already emitted; pending steps have not been "thought" yet
const currentChain = agentSession.getCurrentChain();
currentChain
.filter(step => step.status !== 'pending')
.forEach(step => {
res.write(`data: ${JSON.stringify(step)}\n\n`);
});
const listener = (step: ThoughtStep) => {
// Each SSE message ends with a blank line
res.write(`data: ${JSON.stringify(step)}\n\n`);
};
agentSession.addListener(listener);
// Start thinking only if nothing has been emitted yet.
// A session that was corrected restarts itself, so it is already running.
if (currentChain.every(s => s.status === 'pending')) {
agentSession.startThinking();
}
req.on('close', () => {
console.log(`Client disconnected from session ${sessionId}.`);
agentSession.removeListener(listener);
// In a real app, you might want to keep the session alive for a while
// or offer a way to resume. For now, we'll clean it up.
cleanupAgentSession(sessionId);
});
});
// Endpoint for receiving user corrections
app.post('/api/stream/correct/:sessionId', async (req, res) => {
const sessionId = req.params.sessionId;
const { stepId, newContent } = req.body;
if (!stepId || !newContent) {
return res.status(400).json({ error: 'Missing stepId or newContent.' });
}
const agentSession = getAgentSession(sessionId);
const success = await agentSession.applyCorrection(stepId, newContent);
if (success) {
res.json({ message: 'Correction applied. Agent will re-evaluate.', stepId });
} else {
res.status(500).json({ error: 'Failed to apply correction.' });
}
});
app.listen(PORT, () => {
console.log(`Backend streaming server running on http://localhost:${PORT}`);
});
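Backend code explained:
- SSE Endpoint (/api/stream/thoughts/:sessionId):
  - Sets the required SSE headers.
  - Creates a listener function that writes each new step produced by the Agent to the response with res.write, so it reaches the client immediately.
  - Registers the listener on the AgentSession.
  - Handles client disconnects (req.on('close')) by removing the listener and cleaning up the session.
  - agentSession.startThinking(): starts the Agent's thinking process.
- Correction Endpoint (/api/stream/correct/:sessionId):
  - Receives stepId and newContent.
  - Calls agentSession.applyCorrection to handle the correction.
  - Returns a success or failure response.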
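One refinement the endpoint above does not attempt: if each SSE message carried an id: field, the browser's EventSource would send that value back in a Last-Event-ID request header when it reconnects automatically, and the server could replay only what the client missed. The function below is a minimal sketch of that selection logic. The name stepsToReplay and the choice of stepId as the event id are assumptions, not part of the lecture code.

```typescript
interface EmittedStep {
  stepId: string;
  status: 'pending' | 'completed' | 'corrected' | 'invalidated';
}

// Decide which steps to send when a client (re)connects.
// lastEventId is the value of the Last-Event-ID request header, if any.
function stepsToReplay<T extends EmittedStep>(
  chain: readonly T[],
  lastEventId: string | undefined,
): T[] {
  // Pending steps have not been produced yet, so they are never replayed.
  const emitted = chain.filter(s => s.status !== 'pending');
  if (!lastEventId) return emitted; // first connection: replay everything emitted

  const index = emitted.findIndex(s => s.stepId === lastEventId);
  // Unknown id (for example the step was dropped by a correction):
  // fall back to a full replay rather than guessing.
  return index === -1 ? emitted : emitted.slice(index + 1);
}
```

In an Express handler the header value would typically be read from req.headers['last-event-id']. This only helps if the session outlives the disconnect, which the cleanup in req.on('close') currently prevents.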
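4. Frontend Implementation: Real-time Rendering and Interaction (React)
The frontend needs a main component that manages the SSE connection and renders the list of thinking steps, plus a modal for user corrections.
4.1 Data Structures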
// src/types.ts
export type ThoughtStepType = 'thought' | 'action' | 'observation' | 'tool_call' | 'final_answer' | 'system';
export interface ThoughtStep {
stepId: string;
type: ThoughtStepType;
content: string;
timestamp: string;
status: 'pending' | 'completed' | 'corrected' | 'invalidated';
correctionPrompt?: string;
}
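Because SSE payloads arrive as untyped JSON, a cast such as JSON.parse(event.data) as ThoughtStep is only a promise to the compiler. A runtime guard can back it up. The sketch below is illustrative: isThoughtStep and parseThoughtStep are hypothetical helpers, and the types are re-declared here only so that the block stands alone.

```typescript
const STEP_TYPES = ['thought', 'action', 'observation', 'tool_call', 'final_answer', 'system'] as const;
const STEP_STATUSES = ['pending', 'completed', 'corrected', 'invalidated'] as const;

type ThoughtStepType = typeof STEP_TYPES[number];
type ThoughtStepStatus = typeof STEP_STATUSES[number];

interface ThoughtStep {
  stepId: string;
  type: ThoughtStepType;
  content: string;
  timestamp: string;
  status: ThoughtStepStatus;
  correctionPrompt?: string;
}

// Runtime check that an unknown value has the ThoughtStep shape.
function isThoughtStep(value: unknown): value is ThoughtStep {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.stepId === 'string' &&
    typeof v.content === 'string' &&
    typeof v.timestamp === 'string' &&
    (STEP_TYPES as readonly unknown[]).includes(v.type) &&
    (STEP_STATUSES as readonly unknown[]).includes(v.status) &&
    (v.correctionPrompt === undefined || typeof v.correctionPrompt === 'string')
  );
}

// Parses one SSE data payload; returns null for malformed or unexpected input.
function parseThoughtStep(raw: string): ThoughtStep | null {
  try {
    const parsed: unknown = JSON.parse(raw);
    return isThoughtStep(parsed) ? parsed : null;
  } catch {
    return null;
  }
}
```

Deriving the union types from the constant arrays keeps the compile-time types and the runtime check from drifting apart.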
4.2 ThoughtStep: Single Step Component
// src/components/ThoughtStep.tsx
import React from 'react';
import { ThoughtStep } from '../types';
interface ThoughtStepProps {
step: ThoughtStep;
onCorrect: (stepId: string) => void;
isCorrecting: boolean; // Flag to disable correction button when another is active
}
const ThoughtStepComponent: React.FC<ThoughtStepProps> = ({ step, onCorrect, isCorrecting }) => {
const getStatusClass = () => {
switch (step.status) {
case 'corrected': return 'bg-yellow-100 border-yellow-500';
case 'invalidated': return 'bg-gray-100 border-gray-300 text-gray-500 line-through';
case 'pending': return 'bg-blue-50 border-blue-300'; // Should not happen for rendered steps
case 'completed':
default: return 'bg-white border-gray-200';
}
};
const getIcon = () => {
switch (step.type) {
case 'thought': return '🤔';
case 'action': return '⚙️';
case 'observation': return '👀';
case 'tool_call': return '🛠️';
case 'final_answer': return '✅';
case 'system': return 'ℹ️';
default: return '💬';
}
};
return (
<div className={`flex items-start p-4 mb-2 rounded-lg shadow-sm border ${getStatusClass()}`}>
<div className="flex-shrink-0 mr-3 text-xl">{getIcon()}</div>
<div className="flex-grow">
<div className="text-sm font-semibold text-gray-700 mb-1 capitalize">
{step.type.replace('_', ' ')}
{step.status === 'corrected' && <span className="ml-2 text-yellow-700">(corrected)</span>}
{step.status === 'invalidated' && <span className="ml-2 text-gray-500">(invalidated)</span>}
</div>
<p className="text-gray-800 whitespace-pre-wrap">{step.content}</p>
{step.correctionPrompt && step.status === 'corrected' && (
<p className="mt-2 p-2 bg-yellow-50 border-l-4 border-yellow-400 text-sm text-yellow-800">
User correction: {step.correctionPrompt}
</p>
)}
<div className="text-xs text-gray-500 mt-2">
{new Date(step.timestamp).toLocaleTimeString()}
</div>
</div>
{step.type !== 'system' && step.status !== 'invalidated' && step.status !== 'corrected' && (
<button
onClick={() => onCorrect(step.stepId)}
disabled={isCorrecting}
className="ml-4 px-3 py-1 bg-indigo-500 text-white text-sm rounded-md hover:bg-indigo-600 disabled:opacity-50 disabled:cursor-not-allowed transition-colors"
>
Correct
</button>
)}
</div>
);
};
export default ThoughtStepComponent;
4.3 CorrectionModal: Correction Modal Component
// src/components/CorrectionModal.tsx
import React, { useState } from 'react';
interface CorrectionModalProps {
isOpen: boolean;
onClose: () => void;
onSave: (newContent: string) => void;
initialContent: string;
loading: boolean;
}
const CorrectionModal: React.FC<CorrectionModalProps> = ({ isOpen, onClose, onSave, initialContent, loading }) => {
const [newContent, setNewContent] = useState(initialContent);
React.useEffect(() => {
if (isOpen) {
setNewContent(initialContent);
}
}, [isOpen, initialContent]);
if (!isOpen) return null;
return (
<div className="fixed inset-0 bg-gray-600 bg-opacity-50 flex items-center justify-center z-50">
<div className="bg-white p-6 rounded-lg shadow-xl w-full max-w-md">
<h3 className="text-lg font-bold mb-4">Correct thinking step</h3>
<textarea
className="w-full p-2 border border-gray-300 rounded-md focus:outline-none focus:ring-2 focus:ring-indigo-500 resize-y min-h-[100px]"
value={newContent}
onChange={(e) => setNewContent(e.target.value)}
placeholder="Enter the corrected content..."
disabled={loading}
></textarea>
<div className="mt-4 flex justify-end space-x-2">
<button
onClick={onClose}
className="px-4 py-2 bg-gray-200 text-gray-800 rounded-md hover:bg-gray-300 disabled:opacity-50"
disabled={loading}
>
Cancel
</button>
<button
onClick={() => onSave(newContent)}
className="px-4 py-2 bg-indigo-500 text-white rounded-md hover:bg-indigo-600 disabled:opacity-50"
disabled={loading || newContent.trim() === ''}
>
{loading ? 'Saving...' : 'Save correction'}
</button>
</div>
</div>
</div>
);
};
export default CorrectionModal;
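4.4 ThoughtStream: Main Component
This is the core component. It connects to the SSE stream, manages the chain-of-thought state, and handles the correction logic.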
// src/components/ThoughtStream.tsx
import React, { useEffect, useRef, useState, useCallback } from 'react';
import { v4 as uuidv4 } from 'uuid';
import ThoughtStepComponent from './ThoughtStep';
import CorrectionModal from './CorrectionModal';
import { ThoughtStep } from '../types';
const API_BASE_URL = 'http://localhost:3001';
const ThoughtStream: React.FC = () => {
const [sessionId, setSessionId] = useState<string>('');
const [thoughtChain, setThoughtChain] = useState<ThoughtStep[]>([]);
const [isConnecting, setIsConnecting] = useState<boolean>(false);
const [isConnected, setIsConnected] = useState<boolean>(false);
const [error, setError] = useState<string | null>(null);
// Correction state
const [isCorrectionModalOpen, setIsCorrectionModalOpen] = useState(false);
const [currentCorrectionStep, setCurrentCorrectionStep] = useState<ThoughtStep | null>(null);
const [isSendingCorrection, setIsSendingCorrection] = useState(false);
const eventSourceRef = useRef<EventSource | null>(null);
const scrollContainerRef = useRef<HTMLDivElement>(null);
// Auto-scroll to bottom on new content
useEffect(() => {
if (scrollContainerRef.current) {
scrollContainerRef.current.scrollTop = scrollContainerRef.current.scrollHeight;
}
}, [thoughtChain]);
// Handle incoming SSE events
const handleNewThoughtStep = useCallback((event: MessageEvent) => {
try {
const newStep: ThoughtStep = JSON.parse(event.data);
// Handle system events like correction_applied
if (newStep.type === 'system' && newStep.content) {
const systemEvent = JSON.parse(newStep.content);
if (systemEvent.action === 'correction_applied') {
console.log('Correction applied by backend:', systemEvent);
setThoughtChain(prevChain => {
const correctedIndex = prevChain.findIndex(s => s.stepId === systemEvent.correctedStepId);
if (correctedIndex === -1) return prevChain;
// Keep everything up to and including the corrected step. Later steps were built on the
// old premise; the backend regenerates them and streams the replacements.
return prevChain.slice(0, correctedIndex + 1).map((step, index) =>
index === correctedIndex
? { ...step, status: 'corrected' as const } // Mark the corrected step
: step
);
});
setIsCorrectionModalOpen(false); // Close modal after backend acknowledges
setIsSendingCorrection(false);
return; // Don't add system event to thoughtChain directly
}
}
// Normal thought step
setThoughtChain(prevChain => {
// Upsert by stepId: update a step we already have (e.g. replayed after a reconnect), otherwise append it.
// The status sent by the backend is kept as-is so that 'corrected' steps stay marked.
const existingIndex = prevChain.findIndex(s => s.stepId === newStep.stepId);
if (existingIndex !== -1) {
const updatedChain = [...prevChain];
updatedChain[existingIndex] = newStep;
return updatedChain;
} else if (newStep.status !== 'invalidated') { // Only add if not marked invalidated by backend
return [...prevChain, newStep];
}
return prevChain;
});
} catch (err) {
console.error('Error parsing SSE data:', err);
setError('Failed to parse incoming data.');
}
}, []);
// Establish SSE connection
const connectToStream = useCallback((id: string) => {
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
setIsConnecting(true);
setError(null);
setThoughtChain([]); // Clear previous chain on new connection
const es = new EventSource(`${API_BASE_URL}/api/stream/thoughts/${id}`);
eventSourceRef.current = es;
es.onopen = () => {
console.log('SSE connection opened.');
setIsConnecting(false);
setIsConnected(true);
setError(null);
};
es.onmessage = handleNewThoughtStep;
es.onerror = (err) => {
console.error('SSE error:', err);
setIsConnecting(false);
setIsConnected(false);
setError('Stream connection error. Please try again.');
es.close();
};
return () => {
console.log('SSE connection closed.');
es.close();
eventSourceRef.current = null;
setIsConnected(false);
};
}, [handleNewThoughtStep]);
// Initialize session or connect
useEffect(() => {
const storedSessionId = localStorage.getItem('agentSessionId');
let currentId = storedSessionId || uuidv4();
if (!storedSessionId) {
localStorage.setItem('agentSessionId', currentId);
}
setSessionId(currentId);
connectToStream(currentId);
return () => {
if (eventSourceRef.current) {
eventSourceRef.current.close();
}
};
}, [connectToStream]); // Only run once on mount
// Correction handlers
const handleCorrectButtonClick = (stepId: string) => {
const stepToCorrect = thoughtChain.find(step => step.stepId === stepId);
if (stepToCorrect) {
setCurrentCorrectionStep(stepToCorrect);
setIsCorrectionModalOpen(true);
}
};
const handleCorrectionSave = async (newContent: string) => {
if (!currentCorrectionStep || !sessionId) return;
setIsSendingCorrection(true);
setError(null);
try {
const response = await fetch(`${API_BASE_URL}/api/stream/correct/${sessionId}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
stepId: currentCorrectionStep.stepId,
newContent: newContent,
}),
});
if (!response.ok) {
const errorData = await response.json();
throw new Error(errorData.error || 'Failed to send correction.');
}
// Reflect the new content locally but leave the status alone: the 'corrected' status is set by
// the correction_applied SSE event, which can arrive before this POST response resolves.
setThoughtChain(prevChain =>
prevChain.map(step =>
step.stepId === currentCorrectionStep.stepId
? { ...step, content: newContent, correctionPrompt: newContent }
: step // Keep other steps as they are, backend will send the new steps
)
);
// Modal will be closed by the SSE event from backend
} catch (err: any) {
console.error('Error sending correction:', err);
setError(`Failed to send correction: ${err.message}`);
setIsSendingCorrection(false);
setIsCorrectionModalOpen(false);
}
};
const handleCorrectionClose = () => {
setIsCorrectionModalOpen(false);
setCurrentCorrectionStep(null);
setIsSendingCorrection(false);
};
const isAnyCorrectionActive = isCorrectionModalOpen || isSendingCorrection;
return (
<div className="min-h-screen bg-gray-50 p-6 font-sans">
<h1 className="text-3xl font-bold text-gray-800 mb-6 text-center">Inter-thought Streaming: Agent Chain of Thought</h1>
<div className="max-w-3xl mx-auto bg-white rounded-xl shadow-lg p-6">
<div className="mb-4 flex justify-between items-center">
<h2 className="text-xl font-semibold text-gray-700">Agent Thinking Process</h2>
<span className={`px-3 py-1 text-sm rounded-full ${isConnected ? 'bg-green-100 text-green-700' : 'bg-red-100 text-red-700'}`}>
{isConnecting ? 'Connecting...' : (isConnected ? 'Connected' : 'Disconnected')}
</span>
</div>
{error && (
<div className="bg-red-100 border border-red-400 text-red-700 px-4 py-3 rounded relative mb-4" role="alert">
<strong className="font-bold">Error!</strong>
<span className="block sm:inline"> {error}</span>
</div>
)}
<div ref={scrollContainerRef} className="max-h-[70vh] overflow-y-auto pr-2">
{thoughtChain.length === 0 && !isConnecting && !error && (
<p className="text-gray-500 text-center py-8">Agent is starting to think...</p>
)}
{thoughtChain.map((step) => (
<ThoughtStepComponent
key={step.stepId}
step={step}
onCorrect={handleCorrectButtonClick}
isCorrecting={isAnyCorrectionActive} // Disable other correction buttons
/>
))}
</div>
</div>
<CorrectionModal
isOpen={isCorrectionModalOpen}
onClose={handleCorrectionClose}
onSave={handleCorrectionSave}
initialContent={currentCorrectionStep?.content || ''}
loading={isSendingCorrection}
/>
</div>
);
};
export default ThoughtStream;
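Frontend code explained:
- State management (useState):
  - sessionId: identifies the current user session.
  - thoughtChain: stores every thinking step received so far.
  - isConnecting, isConnected, error: connection state and error message.
  - isCorrectionModalOpen, currentCorrectionStep, isSendingCorrection: the correction modal and its state.
- EventSource API:
  - The connectToStream function creates and manages the EventSource instance.
  - es.onmessage: receives data pushed by the server and passes it to handleNewThoughtStep.
  - es.onopen, es.onerror: handle connection events.
- handleNewThoughtStep (core logic):
  - Parses the JSON data.
  - Handles system events: when the backend sends a correction_applied event, thoughtChain is updated. The corrected step is marked as corrected, and the old steps that came after the correction point are removed. This is what keeps the UI in sync with the backend state.
  - Handles ordinary steps: a new step is appended to thoughtChain. If a step with the same stepId already exists, it is updated instead, which is useful when the backend resends a step.
- Correction logic:
  - handleCorrectButtonClick: when the user clicks "Correct", opens the CorrectionModal and sets currentCorrectionStep.
  - handleCorrectionSave: when the user clicks "Save correction" in the modal, sends a POST request to the backend at /api/stream/correct/:sessionId.
  - isAnyCorrectionActive: a helper flag that disables the other "Correct" buttons while a correction is in progress, to avoid state confusion from concurrent corrections.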
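The state transitions described above can be isolated from React as a pure reducer, which makes them testable without a browser. The following is a sketch under assumptions: reduceChain and the StreamEvent union are hypothetical names, and the system event is modelled as its own variant rather than as a step of type system.

```typescript
type Status = 'pending' | 'completed' | 'corrected' | 'invalidated';

interface Step {
  stepId: string;
  type: string;
  content: string;
  status: Status;
}

type StreamEvent =
  | { kind: 'step'; step: Step }
  | { kind: 'correction_applied'; correctedStepId: string };

// Pure state transition: returns the next chain without mutating the input.
function reduceChain(chain: readonly Step[], event: StreamEvent): Step[] {
  if (event.kind === 'correction_applied') {
    const { correctedStepId } = event;
    const index = chain.findIndex(s => s.stepId === correctedStepId);
    if (index === -1) return [...chain];
    // Drop everything after the corrected step and mark that step as corrected.
    return chain
      .slice(0, index + 1)
      .map((s, i) => (i === index ? { ...s, status: 'corrected' as const } : s));
  }

  const { step } = event;
  const existing = chain.findIndex(s => s.stepId === step.stepId);
  // Upsert: replace a step we already hold, otherwise append it.
  if (existing !== -1) return chain.map((s, i) => (i === existing ? step : s));
  return step.status === 'invalidated' ? [...chain] : [...chain, step];
}
```

In a component, such a function could be passed to useReducer, or called inside the functional form of a state setter.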
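5. Challenges and Advanced Considerations
5.1 Integrating a Real LLM Agent
- API abstraction: in a real system, AgentSession would not simulate steps directly. It would obtain responses by calling an LLM API (such as OpenAI GPT-4, Anthropic Claude, or Llama 2).
- Stream parsing: many LLM APIs support streaming responses. The backend has to parse those streams, turn them into structured ThoughtStep objects, and push them to the frontend over SSE.
- Tool calls: if the Agent uses tools (RAG, external APIs), the backend has to coordinate tool execution and push the tool inputs and outputs as steps of type action and observation.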
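As an illustration of the stream-parsing point, the sketch below turns arbitrarily chunked text into structured steps. It assumes a ReAct-style output convention in which each step is one line starting with a label such as Thought: or Action:. That convention, the class name StepStreamParser, and the label set are all assumptions; a real provider's streaming format would need its own parser.

```typescript
type ParsedType = 'thought' | 'action' | 'observation' | 'final_answer';

interface ParsedStep {
  type: ParsedType;
  content: string;
}

// Assumed line prefixes (ReAct-style); adjust to the prompt format actually used.
const PREFIXES: ReadonlyArray<readonly [string, ParsedType]> = [
  ['Thought:', 'thought'],
  ['Action:', 'action'],
  ['Observation:', 'observation'],
  ['Final Answer:', 'final_answer'],
];

class StepStreamParser {
  private buffer = '';

  // Feed one streamed chunk; returns the steps completed by this chunk.
  push(chunk: string): ParsedStep[] {
    this.buffer += chunk;
    const lines = this.buffer.split('\n');
    // The last element is an incomplete line; keep it for the next chunk.
    this.buffer = lines.pop() ?? '';
    const steps: ParsedStep[] = [];
    for (const line of lines) {
      const step = this.parseLine(line);
      if (step !== null) steps.push(step);
    }
    return steps;
  }

  // Call once the upstream stream ends to emit a trailing line without "\n".
  flush(): ParsedStep[] {
    const rest = this.buffer;
    this.buffer = '';
    const step = this.parseLine(rest);
    return step === null ? [] : [step];
  }

  private parseLine(line: string): ParsedStep | null {
    const trimmed = line.trim();
    for (const [prefix, type] of PREFIXES) {
      if (trimmed.startsWith(prefix)) {
        return { type, content: trimmed.slice(prefix.length).trim() };
      }
    }
    return null; // lines without a known prefix are ignored in this sketch
  }
}
```

Each ParsedStep would still need a stepId, timestamp, and status before being pushed to listeners. Multi-line step bodies are deliberately out of scope here.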
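5.2 State Persistence
- In the current implementation the Agent session is lost as soon as the backend cleans it up. In production, the chain of thought and the correction history need to be persisted to a database (such as MongoDB or PostgreSQL).
- Users can then reload a session and continue from where it was interrupted or last corrected.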
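5.3 Concurrency Control
- Multiple users: every user should have an independent Agent session.
- Multiple corrections from one user: currently avoided by disabling the other Correct buttons. A more robust approach is to keep a correction queue on the backend, or to put the Agent into a dedicated state once a correction arrives and hold it there until the correction has been processed.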
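One way to realize the "dedicated state" idea is a small gate that admits a single correction at a time and rejects requests based on an outdated view of the chain. This is a sketch under assumptions: CorrectionGate, the baseVersion field, and the stale/busy reasons are hypothetical and would have to be wired into AgentSession and the POST handler.

```typescript
interface CorrectionRequest {
  stepId: string;
  newContent: string;
  baseVersion: number; // chain version the client saw when the user clicked Correct
}

type CorrectionOutcome =
  | { ok: true; version: number }
  | { ok: false; reason: 'stale' | 'busy' };

// Admits one correction at a time and rejects requests made against an old chain version.
class CorrectionGate {
  private version = 0;
  private busy = false;

  get currentVersion(): number {
    return this.version;
  }

  tryBegin(req: CorrectionRequest): CorrectionOutcome {
    if (this.busy) return { ok: false, reason: 'busy' };
    if (req.baseVersion !== this.version) return { ok: false, reason: 'stale' };
    this.busy = true;
    this.version += 1; // every accepted correction produces a new chain version
    return { ok: true, version: this.version };
  }

  // Call when the correction has been fully applied (or has failed).
  finish(): void {
    this.busy = false;
  }
}
```

A handler could map busy and stale to an HTTP 409 response so the frontend can refresh its view and let the user retry.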
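5.4 Error Handling and Retries
- Network interruptions: SSE has a built-in reconnection mechanism, but the frontend still has to handle the case where reconnection fails.
- Internal Agent errors: if the Agent crashes while thinking, the backend should catch the error and report it to the frontend as a system event.
- Correction conflicts: if the user tries to correct a step that has already been invalidated, the backend should return a clear error message.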
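Calling close() on an EventSource, as the onerror handler in ThoughtStream does, stops the browser's automatic reconnection, so the client then has to schedule its own retries. Below is a minimal sketch of a capped exponential backoff policy for that. The function names and the default values (1 s base, 30 s cap, 5 attempts) are assumptions chosen for illustration.

```typescript
// Delay before reconnect attempt number `attempt` (0-based), doubling each time up to a cap.
function reconnectDelayMs(attempt: number, baseMs = 1000, maxMs = 30000): number {
  const exponent = Math.max(0, Math.floor(attempt));
  return Math.min(maxMs, baseMs * Math.pow(2, exponent));
}

// Whether the client should stop retrying and surface an error to the user instead.
function shouldGiveUp(attempt: number, maxAttempts = 5): boolean {
  return attempt >= maxAttempts;
}
```

A caller would typically use setTimeout(() => connectToStream(id), reconnectDelayMs(attempt)) and reset the attempt counter in onopen. Adding random jitter to the delay is a common refinement when many clients may reconnect at once.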
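5.5 User Experience Improvements
- Loading indicator: show a loading animation while the Agent is thinking.
- Diff view: when the user corrects a step, show the difference between the original and the corrected text to make the change easier to read.
- Immediate feedback: show a "correcting" state in the UI as soon as the correction request is sent, instead of waiting for the backend to confirm.
- Collapsible steps: for very long chains, let the user collapse steps that are already complete.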
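For the diff view, a word-level diff based on the longest common subsequence is usually enough for short step texts. The sketch below is one possible implementation, not a prescribed one. diffWords and DiffOp are hypothetical names, and the function assumes whitespace-separated words, so text in languages written without spaces would need a different tokenizer.

```typescript
type DiffOp = { op: 'same' | 'removed' | 'added'; text: string };

// Word-level diff using a longest-common-subsequence table.
function diffWords(before: string, after: string): DiffOp[] {
  const a = before.split(/\s+/).filter(Boolean);
  const b = after.split(/\s+/).filter(Boolean);

  // lcs[i][j] = length of the longest common subsequence of a[i..] and b[j..]
  const lcs: number[][] = Array.from({ length: a.length + 1 }, () =>
    new Array<number>(b.length + 1).fill(0),
  );
  for (let i = a.length - 1; i >= 0; i--) {
    for (let j = b.length - 1; j >= 0; j--) {
      lcs[i][j] = a[i] === b[j]
        ? lcs[i + 1][j + 1] + 1
        : Math.max(lcs[i + 1][j], lcs[i][j + 1]);
    }
  }

  // Walk the table from the start, emitting one operation per word.
  const ops: DiffOp[] = [];
  let i = 0;
  let j = 0;
  while (i < a.length && j < b.length) {
    if (a[i] === b[j]) {
      ops.push({ op: 'same', text: a[i] });
      i++;
      j++;
    } else if (lcs[i + 1][j] >= lcs[i][j + 1]) {
      ops.push({ op: 'removed', text: a[i] });
      i++;
    } else {
      ops.push({ op: 'added', text: b[j] });
      j++;
    }
  }
  while (i < a.length) ops.push({ op: 'removed', text: a[i++] });
  while (j < b.length) ops.push({ op: 'added', text: b[j++] });
  return ops;
}
```

A component could render removed words with a strikethrough style and added words with a highlight. The table costs O(n·m) memory, which is fine for step-sized texts but not for long documents.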
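5.6 Security
- Authentication and authorization: make sure only authorized users can access and correct their own Agent sessions.
- Input validation: validate the content of user corrections strictly to prevent malicious injection or abuse.
- Rate limiting: protect against DDoS attacks and excessive use of Agent resources.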
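The validation and rate-limiting points can be sketched without any framework. Everything here is illustrative: validateCorrectionBody, FixedWindowLimiter, and the 2000-character limit are assumptions, and a production deployment would more likely rely on an established middleware plus a shared store such as Redis.

```typescript
interface CorrectionBody {
  stepId: string;
  newContent: string;
}

type ValidationResult =
  | { ok: true; value: CorrectionBody }
  | { ok: false; error: string };

const MAX_CONTENT_LENGTH = 2000; // assumed limit, tune per application

// Validates an untrusted request body before it reaches the agent.
function validateCorrectionBody(body: unknown): ValidationResult {
  if (typeof body !== 'object' || body === null) {
    return { ok: false, error: 'Body must be a JSON object.' };
  }
  const { stepId, newContent } = body as Record<string, unknown>;
  if (typeof stepId !== 'string' || stepId.length === 0) {
    return { ok: false, error: 'stepId must be a non-empty string.' };
  }
  if (typeof newContent !== 'string' || newContent.trim() === '') {
    return { ok: false, error: 'newContent must be a non-empty string.' };
  }
  if (newContent.length > MAX_CONTENT_LENGTH) {
    return { ok: false, error: 'newContent is too long.' };
  }
  return { ok: true, value: { stepId, newContent: newContent.trim() } };
}

// In-memory fixed-window limiter: at most `limit` calls per key per window.
class FixedWindowLimiter {
  private readonly limit: number;
  private readonly windowMs: number;
  private readonly counts = new Map<string, { windowStart: number; count: number }>();

  constructor(limit: number, windowMs: number) {
    this.limit = limit;
    this.windowMs = windowMs;
  }

  // `now` is injectable so the behavior can be tested deterministically.
  allow(key: string, now: number = Date.now()): boolean {
    const entry = this.counts.get(key);
    if (entry === undefined || now - entry.windowStart >= this.windowMs) {
      this.counts.set(key, { windowStart: now, count: 1 });
      return true;
    }
    if (entry.count >= this.limit) return false;
    entry.count += 1;
    return true;
  }
}
```

The limiter key could be the session ID or the authenticated user ID. Validation of this kind checks shape and size only; defending the LLM prompt against injected instructions is a separate problem.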
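6. Summary and Outlook
Inter-thought Streaming offers a new, highly transparent, and controllable mode of human-AI collaboration. By showing the AI's thinking process in real time and letting users step in at key points, we make the AI more trustworthy and open new paths for debugging, learning, and augmentation. State management, concurrency control, and integration with a real Agent remain challenging, but with a structured streaming backend and a responsive frontend interaction design we can build Inter-thought Streaming systems that are both powerful and flexible. Looking ahead, this capability will become an essential part of advanced AI applications and will help move AI from a tool toward a truly intelligent assistant.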