Celery 任务签名与工作流:构建复杂后台任务的艺术
大家好!今天我们来深入探讨 Celery 中两个非常重要的概念:任务签名(Signatures)和工作流(Workflows)。它们是构建复杂、可靠的后台任务系统的基石。我们将从任务签名开始,逐步过渡到更高级的工作流应用,并通过大量的代码示例,让你真正理解如何运用它们来解决实际问题。
任务签名:定义任务调用的蓝图
任务签名本质上就是一个定义了任务调用的蓝图。它包含了执行任务所需的全部信息,例如要执行的任务函数、传递给任务的参数、以及一些执行选项(例如重试策略、截止时间等)。有了签名,我们可以将任务调用与其执行解耦,从而实现更灵活的调度和编排。
1. 基本概念
在 Celery 中,我们可以通过 signature
函数或者任务装饰器的 .s()
方法来创建任务签名。
from celery import Celery
app = Celery('my_app', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
# 使用 signature 函数创建签名
from celery import signature
sig1 = signature('my_app.add', args=(1, 2), countdown=10)
# 使用 .s() 方法创建签名
sig2 = add.s(1, 2)
# 使用 .si() 方法创建 immutable 签名
sig3 = add.si(1, 2) # 必须指定所有参数,且不能更改
# 使用 .s() 结合 .set() 方法设置额外的选项
sig4 = add.s(1, 2).set(countdown=10, expires=30)
让我们逐一分析上述代码:
-
signature('my_app.add', args=(1, 2), countdown=10)
:signature
函数接收任务的名称(字符串)、args
(位置参数元组)、kwargs
(关键字参数字典)以及其他的执行选项。countdown=10
表示任务将在 10 秒后执行。 -
add.s(1, 2)
:add.s(1, 2)
是add
任务对象的.s()
方法的调用,它创建了一个与signature
函数等价的签名。这种方式更加简洁,也更符合 Python 的习惯。s
是signature
的缩写。 -
add.si(1, 2)
:add.si()
方法创建的是一个 immutable 签名。这意味着你必须在创建签名时提供所有的参数,并且之后无法修改它们。si
是signature immutable
的缩写。 -
add.s(1, 2).set(countdown=10, expires=30)
:.set()
方法允许你修改签名对象中的选项。例如,在这里我们设置了countdown
和expires
选项。expires
表示任务的过期时间,如果任务在指定时间内没有被执行,Celery 将会放弃执行该任务。
2. 签名对象的属性
签名对象有很多有用的属性,可以用来获取任务的信息。
属性 | 描述 |
---|---|
name |
任务的名称(字符串)。 |
args |
任务的位置参数(元组)。 |
kwargs |
任务的关键字参数(字典)。 |
options |
任务的执行选项(字典),例如 countdown 、expires 、retry 、retry_policy 等。 |
immutable |
布尔值,指示签名是否为 immutable。 |
type |
签名的类型,可以是 'chord' , 'group' , 'chain' 或 None 。 |
task |
任务函数本身。 |
id |
任务的唯一 ID,只有在任务被实际执行后才会生成。 |
sig = add.s(1, 2).set(countdown=10, expires=30)
print(sig.name) # 输出: 'my_app.add'
print(sig.args) # 输出: (1, 2)
print(sig.kwargs) # 输出: {}
print(sig.options) # 输出: {'countdown': 10, 'expires': 30}
print(sig.immutable) # 输出: False
print(sig.type) # 输出: None
print(sig.task) # 输出: <@task: my_app.add of my_app>
print(sig.id) # 输出: None (任务尚未执行)
3. 签名对象的执行
有了签名对象,我们可以通过 .apply_async()
方法来异步执行任务。
sig = add.s(1, 2).set(countdown=10)
result = sig.apply_async()
print(result.get()) # 阻塞直到任务完成并获取结果
apply_async()
方法返回一个 AsyncResult
对象,我们可以通过它来获取任务的执行状态和结果。
4. 偏函数应用
签名可以看作是任务函数的偏函数应用。这意味着我们可以预先绑定一部分参数,然后在稍后执行任务时再提供剩余的参数。
from functools import partial
# 使用 partial 创建偏函数
add_one = partial(add, 1) # 预先绑定 x=1
# 将偏函数转换为签名
add_one_sig = app.signature(add_one.func, args=add_one.args, kwargs=add_one.keywords)
# 或者直接使用partial函数创建
add_one_sig = app.signature(partial(add, 1))
# 执行任务,提供 y 的值
result = add_one_sig.apply_async(kwargs={'y': 2})
print(result.get()) # 输出: 3
在这个例子中,我们使用 functools.partial
创建了一个偏函数 add_one
,它预先绑定了 add
函数的 x
参数。然后,我们将 add_one
转换为一个签名,并在执行任务时提供了 y
参数。
5. 任务签名序列化与反序列化
任务签名可以被序列化成字符串,然后在稍后反序列化成签名对象。这使得我们可以将签名存储在数据库中,或者通过网络传输。
import pickle
sig = add.s(1, 2)
# 序列化
serialized_sig = pickle.dumps(sig)
# 反序列化
deserialized_sig = pickle.loads(serialized_sig)
# 执行任务
result = deserialized_sig.apply_async()
print(result.get()) # 输出: 3
在这个例子中,我们使用了 Python 的 pickle
模块来序列化和反序列化签名对象。当然,你也可以使用其他的序列化方式,例如 JSON。
工作流:编排复杂的任务流程
有了任务签名,我们就可以构建复杂的工作流了。Celery 提供了几种不同的工作流类型,包括链(Chains)、组(Groups)和 Chord。
1. 链(Chains)
链是一种线性工作流,其中每个任务的输出作为下一个任务的输入。我们可以使用 |
运算符或者 chain
函数来创建链。
from celery import chain
@app.task
def multiply(x, y):
return x * y
@app.task
def square(x):
return x * x
# 使用 | 运算符创建链
chain1 = add.s(1, 2) | multiply.s(3) | square.s()
# 使用 chain 函数创建链
chain2 = chain(add.s(1, 2), multiply.s(3), square.s())
# 执行链
result = chain1.apply_async()
print(result.get()) # 输出: 81 ((1+2)*3)^2 = 81
在这个例子中,我们创建了一个包含三个任务的链:add
、multiply
和 square
。add
任务的输出 (1+2=3) 作为 multiply
任务的输入,multiply
任务的输出 (3*3=9) 作为 square
任务的输入,最终 square
任务的输出 (9*9=81) 作为整个链的结果。
2. 组(Groups)
组是一种并行工作流,其中多个任务可以同时执行。我们可以使用 group
函数来创建组。
from celery import group
# 创建组
group1 = group(add.s(1, 2), multiply.s(3, 4), square.s(5))
# 执行组
result = group1.apply_async()
print(result.get()) # 输出: [3, 12, 25]
在这个例子中,我们创建了一个包含三个任务的组:add
、multiply
和 square
。这三个任务会同时执行,并且组的结果是一个包含所有任务结果的列表。
3. Chord
Chord 是一种特殊的任务类型,它将一个组和一个回调任务组合在一起。Chord 的工作流程如下:
- 一个组中的所有任务并行执行。
- 当组中的所有任务都完成后,回调任务会被执行。回调任务接收组中所有任务的结果作为输入。
我们可以使用 chord
函数来创建 Chord。
from celery import chord
@app.task
def total(results):
return sum(results)
# 创建 Chord
chord1 = chord([add.s(1, 2), multiply.s(3, 4), square.s(5)], total.s())
# 执行 Chord
result = chord1.apply_async()
print(result.get()) # 输出: 40 (3 + 12 + 25 = 40)
在这个例子中,我们创建了一个 Chord,它包含一个包含三个任务的组和一个回调任务 total
。total
任务接收组中所有任务的结果 (3, 12, 25) 作为输入,并计算它们的总和 (3+12+25=40)。
4. 组合工作流
我们可以将不同的工作流类型组合在一起,构建更复杂的任务流程。
# 组合链、组和 Chord
complex_workflow = (
add.s(1, 2) |
group(multiply.s(3, 4), square.s(5)) |
chord(total.s())
)
# 执行复杂工作流
result = complex_workflow.apply_async()
print(result.get())
5. 错误处理
在复杂的任务流程中,错误处理至关重要。Celery 提供了多种机制来处理任务执行过程中发生的错误,包括重试、忽略和报告。
-
重试(Retry): 我们可以使用
retry
和retry_policy
选项来配置任务的重试策略。@app.task(bind=True, retry_backoff=True, retry_kwargs={'max_retries': 3}) def flaky_task(self): try: # 模拟一个可能失败的任务 if random.random() < 0.5: raise Exception('Task failed!') return 'Task succeeded!' except Exception as exc: # 重试任务 raise self.retry(exc=exc)
在这个例子中,我们使用了
retry_backoff=True
来实现指数退避重试策略。这意味着每次重试之间的时间间隔会逐渐增加。retry_kwargs={'max_retries': 3}
限制了任务的最大重试次数为 3 次。 -
忽略(Ignore): 我们可以使用
ignore_result
选项来忽略任务的结果。这可以提高性能,减少资源消耗。@app.task(ignore_result=True) def log_message(message): print(message)
在这个例子中,
log_message
任务的结果会被忽略。 -
报告(Report): 我们可以使用 Celery 的信号机制来报告任务的执行状态。例如,我们可以使用
task_success
和task_failure
信号来记录任务的成功和失败事件。from celery.signals import task_success, task_failure @task_success.connect def task_success_handler(sender=None, **kwargs): print(f'Task {sender.name} succeeded!') @task_failure.connect def task_failure_handler(sender=None, exception=None, **kwargs): print(f'Task {sender.name} failed with exception: {exception}')
在这个例子中,我们定义了两个信号处理函数,
task_success_handler
和task_failure_handler
。当任务成功完成时,task_success_handler
会被调用;当任务失败时,task_failure_handler
会被调用。
6. 动态工作流
在某些情况下,我们可能需要在运行时动态地创建工作流。例如,我们可能需要根据用户的输入来决定要执行的任务。
def create_dynamic_workflow(task_names, args):
"""
动态创建工作流。
Args:
task_names: 一个包含任务名称的列表。
args: 一个包含任务参数的列表。
Returns:
一个 Celery 链对象。
"""
signatures = []
for i, task_name in enumerate(task_names):
signatures.append(app.signature(task_name, args=args[i]))
return chain(*signatures)
# 动态创建工作流
task_names = ['my_app.add', 'my_app.multiply', 'my_app.square']
args = [(1, 2), (3, 4), (5,)]
dynamic_workflow = create_dynamic_workflow(task_names, args)
# 执行动态工作流
result = dynamic_workflow.apply_async()
print(result.get())
在这个例子中,我们定义了一个 create_dynamic_workflow
函数,它可以根据任务名称和参数动态地创建工作流。
7. 实际案例:图片处理流水线
让我们通过一个实际的案例来演示如何使用 Celery 的任务签名和工作流来构建一个复杂的后台任务系统。假设我们需要构建一个图片处理流水线,它可以完成以下任务:
- 上传图片。
- 调整图片大小。
- 添加水印。
- 生成缩略图。
- 将处理后的图片存储到云存储。
我们可以使用 Celery 的链来构建这个流水线。
@app.task
def upload_image(image_path):
"""上传图片."""
# 模拟上传图片
print(f"Uploading image: {image_path}")
return "uploaded_image_url"
@app.task
def resize_image(image_url, width, height):
"""调整图片大小."""
# 模拟调整图片大小
print(f"Resizing image: {image_url} to {width}x{height}")
return "resized_image_url"
@app.task
def add_watermark(image_url, watermark_text):
"""添加水印."""
# 模拟添加水印
print(f"Adding watermark: {watermark_text} to {image_url}")
return "watermarked_image_url"
@app.task
def generate_thumbnail(image_url, size):
"""生成缩略图."""
# 模拟生成缩略图
print(f"Generating thumbnail: {image_url} with size {size}")
return "thumbnail_url"
@app.task
def store_image(image_url):
"""存储图片到云存储."""
# 模拟存储图片到云存储
print(f"Storing image: {image_url} to cloud storage")
return "cloud_storage_url"
# 创建图片处理流水线
image_processing_pipeline = (
upload_image.s("path/to/image.jpg") |
resize_image.s(width=800, height=600) |
add_watermark.s(watermark_text="My Watermark") |
generate_thumbnail.s(size=(100, 100)) |
store_image.s()
)
# 执行流水线
result = image_processing_pipeline.apply_async()
print(result.get())
在这个例子中,我们定义了五个任务:upload_image
、resize_image
、add_watermark
、generate_thumbnail
和 store_image
。然后,我们使用 Celery 的链将这些任务连接在一起,构建了一个图片处理流水线。
更健壮的系统
通过掌握 Celery 的任务签名和工作流,我们可以构建复杂、可靠的后台任务系统。 任务签名可以将任务调用与其执行解耦,从而实现更灵活的调度和编排。 工作流可以将多个任务组合在一起,构建复杂的任务流程。 结合错误处理、动态工作流等高级特性,我们可以构建一个健壮的系统。