Python的`Websockets`:如何使用`websockets`库构建`Websockets`服务。

Python websockets 库构建 WebSocket 服务

大家好,今天我们来深入探讨如何使用 Python 的 websockets 库构建 WebSocket 服务。WebSocket 是一种在单个 TCP 连接上提供全双工通信协议。这意味着一旦连接建立,客户端和服务器可以同时互相发送数据,而无需像传统的 HTTP 请求那样每次都建立新的连接。这使得 WebSocket 非常适合需要实时数据更新的应用,例如在线游戏、股票交易平台、聊天应用等。

1. websockets 库简介

websockets 是一个为 Python 3.7+ 设计的库,它专注于提供一个简单、高效、可靠的 WebSocket 实现。它基于 asyncio 库,因此是异步的,能够处理大量的并发连接,而不会阻塞主线程。它支持 WebSocket 协议的所有标准特性,包括:

  • 文本和二进制消息: 可以发送和接收文本数据(UTF-8 编码)和二进制数据。
  • 帧: 数据被分成帧进行传输,允许在单个连接上交错发送多个消息。
  • 扩展: 允许添加额外的功能,例如压缩。
  • 关闭握手: 提供优雅地关闭连接的机制。

2. 安装 websockets

首先,我们需要安装 websockets 库。可以使用 pip:

pip install websockets

3. 构建一个简单的 WebSocket 服务

让我们从一个最简单的 WebSocket 服务开始。这个服务将接收客户端发送的消息,并将消息原封不动地返回给客户端。

import asyncio
import websockets

async def echo(websocket, path):
    try:
        async for message in websocket:
            print(f"Received: {message}")
            await websocket.send(message)
            print(f"Sent: {message}")
    except websockets.exceptions.ConnectionClosed as e:
        print(f"Connection closed: {e}")

async def main():
    async with websockets.serve(echo, "localhost", 8765):
        print("WebSocket server started at ws://localhost:8765")
        await asyncio.Future()  # run forever

if __name__ == "__main__":
    asyncio.run(main())

代码解释:

  • async def echo(websocket, path):: 这是一个异步函数,它处理每个 WebSocket 连接。websocket 参数是 websockets.WebSocketServerProtocol 的实例,代表与客户端的连接。path 参数是客户端请求的路径。
  • async for message in websocket:: 这是一个异步迭代器,它从 WebSocket 连接接收消息。每当客户端发送消息时,该迭代器就会产生一个新的消息。
  • await websocket.send(message): 这是一个异步函数,它将消息发送回客户端。
  • websockets.serve(echo, "localhost", 8765): 创建一个 WebSocket 服务器,它将监听 localhost 的 8765 端口,并使用 echo 函数处理每个连接。
  • asyncio.Future(): 创建一个永不完成的future,保持服务运行。

运行服务:

将代码保存为 server.py,然后在终端中运行:

python server.py

现在,WebSocket 服务器已经在 ws://localhost:8765 上运行。

4. 构建一个简单的 WebSocket 客户端

为了测试我们的 WebSocket 服务,我们需要一个客户端。下面是一个简单的 Python 客户端:

import asyncio
import websockets

async def hello():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        name = input("What's your name? ")

        await websocket.send(name)
        print(f">>> {name}")

        greeting = await websocket.recv()
        print(f"<<< {greeting}")

if __name__ == "__main__":
    asyncio.run(hello())

代码解释:

  • async with websockets.connect(uri) as websocket:: 建立与 WebSocket 服务器的连接。uri 参数是服务器的地址。
  • await websocket.send(name): 将消息发送到服务器。
  • await websocket.recv(): 从服务器接收消息。

运行客户端:

将代码保存为 client.py,然后在终端中运行:

python client.py

客户端会提示你输入你的名字,然后将你的名字发送到服务器。服务器会将你的名字返回给客户端,客户端会将服务器返回的消息打印到终端。

5. 错误处理

在实际应用中,我们需要处理各种可能出现的错误。websockets 库提供了一些异常类,可以用来处理这些错误。

  • websockets.exceptions.ConnectionClosedError: 当连接意外关闭时引发。
  • websockets.exceptions.ConnectionClosedOK: 当连接正常关闭时引发。
  • websockets.exceptions.InvalidHandshake: 当握手失败时引发。
  • websockets.exceptions.WebSocketException: 所有 websockets 库引发的异常的基类。

在上面的 echo 函数中,我们已经使用了 try...except 块来捕获 websockets.exceptions.ConnectionClosed 异常。这是处理 WebSocket 连接关闭的常用方法。

6. 发送和接收不同类型的数据

WebSocket 可以发送和接收文本数据和二进制数据。

发送文本数据:

await websocket.send("Hello, world!")

发送二进制数据:

await websocket.send(b'x00x01x02x03')

接收文本数据:

message = await websocket.recv()
if isinstance(message, str):
    print(f"Received text: {message}")

接收二进制数据:

message = await websocket.recv()
if isinstance(message, bytes):
    print(f"Received bytes: {message}")

7. 更高级的用法

7.1. 处理多个客户端

websockets 库基于 asyncio,因此可以轻松地处理多个客户端。 websockets.serve 会为每个新的客户端连接创建一个新的 echo 协程实例。

import asyncio
import websockets

async def handler(websocket):
    while True:
        try:
            message = await websocket.recv()
            print(f"Received message from {websocket.remote_address}: {message}")
            await websocket.send(f"Server received: {message}")
        except websockets.ConnectionClosed:
            print(f"Connection with {websocket.remote_address} closed.")
            break

async def main():
    async with websockets.serve(handler, "localhost", 8765):
        print("WebSocket server started at ws://localhost:8765")
        await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())

在这个例子中,handler 函数处理每个客户端连接。它使用一个无限循环来接收来自客户端的消息,并将消息打印到控制台。如果连接关闭,循环将中断。 websocket.remote_address 提供了客户端的地址信息。

7.2. 使用路径

可以使用不同的路径来处理不同的类型的请求。

import asyncio
import websockets

async def echo(websocket, path):
    if path == "/echo":
        try:
            async for message in websocket:
                print(f"Received from echo: {message}")
                await websocket.send(message)
        except websockets.ConnectionClosed:
            print("Echo connection closed.")
    else:
        print(f"Unknown path: {path}")

async def main():
    async with websockets.serve(echo, "localhost", 8765):
        print("WebSocket server started at ws://localhost:8765")
        await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())

在这个例子中,只有当客户端请求 /echo 路径时,echo 函数才会处理连接。

客户端连接时需要指定路径:

import asyncio
import websockets

async def connect_to_echo():
    uri = "ws://localhost:8765/echo"
    async with websockets.connect(uri) as websocket:
        await websocket.send("Hello from echo client!")
        response = await websocket.recv()
        print(f"Received: {response}")

async def main():
    await connect_to_echo()

if __name__ == "__main__":
    asyncio.run(main())

7.3. 使用子协议

WebSocket 协议允许使用子协议来定义应用程序特定的协议。

import asyncio
import websockets

async def handler(websocket, path):
    if websocket.subprotocol == "chat":
        try:
            async for message in websocket:
                print(f"Received from chat: {message}")
                await websocket.send(f"Chat server received: {message}")
        except websockets.ConnectionClosed:
            print("Chat connection closed.")
    else:
        print(f"Unknown subprotocol: {websocket.subprotocol}")

async def main():
    async with websockets.serve(handler, "localhost", 8765, subprotocols=["chat"]):
        print("WebSocket server started at ws://localhost:8765")
        await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())

在这个例子中,服务器只接受使用 chat 子协议的连接。客户端需要在连接时指定子协议:

import asyncio
import websockets

async def connect_to_chat():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri, subprotocols=["chat"]) as websocket:
        await websocket.send("Hello from chat client!")
        response = await websocket.recv()
        print(f"Received: {response}")

async def main():
    await connect_to_chat()

if __name__ == "__main__":
    asyncio.run(main())

7.4. 使用扩展

WebSocket 协议允许使用扩展来添加额外的功能,例如压缩。

虽然 websockets 库本身没有内置的扩展实现,但可以使用第三方库来实现扩展。 例如,可以使用 permessage-deflate 扩展来压缩 WebSocket 消息。

# 这个例子只是说明概念,permessage-deflate 通常需要客户端和服务器都支持并配置正确。
# 实际部署需要额外的配置和依赖安装.  这里只展示服务端部分。

import asyncio
import websockets
# 假设已经安装了 permessage-deflate 相关库,例如 aiohttp-compression
# pip install aiohttp-compression

#  请注意,websockets 库本身不直接支持配置 permessage-deflate 扩展,
#  通常需要在更高层次的框架中使用,如 aiohttp。  以下代码是为了演示概念,
#  可能无法直接运行。  更完整的示例需要结合 aiohttp 等框架。

async def handler(websocket, path):
    try:
        async for message in websocket:
            print(f"Received: {message}")
            await websocket.send(message)
    except websockets.ConnectionClosed:
        print("Connection closed.")

async def main():
    # 在实际应用中,需要在创建 server 时配置 permessage-deflate 扩展。
    # 这通常需要在 aiohttp 等框架中完成。
    async with websockets.serve(handler, "localhost", 8765): # , extensions=[permessage_deflate.ServerExtension()])
        print("WebSocket server started at ws://localhost:8765")
        await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())

重要提示: permessage-deflate 扩展的配置通常需要在更高级别的框架(例如 aiohttp)中完成,而不是直接在 websockets.serve 中配置。 上面的代码只是为了说明概念。

8. 完整示例:简单的聊天应用

下面是一个更完整的示例,它实现了一个简单的聊天应用。

Server (chat_server.py):

import asyncio
import websockets

connected_clients = set()

async def chat_handler(websocket, path):
    connected_clients.add(websocket)
    try:
        async for message in websocket:
            for client in connected_clients:
                if client != websocket:
                    try:
                        await client.send(f"User: {message}")
                    except websockets.ConnectionClosed:
                        print(f"Could not send message to client, removing.")
                        connected_clients.remove(client)

    except websockets.ConnectionClosedError:
        print("Client disconnected abruptly.")
    except websockets.ConnectionClosedOK:
        print("Client disconnected gracefully.")
    finally:
        connected_clients.remove(websocket)

async def main():
    async with websockets.serve(chat_handler, "localhost", 8765):
        print("Chat server started at ws://localhost:8765")
        await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())

Client (chat_client.py):

import asyncio
import websockets

async def chat_client():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        print("Connected to chat server.  Type your messages.")
        try:
            while True:
                message = input()
                await websocket.send(message)
                incoming = await websocket.recv()
                print(f"Received: {incoming}")

        except websockets.ConnectionClosed:
            print("Connection to server closed.")

async def main():
    await chat_client()

if __name__ == "__main__":
    asyncio.run(main())

Explanation:

  • Server: The server maintains a set of connected clients (connected_clients). When a new client connects, it’s added to the set. When a client sends a message, the server iterates through the set and sends the message to all other connected clients. Error handling is included to remove clients that disconnect unexpectedly.
  • Client: The client connects to the server and enters a loop where it prompts the user for input, sends the input to the server, and receives messages from the server.

9. WebSocket 协议细节

Header Field Description
Upgrade Specifies the "websocket" protocol.
Connection Specifies "Upgrade", indicating that the client wants to upgrade the connection.
Sec-WebSocket-Key A base64-encoded random value generated by the client. The server uses this to generate a response key to prevent caching proxies from interfering with the WebSocket handshake.
Sec-WebSocket-Version Specifies the WebSocket protocol version being used (e.g., "13").
Sec-WebSocket-Protocol (optional) Specifies one or more subprotocols the client supports. The server selects one of these subprotocols and includes it in the Sec-WebSocket-Protocol response header if it supports any of them.
Sec-WebSocket-Extensions (optional) Specifies one or more extensions the client supports. Similar to subprotocols, the server selects the extensions it supports and includes them in the Sec-WebSocket-Extensions response header.

10. 常见问题和调试技巧

  • 连接问题: 确保服务器正在运行,并且客户端使用的地址和端口是正确的。检查防火墙设置,确保端口未被阻止。
  • 消息未收到: 检查客户端和服务器是否都正确地发送和接收消息。使用 print 语句来调试消息的发送和接收过程。
  • 握手失败: 检查客户端和服务器是否都支持相同的 WebSocket 协议版本。确保 Sec-WebSocket-KeySec-WebSocket-Accept 头正确生成。
  • 异步问题: websockets 库是异步的,因此需要正确地使用 asyncawait 关键字。确保所有的异步操作都正确地等待。

11. 选择合适的工具和框架

websockets 库本身是一个底层的库,提供了 WebSocket 协议的基本实现。 对于更复杂的应用,你可能需要使用更高级别的框架,例如:

  • FastAPI: 一个现代、快速(高性能)的 Web 框架,用于构建 API。 它集成了 WebSocket 支持,可以方便地构建 WebSocket 服务。
  • Django Channels: 为 Django 框架添加了 WebSocket 支持。
  • Tornado: 一个 Python Web 框架和异步网络库,特别适合处理高并发连接。
  • aiohttp: 一个异步 HTTP 客户端/服务器框架,可以与 websockets 库结合使用。

选择哪个框架取决于你的具体需求和偏好。

总结和下一步

今天我们学习了如何使用 Python 的 websockets 库构建 WebSocket 服务。 我们从一个简单的 echo 服务开始,然后学习了如何处理多个客户端、发送和接收不同类型的数据、以及处理错误。 最后,我们实现了一个简单的聊天应用。

学习了基础的 WebSocket 服务端和客户端构建,还需要进一步学习安全问题,例如身份验证,授权以及数据加密。 此外,大规模的 WebSocket 服务还需要考虑负载均衡和连接管理。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注