各位开发者,下午好!
今天,我们将深入探讨一个在现代企业应用中日益重要的主题:如何为基于 LangGraph 的“审批界面(Approval UI)”构建一个健壮且实时的后端适配层。具体来说,我们将聚焦于如何利用 API 接口,将 LangGraph 的复杂“挂起状态”实时、准确地同步到前端,从而为用户提供流畅、响应迅速的审批体验。
随着业务流程的日益复杂,传统的线性审批流已经难以满足需求。LangGraph,作为 LangChain 的一个强大扩展,为我们提供了一种构建有状态、多代理、循环决策图的强大能力。它的核心优势在于能够自然地处理状态流转、条件分支以及最关键的——“挂起”并等待外部输入(例如人工审批)的能力。然而,要将这种后端智能有效地呈现在前端用户面前,并允许用户进行交互,就需要一套精心设计的后端 API 策略。
本次讲座,我将以编程专家的视角,为大家剖析从概念到实现的每一个环节,并辅以详尽的代码示例,确保大家能够掌握核心技术,并将其应用到实际项目中。
引言:理解 LangGraph 在审批流中的潜力
传统的审批系统往往是基于固定的流程引擎或工作流系统构建的。它们虽然能够处理常见的线性或简单分支流程,但在面对需要动态决策、多代理协作、长时间等待外部输入(如人工审批、外部系统回调)的复杂场景时,往往显得力不从心。维护这些流程的状态、历史以及在不同节点间的上下文传递,常常成为开发者的噩梦。
LangGraph 的出现,为这一挑战带来了新的解决方案。它将流程建模为有向图,其中节点(Nodes)代表业务逻辑或代理行为,边(Edges)定义了状态的流转。LangGraph 的强大之处在于其内置的“状态管理”和“检查点(Checkpoint)”机制。这意味着当一个流程需要暂停,等待某个外部事件(比如一位经理的批准)时,LangGraph 能够优雅地将当前状态持久化,并在事件发生后从中断点恢复执行,而无需我们手动管理复杂的上下文。这种“挂起(Suspension)”能力,正是构建响应式审批 UI 的基石。
为何 LangGraph 特别适合复杂审批流?
- 状态持久化与恢复:LangGraph 能够自动保存其内部状态,并在需要时从任何检查点恢复,这对于需要长时间等待外部输入的审批流至关重要。
- 动态决策与条件路由:审批流往往包含复杂的条件逻辑。LangGraph 的条件边允许我们根据当前状态或代理的输出动态地决定下一个执行的节点,完美契合审批场景。
- 多代理协作:如果审批流中包含多个角色(如发起人、部门经理、财务经理、HR),LangGraph 可以将每个角色视为一个“代理”或一个“节点”,在它们之间传递信息和控制流。
- 清晰的流程可视化:通过图形化的方式定义流程,使得复杂的审批逻辑更易于理解和维护。
“挂起状态”的意义和价值
在 LangGraph 中,“挂起状态”通常发生在某个节点需要外部输入时。例如,当流程到达一个“人工审批”节点,它会停止执行,等待用户通过前端界面提供“批准”或“拒绝”的决策。此时,LangGraph 的内部状态被保存下来,等待外部触发。
对于前端 UI 而言,实时同步这种“挂起状态”至关重要:
- 即时反馈:用户能立即看到审批请求的最新进展,无需手动刷新。
- 任务分配:审批者能及时收到待处理任务通知,并查看所需的所有上下文信息。
- 流程透明度:所有参与者都能追踪流程的当前位置、历史决策和潜在瓶颈。
- 提升用户体验:一个实时更新的 UI 能够显著提升用户对系统的信任度和满意度。
本次讲座的目标,就是构建一个后端,它能够精确地捕捉 LangGraph 的挂起事件,并利用现代 API 技术(尤其是 WebSockets)将其高效地推送给前端,同时提供必要的 RESTful 接口供前端进行操作(如发起审批、提交决策)。
核心概念:LangGraph 状态管理与挂起机制
在深入后端实现之前,我们必须对 LangGraph 的核心概念及其状态管理机制有一个清晰的理解。
LangGraph 是一个基于有向图(Directed Graph)的状态机。它由以下几个核心组件构成:
- State (状态): 这是 LangGraph 流程的当前上下文,它是一个可变的数据结构,在图中的节点之间传递。每个节点都可以读取或修改这个状态。
- Nodes (节点): 代表图中的一个操作或一个步骤。它可以是一个函数、一个LLM调用、一个外部API调用,或者一个等待人工输入的步骤。
- Edges (边): 定义了状态如何在节点之间流转。边可以是无条件的(从一个节点直接到另一个),也可以是条件性的(根据状态或节点输出决定下一个节点)。
- Graph (图): 整个流程的定义,由节点和边组成。
- Checkpoint (检查点): LangGraph 存储其当前状态和历史执行记录的机制。这对于长时间运行的流程至关重要,它允许流程在中断后恢复。
如何定义一个带有“人工审批”节点的 LangGraph
为了模拟一个审批流,我们需要一个节点,它在特定条件下“挂起”,等待外部的人工输入。在 LangGraph 中,通常通过一个特殊的节点或一个节点内部的逻辑来实现这一点。当节点执行完毕,并返回一个特定信号(例如一个表示“等待审批”的状态),或者当节点内部显式地抛出一个需要外部处理的异常时,流程就会暂停。在更实际的场景中,我们会让节点执行一个操作,然后根据该操作的结果(或缺少结果)来决定是否挂起。
让我们看一个简化的审批流程 LangGraph 定义,其中包含一个人工审批步骤:
# graph_definitions.py
from typing import TypedDict, Annotated, List
import operator
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.base import Checkpoint
from langgraph.checkpoint.sqlite import SqliteSaver # 示例使用,实际项目会用更强大的DB
# 1. 定义流程状态
class ApprovalGraphState(TypedDict):
"""
Represent the state of our approval process.
"""
request_id: str
initiator: str
approver: str
amount: float
description: str
status: str # "pending_review", "approved", "rejected", "initiated"
approver_comment: str
current_node: str # Added to track current node for UI
history: Annotated[List[str], operator.add] # For tracking execution history
# 2. 定义节点函数
def initiate_request(state: ApprovalGraphState) -> ApprovalGraphState:
print(f"Node: Initiating request {state['request_id']} by {state['initiator']}")
return {
"status": "pending_review",
"current_node": "initiate_request",
"history": [f"Request {state['request_id']} initiated by {state['initiator']}."]
}
def review_request(state: ApprovalGraphState) -> ApprovalGraphState:
"""
This node represents the point where human intervention is required.
It returns the current state, indicating it's awaiting external input.
The status will remain "pending_review" until an external action updates it.
"""
print(f"Node: Reviewing request {state['request_id']}. Awaiting {state['approver']}'s decision.")
# In a real system, this node would trigger a notification to the approver
# and then effectively "pause" by just returning the state without advancing.
# The actual approval/rejection logic will be handled by an external API call
# that updates the state and resumes the graph.
return {
"current_node": "review_request",
"history": [f"Request {state['request_id']} sent for review to {state['approver']}."]
}
def process_approval(state: ApprovalGraphState) -> ApprovalGraphState:
print(f"Node: Processing approved request {state['request_id']}")
return {
"status": "approved",
"current_node": "process_approval",
"history": [f"Request {state['request_id']} approved. Comment: {state['approver_comment']}"]
}
def process_rejection(state: ApprovalGraphState) -> ApprovalGraphState:
print(f"Node: Processing rejected request {state['request_id']}")
return {
"status": "rejected",
"current_node": "process_rejection",
"history": [f"Request {state['request_id']} rejected. Comment: {state['approver_comment']}"]
}
# 3. 定义路由函数 (条件边)
def decide_on_review(state: ApprovalGraphState) -> str:
"""
Determines the next step based on the 'status' updated by external approval.
"""
if state["status"] == "approved":
return "approved"
elif state["status"] == "rejected":
return "rejected"
else:
# This state should ideally not be reached if external action updates status correctly
# Or it implies the graph is still pending review.
# For simplicity, we assume external action will set "approved" or "rejected".
# In a more robust system, you might have an explicit 'await_approval' state.
return "review_request" # Keep it in review loop if not approved/rejected yet
# 4. 构建 LangGraph
def create_approval_graph(memory: Checkpoint):
workflow = StateGraph(ApprovalGraphState)
# Add nodes
workflow.add_node("initiate_request", initiate_request)
workflow.add_node("review_request", review_request)
workflow.add_node("process_approval", process_approval)
workflow.add_node("process_rejection", process_rejection)
# Set entry point
workflow.set_entry_point("initiate_request")
# Add edges
workflow.add_edge("initiate_request", "review_request")
# Conditional edge from review_request
workflow.add_conditional_edges(
"review_request", # From node
decide_on_review, # Router function
{
"approved": "process_approval",
"rejected": "process_rejection",
"review_request": "review_request" # Loop back if still pending for some reason
}
)
# End points
workflow.add_edge("process_approval", END)
workflow.add_edge("process_rejection", END)
app = workflow.compile(checkpointer=memory)
return app
在这个例子中:
ApprovalGraphState定义了流程的上下文,包括status和current_node,这两个字段对于前端同步至关重要。review_request节点是人工审批的关键点。当流程到达此节点时,它会返回当前状态,但不会进一步推进流程,直到外部 API 调用更新了status字段。decide_on_review是一个条件路由函数,它根据status字段的值来决定下一步是进入process_approval还是process_rejection。
checkpoint 机制与状态持久化
LangGraph 的 checkpointer 是其状态管理的核心。它负责将 Graph 的当前状态和执行历史保存到持久存储中,例如数据库。当我们使用 app.invoke() 或 app.stream() 运行 LangGraph 时,它会根据 thread_id(或 config["configurable"]["thread_id"])来加载或保存状态。
为了实现持久化,我们需要一个 Checkpoint 实现。LangGraph 提供了 SqliteSaver 作为示例,但在生产环境中,我们通常会使用更健壮的数据库,如 PostgreSQL。我们将自定义一个基于 SQLAlchemy 的 Checkpoint 来实现这一点。
RunnableWithMessageHistory 和 add_messages (LangChain 概念)
虽然 LangGraph 本身是更底层的状态机,但它与 LangChain 的 RunnableWithMessageHistory 概念紧密结合。RunnableWithMessageHistory 允许我们将 LangGraph 包装成一个可以处理消息历史的 LangChain Runnable,并利用其 config 参数中的 configurable={"thread_id": ...} 来指定检查点 ID。这使得我们能够通过一个统一的接口来启动、恢复和查询 LangGraph 流程。
后端架构选型与基础搭建
为了承载 LangGraph 审批流并提供实时同步能力,我们需要一个高性能、异步的后端服务。
API 框架选择:FastAPI
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建基于 Python 标准类型提示的 API。它基于 Starlette 和 Pydantic,提供了自动的 OpenAPI(Swagger UI)文档生成,并且原生支持异步编程 (async/await),这对于处理 I/O 密集型任务(如数据库操作、网络通信、WebSocket 连接)至关重要。
数据库:PostgreSQL
PostgreSQL 是一种功能强大、开源的对象关系型数据库系统。它具有高度的可靠性、数据完整性和鲁棒性,并且支持复杂的数据类型(如 JSONB),非常适合存储 LangGraph 的复杂状态信息以及业务相关的审批数据。我们将使用 SQLAlchemy 作为 ORM,并结合 asyncpg 实现异步数据库操作。
异步通信:Redis Pub/Sub 与 WebSockets
- Redis Pub/Sub: 当 LangGraph 的状态在后端更新时,我们需要一种机制来通知所有相关的客户端。Redis 的发布/订阅(Pub/Sub)模式是实现这一目标的高效方式。后端服务可以将状态更新发布到特定的 Redis 频道,而 WebSocket 服务器(或其他服务)可以订阅这些频道并转发消息。
- WebSockets: 为了实现前端的“实时同步”,WebSockets 是最佳选择。它提供了客户端和服务器之间的全双工通信通道,允许服务器主动将数据推送到客户端,而无需客户端不断轮询。FastAPI 原生支持 WebSockets。
整体架构概览(文字描述)
我们的后端架构将由以下核心组件构成:
- FastAPI 应用: 负责暴露 RESTful API 和 WebSocket 端点。
- LangGraph 运行时: 在 FastAPI 应用内部,每个审批流程实例都由一个 LangGraph 实例管理。
- PostgreSQL 数据库:
- 存储 LangGraph 的检查点(State and History)。
- 存储业务审批数据(如审批请求详情、用户信息)。
- Redis (可选但推荐): 用于实现 Pub/Sub 消息广播,确保多个后端实例之间也能同步状态更新,并作为 WebSocket 服务器与 LangGraph 运行时之间的桥梁。
+-------------------+ +-------------------+
| Frontend UI | <-----> | FastAPI Backend |
| (React/Vue/Angular)| | |
+-------------------+ | - RESTful APIs |
^ | - WebSocket Server|
| Real-time updates | - LangGraph Runner|
| (WebSocket) | |
| +-------------------+
| |
| API calls (REST) | Database Ops (SQLAlchemy)
V V
+-------------------+ +-------------------+
| Redis (Pub/Sub) | <-----> | PostgreSQL DB |
| (State Broadcast) | | (LangGraph Checkpoint, |
+-------------------+ | Business Data) |
+-------------------+
代码示例:FastAPI 项目结构与基础配置
首先,确保安装了必要的库:
pip install fastapi uvicorn "sqlalchemy[asyncio]" asyncpg pydantic langgraph redis python-jose
项目结构:
.
├── main.py # FastAPI 应用入口
├── config.py # 配置管理
├── database.py # 数据库连接和模型定义
├── models.py # Pydantic 数据模型
├── crud.py # 数据库操作逻辑
├── langgraph_service.py # LangGraph 相关的业务逻辑
├── routers/ # API 路由
│ ├── approval_processes.py
│ └── websockets.py
├── core/ # 核心组件
│ └── custom_checkpoint.py # 自定义 LangGraph 检查点
└── __init__.py
config.py:
# config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
DATABASE_URL: str = "postgresql+asyncpg://user:password@localhost/approval_db"
REDIS_URL: str = "redis://localhost:6379/0"
SECRET_KEY: str = "YOUR_SUPER_SECRET_KEY_FOR_JWT" # For JWT
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
model_config = SettingsConfigDict(env_file=".env", extra="ignore")
settings = Settings()
database.py:
# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
from config import settings
DATABASE_URL = settings.DATABASE_URL
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
Base = declarative_base()
async def get_db():
async with AsyncSessionLocal() as session:
yield session
# Example: Define a simple model for LangGraph checkpoints (for custom storage)
# In a real scenario, LangGraph's default checkpoint might store JSON directly.
# Here, we'll store checkpoint data as JSONB.
from sqlalchemy import Column, String, JSON, DateTime, func
from datetime import datetime
class LangGraphCheckpoint(Base):
__tablename__ = "langgraph_checkpoints"
thread_id = Column(String, primary_key=True, index=True)
checkpoint_data = Column(JSON, nullable=False)
last_updated = Column(DateTime, default=func.now(), onupdate=func.now())
def __repr__(self):
return f"<LangGraphCheckpoint(thread_id='{self.thread_id}', last_updated='{self.last_updated}')>"
async def create_db_and_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
core/custom_checkpoint.py: 实现一个基于 SQLAlchemy 的 LangGraph Checkpoint。
# core/custom_checkpoint.py
import json
from typing import Dict, Any, Optional
from langgraph.checkpoint.base import Checkpoint, CheckpointTuple, CheckpointMetadata
from langgraph.serde.json import custom_dumps, custom_loads
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from database import LangGraphCheckpoint
import asyncio
class SQLAlchemyCheckpoint(Checkpoint):
"""
A custom LangGraph Checkpoint that uses SQLAlchemy to store state in a PostgreSQL database.
"""
def __init__(self, session_factory):
self._session_factory = session_factory
async def get_tuple(self, thread_id: str) -> Optional[CheckpointTuple]:
async with self._session_factory() as session:
stmt = select(LangGraphCheckpoint).where(LangGraphCheckpoint.thread_id == thread_id)
result = await session.execute(stmt)
db_checkpoint = result.scalar_one_or_none()
if db_checkpoint:
# Deserialize the checkpoint data
checkpoint_data = custom_loads(db_checkpoint.checkpoint_data)
# Check if the checkpoint data contains the required keys
if "v" in checkpoint_data and "ts" in checkpoint_data and "id" in checkpoint_data:
return CheckpointTuple(
config={"configurable": {"thread_id": thread_id}},
checkpoint=checkpoint_data,
metadata=CheckpointMetadata(
source="update", # Or "empty" or "v1" based on your needs
type="v1",
timestamp=checkpoint_data["ts"],
checkpoint_id=checkpoint_data["id"]
)
)
return None
async def put_tuple(self, thread_id: str, checkpoint_tuple: CheckpointTuple) -> None:
async with self._session_factory() as session:
# LangGraph's checkpoint object itself is already a dictionary.
# We need to serialize it to JSON for storage.
serialized_checkpoint = custom_dumps(checkpoint_tuple.checkpoint)
# Use JSON string for SQLAlchemy's JSON column if needed, or dict directly
# For JSONB in PG, SQLAlchemy can often handle dict directly if type is JSON.
# Let's assume JSON for direct storage.
stmt = select(LangGraphCheckpoint).where(LangGraphCheckpoint.thread_id == thread_id)
result = await session.execute(stmt)
db_checkpoint = result.scalar_one_or_none()
if db_checkpoint:
db_checkpoint.checkpoint_data = serialized_checkpoint
else:
db_checkpoint = LangGraphCheckpoint(
thread_id=thread_id,
checkpoint_data=serialized_checkpoint
)
session.add(db_checkpoint)
await session.commit()
API 接口设计:实现状态同步的核心
良好的 API 设计是后端与前端高效协作的关键。我们将结合 RESTful API 和 WebSockets 来实现状态操作与实时同步。
API 类型选择:RESTful API 用于操作,WebSocket 用于实时通知
- RESTful API: 适用于请求-响应模式,用于触发 LangGraph 流程、提交审批决策、获取流程历史等操作。它具有无状态、易于缓存、可伸缩等优点。
- WebSocket: 适用于服务器推送和实时双向通信。当 LangGraph 的状态发生变化时(例如,从“发起”到“待审批”,或从“待审批”到“已批准”),后端可以立即通过 WebSocket 通知前端,实现真正的实时更新。
核心资源:审批流程实例 (ApprovalProcess)
我们将把每个 LangGraph 运行实例视为一个 ApprovalProcess 资源。每个资源都有一个唯一的 ID。
RESTful API 端点设计
| 方法 | 端点 | 描述 | 权限 |
|---|---|---|---|
POST |
/api/processes |
发起新的审批流程。接收请求详情。 | authenticated |
GET |
/api/processes/{process_id} |
获取指定审批流程的当前状态及历史。 | authenticated |
POST |
/api/processes/{process_id}/approve |
提交审批决策(批准)。将流程从挂起状态恢复。 | approver |
POST |
/api/processes/{process_id}/reject |
提交审批决策(拒绝)。将流程从挂起状态恢复。 | approver |
GET |
/api/processes/pending_tasks |
获取当前用户待处理的审批任务列表。 | authenticated |
WebSocket 端点设计
| 端点 | 描述 | 权限 |
|---|---|---|
/ws/processes/{process_id}/status |
实时推送指定审批流程的状态更新。 | authenticated |
数据模型设计 (Pydantic)
models.py:
# models.py
from pydantic import BaseModel, Field
from typing import List, Optional
from datetime import datetime
# --- Request Models ---
class ApprovalRequestIn(BaseModel):
initiator: str = Field(..., description="The user initiating the request.")
approver: str = Field(..., description="The user designated as the approver.")
amount: float = Field(..., gt=0, description="The monetary amount of the request.")
description: str = Field(..., min_length=10, description="A detailed description of the request.")
class ApprovalDecisionIn(BaseModel):
approver_comment: Optional[str] = None
# --- Response Models ---
class ApprovalProcessStatus(BaseModel):
request_id: str
initiator: str
approver: str
amount: float
description: str
status: str
approver_comment: Optional[str] = None
current_node: str
history: List[str]
created_at: datetime
last_updated: datetime
class PendingTask(BaseModel):
request_id: str
initiator: str
amount: float
description: str
status: str
created_at: datetime
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
代码示例:FastAPI 路由定义
routers/approval_processes.py:
# routers/approval_processes.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
import uuid
import asyncio
from database import get_db, AsyncSessionLocal
from models import ApprovalRequestIn, ApprovalProcessStatus, ApprovalDecisionIn, PendingTask
from langgraph_service import get_langgraph_app, run_graph_async, get_graph_state_from_db, update_graph_state_in_db, get_graph_checkpoint_data
from core.custom_checkpoint import SQLAlchemyCheckpoint
from core.auth import get_current_user # Assuming authentication is set up
from redis_client import redis_client # For Pub/Sub
router = APIRouter(prefix="/api/processes", tags=["Approval Processes"])
@router.post("/", response_model=ApprovalProcessStatus)
async def initiate_approval_process(
request_in: ApprovalRequestIn,
db: AsyncSession = Depends(get_db),
current_user: str = Depends(get_current_user) # Authenticated user
):
"""
Initiates a new approval process using LangGraph.
"""
if current_user != request_in.initiator:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="User not authorized to initiate requests for others."
)
request_id = str(uuid.uuid4())
initial_state = {
"request_id": request_id,
"initiator": request_in.initiator,
"approver": request_in.approver,
"amount": request_in.amount,
"description": request_in.description,
"status": "initiated",
"approver_comment": None,
"current_node": "",
"history": []
}
# Get LangGraph app with our custom checkpoint
memory = SQLAlchemyCheckpoint(AsyncSessionLocal)
app = get_langgraph_app(memory)
# Run the graph in a background task to not block the API response
# The first run will hit 'initiate_request' then 'review_request'
# and then the graph will effectively "hang" at 'review_request'
# until the status is updated externally.
asyncio.create_task(run_graph_async(app, request_id, initial_state, db))
# For immediate response, we can fetch the initial state after the first step
# or just return the initial state. LangGraph will save the checkpoint.
# Let's return a projected state that would be expected after initiation.
# Wait a tiny bit for the checkpoint to be saved (or implement a more robust sync)
await asyncio.sleep(0.1) # HACK: In real app, poll or get state from graph directly if possible
# Retrieve the saved state to ensure it's correct
current_state = await get_graph_state_from_db(request_id, db)
if not current_state:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve graph state.")
# Publish update to Redis for WebSocket clients
await redis_client.publish(f"process_updates:{request_id}", current_state.model_dump_json())
return current_state
@router.get("/{process_id}", response_model=ApprovalProcessStatus)
async def get_approval_process_status(
process_id: str,
db: AsyncSession = Depends(get_db),
current_user: str = Depends(get_current_user)
):
"""
Retrieves the current status and history of an approval process.
"""
state = await get_graph_state_from_db(process_id, db)
if not state:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Approval process not found.")
# Authorization check: only initiator, approver, or admin can view
if current_user not in [state.initiator, state.approver] and not (current_user == "admin_user"): # Dummy admin
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="User not authorized to view this process."
)
return state
@router.post("/{process_id}/approve", response_model=ApprovalProcessStatus)
async def approve_request(
process_id: str,
decision_in: ApprovalDecisionIn,
db: AsyncSession = Depends(get_db),
current_user: str = Depends(get_current_user)
):
"""
Approves an approval request, resuming the LangGraph process.
"""
current_state = await get_graph_state_from_db(process_id, db)
if not current_state:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Approval process not found.")
if current_user != current_state.approver:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="User not authorized to approve this request."
)
if current_state.status != "pending_review":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Request is not in 'pending_review' state."
)
# Update state for LangGraph to pick up
updated_state_dict = current_state.model_dump()
updated_state_dict["status"] = "approved"
updated_state_dict["approver_comment"] = decision_in.approver_comment
# LangGraph will add its own history entry based on the node execution
# Update the checkpoint and resume the graph
memory = SQLAlchemyCheckpoint(AsyncSessionLocal)
app = get_langgraph_app(memory)
# This invoke will load the state, run decide_on_review, then process_approval, and save.
final_state = await run_graph_async(app, process_id, updated_state_dict, db)
# Publish update to Redis for WebSocket clients
await redis_client.publish(f"process_updates:{process_id}", final_state.model_dump_json())
return final_state
@router.post("/{process_id}/reject", response_model=ApprovalProcessStatus)
async def reject_request(
process_id: str,
decision_in: ApprovalDecisionIn,
db: AsyncSession = Depends(get_db),
current_user: str = Depends(get_current_user)
):
"""
Rejects an approval request, resuming the LangGraph process.
"""
current_state = await get_graph_state_from_db(process_id, db)
if not current_state:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Approval process not found.")
if current_user != current_state.approver:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="User not authorized to reject this request."
)
if current_state.status != "pending_review":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Request is not in 'pending_review' state."
)
# Update state for LangGraph to pick up
updated_state_dict = current_state.model_dump()
updated_state_dict["status"] = "rejected"
updated_state_dict["approver_comment"] = decision_in.approver_comment
# Update the checkpoint and resume the graph
memory = SQLAlchemyCheckpoint(AsyncSessionLocal)
app = get_langgraph_app(memory)
# This invoke will load the state, run decide_on_review, then process_rejection, and save.
final_state = await run_graph_async(app, process_id, updated_state_dict, db)
# Publish update to Redis for WebSocket clients
await redis_client.publish(f"process_updates:{process_id}", final_state.model_dump_json())
return final_state
@router.get("/pending_tasks", response_model=List[PendingTask])
async def get_pending_tasks(
db: AsyncSession = Depends(get_db),
current_user: str = Depends(get_current_user)
):
"""
Retrieves a list of pending approval tasks for the current user.
"""
# This requires querying our custom checkpoint table directly or having a separate
# table for tasks. For simplicity, we query the checkpoint data.
from sqlalchemy import select
from database import LangGraphCheckpoint
import json
stmt = select(LangGraphCheckpoint)
result = await db.execute(stmt)
all_checkpoints = result.scalars().all()
pending_tasks = []
for cp in all_checkpoints:
try:
# We assume checkpoint_data is a JSON string of ApprovalGraphState
state_data = json.loads(cp.checkpoint_data) # Use json.loads for raw JSON in DB
if state_data.get("approver") == current_user and state_data.get("status") == "pending_review":
# Need to fill created_at, which is not in LangGraph state by default.
# In a real app, we'd have a separate 'ApprovalRequest' table with more metadata.
# For this example, we'll use a placeholder or assume it's part of initial state.
created_at = state_data.get("created_at", cp.last_updated) # Fallback to last_updated
pending_tasks.append(PendingTask(
request_id=state_data["request_id"],
initiator=state_data["initiator"],
amount=state_data["amount"],
description=state_data["description"],
status=state_data["status"],
created_at=created_at
))
except json.JSONDecodeError:
print(f"Warning: Could not decode checkpoint data for thread_id {cp.thread_id}")
continue
return pending_tasks
LangGraph 后端集成:生命周期管理
这是整个系统的核心,负责 LangGraph 实例的创建、运行、状态持久化和恢复。
langgraph_service.py:
# langgraph_service.py
from typing import Dict, Any, List
from langgraph.graph import StateGraph
from langgraph.graph.graph import CompiledGraph
from langgraph.checkpoint.base import Checkpoint
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime
from core.custom_checkpoint import SQLAlchemyCheckpoint
from graph_definitions import create_approval_graph, ApprovalGraphState # Import our graph definition
from models import ApprovalProcessStatus # For type hints and conversion
_langgraph_app_cache: Dict[str, CompiledGraph] = {}
def get_langgraph_app(memory: Checkpoint) -> CompiledGraph:
"""
Returns a compiled LangGraph application. Caches it for efficiency.
"""
# In a real application, you might have different graph definitions.
# For now, we assume one approval graph.
graph_key = "default_approval_graph"
if graph_key not in _langgraph_app_cache:
app = create_approval_graph(memory)
_langgraph_app_cache[graph_key] = app
return _langgraph_app_cache[graph_key]
async def run_graph_async(app: CompiledGraph, thread_id: str, initial_state_or_update: Dict[str, Any], db: AsyncSession):
"""
Runs the LangGraph application asynchronously.
This function handles both initial invocation and resuming the graph.
It will automatically use the checkpoint to load/save state.
"""
try:
# LangGraph's invoke method takes config={"configurable": {"thread_id": thread_id}}
# to manage checkpointing.
# It also handles loading the previous state if the thread_id exists.
# If initial_state_or_update is passed, it will be merged with the loaded state.
# We need to manually add `created_at` for new requests if not present
if initial_state_or_update.get("status") == "initiated" and "created_at" not in initial_state_or_update:
initial_state_or_update["created_at"] = datetime.now()
# Invoke returns the final state after execution.
# If a node returns a "pend" signal or effectively pauses,
# the last saved state will be the pending state.
final_state_generator = app.stream(
initial_state_or_update,
config={"configurable": {"thread_id": thread_id}},
stream_mode="updates" # Stream updates to get intermediate states
)
# Consume the generator to ensure full execution and checkpoint saving
last_state_update = None
async for state_update in final_state_generator:
# Each update contains the state changes at that step.
# The full state is implied by applying these changes.
# LangGraph handles full state persistence with the checkpointer.
# We just need to ensure the graph runs to its next checkpoint or conclusion.
last_state_update = state_update
print(f"Graph {thread_id} finished execution step. Last update: {last_state_update}")
# The actual final state can be retrieved from the checkpointer
# for a consistent view after the async run.
# This ensures the state returned by get_graph_state_from_db is what's truly persisted.
except Exception as e:
print(f"Error running LangGraph for thread {thread_id}: {e}")
# Potentially update status to "error" in the checkpoint if needed
# Or log and handle specific LangGraph errors
async def get_graph_state_from_db(thread_id: str, db: AsyncSession) -> Optional[ApprovalProcessStatus]:
"""
Retrieves the current state of a LangGraph process from the database.
"""
memory = SQLAlchemyCheckpoint(AsyncSessionLocal) # Create new memory instance for each request
app = get_langgraph_app(memory)
# LangGraph's get_state method (if it were exposed and easy) or
# we can retrieve the checkpoint directly.
# The checkpoint tuple contains the full state.
checkpoint_tuple = await memory.get_tuple(thread_id)
if checkpoint_tuple and checkpoint_tuple.checkpoint:
# The checkpoint contains the full state dictionary.
state_dict = checkpoint_tuple.checkpoint.get("values", {}) # LangGraph stores state in 'values' key within checkpoint
if not state_dict: # If 'values' is empty, maybe the state is directly at top level for some versions
state_dict = checkpoint_tuple.checkpoint
# Ensure all fields for ApprovalProcessStatus are present, providing defaults
# We need to manually add 'created_at' and 'last_updated' if not part of graph state
# In a robust app, these would be in a separate ApprovalRequest entity.
# For now, let's derive them or use defaults
created_at = state_dict.get("created_at") or datetime.min # Placeholder
last_updated = checkpoint_tuple.metadata.timestamp or datetime.now()
return ApprovalProcessStatus(
request_id=state_dict.get("request_id", thread_id),
initiator=state_dict.get("initiator", "unknown"),
approver=state_dict.get("approver", "unknown"),
amount=state_dict.get("amount", 0.0),
description=state_dict.get("description", "No description"),
status=state_dict.get("status", "unknown"),
approver_comment=state_dict.get("approver_comment"),
current_node=state_dict.get("current_node", "unknown"),
history=state_dict.get("history", []),
created_at=created_at,
last_updated=last_updated
)
return None
async def update_graph_state_in_db(thread_id: str, new_state_data: Dict[str, Any], db: AsyncSession):
"""
Updates the LangGraph's internal state directly in the checkpoint.
This is used when an external action (like approval) changes the state.
"""
memory = SQLAlchemyCheckpoint(AsyncSessionLocal)
app = get_langgraph_app(memory)
# Get current checkpoint
checkpoint_tuple = await memory.get_tuple(thread_id)
if not checkpoint_tuple:
raise ValueError(f"No checkpoint found for thread_id: {thread_id}")
# Merge new_state_data into the existing state
# LangGraph state is typically under 'values' key in checkpoint
current_values = checkpoint_tuple.checkpoint.get("values", {})
current_values.update(new_state_data)
checkpoint_tuple.checkpoint["values"] = current_values
# Update the checkpoint in the database
await memory.put_tuple(thread_id, checkpoint_tuple)
async def get_graph_checkpoint_data(thread_id: str, db: AsyncSession) -> Optional[Dict[str, Any]]:
"""
Retrieves the raw checkpoint data for a given thread_id.
"""
memory = SQLAlchemyCheckpoint(AsyncSessionLocal)
checkpoint_tuple = await memory.get_tuple(thread_id)
return checkpoint_tuple.checkpoint if checkpoint_tuple else None
在 run_graph_async 中,我们使用了 app.stream(..., stream_mode="updates")。虽然 LangGraph 的 stream 方法主要用于流式处理中间结果,但在 async for 循环中消费它,可以确保 LangGraph 的内部逻辑(包括 checkpoint 的保存)被正确执行。当流程在 review_request 节点“挂起”时,stream 会停止迭代,等待外部状态更新后再次被 invoke(通过 run_graph_async 再次调用)。
关键点:LangGraph 的挂起与恢复
-
挂起: 当 LangGraph 流程执行到
review_request节点时,该节点函数会返回当前状态,但不会显式地将流程推向approved或rejected路径。decide_on_review路由函数会检查state["status"]。如果它仍然是pending_review,路由函数会指示流程回到review_request节点。然而,由于我们通过app.stream或app.invoke运行 LangGraph,并且review_request节点本身不执行任何阻塞操作,它会立即完成并保存当前状态(status: "pending_review")到检查点。此时,从 LangGraph 的角度来看,它已经完成了一轮执行,并等待下一次invoke。对于我们的系统而言,这就构成了“挂起状态”。 -
恢复: 当用户通过
/api/processes/{process_id}/approve或/api/processes/{process_id}/reject端点提交决策时,后端会:- 从数据库加载 LangGraph 的最新检查点。
- 更新检查点中的
status字段(例如改为approved)。 - 再次调用
app.stream()(通过run_graph_async),传入更新后的状态。 - LangGraph 会从上一个检查点加载状态,并应用新的
status。 decide_on_review路由函数会根据新的status决定进入process_approval或process_rejection节点,从而使流程继续执行直到结束。
实时同步机制:WebSockets 的应用
要实现前端的实时同步,WebSockets 是不可或缺的。它提供了持久连接,允许服务器在状态更新时主动推送数据。
为何选择 WebSocket:轮询的局限性,WebSocket 的优势
- 轮询(Polling)的局限性: 传统方式是前端每隔几秒钟发送一个 HTTP 请求来查询最新状态。这会导致:
- 延迟: 状态更新不能立即送达,取决于轮询间隔。
- 资源浪费: 频繁的请求会增加服务器负载和网络流量,即使没有状态更新。
- 复杂性: 客户端需要管理轮询逻辑,并处理大量重复数据。
- WebSocket 的优势:
- 实时性: 服务器可以在状态发生变化时立即推送数据,几乎没有延迟。
- 效率高: 一旦建立连接,只需发送实际的数据帧,减少了 HTTP 头部的开销。
- 双向通信: 客户端和服务器都可以随时发送数据,非常适合交互式应用。
WebSocket 服务器实现:FastAPI websockets 模块
FastAPI 内置了对 WebSockets 的支持,使用起来非常直观。
routers/websockets.py:
# routers/websockets.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
from typing import Dict, List
import asyncio
import json
from core.auth import get_current_user_ws # Assuming WS specific auth
from redis_client import redis_client # Re-use redis_client
from models import ApprovalProcessStatus
router = APIRouter(prefix="/ws", tags=["WebSockets"])
# Store active WebSocket connections by process_id
# In a multi-instance deployment, this would need a shared state (e.g., Redis Set)
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, List[WebSocket]] = {}
async def connect(self, websocket: WebSocket, process_id: str):
await websocket.accept()
if process_id not in self.active_connections:
self.active_connections[process_id] = []
self.active_connections[process_id].append(websocket)
print(f"WebSocket connected for process {process_id}. Total: {len(self.active_connections[process_id])}")
def disconnect(self, websocket: WebSocket, process_id: str):
if process_id in self.active_connections:
self.active_connections[process_id].remove(websocket)
if not self.active_connections[process_id]:
del self.active_connections[process_id]
print(f"WebSocket disconnected for process {process_id}. Remaining: {self.active_connections.get(process_id, [])}")
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast_to_process(self, message: str, process_id: str):
if process_id in self.active_connections:
for connection in self.active_connections[process_id]:
try:
await connection.send_text(message)
except RuntimeError as e: # Catch "WebSocket is not connected" errors
print(f"Error sending to WebSocket for {process_id}: {e}")
# Optionally handle disconnection here if needed
manager = ConnectionManager()
@router.websocket("/processes/{process_id}/status")
async def websocket_endpoint(
websocket: WebSocket,
process_id: str,
# current_user: str = Depends(get_current_user_ws) # If WS requires authentication
):
# In a real app, you'd verify user's access to this process_id
# based on current_user and process metadata.
# For simplicity, we skip full auth check here, assuming it's done during initial handshake or for the REST API.
await manager.connect(websocket, process_id)
try:
# Start a background task to listen to Redis Pub/Sub for this process_id
# and forward messages to the connected WebSocket client.
async def redis_listener():
pubsub = redis_client.pubsub()
await pubsub.subscribe(f"process_updates:{process_id}")
print(f"Subscribed to Redis channel: process_updates:{process_id}")
try:
while True:
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1)
if message and message['type'] == 'message':
data = message['data'].decode('utf-8')
print(f"Redis message received for {process_id}: {data}")
await manager.send_personal_message(data, websocket)
await asyncio.sleep(0.01) # Small sleep to yield control
except asyncio.CancelledError:
print(f"Redis listener for {process_id} cancelled.")
finally:
await pubsub.unsubscribe(f"process_updates:{process_id}")
print(f"Unsubscribed from Redis channel: process_updates:{process_id}")
listener_task = asyncio.create_task(redis_listener())
# Keep the WebSocket connection open indefinitely.
# This loop is primarily to detect client disconnections.
while True:
# You might want to receive messages from the client here if needed
# For status updates, client mostly listens.
# await websocket.receive_text() # If client sends heartbeat or commands
await asyncio.sleep(60) # Keep connection alive, or handle client pings
except WebSocketDisconnect:
print(f"WebSocketDisconnect for process {process_id}")
except Exception as e:
print(f"WebSocket error for process {process_id}: {e}")
finally:
listener_task.cancel() # Cancel the background Redis listener task
manager.disconnect(websocket, process_id)
redis_client.py:
# redis_client.py
import redis.asyncio as redis
from config import settings
redis_client = redis.from_url(settings.REDIS_URL, decode_responses=True)
状态变更通知:利用 Redis Pub/Sub 广播
当 LangGraph 状态在 langgraph_service.py 或 routers/approval_processes.py 中更新并持久化到数据库后,我们需要通知所有订阅了该 process_id 的 WebSocket 客户端。
在 initiate_approval_process, approve_request, reject_request 路由中,当 LangGraph 运行完成并保存最新状态后,我们调用 await redis_client.publish(f"process_updates:{request_id}", current_state.model_dump_json())。
redis_client.publish(): 将 JSON 格式的最新状态信息发布到名为process_updates:{process_id}的 Redis 频道。- WebSocket 服务器中的
redis_listener任务会订阅这个频道。一旦有新消息发布,pubsub.get_message()就会收到。 manager.send_personal_message(): 随后将接收到的消息转发给该process_id下所有连接的 WebSocket 客户端。
这种模式实现了:
- 解耦: LangGraph 状态更新逻辑与 WebSocket 推送逻辑解耦。
- 可扩展性: 多个 FastAPI 实例可以同时运行,它们都连接到同一个 Redis 实例。无论哪个实例处理了审批操作并更新了 LangGraph 状态,它都会将更新发布到 Redis。所有连接到 Redis 的 WebSocket 服务器实例都会收到消息并推送到其各自的客户端,确保了多实例环境下的实时同步。
安全性、健壮性与可扩展性考量
构建生产级应用,除了功能实现,还需要考虑安全性、健壮性和可扩展性。
认证与授权
- 认证 (Authentication): 确认用户的身份。我们将使用 JWT (JSON Web Tokens) 作为认证机制。用户登录后获取一个 JWT,在后续的 API 请求中通过
Authorization: Bearer <token>头发送。 - 授权 (Authorization): 确认用户是否有权限执行某个操作。例如,只有
approver角色才能批准请求,只有initiator或approver或admin才能查看特定流程的状态。
core/auth.py:
# core/auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from datetime import datetime, timedelta
from typing import Optional
from config import settings
from models import TokenData
# OAuth2PasswordBearer is a helper class that helps us implement OAuth2-based authentication.
# The tokenUrl argument specifies the URL where the client can get a token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET_KEY = settings.SECRET_KEY
ALGORITHM = settings.ALGORITHM
ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTES
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)) -> str:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
return token_data.username
# For WebSocket connections, you might pass the token as a query parameter
# or in a header during connection handshake, then validate it.
async def get_current_user_ws(websocket: WebSocket) -> str:
# Example: Token passed as a query parameter: ws://.../status?token=<JWT>
token = websocket.query_params.get("token")
if not token:
raise WebSocketDisconnect(code=status.WS_1008_POLICY_VIOLATION, reason="Authentication token missing")
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise WebSocketDisconnect(code=status.WS_1008_POLICY_VIOLATION, reason="Invalid token payload")
except JWTError:
raise WebSocketDisconnect(code=status.HTTP_401_UNAUTHORIZED, reason="Could not validate credentials")
return username
在 main.py 中添加认证路由:
# main.py (snippet)
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from routers import approval_processes, websockets
from core.auth import create_access_token, get_current_user
from models import Token
from database import create_db_and_tables
app = FastAPI(title="LangGraph Approval API")
@app.on_event("startup")
async def on_startup():
await create_db_and_tables()
app.include_router(approval_processes.router)
app.include_router(websockets.router)
# Dummy user for demonstration
users_db = {
"alice": {"password": "password", "roles": ["initiator"]},
"bob": {"password": "password", "roles": ["approver"]},
"admin_user": {"password": "password", "roles": ["admin", "initiator", "approver"]},
}
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user_data = users_db.get(form_data.username)
if not user_data or user_data["password"] != form_data.password:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": form_data.username, "roles": user_data["roles"]}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: str = Depends(get_current_user)):
return {"username": current_user}
错误处理
FastAPI 提供了统一的异常处理机制。使用 HTTPException 可以返回标准的 HTTP 错误响应。在 LangGraph 运行过程中,也需要捕获可能的异常,并将其转换为合适的 API 错误。
幂等性
对于审批操作(批准/拒绝),确保其幂等性非常重要。这意味着多次提交相同的批准/拒绝请求,其结果与提交一次相同。在我们的设计中:
- 如果请求已经批准/拒绝,再次提交相同的操作会因
status != "pending_review"检查而失败,返回 400 错误。 - 如果多次提交同一决策,LangGraph 也会从最新状态恢复,并执行一次相应的节点,不会导致重复的逻辑执行。
可扩展性
- 数据库连接池:
create_async_engine默认会管理连接池,确保高效利用数据库资源。 - 异步任务处理:
asyncio.create_task用于将 LangGraph 的运行放在后台,不阻塞 API 请求。对于更复杂的异步工作流,可以考虑 Celery 等任务队列。 - 负载均衡: 多个 FastAPI 实例可以部署在负载均衡器之后,共同处理请求。Redis Pub/Sub 确保了在这种多实例环境下 WebSocket 消息的正确广播。
- LangGraph Checkpoint: 使用 PostgreSQL 存储检查点,可以在数据库层面进行扩展和备份。
前端交互视角 (后端支持)
尽管本次讲座侧重后端,但理解前端如何利用这些 API 至关重要。
- 认证: 前端首先通过
/token端点获取 JWT,并将其存储(例如在localStorage或sessionStorage)。 - 发起审批: 用户填写表单后,前端发送
POST /api/processes请求,并在Authorization头中包含 JWT。后端返回新创建的审批流程状态。 - 显示实时状态:
- 前端解析后端返回的
ApprovalProcessStatus,获取request_id。 - 前端建立一个 WebSocket 连接到
/ws/processes/{request_id}/status,并在连接时(或通过查询参数)发送认证 JWT。 - 一旦连接成功,前端监听来自 WebSocket 的消息。每当后端推送新的状态更新时,前端 UI 就会立即更新相应的审批卡片、进度条或历史记录。
- 前端解析后端返回的
- 显示待处理任务: 审批者登录后,前端可以定期(或在页面加载时)发送
GET /api/processes/pending_tasks请求,获取所有待其处理的审批列表,并在 UI 中展示为“我的任务”。 - 提交审批决策: 审批者在 UI 上点击“批准”或“拒绝”按钮,并填写评论后,前端发送
POST /api/processes/{process_id}/approve或POST /api/processes/{process_id}/reject请求,同样携带 JWT。后端处理后,会立即通过 WebSocket 推送更新,前端 UI 会自动刷新。 - 查看历史: 用户点击某个审批流程,前端发送
GET /api/processes/{process_id}请求,获取完整的状态和历史信息,并在详情页展示。
通过这种方式,前端无需编写复杂的轮询逻辑,只需订阅 WebSocket,就能构建出高度响应和用户友好的审批界面。
结语:构建响应式审批流的实践之路
今天,我们深入探讨了如何利用 LangGraph 的强大状态管理能力,结合 FastAPI 构建高性能异步后端,并通过 RESTful API 和 WebSockets 实现与前端的实时状态同步。我们看到了 LangGraph 如何优雅地处理复杂审批流的挂起与恢复,以及如何通过自定义检查点将其状态持久化到 PostgreSQL。更重要的是,我们构建了一个基于 Redis Pub/Sub 和 FastAPI WebSockets 的实时通知系统,确保前端能够即时响应后端状态的任何变化。
通过这种架构,我们不仅能够构建出功能强大、逻辑清晰的审批系统,还能为用户提供前所未有的流畅和透明的交互体验。LangGraph 作为后端智能的核心,结合现代异步 Web 框架和实时通信技术,为构建下一代企业级审批应用开辟了广阔的道路。希望本次讲座能为大家在实践中带来启发,并助力大家构建出更智能、更高效的业务流程系统。