Python高级技术之:`Celery`的`Worker`和`Broker`:如何设计可伸缩的任务队列。

各位观众,代码界的弄潮儿们,大家好!今天咱们来聊聊Celery这位“任务界的扛把子”,看看它如何利用Worker和Broker,玩转可伸缩的任务队列。准备好了吗?上车!

开场白:为什么我们需要Celery?

想象一下,你正在开发一个在线购物网站。用户下单后,你需要做一系列的事情:发送确认邮件、更新库存、生成发货单、通知仓库等等。如果这些操作都放在同一个线程里执行,用户岂不是要等到花儿都谢了才能看到下单成功的页面?用户体验直线下降,老板脸色铁青,年终奖泡汤……

这时候,Celery就如同及时雨般出现了。它可以把这些耗时的任务放到后台异步执行,让用户瞬间就能看到下单成功的页面,皆大欢喜!

Celery的核心组件:Worker和Broker

Celery的核心在于两个家伙:Worker(工人)和 Broker(中间人)。

  • Worker(工人): 这家伙就是真正的干活的!它负责接收任务、执行任务,然后把结果送回。你可以理解成一个辛勤的码农,默默地在后台处理各种繁琐的任务。

  • Broker(中间人): 这家伙负责传递任务。它就像一个邮局,接收任务发布者的任务,然后把任务分发给合适的Worker。常用的Broker包括RabbitMQ和Redis。

它们的关系可以用下图来表示:

+-----------------+    +-----------------+    +-----------------+
|  Task Publisher  | -->|      Broker     | -->|     Worker      |
+-----------------+    +-----------------+    +-----------------+
| (任务发布者)   |    |  (RabbitMQ/Redis) |    |  (任务执行者)   |
+-----------------+    +-----------------+    +-----------------+

Broker的选择:RabbitMQ vs. Redis

选择哪个Broker取决于你的需求。简单来说:

特性 RabbitMQ Redis
消息持久性 默认支持,可以配置持久化到磁盘 内存型,数据可能会丢失(可配置持久化)
消息确认机制 支持消息确认,确保消息被正确处理 相对简单,可靠性稍逊
复杂路由 支持复杂的路由规则,例如基于内容的路由 不支持
适用场景 对可靠性要求高的场景 对性能要求高的场景

Celery的安装

安装Celery很简单,一条命令搞定:

pip install celery

如果你要用RabbitMQ作为Broker,还需要安装:

pip install amqp

如果用Redis,则安装:

pip install redis

Celery的基本使用

  1. 定义Celery应用

首先,创建一个celery.py文件,用来定义Celery应用:

from celery import Celery

app = Celery('my_task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

这段代码做了什么?

  • Celery('my_task', ...): 创建一个Celery应用,命名为my_task
  • broker='redis://localhost:6379/0': 指定Broker的URL,这里用的是Redis。
  • backend='redis://localhost:6379/0': 指定结果存储的URL,也用的是Redis。
  • @app.task: 这是一个装饰器,用来把一个函数变成Celery任务。
  1. 启动Celery Worker

在命令行里,启动Celery Worker:

celery -A celery worker -l info
  • -A celery: 指定Celery应用的模块。
  • worker: 告诉Celery启动Worker。
  • -l info: 设置日志级别为info
  1. 调用Celery任务

在你的代码里,调用Celery任务:

from celery import Celery

app = Celery('my_task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

result = add.delay(1, 2)  # 异步调用
print(result.get())       # 获取结果
  • add.delay(1, 2): 异步调用add任务,传入参数12delay方法会把任务放到Broker里,然后立即返回一个AsyncResult对象。
  • result.get(): 获取任务的结果。这个方法会阻塞,直到任务完成。

设计可伸缩的任务队列

Celery天生就具备可伸缩性。你可以启动多个Worker来并行处理任务,从而提高任务处理的速度。

  1. 启动多个Worker

在不同的机器上,或者同一个机器上的不同进程里,启动多个Celery Worker:

celery -A celery worker -l info
celery -A celery worker -l info -n [email protected]
celery -A celery worker -l info -n [email protected]
  • -n [email protected]: 指定Worker的名称,确保每个Worker的名称都是唯一的。
  1. 配置并发数

每个Worker可以配置并发数,也就是同时处理的任务数量。默认情况下,Celery会根据CPU核心数来自动设置并发数。你可以手动设置并发数:

celery -A celery worker -l info -c 8
  • -c 8: 设置并发数为8。
  1. 任务路由

Celery支持任务路由,可以把不同的任务路由到不同的Worker。这可以让你根据任务的类型,把任务分配给更合适的Worker。

celery.py里,配置任务路由:

from celery import Celery

app = Celery('my_task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

app.conf.task_routes = {
    'celery.add': {'queue': 'add_queue'},
    'celery.multiply': {'queue': 'multiply_queue'},
}

@app.task(queue='add_queue')
def add(x, y):
    return x + y

@app.task(queue='multiply_queue')
def multiply(x, y):
    return x * y
  • app.conf.task_routes: 配置任务路由规则。
  • 'celery.add': {'queue': 'add_queue'}: 把add任务路由到add_queue队列。
  • @app.task(queue='add_queue'): 指定add任务的队列为add_queue

然后,启动Worker时,指定要监听的队列:

celery -A celery worker -l info -Q add_queue
celery -A celery worker -l info -Q multiply_queue
  • -Q add_queue: 告诉Worker监听add_queue队列。
  1. 任务优先级

Celery支持任务优先级,可以让你优先处理重要的任务。

celery.py里,配置任务优先级:

from celery import Celery

app = Celery('my_task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task(priority=0)
def high_priority_task():
    return 'High priority'

@app.task(priority=5)
def normal_priority_task():
    return 'Normal priority'

@app.task(priority=9)
def low_priority_task():
    return 'Low priority'
  • @app.task(priority=0): 指定任务的优先级为0(最高优先级)。
  • @app.task(priority=9): 指定任务的优先级为9(最低优先级)。

启动Worker时,需要指定启用优先级支持:

celery -A celery worker -l info --qos prefetch=1
  • --qos prefetch=1: 启用优先级支持,并设置prefetch count为1。prefetch count表示Worker一次从Broker获取的任务数量。

Celery的进阶技巧

  1. 任务重试

有时候,任务可能会因为网络问题、服务器故障等原因而执行失败。Celery支持任务重试,可以自动重试失败的任务。

celery.py里,配置任务重试:

from celery import Celery

app = Celery('my_task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def my_task(self):
    try:
        # 模拟一个可能会失败的操作
        raise Exception('Something went wrong!')
    except Exception as exc:
        # 重试任务
        self.retry(exc=exc)
  • bind=True: 绑定任务到self,这样可以在任务里访问Celery的API。
  • max_retries=3: 设置最大重试次数为3。
  • default_retry_delay=60: 设置重试间隔为60秒。
  • self.retry(exc=exc): 重试任务,并把异常信息传递给下一个重试。
  1. 任务定时执行

Celery可以定时执行任务,类似于Linux的cron。

celery.py里,配置任务定时执行:

from celery import Celery
from celery.schedules import crontab

app = Celery('my_task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

app.conf.beat_schedule = {
    'add-every-minute': {
        'task': 'celery.add',
        'schedule': crontab(minute='*/1'),  # 每分钟执行一次
        'args': (1, 2)
    },
}

@app.task
def add(x, y):
    return x + y
  • app.conf.beat_schedule: 配置任务定时执行计划。
  • 'add-every-minute': 定时任务的名称。
  • 'task': 'celery.add': 要执行的任务。
  • 'schedule': crontab(minute='*/1'): 定时执行的规则,这里用的是crontab表达式,表示每分钟执行一次。
  • 'args': (1, 2): 传递给任务的参数。

然后,启动Celery Beat:

celery -A celery beat -l info

Celery Beat会按照定时执行计划,把任务放到Broker里,然后Worker会执行这些任务。

  1. 任务链

Celery支持任务链,可以把多个任务串联起来,让它们按照顺序执行。

from celery import Celery, chain

app = Celery('my_task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

@app.task
def multiply(x, y):
    return x * y

@app.task
def square(x):
    return x * x

# 定义任务链:先加,再乘,最后平方
my_chain = chain(add.s(1, 2), multiply.s(3), square.s())

# 执行任务链
result = my_chain()
print(result.get())  # 输出结果:81
  • chain(add.s(1, 2), multiply.s(3), square.s()): 定义任务链。.s()方法表示partial application,也就是预先设置任务的参数。
  • result = my_chain(): 执行任务链。

在这个例子里,任务链会先执行add(1, 2),得到结果3。然后执行multiply(3, 3),得到结果9。最后执行square(9),得到结果81

  1. 回调函数

Celery支持回调函数,可以在任务完成后执行一个回调函数。

from celery import Celery

app = Celery('my_task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

@app.task
def after_add(result):
    print(f'Add result: {result}')

# 异步调用任务,并指定回调函数
result = add.apply_async(args=(1, 2), link=after_add.s())
  • link=after_add.s(): 指定回调函数为after_add。当add任务完成后,会自动执行after_add任务,并把add任务的结果作为参数传递给after_add任务。

总结

Celery是一个强大的异步任务队列,可以让你轻松地处理各种耗时的任务。通过合理地配置Worker和Broker,你可以构建一个可伸缩、可靠的任务队列,从而提高你的应用程序的性能和用户体验。希望今天的讲座能让你对Celery有更深入的了解,并在实际项目中灵活运用。

祝大家编码愉快!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注