ASGI Middleware 的协议级拦截:实现请求解析、认证与限速的底层逻辑
大家好,今天我们来深入探讨 ASGI (Asynchronous Server Gateway Interface) 中间件的强大功能,特别是在协议层面上进行拦截,以实现请求解析、认证和限速等核心功能。 相较于传统的 WSGI,ASGI 提供了异步处理能力,使其更适合处理现代 Web 应用中的高并发场景。而 ASGI 中间件则是在 ASGI 应用处理请求前后插入自定义逻辑的关键机制。
ASGI 协议回顾
首先,我们需要对 ASGI 协议有一个清晰的认识。ASGI 本质上是一个接口规范,定义了 Web 服务器(或协议服务器)和 Web 应用之间的通信方式。 它使用一个可调用的对象(通常是一个异步函数)作为应用,并通过一个字典(scope)传递请求的上下文信息,并使用异步函数 send 和 receive 来进行数据的收发。
scope 包含了请求的各种信息,例如协议类型(http、websocket)、路径、查询参数、headers 等。receive 异步函数用于接收来自客户端的数据,例如 HTTP 请求体或 WebSocket 消息。send 异步函数用于将数据发送回客户端,例如 HTTP 响应头和响应体或 WebSocket 消息。
一个最简单的 ASGI 应用可能如下所示:
async def app(scope, receive, send):
assert scope['type'] == 'http'
await send({
'type': 'http.response.start',
'status': 200,
'headers': [
[b'content-type', b'text/plain'],
]
})
await send({
'type': 'http.response.body',
'body': b'Hello, world!',
})
这个应用只处理 HTTP 请求,并返回 "Hello, world!"。
ASGI 中间件的概念
ASGI 中间件本质上是一个包装 ASGI 应用的可调用对象。它接收一个 ASGI 应用作为参数,并返回一个新的 ASGI 应用。这个新的 ASGI 应用可以在调用原始 ASGI 应用之前和之后执行自定义的逻辑。
一个简单的 ASGI 中间件的结构如下:
class MyMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
# 在调用原始应用之前执行的逻辑
print("Before request")
await self.app(scope, receive, send)
# 在调用原始应用之后执行的逻辑
print("After request")
这个中间件只是简单地在请求前后打印一些信息。更复杂的中间件可以读取和修改 scope,receive 和 send 的行为,从而实现各种功能。
协议级拦截
协议级拦截是指在 ASGI 中间件中,针对特定的协议类型(例如 HTTP 或 WebSocket)进行处理。这意味着中间件可以根据协议类型的不同,执行不同的逻辑。
例如,一个中间件可能只处理 HTTP 请求,而忽略 WebSocket 连接。或者,一个中间件可能根据 HTTP 请求的 Content-Type 来进行不同的解析。
请求解析
请求解析是指将 receive 函数接收到的原始数据转换为更易于处理的格式。对于 HTTP 请求,这通常包括解析 HTTP 请求头和请求体。
一个用于解析 HTTP 请求体的中间件可能如下所示:
import json
class BodyParserMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope['type'] == 'http':
body = b''
more_body = True
while more_body:
message = await receive()
body += message.get('body', b'')
more_body = message.get('more', False)
# 将请求体添加到 scope 中
scope['body'] = body
# 尝试解析 JSON
try:
scope['json'] = json.loads(body.decode('utf-8'))
except (json.JSONDecodeError, UnicodeDecodeError):
scope['json'] = None
await self.app(scope, receive, send)
else:
await self.app(scope, receive, send)
这个中间件首先检查协议类型是否为 HTTP。如果是,它会循环接收所有数据块,并将它们组合成一个完整的请求体。然后,它将请求体添加到 scope 中,并尝试将其解析为 JSON。如果解析成功,JSON 数据也会被添加到 scope 中。
这样,下游的应用就可以直接从 scope 中获取请求体和 JSON 数据,而无需自己进行解析。
认证
认证是指验证用户的身份。ASGI 中间件可以在协议层面上进行认证,例如通过检查 HTTP 请求头中的 Authorization 字段。
一个用于进行简单 Token 认证的中间件可能如下所示:
class AuthenticationMiddleware:
def __init__(self, app, token):
self.app = app
self.token = token
async def __call__(self, scope, receive, send):
if scope['type'] == 'http':
headers = dict(scope['headers'])
auth_header = headers.get(b'authorization')
if auth_header:
try:
scheme, token = auth_header.decode('utf-8').split(' ')
if scheme.lower() == 'bearer' and token == self.token:
# 认证成功,将用户信息添加到 scope 中
scope['user'] = {'username': 'authenticated_user'}
await self.app(scope, receive, send)
else:
# 认证失败,返回 401 Unauthorized
await self.send_unauthorized(send)
except ValueError:
await self.send_unauthorized(send)
else:
# 没有提供认证信息,返回 401 Unauthorized
await self.send_unauthorized(send)
else:
await self.app(scope, receive, send)
async def send_unauthorized(self, send):
await send({
'type': 'http.response.start',
'status': 401,
'headers': [
[b'content-type', b'text/plain'],
[b'www-authenticate', b'Bearer'],
]
})
await send({
'type': 'http.response.body',
'body': b'Unauthorized',
})
这个中间件首先检查协议类型是否为 HTTP。如果是,它会检查 HTTP 请求头中是否包含 Authorization 字段。如果包含,它会尝试提取 Token,并与预定义的 Token 进行比较。如果 Token 匹配,则认证成功,并将用户信息添加到 scope 中。如果 Token 不匹配或未提供认证信息,则返回 401 Unauthorized 响应。
注意:这只是一个简单的示例,实际的认证系统可能更复杂,例如使用 JWT 或 OAuth。
限速
限速是指限制客户端在一定时间内可以发送的请求数量。ASGI 中间件可以在协议层面上实现限速,例如通过使用 Redis 存储客户端的请求计数。
一个简单的基于 Redis 的限速中间件可能如下所示:
import asyncio
import redis
class RateLimitMiddleware:
def __init__(self, app, redis_host='localhost', redis_port=6379, limit=10, period=60):
self.app = app
self.redis_host = redis_host
self.redis_port = redis_port
self.limit = limit # 允许的请求数量
self.period = period # 时间窗口(秒)
self.redis = redis.Redis(host=self.redis_host, port=self.redis_port)
async def __call__(self, scope, receive, send):
if scope['type'] == 'http':
client_ip = scope['client'][0] # 获取客户端 IP 地址
key = f"rate_limit:{client_ip}"
# 检查是否超出限制
count = self.redis.get(key)
if count is not None and int(count) >= self.limit:
await self.send_too_many_requests(send)
return
# 增加请求计数
pipe = self.redis.pipeline()
pipe.incr(key)
pipe.expire(key, self.period) # 设置过期时间
pipe.execute()
await self.app(scope, receive, send)
else:
await self.app(scope, receive, send)
async def send_too_many_requests(self, send):
await send({
'type': 'http.response.start',
'status': 429,
'headers': [
[b'content-type', b'text/plain'],
]
})
await send({
'type': 'http.response.body',
'body': b'Too Many Requests',
})
这个中间件首先检查协议类型是否为 HTTP。如果是,它会获取客户端的 IP 地址,并使用它作为 Redis 键。然后,它会检查 Redis 中是否已经存在该键,以及该键的值是否大于或等于限制值。如果超出限制,则返回 429 Too Many Requests 响应。如果没有超出限制,则增加 Redis 键的值,并设置过期时间。
这个中间件使用 Redis 来存储客户端的请求计数,并使用 IP 地址作为键。这样可以有效地限制每个客户端的请求数量。
中间件的组合
ASGI 中间件可以组合在一起,形成一个中间件链。这意味着一个中间件的输出可以作为另一个中间件的输入。
例如,可以将 BodyParserMiddleware、AuthenticationMiddleware 和 RateLimitMiddleware 组合在一起,形成一个完整的请求处理流程。
from asgiref.wsgi import WsgiToAsgi
from django.core.wsgi import get_wsgi_application
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
django_app = get_wsgi_application()
application = WsgiToAsgi(django_app)
application = RateLimitMiddleware(application)
application = AuthenticationMiddleware(application, token='your_secret_token')
application = BodyParserMiddleware(application)
在这个例子中,请求首先经过 BodyParserMiddleware 进行解析,然后经过 AuthenticationMiddleware 进行认证,最后经过 RateLimitMiddleware 进行限速。
错误处理
在 ASGI 中间件中,错误处理非常重要。如果中间件在处理请求时发生错误,应该捕获该错误,并返回一个合适的响应。
例如,如果 BodyParserMiddleware 在解析 JSON 数据时发生错误,应该返回一个 400 Bad Request 响应。
class BodyParserMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope['type'] == 'http':
body = b''
more_body = True
while more_body:
message = await receive()
body += message.get('body', b'')
more_body = message.get('more', False)
# 将请求体添加到 scope 中
scope['body'] = body
# 尝试解析 JSON
try:
scope['json'] = json.loads(body.decode('utf-8'))
except (json.JSONDecodeError, UnicodeDecodeError) as e:
# 解析失败,返回 400 Bad Request
await self.send_bad_request(send, str(e))
return
await self.app(scope, receive, send)
else:
await self.app(scope, receive, send)
async def send_bad_request(self, send, message):
await send({
'type': 'http.response.start',
'status': 400,
'headers': [
[b'content-type', b'text/plain'],
]
})
await send({
'type': 'http.response.body',
'body': message.encode('utf-8'),
})
在这个例子中,如果 json.loads() 抛出异常,则会捕获该异常,并调用 send_bad_request() 函数返回一个 400 Bad Request 响应。
总结
| 功能 | 描述 | 示例代码 |
|---|---|---|
| 请求解析 | 将原始请求数据转换为更易于处理的格式,例如解析 HTTP 请求体。 | python class BodyParserMiddleware: def __init__(self, app): self.app = app async def __call__(self, scope, receive, send): if scope['type'] == 'http': body = b'' more_body = True while more_body: message = await receive() body += message.get('body', b'') more_body = message.get('more', False) # 将请求体添加到 scope 中 scope['body'] = body # 尝试解析 JSON try: scope['json'] = json.loads(body.decode('utf-8')) except (json.JSONDecodeError, UnicodeDecodeError): scope['json'] = None await self.app(scope, receive, send) else: await self.app(scope, receive, send) |
| 认证 | 验证用户的身份,例如通过检查 HTTP 请求头中的 Authorization 字段。 | python class AuthenticationMiddleware: def __init__(self, app, token): self.app = app self.token = token async def __call__(self, scope, receive, send): if scope['type'] == 'http': headers = dict(scope['headers']) auth_header = headers.get(b'authorization') if auth_header: try: scheme, token = auth_header.decode('utf-8').split(' ') if scheme.lower() == 'bearer' and token == self.token: # 认证成功,将用户信息添加到 scope 中 scope['user'] = {'username': 'authenticated_user'} await self.app(scope, receive, send) else: # 认证失败,返回 401 Unauthorized await self.send_unauthorized(send) except ValueError: await self.send_unauthorized(send) else: # 没有提供认证信息,返回 401 Unauthorized await self.send_unauthorized(send) else: await self.app(scope, receive, send) async def send_unauthorized(self, send): await send({ 'type': 'http.response.start', 'status': 401, 'headers': [ [b'content-type', b'text/plain'], [b'www-authenticate', b'Bearer'], ] }) await send({ 'type': 'http.response.body', 'body': b'Unauthorized', }) |
| 限速 | 限制客户端在一定时间内可以发送的请求数量。 | python import asyncio import redis class RateLimitMiddleware: def __init__(self, app, redis_host='localhost', redis_port=6379, limit=10, period=60): self.app = app self.redis_host = redis_host self.redis_port = redis_port self.limit = limit # 允许的请求数量 self.period = period # 时间窗口(秒) self.redis = redis.Redis(host=self.redis_host, port=self.redis_port) async def __call__(self, scope, receive, send): if scope['type'] == 'http': client_ip = scope['client'][0] # 获取客户端 IP 地址 key = f"rate_limit:{client_ip}" # 检查是否超出限制 count = self.redis.get(key) if count is not None and int(count) >= self.limit: await self.send_too_many_requests(send) return # 增加请求计数 pipe = self.redis.pipeline() pipe.incr(key) pipe.expire(key, self.period) # 设置过期时间 pipe.execute() await self.app(scope, receive, send) else: await self.app(scope, receive, send) async def send_too_many_requests(self, send): await send({ 'type': 'http.response.start', 'status': 429, 'headers': [ [b'content-type', b'text/plain'], ] }) await send({ 'type': 'http.response.body', 'body': b'Too Many Requests', }) |
通过使用 ASGI 中间件,我们可以方便地在协议层面上进行请求解析、认证和限速等操作。这使得我们可以构建更安全、更可靠、更易于维护的 Web 应用。
ASGI中间件应用的要点
ASGI 中间件是构建现代异步 Web 应用的重要工具,它允许我们在请求处理流程中插入自定义逻辑,实现各种功能。理解 ASGI 协议和中间件的工作原理,以及如何在协议层面上进行拦截,对于构建高性能、安全和可扩展的 Web 应用至关重要。掌握这些技术,能够更好地应对现代 Web 开发中的挑战。
更多IT精英技术系列讲座,到智猿学院