如何使用`Celery`的`任务`签名和`工作流`实现`复杂`的`后台`任务。

Celery 任务签名与工作流:构建复杂后台任务的艺术

大家好!今天我们来深入探讨 Celery 中两个非常重要的概念:任务签名(Signatures)和工作流(Workflows)。它们是构建复杂、可靠的后台任务系统的基石。我们将从任务签名开始,逐步过渡到更高级的工作流应用,并通过大量的代码示例,让你真正理解如何运用它们来解决实际问题。

任务签名:定义任务调用的蓝图

任务签名本质上就是一个定义了任务调用的蓝图。它包含了执行任务所需的全部信息,例如要执行的任务函数、传递给任务的参数、以及一些执行选项(例如重试策略、截止时间等)。有了签名,我们可以将任务调用与其执行解耦,从而实现更灵活的调度和编排。

1. 基本概念

在 Celery 中,我们可以通过 signature 函数或者任务装饰器的 .s() 方法来创建任务签名。

from celery import Celery

app = Celery('my_app', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

# 使用 signature 函数创建签名
from celery import signature
sig1 = signature('my_app.add', args=(1, 2), countdown=10)

# 使用 .s() 方法创建签名
sig2 = add.s(1, 2)

# 使用 .si() 方法创建 immutable 签名
sig3 = add.si(1, 2) # 必须指定所有参数,且不能更改

# 使用 .s() 结合 .set() 方法设置额外的选项
sig4 = add.s(1, 2).set(countdown=10, expires=30)

让我们逐一分析上述代码:

  • signature('my_app.add', args=(1, 2), countdown=10): signature 函数接收任务的名称(字符串)、args(位置参数元组)、kwargs(关键字参数字典)以及其他的执行选项。countdown=10 表示任务将在 10 秒后执行。

  • add.s(1, 2): add.s(1, 2)add 任务对象的 .s() 方法的调用,它创建了一个与 signature 函数等价的签名。这种方式更加简洁,也更符合 Python 的习惯。ssignature 的缩写。

  • add.si(1, 2): add.si() 方法创建的是一个 immutable 签名。这意味着你必须在创建签名时提供所有的参数,并且之后无法修改它们。 sisignature immutable 的缩写。

  • add.s(1, 2).set(countdown=10, expires=30): .set() 方法允许你修改签名对象中的选项。例如,在这里我们设置了 countdownexpires 选项。expires 表示任务的过期时间,如果任务在指定时间内没有被执行,Celery 将会放弃执行该任务。

2. 签名对象的属性

签名对象有很多有用的属性,可以用来获取任务的信息。

属性 描述
name 任务的名称(字符串)。
args 任务的位置参数(元组)。
kwargs 任务的关键字参数(字典)。
options 任务的执行选项(字典),例如 countdownexpiresretryretry_policy 等。
immutable 布尔值,指示签名是否为 immutable。
type 签名的类型,可以是 'chord', 'group', 'chain'None
task 任务函数本身。
id 任务的唯一 ID,只有在任务被实际执行后才会生成。
sig = add.s(1, 2).set(countdown=10, expires=30)
print(sig.name)       # 输出: 'my_app.add'
print(sig.args)       # 输出: (1, 2)
print(sig.kwargs)     # 输出: {}
print(sig.options)    # 输出: {'countdown': 10, 'expires': 30}
print(sig.immutable)  # 输出: False
print(sig.type)       # 输出: None
print(sig.task)       # 输出: <@task: my_app.add of my_app>
print(sig.id)         # 输出: None (任务尚未执行)

3. 签名对象的执行

有了签名对象,我们可以通过 .apply_async() 方法来异步执行任务。

sig = add.s(1, 2).set(countdown=10)
result = sig.apply_async()
print(result.get()) # 阻塞直到任务完成并获取结果

apply_async() 方法返回一个 AsyncResult 对象,我们可以通过它来获取任务的执行状态和结果。

4. 偏函数应用

签名可以看作是任务函数的偏函数应用。这意味着我们可以预先绑定一部分参数,然后在稍后执行任务时再提供剩余的参数。

from functools import partial

# 使用 partial 创建偏函数
add_one = partial(add, 1)  # 预先绑定 x=1

# 将偏函数转换为签名
add_one_sig = app.signature(add_one.func, args=add_one.args, kwargs=add_one.keywords)

# 或者直接使用partial函数创建
add_one_sig = app.signature(partial(add, 1))

# 执行任务,提供 y 的值
result = add_one_sig.apply_async(kwargs={'y': 2})
print(result.get())  # 输出: 3

在这个例子中,我们使用 functools.partial 创建了一个偏函数 add_one,它预先绑定了 add 函数的 x 参数。然后,我们将 add_one 转换为一个签名,并在执行任务时提供了 y 参数。

5. 任务签名序列化与反序列化

任务签名可以被序列化成字符串,然后在稍后反序列化成签名对象。这使得我们可以将签名存储在数据库中,或者通过网络传输。

import pickle

sig = add.s(1, 2)

# 序列化
serialized_sig = pickle.dumps(sig)

# 反序列化
deserialized_sig = pickle.loads(serialized_sig)

# 执行任务
result = deserialized_sig.apply_async()
print(result.get())  # 输出: 3

在这个例子中,我们使用了 Python 的 pickle 模块来序列化和反序列化签名对象。当然,你也可以使用其他的序列化方式,例如 JSON。

工作流:编排复杂的任务流程

有了任务签名,我们就可以构建复杂的工作流了。Celery 提供了几种不同的工作流类型,包括链(Chains)、组(Groups)和 Chord。

1. 链(Chains)

链是一种线性工作流,其中每个任务的输出作为下一个任务的输入。我们可以使用 | 运算符或者 chain 函数来创建链。

from celery import chain

@app.task
def multiply(x, y):
    return x * y

@app.task
def square(x):
    return x * x

# 使用 | 运算符创建链
chain1 = add.s(1, 2) | multiply.s(3) | square.s()

# 使用 chain 函数创建链
chain2 = chain(add.s(1, 2), multiply.s(3), square.s())

# 执行链
result = chain1.apply_async()
print(result.get())  # 输出: 81 ((1+2)*3)^2 = 81

在这个例子中,我们创建了一个包含三个任务的链:addmultiplysquareadd 任务的输出 (1+2=3) 作为 multiply 任务的输入,multiply 任务的输出 (3*3=9) 作为 square 任务的输入,最终 square 任务的输出 (9*9=81) 作为整个链的结果。

2. 组(Groups)

组是一种并行工作流,其中多个任务可以同时执行。我们可以使用 group 函数来创建组。

from celery import group

# 创建组
group1 = group(add.s(1, 2), multiply.s(3, 4), square.s(5))

# 执行组
result = group1.apply_async()
print(result.get())  # 输出: [3, 12, 25]

在这个例子中,我们创建了一个包含三个任务的组:addmultiplysquare。这三个任务会同时执行,并且组的结果是一个包含所有任务结果的列表。

3. Chord

Chord 是一种特殊的任务类型,它将一个组和一个回调任务组合在一起。Chord 的工作流程如下:

  1. 一个组中的所有任务并行执行。
  2. 当组中的所有任务都完成后,回调任务会被执行。回调任务接收组中所有任务的结果作为输入。

我们可以使用 chord 函数来创建 Chord。

from celery import chord

@app.task
def total(results):
    return sum(results)

# 创建 Chord
chord1 = chord([add.s(1, 2), multiply.s(3, 4), square.s(5)], total.s())

# 执行 Chord
result = chord1.apply_async()
print(result.get())  # 输出: 40 (3 + 12 + 25 = 40)

在这个例子中,我们创建了一个 Chord,它包含一个包含三个任务的组和一个回调任务 totaltotal 任务接收组中所有任务的结果 (3, 12, 25) 作为输入,并计算它们的总和 (3+12+25=40)。

4. 组合工作流

我们可以将不同的工作流类型组合在一起,构建更复杂的任务流程。

# 组合链、组和 Chord
complex_workflow = (
    add.s(1, 2) |
    group(multiply.s(3, 4), square.s(5)) |
    chord(total.s())
)

# 执行复杂工作流
result = complex_workflow.apply_async()
print(result.get())

5. 错误处理

在复杂的任务流程中,错误处理至关重要。Celery 提供了多种机制来处理任务执行过程中发生的错误,包括重试、忽略和报告。

  • 重试(Retry): 我们可以使用 retryretry_policy 选项来配置任务的重试策略。

    @app.task(bind=True, retry_backoff=True, retry_kwargs={'max_retries': 3})
    def flaky_task(self):
        try:
            # 模拟一个可能失败的任务
            if random.random() < 0.5:
                raise Exception('Task failed!')
            return 'Task succeeded!'
        except Exception as exc:
            # 重试任务
            raise self.retry(exc=exc)

    在这个例子中,我们使用了 retry_backoff=True 来实现指数退避重试策略。这意味着每次重试之间的时间间隔会逐渐增加。retry_kwargs={'max_retries': 3} 限制了任务的最大重试次数为 3 次。

  • 忽略(Ignore): 我们可以使用 ignore_result 选项来忽略任务的结果。这可以提高性能,减少资源消耗。

    @app.task(ignore_result=True)
    def log_message(message):
        print(message)

    在这个例子中,log_message 任务的结果会被忽略。

  • 报告(Report): 我们可以使用 Celery 的信号机制来报告任务的执行状态。例如,我们可以使用 task_successtask_failure 信号来记录任务的成功和失败事件。

    from celery.signals import task_success, task_failure
    
    @task_success.connect
    def task_success_handler(sender=None, **kwargs):
        print(f'Task {sender.name} succeeded!')
    
    @task_failure.connect
    def task_failure_handler(sender=None, exception=None, **kwargs):
        print(f'Task {sender.name} failed with exception: {exception}')

    在这个例子中,我们定义了两个信号处理函数,task_success_handlertask_failure_handler。当任务成功完成时,task_success_handler 会被调用;当任务失败时,task_failure_handler 会被调用。

6. 动态工作流

在某些情况下,我们可能需要在运行时动态地创建工作流。例如,我们可能需要根据用户的输入来决定要执行的任务。

def create_dynamic_workflow(task_names, args):
    """
    动态创建工作流。

    Args:
        task_names: 一个包含任务名称的列表。
        args: 一个包含任务参数的列表。

    Returns:
        一个 Celery 链对象。
    """
    signatures = []
    for i, task_name in enumerate(task_names):
        signatures.append(app.signature(task_name, args=args[i]))
    return chain(*signatures)

# 动态创建工作流
task_names = ['my_app.add', 'my_app.multiply', 'my_app.square']
args = [(1, 2), (3, 4), (5,)]
dynamic_workflow = create_dynamic_workflow(task_names, args)

# 执行动态工作流
result = dynamic_workflow.apply_async()
print(result.get())

在这个例子中,我们定义了一个 create_dynamic_workflow 函数,它可以根据任务名称和参数动态地创建工作流。

7. 实际案例:图片处理流水线

让我们通过一个实际的案例来演示如何使用 Celery 的任务签名和工作流来构建一个复杂的后台任务系统。假设我们需要构建一个图片处理流水线,它可以完成以下任务:

  1. 上传图片。
  2. 调整图片大小。
  3. 添加水印。
  4. 生成缩略图。
  5. 将处理后的图片存储到云存储。

我们可以使用 Celery 的链来构建这个流水线。

@app.task
def upload_image(image_path):
    """上传图片."""
    # 模拟上传图片
    print(f"Uploading image: {image_path}")
    return "uploaded_image_url"

@app.task
def resize_image(image_url, width, height):
    """调整图片大小."""
    # 模拟调整图片大小
    print(f"Resizing image: {image_url} to {width}x{height}")
    return "resized_image_url"

@app.task
def add_watermark(image_url, watermark_text):
    """添加水印."""
    # 模拟添加水印
    print(f"Adding watermark: {watermark_text} to {image_url}")
    return "watermarked_image_url"

@app.task
def generate_thumbnail(image_url, size):
    """生成缩略图."""
    # 模拟生成缩略图
    print(f"Generating thumbnail: {image_url} with size {size}")
    return "thumbnail_url"

@app.task
def store_image(image_url):
    """存储图片到云存储."""
    # 模拟存储图片到云存储
    print(f"Storing image: {image_url} to cloud storage")
    return "cloud_storage_url"

# 创建图片处理流水线
image_processing_pipeline = (
    upload_image.s("path/to/image.jpg") |
    resize_image.s(width=800, height=600) |
    add_watermark.s(watermark_text="My Watermark") |
    generate_thumbnail.s(size=(100, 100)) |
    store_image.s()
)

# 执行流水线
result = image_processing_pipeline.apply_async()
print(result.get())

在这个例子中,我们定义了五个任务:upload_imageresize_imageadd_watermarkgenerate_thumbnailstore_image。然后,我们使用 Celery 的链将这些任务连接在一起,构建了一个图片处理流水线。

更健壮的系统

通过掌握 Celery 的任务签名和工作流,我们可以构建复杂、可靠的后台任务系统。 任务签名可以将任务调用与其执行解耦,从而实现更灵活的调度和编排。 工作流可以将多个任务组合在一起,构建复杂的任务流程。 结合错误处理、动态工作流等高级特性,我们可以构建一个健壮的系统。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注