各位观众老爷,晚上好!我是今晚的讲师,咱们今天聊聊Python asyncio
里的网络编程,特别是asyncio.open_connection()
和 asyncio.start_server()
这两位重量级选手。
话说 asyncio
可是 Python 处理并发的利器,有了它,你就能写出高性能的网络应用,让你的程序像开了挂一样。 我们今天就来扒一扒这两个函数的用法,保证你听完之后,也能用 asyncio
写出漂亮的服务器和客户端。
第一幕:asyncio
的世界观
在深入到具体的函数之前,咱们先来简单回顾一下 asyncio
的核心概念。asyncio
基于事件循环 (event loop),它允许你并发地执行多个任务,而无需使用多线程或多进程。 这种并发是通过协程 (coroutines) 实现的。
- 事件循环 (Event Loop): 整个
asyncio
程序的心脏,负责调度和执行协程。你可以把它想象成一个任务管理器,不断地轮询等待执行的任务,并按照一定的顺序执行。 - 协程 (Coroutine): 一种特殊的函数,可以在执行过程中暂停,稍后恢复执行。你可以用
async
关键字定义一个协程。 await
关键字: 只能在协程中使用,用于暂停协程的执行,等待另一个协程完成。await
会将程序的控制权交还给事件循环,让它可以执行其他的协程。- Task: 事件循环调度执行的基本单位。 可以用
asyncio.create_task()
把一个协程包装成一个 Task。
简单来说,asyncio
的工作方式就是:你定义一堆协程,然后把它们交给事件循环,事件循环会负责并发地执行这些协程。当一个协程遇到 await
时,它就会暂停执行,把控制权交还给事件循环,让事件循环去执行其他的协程,直到 await
等待的操作完成,协程才会恢复执行。
第二幕:客户端的秘密武器:asyncio.open_connection()
asyncio.open_connection()
函数是 asyncio
客户端编程的核心。它可以建立一个 TCP 连接,并返回一对 StreamReader
和 StreamWriter
对象,用于读取和写入数据。
它的基本用法如下:
import asyncio
async def main():
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
print(f'Connected to the server')
message = "Hello, server!".encode()
writer.write(message)
await writer.drain() # 刷新缓冲区
data = await reader.read(100) # 读取最多100字节
print(f'Received: {data.decode()}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(main())
这段代码做了什么呢?
asyncio.open_connection('127.0.0.1', 8888)
: 连接到本地主机 (127.0.0.1) 的 8888 端口。 这个函数会返回两个对象:reader
和writer
。reader
(StreamReader): 用于从连接中读取数据。writer
(StreamWriter): 用于向连接中写入数据。writer.write(message)
: 将消息 ( "Hello, server!" ) 编码成字节串,并写入到连接中。await writer.drain()
: 刷新写入缓冲区。这个函数很重要,它会等待所有的数据都被发送出去。如果没有它,数据可能会被缓冲起来,导致服务器收不到完整的数据。await reader.read(100)
: 从连接中读取最多 100 个字节的数据。writer.close()
: 关闭连接。await writer.wait_closed()
: 等待连接真正关闭。
asyncio.open_connection()
的参数:
参数 | 类型 | 描述 |
---|---|---|
host |
str |
服务器的主机名或 IP 地址。 |
port |
int |
服务器的端口号。 |
ssl |
SSLContext |
如果需要使用 SSL/TLS 加密连接,可以传入一个 ssl.SSLContext 对象。 |
family |
socket.AddressFamily |
地址族,例如 socket.AF_INET (IPv4) 或 socket.AF_INET6 (IPv6)。 |
proto |
int |
协议,例如 socket.IPPROTO_TCP 。 |
flags |
int |
用于控制套接字行为的标志。 |
server_hostname |
str |
如果使用了 SSL/TLS,并且需要验证服务器的证书,需要设置这个参数。 |
local_addr |
tuple |
本地地址,一个包含主机名和端口号的元组。 |
limit |
int |
StreamReader 内部缓冲区的最大大小,默认值是 65536 字节。 |
happy_eyeballs_delay |
float |
(Python 3.8+) 如果支持 Happy Eyeballs, 在尝试下一个地址之前等待的时间,以秒为单位。 Happy Eyeballs 是一种优化技术,可以更快地建立连接,特别是当客户端同时支持 IPv4 和 IPv6 时。 |
start_tls |
bool |
(Python 3.7+) 如果设置为 True ,将在连接建立后立即启动 TLS 握手。 |
一个更复杂的例子:同时发送和接收数据
import asyncio
async def client():
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
try:
while True:
message = input("Enter message to send (or 'exit'): ")
if message.lower() == 'exit':
break
message_bytes = message.encode()
writer.write(message_bytes)
await writer.drain()
print(f"Sent: {message}")
data = await reader.read(100)
if not data:
print("Server closed the connection.")
break
print(f"Received: {data.decode()}")
except Exception as e:
print(f"An error occurred: {e}")
finally:
writer.close()
await writer.wait_closed()
print("Connection closed.")
asyncio.run(client())
这个例子允许你和服务器进行交互式的通信。 你可以输入消息,然后发送给服务器,并接收服务器的响应。
第三幕:服务器的基石:asyncio.start_server()
asyncio.start_server()
函数是构建 asyncio
服务器的核心。 它创建一个 TCP 服务器,监听指定的地址和端口,并为每个客户端连接创建一个新的协程来处理请求。
它的基本用法如下:
import asyncio
async def handle_client(reader, writer):
addr = writer.get_extra_info('peername')
print(f"Accepted connection from {addr}")
while True:
data = await reader.read(100)
if not data:
break
message = data.decode()
print(f"Received {message} from {addr}")
response = f"Hello, {message}!".encode()
writer.write(response)
await writer.drain()
print(f"Closed connection from {addr}")
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(
handle_client, '127.0.0.1', 8888)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server: # 优雅地关闭server
await server.serve_forever()
asyncio.run(main())
这段代码创建了一个简单的 TCP 服务器,它:
asyncio.start_server(handle_client, '127.0.0.1', 8888)
: 在本地主机 (127.0.0.1) 的 8888 端口上启动服务器。handle_client
是一个回调函数,当有新的客户端连接时,事件循环会调用这个函数来处理连接。handle_client(reader, writer)
: 这个函数负责处理客户端的连接。 它接收两个参数:reader
和writer
,分别用于从连接中读取数据和向连接中写入数据。server.serve_forever()
: 让服务器一直运行,等待新的客户端连接。writer.get_extra_info('peername')
: 获取客户端的地址信息。
asyncio.start_server()
的参数:
参数 | 类型 | 描述 |
---|---|---|
client_connected_cb |
callable |
当有新的客户端连接时,事件循环会调用这个回调函数。 这个函数接收两个参数: StreamReader 和 StreamWriter 。 |
host |
str or sequence[str] |
服务器监听的主机名或 IP 地址。 可以是一个字符串,也可以是一个字符串序列。 如果是一个字符串序列,服务器会在所有的地址上监听。 如果 host 是空字符串或 None ,服务器会监听所有的 IPv4 和 IPv6 地址 (如果支持)。 |
port |
int |
服务器监听的端口号。 如果 port 是 0 ,服务器会随机选择一个可用的端口。 |
family |
socket.AddressFamily |
地址族,例如 socket.AF_INET (IPv4) 或 socket.AF_INET6 (IPv6)。 |
proto |
int |
协议,例如 socket.IPPROTO_TCP 。 |
ssl |
SSLContext |
如果需要使用 SSL/TLS 加密连接,可以传入一个 ssl.SSLContext 对象。 |
backlog |
int |
监听队列的最大长度。 当服务器接收到新的连接请求时,会将请求放入监听队列中。 如果监听队列满了,服务器会拒绝新的连接请求。 默认值是 100。 |
reuse_address |
bool |
是否允许重用地址。 如果设置为 True ,即使有其他程序在使用相同的地址和端口,服务器也可以启动。 默认值取决于平台。 |
reuse_port |
bool |
(Python 3.7+) 是否允许重用端口。 如果设置为 True ,多个服务器可以同时监听同一个端口。 这个选项通常用于负载均衡。 默认值取决于平台。 |
start_serving |
bool |
(Python 3.8+) 是否在创建服务器后立即开始服务。 如果设置为 False ,你需要手动调用 server.start_serving() 方法来启动服务器。 默认值是 True 。 |
limit |
int |
StreamReader 内部缓冲区的最大大小,默认值是 65536 字节。 |
happy_eyeballs_delay |
float |
(Python 3.8+) 如果支持 Happy Eyeballs, 在尝试下一个地址之前等待的时间,以秒为单位。 Happy Eyeballs 是一种优化技术,可以更快地建立连接,特别是当客户端同时支持 IPv4 和 IPv6 时。 |
一个更复杂的例子:处理多个客户端连接
上面的例子只能处理一个客户端连接。 如果想让服务器同时处理多个客户端连接,asyncio
会自动帮你搞定。 你只需要确保 handle_client
函数是一个协程,并且不会阻塞事件循环。
import asyncio
async def handle_client(reader, writer):
addr = writer.get_extra_info('peername')
print(f"Accepted connection from {addr}")
try:
while True:
data = await reader.read(1024) # 更大的缓冲区
if not data:
break
message = data.decode()
print(f"Received '{message}' from {addr}")
response = f"Echo: {message}".encode()
writer.write(response)
await writer.drain()
print(f"Sent 'Echo: {message}' to {addr}")
except Exception as e:
print(f"An error occurred while handling client {addr}: {e}")
finally:
print(f"Closing connection from {addr}")
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(
handle_client, '127.0.0.1', 8888)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever()
asyncio.run(main())
这个例子使用了一个 while
循环来持续地从客户端读取数据,并向客户端发送响应。 由于 reader.read()
是一个 awaitable
对象,所以 handle_client
函数不会阻塞事件循环,服务器可以同时处理多个客户端连接。 同时,我们用try...except...finally
块包围了客户端处理逻辑,确保即使客户端出现错误,服务器也能正常关闭连接,避免资源泄露。
第四幕:实战演练:一个简单的聊天室
现在,让我们把 asyncio.open_connection()
和 asyncio.start_server()
结合起来,创建一个简单的聊天室。
服务器端:
import asyncio
clients = [] # 保存所有客户端的 writer
async def handle_client(reader, writer):
addr = writer.get_extra_info('peername')
print(f"New connection from {addr}")
clients.append(writer)
try:
while True:
data = await reader.read(1024)
if not data:
break
message = data.decode()
print(f"Received message from {addr}: {message}")
# 向所有客户端广播消息
for client in clients:
if client != writer: # 不发给自己
try:
client.write(f"{addr[0]}:{addr[1]}: {message}".encode())
await client.drain()
except Exception as e:
print(f"Error sending message to client: {e}")
clients.remove(client) # 移除失效的客户端
except Exception as e:
print(f"Error handling client {addr}: {e}")
finally:
print(f"Connection closed from {addr}")
clients.remove(writer)
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(
handle_client, '127.0.0.1', 8888)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever()
asyncio.run(main())
客户端端:
import asyncio
async def client():
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
async def receive_messages():
try:
while True:
data = await reader.read(1024)
if not data:
print("Server disconnected.")
break
print(data.decode())
except Exception as e:
print(f"Error receiving messages: {e}")
finally:
writer.close()
await writer.wait_closed()
asyncio.create_task(receive_messages()) # 创建一个Task来接收消息
try:
while True:
message = input("> ")
writer.write(message.encode())
await writer.drain()
if message.lower() == "exit":
break
except Exception as e:
print(f"Error sending message: {e}")
finally:
writer.close()
await writer.wait_closed()
asyncio.run(client())
这个聊天室服务器:
- 维护一个
clients
列表,保存所有连接的客户端的writer
对象。 - 当有新的客户端连接时,将它的
writer
对象添加到clients
列表中。 - 当收到客户端的消息时,将消息广播给所有其他的客户端。
- 当客户端断开连接时,从
clients
列表中移除它的writer
对象。
客户端:
- 使用
asyncio.open_connection()
连接到服务器。 - 创建一个独立的
Task
来接收服务器发送的消息。 - 允许用户输入消息,并将消息发送给服务器。
- 当用户输入 "exit" 时,断开连接。
第五幕:总结与升华
今天,我们一起学习了 asyncio
中 asyncio.open_connection()
和 asyncio.start_server()
的用法。 你现在应该能够使用这两个函数来构建简单的 asyncio
客户端和服务器应用程序了。
记住,asyncio
是一个强大的工具,可以让你构建高性能的网络应用程序。 但是,它也需要你理解事件循环、协程和 await
关键字等概念。 多练习,多实践,你就能掌握 asyncio
,写出高效、可扩展的网络应用。
希望这次讲座对你有所帮助! 下次再见!