深入 ‘Async Workflow Orchestration’:利用 Celery 与 LangChain 处理超长周期(数小时)的离线任务

深入异步工作流编排:利用 Celery 与 LangChain 处理超长周期离线任务

在现代软件系统中,我们经常会遇到需要长时间运行的离线任务。这些任务可能涉及大量数据处理、复杂的机器学习模型训练、大规模文档分析或持续的数据同步。它们通常无法在典型的同步请求-响应周期内完成,因为这会导致用户界面冻结、API 超时或资源长时间占用。处理这类任务需要一种强大的异步处理机制,而当这些任务中包含复杂的、多步骤的、甚至智能化的逻辑时,我们还需要一个能够编排这些智能步骤的框架。

本讲座将深入探讨如何结合使用 Celery 这一强大的分布式任务队列系统,以及 LangChain 这一日益流行的 LLM 应用开发框架,来构建和编排耗时数小时甚至更长的超长周期离线任务。我们将从基础概念出发,逐步构建一个实际的、结合 AI 能力的复杂工作流,并讨论其设计模式、实现细节以及生产环境下的考量。


一、离线任务的挑战与异步处理的必要性

长周期离线任务通常具有以下特点:

  1. 执行时间长:从几分钟到数小时,甚至几天。
  2. 资源密集型:可能需要大量计算资源(CPU、GPU、内存)或长时间的网络I/O。
  3. 非交互性:任务启动后,用户通常不需要立即获得结果,但可能需要查询其进度。
  4. 失败恢复能力:任务可能在执行过程中因各种原因失败,需要有重试、恢复或错误处理机制。
  5. 复杂性:单个任务可能由多个子步骤组成,这些子步骤之间存在依赖关系,甚至需要根据中间结果进行动态决策。

传统的同步处理或简单的后台脚本无法很好地应对这些挑战。当一个Web请求触发一个耗时任务时,Web服务器连接会被长时间占用,最终导致超时,用户体验极差。即使是简单的后台进程,也缺乏统一的调度、监控和容错能力。

异步处理是解决这些问题的核心。它将耗时操作从主应用流中剥离,交由独立的进程(或多个进程)在后台执行。用户界面可以立即响应,任务结果可以在稍后通过通知、查询或回调获得。

然而,对于跨越数小时的复杂任务,仅仅异步执行是不够的。我们需要:

  • 持久化任务状态:确保即使工作进程重启,任务也能从上次中断的地方恢复。
  • 工作流编排:定义任务之间的执行顺序、并行性、条件分支和依赖关系。
  • 统一的错误处理和重试策略:确保任务的健壮性。
  • 监控与管理:实时查看任务进度、诊断问题。

这就引出了我们今天的主题:异步工作流编排


二、Celery:分布式任务队列的基石

Celery 是一个用 Python 编写的开源异步任务队列,它允许我们将耗时的操作分解成可独立执行的任务,并将它们分发给一个或多个工作进程异步处理。

2.1 Celery 的核心组件

Celery 的基本架构包含以下几个核心组件:

组件名称 职责描述 常见实现/示例
Broker (消息代理) 接收任务生产者发送的任务消息,并将其存储在队列中,等待工作者消费。 RabbitMQ (推荐), Redis, Amazon SQS, Azure Service Bus
Worker (工作者) 从 Broker 队列中取出任务并执行。一个 Celery 应用可以有多个 Worker,它们可以部署在不同的机器上。 Celery Worker 进程
Producer (任务生产者) 应用程序中调用 Celery 任务的部分,将任务发送给 Broker。 Python 代码中调用 .delay().apply_async()
Backend (结果存储) 可选组件,用于存储任务的执行结果和状态。 Redis, RabbitMQ, PostgreSQL, MongoDB, RPC

工作流程概览:

  1. 应用程序(Producer)定义一个 Celery 任务。
  2. 当应用程序需要执行该任务时,它不是直接调用函数,而是通过 Celery 客户端将任务消息(包含任务名称、参数等)发送到 Broker。
  3. Broker 将任务消息放入相应的队列中。
  4. Celery Worker 持续监听 Broker 中的队列,一旦有新任务,Worker 就会取出任务消息。
  5. Worker 执行任务消息中指定的函数,并将执行结果(如果配置了 Backend)发送到 Backend 存储。
  6. 应用程序可以异步查询 Backend 以获取任务的执行状态和结果。

2.2 Celery 基础配置与任务定义

首先,我们需要安装 Celery 及其 Broker 驱动(以 Redis 为例):

pip install celery redis

接下来,我们创建一个 celery_app.py 文件来配置 Celery 实例:

# celery_app.py
from celery import Celery

# 配置 Celery 实例
# broker: 消息代理的URL,这里使用本地Redis
# backend: 结果存储的URL,这里也使用本地Redis
app = Celery(
    'my_long_running_app',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
    include=['tasks'] # 告诉Celery在启动时加载tasks.py中的任务
)

# 可选:配置任务序列化方式,JSON是默认且推荐的
app.conf.task_serializer = 'json'
app.conf.result_serializer = 'json'
app.conf.accept_content = ['json']
app.conf.timezone = 'Asia/Shanghai' # 设置时区
app.conf.enable_utc = True # 启用UTC时间

# 常用任务配置
app.conf.task_acks_late = True # 确保任务在执行完成后才发送ACK,防止任务中途失败丢失
app.conf.worker_prefetch_multiplier = 1 # 每次只预取一个任务,避免长时间任务阻塞队列
app.conf.task_track_started = True # 允许追踪'STARTED'状态

if __name__ == '__main__':
    app.start()

然后,创建 tasks.py 来定义我们的任务:

# tasks.py
import time
import random
from celery_app import app

@app.task
def long_running_data_processing(data_id: str, duration_seconds: int):
    """
    一个模拟长时间数据处理的Celery任务。
    它会更新任务状态,模拟计算过程,并可能随机失败。
    """
    print(f"[{data_id}] 任务开始执行,预计耗时 {duration_seconds} 秒...")
    # 模拟任务开始时的状态更新
    long_running_data_processing.update_state(
        state='PROGRESS',
        meta={'current_step': 'initializing', 'progress': 0}
    )

    try:
        total_steps = 10
        for i in range(total_steps):
            # 模拟每一步处理
            step_duration = duration_seconds / total_steps
            time.sleep(step_duration)

            # 模拟随机失败
            if random.random() < 0.05: # 5%的概率失败
                raise ValueError(f"模拟任务失败在步骤 {i+1}/{total_steps}!")

            progress = int((i + 1) / total_steps * 100)
            print(f"[{data_id}] 进度: {progress}% (步骤 {i+1}/{total_steps})")
            # 实时更新任务状态和进度
            long_running_data_processing.update_state(
                state='PROGRESS',
                meta={'current_step': f'processing_step_{i+1}', 'progress': progress}
            )

        final_result = f"数据ID {data_id} 处理完成!总耗时 {duration_seconds} 秒。"
        print(f"[{data_id}] 任务成功完成。")
        return final_result
    except Exception as e:
        print(f"[{data_id}] 任务执行失败: {e}")
        # Celery会自动将异常标记为'FAILURE'状态
        raise # 重新抛出异常,让Celery捕获并记录失败信息

@app.task(bind=True, max_retries=3, default_retry_delay=60) # 绑定self,设置最大重试次数和默认重试延迟
def reliable_long_task(self, data_id: str):
    """
    一个更可靠的长时间任务,带有自动重试机制。
    """
    try:
        print(f"[{data_id}] 尝试执行任务 (重试次数: {self.request.retries})...")
        # 模拟一个可能失败的操作
        if random.random() < 0.3 and self.request.retries < self.max_retries:
            raise ConnectionError("模拟临时网络波动或资源不可用")

        time.sleep(10) # 模拟10秒处理
        return f"数据 {data_id} 成功处理 (经过 {self.request.retries} 次重试)."
    except ConnectionError as e:
        print(f"[{data_id}] 任务遇到临时错误: {e}")
        try:
            # 尝试重试任务
            self.retry(exc=e)
        except self.MaxRetriesExceededError:
            print(f"[{data_id}] 达到最大重试次数,任务最终失败。")
            raise # 最终失败

@app.task
def add(x, y):
    """一个简单的加法任务,用于演示任务链"""
    print(f"执行 add({x}, {y})")
    time.sleep(2)
    return x + y

@app.task
def multiply(x, y):
    """一个简单的乘法任务,用于演示任务链"""
    print(f"执行 multiply({x}, {y})")
    time.sleep(2)
    return x * y

@app.task
def final_callback(results):
    """一个回调任务,处理前一个任务组的结果"""
    print(f"最终回调任务接收到结果: {results}")
    return f"所有并行任务完成,汇总结果: {sum(results)}"

运行 Celery Worker:

在终端中,进入项目目录,运行:

celery -A celery_app worker --loglevel=info

这将启动一个 Celery Worker,它会监听 Redis Broker 中的任务。

调用任务:

在另一个 Python 脚本或交互式会话中:

# client.py
from tasks import long_running_data_processing, reliable_long_task, add, multiply, final_callback
from celery.result import AsyncResult
import time

# 异步调用任务
print("------ 演示基本长任务 ------")
task = long_running_data_processing.delay("DOC-123", 15) # 预计15秒
print(f"任务 {task.id} 已提交。")

# 查询任务状态
while not task.ready(): # 任务未完成
    status = task.status
    info = task.info
    print(f"任务 {task.id} 状态: {status}, 进度: {info.get('progress', 'N/A')}%")
    time.sleep(2)

print(f"任务 {task.id} 最终状态: {task.status}")
if task.successful():
    print(f"任务结果: {task.get()}")
else:
    print(f"任务失败原因: {task.traceback}")

print("n------ 演示可靠任务重试 ------")
reliable_task = reliable_long_task.delay("RELIABLE-DATA-456")
print(f"可靠任务 {reliable_task.id} 已提交。")
while not reliable_task.ready():
    print(f"可靠任务 {reliable_task.id} 状态: {reliable_task.status}")
    time.sleep(5)
print(f"可靠任务 {reliable_task.id} 最终状态: {reliable_task.status}, 结果: {reliable_task.get()}")

print("n------ 演示 Celery 任务链 (chain) ------")
# 链式任务:先加法,再乘法
chained_task = (add.s(2, 3) | multiply.s(10)).delay()
print(f"任务链 {chained_task.id} 已提交。")
print(f"任务链结果: {chained_task.get()}") # 会阻塞直到链完成

print("n------ 演示 Celery 任务组 (group) 和和弦 (chord) ------")
# 并行执行多个任务,然后用一个回调任务处理它们的汇总结果
# group: 多个任务并行执行
# chord: 等待group中的所有任务完成后,将其结果作为参数传递给一个回调任务
parallel_tasks = [add.s(i, i+1) for i in range(5)]
chord_task = (group(parallel_tasks) | final_callback.s()).delay()
print(f"和弦任务 {chord_task.id} 已提交。")
print(f"和弦任务结果: {chord_task.get()}") # 会阻塞直到和弦完成

2.3 Celery 的高级编排原语

Celery 提供了强大的原语来编排复杂的工作流:

  • chain (链):任务按顺序执行,前一个任务的输出作为后一个任务的输入。
    task1.s(arg1, arg2) | task2.s(arg3) | task3.s()
  • group (组):多个任务并行执行,返回一个包含所有任务结果的列表。
    group(task1.s(), task2.s(), task3.s())
  • chord (和弦):一个 group 任务,当组内所有任务都完成后,将它们的汇总结果传递给一个回调任务。
    chord([task1.s(), task2.s()], callback_task.s())
  • map / starmap (映射):将一个任务应用于一个列表中的所有元素,类似于 Python 的 map 函数。
    task.map(list_of_args)
  • chunks (分块):将一个可迭代对象分成多个块,每个块由一个单独的任务处理,通常与 group 结合使用。

这些原语是构建复杂、多步骤、长时间离线任务的基础。它们使得我们能够将大型任务分解为更小的、可管理的部分,并灵活地定义它们的执行顺序和并发策略。


三、LangChain:LLM驱动工作流的赋能者

LangChain 是一个框架,用于开发由大型语言模型(LLM)驱动的应用程序。它提供了一套工具、组件和接口,使得开发者能够更容易地构建复杂的 LLM 应用,例如问答系统、聊天机器人、数据分析工具等。

3.1 LangChain 在长周期工作流中的作用

在超长周期的离线任务中引入 LLM 能力,可以极大地提升任务的智能化和自动化水平。例如:

  • 智能文档处理:自动提取关键信息、生成摘要、进行语义分析、分类。
  • 复杂数据报告生成:根据原始数据和分析结果,自动撰写结构化报告。
  • 代码生成与审查:根据需求生成代码草稿,或对现有代码进行审查。
  • 多步骤决策与规划:LLM Agent 可以作为智能控制器,根据当前状态和目标动态规划下一步操作。

LangChain 的模块化设计使得将 LLM 功能集成到 Celery 任务中变得非常方便。我们可以将一个 LangChain Chain、Agent 或 Tool 封装成一个 Celery 任务,使其成为我们整体工作流的一部分。

3.2 LangChain 核心概念回顾

组件名称 职责描述 在工作流中的应用示例
LLMs 封装了与各种大语言模型(如 OpenAI GPT, Google Gemini, Hugging Face models)交互的接口。 作为 Celery 任务的一部分,调用 LLM 进行文本生成、摘要等。
Prompts 管理和优化给 LLM 的输入提示词。支持模板化和变量替换。 定义标准化的提示模板,确保 Celery 任务中 LLM 调用的质量和一致性。
Chains 将 LLM、Prompt 和其他组件(如解析器、数据加载器)链接起来,形成一个端到端的逻辑流。 一个 Celery 任务可以是一个 LangChain Chain,例如 RetrievalQAChain 用于文档问答。
Retrievers 从外部数据源(如向量数据库)检索相关文档或信息。RAG (Retrieval Augmented Generation) 模式的关键。 Celery 任务中,利用 Retriever 从预先处理好的向量数据库中获取上下文。
Tools 赋予 LLM 执行特定操作的能力,如搜索网页、查询数据库、执行代码、调用自定义API。 Celery 任务中,LLM 可以通过 Tool 触发其他 Celery 任务,或查询外部系统状态。
Agents 利用 LLM 的推理能力,根据目标和可用 Tools 动态规划并执行一系列操作。 一个 Celery 任务可以封装一个 Agent,使其自主完成多步骤的智能工作。
Output Parsers 结构化 LLM 的输出,将其转换为特定的数据格式(如 JSON)。 确保 Celery 任务中 LLM 的输出能够被后续任务正确解析和使用。

3.3 将 LangChain 集成到 Celery 任务中

为了在 Celery 任务中使用 LangChain,我们通常会在任务函数内部实例化和运行 LangChain 组件。这需要确保 Celery Worker 运行环境中安装了所有必要的 LangChain 依赖和 LLM 访问凭证。

示例:一个 LangChain 驱动的文档摘要任务

首先,安装 LangChain 和一个 LLM 库(如 openai):

pip install langchain openai tiktoken

然后,在 tasks.py 中添加一个 LangChain 任务:

# tasks.py (添加以下代码)
import os
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from celery_app import app

# 确保在worker环境中设置了OPENAI_API_KEY
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY" # 实际生产中应通过环境变量或安全配置注入

@app.task(bind=True, max_retries=5, default_retry_delay=300) # 增加重试次数和延迟
def summarize_document_chunk(self, chunk_id: str, document_text: str):
    """
    使用 LangChain 和 LLM 摘要一个文档片段。
    这是一个可能长时间运行且依赖外部API的任务。
    """
    print(f"[{chunk_id}] 开始摘要文档片段...")
    self.update_state(
        state='PROGRESS',
        meta={'current_step': 'initializing_llm', 'progress': 0}
    )

    try:
        # 初始化 LLM
        llm = OpenAI(temperature=0.7, openai_api_key=os.getenv("OPENAI_API_KEY")) # 从环境变量获取API Key

        # 定义 Prompt Template
        prompt_template = PromptTemplate(
            input_variables=["text"],
            template="请对以下文档片段进行简洁的摘要,并提取2-3个关键点:nn{text}nn摘要和关键点:"
        )

        # 构建 LLM Chain
        llm_chain = LLMChain(llm=llm, prompt=prompt_template)

        self.update_state(
            state='PROGRESS',
            meta={'current_step': 'calling_llm', 'progress': 50}
        )

        # 执行 Chain
        summary_result = llm_chain.run(document_text)

        self.update_state(
            state='PROGRESS',
            meta={'current_step': 'parsing_result', 'progress': 90}
        )

        print(f"[{chunk_id}] 摘要完成。")
        return {
            "chunk_id": chunk_id,
            "summary": summary_result.strip()
        }

    except Exception as e:
        print(f"[{chunk_id}] 摘要任务失败: {e}")
        # 针对API调用失败等情况,可以考虑重试
        if "rate limit" in str(e).lower() or "connection error" in str(e).lower():
            print(f"[{chunk_id}] 遇到临时错误,尝试重试...")
            self.retry(exc=e, countdown=self.request.retries * 60 + 300) # 递增重试延迟
        else:
            raise # 其他不可恢复错误直接抛出

调用 LangChain 任务:

# client.py (添加以下代码)
from tasks import summarize_document_chunk
import os

# 确保在调用任务前设置API Key (或者在worker环境中设置)
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"

print("n------ 演示 LangChain 摘要任务 ------")
sample_text = (
    "大型语言模型(LLMs)正在彻底改变我们与计算机交互的方式,"
    "它们能够理解和生成人类语言,从而实现各种应用,"
    "如内容创作、问答系统和代码辅助。然而,LLMs的训练和运行成本高昂,"
    "并且在处理长文本时存在上下文窗口限制。为了解决这些问题,"
    "研究人员正在探索更高效的模型架构、蒸馏技术以及检索增强生成(RAG)方法。"
)
summary_task = summarize_document_chunk.delay("TEXT-CHUNK-001", sample_text)
print(f"LangChain 摘要任务 {summary_task.id} 已提交。")

while not summary_task.ready():
    status = summary_task.status
    info = summary_task.info
    print(f"摘要任务 {summary_task.id} 状态: {status}, 步骤: {info.get('current_step', 'N/A')}")
    time.sleep(5)

print(f"摘要任务 {summary_task.id} 最终状态: {summary_task.status}")
if summary_task.successful():
    result = summary_task.get()
    print(f"摘要结果: {result['summary']}")
else:
    print(f"摘要任务失败原因: {summary_task.traceback}")

通过这种方式,我们将 LangChain 的智能能力封装到 Celery 的可靠执行框架中,为构建更复杂的智能工作流奠定了基础。


四、编排超长周期工作流:Celery 与 LangChain 的融合

处理数小时的离线任务,其核心挑战在于状态管理容错恢复。任务不能仅仅是简单的链式调用,因为任何一个环节的失败都可能导致整个工作流中断,且无法从中间恢复。我们需要一种机制来持久化工作流的整体状态和每个子任务的中间结果。

4.1 核心设计模式:显式状态管理

为了实现超长周期工作流的可靠编排,我们引入显式状态管理。这意味着我们将工作流的整体进度、每个子任务的状态和关键中间数据存储在一个持久化的数据库中。

数据库模型示例:

我们假设使用一个关系型数据库(如 PostgreSQL)来存储工作流状态。

# models.py (使用 SQLAlchemy ORM 示例)
from sqlalchemy import create_engine, Column, String, Text, DateTime, JSON, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
import json

Base = declarative_base()

class WorkflowRun(Base):
    """
    表示一个完整工作流的运行实例。
    """
    __tablename__ = 'workflow_runs'
    id = Column(String, primary_key=True) # 可以是UUID或自定义ID
    name = Column(String, nullable=False)
    status = Column(String, default='PENDING', nullable=False) # PENDING, RUNNING, COMPLETED, FAILED, PAUSED
    input_data = Column(JSON, nullable=True) # 存储工作流的初始输入,如文件路径、配置等
    output_data = Column(JSON, nullable=True) # 存储工作流的最终输出
    start_time = Column(DateTime, default=datetime.utcnow)
    end_time = Column(DateTime, nullable=True)
    error_message = Column(Text, nullable=True)

    tasks = relationship("WorkflowTaskState", back_populates="workflow_run", order_by="WorkflowTaskState.start_time")

    def __repr__(self):
        return f"<WorkflowRun(id='{self.id}', status='{self.status}')>"

class WorkflowTaskState(Base):
    """
    表示工作流中一个特定Celery任务的执行状态。
    """
    __tablename__ = 'workflow_task_states'
    id = Column(String, primary_key=True) # 可以是Celery task ID
    workflow_run_id = Column(String, ForeignKey('workflow_runs.id'), nullable=False)
    task_name = Column(String, nullable=False) # Celery任务的名称
    celery_task_id = Column(String, unique=True, nullable=True) # 对应的Celery任务ID
    status = Column(String, default='PENDING', nullable=False) # PENDING, STARTED, PROGRESS, SUCCESS, FAILURE, RETRY
    intermediate_result = Column(JSON, nullable=True) # 存储任务的中间结果
    error_message = Column(Text, nullable=True)
    start_time = Column(DateTime, default=datetime.utcnow)
    end_time = Column(DateTime, nullable=True)
    retries = Column(Integer, default=0)

    workflow_run = relationship("WorkflowRun", back_populates="tasks")

    def __repr__(self):
        return f"<WorkflowTaskState(id='{self.id}', task='{self.task_name}', status='{self.status}')>"

# 数据库连接和会话管理
DATABASE_URL = "postgresql://user:password@localhost:5432/celery_langchain_db"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def init_db():
    Base.metadata.create_all(bind=engine)

# 在tasks.py中导入SessionLocal
# from models import SessionLocal, WorkflowRun, WorkflowTaskState

# 注意:实际项目中,数据库配置和会话管理会更复杂,例如使用依赖注入。

更新 Celery 任务以持久化状态:

每个 Celery 任务在执行前、执行中和执行后都应该更新其对应的 WorkflowTaskState 记录。

# tasks.py (修改和新增任务)
from celery_app import app
from models import SessionLocal, WorkflowRun, WorkflowTaskState # 假设models.py已经配置好
from sqlalchemy.orm import Session
from datetime import datetime
import uuid
import time
import random
import os
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
# ... 其他必要的导入

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# 辅助函数:更新工作流运行状态
def update_workflow_run_status(db: Session, workflow_run_id: str, status: str, error_message: str = None, output_data: dict = None):
    workflow_run = db.query(WorkflowRun).filter(WorkflowRun.id == workflow_run_id).first()
    if workflow_run:
        workflow_run.status = status
        if status in ['COMPLETED', 'FAILED', 'PAUSED']:
            workflow_run.end_time = datetime.utcnow()
        if error_message:
            workflow_run.error_message = error_message
        if output_data:
            workflow_run.output_data = output_data
        db.add(workflow_run)
        db.commit()

# 辅助函数:更新工作流任务状态
def update_workflow_task_state(db: Session, workflow_task_state_id: str, status: str, celery_task_id: str = None, intermediate_result: dict = None, error_message: str = None):
    task_state = db.query(WorkflowTaskState).filter(WorkflowTaskState.id == workflow_task_state_id).first()
    if task_state:
        task_state.status = status
        if celery_task_id:
            task_state.celery_task_id = celery_task_id
        if intermediate_result:
            task_state.intermediate_result = intermediate_result
        if error_message:
            task_state.error_message = error_message
        if status in ['SUCCESS', 'FAILURE']:
            task_state.end_time = datetime.utcnow()
        db.add(task_state)
        db.commit()

# 重写一个Celery任务以包含状态管理
@app.task(bind=True, max_retries=5, default_retry_delay=60)
def process_document_chunk_with_llm(self, workflow_run_id: str, workflow_task_state_id: str, chunk_data: dict):
    """
    一个 Celery 任务,处理文档片段,并使用 LangChain 进行智能分析。
    """
    db = next(get_db()) # 获取数据库会话
    chunk_id = chunk_data.get('id')
    document_text = chunk_data.get('text')
    task_name = "process_document_chunk_with_llm"

    print(f"[{workflow_run_id}][{chunk_id}] 开始处理文档片段。")

    # 记录任务开始
    update_workflow_task_state(db, workflow_task_state_id, 'STARTED', celery_task_id=self.request.id)
    self.update_state(state='PROGRESS', meta={'current_step': 'initializing', 'progress': 0})

    try:
        llm = OpenAI(temperature=0.7, openai_api_key=os.getenv("OPENAI_API_KEY"))
        prompt_template = PromptTemplate(
            input_variables=["text"],
            template="请对以下文档片段进行简洁的摘要,提取关键点,并识别任何潜在的实体(如人名、地名、组织):nn{text}nn结构化输出:n摘要:n关键点:n实体:"
        )
        llm_chain = LLMChain(llm=llm, prompt=prompt_template)

        self.update_state(state='PROGRESS', meta={'current_step': 'calling_llm', 'progress': 30})
        llm_output = llm_chain.run(document_text)

        # 假设我们有一个解析器来结构化LLM的输出
        parsed_output = parse_llm_output(llm_output) # 这是一个虚构的函数

        result = {
            "chunk_id": chunk_id,
            "summary": parsed_output.get("summary"),
            "key_points": parsed_output.get("key_points"),
            "entities": parsed_output.get("entities"),
            "raw_llm_output": llm_output
        }

        # 记录任务成功和结果
        update_workflow_task_state(db, workflow_task_state_id, 'SUCCESS', intermediate_result=result)
        self.update_state(state='SUCCESS', meta={'progress': 100, 'result': result})
        print(f"[{workflow_run_id}][{chunk_id}] 文档片段处理成功。")
        return result

    except Exception as e:
        error_msg = f"任务 {task_name} 失败: {e}"
        print(error_msg)
        update_workflow_task_state(db, workflow_task_state_id, 'FAILURE', error_message=error_msg)
        self.update_state(state='FAILURE', meta={'error': error_msg})

        # 根据错误类型进行重试
        if "rate limit" in str(e).lower() or "connection error" in str(e).lower():
            self.retry(exc=e, countdown=self.request.retries * 120 + 300) # 更长的重试延迟
        else:
            # 对于其他错误,可能不需要重试,直接抛出
            raise

def parse_llm_output(output_text: str) -> dict:
    """
    虚构的LLM输出解析函数。实际中可能需要更复杂的正则或专门的OutputParser。
    """
    summary = ""
    key_points = []
    entities = []

    # 简单示例解析逻辑
    lines = output_text.split('n')
    in_summary = False
    in_key_points = False
    in_entities = False

    for line in lines:
        if line.startswith("摘要:"):
            summary = line[3:].strip()
            in_summary = True
            in_key_points = False
            in_entities = False
        elif line.startswith("关键点:"):
            key_points.append(line[4:].strip())
            in_summary = False
            in_key_points = True
            in_entities = False
        elif line.startswith("实体:"):
            entities.append(line[3:].strip())
            in_summary = False
            in_key_points = False
            in_entities = True
        elif in_key_points and line.strip().startswith("-"):
            key_points.append(line.strip()[1:].strip())
        elif in_entities and line.strip() and not line.strip().startswith("-"): # 假设实体是逗号分隔
            entities.extend([e.strip() for e in line.split(',') if e.strip()])

    return {
        "summary": summary,
        "key_points": key_points,
        "entities": entities
    }

@app.task(bind=True)
def final_workflow_report_generation(self, workflow_run_id: str, all_chunk_results: list):
    """
    接收所有文档片段的分析结果,并生成最终报告。
    """
    db = next(get_db())
    task_name = "final_workflow_report_generation"
    workflow_task_state_id = str(uuid.uuid4()) # 为这个任务创建一个新的任务状态ID

    # 在数据库中创建此任务的状态记录
    db_task_state = WorkflowTaskState(
        id=workflow_task_state_id,
        workflow_run_id=workflow_run_id,
        task_name=task_name,
        status='PENDING',
        celery_task_id=self.request.id
    )
    db.add(db_task_state)
    db.commit()

    print(f"[{workflow_run_id}] 开始生成最终报告...")
    update_workflow_task_state(db, workflow_task_state_id, 'STARTED', celery_task_id=self.request.id)
    self.update_state(state='PROGRESS', meta={'current_step': 'aggregating_results', 'progress': 0})

    try:
        # 聚合所有片段的摘要和关键点
        full_summary_parts = [r['summary'] for r in all_chunk_results if r and r.get('summary')]
        all_key_points = [kp for r in all_chunk_results for kp in (r['key_points'] if r and r.get('key_points') else [])]
        all_entities = [ent for r in all_chunk_results for ent in (r['entities'] if r and r.get('entities') else [])]

        # 使用 LangChain Agent 进行最终报告的生成和结构化
        # 这里为了简化,我们直接用一个LLM Chain来做,实际可能需要Agent
        llm = OpenAI(temperature=0.6, openai_api_key=os.getenv("OPENAI_API_KEY"))
        report_prompt = PromptTemplate(
            input_variables=["full_summary", "key_points_list", "entities_list"],
            template=(
                "根据以下摘要、关键点和实体列表,生成一份全面且结构化的报告。请突出主要内容,"
                "并对报告内容进行逻辑分段。确保报告内容连贯、专业。nn"
                "所有摘要片段:n{full_summary}nn"
                "所有关键点:n{key_points_list}nn"
                "所有实体:n{entities_list}nn"
                "最终报告:"
            )
        )
        report_chain = LLMChain(llm=llm, prompt=report_prompt)

        self.update_state(state='PROGRESS', meta={'current_step': 'generating_final_report', 'progress': 50})
        final_report_text = report_chain.run(
            full_summary="n".join(full_summary_parts),
            key_points_list="n".join(set(all_key_points)), # 去重
            entities_list=", ".join(set(all_entities)) # 去重
        )

        final_output_data = {
            "final_report": final_report_text,
            "aggregated_summaries": full_summary_parts,
            "unique_key_points": list(set(all_key_points)),
            "unique_entities": list(set(all_entities))
        }

        # 记录任务成功,并更新 WorkflowRun 的最终输出
        update_workflow_task_state(db, workflow_task_state_id, 'SUCCESS', intermediate_result=final_output_data)
        update_workflow_run_status(db, workflow_run_id, 'COMPLETED', output_data=final_output_data)
        self.update_state(state='SUCCESS', meta={'progress': 100, 'result': final_output_data})
        print(f"[{workflow_run_id}] 最终报告生成成功,工作流完成。")
        return final_output_data

    except Exception as e:
        error_msg = f"最终报告生成任务失败: {e}"
        print(error_msg)
        update_workflow_task_state(db, workflow_task_state_id, 'FAILURE', error_message=error_msg)
        update_workflow_run_status(db, workflow_run_id, 'FAILED', error_message=error_msg)
        self.update_state(state='FAILURE', meta={'error': error_msg})
        raise

4.2 协调器(Orchestrator)的实现

我们需要一个协调器来启动工作流,并利用 Celery 的链、组、和弦等原语来编排任务。这个协调器本身不是一个 Celery 任务,而是一个客户端逻辑,它负责:

  1. 创建 WorkflowRun 记录。
  2. 为每个将要执行的 Celery 任务创建 WorkflowTaskState 记录。
  3. 构建 Celery 任务组合(链、组、和弦)。
  4. 将组合任务提交给 Celery。
  5. (可选)监听工作流的整体进度并提供状态查询接口。
# orchestrator.py
import uuid
from models import SessionLocal, WorkflowRun, WorkflowTaskState, init_db
from tasks import process_document_chunk_with_llm, final_workflow_report_generation
from celery import group, chain, chord
from datetime import datetime
import time
import os

# 初始化数据库
init_db()

def start_document_analysis_workflow(document_content: str, workflow_name: str = "Document Analysis"):
    """
    启动一个文档分析工作流。
    这个工作流将文档分割成块,并行处理每个块,然后汇总生成最终报告。
    """
    db = next(SessionLocal())
    workflow_run_id = str(uuid.uuid4())

    # 1. 创建 WorkflowRun 记录
    new_workflow_run = WorkflowRun(
        id=workflow_run_id,
        name=workflow_name,
        status='PENDING',
        input_data={'document_content_preview': document_content[:200] + '...'}
    )
    db.add(new_workflow_run)
    db.commit()
    print(f"工作流运行 {workflow_run_id} 已创建,状态: PENDING。")

    # 2. 文档分块 (这里直接模拟分块,实际可能是一个独立的Celery任务)
    chunks = []
    chunk_size = 1000  # 假设每1000字符一个块
    for i in range(0, len(document_content), chunk_size):
        chunk_id = f"{workflow_run_id}-chunk-{i // chunk_size}"
        chunks.append({
            "id": chunk_id,
            "text": document_content[i:i + chunk_size]
        })
    print(f"文档已分割为 {len(chunks)} 个片段。")

    # 3. 为每个文档块创建 WorkflowTaskState 记录,并准备 Celery 任务
    parallel_chunk_tasks = []
    for chunk in chunks:
        task_state_id = str(uuid.uuid4())
        db_task_state = WorkflowTaskState(
            id=task_state_id,
            workflow_run_id=workflow_run_id,
            task_name="process_document_chunk_with_llm",
            status='PENDING',
            intermediate_result={'chunk_id': chunk['id']} # 存储一些初始信息
        )
        db.add(db_task_state)

        # 将 WorkflowRun ID 和 WorkflowTaskState ID 传递给 Celery 任务
        parallel_chunk_tasks.append(
            process_document_chunk_with_llm.s(workflow_run_id, task_state_id, chunk)
        )
    db.commit() # 批量提交任务状态记录

    # 4. 构建 Celery 和弦 (Chord): 并行处理所有块,然后汇总
    # group: 并行执行所有 process_document_chunk_with_llm 任务
    # final_workflow_report_generation.s: 回调任务,接收 group 的结果
    full_workflow_chord = chord(
        group(parallel_chunk_tasks),
        final_workflow_report_generation.s(workflow_run_id)
    )

    # 5. 提交 Celery 工作流
    print(f"提交 Celery 和弦任务...")
    celery_async_result = full_workflow_chord.delay()

    # 更新 WorkflowRun 为 RUNNING 状态
    db_workflow_run = db.query(WorkflowRun).filter(WorkflowRun.id == workflow_run_id).first()
    if db_workflow_run:
        db_workflow_run.status = 'RUNNING'
        db.add(db_workflow_run)
        db.commit()

    db.close()
    return workflow_run_id, celery_async_result.id

def monitor_workflow_status(workflow_run_id: str):
    """
    监控并打印工作流的实时状态。
    """
    db = next(SessionLocal())
    while True:
        workflow_run = db.query(WorkflowRun).filter(WorkflowRun.id == workflow_run_id).first()
        if not workflow_run:
            print(f"未找到工作流 {workflow_run_id}")
            break

        print(f"n--- 工作流 {workflow_run_id} (状态: {workflow_run.status}) ---")
        for task_state in workflow_run.tasks:
            # 过滤掉初始PENDING状态,只显示已提交或正在进行的任务
            if task_state.celery_task_id: 
                print(f"  任务 '{task_state.task_name}' ({task_state.id[:8]}...): CeleryID={task_state.celery_task_id[:8]}... Status={task_state.status}")
                if task_state.error_message:
                    print(f"    错误: {task_state.error_message[:100]}...")

        if workflow_run.status in ['COMPLETED', 'FAILED']:
            print(f"工作流结束。最终状态: {workflow_run.status}, 耗时: {workflow_run.end_time - workflow_run.start_time}")
            if workflow_run.output_data:
                print(f"部分输出数据预览: {str(workflow_run.output_data)[:500]}...")
            if workflow_run.error_message:
                print(f"工作流错误: {workflow_run.error_message}")
            break

        time.sleep(10) # 每10秒刷新一次
    db.close()

if __name__ == '__main__':
    # 确保设置了API Key
    # os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"

    if not os.getenv("OPENAI_API_KEY"):
        print("错误: 请设置 OPENAI_API_KEY 环境变量以运行 LangChain 任务。")
        exit(1)

    # 模拟一个非常长的文档
    long_document = (
        "这是一个非常长的示例文档,它将包含多个段落和主题。nn"
        "第一部分关于人工智能的最新进展。近年来,深度学习取得了突破性进展,"
        "特别是在自然语言处理(NLP)和计算机视觉领域。大型语言模型(LLMs)如GPT-3、GPT-4,"
        "以及最近的Claude和Gemini,展示了惊人的文本生成、理解和推理能力。它们正在改变内容创作、"
        "客户服务和软件开发等多个行业。然而,这些模型的训练和部署成本巨大,并且存在偏见、"
        "幻觉和伦理方面的挑战。研究人员正在努力提高模型的透明度、可解释性和安全性。nn"
        "第二部分探讨气候变化及其影响。全球气温持续上升,导致极端天气事件频发,"
        "如热浪、干旱、洪水和森林火灾。海平面上升威胁着沿海城市和生态系统。为了应对气候变化,"
        "国际社会正在推动能源转型,从化石燃料转向可再生能源,如太阳能和风能。同时,碳捕获技术、"
        "植树造林和循环经济模式也被视为重要的解决方案。然而,实现这些目标需要全球范围内的合作和政策支持。nn"
        "第三部分关注量子计算的前景。量子计算是一种全新的计算范式,它利用量子力学的原理,"
        "如叠加和纠缠,来处理传统计算机无法解决的复杂问题。虽然目前仍处于早期发展阶段,"
        "但量子计算有望在药物研发、材料科学、金融建模和密码学等领域带来革命性的突破。IBM、Google和微软等"
        "科技巨头正在投入大量资源进行量子硬件和软件的研发。然而,构建稳定、可靠的量子计算机仍然面临巨大的技术挑战,"
        "包括量子比特的相干性、纠错和可扩展性问题。nn"
    ) * 10 # 将文档重复10次,使其更长

    print("启动文档分析工作流...")
    wf_id, _ = start_document_analysis_workflow(long_document, "大型文档智能分析")

    print(f"工作流 {wf_id} 已提交。开始监控...")
    monitor_workflow_status(wf_id)
    print("监控结束。")

运行整个系统:

  1. 启动 Redis 和 PostgreSQL:确保它们正在运行。
  2. 创建数据库表:在 orchestrator.pyif __name__ == '__main__': 块中调用 init_db()
  3. 设置 OpenAI API Keyexport OPENAI_API_KEY="sk-..."
  4. 启动 Celery Workercelery -A celery_app worker --loglevel=info -P gevent -c 10 (使用 geventeventlet 可以提高IO密集型任务的并发,LLM API调用通常是IO密集型)。
  5. 运行 Orchestrator 客户端python orchestrator.py

这个例子展示了一个完整的超长周期工作流:文档被分成多个块,每个块由一个独立的 Celery 任务并行处理,该任务内部调用 LangChain 进行智能分析。所有这些并行任务完成后,一个回调任务会聚合结果,并再次使用 LangChain 生成最终的综合报告。整个过程的状态都被持久化到数据库中,确保了即使在长时间运行中途出现故障,也能有迹可循,甚至支持从上次成功点恢复(尽管本示例未直接实现恢复逻辑,但状态持久化是其基础)。

4.3 Agentic Workflow Orchestration (高级)

更高级的编排方式是使用 LangChain Agent 作为工作流本身的智能控制器。Agent 可以根据当前工作流状态、目标和可用工具(Tools)动态决定下一步要执行的操作,包括启动 Celery 任务、查询 Celery 任务状态、解析结果等。

Agent 作为工作流控制器:

  1. 定义 Celery Dispatch Tool:Agent 可以调用此工具来提交新的 Celery 任务。
  2. 定义 Celery Status Check Tool:Agent 可以调用此工具来查询已提交 Celery 任务的状态和结果。
  3. 定义 Workflow State Update Tool:Agent 可以调用此工具来更新数据库中的 WorkflowRunWorkflowTaskState

这种方式让工作流的决策逻辑更加灵活和适应性强,特别适用于那些步骤之间有复杂条件判断或需要动态规划的任务。例如,如果一个文档片段的摘要结果不理想,Agent 可以决定将其重新处理或提交给人类审查任务。


五、监控、扩展与生产考量

将 Celery 和 LangChain 部署到生产环境需要考虑诸多因素,以确保系统的稳定性、性能和可维护性。

5.1 监控

  • Celery Flower:Celery 官方推荐的实时监控工具,提供任务状态、历史、工作者状态等信息。
    pip install flower
    celery -A celery_app flower --port=5555
  • 日志记录:配置 Celery Worker 和任务的详细日志,将日志发送到集中式日志系统(如 ELK Stack, Grafana Loki)。
  • 指标收集:使用 Prometheus/Grafana 收集 Celery Worker 的性能指标(CPU、内存使用率、任务处理速率、队列深度)以及 LLM API 的调用延迟、错误率等。
  • 数据库监控:监控 WorkflowRunWorkflowTaskState 表,可以直观地了解工作流的整体健康状况。

5.2 扩展性

  • Celery Worker 扩展:根据任务负载,可以水平扩展 Celery Worker 实例。Worker 可以部署在多台机器上,提高并发处理能力。
  • Broker 扩展:Redis 或 RabbitMQ 都可以通过集群模式进行扩展,以处理更高的消息吞吐量和提高可用性。
  • Backend 扩展:如果使用 Redis 作为 Backend,也需要考虑其扩展性。对于高并发的结果存储,PostgreSQL 或其他数据库可能更合适。
  • LLM API 限制:LLM API 通常有速率限制和并发限制。在设计工作流时,需要考虑这些限制,可能需要实现指数退避重试、令牌桶限流或负载均衡。

5.3 资源管理

  • 内存与 CPU:LLM 推理(尤其是本地模型)可能非常消耗内存和 CPU。确保 Celery Worker 运行在资源充足的机器上,或者将特定的 LLM 任务分配给专门的高配置 Worker 队列。
  • 并发控制:Celery Worker 的并发度 (-c 参数) 需要根据任务类型(IO 密集型 vs. CPU 密集型)和机器资源进行调整。过高的并发可能导致 OOM 或性能下降。
  • 批处理:如果可能,将多个小的 LLM 调用合并为一次批处理调用,可以减少 API 调用次数和开销。

5.4 容错性与韧性

  • 任务重试:如示例所示,充分利用 Celery 的 max_retriesdefault_retry_delay 参数,并根据错误类型实现自定义重试逻辑。
  • 幂等性:设计任务时考虑幂等性。即使任务重复执行多次,其对外部系统的影响也应该是相同的。这对于重试机制至关重要。
  • 心跳与故障检测:Celery Worker 会定期向 Broker 发送心跳。Flower 可以监控 Worker 的在线状态。
  • 消息持久化:确保 Broker 配置为持久化消息,防止 Broker 重启时任务丢失。
  • 数据库事务:在更新 WorkflowRunWorkflowTaskState 时,使用数据库事务来保证数据的一致性。

5.5 安全性

  • API Key 管理:LLM API Key 属于敏感信息,不应硬编码。使用环境变量、秘密管理服务(如 Vault, AWS Secrets Manager)或 Kubernetes Secrets。
  • 网络安全:确保 Celery Worker、Broker 和 Backend 之间的通信安全,例如使用 TLS/SSL 加密。限制对这些组件的访问。
  • 任务输入/输出验证:对进入 Celery 任务的数据和任务输出进行严格的验证,防止注入攻击或数据损坏。

5.6 成本管理

  • LLM API 成本:大型语言模型的 API 调用通常按令牌计费。优化提示词、使用缓存、对不必要的调用进行剪枝是降低成本的关键。
  • 云资源成本:Celery Worker 运行在云服务器上会产生计算费用。根据负载动态调整 Worker 数量(自动伸缩)可以有效控制成本。

六、异步智能工作流的未来展望

通过 Celery 和 LangChain 的结合,我们构建了一个既能处理长时间运行任务的健壮性,又能融入复杂智能逻辑的强大框架。这种组合使得我们能够解决过去难以自动化或需要大量人工干预的复杂业务问题。

展望未来,这种异步智能工作流将继续演进。更先进的工作流编排工具(如 Apache Airflow、Temporal.io、Cadence)可能会与 LangChain 深度集成,提供更强大的可视化、版本控制、依赖管理和故障恢复能力。同时,随着 LLM 自身能力的提升,Agentic 工作流将变得更加智能和自主,能够进行更复杂的规划、自我修正和适应性学习,从而实现真正意义上的“自愈”和“自适应”智能离线任务。这种技术融合无疑将开启自动化和人工智能应用的新篇章。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注