各位观众老爷,大家好!我是今天的主讲人,咱们今天聊聊Python世界里的一大利器:Celery,一个能让你轻松打造可伸缩、高可用任务队列系统的神器。
开场白:Celery是个什么玩意?
想象一下,你开了一家饭馆,客人络绎不绝。如果每来一个客人,你都要亲自洗菜、切菜、炒菜、端菜,那不得累死?这时候就需要服务员、洗碗工、厨师等各司其职,才能高效运转。Celery就相当于这个饭馆里的服务员、洗碗工和厨师,它负责把耗时的任务(比如发送邮件、处理图片、分析数据)从你的主程序里剥离出来,交给后台的工人(worker)去异步执行,让你的主程序可以专心服务客人(响应用户请求),保证饭馆(你的应用)流畅运行。
Celery的核心概念:
- 任务 (Task): 这就是你要Celery执行的具体工作,比如“发送欢迎邮件”、“生成PDF报表”。
- 工人 (Worker): Celery的执行者,负责接收任务并执行。可以启动多个Worker来提高并发处理能力。
- 消息队列 (Broker): 任务的“中转站”,负责接收来自主程序的任务,并将任务分发给Worker。常用的Broker有RabbitMQ和Redis。
- 结果存储 (Backend): 可选组件,用于存储任务的执行结果。常用的Backend有Redis、数据库等。
Celery的基本架构:
[主程序] –> [消息队列(Broker)] –> [Celery Worker] –> [结果存储(Backend)]
第一步:安装Celery及相关依赖
pip install celery redis # 如果使用Redis作为Broker和Backend
# 或者
pip install celery rabbitmq # 如果使用RabbitMQ作为Broker
第二步:创建一个Celery应用
# tasks.py
from celery import Celery
# 初始化Celery应用
app = Celery('my_tasks',
broker='redis://localhost:6379/0', # 使用Redis作为Broker
backend='redis://localhost:6379/0') # 使用Redis作为Backend
# 定义一个任务
@app.task
def add(x, y):
"""
一个简单的加法任务
"""
return x + y
@app.task
def send_email(recipient, subject, body):
"""
一个模拟发送邮件的任务,实际应用中替换为真正的邮件发送逻辑
"""
import time
time.sleep(5) # 模拟邮件发送的耗时
print(f"邮件已发送给 {recipient},主题:{subject},内容:{body}")
return True
if __name__ == '__main__':
# 仅用于演示,实际应用中不应该在这里调用任务
result = add.delay(4, 6) # 将任务放入队列,异步执行
print(f"add(4,6) 的结果: {result.get()}") # 获取任务结果,会阻塞直到任务完成
email_result = send_email.delay("[email protected]", "欢迎", "欢迎使用我们的服务!")
print(f"邮件发送任务已启动,任务ID:{email_result.id}") # 可以通过任务ID查询任务状态和结果
代码解释:
Celery('my_tasks', ...)
:创建Celery应用,my_tasks
是应用的名字。broker='redis://localhost:6379/0'
:指定消息队列的地址,这里使用Redis。backend='redis://localhost:6379/0'
:指定结果存储的地址,这里也使用Redis。@app.task
:装饰器,将一个函数标记为Celery任务。add.delay(4, 6)
:调用任务,.delay()
方法会将任务放入消息队列,由Worker异步执行。result.get()
:获取任务的执行结果,会阻塞直到任务完成。email_result.id
:获取任务的唯一ID,可以用来跟踪任务的状态。
第三步:启动Celery Worker
打开终端,进入 tasks.py
所在的目录,运行以下命令:
celery -A tasks worker -l info
命令解释:
celery
:Celery命令行工具。-A tasks
:指定Celery应用的位置,这里是tasks.py
文件。worker
:启动Worker进程。-l info
:设置日志级别为info
,可以看到更详细的日志信息。
看到类似如下输出,说明Worker启动成功:
[2023-10-27 10:00:00,000: INFO/MainProcess] celery@your-hostname ready.
第四步:从主程序调用任务
# main.py
from tasks import add, send_email
if __name__ == '__main__':
# 调用add任务
result = add.delay(10, 20)
print(f"add(10, 20) 任务已放入队列,任务ID:{result.id}")
# 调用send_email任务
email_result = send_email.delay("[email protected]", "你好", "这是一封测试邮件。")
print(f"邮件发送任务已放入队列,任务ID:{email_result.id}")
# 可以通过任务ID查询任务状态,或者在其他地方获取任务结果
# 例如:
# from celery.result import AsyncResult
# task_result = AsyncResult(result.id, app=add.app)
# print(f"任务状态:{task_result.status}")
# if task_result.ready():
# print(f"任务结果:{task_result.get()}")
代码解释:
from tasks import add, send_email
:导入定义的任务。add.delay(10, 20)
和send_email.delay(...)
:调用任务,.delay()
方法会将任务放入消息队列。result.id
和email_result.id
:获取任务的唯一ID。- 可以通过
celery.result.AsyncResult
来查询任务的状态和结果。
可伸缩性设计:
Celery天生就具备良好的可伸缩性,主要体现在以下几个方面:
- 水平扩展Worker: 启动更多的Worker进程,可以提高并发处理能力。只需要在不同的机器上运行
celery -A tasks worker -l info
命令即可。 - 多队列支持: Celery支持多个队列,可以将不同类型的任务分配到不同的队列,由不同的Worker处理。例如,可以将高优先级的任务放到一个队列,低优先级的任务放到另一个队列。
- 并发控制: 可以通过
CELERYD_CONCURRENCY
参数来控制每个Worker进程的并发数,避免Worker进程占用过多资源。
高可用性设计:
保证Celery系统的高可用性,需要考虑以下几个方面:
- 消息队列的高可用: RabbitMQ和Redis都支持集群模式,可以保证消息队列的高可用。
- Worker进程的监控和重启: 可以使用Supervisor或者Systemd等工具来监控Worker进程,如果Worker进程挂掉,可以自动重启。
- 任务的重试机制: Celery支持任务的重试机制,如果任务执行失败,可以自动重试。可以使用
@app.task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 5})
装饰器来实现。
任务路由和队列配置:
# tasks.py
from celery import Celery
app = Celery('my_tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0')
# 配置任务路由
app.conf.task_routes = {
'tasks.add': {'queue': 'priority_high'},
'tasks.send_email': {'queue': 'email_queue'},
}
# 配置队列
app.conf.task_queues = (
{'name': 'priority_high', 'routing_key': 'priority_high'},
{'name': 'email_queue', 'routing_key': 'email_queue'},
)
@app.task
def add(x, y):
"""
一个简单的加法任务,放入priority_high队列
"""
return x + y
@app.task
def send_email(recipient, subject, body):
"""
一个模拟发送邮件的任务,放入email_queue队列
"""
import time
time.sleep(5)
print(f"邮件已发送给 {recipient},主题:{subject},内容:{body}")
return True
启动Worker时,需要指定监听的队列:
celery -A tasks worker -l info -Q priority_high,email_queue
代码解释:
app.conf.task_routes
:配置任务的路由规则,将tasks.add
任务路由到priority_high
队列,将tasks.send_email
任务路由到email_queue
队列。app.conf.task_queues
:配置队列的信息,包括队列的名字和路由键。celery -A tasks worker -l info -Q priority_high,email_queue
:启动Worker时,使用-Q
参数指定监听的队列。
Celery的进阶技巧:
- 任务链 (Chains): 将多个任务串联起来,形成一个任务链,前一个任务的输出作为后一个任务的输入。
- 任务组 (Groups): 将多个任务组合在一起,并行执行。
- 任务节拍 (Beats): 定时执行任务,类似于Linux的Cron。
- 自定义任务状态: 可以自定义任务的状态,例如
PENDING
(等待)、RUNNING
(运行中)、SUCCESS
(成功)、FAILURE
(失败)等。 - 任务签名 (Signatures): 用于创建任务的预定义配置,可以方便地重用任务配置。
表格总结:常用配置参数
| 参数 | 描述 | 默认值