Python websockets
库构建 WebSocket 服务
大家好,今天我们来深入探讨如何使用 Python 的 websockets
库构建 WebSocket 服务。WebSocket 是一种在单个 TCP 连接上提供全双工通信协议。这意味着一旦连接建立,客户端和服务器可以同时互相发送数据,而无需像传统的 HTTP 请求那样每次都建立新的连接。这使得 WebSocket 非常适合需要实时数据更新的应用,例如在线游戏、股票交易平台、聊天应用等。
1. websockets
库简介
websockets
是一个为 Python 3.7+ 设计的库,它专注于提供一个简单、高效、可靠的 WebSocket 实现。它基于 asyncio
库,因此是异步的,能够处理大量的并发连接,而不会阻塞主线程。它支持 WebSocket 协议的所有标准特性,包括:
- 文本和二进制消息: 可以发送和接收文本数据(UTF-8 编码)和二进制数据。
- 帧: 数据被分成帧进行传输,允许在单个连接上交错发送多个消息。
- 扩展: 允许添加额外的功能,例如压缩。
- 关闭握手: 提供优雅地关闭连接的机制。
2. 安装 websockets
库
首先,我们需要安装 websockets
库。可以使用 pip:
pip install websockets
3. 构建一个简单的 WebSocket 服务
让我们从一个最简单的 WebSocket 服务开始。这个服务将接收客户端发送的消息,并将消息原封不动地返回给客户端。
import asyncio
import websockets
async def echo(websocket, path):
try:
async for message in websocket:
print(f"Received: {message}")
await websocket.send(message)
print(f"Sent: {message}")
except websockets.exceptions.ConnectionClosed as e:
print(f"Connection closed: {e}")
async def main():
async with websockets.serve(echo, "localhost", 8765):
print("WebSocket server started at ws://localhost:8765")
await asyncio.Future() # run forever
if __name__ == "__main__":
asyncio.run(main())
代码解释:
async def echo(websocket, path):
: 这是一个异步函数,它处理每个 WebSocket 连接。websocket
参数是websockets.WebSocketServerProtocol
的实例,代表与客户端的连接。path
参数是客户端请求的路径。async for message in websocket:
: 这是一个异步迭代器,它从 WebSocket 连接接收消息。每当客户端发送消息时,该迭代器就会产生一个新的消息。await websocket.send(message)
: 这是一个异步函数,它将消息发送回客户端。websockets.serve(echo, "localhost", 8765)
: 创建一个 WebSocket 服务器,它将监听localhost
的 8765 端口,并使用echo
函数处理每个连接。asyncio.Future()
: 创建一个永不完成的future,保持服务运行。
运行服务:
将代码保存为 server.py
,然后在终端中运行:
python server.py
现在,WebSocket 服务器已经在 ws://localhost:8765
上运行。
4. 构建一个简单的 WebSocket 客户端
为了测试我们的 WebSocket 服务,我们需要一个客户端。下面是一个简单的 Python 客户端:
import asyncio
import websockets
async def hello():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
name = input("What's your name? ")
await websocket.send(name)
print(f">>> {name}")
greeting = await websocket.recv()
print(f"<<< {greeting}")
if __name__ == "__main__":
asyncio.run(hello())
代码解释:
async with websockets.connect(uri) as websocket:
: 建立与 WebSocket 服务器的连接。uri
参数是服务器的地址。await websocket.send(name)
: 将消息发送到服务器。await websocket.recv()
: 从服务器接收消息。
运行客户端:
将代码保存为 client.py
,然后在终端中运行:
python client.py
客户端会提示你输入你的名字,然后将你的名字发送到服务器。服务器会将你的名字返回给客户端,客户端会将服务器返回的消息打印到终端。
5. 错误处理
在实际应用中,我们需要处理各种可能出现的错误。websockets
库提供了一些异常类,可以用来处理这些错误。
websockets.exceptions.ConnectionClosedError
: 当连接意外关闭时引发。websockets.exceptions.ConnectionClosedOK
: 当连接正常关闭时引发。websockets.exceptions.InvalidHandshake
: 当握手失败时引发。websockets.exceptions.WebSocketException
: 所有websockets
库引发的异常的基类。
在上面的 echo
函数中,我们已经使用了 try...except
块来捕获 websockets.exceptions.ConnectionClosed
异常。这是处理 WebSocket 连接关闭的常用方法。
6. 发送和接收不同类型的数据
WebSocket 可以发送和接收文本数据和二进制数据。
发送文本数据:
await websocket.send("Hello, world!")
发送二进制数据:
await websocket.send(b'x00x01x02x03')
接收文本数据:
message = await websocket.recv()
if isinstance(message, str):
print(f"Received text: {message}")
接收二进制数据:
message = await websocket.recv()
if isinstance(message, bytes):
print(f"Received bytes: {message}")
7. 更高级的用法
7.1. 处理多个客户端
websockets
库基于 asyncio
,因此可以轻松地处理多个客户端。 websockets.serve
会为每个新的客户端连接创建一个新的 echo
协程实例。
import asyncio
import websockets
async def handler(websocket):
while True:
try:
message = await websocket.recv()
print(f"Received message from {websocket.remote_address}: {message}")
await websocket.send(f"Server received: {message}")
except websockets.ConnectionClosed:
print(f"Connection with {websocket.remote_address} closed.")
break
async def main():
async with websockets.serve(handler, "localhost", 8765):
print("WebSocket server started at ws://localhost:8765")
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
在这个例子中,handler
函数处理每个客户端连接。它使用一个无限循环来接收来自客户端的消息,并将消息打印到控制台。如果连接关闭,循环将中断。 websocket.remote_address
提供了客户端的地址信息。
7.2. 使用路径
可以使用不同的路径来处理不同的类型的请求。
import asyncio
import websockets
async def echo(websocket, path):
if path == "/echo":
try:
async for message in websocket:
print(f"Received from echo: {message}")
await websocket.send(message)
except websockets.ConnectionClosed:
print("Echo connection closed.")
else:
print(f"Unknown path: {path}")
async def main():
async with websockets.serve(echo, "localhost", 8765):
print("WebSocket server started at ws://localhost:8765")
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
在这个例子中,只有当客户端请求 /echo
路径时,echo
函数才会处理连接。
客户端连接时需要指定路径:
import asyncio
import websockets
async def connect_to_echo():
uri = "ws://localhost:8765/echo"
async with websockets.connect(uri) as websocket:
await websocket.send("Hello from echo client!")
response = await websocket.recv()
print(f"Received: {response}")
async def main():
await connect_to_echo()
if __name__ == "__main__":
asyncio.run(main())
7.3. 使用子协议
WebSocket 协议允许使用子协议来定义应用程序特定的协议。
import asyncio
import websockets
async def handler(websocket, path):
if websocket.subprotocol == "chat":
try:
async for message in websocket:
print(f"Received from chat: {message}")
await websocket.send(f"Chat server received: {message}")
except websockets.ConnectionClosed:
print("Chat connection closed.")
else:
print(f"Unknown subprotocol: {websocket.subprotocol}")
async def main():
async with websockets.serve(handler, "localhost", 8765, subprotocols=["chat"]):
print("WebSocket server started at ws://localhost:8765")
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
在这个例子中,服务器只接受使用 chat
子协议的连接。客户端需要在连接时指定子协议:
import asyncio
import websockets
async def connect_to_chat():
uri = "ws://localhost:8765"
async with websockets.connect(uri, subprotocols=["chat"]) as websocket:
await websocket.send("Hello from chat client!")
response = await websocket.recv()
print(f"Received: {response}")
async def main():
await connect_to_chat()
if __name__ == "__main__":
asyncio.run(main())
7.4. 使用扩展
WebSocket 协议允许使用扩展来添加额外的功能,例如压缩。
虽然 websockets
库本身没有内置的扩展实现,但可以使用第三方库来实现扩展。 例如,可以使用 permessage-deflate
扩展来压缩 WebSocket 消息。
# 这个例子只是说明概念,permessage-deflate 通常需要客户端和服务器都支持并配置正确。
# 实际部署需要额外的配置和依赖安装. 这里只展示服务端部分。
import asyncio
import websockets
# 假设已经安装了 permessage-deflate 相关库,例如 aiohttp-compression
# pip install aiohttp-compression
# 请注意,websockets 库本身不直接支持配置 permessage-deflate 扩展,
# 通常需要在更高层次的框架中使用,如 aiohttp。 以下代码是为了演示概念,
# 可能无法直接运行。 更完整的示例需要结合 aiohttp 等框架。
async def handler(websocket, path):
try:
async for message in websocket:
print(f"Received: {message}")
await websocket.send(message)
except websockets.ConnectionClosed:
print("Connection closed.")
async def main():
# 在实际应用中,需要在创建 server 时配置 permessage-deflate 扩展。
# 这通常需要在 aiohttp 等框架中完成。
async with websockets.serve(handler, "localhost", 8765): # , extensions=[permessage_deflate.ServerExtension()])
print("WebSocket server started at ws://localhost:8765")
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
重要提示: permessage-deflate
扩展的配置通常需要在更高级别的框架(例如 aiohttp
)中完成,而不是直接在 websockets.serve
中配置。 上面的代码只是为了说明概念。
8. 完整示例:简单的聊天应用
下面是一个更完整的示例,它实现了一个简单的聊天应用。
Server (chat_server.py):
import asyncio
import websockets
connected_clients = set()
async def chat_handler(websocket, path):
connected_clients.add(websocket)
try:
async for message in websocket:
for client in connected_clients:
if client != websocket:
try:
await client.send(f"User: {message}")
except websockets.ConnectionClosed:
print(f"Could not send message to client, removing.")
connected_clients.remove(client)
except websockets.ConnectionClosedError:
print("Client disconnected abruptly.")
except websockets.ConnectionClosedOK:
print("Client disconnected gracefully.")
finally:
connected_clients.remove(websocket)
async def main():
async with websockets.serve(chat_handler, "localhost", 8765):
print("Chat server started at ws://localhost:8765")
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
Client (chat_client.py):
import asyncio
import websockets
async def chat_client():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
print("Connected to chat server. Type your messages.")
try:
while True:
message = input()
await websocket.send(message)
incoming = await websocket.recv()
print(f"Received: {incoming}")
except websockets.ConnectionClosed:
print("Connection to server closed.")
async def main():
await chat_client()
if __name__ == "__main__":
asyncio.run(main())
Explanation:
- Server: The server maintains a set of connected clients (
connected_clients
). When a new client connects, it’s added to the set. When a client sends a message, the server iterates through the set and sends the message to all other connected clients. Error handling is included to remove clients that disconnect unexpectedly. - Client: The client connects to the server and enters a loop where it prompts the user for input, sends the input to the server, and receives messages from the server.
9. WebSocket 协议细节
Header Field | Description |
---|---|
Upgrade |
Specifies the "websocket" protocol. |
Connection |
Specifies "Upgrade", indicating that the client wants to upgrade the connection. |
Sec-WebSocket-Key |
A base64-encoded random value generated by the client. The server uses this to generate a response key to prevent caching proxies from interfering with the WebSocket handshake. |
Sec-WebSocket-Version |
Specifies the WebSocket protocol version being used (e.g., "13"). |
Sec-WebSocket-Protocol (optional) |
Specifies one or more subprotocols the client supports. The server selects one of these subprotocols and includes it in the Sec-WebSocket-Protocol response header if it supports any of them. |
Sec-WebSocket-Extensions (optional) |
Specifies one or more extensions the client supports. Similar to subprotocols, the server selects the extensions it supports and includes them in the Sec-WebSocket-Extensions response header. |
10. 常见问题和调试技巧
- 连接问题: 确保服务器正在运行,并且客户端使用的地址和端口是正确的。检查防火墙设置,确保端口未被阻止。
- 消息未收到: 检查客户端和服务器是否都正确地发送和接收消息。使用
print
语句来调试消息的发送和接收过程。 - 握手失败: 检查客户端和服务器是否都支持相同的 WebSocket 协议版本。确保
Sec-WebSocket-Key
和Sec-WebSocket-Accept
头正确生成。 - 异步问题:
websockets
库是异步的,因此需要正确地使用async
和await
关键字。确保所有的异步操作都正确地等待。
11. 选择合适的工具和框架
websockets
库本身是一个底层的库,提供了 WebSocket 协议的基本实现。 对于更复杂的应用,你可能需要使用更高级别的框架,例如:
- FastAPI: 一个现代、快速(高性能)的 Web 框架,用于构建 API。 它集成了 WebSocket 支持,可以方便地构建 WebSocket 服务。
- Django Channels: 为 Django 框架添加了 WebSocket 支持。
- Tornado: 一个 Python Web 框架和异步网络库,特别适合处理高并发连接。
- aiohttp: 一个异步 HTTP 客户端/服务器框架,可以与
websockets
库结合使用。
选择哪个框架取决于你的具体需求和偏好。
总结和下一步
今天我们学习了如何使用 Python 的 websockets
库构建 WebSocket 服务。 我们从一个简单的 echo
服务开始,然后学习了如何处理多个客户端、发送和接收不同类型的数据、以及处理错误。 最后,我们实现了一个简单的聊天应用。
学习了基础的 WebSocket 服务端和客户端构建,还需要进一步学习安全问题,例如身份验证,授权以及数据加密。 此外,大规模的 WebSocket 服务还需要考虑负载均衡和连接管理。