演进中的工作流编排:从确定性到适应性
各位同仁,大家好。今天我们齐聚一堂,探讨一个在现代AI驱动应用开发中日益凸显的关键议题:如何高效、优雅地处理工作流中的“概率性输出”。传统的工作流管理系统,以其确定性、可重复性和强大的调度能力,构成了多数企业级数据和业务流程的基石。然而,随着大型语言模型(LLM)和其他复杂AI模型的普及,我们面临的不再仅仅是简单的成功/失败或真/假判断,而是带有置信度、不确定性或多路径可能性的“概率性输出”。这类输出要求工作流具备更高的灵活性和适应性,能够根据不确定性程度动态调整执行路径,甚至进行自我修正。
本文将深入对比两种截然不同的工作流编排范式:以Apache Airflow为代表的传统有向无环图(DAG)工作流,以及专为构建LLM驱动的Agentic应用而设计的LangGraph。我们将聚焦于它们在处理“概率性输出”时的核心差异,并通过具体的代码示例,展示各自的优势与局限。
一、传统DAG工作流:确定性与显式路径的王国
传统DAG工作流,如Airflow,其核心理念是任务(Task)之间存在明确的依赖关系,并且这些任务的执行顺序形成一个有向无环图。这意味着:
- 确定性(Determinism):给定相同的输入,任务通常会产生相同的输出。
- 可重复性(Idempotency):多次运行同一个任务,其效果是相同的。
- 可观察性(Observability):任务状态、日志和执行历史清晰可见。
- 无环(Acyclic):任务流不会形成循环,确保任务最终会完成。
1.1 Airflow的核心原则与结构
Airflow DAG由一系列Operator组成,每个Operator封装了一个特定类型的工作(如执行Python函数、Bash命令、SQL查询等)。任务间的依赖通过 >> 或 set_upstream/downstream 定义。
代码示例:一个简单的Airflow ETL DAG
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_data(**kwargs):
"""模拟数据抽取"""
print("Extracting data from source...")
# 实际场景中,这里会连接数据库、API等
data = {"id": 1, "name": "Alice", "value": 100}
kwargs['ti'].xcom_push(key='extracted_data', value=data)
print(f"Extracted: {data}")
def transform_data(**kwargs):
"""模拟数据转换"""
ti = kwargs['ti']
raw_data = ti.xcom_pull(key='extracted_data', task_ids='extract_task')
print(f"Transforming data: {raw_data}")
# 实际场景中,这里会进行清洗、聚合等操作
transformed_data = {
"user_id": raw_data["id"],
"user_name": raw_data["name"].upper(),
"processed_value": raw_data["value"] * 2
}
kwargs['ti'].xcom_push(key='transformed_data', value=transformed_data)
print(f"Transformed: {transformed_data}")
def load_data(**kwargs):
"""模拟数据加载"""
ti = kwargs['ti']
final_data = ti.xcom_pull(key='transformed_data', task_ids='transform_task')
print(f"Loading data to destination: {final_data}")
# 实际场景中,这里会将数据写入数据库、数据湖等
print("Data loaded successfully.")
with DAG(
dag_id='traditional_etl_pipeline',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False,
tags=['etl', 'demo'],
) as dag:
start_task = BashOperator(
task_id='start_pipeline',
bash_command='echo "Starting ETL pipeline..."',
)
extract_task = PythonOperator(
task_id='extract_task',
python_callable=extract_data,
)
transform_task = PythonOperator(
task_id='transform_task',
python_callable=transform_data,
)
load_task = PythonOperator(
task_id='load_task',
python_callable=load_data,
)
end_task = BashOperator(
task_id='end_pipeline',
bash_command='echo "ETL pipeline finished."',
)
start_task >> extract_task >> transform_task >> load_task >> end_task
这个DAG清晰地展示了顺序执行的特性。每个任务的输出(通过XCom机制)可以作为后续任务的输入,但整个流程是预先定义好的,不具备运行时动态改变结构的能力。
1.2 传统DAG中的条件逻辑处理
Airflow提供了BranchPythonOperator和ShortCircuitOperator等机制来引入条件逻辑。这些操作符允许根据Python函数的返回值来决定下一步执行哪个(或哪些)任务。然而,这些条件通常基于明确的、离散的判断。
代码示例:基于确定性条件的Airflow分支
假设我们有一个数据验证步骤,如果数据有效,则继续处理;如果无效,则发送通知并跳过后续处理。
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator # 在Airflow 2.x中通常被BaseOperator取代,但概念相同
from airflow.operators.python import BranchPythonOperator
from datetime import datetime
def validate_data_quality(**kwargs):
"""模拟数据质量验证,返回布尔值"""
# 实际场景中,这里会执行复杂的校验逻辑
data_is_valid = True # 假设数据是有效的
# data_is_valid = False # 假设数据是无效的
print(f"Data validation result: {data_is_valid}")
return "process_data_task" if data_is_valid else "send_alert_task"
def process_data(**kwargs):
"""模拟数据处理"""
print("Data is valid, proceeding with processing...")
# ... 实际处理逻辑 ...
def send_alert(**kwargs):
"""模拟发送警报"""
print("Data is invalid, sending alert to ops team...")
# ... 实际警报逻辑 ...
with DAG(
dag_id='conditional_data_pipeline',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False,
tags=['conditional', 'demo'],
) as dag:
start = BashOperator(task_id='start', bash_command='echo "Starting conditional pipeline..."')
validate_task = PythonOperator(
task_id='validate_data_quality',
python_callable=validate_data_quality,
)
branch_task = BranchPythonOperator(
task_id='branch_on_validation',
python_callable=validate_data_quality, # 这里的callable与validate_task的callable相同,或者可以从XCom获取结果
)
process_task = PythonOperator(
task_id='process_data_task',
python_callable=process_data,
)
send_alert_task = PythonOperator(
task_id='send_alert_task',
python_callable=send_alert,
)
end = BashOperator(task_id='end', bash_command='echo "Pipeline finished."')
start >> validate_task >> branch_task
branch_task >> [process_task, send_alert_task]
process_task >> end
send_alert_task >> end # 无论是否发送警报,最终都导向结束
在这个例子中,branch_on_validation操作符根据validate_data_quality函数的返回值(一个任务ID字符串)来决定接下来运行哪个任务。这是基于一个明确的二元(或多元)选择。
1.3 传统DAG处理“概率性输出”的挑战
“概率性输出”是指那些不只有简单的成功/失败或真/假结果,而是伴随有置信度、概率分数或多种不确定性程度的结果。例如:
- LLM生成内容的置信度:一个LLM生成了一段文本,并评估其“准确性”为75%。
- 模型预测的不确定性:一个图像识别模型预测图片是“猫”的概率为90%,是“狗”的概率为8%,是“模糊”的概率为2%。
- 人类审核结果的信心水平:人工审核员对某个内容的“合规性”给出了“高信心通过”、“中等信心需复审”或“低信心拒绝”的判断。
传统DAG在处理这类输出时面临以下挑战:
- 离散化信息损失:必须将连续的概率或多维不确定性强制转换为离散的、预定义的路径选择。例如,将“LLM置信度75%”转换为“通过人工审核”或“不通过”。这会丢失原始概率的精细信息,无法实现更 nuanced 的决策。
- 分支逻辑的复杂度爆炸:如果概率性输出有多种阈值,需要大量的
BranchPythonOperator和DummyOperator来构建复杂的扇出-扇入(fan-out/fan-in)逻辑。例如,置信度 > 90% -> 自动发布;70-90% -> 人工审核;< 70% -> LLM重试。这会导致DAG图变得极其庞大和难以维护。 - 缺乏自然循环与自我修正:DAG的无环特性意味着无法直接构建“如果结果不满意,则返回上一步重新尝试”的循环逻辑。要实现循环,通常需要依赖外部状态存储(如数据库)和复杂的Airflow传感器(Sensor)或外部触发机制,将一个DAG的失败或特定结果作为触发另一个DAG或同一DAG新运行的条件,这极大地增加了系统的复杂性。
- 状态管理分散:每个任务通常是无状态的,通过XCom传递少量信息。但对于需要迭代、累积性地处理概率性输出并基于此进行决策的场景,XCom显得力不从心,需要更强大的共享状态管理机制。
- 不适合Agentic行为:传统DAG更像是一个“流水线”,数据从一端流入,经过一系列预设步骤后从另一端流出。它不擅长模拟具有记忆、规划、反思和自我纠正能力的“智能体(Agent)”行为,而这正是处理概率性输出时往往需要的。
场景举例: 一个LLM生成营销文案,并提供一个“商业可行性”的置信度分数。
- 传统DAG:
llm_generate_copy_task-> 输出文案和置信度。branch_on_confidence_task(基于置信度阈值) ->publish_copy_task或manual_review_task或send_back_to_llm_for_retry_task。- 如果需要重试,
send_back_to_llm_for_retry_task不能直接指向llm_generate_copy_task,而是需要触发一个新的DAG运行或通过外部机制间接实现。每次重试都是一个独立的任务实例,缺乏上下文关联。
在这种模式下,概率性输出的精细度被简化,自适应和迭代能力受限。
二、LangGraph:状态、循环与自适应执行的范式
LangGraph是LangChain生态系统中的一个库,它提供了一种构建“有状态的、多Agent的、循环执行的”工作流的框架。与传统DAG的根本区别在于,LangGraph允许在图中存在循环,并且其核心设计理念就是为了支持具有复杂决策逻辑和迭代能力的Agentic应用。
2.1 范式转变:从DAG到带状态与循环的图
LangGraph将工作流视为一个包含节点(Nodes)和边(Edges)的图。
- 节点(Nodes):通常代表一个Agent或一个工具调用,执行某个特定的逻辑。
- 边(Edges):定义了节点之间的转换。这些边可以是简单的顺序执行,也可以是基于条件判断的动态跳转。
- 状态(State):这是LangGraph的核心概念之一。整个图维护一个共享的、可变的状态对象。节点的操作会读取和修改这个状态,而边的条件判断也基于当前的状态。
- 循环(Cycles):LangGraph明确支持图中的循环,这使得Agent能够进行多次迭代、反思或自我修正,直到达到某个条件为止。
这种设计使得LangGraph更像是一个“决策引擎”或“协调器”,而不是一个简单的任务调度器。它擅长模拟智能体(Agent)的行为,其中Agent根据当前环境(状态)做出决策,执行动作,然后观察结果,并可能根据结果调整后续行为。
2.2 LangGraph的核心概念
StateGraph:构建图的基础类。它需要定义一个StateType,这个类型描述了工作流中共享的状态结构。- 节点(Nodes):通过
add_node()方法添加。每个节点都关联一个Python函数或可调用对象,这个函数接收当前状态并返回一个更新状态的字典。 - 边(Edges):
- 普通边(
add_edge()):从一个节点到另一个节点的无条件转换。 - 条件边(
add_conditional_edges()):这是LangGraph处理复杂逻辑的关键。它接收一个源节点、一个条件函数和一个映射。条件函数接收当前状态,返回一个字符串,这个字符串对应映射中的目标节点。这允许根据状态动态决定下一个执行的节点。
- 普通边(
- 入口点(Entry Point):
set_entry_point()定义了图的起始节点。 - 出口点(Exit Points):
set_finish_point()定义了图的结束节点,当达到这个节点时,图的执行停止。
代码示例:一个简单的LangGraph
假设我们有一个工作流:用户输入 -> LLM处理 -> 输出。
from typing import TypedDict, Annotated, List
from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.graph import StateGraph, START, END
import operator
# 1. 定义共享状态
class AgentState(TypedDict):
messages: Annotated[List[BaseMessage], operator.add]
# 其他可能的状态变量,例如:
# error_count: int
# processed_data: str
# 2. 定义节点函数
def call_llm(state: AgentState):
"""模拟一个LLM调用节点,接收消息,生成回复并更新状态。"""
print("---CALL LLM NODE---")
messages = state["messages"]
# 实际场景中,这里会调用LLM API
# 假设LLM简单地对用户消息进行回应
llm_response = f"LLM processed: {messages[-1].content}"
new_message = HumanMessage(content=llm_response, name="llm_agent")
return {"messages": [new_message]}
# 3. 构建图
workflow = StateGraph(AgentState)
# 3.1 添加节点
workflow.add_node("llm_processor", call_llm) # 节点名称 "llm_processor"
# 3.2 设置入口点和出口点
workflow.set_entry_point("llm_processor")
workflow.set_finish_point("llm_processor") # 这里让LLM处理完就结束
# 4. 编译图
app = workflow.compile()
# 5. 运行图
initial_state = {"messages": [HumanMessage(content="Hello LangGraph!")]}
final_state = app.invoke(initial_state)
print("n---FINAL STATE---")
print(final_state)
这个例子虽然简单,但展示了LangGraph的基本结构:定义状态、定义节点、构建图。
2.3 LangGraph如何处理“概率性输出”
LangGraph处理概率性输出的强大之处在于其状态管理、条件边和循环能力。核心思想是:将概率性输出作为状态的一部分,并利用条件边基于这个概率值动态地选择后续路径,包括返回到先前的节点进行迭代优化。
具体机制:
- 状态中携带概率/置信度:工作流中的任何节点都可以计算一个概率或置信度分数,并将其存储在共享的
AgentState中。 - 条件边判断:当一个节点完成执行后,LangGraph会调用一个条件函数。这个函数能够访问当前的
AgentState,包括其中存储的概率/置信度。它根据这个值决定下一个应该执行的节点。 - 迭代与循环:如果概率性输出表明当前结果不够理想(例如,置信度低于某个阈值),条件函数可以将流程导回到一个“重试”、“细化”或“人工干预”的节点,甚至回到最初的生成节点,从而形成一个自修正的循环。每次循环,状态都会更新,包含新的尝试或更多的信息。
- 累积状态:状态可以累积历史信息,例如重试次数、每次尝试的置信度历史等,这使得Agent能够“记住”过去的尝试并基于此进行更智能的决策。
举例: LLM生成文案并评估置信度。
generate_copy_node-> 产生文案和置信度。evaluate_confidence_node-> 更新状态中的confidence_score。- 条件边:
if confidence_score > 0.9: go topublish_nodeif 0.7 <= confidence_score <= 0.9: go tohuman_review_nodeif confidence_score < 0.7: go torefine_llm_node(并提供改进建议) -> 这个节点可以循环回generate_copy_node或evaluate_confidence_node。
这种模式完美契合了Agentic工作流的需求:根据不确定的结果动态调整策略,直到达成目标。
三、深度对比:内容生成与审核场景
为了更具体地说明LangGraph与传统DAG在处理概率性输出上的差异,我们来设计一个实际场景:
场景定义: 自动化内容生成与审核流程。
- LLM生成内容:一个LLM根据用户需求生成一篇短文。
- 置信度评估:另一个LLM(或一个独立的模型)评估生成内容的质量和准确性,输出一个0-1的置信度分数。
- 决策分支:
- 高置信度 (> 0.8):直接发布。
- 中置信度 (0.5 – 0.8):发送给人类专家进行审核。
- 低置信度 (< 0.5):将内容和评估结果反馈给LLM,要求其进行优化,并重试生成和评估过程。
- 人类审核:
- 如果人类专家批准,则发布。
- 如果人类专家拒绝或要求修改,则将拒绝理由反馈给LLM,要求其进行优化,并重试生成和评估过程。
- 循环限制:为了避免无限循环,设定一个最大优化/重试次数(例如3次)。
3.1 传统Airflow实现(挑战凸显)
在Airflow中实现上述场景会非常复杂,尤其是在处理重试循环和动态状态传递方面。我们需要:
- 多个
BranchPythonOperator来处理置信度阈值和人类审核结果。 PythonOperator来封装LLM调用、评估和发布逻辑。- 为了实现循环,我们不能在DAG内部直接回溯。最常见的(但仍不理想)方法是:
- 将重试逻辑封装在LLM生成任务内部,或者
- 利用外部存储(如数据库)来跟踪重试次数和历史结果,然后用Airflow Sensor来监听这些状态,并触发新的DAG运行或特定的任务。
- 使用
ExternalTaskSensor或TriggerDagRunOperator来触发新的DAG运行,但这会失去单个任务实例的上下文连续性。
- 为了避免无限循环,需要在任务内部或外部状态中管理重试计数。
Airflow DAG 伪代码结构示意:
# 这只是一个概念性的框架,实际Airflow代码会更长更复杂
# 并且无法优雅地实现内部循环和状态传递
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime
# 假设LLM和评估函数
def generate_content_llm(task_instance, max_retries=3):
current_retry = task_instance.xcom_pull(key='retry_count', task_ids='start_dag') or 0
if current_retry >= max_retries:
print("Max retries reached for content generation.")
# 可能抛出异常或返回特定状态
return {"content": "Failed to generate quality content.", "confidence": 0.0}
print(f"Generating content (retry {current_retry + 1})...")
# 模拟LLM调用
content = "Generated content draft."
confidence = 0.65 # 模拟中等置信度
# 在实际中,这里可能会根据previous_feedback进行优化
previous_feedback = task_instance.xcom_pull(key='feedback', task_ids='human_review')
if previous_feedback:
print(f"Applying feedback: {previous_feedback}")
# 模拟LLM根据反馈调整
content += f" (refined based on: {previous_feedback})"
confidence += 0.1 # 假设优化后置信度提高
task_instance.xcom_push(key='generated_content', value=content)
task_instance.xcom_push(key='confidence_score', value=confidence)
task_instance.xcom_push(key='retry_count', value=current_retry + 1) # 更新重试计数
return {"content": content, "confidence": confidence}
def decide_next_step(task_instance):
confidence = task_instance.xcom_pull(key='confidence_score', task_ids='generate_content')
retry_count = task_instance.xcom_pull(key='retry_count', task_ids='generate_content')
max_retries = 3 # Hardcoded or from params
if confidence > 0.8:
return 'publish_content'
elif 0.5 <= confidence <= 0.8:
return 'human_review'
elif confidence < 0.5 and retry_count < max_retries:
# 这里是挑战:如何回溯到 generate_content?
# Airflow DAG无法直接回溯。
# 只能通过触发新的DAG运行或复杂外部逻辑
# 或者,更常见的是,在 generate_content 内部处理重试循环
# 如果在内部处理,那么 confidence 评估也需要在内部循环,这与DAG理念不符
# 假设这里是触发一个重试分支,但实际回溯需要外部机制
return 'retry_content_branch' # 这是一个伪分支,实际无法简单实现
else: # confidence < 0.5 and max retries reached
return 'fail_generation'
def human_review_content(task_instance):
content = task_instance.xcom_pull(key='generated_content', task_ids='generate_content')
print(f"Human reviewing content: {content}")
# 模拟人工审核结果
review_status = "approved" # 或 "rejected"
feedback = ""
if review_status == "rejected":
feedback = "Content too generic, needs more detail."
task_instance.xcom_push(key='review_status', value=review_status)
task_instance.xcom_push(key='feedback', value=feedback)
return review_status
def decide_after_review(task_instance):
review_status = task_instance.xcom_pull(key='review_status', task_ids='human_review')
retry_count = task_instance.xcom_pull(key='retry_count', task_ids='generate_content')
max_retries = 3
if review_status == "approved":
return 'publish_content'
elif review_status == "rejected" and retry_count < max_retries:
# 同样,这里需要回溯到生成或优化步骤
return 'retry_content_after_review_branch'
else: # rejected and max retries reached
return 'fail_generation'
def publish_content(task_instance):
content = task_instance.xcom_pull(key='generated_content', task_ids='generate_content')
print(f"Publishing content: {content}")
def fail_generation_task():
print("Content generation failed after multiple attempts.")
with DAG(
dag_id='airflow_llm_content_workflow',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False,
tags=['llm', 'probabilistic'],
) as dag:
start_dag = DummyOperator(task_id='start_dag', dag=dag)
# 1. LLM生成内容和评估置信度 (合并在一个任务中,简化DAG结构,但失去了模块化)
# 或者,需要两个独立的任务,但 XCom 传递会更复杂
generate_content = PythonOperator(
task_id='generate_content',
python_callable=generate_content_llm,
provide_context=True,
)
# 2. 根据置信度分支
branch_confidence = BranchPythonOperator(
task_id='branch_on_confidence',
python_callable=decide_next_step,
provide_context=True,
)
# 高置信度路径
publish = PythonOperator(
task_id='publish_content',
python_callable=publish_content,
provide_context=True,
)
# 中置信度路径
human_review = PythonOperator(
task_id='human_review',
python_callable=human_review_content,
provide_context=True,
)
# 人工审核后的分支
branch_after_review = BranchPythonOperator(
task_id='branch_after_review',
python_callable=decide_after_review,
provide_context=True,
)
# 低置信度或人工拒绝后的重试路径 (这是Airflow中最难优雅实现的部分)
# 理论上,'retry_content_branch' 和 'retry_content_after_review_branch'
# 需要指向 generate_content,但在DAG中是不允许的循环。
# 实际操作中,可能需要:
# 1. 触发一个新的DAG运行(使用 TriggerDagRunOperator),并传递上下文。
# 2. 将 generate_content 任务设计成可以接收重试参数,并在内部循环,但这违背了Airflow任务的原子性。
# 3. 创建多个 generate_content_retry_1, generate_content_retry_2... 这样的任务链,导致DAG非常冗长。
# 假设我们通过创建多个任务实例来模拟重试,但这会使得DAG非常庞大且不灵活
# 为了简化,这里用一个DummyOperator表示需要回溯,但实际不可行
retry_content_branch = DummyOperator(task_id='retry_content_branch')
retry_content_after_review_branch = DummyOperator(task_id='retry_content_after_review_branch')
fail_task = PythonOperator(
task_id='fail_generation',
python_callable=fail_generation_task,
)
# 路径定义
start_dag >> generate_content >> branch_confidence
# 高置信度
branch_confidence >> publish
# 中置信度
branch_confidence >> human_review
human_review >> branch_after_review
branch_after_review >> publish
# 低置信度或人工拒绝 (Airflow 难以直接回溯)
branch_confidence >> retry_content_branch
branch_after_review >> retry_content_after_review_branch
# 模拟回溯,但在Airflow中,这将是一个新的DAG运行或复杂外部调度
# 这里为了演示,假设它们能神奇地触发 generate_content
# retry_content_branch >> generate_content # 这是不允许的循环!
# retry_content_after_review_branch >> generate_content # 这是不允许的循环!
# Airflow 实际实现会是这样:
# retry_content_branch >> TriggerDagRunOperator(task_id='trigger_retry_dag', trigger_dag_id='airflow_llm_content_workflow', conf={'retry_count': '{{ ti.xcom_pull(key="retry_count", task_ids="generate_content") }}'})
# retry_content_after_review_branch >> TriggerDagRunOperator(...)
# 失败路径
branch_confidence >> fail_task
branch_after_review >> fail_task
# 定义最终的结束点 (如果可以的话)
[publish, fail_task] # 它们是最终的结束节点
传统Airflow的挑战总结:
| 特性 | Airflow在处理此场景中的表现 |
|---|---|
| 状态管理 | 依赖XCom在任务间传递少量数据,但缺乏持久化的、可变的工作流全局状态。重试次数和历史反馈需要外部存储或复杂XCom管理。 |
| 循环逻辑 | DAG的无环特性使得直接循环回溯不可能。需要复杂的TriggerDagRunOperator或外部协调器来模拟循环,导致上下文丢失和系统复杂性增加。 |
| 动态决策 | BranchPythonOperator能实现分支,但只能基于离散判断。对于多个置信度阈值,分支链会变得非常冗长且难以管理。 |
| 可读性与维护 | 复杂的扇出-扇入和模拟循环会使DAG图变得庞大、混乱,难以理解和维护。 |
| Agentic行为 | 不支持Agent的迭代、反思、自我修正等行为,更像是一个严格的流水线。 |
3.2 LangGraph实现(优雅与强大)
LangGraph能够以非常直观和优雅的方式实现上述场景。其核心优势在于:
- 共享状态:整个流程共享一个
AgentState,其中包含内容、置信度、重试次数和审核反馈等所有必要信息。 - 条件边:根据
AgentState中的置信度分数和审核结果动态决定下一个节点。 - 内建循环:能够将流程导回到之前的节点(如LLM生成或优化节点),轻松实现迭代。
LangGraph 代码示例:
from typing import TypedDict, Annotated, List, Literal
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langgraph.graph import StateGraph, START, END
import operator
import random # 用于模拟LLM的概率性输出
# 1. 定义共享状态
class ContentAgentState(TypedDict):
"""
工作流中共享的状态。
`content`: 当前LLM生成的内容草稿。
`confidence_score`: 对内容质量的评估分数 (0-1)。
`review_status`: 人工审核结果 ('pending', 'approved', 'rejected')。
`feedback`: 人工审核的反馈信息。
`retry_count`: 当前内容的优化/重试次数。
`messages`: 用于LLM交互的消息历史。
"""
content: str
confidence_score: float
review_status: Literal["pending", "approved", "rejected"]
feedback: str
retry_count: int
messages: Annotated[List[BaseMessage], operator.add]
# 2. 定义节点函数
def generate_content_node(state: ContentAgentState):
"""
LLM生成内容的节点。
根据当前状态中的反馈进行优化,并生成新的内容草稿。
"""
print("---NODE: generate_content_node---")
current_content = state.get("content", "")
feedback = state.get("feedback", "")
retry_count = state.get("retry_count", 0)
# 模拟LLM调用
if feedback:
print(f"LLM generating content with feedback: '{feedback}'")
new_content = f"Refined content (attempt {retry_count + 1}) based on: '{feedback}'. Original: {current_content[:50]}..."
else:
print(f"LLM generating initial content (attempt {retry_count + 1})...")
new_content = "Initial content draft about a new tech product launch. It needs to be engaging and informative."
# 模拟LLM生成消息
llm_message = AIMessage(content=new_content, name="content_generator")
return {
"content": new_content,
"messages": [llm_message],
"feedback": "", # 清空反馈,等待新的反馈或评估
"retry_count": retry_count + 1 # 增加重试计数
}
def evaluate_confidence_node(state: ContentAgentState):
"""
评估生成内容的质量,并输出置信度分数。
这是一个产生“概率性输出”的关键节点。
"""
print("---NODE: evaluate_confidence_node---")
content = state["content"]
retry_count = state["retry_count"]
# 模拟LLM或模型评估置信度
# 假设随着重试次数增加,置信度有提升的倾向
base_confidence = random.uniform(0.3, 0.7) # 初始置信度
confidence = min(1.0, base_confidence + (retry_count * 0.1)) # 模拟每次重试提升0.1
print(f"Evaluating content: '{content[:50]}...' -> Confidence: {confidence:.2f}")
return {"confidence_score": confidence}
def human_review_node(state: ContentAgentState):
"""
模拟人类专家审核内容的节点。
这是一个需要外部交互的节点。
"""
print("---NODE: human_review_node---")
content = state["content"]
print(f"n--- AWAITING HUMAN REVIEW ---")
print(f"Content for review: {content}")
print(f"Current confidence: {state['confidence_score']:.2f}")
# 模拟人工输入:
# review_decision = input("Approve (a), Reject (r) or Request Changes (c)? ").strip().lower()
# 为了演示自动化,我们模拟一个决策:
# 假设人工审核有50%概率直接批准,50%概率要求修改
if random.random() < 0.5:
review_decision = "a"
else:
review_decision = "c" # 要求修改
review_status = "pending"
feedback = ""
if review_decision == 'a':
review_status = "approved"
print("Human approved content.")
elif review_decision == 'r':
review_status = "rejected"
feedback = "Human rejected: Content is off-topic."
print("Human rejected content.")
elif review_decision == 'c':
review_status = "rejected" # 视为拒绝,需要重试
feedback = "Human requested changes: Needs more specific examples."
print("Human requested changes.")
else:
print("Invalid review decision, assuming pending.")
return {"review_status": review_status, "feedback": feedback}
def publish_content_node(state: ContentAgentState):
"""
发布内容的节点。
"""
print("---NODE: publish_content_node---")
print(f"--- PUBLISHING FINAL CONTENT ---")
print(f"Final Content: {state['content']}")
print(f"Confidence Score: {state['confidence_score']:.2f}")
return {"messages": [AIMessage(content="Content published successfully!", name="publisher")]}
def fail_workflow_node(state: ContentAgentState):
"""
当达到最大重试次数仍未成功时,标记流程失败。
"""
print("---NODE: fail_workflow_node---")
print("--- WORKFLOW FAILED: Max retries reached or unrecoverable error. ---")
print(f"Last content: {state['content']}")
return {"messages": [AIMessage(content="Content generation workflow failed!", name="error_handler")]}
# 3. 定义条件函数 (路由逻辑)
def decide_next_step(state: ContentAgentState):
"""
根据置信度分数和重试次数决定下一步。
这是LangGraph处理“概率性输出”的核心。
"""
print("---DECIDE: decide_next_step---")
confidence = state["confidence_score"]
retry_count = state["retry_count"]
max_retries = 3 # 设定最大重试次数
if confidence > 0.8:
print(f"Confidence {confidence:.2f} > 0.8. Path: publish_content")
return "publish_content_node"
elif 0.5 <= confidence <= 0.8:
print(f"Confidence {confidence:.2f} (0.5-0.8). Path: human_review")
return "human_review_node"
else: # confidence < 0.5
if retry_count < max_retries:
print(f"Confidence {confidence:.2f} < 0.5. Retrying (attempt {retry_count}/{max_retries}). Path: generate_content_node")
return "generate_content_node" # 回溯到生成节点进行优化
else:
print(f"Confidence {confidence:.2f} < 0.5 and max retries ({max_retries}) reached. Path: fail_workflow_node")
return "fail_workflow_node"
def decide_after_review(state: ContentAgentState):
"""
根据人工审核结果和重试次数决定下一步。
"""
print("---DECIDE: decide_after_review---")
review_status = state["review_status"]
retry_count = state["retry_count"]
max_retries = 3
if review_status == "approved":
print("Human approved. Path: publish_content_node")
return "publish_content_node"
elif review_status == "rejected":
if retry_count < max_retries:
print(f"Human rejected. Retrying (attempt {retry_count}/{max_retries}). Path: generate_content_node")
return "generate_content_node" # 回溯到生成节点进行优化
else:
print(f"Human rejected and max retries ({max_retries}) reached. Path: fail_workflow_node")
return "fail_workflow_node"
print("Review status pending or unknown. Path: fail_workflow_node") # 异常情况
return "fail_workflow_node"
# 4. 构建 LangGraph
workflow = StateGraph(ContentAgentState)
# 4.1 添加节点
workflow.add_node("generate_content_node", generate_content_node)
workflow.add_node("evaluate_confidence_node", evaluate_confidence_node)
workflow.add_node("human_review_node", human_review_node)
workflow.add_node("publish_content_node", publish_content_node)
workflow.add_node("fail_workflow_node", fail_workflow_node)
# 4.2 设置入口点
workflow.set_entry_point("generate_content_node")
# 4.3 定义边
# 从生成内容到评估置信度 (顺序执行)
workflow.add_edge("generate_content_node", "evaluate_confidence_node")
# 从评估置信度到下一步 (条件分支)
workflow.add_conditional_edges(
"evaluate_confidence_node", # 源节点
decide_next_step, # 条件函数
{ # 映射:条件函数返回值 -> 目标节点
"publish_content_node": "publish_content_node",
"human_review_node": "human_review_node",
"generate_content_node": "generate_content_node", # 循环回溯
"fail_workflow_node": "fail_workflow_node"
}
)
# 从人工审核到下一步 (条件分支)
workflow.add_conditional_edges(
"human_review_node", # 源节点
decide_after_review, # 条件函数
{ # 映射
"publish_content_node": "publish_content_node",
"generate_content_node": "generate_content_node", # 循环回溯
"fail_workflow_node": "fail_workflow_node"
}
)
# 4.4 设置结束点
workflow.set_finish_point("publish_content_node")
workflow.set_finish_point("fail_workflow_node")
# 5. 编译图
app = workflow.compile()
# 6. 运行图
print("--- STARTING LANGGRAPH WORKFLOW ---")
initial_state = ContentAgentState(
content="",
confidence_score=0.0,
review_status="pending",
feedback="",
retry_count=0,
messages=[HumanMessage(content="Please generate a marketing copy for a new product.")]
)
final_state = app.invoke(initial_state)
print("n--- LANGGRAPH WORKFLOW FINISHED ---")
print(f"Final Content: {final_state['content']}")
print(f"Final Confidence: {final_state['confidence_score']:.2f}")
print(f"Final Review Status: {final_state['review_status']}")
print(f"Total Retries: {final_state['retry_count'] - 1}") # 因为在生成节点中就+1了
# 还可以使用 LangSmith 追踪执行过程
# from langsmith import Client
# client = Client()
# with client.tracer(project_name="langgraph_probabilistic_demo") as run_tree:
# final_state = app.invoke(initial_state, {"recursion_limit": 10}) # 设置递归限制
# print(final_state)
LangGraph的优势总结:
| 特性 | LangGraph在处理此场景中的表现 |
|---|---|
| 状态管理 | 核心设计就是围绕一个共享的AgentState。所有节点都可以读写状态,完美支持跨任务的上下文和累积信息(如重试次数、历史反馈)。 |
| 循环逻辑 | 通过条件边可以直接将流程导回到任何前置节点,自然地实现迭代和自我修正的循环。无需外部触发器或复杂的外部状态管理,保持了工作流的内聚性。 |
| 动态决策 | add_conditional_edges允许基于状态中的任何数据(包括概率性输出)进行复杂的多路径决策。条件函数可以包含任意Python逻辑,轻松处理多个置信度阈值。 |
| 可读性与维护 | 图结构清晰,节点和边直观地表示了Agent的决策流程。循环和条件分支被自然地集成到图中,提高了可读性和可维护性。 |
| Agentic行为 | 专为Agentic工作流设计,能够模拟具有记忆、规划、反思和自我纠正能力的智能体行为,完美匹配LLM应用的需求。 |
四、架构考量与适用场景
选择LangGraph还是Airflow,并非“非此即彼”的问题,而是取决于具体的工作流需求和架构偏好。
4.1 状态管理与持久化
- Airflow:任务级别的XCom机制用于任务间数据传递,但并非全局共享状态。更复杂的状态管理通常依赖外部数据库(如PostgreSQL、MySQL)、数据湖或消息队列。任务是相对无状态的,其核心是调度和执行。
- LangGraph:内置了
AgentState作为核心概念,提供了在整个图执行过程中共享和修改状态的能力。默认状态是内存中的,但可以通过集成Checkpointer(如SqliteSaver、RedisSaver)实现状态的持久化,以便恢复、调试或支持长期运行的Agent。这种集中式的状态管理非常适合Agent的反思和记忆能力。
4.2 编排目标:调度器 vs. 智能体协调器
- Airflow:本质上是一个强大的任务调度器和数据管道编排器。它擅长于定时执行、批处理、数据移动和转换等确定性、可重复性强的任务。其核心关注点是任务的顺序、依赖、重试、并发和资源管理。
- LangGraph:更像是一个智能体(Agent)协调器或决策引擎。它旨在编排一系列智能体或工具,使其能够进行有状态的交互、多轮对话、规划、反思和自我修正。它关注的是如何根据动态输入和不确定性输出,引导智能体在复杂决策空间中探索最佳路径。
4.3 循环与迭代
- Airflow:严格的DAG结构决定了不允许图中出现循环。任何形式的迭代都必须通过外部机制(如触发新的DAG运行)或冗余的线性任务链来实现,这会割裂上下文,增加复杂度。
- LangGraph:核心设计支持循环。Agent可以根据结果反复回到之前的步骤进行优化或修正,直到满足预设条件。这是其处理概率性输出和实现自适应行为的关键。
4.4 扩展性与部署
- Airflow:拥有成熟的分布式架构,支持高可用性、大规模任务并发和灵活的资源管理(通过Executor)。部署复杂,但稳定性和扩展性经过了实战验证。
- LangGraph:作为一个库,其执行通常在单个Python进程中。对于需要分布式执行的复杂Agent,可能需要结合其他技术(如队列、微服务)来部署其节点。LangGraph本身不提供分布式调度能力,而是专注于图的逻辑编排。
4.5 可观测性与调试
- Airflow:提供丰富的Web UI,用于查看DAG图、任务状态、日志、XComs等。具有成熟的监控和警报机制。
- LangGraph:与LangSmith(LangChain的追踪和可观测性平台)深度集成,可以可视化Agent的执行路径、状态变化、LLM调用等,这对于调试复杂的Agent行为至关重要。
4.6 何时选择哪种工具?
| 特性/场景 | Airflow(传统DAG) | LangGraph |
|---|---|---|
| 工作流类型 | 批处理、ETL、数据管道、定时任务、调度型工作流 | LLM Agent、多Agent系统、人机协作、自适应决策流 |
| 输出特性 | 确定性、离散的成功/失败或预设分支 | 概率性、不确定性、多路径、需要迭代优化 |
| 核心机制 | 任务调度、依赖管理、重试、并发控制 | 状态管理、条件路由、循环、Agent决策逻辑 |
| 上下文/状态 | XComs(少量数据),外部存储(复杂状态) | 共享AgentState(内置),支持Checkpointer持久化 |
| 循环能力 | 无原生循环,需外部触发或复杂冗余 | 内置循环,支持迭代优化和自修正 |
| 可扩展性 | 成熟的分布式架构,高并发 | 库级别,通常单进程,分布式需额外集成 |
| 部署复杂度 | 复杂(需要数据库、消息队列、调度器、工作器等) | 相对简单(作为Python库运行),Agent节点可封装为微服务 |
| 主要用例 | 数据仓库更新、报告生成、定时数据同步、机器学习模型训练 | 智能客服、内容生成与审核、自动化数据分析助手、复杂任务规划 |
| 人机交互 | 任务失败通知,外部系统触发人工干预 | 内置人机协作节点,可等待人工输入并根据结果继续流程 |
混合方法:
在许多实际场景中,并非必须二选一。一个常见的混合架构是:
- Airflow 负责宏观调度,例如每天定时触发一个任务。
- 这个Airflow任务可以是一个
PythonOperator,它启动并运行一个 LangGraph Agent。 - LangGraph Agent 负责内部的复杂逻辑、LLM交互、迭代优化和动态决策。
- Agent 完成后,将最终结果或状态返回给Airflow,Airflow再执行后续的存储、通知或报告任务。
这种混合模式结合了Airflow的强大调度和监控能力与LangGraph在处理复杂、自适应Agent工作流方面的灵活性和效率。
V. 适应未来:智能工作流的基石
通过对LangGraph与传统DAG工作流在处理“概率性输出”时的核心差异进行深入探讨,我们清晰地看到两种范式在设计理念和适用场景上的根本区别。传统DAG以其确定性、可重复性和强大的调度能力,在处理大规模、可预测的数据和业务流程方面依然是无可替代的利器。然而,面对AI时代层出不穷的智能应用,尤其是那些依赖LLM进行决策、生成和反思的Agentic系统,其固有的无环结构和分散的状态管理,使得它在应对不确定性、实现迭代优化和动态自适应方面显得力不从心。
LangGraph正是为了填补这一空白而生。它通过引入有状态的图、条件边和原生支持循环的能力,为构建能够根据概率性输出进行智能决策、多轮交互和自我修正的Agent工作流提供了强大的框架。它使得开发者能够以更自然、更直观的方式,设计出能够“思考”、“学习”和“适应”的自动化流程。
理解并善用这两种工作流编排工具,将是编程专家在构建未来智能系统时的核心竞争力。它们并非互相替代,而是互为补充,共同构筑起从确定性批处理到智能自适应决策的完整工作流解决方案。在AI驱动的时代,掌握如何高效地处理概率性输出,是解锁更高级自动化和智能应用的必由之路。