各位观众,代码界的弄潮儿们,大家好!今天咱们来聊聊Celery这位“任务界的扛把子”,看看它如何利用Worker和Broker,玩转可伸缩的任务队列。准备好了吗?上车!
开场白:为什么我们需要Celery?
想象一下,你正在开发一个在线购物网站。用户下单后,你需要做一系列的事情:发送确认邮件、更新库存、生成发货单、通知仓库等等。如果这些操作都放在同一个线程里执行,用户岂不是要等到花儿都谢了才能看到下单成功的页面?用户体验直线下降,老板脸色铁青,年终奖泡汤……
这时候,Celery就如同及时雨般出现了。它可以把这些耗时的任务放到后台异步执行,让用户瞬间就能看到下单成功的页面,皆大欢喜!
Celery的核心组件:Worker和Broker
Celery的核心在于两个家伙:Worker(工人)和 Broker(中间人)。
-
Worker(工人): 这家伙就是真正的干活的!它负责接收任务、执行任务,然后把结果送回。你可以理解成一个辛勤的码农,默默地在后台处理各种繁琐的任务。
-
Broker(中间人): 这家伙负责传递任务。它就像一个邮局,接收任务发布者的任务,然后把任务分发给合适的Worker。常用的Broker包括RabbitMQ和Redis。
它们的关系可以用下图来表示:
+-----------------+ +-----------------+ +-----------------+
| Task Publisher | -->| Broker | -->| Worker |
+-----------------+ +-----------------+ +-----------------+
| (任务发布者) | | (RabbitMQ/Redis) | | (任务执行者) |
+-----------------+ +-----------------+ +-----------------+
Broker的选择:RabbitMQ vs. Redis
选择哪个Broker取决于你的需求。简单来说:
特性 | RabbitMQ | Redis |
---|---|---|
消息持久性 | 默认支持,可以配置持久化到磁盘 | 内存型,数据可能会丢失(可配置持久化) |
消息确认机制 | 支持消息确认,确保消息被正确处理 | 相对简单,可靠性稍逊 |
复杂路由 | 支持复杂的路由规则,例如基于内容的路由 | 不支持 |
适用场景 | 对可靠性要求高的场景 | 对性能要求高的场景 |
Celery的安装
安装Celery很简单,一条命令搞定:
pip install celery
如果你要用RabbitMQ作为Broker,还需要安装:
pip install amqp
如果用Redis,则安装:
pip install redis
Celery的基本使用
- 定义Celery应用
首先,创建一个celery.py
文件,用来定义Celery应用:
from celery import Celery
app = Celery('my_task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
这段代码做了什么?
Celery('my_task', ...)
: 创建一个Celery应用,命名为my_task
。broker='redis://localhost:6379/0'
: 指定Broker的URL,这里用的是Redis。backend='redis://localhost:6379/0'
: 指定结果存储的URL,也用的是Redis。@app.task
: 这是一个装饰器,用来把一个函数变成Celery任务。
- 启动Celery Worker
在命令行里,启动Celery Worker:
celery -A celery worker -l info
-A celery
: 指定Celery应用的模块。worker
: 告诉Celery启动Worker。-l info
: 设置日志级别为info
。
- 调用Celery任务
在你的代码里,调用Celery任务:
from celery import Celery
app = Celery('my_task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
result = add.delay(1, 2) # 异步调用
print(result.get()) # 获取结果
add.delay(1, 2)
: 异步调用add
任务,传入参数1
和2
。delay
方法会把任务放到Broker里,然后立即返回一个AsyncResult
对象。result.get()
: 获取任务的结果。这个方法会阻塞,直到任务完成。
设计可伸缩的任务队列
Celery天生就具备可伸缩性。你可以启动多个Worker来并行处理任务,从而提高任务处理的速度。
- 启动多个Worker
在不同的机器上,或者同一个机器上的不同进程里,启动多个Celery Worker:
celery -A celery worker -l info
celery -A celery worker -l info -n [email protected]
celery -A celery worker -l info -n [email protected]
-n [email protected]
: 指定Worker的名称,确保每个Worker的名称都是唯一的。
- 配置并发数
每个Worker可以配置并发数,也就是同时处理的任务数量。默认情况下,Celery会根据CPU核心数来自动设置并发数。你可以手动设置并发数:
celery -A celery worker -l info -c 8
-c 8
: 设置并发数为8。
- 任务路由
Celery支持任务路由,可以把不同的任务路由到不同的Worker。这可以让你根据任务的类型,把任务分配给更合适的Worker。
在celery.py
里,配置任务路由:
from celery import Celery
app = Celery('my_task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
app.conf.task_routes = {
'celery.add': {'queue': 'add_queue'},
'celery.multiply': {'queue': 'multiply_queue'},
}
@app.task(queue='add_queue')
def add(x, y):
return x + y
@app.task(queue='multiply_queue')
def multiply(x, y):
return x * y
app.conf.task_routes
: 配置任务路由规则。'celery.add': {'queue': 'add_queue'}
: 把add
任务路由到add_queue
队列。@app.task(queue='add_queue')
: 指定add
任务的队列为add_queue
。
然后,启动Worker时,指定要监听的队列:
celery -A celery worker -l info -Q add_queue
celery -A celery worker -l info -Q multiply_queue
-Q add_queue
: 告诉Worker监听add_queue
队列。
- 任务优先级
Celery支持任务优先级,可以让你优先处理重要的任务。
在celery.py
里,配置任务优先级:
from celery import Celery
app = Celery('my_task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task(priority=0)
def high_priority_task():
return 'High priority'
@app.task(priority=5)
def normal_priority_task():
return 'Normal priority'
@app.task(priority=9)
def low_priority_task():
return 'Low priority'
@app.task(priority=0)
: 指定任务的优先级为0(最高优先级)。@app.task(priority=9)
: 指定任务的优先级为9(最低优先级)。
启动Worker时,需要指定启用优先级支持:
celery -A celery worker -l info --qos prefetch=1
--qos prefetch=1
: 启用优先级支持,并设置prefetch count为1。prefetch count表示Worker一次从Broker获取的任务数量。
Celery的进阶技巧
- 任务重试
有时候,任务可能会因为网络问题、服务器故障等原因而执行失败。Celery支持任务重试,可以自动重试失败的任务。
在celery.py
里,配置任务重试:
from celery import Celery
app = Celery('my_task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def my_task(self):
try:
# 模拟一个可能会失败的操作
raise Exception('Something went wrong!')
except Exception as exc:
# 重试任务
self.retry(exc=exc)
bind=True
: 绑定任务到self
,这样可以在任务里访问Celery的API。max_retries=3
: 设置最大重试次数为3。default_retry_delay=60
: 设置重试间隔为60秒。self.retry(exc=exc)
: 重试任务,并把异常信息传递给下一个重试。
- 任务定时执行
Celery可以定时执行任务,类似于Linux的cron。
在celery.py
里,配置任务定时执行:
from celery import Celery
from celery.schedules import crontab
app = Celery('my_task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
app.conf.beat_schedule = {
'add-every-minute': {
'task': 'celery.add',
'schedule': crontab(minute='*/1'), # 每分钟执行一次
'args': (1, 2)
},
}
@app.task
def add(x, y):
return x + y
app.conf.beat_schedule
: 配置任务定时执行计划。'add-every-minute'
: 定时任务的名称。'task': 'celery.add'
: 要执行的任务。'schedule': crontab(minute='*/1')
: 定时执行的规则,这里用的是crontab表达式,表示每分钟执行一次。'args': (1, 2)
: 传递给任务的参数。
然后,启动Celery Beat:
celery -A celery beat -l info
Celery Beat会按照定时执行计划,把任务放到Broker里,然后Worker会执行这些任务。
- 任务链
Celery支持任务链,可以把多个任务串联起来,让它们按照顺序执行。
from celery import Celery, chain
app = Celery('my_task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
@app.task
def multiply(x, y):
return x * y
@app.task
def square(x):
return x * x
# 定义任务链:先加,再乘,最后平方
my_chain = chain(add.s(1, 2), multiply.s(3), square.s())
# 执行任务链
result = my_chain()
print(result.get()) # 输出结果:81
chain(add.s(1, 2), multiply.s(3), square.s())
: 定义任务链。.s()
方法表示partial application,也就是预先设置任务的参数。result = my_chain()
: 执行任务链。
在这个例子里,任务链会先执行add(1, 2)
,得到结果3
。然后执行multiply(3, 3)
,得到结果9
。最后执行square(9)
,得到结果81
。
- 回调函数
Celery支持回调函数,可以在任务完成后执行一个回调函数。
from celery import Celery
app = Celery('my_task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
@app.task
def after_add(result):
print(f'Add result: {result}')
# 异步调用任务,并指定回调函数
result = add.apply_async(args=(1, 2), link=after_add.s())
link=after_add.s()
: 指定回调函数为after_add
。当add
任务完成后,会自动执行after_add
任务,并把add
任务的结果作为参数传递给after_add
任务。
总结
Celery是一个强大的异步任务队列,可以让你轻松地处理各种耗时的任务。通过合理地配置Worker和Broker,你可以构建一个可伸缩、可靠的任务队列,从而提高你的应用程序的性能和用户体验。希望今天的讲座能让你对Celery有更深入的了解,并在实际项目中灵活运用。
祝大家编码愉快!