Python高级技术之:`FastAPI`的后台任务(`Background Tasks`):如何处理非阻塞的异步任务。

各位观众老爷们,大家好!今天咱们聊点高级货,聊聊 FastAPI 的后台任务,让你的 API 飞起来!

开场白:API 响应慢?你该考虑后台任务了!

咱们写 API 的时候,最怕啥?响应慢!用户点了一下按钮,半天没反应,分分钟想卸载你的 App。很多时候,响应慢不是因为服务器不行,而是因为你在 API 里面做了太多事情,比如发送邮件、处理图片、跑复杂的计算等等。这些任务往往不需要立刻完成,但却阻塞了 API 的响应,导致用户体验极差。

这时候,后台任务就派上用场了!它可以让你把这些耗时的任务扔到后台去执行,而 API 则立刻返回响应,让用户感觉飞一样快。

什么是后台任务?

简单来说,后台任务就是一些不需要立即完成,可以在后台异步执行的任务。你可以想象一下,你点了个外卖,商家接单后,就开始准备饭菜,然后配送。你下单这个动作相当于 API 请求,商家接单返回确认相当于 API 响应,而准备饭菜和配送的过程,就是后台任务。

FastAPI 如何处理后台任务?BackgroundTasks 闪亮登场!

FastAPI 提供了 BackgroundTasks 类来方便我们处理后台任务。它允许你定义一些函数,这些函数将在 API 请求返回后异步执行。

BackgroundTasks 的基本用法:

咱们先来看个简单的例子,假设我们要创建一个 API,当用户注册成功后,发送一封欢迎邮件。

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
import time

app = FastAPI()

class User(BaseModel):
    email: str
    username: str

def send_email(email: str, username: str):
    """
    模拟发送邮件的函数,实际项目中需要使用专业的邮件发送库。
    这里为了演示,简单地 sleep 一会儿。
    """
    print(f"Sending welcome email to {email} for user {username}...")
    time.sleep(2)  # 模拟邮件发送的耗时
    print(f"Welcome email sent to {email}!")

@app.post("/register")
async def register_user(user: User, background_tasks: BackgroundTasks):
    """
    用户注册 API,注册成功后发送欢迎邮件。
    """
    background_tasks.add_task(send_email, user.email, user.username)
    return {"message": "User registered successfully!"}

在这个例子中:

  1. 我们定义了一个 send_email 函数,用于模拟发送邮件。注意,这个函数是同步的,也就是说,它会阻塞当前线程。
  2. register_user API 中,我们通过依赖注入的方式获取了 BackgroundTasks 对象。
  3. 使用 background_tasks.add_task() 方法将 send_email 函数添加到后台任务队列中。
  4. API 立即返回 {"message": "User registered successfully!"},而邮件发送则在后台异步执行。

当你运行这个 API 并发送一个注册请求时,你会发现 API 立即返回了响应,而邮件发送的日志则在稍后才打印出来。这就是后台任务的威力!

BackgroundTasks 的高级用法:

BackgroundTasks 还有一些更高级的用法,可以让你更灵活地控制后台任务。

  • 传递多个参数: add_task 方法可以接受任意数量的参数,这些参数将被传递给后台任务函数。

    background_tasks.add_task(send_email, user.email, user.username, "Welcome to our platform!")
  • 使用异步函数: 后台任务函数也可以是异步函数。这样可以进一步提高并发性能。

    async def send_email_async(email: str, username: str):
        """
        异步发送邮件的函数。
        """
        print(f"Sending welcome email to {email} for user {username} (async)...")
        await asyncio.sleep(2)  # 模拟邮件发送的耗时
        print(f"Welcome email sent to {email}! (async)")
    
    @app.post("/register")
    async def register_user(user: User, background_tasks: BackgroundTasks):
        background_tasks.add_task(send_email_async, user.email, user.username)
        return {"message": "User registered successfully!"}

    需要注意的是,如果你的后台任务函数是异步的,那么你需要在函数内部使用 await 关键字来等待异步操作完成。

  • 处理后台任务的异常: 默认情况下,如果后台任务发生异常,FastAPI 会忽略这些异常。如果你想处理这些异常,可以使用 try...except 语句。

    def send_email(email: str, username: str):
        try:
            print(f"Sending welcome email to {email} for user {username}...")
            time.sleep(2)
            # 模拟邮件发送失败
            raise Exception("Failed to send email")
            print(f"Welcome email sent to {email}!")
        except Exception as e:
            print(f"Error sending email to {email}: {e}")
    
    @app.post("/register")
    async def register_user(user: User, background_tasks: BackgroundTasks):
        background_tasks.add_task(send_email, user.email, user.username)
        return {"message": "User registered successfully!"}

    在这个例子中,如果 send_email 函数发生异常,异常将被捕获并打印到控制台。

更强大的后台任务管理:Celery 和 Redis

虽然 BackgroundTasks 很方便,但它也有一些局限性。例如,它只能在当前进程中执行后台任务,如果你的服务器重启,所有未完成的后台任务都会丢失。

为了解决这些问题,你可以使用更强大的后台任务管理工具,例如 Celery 和 Redis。

  • Celery: 是一个分布式任务队列,它可以让你把后台任务分发到多个工作节点上执行。Celery 支持多种消息队列,例如 RabbitMQ 和 Redis。
  • Redis: 是一个高性能的键值存储数据库,它可以用来存储任务队列和任务状态。

使用 Celery 和 Redis 可以让你更好地管理后台任务,提高系统的可靠性和可扩展性。

Celery + Redis 的配置和使用方法:

这部分内容稍微复杂一些,但绝对值得你花时间学习。

  1. 安装 Celery 和 Redis:

    pip install celery redis
  2. 安装 Redis 服务器:

    你需要安装 Redis 服务器,并确保它正在运行。具体的安装方法取决于你的操作系统。

    • Ubuntu/Debian: sudo apt-get update && sudo apt-get install redis-server
    • macOS (using Homebrew): brew install redis
  3. 创建 Celery 任务:

    创建一个 celery.py 文件,用于配置 Celery 实例。

    # celery.py
    from celery import Celery
    
    celery = Celery(
        'tasks',
        broker='redis://localhost:6379/0',  # Redis 作为消息代理
        backend='redis://localhost:6379/0'   # Redis 作为结果存储
    )
    
    @celery.task
    def send_email_celery(email: str, username: str):
        """
        使用 Celery 异步发送邮件的函数。
        """
        print(f"Sending welcome email to {email} for user {username} (Celery)...")
        import time
        time.sleep(2)
        print(f"Welcome email sent to {email}! (Celery)")

    在这个文件中:

    • 我们创建了一个 Celery 实例,并指定了 Redis 作为消息代理和结果存储。
    • 使用 @celery.task 装饰器将 send_email_celery 函数注册为 Celery 任务。
  4. 在 FastAPI 中使用 Celery 任务:

    修改 FastAPI 代码,使用 Celery 任务来发送邮件。

    # main.py
    from fastapi import FastAPI
    from pydantic import BaseModel
    from .celery import send_email_celery  # 注意这里的相对导入
    
    app = FastAPI()
    
    class User(BaseModel):
        email: str
        username: str
    
    @app.post("/register")
    async def register_user(user: User):
        """
        用户注册 API,使用 Celery 发送欢迎邮件。
        """
        send_email_celery.delay(user.email, user.username)  # 使用 .delay() 方法异步执行 Celery 任务
        return {"message": "User registered successfully!"}

    在这个例子中,我们使用 send_email_celery.delay() 方法来异步执行 Celery 任务。delay() 方法会将任务添加到 Celery 队列中,Celery 工作节点会从队列中取出任务并执行。

  5. 启动 Celery 工作节点:

    打开一个新的终端,运行以下命令来启动 Celery 工作节点:

    celery -A main.celery worker -l info

    确保将 main.celery 替换为你的 Celery 实例所在的模块。

  6. 测试 API:

    运行 FastAPI 应用,并发送一个注册请求。你会发现 API 立即返回了响应,而邮件发送的日志则在 Celery 工作节点的终端中打印出来。

BackgroundTasks vs Celery:如何选择?

特性 BackgroundTasks Celery + Redis
复杂度 简单 复杂
可靠性
可扩展性
适用场景 简单的后台任务 复杂的、需要高可靠性和可扩展性的后台任务
是否需要额外服务 是 (Redis)

总的来说,如果你的后台任务比较简单,而且对可靠性和可扩展性要求不高,那么 BackgroundTasks 是一个不错的选择。但如果你的后台任务比较复杂,或者需要处理大量的任务,那么 Celery + Redis 才是更明智的选择。

最佳实践:

  • 尽量避免在后台任务中执行阻塞操作。 如果必须执行阻塞操作,可以使用线程池来避免阻塞整个进程。
  • 监控后台任务的执行情况。 使用 Celery Flower 或其他监控工具来监控后台任务的执行状态,及时发现和解决问题。
  • 合理设置任务的优先级。 对于重要的任务,可以设置较高的优先级,确保它们能够及时执行。
  • 处理任务失败的情况。 对于可能失败的任务,应该进行重试或降级处理,避免影响系统的整体稳定性。
  • 注意序列化问题。 传递给后台任务的参数需要能够被序列化,否则会导致任务执行失败。

总结:

今天我们学习了 FastAPI 的后台任务处理技术,包括 BackgroundTasks 和 Celery + Redis。希望这些知识能够帮助你构建更快速、更可靠的 API。

记住,选择合适的后台任务处理方案取决于你的具体需求。BackgroundTasks 简单易用,适合处理简单的后台任务;而 Celery + Redis 则更加强大,适合处理复杂的、需要高可靠性和可扩展性的后台任务。

好了,今天的讲座就到这里。希望大家学有所获,咱们下期再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注