Dissecting 'Inter-thought Streaming': How to Render an Agent's Chain of Thought in Real Time on the Frontend and Add a User 'Correct' Button at Every Step

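Good afternoon, everyone!

Today we will dig into an exciting and demanding topic: Inter-thought Streaming. As the name suggests, the goal is to show an AI agent's chain of thought in real time and in full view, and to let the user step in and correct it at any step. This is not just a technical showpiece. It is a key step toward AI systems that are trustworthy, controllable, and efficient.

Picture an AI working on a complex problem. Instead of producing only a final answer, it thinks, plans, acts, and observes step by step, much as a person would. When one of those steps goes off course, we can spot it right away and tell the agent: "Wait, that is wrong here, do this instead." That style of human-AI collaboration is the core value of Inter-thought Streaming.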

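1. Inter-thought Streaming: Core Concepts and Value

What is Inter-thought Streaming?
It means streaming an AI agent's internal reasoning process (its decisions, plans, inferences, tool calls, and other intermediate steps) to the frontend in real time and rendering it there incrementally. It also includes a key interaction layer: the user can "correct" or "steer" any step that has already been displayed, which changes what the agent does next.

Why does Inter-thought Streaming matter?

  1. Transparency & Explainability:
    • We are no longer looking at a black box. Users can see how the AI reached its conclusion, which is essential for debugging and for understanding its behavior.
    • In critical business settings such as finance and healthcare, knowing the AI's decision path is the basis for compliance and trust.
  2. Controllability & Guidance:
    • Users can catch a mistake or an inefficient path early and intervene before the problem grows.
    • The AI can learn from the user's corrections, or explore better solutions under the user's guidance, which gives us Human-in-the-Loop augmentation.
  3. Debugging & Optimization:
    • Developers can watch how the model behaves on a specific input and quickly locate reasoning errors or model bias.
    • Collected user corrections can be used to fine-tune the agent's behavior or improve the underlying model.
  4. User Experience:
    • The experience is dynamic and interactive, so users feel they are working with a partner that actually "thinks" rather than with a simple tool.

Core challenges:

  • Latency: how do we deliver thought steps from the backend to the frontend quickly enough that the delay is barely noticeable?
  • State management: how do we maintain the agent's internal state while it thinks, and "rewind" or "reset" it after a user correction so that it continues from the corrected point?
  • Concurrency and synchronization: multiple users, or repeated corrections from one user, add complexity.
  • Frontend rendering: how do we render a steady flow of incoming steps smoothly while still handling user interaction?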

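2. Architecture Overview: Building an Inter-thought Streaming System

To implement Inter-thought Streaming we need a typical client-server architecture, plus a suitable communication protocol.

Core components:

  1. AI Agent (backend): simulates or runs the AI's thinking process. It produces a series of structured "thought steps".
  2. API Server (backend): the bridge between the AI Agent and the frontend. It is responsible for:
    • Receiving requests from the frontend.
    • Interacting with the AI Agent to obtain thought steps.
    • Streaming thought steps to the frontend.
    • Receiving user correction requests from the frontend and passing them to the AI Agent.
  3. Frontend Client (browser): responsible for:
    • Opening a streaming connection to the API Server.
    • Receiving and rendering thought steps in real time.
    • Showing a "Correct" button on every step and handling user input.
    • Sending the correction back to the API Server.

Choosing a communication protocol:

For real-time streaming we have two main options: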

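| Feature | Server-Sent Events (SSE) | WebSockets |
|---|---|---|
| Direction | One-way (server -> client) | Two-way (server <-> client) |
| Protocol | Ordinary HTTP, using the text/event-stream MIME type | A separate protocol over TCP, established through an HTTP Upgrade handshake |
| Overhead | Low; plain HTTP, so easy to proxy and load-balance | Higher operationally; proxies and load balancers must support the upgraded connection |
| Reconnection | Built in (the browser's EventSource reconnects automatically) | Must be implemented manually |
| Data format | Text, framed as data: [payload]\n\n | Any data (text or binary); JSON is common |
| Browser support | Good | Good |
| Use cases | Server pushes data and the client passively receives it, e.g. news feeds, chat-room notifications | Highly interactive scenarios with frequent two-way traffic, e.g. online games, instant messaging |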

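Why SSE:
For Inter-thought Streaming, the core requirement is that the server pushes the agent's thought steps to the client. The client does also need to send correction requests to the server, but each of those is an independent, non-streaming request. SSE's one-way stream matches the push of thought steps well, and because it runs over HTTP it is simpler to deploy and scale. Corrections can go through an ordinary HTTP POST. The combination is both simple and efficient.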
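To make the SSE framing concrete, here is a minimal sketch of how one message is serialized on the wire. The helper name `formatSseMessage` and the `SseMessage` shape are assumptions made for illustration and are not part of the article's server; the field names `id`, `event`, and `data` and the blank-line terminator come from the text/event-stream format itself.

```typescript
// Hypothetical helper: serializes one message into the text/event-stream wire format.
interface SseMessage {
    id?: string;     // becomes the "id:" field
    event?: string;  // optional event name; when omitted the browser fires "message"
    data: unknown;   // strings are sent as-is, everything else is JSON-serialized
}

function formatSseMessage(msg: SseMessage): string {
    const lines: string[] = [];
    if (msg.id !== undefined) lines.push(`id: ${msg.id}`);
    if (msg.event !== undefined) lines.push(`event: ${msg.event}`);

    // A payload that contains newlines must be split across several "data:" lines
    const payload = typeof msg.data === 'string' ? msg.data : JSON.stringify(msg.data);
    for (const line of payload.split('\n')) {
        lines.push(`data: ${line}`);
    }

    // A blank line terminates the message
    return lines.join('\n') + '\n\n';
}
```

One design point to keep in mind: a message that carries an `event:` name is not delivered to `EventSource.onmessage`; the client has to register `addEventListener` for that name. Sending everything as the default event and putting a `type` field inside the JSON payload, as the rest of this article does, avoids that split.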

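Data flow overview:

graph TD
    A[Frontend Client] -- 1. Open SSE connection --> B(API Server)
    B -- 2. Ask the AI Agent to think --> C(AI Agent)
    C -- 3. Produce thought steps --> B
    B -- 4. Stream thought steps over SSE --> A
    A -- 5. Render steps & show Correct buttons --> A
    A -- 6. User clicks Correct, POST request is sent --> B
    B -- 7. Receive correction, notify the AI Agent --> C
    C -- 8. AI Agent resets/adjusts its thinking, produces new steps --> B
    B -- 9. Stream the new steps over SSE --> A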

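3. Backend Implementation: Simulated AI Agent and SSE Streaming (Node.js/Express)

On the backend we need a module that simulates the AI Agent, and an Express server that handles the streaming request and the correction request.

3.1 Simulating the AI Agent

Our AI Agent produces a series of "thought steps". Each step has a type (for example thought, action, or observation), its content, a unique ID, and a timestamp.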

// src/agent/thoughtAgent.ts

import { v4 as uuidv4 } from 'uuid';

// 'system' is included so the agent can emit control events such as correction_applied
export type ThoughtStepType = 'thought' | 'action' | 'observation' | 'tool_call' | 'final_answer' | 'system';

export interface ThoughtStep {
    stepId: string;
    type: ThoughtStepType;
    content: string;
    timestamp: string;
    status: 'pending' | 'completed' | 'corrected' | 'invalidated'; // New status for correction flow
    correctionPrompt?: string; // Stored user correction
}

// In a real scenario, this would involve LLM calls, RAG, tool execution etc.
// For this lecture, we simulate it with a predefined sequence and delays.
const initialThoughtSequence: ThoughtStep[] = [
    { stepId: uuidv4(), type: 'thought', content: 'The user wants to know how to optimize website performance; I first need to analyze the current tech stack.', timestamp: new Date().toISOString(), status: 'pending' },
    { stepId: uuidv4(), type: 'action', content: 'Call the `analyzeWebsite(url)` tool to fetch information about the site.', timestamp: new Date().toISOString(), status: 'pending' },
    { stepId: uuidv4(), type: 'observation', content: 'The site uses a React frontend, a Node.js backend, and MongoDB as the database. Static assets are not served from a CDN.', timestamp: new Date().toISOString(), status: 'pending' },
    { stepId: uuidv4(), type: 'thought', content: 'Based on the information so far, the initial optimization directions are image optimization, CDN deployment, and server-side rendering.', timestamp: new Date().toISOString(), status: 'pending' },
    { stepId: uuidv4(), type: 'action', content: 'Generate an optimization recommendation report.', timestamp: new Date().toISOString(), status: 'pending' },
    { stepId: uuidv4(), type: 'final_answer', content: 'The report has been generated, covering image optimization, CDN deployment, and server-side rendering.', timestamp: new Date().toISOString(), status: 'pending' },
];

class AgentSession {
    private sessionId: string;
    private currentThoughtChain: ThoughtStep[];
    private currentStepIndex: number;
    private isPaused: boolean;
    private listeners: ((step: ThoughtStep) => void)[] = [];

    constructor(sessionId: string, initialChain: ThoughtStep[] = initialThoughtSequence) {
        this.sessionId = sessionId;
        this.currentThoughtChain = JSON.parse(JSON.stringify(initialChain)); // Deep copy
        this.currentStepIndex = 0;
        this.isPaused = false;
        console.log(`Agent session ${sessionId} created.`);
    }

    // Register a listener for new thought steps
    public addListener(listener: (step: ThoughtStep) => void) {
        this.listeners.push(listener);
    }

    public removeListener(listener: (step: ThoughtStep) => void) {
        this.listeners = this.listeners.filter(l => l !== listener);
    }

    private notifyListeners(step: ThoughtStep) {
        this.listeners.forEach(listener => listener(step));
    }

    // Incremented on every (re)start so that a loop left sleeping from before a
    // correction can detect that it is stale and stop emitting steps
    private runToken = 0;

    public async startThinking() {
        if (this.currentStepIndex >= this.currentThoughtChain.length) {
            console.log(`Agent session ${this.sessionId} finished thinking.`);
            return;
        }

        this.isPaused = false;
        const myRun = ++this.runToken;
        console.log(`Agent session ${this.sessionId} started/resumed thinking from index ${this.currentStepIndex}.`);

        // Simulate thinking step by step; stop if paused or superseded by a newer run
        while (
            myRun === this.runToken &&
            !this.isPaused &&
            this.currentStepIndex < this.currentThoughtChain.length
        ) {
            const step = this.currentThoughtChain[this.currentStepIndex];
            step.timestamp = new Date().toISOString(); // Update timestamp on generation
            step.status = 'completed'; // Mark as completed when generated

            this.notifyListeners(step); // Push to listeners (SSE)
            console.log(`[${this.sessionId}] Emitted step ${this.currentStepIndex + 1}: ${step.type} - ${step.content.substring(0, 30)}...`);

            this.currentStepIndex++;
            if (this.currentStepIndex < this.currentThoughtChain.length) {
                await new Promise(resolve => setTimeout(resolve, Math.random() * 2000 + 1000)); // Simulate delay
            }
        }
        console.log(`Agent session ${this.sessionId} paused or finished.`);
    }

    public pauseThinking() {
        this.isPaused = true;
        console.log(`Agent session ${this.sessionId} paused.`);
    }

    // Handles user correction
    public async applyCorrection(stepId: string, newContent: string): Promise<boolean> {
        this.pauseThinking(); // Pause current generation
        console.log(`Applying correction for stepId: ${stepId}, newContent: ${newContent}`);

        const correctedIndex = this.currentThoughtChain.findIndex(step => step.stepId === stepId);

        if (correctedIndex === -1) {
            console.error(`Correction failed: Step ID ${stepId} not found.`);
            return false;
        }

        // 1. Update the corrected step
        const correctedStep = this.currentThoughtChain[correctedIndex];
        correctedStep.content = newContent;
        correctedStep.status = 'corrected';
        correctedStep.correctionPrompt = newContent;
        correctedStep.timestamp = new Date().toISOString();

        // 2. Invalidate all subsequent steps (they are now based on potentially wrong premise)
        for (let i = correctedIndex + 1; i < this.currentThoughtChain.length; i++) {
            this.currentThoughtChain[i].status = 'invalidated';
            // Optionally, you might want to clear content or modify it to reflect invalidation
        }

        // 3. Re-generate from the corrected point (simplified simulation)
        // In a real LLM Agent, this would involve re-prompting the LLM with the modified context
        // For simulation, we'll just cut the chain and generate a new sequence from here.
        this.currentThoughtChain = this.currentThoughtChain.slice(0, correctedIndex + 1);
        // The explicit annotation keeps literal types ('thought', 'pending', ...) from widening to string
        const newSequence: ThoughtStep[] = [
            { stepId: uuidv4(), type: 'thought', content: `Based on the correction '${newContent}', I need to re-evaluate the optimization strategy.`, timestamp: new Date().toISOString(), status: 'pending' },
            { stepId: uuidv4(), type: 'action', content: 'Consider new optimization directions, such as database index tuning.', timestamp: new Date().toISOString(), status: 'pending' },
            { stepId: uuidv4(), type: 'final_answer', content: 'The new optimization report will include database optimization recommendations.', timestamp: new Date().toISOString(), status: 'pending' },
        ];
        this.currentThoughtChain = [...this.currentThoughtChain, ...newSequence];
        // Resume at the first regenerated step. Restarting at correctedIndex would
        // re-emit the corrected step and reset its status to 'completed'.
        this.currentStepIndex = correctedIndex + 1;

        // Notify frontend that correction was applied and stream will resume
        this.notifyListeners({
            stepId: 'correction-event',
            type: 'system',
            content: JSON.stringify({
                action: 'correction_applied',
                correctedStepId: stepId,
                newChainLength: this.currentThoughtChain.length,
                resumeFromIndex: this.currentStepIndex
            }),
            timestamp: new Date().toISOString(),
            status: 'completed'
        });

        // Immediately restart thinking from the corrected point
        // A small delay ensures the correction event is processed first by frontend
        setTimeout(() => this.startThinking(), 100);
        return true;
    }

    public getCurrentChain(): ThoughtStep[] {
        return this.currentThoughtChain;
    }
}

// Global map to hold active agent sessions
const agentSessions = new Map<string, AgentSession>();

export function getAgentSession(sessionId: string): AgentSession {
    if (!agentSessions.has(sessionId)) {
        const newSession = new AgentSession(sessionId);
        agentSessions.set(sessionId, newSession);
    }
    return agentSessions.get(sessionId)!;
}

export function cleanupAgentSession(sessionId: string) {
    if (agentSessions.has(sessionId)) {
        agentSessions.get(sessionId)?.pauseThinking();
        agentSessions.delete(sessionId);
        console.log(`Agent session ${sessionId} cleaned up.`);
    }
}

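Key logic in AgentSession:

  • currentThoughtChain: stores the thought chain for the current session.
  • currentStepIndex: marks the step currently being processed.
  • isPaused: controls the streaming generation loop.
  • addListener/notifyListeners: a publish-subscribe mechanism that lets the SSE controller receive new steps.
  • applyCorrection: this is the core. It pauses the current thinking, updates the corrected step, invalidates every step after it, simulates generating a new chain, and resumes from the corrected point. It also emits a special system event to tell the frontend that the correction has been applied.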
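The "update the corrected step, invalidate what follows" part of applyCorrection can also be written as a pure function, which is easier to unit-test than a method that mutates session state. The following is a sketch under that assumption; `correctChain`, `Step`, and `CorrectionResult` are hypothetical names, and the step type is trimmed to the fields the logic needs.

```typescript
type StepStatus = 'pending' | 'completed' | 'corrected' | 'invalidated';

interface Step {
    stepId: string;
    content: string;
    status: StepStatus;
}

interface CorrectionResult {
    kept: Step[];        // chain up to and including the corrected step
    invalidated: Step[]; // steps that were built on the old premise
}

// Returns null when the step is unknown; never mutates the input chain.
function correctChain(chain: readonly Step[], stepId: string, newContent: string): CorrectionResult | null {
    const index = chain.findIndex(s => s.stepId === stepId);
    const target = chain[index]; // undefined when index is -1
    if (target === undefined) return null;

    // Copy the untouched prefix, then append the corrected version of the target
    const kept: Step[] = chain.slice(0, index).map(s => ({ ...s }));
    kept.push({ ...target, content: newContent, status: 'corrected' });

    // Everything after the corrected step is marked invalidated
    const invalidated: Step[] = chain
        .slice(index + 1)
        .map(s => ({ ...s, status: 'invalidated' as const }));

    return { kept, invalidated };
}
```

Returning the invalidated steps separately, instead of discarding them, leaves room to store them as correction history or to show them struck through in the UI.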

3.2 API Server (Express)

// src/server.ts

import express from 'express';
import cors from 'cors';
import { getAgentSession, ThoughtStep, cleanupAgentSession } from './agent/thoughtAgent';
import { v4 as uuidv4 } from 'uuid';

const app = express();
const PORT = 3001;

app.use(cors({
    origin: 'http://localhost:3000', // Allow frontend to connect
    credentials: true,
}));
app.use(express.json()); // For parsing correction POST requests

// SSE Endpoint for streaming thoughts
app.get('/api/stream/thoughts/:sessionId', (req, res) => {
    const sessionId = req.params.sessionId;
    const agentSession = getAgentSession(sessionId);

    // Set SSE headers and send them right away so the client sees the stream open.
    // CORS headers are already added by the cors() middleware above.
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders();

    // Replay only the steps that have already been emitted. Pending steps have
    // not been "thought" yet and must not be sent ahead of time.
    agentSession.getCurrentChain()
        .filter(step => step.status !== 'pending')
        .forEach(step => {
            res.write(`data: ${JSON.stringify(step)}\n\n`);
        });

    // Each SSE message is a "data:" line terminated by a blank line
    const listener = (step: ThoughtStep) => {
        res.write(`data: ${JSON.stringify(step)}\n\n`);
    };

    agentSession.addListener(listener);

    // Start thinking if not already started or paused at the beginning
    // If the session was corrected and restarted, it would already be thinking
    if (agentSession.getCurrentChain().length === 0 || agentSession.getCurrentChain().every(s => s.status !== 'completed')) {
         agentSession.startThinking();
    }

    req.on('close', () => {
        console.log(`Client disconnected from session ${sessionId}.`);
        agentSession.removeListener(listener);
        // In a real app, you might want to keep the session alive for a while
        // or offer a way to resume. For now, we'll clean it up.
        cleanupAgentSession(sessionId);
    });
});

// Endpoint for receiving user corrections
app.post('/api/stream/correct/:sessionId', async (req, res) => {
    const sessionId = req.params.sessionId;
    const { stepId, newContent } = req.body;

    if (!stepId || !newContent) {
        return res.status(400).json({ error: 'Missing stepId or newContent.' });
    }

    const agentSession = getAgentSession(sessionId);
    const success = await agentSession.applyCorrection(stepId, newContent);

    if (success) {
        res.json({ message: 'Correction applied. Agent will re-evaluate.', stepId });
    } else {
        // applyCorrection only returns false when the step ID is unknown
        res.status(404).json({ error: 'Step not found; correction was not applied.' });
    }
});

app.listen(PORT, () => {
    console.log(`Backend streaming server running on http://localhost:${PORT}`);
});

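Backend code walkthrough:

  • SSE Endpoint (/api/stream/thoughts/:sessionId):
    • Sets the required SSE headers.
    • Creates a listener function that, whenever the agent produces a new step, writes it to the response with res.write so that it reaches the client.
    • Registers the listener on the AgentSession.
    • Handles client disconnects (req.on('close')) by removing the listener and cleaning up the session.
    • agentSession.startThinking(): starts the agent's thinking process.
  • Correction Endpoint (/api/stream/correct/:sessionId):
    • Receives stepId and newContent.
    • Calls agentSession.applyCorrection to run the correction logic.
    • Returns a success or failure response.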
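The endpoint above replays the stored chain on every new connection. If each SSE message also carried an `id:` field, the browser's EventSource would send the last id it received back in the `Last-Event-ID` request header when it reconnects, and the server could replay only what the client missed. The sketch below assumes the step's stepId is used as that id and that the session survives the disconnect (the article's server cleans it up instead); `stepsToReplay` and `EmittedStep` are hypothetical names.

```typescript
interface EmittedStep {
    stepId: string;
    content: string;
}

// Returns the steps the client has not seen yet, given the Last-Event-ID it sent.
// A missing or unknown id means "replay everything".
function stepsToReplay(emitted: readonly EmittedStep[], lastEventId: string | undefined): EmittedStep[] {
    if (!lastEventId) return [...emitted];
    const index = emitted.findIndex(s => s.stepId === lastEventId);
    return index === -1 ? [...emitted] : emitted.slice(index + 1);
}
```

In an Express handler the header would be read with `req.get('Last-Event-ID')` and passed in as the second argument. Falling back to a full replay for an unknown id is the conservative choice, since the frontend already de-duplicates steps by stepId.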

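4. Frontend Implementation: Real-Time Rendering and Interaction (React)

The frontend needs a main component that manages the SSE connection and renders the list of thought steps, plus a modal dialog for user corrections.

4.1 Data Structures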

// src/types.ts
export type ThoughtStepType = 'thought' | 'action' | 'observation' | 'tool_call' | 'final_answer' | 'system';

export interface ThoughtStep {
    stepId: string;
    type: ThoughtStepType;
    content: string;
    timestamp: string;
    status: 'pending' | 'completed' | 'corrected' | 'invalidated';
    correctionPrompt?: string;
}

4.2 ThoughtStep: the Single-Step Component

// src/components/ThoughtStep.tsx

import React from 'react';
import { ThoughtStep } from '../types';

interface ThoughtStepProps {
    step: ThoughtStep;
    onCorrect: (stepId: string) => void;
    isCorrecting: boolean; // Flag to disable correction button when another is active
}

const ThoughtStepComponent: React.FC<ThoughtStepProps> = ({ step, onCorrect, isCorrecting }) => {
    const getStatusClass = () => {
        switch (step.status) {
            case 'corrected': return 'bg-yellow-100 border-yellow-500';
            case 'invalidated': return 'bg-gray-100 border-gray-300 text-gray-500 line-through';
            case 'pending': return 'bg-blue-50 border-blue-300'; // Should not happen for rendered steps
            case 'completed':
            default: return 'bg-white border-gray-200';
        }
    };

    const getIcon = () => {
        switch (step.type) {
            case 'thought': return '🤔';
            case 'action': return '⚙️';
            case 'observation': return '👀';
            case 'tool_call': return '🛠️';
            case 'final_answer': return '✅';
            case 'system': return 'ℹ️';
            default: return '💬';
        }
    };

    return (
        <div className={`flex items-start p-4 mb-2 rounded-lg shadow-sm border ${getStatusClass()}`}>
            <div className="flex-shrink-0 mr-3 text-xl">{getIcon()}</div>
            <div className="flex-grow">
                <div className="text-sm font-semibold text-gray-700 mb-1 capitalize">
                    {step.type.replace('_', ' ')}
                    {step.status === 'corrected' && <span className="ml-2 text-yellow-700">(Corrected)</span>}
                    {step.status === 'invalidated' && <span className="ml-2 text-gray-500">(Invalidated)</span>}
                </div>
                <p className="text-gray-800 whitespace-pre-wrap">{step.content}</p>
                {step.correctionPrompt && step.status === 'corrected' && (
                    <p className="mt-2 p-2 bg-yellow-50 border-l-4 border-yellow-400 text-sm text-yellow-800">
                        User correction: {step.correctionPrompt}
                    </p>
                )}
                <div className="text-xs text-gray-500 mt-2">
                    {new Date(step.timestamp).toLocaleTimeString()}
                </div>
            </div>
            {step.type !== 'system' && step.status !== 'invalidated' && step.status !== 'corrected' && (
                <button
                    onClick={() => onCorrect(step.stepId)}
                    disabled={isCorrecting}
                    className="ml-4 px-3 py-1 bg-indigo-500 text-white text-sm rounded-md hover:bg-indigo-600 disabled:opacity-50 disabled:cursor-not-allowed transition-colors"
                >
                    Correct
                </button>
            )}
        </div>
    );
};

export default ThoughtStepComponent;

4.3 CorrectionModal: the Correction Dialog Component

// src/components/CorrectionModal.tsx

import React, { useState } from 'react';

interface CorrectionModalProps {
    isOpen: boolean;
    onClose: () => void;
    onSave: (newContent: string) => void;
    initialContent: string;
    loading: boolean;
}

const CorrectionModal: React.FC<CorrectionModalProps> = ({ isOpen, onClose, onSave, initialContent, loading }) => {
    const [newContent, setNewContent] = useState(initialContent);

    React.useEffect(() => {
        if (isOpen) {
            setNewContent(initialContent);
        }
    }, [isOpen, initialContent]);

    if (!isOpen) return null;

    return (
        <div className="fixed inset-0 bg-gray-600 bg-opacity-50 flex items-center justify-center z-50">
            <div className="bg-white p-6 rounded-lg shadow-xl w-full max-w-md">
                <h3 className="text-lg font-bold mb-4">Correct thought step</h3>
                <textarea
                    className="w-full p-2 border border-gray-300 rounded-md focus:outline-none focus:ring-2 focus:ring-indigo-500 resize-y min-h-[100px]"
                    value={newContent}
                    onChange={(e) => setNewContent(e.target.value)}
                    placeholder="Enter the corrected content..."
                    disabled={loading}
                ></textarea>
                <div className="mt-4 flex justify-end space-x-2">
                    <button
                        onClick={onClose}
                        className="px-4 py-2 bg-gray-200 text-gray-800 rounded-md hover:bg-gray-300 disabled:opacity-50"
                        disabled={loading}
                    >
                        Cancel
                    </button>
                    <button
                        onClick={() => onSave(newContent)}
                        className="px-4 py-2 bg-indigo-500 text-white rounded-md hover:bg-indigo-600 disabled:opacity-50"
                        disabled={loading || newContent.trim() === ''}
                    >
                        {loading ? 'Saving...' : 'Save correction'}
                    </button>
                </div>
            </div>
        </div>
    );
};

export default CorrectionModal;

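4.4 ThoughtStream: the Main Component

This is the core component. It connects to the SSE stream, manages the thought chain state, and handles the correction logic.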

// src/components/ThoughtStream.tsx

import React, { useEffect, useRef, useState, useCallback } from 'react';
import { v4 as uuidv4 } from 'uuid';
import ThoughtStepComponent from './ThoughtStep';
import CorrectionModal from './CorrectionModal';
import { ThoughtStep } from '../types';

const API_BASE_URL = 'http://localhost:3001';

const ThoughtStream: React.FC = () => {
    const [sessionId, setSessionId] = useState<string>('');
    const [thoughtChain, setThoughtChain] = useState<ThoughtStep[]>([]);
    const [isConnecting, setIsConnecting] = useState<boolean>(false);
    const [isConnected, setIsConnected] = useState<boolean>(false);
    const [error, setError] = useState<string | null>(null);

    // Correction state
    const [isCorrectionModalOpen, setIsCorrectionModalOpen] = useState(false);
    const [currentCorrectionStep, setCurrentCorrectionStep] = useState<ThoughtStep | null>(null);
    const [isSendingCorrection, setIsSendingCorrection] = useState(false);

    const eventSourceRef = useRef<EventSource | null>(null);
    const scrollContainerRef = useRef<HTMLDivElement>(null);

    // Auto-scroll to bottom on new content
    useEffect(() => {
        if (scrollContainerRef.current) {
            scrollContainerRef.current.scrollTop = scrollContainerRef.current.scrollHeight;
        }
    }, [thoughtChain]);

    // Handle incoming SSE events
    const handleNewThoughtStep = useCallback((event: MessageEvent) => {
        try {
            const newStep: ThoughtStep = JSON.parse(event.data);

            // Handle system events like correction_applied
            if (newStep.type === 'system' && newStep.content) {
                const systemEvent = JSON.parse(newStep.content);
                if (systemEvent.action === 'correction_applied') {
                    console.log('Correction applied by backend:', systemEvent);
                    setThoughtChain(prevChain => {
                        const correctedIndex = prevChain.findIndex(s => s.stepId === systemEvent.correctedStepId);
                        if (correctedIndex === -1) return prevChain;
                        // Keep everything up to the corrected step and drop the stale steps
                        // after it; the backend streams their replacements next
                        return prevChain.slice(0, correctedIndex + 1).map((step, index) =>
                            index === correctedIndex
                                ? { ...step, status: 'corrected' as const } // Mark the corrected step
                                : step
                        );
                    });
                    setIsCorrectionModalOpen(false); // Close modal after backend acknowledges
                    setIsSendingCorrection(false);
                    return; // Don't add system event to thoughtChain directly
                }
            }

            // Normal thought step
            setThoughtChain(prevChain => {
                // If we've already received this step (e.g., during re-connection or re-streaming after correction), update it.
                // Otherwise, append it.
                const existingIndex = prevChain.findIndex(s => s.stepId === newStep.stepId);
                if (existingIndex !== -1) {
                    const updatedChain = [...prevChain];
                    updatedChain[existingIndex] = { ...newStep, status: 'completed' }; // Ensure status is completed
                    return updatedChain;
                } else if (newStep.status !== 'invalidated') { // Only add if not marked invalidated by backend
                    return [...prevChain, { ...newStep, status: 'completed' }]; // Mark as completed when received
                }
                return prevChain;
            });
        } catch (err) {
            console.error('Error parsing SSE data:', err);
            setError('Failed to parse incoming data.');
        }
    }, []);

    // Establish SSE connection
    const connectToStream = useCallback((id: string) => {
        if (eventSourceRef.current) {
            eventSourceRef.current.close();
            eventSourceRef.current = null;
        }

        setIsConnecting(true);
        setError(null);
        setThoughtChain([]); // Clear previous chain on new connection

        const es = new EventSource(`${API_BASE_URL}/api/stream/thoughts/${id}`);
        eventSourceRef.current = es;

        es.onopen = () => {
            console.log('SSE connection opened.');
            setIsConnecting(false);
            setIsConnected(true);
            setError(null);
        };

        es.onmessage = handleNewThoughtStep;

        es.onerror = (err) => {
            console.error('SSE error:', err);
            setIsConnecting(false);
            setIsConnected(false);
            setError('Stream connection error. Please try again.');
            es.close();
        };

        return () => {
            console.log('SSE connection closed.');
            es.close();
            eventSourceRef.current = null;
            setIsConnected(false);
        };
    }, [handleNewThoughtStep]);

    // Initialize session or connect
    useEffect(() => {
        const storedSessionId = localStorage.getItem('agentSessionId');
        let currentId = storedSessionId || uuidv4();
        if (!storedSessionId) {
            localStorage.setItem('agentSessionId', currentId);
        }
        setSessionId(currentId);
        connectToStream(currentId);

        return () => {
            if (eventSourceRef.current) {
                eventSourceRef.current.close();
            }
        };
    }, [connectToStream]); // Only run once on mount

    // Correction handlers
    const handleCorrectButtonClick = (stepId: string) => {
        const stepToCorrect = thoughtChain.find(step => step.stepId === stepId);
        if (stepToCorrect) {
            setCurrentCorrectionStep(stepToCorrect);
            setIsCorrectionModalOpen(true);
        }
    };

    const handleCorrectionSave = async (newContent: string) => {
        if (!currentCorrectionStep || !sessionId) return;

        setIsSendingCorrection(true);
        setError(null);

        try {
            const response = await fetch(`${API_BASE_URL}/api/stream/correct/${sessionId}`, {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify({
                    stepId: currentCorrectionStep.stepId,
                    newContent: newContent,
                }),
            });

            if (!response.ok) {
                const errorData = await response.json();
                throw new Error(errorData.error || 'Failed to send correction.');
            }

            // Apply the user's text locally. The status is set to 'corrected' here as well,
            // because the correction_applied SSE event may arrive before this response
            // and must not be overwritten with a different status.
            setThoughtChain(prevChain =>
                prevChain.map(step =>
                    step.stepId === currentCorrectionStep.stepId
                        ? { ...step, content: newContent, correctionPrompt: newContent, status: 'corrected' as const }
                        : step // Other steps are replaced by the new steps the backend streams
                )
            );
            // Modal will be closed by the SSE event from backend
        } catch (err: any) {
            console.error('Error sending correction:', err);
            setError(`Failed to send correction: ${err.message}`);
            setIsSendingCorrection(false);
            setIsCorrectionModalOpen(false);
        }
    };

    const handleCorrectionClose = () => {
        setIsCorrectionModalOpen(false);
        setCurrentCorrectionStep(null);
        setIsSendingCorrection(false);
    };

    const isAnyCorrectionActive = isCorrectionModalOpen || isSendingCorrection;

    return (
        <div className="min-h-screen bg-gray-50 p-6 font-sans">
            <h1 className="text-3xl font-bold text-gray-800 mb-6 text-center">Inter-thought Streaming: Agent Chain of Thought</h1>

            <div className="max-w-3xl mx-auto bg-white rounded-xl shadow-lg p-6">
                <div className="mb-4 flex justify-between items-center">
                    <h2 className="text-xl font-semibold text-gray-700">Agent thought process</h2>
                    <span className={`px-3 py-1 text-sm rounded-full ${isConnected ? 'bg-green-100 text-green-700' : 'bg-red-100 text-red-700'}`}>
                        {isConnecting ? 'Connecting...' : (isConnected ? 'Connected' : 'Disconnected')}
                    </span>
                </div>

                {error && (
                    <div className="bg-red-100 border border-red-400 text-red-700 px-4 py-3 rounded relative mb-4" role="alert">
                        <strong className="font-bold">Error!</strong>
                        <span className="block sm:inline"> {error}</span>
                    </div>
                )}

                <div ref={scrollContainerRef} className="max-h-[70vh] overflow-y-auto pr-2">
                    {thoughtChain.length === 0 && !isConnecting && !error && (
                        <p className="text-gray-500 text-center py-8">The agent is starting to think...</p>
                    )}
                    {thoughtChain.map((step) => (
                        <ThoughtStepComponent
                            key={step.stepId}
                            step={step}
                            onCorrect={handleCorrectButtonClick}
                            isCorrecting={isAnyCorrectionActive} // Disable other correction buttons
                        />
                    ))}
                </div>
            </div>

            <CorrectionModal
                isOpen={isCorrectionModalOpen}
                onClose={handleCorrectionClose}
                onSave={handleCorrectionSave}
                initialContent={currentCorrectionStep?.content || ''}
                loading={isSendingCorrection}
            />
        </div>
    );
};

export default ThoughtStream;

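Frontend code walkthrough:

  • State management (useState):
    • sessionId: identifies the current user session.
    • thoughtChain: stores every thought step received so far.
    • isConnecting, isConnected, error: connection state and error message.
    • isCorrectionModalOpen, currentCorrectionStep, isSendingCorrection: the correction dialog and its state.
  • EventSource API:
    • The connectToStream function creates and manages the EventSource instance.
    • es.onmessage: receives data pushed by the server and hands it to handleNewThoughtStep.
    • es.onopen, es.onerror: handle connection events.
  • handleNewThoughtStep (core logic):
    • Parses the JSON data.
    • Handles system events: when the backend sends a correction_applied event, it updates thoughtChain. It marks the corrected step as corrected and removes the old steps that come after the correction point. This is what keeps the UI in sync with the backend state.
    • Handles ordinary steps: appends the new step to thoughtChain. If a step with the same stepId already exists, it is updated instead (useful when the backend resends a step that was already delivered).
  • Correction logic:
    • handleCorrectButtonClick: when the user clicks "Correct", opens the CorrectionModal and sets currentCorrectionStep.
    • handleCorrectionSave: when the user clicks "Save correction" in the dialog, sends a POST request to the backend at /api/stream/correct/:sessionId.
    • isAnyCorrectionActive: a derived flag that disables the other "Correct" buttons while a correction is in progress, so concurrent corrections cannot leave the state inconsistent.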
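The two branches of handleNewThoughtStep can be expressed as a single reducer, which keeps the state transitions in one pure function that is easy to test outside React. This is a sketch rather than the component's actual code: `chainReducer`, `ChainStep`, and `ChainEvent` are hypothetical names, and the step type is reduced to the fields the logic uses.

```typescript
type Status = 'pending' | 'completed' | 'corrected' | 'invalidated';

interface ChainStep {
    stepId: string;
    content: string;
    status: Status;
}

type ChainEvent =
    | { kind: 'step'; step: ChainStep }
    | { kind: 'correction_applied'; correctedStepId: string };

function chainReducer(chain: ChainStep[], event: ChainEvent): ChainStep[] {
    if (event.kind === 'correction_applied') {
        const correctedId = event.correctedStepId;
        const index = chain.findIndex(s => s.stepId === correctedId);
        if (index === -1) return chain;
        // Keep everything up to the corrected step, drop the stale steps after it
        return chain.slice(0, index + 1).map((s, i) =>
            i === index ? { ...s, status: 'corrected' as const } : s
        );
    }

    const incoming = event.step;
    const existing = chain.findIndex(s => s.stepId === incoming.stepId);
    // Unknown stepId: append. Known stepId: replace in place (e.g. a replayed step).
    if (existing === -1) return [...chain, incoming];
    return chain.map((s, i) => (i === existing ? incoming : s));
}
```

In the component this would plug into React's `useReducer(chainReducer, [])`, with the SSE handler dispatching one event per message.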

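5. Challenges and Advanced Considerations

5.1 Integrating a Real LLM Agent

  • API abstraction: in a real system, AgentSession would not simulate steps itself. It would call an LLM API (such as OpenAI GPT-4, Anthropic Claude, or Llama 2) to get responses.
  • Stream parsing: many LLM APIs support streaming responses. The backend has to parse those streams, convert them into structured ThoughtStep objects, and push them to the frontend over SSE.
  • Tool calls: if the agent uses tools (RAG, external APIs), the backend has to coordinate tool execution and push the tool's input and output as steps of type action and observation.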
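As an illustration of the stream-parsing step, the sketch below turns arbitrary text chunks into structured steps. It assumes the model has been prompted to emit one labeled line per step ("Thought:", "Action:", "Observation:", "Final Answer:"); that line format, along with the names `StepStreamParser` and `ParsedStep`, is an assumption for this example and not something a particular LLM API guarantees.

```typescript
type ParsedType = 'thought' | 'action' | 'observation' | 'final_answer';

interface ParsedStep {
    type: ParsedType;
    content: string;
}

// Assumed line labels; a real prompt format may differ.
const PREFIXES: ReadonlyArray<[string, ParsedType]> = [
    ['Thought:', 'thought'],
    ['Action:', 'action'],
    ['Observation:', 'observation'],
    ['Final Answer:', 'final_answer'],
];

class StepStreamParser {
    private buffer = '';

    // Feed one raw text chunk; returns the steps completed by this chunk.
    push(chunk: string): ParsedStep[] {
        this.buffer += chunk;
        const lines = this.buffer.split('\n');
        // The last element is an unfinished line; keep it for the next chunk
        this.buffer = lines.pop() ?? '';
        const steps: ParsedStep[] = [];
        for (const line of lines) {
            const step = this.parseLine(line);
            if (step) steps.push(step);
        }
        return steps;
    }

    // Call once the upstream stream ends to emit the trailing line, if any.
    flush(): ParsedStep[] {
        const step = this.parseLine(this.buffer);
        this.buffer = '';
        return step ? [step] : [];
    }

    private parseLine(line: string): ParsedStep | null {
        const trimmed = line.trim();
        for (const [prefix, type] of PREFIXES) {
            if (trimmed.startsWith(prefix)) {
                return { type, content: trimmed.slice(prefix.length).trim() };
            }
        }
        return null; // unlabeled or empty lines are ignored in this sketch
    }
}
```

Buffering until a newline is what lets the parser tolerate chunk boundaries that fall in the middle of a label or a sentence. A production parser would also need to handle multi-line steps, which this sketch drops.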

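5.2 State Persistence

  • In the current implementation the agent session is lost once the backend cleans it up. In production, the agent's thought chain and correction history should be persisted to a database (such as MongoDB or PostgreSQL).
  • Users can then reload a session and continue from where it was interrupted or last corrected.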

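5.3 Concurrency Control

  • Multiple users: every user should have an independent agent session.
  • Repeated corrections from one user: the current code avoids this by disabling the other Correct buttons. A more robust approach is to keep a correction queue on the backend, or to force the agent into a dedicated state after a correction arrives until that correction has been fully processed.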
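One lightweight alternative to a full queue is optimistic versioning: the client sends the chain version it was looking at, and the server rejects a correction made against an outdated chain. This is a swapped-in technique, not the queue described above, and the names `CorrectionGate`, `CorrectionRequest`, and the `chainVersion` field are hypothetical.

```typescript
interface CorrectionRequest {
    stepId: string;
    newContent: string;
    chainVersion: number; // version the client saw when the user clicked "Correct"
}

type GateResult =
    | { accepted: true; version: number }
    | { accepted: false; reason: string };

class CorrectionGate {
    private version = 0;

    get currentVersion(): number {
        return this.version;
    }

    // Accepts a correction only if it was made against the latest chain.
    tryAccept(req: CorrectionRequest): GateResult {
        if (req.chainVersion !== this.version) {
            return {
                accepted: false,
                reason: `stale correction: client saw version ${req.chainVersion}, server is at ${this.version}`,
            };
        }
        this.version += 1; // every accepted correction rewrites the chain
        return { accepted: true, version: this.version };
    }
}
```

A rejected request maps naturally to an HTTP 409 response, and the new version number can travel to the frontend inside the correction_applied system event so that the next correction carries the right value.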

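5.4 Error Handling and Retries

  • Network interruptions: SSE has a built-in reconnect mechanism, but the frontend still has to handle the case where reconnecting fails.
  • Internal agent errors: if the agent crashes while thinking, the backend should catch the error and notify the frontend with a system event.
  • Correction conflicts: if the user tries to correct a step that has already been invalidated, the backend should return a clear error message.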
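Note that the onerror handler in ThoughtStream calls es.close(), which turns off the browser's automatic reconnect, so any retry has to be scheduled by the frontend itself. A minimal sketch of a capped exponential backoff policy follows; the function names and the default values (1 s base, 30 s cap, 5 attempts) are assumptions chosen for illustration.

```typescript
// Delay before reconnect attempt number `attempt` (0-based), capped at maxMs.
function backoffDelayMs(attempt: number, baseMs = 1000, maxMs = 30000): number {
    const exponential = baseMs * Math.pow(2, Math.max(0, attempt));
    return Math.min(exponential, maxMs);
}

type ReconnectDecision =
    | { retry: true; delayMs: number }
    | { retry: false };

// Decide what to do after a connection error; gives up after maxAttempts tries.
function nextReconnect(attempt: number, maxAttempts = 5): ReconnectDecision {
    if (attempt >= maxAttempts) return { retry: false };
    return { retry: true, delayMs: backoffDelayMs(attempt) };
}
```

In the component, the error handler would call `nextReconnect(attempt)` and, when `retry` is true, schedule `connectToStream(sessionId)` with `setTimeout`; when it is false, the existing error banner is the right place to ask the user to retry manually. Adding random jitter to the delay is a common refinement when many clients may reconnect at once.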

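5.5 User Experience Improvements

  • Loading indicator: show a loading animation while the agent is thinking.
  • Diff view: when the user corrects a step, show the difference between the old and new text to make the change easier to read.
  • Immediate feedback: when the correction request is sent, show a "correcting" state in the UI right away instead of waiting for the backend to confirm.
  • Collapsible steps: for very long thought chains, let users collapse completed steps.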

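5.6 Security

  • Authentication and authorization: make sure only authorized users can view and correct their own agent sessions.
  • Input validation: strictly validate the content of user corrections to prevent malicious injection or abuse.
  • Rate limiting: protect against DDoS attacks and excessive use of agent resources.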
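The correction endpoint currently checks only that stepId and newContent are truthy. A stricter check could look like the sketch below. The name `validateCorrection` and both limits (64 characters for the ID, 2000 for the content) are hypothetical, and this covers only shape and size; it does not by itself defend against prompt injection in the text that is later fed back to the model.

```typescript
interface ValidCorrection {
    stepId: string;
    newContent: string;
}

type ValidationResult =
    | { ok: true; value: ValidCorrection }
    | { ok: false; error: string };

const MAX_STEP_ID_LENGTH = 64;       // hypothetical limit
const MAX_CORRECTION_LENGTH = 2000;  // hypothetical limit

function validateCorrection(body: unknown): ValidationResult {
    if (typeof body !== 'object' || body === null) {
        return { ok: false, error: 'Body must be a JSON object.' };
    }
    const { stepId, newContent } = body as Record<string, unknown>;

    if (typeof stepId !== 'string' || stepId.length === 0 || stepId.length > MAX_STEP_ID_LENGTH) {
        return { ok: false, error: 'stepId must be a non-empty string of reasonable length.' };
    }
    if (typeof newContent !== 'string') {
        return { ok: false, error: 'newContent must be a string.' };
    }

    const trimmed = newContent.trim();
    if (trimmed.length === 0) {
        return { ok: false, error: 'newContent must not be blank.' };
    }
    if (trimmed.length > MAX_CORRECTION_LENGTH) {
        return { ok: false, error: `newContent must be at most ${MAX_CORRECTION_LENGTH} characters.` };
    }
    return { ok: true, value: { stepId, newContent: trimmed } };
}
```

In the POST handler this would run on `req.body` before `applyCorrection` is called, returning a 400 response with the `error` string when `ok` is false.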

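6. Summary and Outlook

Inter-thought Streaming offers a new mode of human-AI collaboration that is highly transparent and controllable. By showing the AI's thinking in real time and letting users step in at key points, we make the AI more trustworthy and open new paths for debugging, learning, and improving it. State management, concurrency control, and integration with a real agent remain challenging, but a structured streaming backend combined with a responsive frontend is enough to build a capable and flexible Inter-thought Streaming system. Looking ahead, this capability is likely to become an essential part of advanced AI applications and to help move AI from a tool toward a genuinely intelligent assistant.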
