FastAPI 异步编程:async/await 与事件循环的底层实现
各位来宾,大家好。今天我们来深入探讨 FastAPI 的异步编程模型,重点关注 async/await
关键字以及事件循环的底层运作机制。理解这些概念对于编写高性能、高并发的 FastAPI 应用至关重要。
1. 异步编程的必要性
在传统的同步编程模型中,当一个函数执行耗时操作(例如网络请求、数据库查询、文件 I/O)时,线程会被阻塞,直到操作完成。这意味着在此期间,线程无法执行其他任务,从而降低了程序的整体吞吐量。
异步编程通过允许函数在等待耗时操作完成时让出控制权,从而解决了这个问题。这样,线程可以继续执行其他任务,直到耗时操作完成并发出通知,然后异步函数恢复执行。这种机制允许单个线程同时处理多个并发任务,从而显著提高程序的性能。
2. async/await
关键字
Python 的 async
和 await
关键字是实现异步编程的核心工具。
async
: 用于声明一个函数为协程函数。协程函数可以暂停执行并在稍后恢复执行。与普通函数不同,调用协程函数会返回一个协程对象(coroutine object),而不是立即执行函数体。await
: 只能在async
函数中使用,用于等待一个可等待对象(awaitable object)完成。可等待对象通常是另一个协程、一个 Task 对象或一个实现了__await__
方法的对象。当await
遇到一个可等待对象时,它会将当前协程的执行权交给事件循环,直到可等待对象完成。
示例:使用 async/await
的简单网络请求
import asyncio
import aiohttp
async def fetch_url(url: str) -> str:
"""
使用 aiohttp 异步获取 URL 的内容。
"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
url = "https://www.example.com"
content = await fetch_url(url)
print(f"Content from {url}: {content[:100]}...")
if __name__ == "__main__":
asyncio.run(main())
在这个例子中,fetch_url
是一个协程函数,它使用 aiohttp
库异步地获取 URL 的内容。await response.text()
语句会将当前协程的执行权交给事件循环,直到 response.text()
完成。asyncio.run(main())
用于启动事件循环并运行 main
协程。
3. 事件循环 (Event Loop)
事件循环是异步编程的核心组件。它负责调度和执行协程,管理 I/O 事件,以及处理其他异步任务。
事件循环的工作流程如下:
- 注册任务: 协程和其他异步任务被注册到事件循环中。
- 监听事件: 事件循环监听文件描述符 (file descriptor) 或其他事件源,等待 I/O 事件发生。
- 处理事件: 当事件发生时,事件循环会唤醒相应的协程,并将其添加到就绪队列中。
- 执行协程: 事件循环从就绪队列中选择一个协程来执行,直到协程遇到
await
语句或完成。 - 重复循环: 事件循环重复步骤 2-4,直到没有更多的任务需要执行。
Python 的 asyncio
模块提供了事件循环的实现。可以使用 asyncio.get_event_loop()
获取当前事件循环,并使用 loop.run_until_complete()
或 asyncio.run()
来运行协程。
4. FastAPI 中的异步支持
FastAPI 框架本身是异步设计的。它使用 Starlette 作为 ASGI (Asynchronous Server Gateway Interface) 服务器,并充分利用 async/await
关键字和事件循环来实现高性能的 API。
在 FastAPI 中,可以将任何路由函数声明为 async
函数,从而使其能够异步地处理请求。
示例:异步路由函数
from fastapi import FastAPI
app = FastAPI()
@app.get("/items/{item_id}")
async def read_item(item_id: int):
# 模拟耗时操作
await asyncio.sleep(1)
return {"item_id": item_id}
在这个例子中,read_item
是一个异步路由函数。当客户端发送请求到 /items/{item_id}
时,FastAPI 会创建一个新的协程来处理该请求。await asyncio.sleep(1)
语句会让协程暂停执行 1 秒钟,在此期间,FastAPI 可以处理其他请求。
5. 深入理解 await
的工作原理
await
关键字并非简单地等待一个协程完成。它涉及以下几个步骤:
- 获取可等待对象:
await
表达式首先获取可等待对象(例如,一个协程、一个 Task 对象或一个实现了__await__
方法的对象)。 - 调用
__await__
方法: 如果可等待对象实现了__await__
方法,则调用该方法。__await__
方法必须返回一个迭代器。 - 与事件循环交互:
await
表达式将当前协程的状态保存下来,并将控制权交给事件循环。 - 事件循环调度: 事件循环负责调度可等待对象,并在其完成时唤醒当前协程。
- 恢复执行: 当可等待对象完成时,事件循环会将结果传递给当前协程,并恢复其执行。
示例:自定义可等待对象
import asyncio
class MyAwaitable:
def __await__(self):
yield # 暂停执行
return "MyAwaitable is done!"
async def main():
my_awaitable = MyAwaitable()
result = await my_awaitable
print(result)
if __name__ == "__main__":
asyncio.run(main())
在这个例子中,MyAwaitable
类实现了 __await__
方法,该方法返回一个生成器,生成器通过 yield
语句暂停执行。当 await my_awaitable
语句执行时,__await__
方法会被调用,生成器会暂停执行,并将控制权交给事件循环。当事件循环再次调度该协程时,生成器会恢复执行,并返回 "MyAwaitable is done!"。
6. Task 对象
Task
对象是 asyncio
模块中用于封装协程的一个重要概念。可以使用 asyncio.create_task()
函数创建一个 Task 对象。Task 对象可以被取消、查询状态,以及获取结果。
示例:使用 Task 对象并发执行多个协程
import asyncio
async def my_coroutine(i: int):
await asyncio.sleep(i)
print(f"Coroutine {i} finished")
return i
async def main():
tasks = [asyncio.create_task(my_coroutine(i)) for i in range(1, 4)]
results = await asyncio.gather(*tasks)
print(f"Results: {results}")
if __name__ == "__main__":
asyncio.run(main())
在这个例子中,asyncio.create_task()
函数创建了三个 Task 对象,每个 Task 对象都封装了一个 my_coroutine
协程。asyncio.gather(*tasks)
函数会并发地执行这些 Task 对象,并等待它们全部完成。await asyncio.gather(*tasks)
语句会暂停当前协程的执行,直到所有 Task 对象都完成。
7. 避免阻塞事件循环
在异步编程中,最重要的一点是避免阻塞事件循环。如果事件循环被阻塞,那么所有的协程都会受到影响,从而降低程序的整体性能。
以下是一些避免阻塞事件循环的常见方法:
- 使用异步库: 尽可能使用异步库来执行 I/O 操作。例如,使用
aiohttp
代替requests
,使用asyncpg
代替psycopg2
。 - 避免 CPU 密集型操作: 如果需要执行 CPU 密集型操作,可以使用
asyncio.to_thread()
或concurrent.futures
模块将其放到独立的线程或进程中执行。 - 设置超时: 为 I/O 操作设置超时时间,以避免无限期地等待。
示例:使用 asyncio.to_thread()
执行 CPU 密集型操作
import asyncio
import time
import concurrent.futures
def cpu_bound_operation(n: int) -> int:
"""
模拟 CPU 密集型操作。
"""
result = 0
for i in range(n):
result += i * i
return result
async def main():
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_bound_operation, 10000000)
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main())
在这个例子中,cpu_bound_operation
函数模拟了一个 CPU 密集型操作。loop.run_in_executor()
函数将该函数放到一个进程池中执行,从而避免阻塞事件循环。
8. 总结表格
特性 | 描述 | 示例 |
---|---|---|
async |
用于声明一个协程函数,该函数可以暂停并在稍后恢复执行。 | async def my_coroutine(): ... |
await |
只能在 async 函数中使用,用于等待一个可等待对象完成。 |
result = await some_awaitable() |
事件循环 | 负责调度和执行协程,管理 I/O 事件,以及处理其他异步任务。 | loop = asyncio.get_event_loop(); loop.run_until_complete(my_coroutine()) 或 asyncio.run(my_coroutine()) |
Task 对象 | 用于封装协程,可以被取消、查询状态,以及获取结果。 | task = asyncio.create_task(my_coroutine()); await task |
异步库 | 使用异步库(例如 aiohttp , asyncpg )来执行 I/O 操作,避免阻塞事件循环。 |
async with aiohttp.ClientSession() as session: ... |
asyncio.to_thread() |
将 CPU 密集型操作放到独立的线程中执行,避免阻塞事件循环。 | await asyncio.to_thread(cpu_bound_operation, 10000000) |
9. 异步编程的实践建议
- 始终使用异步库: 对于任何 I/O 操作,都应尽可能使用异步库。
- 避免长时间运行的同步代码: 如果需要执行长时间运行的同步代码,可以使用线程或进程将其放到后台执行。
- 监控事件循环: 使用工具监控事件循环的性能,例如延迟和 CPU 使用率。
- 测试异步代码: 编写单元测试和集成测试来验证异步代码的正确性。
- 理解异常处理: 异步代码中的异常处理与同步代码略有不同。需要使用
try...except
块来捕获异常,并使用asyncio.gather
函数的return_exceptions
参数来处理多个协程中的异常。
10. 代码示例:综合应用
import asyncio
import aiohttp
import time
async def fetch_data(session: aiohttp.ClientSession, url: str) -> str:
"""
使用 aiohttp 异步获取 URL 的数据。
"""
try:
async with session.get(url) as response:
response.raise_for_status() # 抛出 HTTPError 异常
return await response.text()
except aiohttp.ClientError as e:
print(f"Error fetching {url}: {e}")
return None
async def process_data(data: str) -> int:
"""
模拟处理数据的 CPU 密集型操作。
"""
if data is None:
return 0
# 模拟 CPU 密集型操作
count = 0
for char in data:
if char.isalpha():
count += 1
await asyncio.sleep(0.1) # 模拟I/O操作
return count
async def main():
urls = [
"https://www.example.com",
"https://www.python.org",
"https://www.google.com",
"https://www.nonexistent-website.com", #故意制造一个错误
]
start_time = time.time()
async with aiohttp.ClientSession() as session:
tasks = [
asyncio.create_task(fetch_data(session, url)) for url in urls
]
results = await asyncio.gather(*tasks, return_exceptions=True) # 捕获异常
end_time = time.time()
total_count = 0
processing_tasks = []
for data in results:
processing_tasks.append(asyncio.create_task(process_data(data)))
processing_results = await asyncio.gather(*processing_tasks)
for count in processing_results:
total_count += count
print(f"Total alphabetic characters: {total_count}")
print(f"Total time: {end_time - start_time:.2f} seconds")
if __name__ == "__main__":
asyncio.run(main())
这个示例演示了如何使用 async/await
关键字、事件循环、Task 对象和异步库来并发地获取和处理数据。它还演示了如何使用 asyncio.gather
函数的 return_exceptions
参数来处理异常,以及如何使用 asyncio.to_thread()
函数来避免阻塞事件循环。
实践是关键,理解概念很重要
今天我们深入探讨了 FastAPI 异步编程的底层实现,包括 async/await
关键字、事件循环和 Task 对象。掌握这些概念对于编写高性能的 FastAPI 应用至关重要。希望本次讲座能够帮助大家更好地理解和应用异步编程。