图执行中的部分状态流式传输:实时推送中间思考过程至前端
各位专家,下午好!今天我们来探讨一个在构建复杂系统,特别是数据处理管道、机器学习工作流或自动化引擎时,常常遇到的核心挑战:如何在图(Graph)状任务执行过程中,将其中间“思考过程”——即实时产生的中间状态和结果——透明、高效地推送至前端进行展示。我们称之为“Partial State Streaming”,即部分状态的流式传输。
一、引言:图执行与实时反馈的挑战
在现代软件架构中,许多复杂业务逻辑和数据处理流程都可以抽象为有向无环图(DAG)的形式。每一个节点(Node)代表一个任务或一个计算步骤,边(Edge)则表示数据流或任务间的依赖关系。例如,一个数据ETL(抽取、转换、加载)管道可能包含数据源读取、清洗、转换、聚合、写入等多个节点;一个机器学习训练工作流可能涉及数据预处理、模型训练、评估、部署等环节。
这些图任务的执行往往耗时较长,短则几秒,长则数小时甚至数天。对于用户而言,长时间的等待而没有任何反馈是极其糟糕的体验。他们需要知道:
- 任务是否还在运行?
- 当前执行到哪个阶段了?
- 已经完成了多少?
- 中间产生了什么结果?
- 是否有错误发生?
传统的做法是等待整个图执行完成,然后一次性返回最终结果。但这显然无法满足实时性反馈的需求。因此,“Partial State Streaming”应运而生:它旨在解决如何在图执行的“中间”阶段,当任务仍在进行时,将已完成节点的结果、节点内部的进度、甚至整个图的运行状态变更,以流式的方式实时推送到前端,让用户能够“看见”图的思考过程。
“中间思考过程”在这里是一个广义的概念,它可以包括:
- 节点级别的输出: 某个节点完成其计算后产生的中间数据或结果。
- 节点内部进度: 对于一个耗时较长的节点,其内部可能包含多个子步骤,这些子步骤的完成度或状态变化。
- 图级别的状态: 整个图的执行进度(例如,已完成节点数/总节点数)、整体健康状况或全局共享的指标更新。
- 日志和错误信息: 实时生成的日志输出或任何在执行过程中发现的错误。
实现这一目标,需要我们在图执行引擎、后端推送服务以及前端展示逻辑等多个层面进行精心设计。
二、问题剖析:需求与技术栈考量
在深入技术细节之前,我们首先明确一下实现Partial State Streaming的核心需求和可选的技术栈。
2.1 核心需求
- 实时性 (Real-time): 中间状态的推送应尽量低延迟,确保前端展示与后端实际执行状态的高度同步。
- 部分性 (Partial): 只推送当前已就绪、已完成或已更新的部分状态,而不是等待整个图执行完成。
- 状态性 (Stateful): 传输的数据应能准确描述特定节点或整个图的当前状态,并可供前端重建或更新其内部状态模型。
- 可追踪性 (Traceability): 每个推送的中间状态都应能清晰地关联到其来源,例如属于哪个图实例、哪个节点。
- 健壮性 (Robustness): 系统应能处理网络中断、消息丢失、后端过载等异常情况,并尽可能保证数据的可靠传输。
- 可扩展性 (Scalability): 能够支持大量并发的图执行实例以及高并发的前端连接。
- 灵活性 (Flexibility): 允许根据具体场景选择推送的粒度(节点完成、节点内部步骤、全局状态等)。
2.2 技术栈考量
为了满足上述需求,我们需要在整个技术栈中做出选择:
- 后端图执行框架: 可以是自研的图调度器,也可以是基于Apache Airflow、Prefect、Kubeflow Pipelines等开源项目。关键在于如何从这些框架中捕获中间状态。
- 后端推送机制: 这是实现实时性的核心。可选方案包括:
- HTTP Long Polling (长轮询): 客户端发起请求,服务器在有数据更新或超时时才响应。
- Server-Sent Events (SSE): 客户端建立持久HTTP连接,服务器单向推送文本流。
- WebSockets: 客户端和服务器建立持久双向连接。
- 消息队列 (Message Queues) + 推送服务: 图执行引擎将状态推送到消息队列,独立的推送服务消费队列消息并推送到前端。
- 数据序列化: 用于将结构化的中间状态转换为网络传输格式。常用的有JSON和Protocol Buffers (Protobuf)。
- 前端接收与渲染: 负责接收后端推送的数据,更新UI状态,并动态渲染图的可视化。通常使用JavaScript配合现代前端框架(React, Vue, Angular)。
三、核心机制:如何在图执行中捕获中间状态
实现Partial State Streaming的第一步也是最关键的一步,是如何在图执行引擎内部有效地捕获这些“中间思考过程”。这要求我们的图执行引擎具备一定的可观测性和钩子(Hooks)机制。
假设我们有一个简化的图执行引擎,由 Graph、Node 和 ExecutionEngine 组成。
# graph_engine.py
import uuid
import time
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional, Callable
# 定义中间状态消息的结构
class IntermediateMessage:
def __init__(self, execution_id: str, node_id: str, timestamp: float,
message_type: str, payload: Dict[str, Any],
metadata: Optional[Dict[str, Any]] = None):
self.execution_id = execution_id
self.node_id = node_id
self.timestamp = timestamp
self.message_type = message_type
self.payload = payload
self.metadata = metadata if metadata is not None else {}
def to_dict(self):
return {
"execution_id": self.execution_id,
"node_id": self.node_id,
"timestamp": self.timestamp,
"message_type": self.message_type,
"payload": self.payload,
"metadata": self.metadata
}
# 定义一个抽象节点
class Node(ABC):
def __init__(self, node_id: str, name: str, dependencies: Optional[List[str]] = None):
self.node_id = node_id
self.name = name
self.dependencies = dependencies if dependencies is not None else []
self._output: Any = None
self._status: str = "PENDING" # PENDING, RUNNING, COMPLETED, FAILED
@property
def output(self) -> Any:
return self._output
@property
def status(self) -> str:
return self._status
@status.setter
def status(self, value: str):
self._status = value
@abstractmethod
def execute(self, inputs: Dict[str, Any],
stream_callback: Optional[Callable[[IntermediateMessage], None]] = None) -> Any:
"""
执行节点逻辑。
:param inputs: 来自依赖节点的输入。
:param stream_callback: 用于推送中间状态的回调函数。
:return: 节点的输出。
"""
pass
# 图的执行上下文,用于存储共享数据和传递流式推送的机制
class ExecutionContext:
def __init__(self, execution_id: str, stream_callback: Callable[[IntermediateMessage], None]):
self.execution_id = execution_id
self.stream_callback = stream_callback
self.results: Dict[str, Any] = {} # 存储已完成节点的输出
self.node_statuses: Dict[str, str] = {} # 存储所有节点的当前状态
def emit_message(self, node_id: str, message_type: str, payload: Dict[str, Any],
metadata: Optional[Dict[str, Any]] = None):
"""包装并调用流式推送回调"""
message = IntermediateMessage(
execution_id=self.execution_id,
node_id=node_id,
timestamp=time.time(),
message_type=message_type,
payload=payload,
metadata=metadata
)
self.stream_callback(message)
def set_node_status(self, node_id: str, status: str):
self.node_statuses[node_id] = status
self.emit_message(node_id, "NODE_STATUS_UPDATE", {"status": status})
# 简单的具体节点实现
class DataLoadNode(Node):
def __init__(self, node_id: str, name: str, data_path: str, dependencies: Optional[List[str]] = None):
super().__init__(node_id, name, dependencies)
self.data_path = data_path
def execute(self, inputs: Dict[str, Any],
stream_callback: Optional[Callable[[IntermediateMessage], None]] = None) -> Any:
self.status = "RUNNING"
if stream_callback:
stream_callback(IntermediateMessage(
execution_id="N/A", # Will be filled by context
node_id=self.node_id,
timestamp=time.time(),
message_type="NODE_STATUS_UPDATE",
payload={"status": "RUNNING"},
metadata={"name": self.name}
))
stream_callback(IntermediateMessage(
execution_id="N/A", # Will be filled by context
node_id=self.node_id,
timestamp=time.time(),
message_type="NODE_PROGRESS",
payload={"message": f"Loading data from {self.data_path}"},
metadata={"percentage": 10}
))
print(f"[{self.name}] Loading data from {self.data_path}...")
time.sleep(2) # Simulate work
data = {"id": 1, "value": f"data_from_{self.data_path}"}
if stream_callback:
stream_callback(IntermediateMessage(
execution_id="N/A", # Will be filled by context
node_id=self.node_id,
timestamp=time.time(),
message_type="NODE_PROGRESS",
payload={"message": "Data loaded"},
metadata={"percentage": 100}
))
stream_callback(IntermediateMessage(
execution_id="N/A", # Will be filled by context
node_id=self.node_id,
timestamp=time.time(),
message_type="NODE_OUTPUT",
payload={"output_preview": str(data)[:50]}, # Send a preview, not full data
metadata={"output_size": len(str(data))}
))
self._output = data
self.status = "COMPLETED"
if stream_callback:
stream_callback(IntermediateMessage(
execution_id="N/A", # Will be filled by context
node_id=self.node_id,
timestamp=time.time(),
message_type="NODE_STATUS_UPDATE",
payload={"status": "COMPLETED"},
metadata={"name": self.name}
))
return self._output
class ProcessDataNode(Node):
def __init__(self, node_id: str, name: str, processing_logic: Callable[[Any], Any], dependencies: Optional[List[str]] = None):
super().__init__(node_id, name, dependencies)
self.processing_logic = processing_logic
def execute(self, inputs: Dict[str, Any],
stream_callback: Optional[Callable[[IntermediateMessage], None]] = None) -> Any:
self.status = "RUNNING"
if stream_callback:
stream_callback(IntermediateMessage(
execution_id="N/A",
node_id=self.node_id,
timestamp=time.time(),
message_type="NODE_STATUS_UPDATE",
payload={"status": "RUNNING"},
metadata={"name": self.name}
))
input_data = list(inputs.values())[0] # Assume single input for simplicity
print(f"[{self.name}] Processing data: {input_data}...")
for i in range(1, 4):
time.sleep(1) # Simulate work
if stream_callback:
stream_callback(IntermediateMessage(
execution_id="N/A",
node_id=self.node_id,
timestamp=time.time(),
message_type="NODE_PROGRESS",
payload={"message": f"Step {i}/3 completed"},
metadata={"percentage": i * 33}
))
processed_data = self.processing_logic(input_data)
if stream_callback:
stream_callback(IntermediateMessage(
execution_id="N/A",
node_id=self.node_id,
timestamp=time.time(),
message_type="NODE_OUTPUT",
payload={"output_preview": str(processed_data)[:50]},
metadata={"output_size": len(str(processed_data))}
))
self._output = processed_data
self.status = "COMPLETED"
if stream_callback:
stream_callback(IntermediateMessage(
execution_id="N/A",
node_id=self.node_id,
timestamp=time.time(),
message_type="NODE_STATUS_UPDATE",
payload={"status": "COMPLETED"},
metadata={"name": self.name}
))
return self._output
class Graph:
def __init__(self, graph_id: str, nodes: Dict[str, Node]):
self.graph_id = graph_id
self.nodes = nodes
self.adjacency_list = self._build_adjacency_list()
def _build_adjacency_list(self):
adj = {node_id: [] for node_id in self.nodes}
for node_id, node in self.nodes.items():
for dep_id in node.dependencies:
if dep_id not in self.nodes:
raise ValueError(f"Dependency {dep_id} for node {node_id} not found in graph.")
adj[dep_id].append(node_id) # Adjacency list for downstream nodes
return adj
def get_downstream_nodes(self, node_id: str) -> List[str]:
return self.adjacency_list.get(node_id, [])
def get_upstream_nodes(self, node_id: str) -> List[str]:
return self.nodes[node_id].dependencies
# 执行引擎,负责调度和执行图
class ExecutionEngine:
def __init__(self, graph: Graph):
self.graph = graph
self.pending_nodes: List[str] = list(graph.nodes.keys())
self.completed_nodes: set = set()
self.running_nodes: set = set()
self.node_outputs: Dict[str, Any] = {}
self.execution_id: str = str(uuid.uuid4())
def _get_ready_nodes(self) -> List[str]:
ready = []
for node_id in self.pending_nodes:
node = self.graph.nodes[node_id]
# 检查所有依赖是否已完成
if all(dep_id in self.completed_nodes for dep_id in node.dependencies):
ready.append(node_id)
return ready
def execute_graph(self, stream_callback: Callable[[IntermediateMessage], None]):
context = ExecutionContext(self.execution_id, stream_callback)
print(f"Starting graph execution {self.execution_id}")
# 初始化所有节点状态为PENDING
for node_id in self.graph.nodes:
context.set_node_status(node_id, "PENDING")
while self.pending_nodes or self.running_nodes:
ready_nodes = self._get_ready_nodes()
# 移除已准备好的节点,并将它们标记为正在运行
for node_id in ready_nodes:
self.pending_nodes.remove(node_id)
self.running_nodes.add(node_id)
context.set_node_status(node_id, "RUNNING")
print(f"Node {node_id} is now RUNNING.")
if not ready_nodes and not self.running_nodes:
# Deadlock or no more nodes to process
if self.pending_nodes:
print("Error: Graph execution halted, some nodes are still pending but cannot run (possibly cyclic dependency).")
for node_id in self.pending_nodes:
context.set_node_status(node_id, "FAILED")
break
# 模拟并发执行
nodes_to_process_this_round = list(self.running_nodes) # Take a snapshot
for node_id in nodes_to_process_this_round:
node = self.graph.nodes[node_id]
inputs = {dep_id: self.node_outputs[dep_id] for dep_id in node.dependencies}
# 在这里,我们将上下文的emit_message方法传递给节点的execute方法
# 节点内部就可以直接通过这个callback推送消息
node_stream_callback = lambda msg: context.emit_message(
node_id, msg.message_type, msg.payload, msg.metadata
)
try:
# 关键:将 stream_callback 传递给 node.execute
output = node.execute(inputs, node_stream_callback)
self.node_outputs[node_id] = output
self.completed_nodes.add(node_id)
self.running_nodes.remove(node_id)
context.set_node_status(node_id, "COMPLETED")
print(f"Node {node_id} COMPLETED.")
except Exception as e:
print(f"Node {node_id} FAILED: {e}")
context.set_node_status(node_id, "FAILED")
context.emit_message(node_id, "NODE_ERROR", {"error": str(e)})
self.running_nodes.remove(node_id) # Remove from running even if failed
# For simplicity, we stop on first error. In real system, might continue or mark downstream as skipped.
# Mark all pending nodes as FAILED/SKIPPED if upstream failed
for pending_node_id in self.pending_nodes:
context.set_node_status(pending_node_id, "SKIPPED")
self.pending_nodes.clear() # Stop further processing
break
# Small sleep to simulate async processing and avoid tight loop
time.sleep(0.1)
print(f"Graph execution {self.execution_id} finished.")
context.emit_message("GRAPH", "GRAPH_COMPLETED", {"status": "SUCCESS" if not self.pending_nodes and not self.running_nodes else "FAILED"})
return self.node_outputs
在上述代码中,我们引入了 IntermediateMessage 类来规范化推送的消息格式。ExecutionContext 封装了 stream_callback,允许图中的任何组件通过它来发送消息。最重要的是,Node 抽象类中的 execute 方法现在接受一个 stream_callback 参数。这意味着:
3.1 策略A:节点输出作为中间状态
当一个节点完全执行完毕后,它的输出就可以被视为一个中间思考结果。我们可以通过在 Node.execute 方法的末尾,将其输出(或输出的摘要)通过 stream_callback 推送出去。
在 DataLoadNode 和 ProcessDataNode 的 execute 方法中,我们已经包含了在节点完成时发送 NODE_OUTPUT 类型的消息。
3.2 策略B:细粒度状态更新(节点内部进度)
对于内部包含多个子步骤的复杂节点,我们可能需要更细粒度的进度反馈。这可以通过在节点的 execute 方法内部,每完成一个子步骤就调用 stream_callback 来实现。
例如,ProcessDataNode 模拟了多步处理,并在每一步完成后发送 NODE_PROGRESS 消息,包含当前的百分比。
# ... (in ProcessDataNode.execute method) ...
for i in range(1, 4):
time.sleep(1) # Simulate work
if stream_callback:
stream_callback(IntermediateMessage(
execution_id="N/A", # Filled by context
node_id=self.node_id,
timestamp=time.time(),
message_type="NODE_PROGRESS",
payload={"message": f"Step {i}/3 completed"},
metadata={"percentage": i * 33} # 实时更新进度
))
# ...
3.3 策略C:全局执行上下文的变更追踪
除了节点级别的状态,整个图的执行上下文也可能包含需要实时反馈的信息,例如全局计数器、共享配置的更新,或者重要的全局日志。ExecutionContext 内部的 emit_message 方法就是为此而设计的,ExecutionEngine 在启动和完成时,以及更新节点状态时,都在使用它。
# ... (in ExecutionEngine.execute_graph method) ...
# 初始化所有节点状态为PENDING
for node_id in self.graph.nodes:
context.set_node_status(node_id, "PENDING") # 触发 NODE_STATUS_UPDATE 消息
# ...
context.emit_message("GRAPH", "GRAPH_COMPLETED", {"status": "SUCCESS" ...})
# ...
通过这种设计,我们成功地将中间状态的捕获逻辑与图执行逻辑紧密集成,并通过一个统一的 stream_callback 接口向外暴露。
四、后端推送架构:从捕获到推送
捕获到中间状态后,下一步是如何将其从后端服务推送到前端。这里我们探讨几种常见的后端推送机制。
为了演示,我们将使用Python Flask作为Web框架,并模拟一个简单的推送服务。
# app.py (Flask application)
from flask import Flask, Response, request, jsonify
import json
import time
import threading
import queue
# 假设这个是我们的消息队列,实际上可能是Redis Pub/Sub, RabbitMQ, Kafka
message_queue = queue.Queue()
app = Flask(__name__)
# 一个简单的回调函数,将IntermediateMessage放入全局消息队列
def backend_stream_callback(message: IntermediateMessage):
message_queue.put(message.to_dict())
print(f"Backend put message to queue: {message.message_type} for {message.node_id}")
# ------------------------------------------------------------------------------------
# 模拟图执行的入口
@app.route('/start_graph_execution', methods=['POST'])
def start_graph_execution():
# 构造一个简单的图
node_a = DataLoadNode("node_a", "Load_Data_A", "path/to/data_a.csv")
node_b = DataLoadNode("node_b", "Load_Data_B", "path/to/data_b.json")
node_c = ProcessDataNode("node_c", "Process_A", lambda x: {"processed_a": x}, dependencies=["node_a"])
node_d = ProcessDataNode("node_d", "Process_B", lambda x: {"processed_b": x}, dependencies=["node_b"])
node_e = ProcessDataNode("node_e", "Aggregate_Results", lambda x: {"aggregated": x}, dependencies=["node_c", "node_d"])
graph = Graph("my_complex_graph", {
"node_a": node_a,
"node_b": node_b,
"node_c": node_c,
"node_d": node_d,
"node_e": node_e
})
engine = ExecutionEngine(graph)
# 在一个新线程中执行图,避免阻塞主HTTP请求
execution_thread = threading.Thread(target=engine.execute_graph, args=(backend_stream_callback,))
execution_thread.start()
return jsonify({"execution_id": engine.execution_id, "message": "Graph execution started."}), 202
# ------------------------------------------------------------------------------------
# A. 基于 HTTP Long Polling (长轮询)
@app.route('/stream_long_polling/<execution_id>')
def stream_long_polling(execution_id):
timeout = 20 # seconds
start_time = time.time()
last_message_index = int(request.args.get('last_message_index', -1)) # 客户端需要告知上次收到的消息索引
messages_to_send = []
current_index = 0
while time.time() - start_time < timeout:
if not message_queue.empty():
try:
# 实际的长轮询可能需要一个更复杂的队列管理,这里简化为从头遍历
# 生产环境会维护每个客户端的已读消息索引
temp_queue_list = list(message_queue.queue) # Convert to list to iterate
for i in range(last_message_index + 1, len(temp_queue_list)):
msg = temp_queue_list[i]
if msg.get("execution_id") == execution_id:
messages_to_send.append(msg)
current_index = i
if messages_to_send:
return jsonify({"messages": messages_to_send, "last_message_index": current_index})
except Exception as e:
print(f"Error reading from queue: {e}")
time.sleep(0.1) # Check queue periodically
return jsonify({"messages": [], "last_message_index": last_message_index}), 200 # No new messages, return empty
# ------------------------------------------------------------------------------------
# B. 基于 Server-Sent Events (SSE)
@app.route('/stream_sse/<execution_id>')
def stream_sse(execution_id):
def generate():
last_yielded_index = -1
while True:
# 遍历队列,只发送新的且属于当前 execution_id 的消息
temp_queue_list = list(message_queue.queue)
messages_to_send = []
current_max_index = last_yielded_index
for i in range(last_yielded_index + 1, len(temp_queue_list)):
msg = temp_queue_list[i]
if msg.get("execution_id") == execution_id:
messages_to_send.append(msg)
current_max_index = i # Update max index seen
if messages_to_send:
for msg in messages_to_send:
yield f"data: {json.dumps(msg)}nn"
last_yielded_index = current_max_index # Update for next iteration
time.sleep(0.5) # Polling interval for new messages
return Response(generate(), mimetype='text/event-stream')
# ------------------------------------------------------------------------------------
# C. 基于 WebSockets
# Flask本身不直接支持WebSockets,通常需要借助Flask-SocketIO或使用ASGI服务器如FastAPI/Starlette。
# 这里我们仅提供概念性的伪代码和说明,不实现完整的WebSocket服务器。
#
# @app.route('/ws_endpoint/<execution_id>')
# def websocket_connection(execution_id):
# # ... WebSocket握手和连接建立 ...
# #
# # 假设有一个全局的 active_ws_connections 字典,存储 execution_id -> list_of_ws_clients
# # 将当前连接加入到 active_ws_connections[execution_id] 中
# #
# # 启动一个循环,从 message_queue 中获取消息,如果属于当前 execution_id,则通过 WebSocket 发送
# #
# # 伪代码:
# # ws_client = get_websocket_client()
# # active_ws_connections.get(execution_id, []).append(ws_client)
# # while ws_client.is_connected():
# # if not message_queue.empty():
# # msg = message_queue.get()
# # if msg.get("execution_id") == execution_id:
# # ws_client.send(json.dumps(msg))
# # time.sleep(0.05)
# #
# # 当连接断开时,从 active_ws_connections 中移除
# pass
#
#
# 对于生产环境的WebSocket,推荐使用:
# - Python: FastAPI + websockets 库, 或者 Flask-SocketIO (基于 Socket.IO)
# - Node.js: Express + ws 库 (或 Socket.IO)
# - Go: Gorilla WebSocket
#
# 消息推送服务会订阅 backend_stream_callback 产生的消息,然后通过WebSocket推送到匹配的客户端。
# ------------------------------------------------------------------------------------
if __name__ == '__main__':
app.run(debug=True, threaded=True, port=5000)
4.1 基于 HTTP Long Polling (长轮询)
- 机制: 客户端发送一个HTTP请求到服务器,服务器不会立即响应,而是将请求挂起。当有新的中间状态数据产生时,服务器立即响应并返回数据;或者在一定时间(如20-30秒)后,即使没有数据,也返回一个空响应。客户端收到响应后,无论是否有数据,都会立即发送新的请求。
- 优点: 兼容性好,几乎所有浏览器和HTTP客户端都支持。实现相对简单。
- 缺点:
- 延迟: 服务器需要等待新数据或超时才能响应,可能引入延迟。
- 资源消耗: 每个客户端连接都会占用服务器的一个线程或进程,在高并发下资源消耗大。
- 复杂性: 客户端需要管理请求的发送、接收和重试。服务器端需要维护每个客户端的“已读”状态以避免重复发送或丢失消息。
- 适用场景: 对实时性要求不是非常高,或者客户端环境限制(如不支持SSE/WebSocket)的场景。
4.2 基于 Server-Sent Events (SSE)
- 机制: 客户端通过
EventSourceAPI 发送一个HTTP请求。服务器保持这个连接打开,并可以单向地、连续地向客户端推送文本数据流。数据格式通常为data: {json_payload}nn。客户端会自动处理断线重连。 - 优点:
- 简单: 基于HTTP/1.1,浏览器原生支持
EventSource对象,使用简单。 - 单向高效: 专为服务器到客户端的单向数据流设计,比长轮询效率更高。
- 自动重连: 客户端在连接断开时会自动尝试重新连接。
- 简单: 基于HTTP/1.1,浏览器原生支持
- 缺点:
- 单向通信: 只能从服务器推送到客户端,客户端无法通过同一个连接向服务器发送数据。
- 不支持二进制: 仅支持文本数据。
- 连接限制: 受限于浏览器对单个域名的HTTP连接数限制(通常为6个)。
- 适用场景: 只需要服务器向客户端推送数据,如实时日志、进度条、新闻feed等。对于Partial State Streaming,这是一个非常合适的选择。
4.3 基于 WebSockets
- 机制: 客户端和服务器通过HTTP协议进行一次“握手”,成功后将连接升级为一个持久的双向通信通道。一旦建立,双方可以随时发送和接收数据,支持文本和二进制数据。
- 优点:
- 全双工: 客户端和服务器可以同时发送和接收数据,实现真正的双向实时通信。
- 低延迟: 一旦连接建立,数据传输开销小。
- 支持二进制: 适用于传输图片、音频等二进制数据。
- 协议效率高: 在HTTP层之上,帧开销小。
- 缺点:
- 复杂性高: 相较于SSE,实现和管理WebSocket连接更为复杂,需要专门的WebSocket服务器或库。
- 防火墙问题: 某些代理或防火墙可能不支持WebSocket协议,但现在已较少见。
- 适用场景: 需要高频、低延迟的双向通信,如在线游戏、聊天应用、协作编辑、以及对实时性和交互性要求极高的Partial State Streaming。
4.4 消息队列作为中间件
在生产环境中,特别是在分布式系统和高并发场景下,直接在Web服务器中处理消息队列和推送逻辑可能会导致复杂性和扩展性问题。通常会引入消息队列(如RabbitMQ, Apache Kafka, Redis Pub/Sub)作为中间件。
- 机制:
- 图执行引擎(或其他后端服务)在产生中间状态时,将
IntermediateMessage发布到消息队列的特定主题(Topic)或交换机(Exchange)。 - 一个独立的“推送服务”(Push Service)订阅这些消息。
- 推送服务根据消息内容(例如
execution_id)将消息转发给通过SSE或WebSocket连接的前端客户端。
- 图执行引擎(或其他后端服务)在产生中间状态时,将
- 优点:
- 解耦: 图执行引擎与推送逻辑完全分离,各自可以独立扩展。
- 异步处理: 图执行引擎无需等待消息被推送,只需将消息放入队列即可。
- 削峰填谷: 消息队列可以缓冲突发的大量消息,保护推送服务不被压垮。
- 可靠性: 许多消息队列支持消息持久化和事务,确保消息不会丢失。
- 可扩展性: 消息队列本身可以水平扩展,推送服务也可以部署多个实例。
- 缺点: 引入额外的组件,增加了系统的复杂性和运维成本。
- 适用场景: 大规模、高并发、需要高可靠性和可扩展性的系统。这是推荐的生产级方案。
后端推送机制对比表
| 特性/机制 | HTTP Long Polling | Server-Sent Events (SSE) | WebSockets | 消息队列 + 推送服务 |
|---|---|---|---|---|
| 通信方向 | 伪双向 (请求-响应) | 服务器 -> 客户端 (单向) | 双向 (全双工) | 异步解耦,推送服务双向/单向 |
| 实时性 | 中等,有延迟 | 高 | 极高 | 极高,取决于MQ和推送服务 |
| 实现复杂度 | 低-中 | 低 | 中-高 | 高 (多组件) |
| 浏览器支持 | 极好 | 好 (IE不支持EventSource) | 很好 (IE10+,现代浏览器) | 客户端通过SSE/WebSocket连接 |
| 资源消耗 | 高 (每个连接一个线程) | 中 (持久HTTP连接) | 低 (持久TCP连接) | 低 (推送服务专注推送) |
| 数据类型 | 文本 | 文本 | 文本/二进制 | 文本/二进制 (取决于MQ和协议) |
| 扩展性 | 差 | 中 | 好 | 极好 (分布式架构) |
| 断线重连 | 需手动实现 | 客户端自动重连 | 需手动实现 | 客户端自动重连 (SSE/WS特性) |
| 典型用途 | 早期聊天,通知 | 进度条,日志流,新闻 | 聊天,游戏,协作 | 大规模实时系统,分布式通知 |
五、数据结构与协议设计
无论选择哪种推送机制,统一且清晰的消息数据结构至关重要。它定义了后端发送什么,以及前端如何解析和处理。
我们之前定义的 IntermediateMessage 类是一个很好的起点:
class IntermediateMessage:
def __init__(self, execution_id: str, node_id: str, timestamp: float,
message_type: str, payload: Dict[str, Any],
metadata: Optional[Dict[str, Any]] = None):
self.execution_id = execution_id # 哪个图实例的执行
self.node_id = node_id # 哪个节点产生了信息 ("GRAPH"表示图级别消息)
self.timestamp = timestamp # 消息产生的时间戳
self.message_type = message_type # 消息类型,用于前端区分处理逻辑
self.payload = payload # 实际数据,具体内容取决于消息类型
self.metadata = metadata if metadata is not None else {} # 额外元数据
def to_dict(self):
return {
"execution_id": self.execution_id,
"node_id": self.node_id,
"timestamp": self.timestamp,
"message_type": self.message_type,
"payload": self.payload,
"metadata": self.metadata
}
message_type 的常见枚举值:
GRAPH_STARTED: 图执行开始。GRAPH_COMPLETED: 图执行成功完成。GRAPH_FAILED: 图执行失败。NODE_STATUS_UPDATE: 节点状态变更 (PENDING, RUNNING, COMPLETED, FAILED, SKIPPED)。NODE_PROGRESS: 节点内部进度更新 (e.g., 百分比, 当前子任务)。NODE_OUTPUT: 节点完成后的输出(通常是摘要或预览)。NODE_LOG: 节点产生的实时日志。NODE_ERROR: 节点执行过程中发生的错误。GLOBAL_METRIC_UPDATE: 全局指标更新。
序列化:
- JSON: 跨语言兼容性好,人类可读,调试方便。缺点是数据量相对较大,解析开销略高。对于大部分场景足够。
- Protobuf (Protocol Buffers): Google开发的一种语言中立、平台中立、可扩展的序列化数据格式。优点是数据量小,序列化/反序列化速度快。缺点是需要定义
.proto文件,生成代码,且数据不可读。适用于对性能和数据量有极致要求,或需要强类型检查的场景。
版本控制: 随着系统演进,消息协议可能会发生变化。建议在消息中包含一个版本号字段,或者使用不同的消息类型命名空间,以便前端能够兼容处理不同版本的消息。
六、前端接收与渲染
前端的主要任务是建立与后端的连接,接收流式数据,并根据数据更新其内部状态模型,最终动态渲染UI。
6.1 接收器 (Receiver)
使用 EventSource (for SSE):
// frontend.js (simplified)
const executionId = 'YOUR_EXECUTION_ID_HERE'; // 从后端启动图执行时获取
const eventSource = new EventSource(`/stream_sse/${executionId}`);
eventSource.onopen = function() {
console.log("SSE connection opened.");
};
eventSource.onmessage = function(event) {
const message = JSON.parse(event.data);
console.log("Received SSE message:", message);
updateGraphUI(message); // 调用UI更新函数
};
eventSource.onerror = function(error) {
console.error("SSE error:", error);
// 自动重连,但可以在这里处理重连失败的提示
if (eventSource.readyState === EventSource.CLOSED) {
console.log("SSE connection closed, attempting to reconnect...");
}
};
eventSource.onclose = function() {
console.log("SSE connection closed.");
};
使用 WebSocket (伪代码):
// frontend.js (simplified)
const executionId = 'YOUR_EXECUTION_ID_HERE';
const ws = new WebSocket(`ws://localhost:5000/ws_endpoint/${executionId}`); // 假设后端是ws://
ws.onopen = function(event) {
console.log("WebSocket connection opened.");
// 可以在这里发送初始消息,例如身份验证或请求历史数据
};
ws.onmessage = function(event) {
const message = JSON.parse(event.data);
console.log("Received WebSocket message:", message);
updateGraphUI(message); // 调用UI更新函数
};
ws.onclose = function(event) {
console.log("WebSocket connection closed:", event.code, event.reason);
// 实现手动重连逻辑
setTimeout(() => {
console.log("Attempting to reconnect WebSocket...");
// Re-establish connection, potentially with exponential backoff
// ws = new WebSocket(...)
}, 1000);
};
ws.onerror = function(error) {
console.error("WebSocket error:", error);
};
6.2 状态管理 (State Management)
前端需要维护一个表示当前图执行状态的数据模型。这通常是一个嵌套的JavaScript对象或Map结构,存储图的整体状态以及每个节点的详细状态。
// frontend_state_manager.js
class GraphStateManager {
constructor(executionId, initialGraphDefinition) {
this.executionId = executionId;
this.graph = {
id: executionId,
status: "PENDING", // GRAPH_PENDING, GRAPH_RUNNING, GRAPH_COMPLETED, GRAPH_FAILED
nodes: {}, // Map<node_id, NodeState>
// ... 其他图级别信息
};
this.listeners = []; // UI组件可以注册监听器
// 初始化节点状态
for (const nodeId in initialGraphDefinition.nodes) {
this.graph.nodes[nodeId] = {
id: nodeId,
name: initialGraphDefinition.nodes[nodeId].name,
status: "PENDING", // PENDING, RUNNING, COMPLETED, FAILED, SKIPPED
progress: 0, // 0-100
outputPreview: null,
latestMessage: null,
error: null,
// ... 其他节点属性
};
}
}
// 注册UI组件的更新回调
subscribe(callback) {
this.listeners.push(callback);
}
// 通知所有监听器状态已更新
notifyListeners() {
this.listeners.forEach(callback => callback(this.graph));
}
// 根据后端消息更新状态
updateState(message) {
if (message.execution_id !== this.executionId) {
console.warn("Received message for different execution ID:", message.execution_id);
return;
}
const { node_id, message_type, payload, metadata } = message;
if (node_id === "GRAPH") {
// 更新图级别状态
if (message_type === "GRAPH_STARTED") {
this.graph.status = "RUNNING";
} else if (message_type === "GRAPH_COMPLETED") {
this.graph.status = "COMPLETED";
} else if (message_type === "GRAPH_FAILED") {
this.graph.status = "FAILED";
}
// ... 其他图级别消息
} else if (this.graph.nodes[node_id]) {
const nodeState = this.graph.nodes[node_id];
nodeState.latestMessage = message; // 记录最新消息
if (message_type === "NODE_STATUS_UPDATE") {
nodeState.status = payload.status;
} else if (message_type === "NODE_PROGRESS") {
nodeState.progress = metadata.percentage || nodeState.progress;
nodeState.latestMessage = payload.message;
} else if (message_type === "NODE_OUTPUT") {
nodeState.outputPreview = payload.output_preview;
// ... 可以存储更多输出元数据
} else if (message_type === "NODE_ERROR") {
nodeState.error = payload.error;
nodeState.status = "FAILED"; // 错误通常意味着失败
}
// ... 其他节点级别消息
}
this.notifyListeners(); // 通知UI进行渲染
}
}
// 假设我们有一个初始的图定义(通常在页面加载时从后端获取)
const initialGraphDef = {
nodes: {
"node_a": { name: "Load_Data_A", dependencies: [] },
"node_b": { name: "Load_Data_B", dependencies: [] },
"node_c": { name: "Process_A", dependencies: ["node_a"] },
"node_d": { name: "Process_B", dependencies: ["node_b"] },
"node_e": { name: "Aggregate_Results", dependencies: ["node_c", "node_d"] }
}
// ... edges, etc.
};
const graphStateManager = new GraphStateManager(executionId, initialGraphDef);
// 在 EventSource.onmessage 或 WebSocket.onmessage 中调用
// updateGraphUI 函数现在会变成:
function updateGraphUI(message) {
graphStateManager.updateState(message);
}
// 示例:一个简单的React组件如何订阅状态
// function GraphView() {
// const [graphState, setGraphState] = React.useState(graphStateManager.graph);
//
// React.useEffect(() => {
// const unsubscribe = graphStateManager.subscribe(newState => {
// setGraphState({ ...newState }); // 深度拷贝以触发React更新
// });
// return () => unsubscribe(); // 清理订阅
// }, []);
//
// // ... 根据 graphState 渲染图和节点
// }
6.3 渲染 (Rendering)
前端UI组件会监听状态管理器的变化。一旦 graphStateManager 通知有更新,UI组件就会根据最新的 graphState 重新渲染。这通常包括:
- 图的整体状态: 显示图的运行状态(运行中、完成、失败)。
- 节点可视化:
- 根据
node.status改变节点的颜色(例如:灰色-PENDING,蓝色-RUNNING,绿色-COMPLETED,红色-FAILED)。 - 显示节点的
progress(例如,进度条)。 - 显示
node.outputPreview或node.latestMessage。 - 显示
node.error信息。
- 根据
- 边可视化: 可以在节点完成时,高亮显示连接到该节点的边,表示数据流已通过。
为了高效渲染,特别是对于大型图,可以考虑使用:
- 增量更新: 只更新UI中发生变化的部分,而不是重新渲染整个图。
- 虚拟化/画布渲染: 对于节点数量非常多的图,使用Canvas或SVG库(如D3.js, React Flow, GoJS)进行渲染,并实现虚拟滚动,只渲染视口内的节点。
七、容错与可靠性
在实时数据流的场景中,容错与可靠性是不可忽视的。
- 断线重连 (Reconnection):
- 客户端:
EventSource原生支持自动重连。WebSocket需要手动实现重连逻辑,通常包括指数退避(Exponential Backoff)策略,以避免服务器在短时间内接收到大量重连请求。 - 服务器: 推送服务需要能够识别客户端的断线和重连,并可能需要重新发送自客户端断开以来错过的消息。
- 客户端:
- 数据丢失 (Data Loss):
- 消息队列: 使用持久化的消息队列(如Kafka, RabbitMQ),确保即使推送服务崩溃,消息也不会丢失。
- 客户端同步: 对于长轮询和WebSocket,客户端可以在重连时发送
last_message_id或last_timestamp给服务器,请求服务器发送自该ID/时间戳之后的所有消息。这要求服务器端维护一个消息历史记录。
- 幂等性 (Idempotency):
- 前端处理消息时应具有幂等性。即,接收同一条消息多次,对最终状态的影响与接收一次相同。这通过消息中的
timestamp和message_type结合node_id来判断是否是重复消息或过时消息。
- 前端处理消息时应具有幂等性。即,接收同一条消息多次,对最终状态的影响与接收一次相同。这通过消息中的
- 背压 (Backpressure):
- 如果前端处理消息的速度慢于后端生成消息的速度,可能会导致内存溢出。
- 后端: 可以实现流量控制,例如当推送服务的发送缓冲区满时,暂停从消息队列消费消息,或者通知图执行引擎减慢速度(这在DAG执行中通常很难实现)。
- 前端: 可以通过丢弃旧消息、限制渲染频率或增加缓冲区来应对。
- 错误报告 (Error Reporting):
- 后端应将执行过程中的错误(包括节点执行失败、系统错误)通过
NODE_ERROR或GRAPH_FAILED消息类型推送给前端,以便用户及时发现问题。
- 后端应将执行过程中的错误(包括节点执行失败、系统错误)通过
八、性能与扩展性
- 连接管理:
- 负载均衡器: 对于SSE和WebSocket,需要配置支持持久连接的负载均衡器(如Nginx, HAProxy, AWS ALB),将特定客户端的请求路由到同一个推送服务实例,或者使用粘性会话(Sticky Sessions)。
- 高并发服务器: 推送服务本身需要使用高性能的异步Web框架(如Python的FastAPI/Starlette, Node.js的Express/Fastify)来处理大量并发连接。
- 消息吞吐量:
- 消息队列水平扩展: Kafka等MQ可以水平扩展以处理极高的消息吞吐量。
- 批处理消息: 如果单个节点产生大量细粒度消息,可以考虑在后端进行批处理,将多条小消息打包成一条大消息再发送,减少网络开销。
- 数据量优化:
- 只发送变更:只发送发生变化的数据,而不是每次都发送完整状态。
- 压缩:在HTTP/WebSocket层启用Gzip或其他压缩。
- 二进制协议:使用Protobuf等二进制协议可以显著减少消息大小。
- 前端渲染优化:
- 增量更新: 避免每次都全量渲染整个图。
- 虚拟滚动: 对于大型图,只渲染当前视口中的节点。
- Debounce/Throttle: 限制UI更新的频率,防止在短时间内接收大量消息时导致UI卡顿。
九、案例场景与高级应用
Partial State Streaming的应用场景非常广泛:
- AI/ML 训练管道可视化: 实时展示数据预处理、特征工程、模型训练、评估等各个阶段的进度、输出日志、模型指标(如损失函数、准确率)。
- 数据 ETL 流程监控: 实时跟踪数据从源到目的地的流动,显示每个转换步骤的成功与否、处理的数据量、可能发生的错误。
- 自动化工作流的实时状态: 例如CI/CD管道、业务流程自动化,用户可以实时看到哪些步骤已完成、哪些正在运行、哪些失败了。
- 交互式数据分析工具: 用户提交一个复杂的查询或分析任务,后端以图的形式执行,前端实时展示中间计算结果,允许用户基于部分结果进行下一步操作。
- 多用户协作与实时更新: 多个用户同时查看或操作同一个图执行实例,所有用户的客户端都能实时同步图的最新状态。
十、结语
Partial State Streaming是提升用户体验、增强系统可观测性和可调试性的关键技术。通过精心设计图执行引擎的钩子机制、选择合适的后端推送架构、定义清晰的数据协议,并配合智能的前端状态管理和渲染策略,我们可以将复杂的后端“思考过程”以直观、实时的方式呈现给用户。这不仅能极大地改善用户体验,也能帮助开发者更快地定位问题和优化系统性能。技术的选择和实现的复杂度应与具体业务场景对实时性、可靠性和扩展性的要求相匹配,从而构建出既强大又易用的系统。