Python高级技术之:`asyncio`的网络编程:`asyncio.open_connection()`和`asyncio.start_server()`的用法。

各位观众老爷,晚上好!我是今晚的讲师,咱们今天聊聊Python asyncio 里的网络编程,特别是asyncio.open_connection()asyncio.start_server() 这两位重量级选手。

话说 asyncio 可是 Python 处理并发的利器,有了它,你就能写出高性能的网络应用,让你的程序像开了挂一样。 我们今天就来扒一扒这两个函数的用法,保证你听完之后,也能用 asyncio 写出漂亮的服务器和客户端。

第一幕:asyncio 的世界观

在深入到具体的函数之前,咱们先来简单回顾一下 asyncio 的核心概念。asyncio 基于事件循环 (event loop),它允许你并发地执行多个任务,而无需使用多线程或多进程。 这种并发是通过协程 (coroutines) 实现的。

  • 事件循环 (Event Loop): 整个 asyncio 程序的心脏,负责调度和执行协程。你可以把它想象成一个任务管理器,不断地轮询等待执行的任务,并按照一定的顺序执行。
  • 协程 (Coroutine): 一种特殊的函数,可以在执行过程中暂停,稍后恢复执行。你可以用 async 关键字定义一个协程。
  • await 关键字: 只能在协程中使用,用于暂停协程的执行,等待另一个协程完成。await 会将程序的控制权交还给事件循环,让它可以执行其他的协程。
  • Task: 事件循环调度执行的基本单位。 可以用 asyncio.create_task() 把一个协程包装成一个 Task。

简单来说,asyncio 的工作方式就是:你定义一堆协程,然后把它们交给事件循环,事件循环会负责并发地执行这些协程。当一个协程遇到 await 时,它就会暂停执行,把控制权交还给事件循环,让事件循环去执行其他的协程,直到 await 等待的操作完成,协程才会恢复执行。

第二幕:客户端的秘密武器:asyncio.open_connection()

asyncio.open_connection() 函数是 asyncio 客户端编程的核心。它可以建立一个 TCP 连接,并返回一对 StreamReaderStreamWriter 对象,用于读取和写入数据。

它的基本用法如下:

import asyncio

async def main():
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)

    print(f'Connected to the server')

    message = "Hello, server!".encode()
    writer.write(message)
    await writer.drain() # 刷新缓冲区

    data = await reader.read(100) # 读取最多100字节
    print(f'Received: {data.decode()}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(main())

这段代码做了什么呢?

  1. asyncio.open_connection('127.0.0.1', 8888): 连接到本地主机 (127.0.0.1) 的 8888 端口。 这个函数会返回两个对象:readerwriter
  2. reader (StreamReader): 用于从连接中读取数据。
  3. writer (StreamWriter): 用于向连接中写入数据。
  4. writer.write(message): 将消息 ( "Hello, server!" ) 编码成字节串,并写入到连接中。
  5. await writer.drain(): 刷新写入缓冲区。这个函数很重要,它会等待所有的数据都被发送出去。如果没有它,数据可能会被缓冲起来,导致服务器收不到完整的数据。
  6. await reader.read(100): 从连接中读取最多 100 个字节的数据。
  7. writer.close(): 关闭连接。
  8. await writer.wait_closed(): 等待连接真正关闭。

asyncio.open_connection() 的参数:

参数 类型 描述
host str 服务器的主机名或 IP 地址。
port int 服务器的端口号。
ssl SSLContext 如果需要使用 SSL/TLS 加密连接,可以传入一个 ssl.SSLContext 对象。
family socket.AddressFamily 地址族,例如 socket.AF_INET (IPv4) 或 socket.AF_INET6 (IPv6)。
proto int 协议,例如 socket.IPPROTO_TCP
flags int 用于控制套接字行为的标志。
server_hostname str 如果使用了 SSL/TLS,并且需要验证服务器的证书,需要设置这个参数。
local_addr tuple 本地地址,一个包含主机名和端口号的元组。
limit int StreamReader 内部缓冲区的最大大小,默认值是 65536 字节。
happy_eyeballs_delay float (Python 3.8+) 如果支持 Happy Eyeballs, 在尝试下一个地址之前等待的时间,以秒为单位。 Happy Eyeballs 是一种优化技术,可以更快地建立连接,特别是当客户端同时支持 IPv4 和 IPv6 时。
start_tls bool (Python 3.7+) 如果设置为 True,将在连接建立后立即启动 TLS 握手。

一个更复杂的例子:同时发送和接收数据

import asyncio

async def client():
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)

    try:
        while True:
            message = input("Enter message to send (or 'exit'): ")
            if message.lower() == 'exit':
                break

            message_bytes = message.encode()
            writer.write(message_bytes)
            await writer.drain()

            print(f"Sent: {message}")

            data = await reader.read(100)
            if not data:
                print("Server closed the connection.")
                break

            print(f"Received: {data.decode()}")

    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        writer.close()
        await writer.wait_closed()
        print("Connection closed.")

asyncio.run(client())

这个例子允许你和服务器进行交互式的通信。 你可以输入消息,然后发送给服务器,并接收服务器的响应。

第三幕:服务器的基石:asyncio.start_server()

asyncio.start_server() 函数是构建 asyncio 服务器的核心。 它创建一个 TCP 服务器,监听指定的地址和端口,并为每个客户端连接创建一个新的协程来处理请求。

它的基本用法如下:

import asyncio

async def handle_client(reader, writer):
    addr = writer.get_extra_info('peername')
    print(f"Accepted connection from {addr}")

    while True:
        data = await reader.read(100)
        if not data:
            break

        message = data.decode()
        print(f"Received {message} from {addr}")

        response = f"Hello, {message}!".encode()
        writer.write(response)
        await writer.drain()

    print(f"Closed connection from {addr}")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_client, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:  # 优雅地关闭server
        await server.serve_forever()

asyncio.run(main())

这段代码创建了一个简单的 TCP 服务器,它:

  1. asyncio.start_server(handle_client, '127.0.0.1', 8888): 在本地主机 (127.0.0.1) 的 8888 端口上启动服务器。 handle_client 是一个回调函数,当有新的客户端连接时,事件循环会调用这个函数来处理连接。
  2. handle_client(reader, writer): 这个函数负责处理客户端的连接。 它接收两个参数:readerwriter,分别用于从连接中读取数据和向连接中写入数据。
  3. server.serve_forever(): 让服务器一直运行,等待新的客户端连接。
  4. writer.get_extra_info('peername'): 获取客户端的地址信息。

asyncio.start_server() 的参数:

参数 类型 描述
client_connected_cb callable 当有新的客户端连接时,事件循环会调用这个回调函数。 这个函数接收两个参数: StreamReaderStreamWriter
host str or sequence[str] 服务器监听的主机名或 IP 地址。 可以是一个字符串,也可以是一个字符串序列。 如果是一个字符串序列,服务器会在所有的地址上监听。 如果 host 是空字符串或 None,服务器会监听所有的 IPv4 和 IPv6 地址 (如果支持)。
port int 服务器监听的端口号。 如果 port 是 0,服务器会随机选择一个可用的端口。
family socket.AddressFamily 地址族,例如 socket.AF_INET (IPv4) 或 socket.AF_INET6 (IPv6)。
proto int 协议,例如 socket.IPPROTO_TCP
ssl SSLContext 如果需要使用 SSL/TLS 加密连接,可以传入一个 ssl.SSLContext 对象。
backlog int 监听队列的最大长度。 当服务器接收到新的连接请求时,会将请求放入监听队列中。 如果监听队列满了,服务器会拒绝新的连接请求。 默认值是 100。
reuse_address bool 是否允许重用地址。 如果设置为 True,即使有其他程序在使用相同的地址和端口,服务器也可以启动。 默认值取决于平台。
reuse_port bool (Python 3.7+) 是否允许重用端口。 如果设置为 True,多个服务器可以同时监听同一个端口。 这个选项通常用于负载均衡。 默认值取决于平台。
start_serving bool (Python 3.8+) 是否在创建服务器后立即开始服务。 如果设置为 False,你需要手动调用 server.start_serving() 方法来启动服务器。 默认值是 True
limit int StreamReader 内部缓冲区的最大大小,默认值是 65536 字节。
happy_eyeballs_delay float (Python 3.8+) 如果支持 Happy Eyeballs, 在尝试下一个地址之前等待的时间,以秒为单位。 Happy Eyeballs 是一种优化技术,可以更快地建立连接,特别是当客户端同时支持 IPv4 和 IPv6 时。

一个更复杂的例子:处理多个客户端连接

上面的例子只能处理一个客户端连接。 如果想让服务器同时处理多个客户端连接,asyncio 会自动帮你搞定。 你只需要确保 handle_client 函数是一个协程,并且不会阻塞事件循环。

import asyncio

async def handle_client(reader, writer):
    addr = writer.get_extra_info('peername')
    print(f"Accepted connection from {addr}")

    try:
        while True:
            data = await reader.read(1024) # 更大的缓冲区
            if not data:
                break

            message = data.decode()
            print(f"Received '{message}' from {addr}")

            response = f"Echo: {message}".encode()
            writer.write(response)
            await writer.drain()
            print(f"Sent 'Echo: {message}' to {addr}")

    except Exception as e:
        print(f"An error occurred while handling client {addr}: {e}")
    finally:
        print(f"Closing connection from {addr}")
        writer.close()
        await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_client, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

这个例子使用了一个 while 循环来持续地从客户端读取数据,并向客户端发送响应。 由于 reader.read() 是一个 awaitable 对象,所以 handle_client 函数不会阻塞事件循环,服务器可以同时处理多个客户端连接。 同时,我们用try...except...finally块包围了客户端处理逻辑,确保即使客户端出现错误,服务器也能正常关闭连接,避免资源泄露。

第四幕:实战演练:一个简单的聊天室

现在,让我们把 asyncio.open_connection()asyncio.start_server() 结合起来,创建一个简单的聊天室。

服务器端:

import asyncio

clients = []  # 保存所有客户端的 writer

async def handle_client(reader, writer):
    addr = writer.get_extra_info('peername')
    print(f"New connection from {addr}")

    clients.append(writer)

    try:
        while True:
            data = await reader.read(1024)
            if not data:
                break

            message = data.decode()
            print(f"Received message from {addr}: {message}")

            # 向所有客户端广播消息
            for client in clients:
                if client != writer: # 不发给自己
                    try:
                        client.write(f"{addr[0]}:{addr[1]}: {message}".encode())
                        await client.drain()
                    except Exception as e:
                        print(f"Error sending message to client: {e}")
                        clients.remove(client) # 移除失效的客户端

    except Exception as e:
        print(f"Error handling client {addr}: {e}")
    finally:
        print(f"Connection closed from {addr}")
        clients.remove(writer)
        writer.close()
        await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_client, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

客户端端:

import asyncio

async def client():
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)

    async def receive_messages():
        try:
            while True:
                data = await reader.read(1024)
                if not data:
                    print("Server disconnected.")
                    break
                print(data.decode())
        except Exception as e:
            print(f"Error receiving messages: {e}")
        finally:
            writer.close()
            await writer.wait_closed()

    asyncio.create_task(receive_messages()) # 创建一个Task来接收消息

    try:
        while True:
            message = input("> ")
            writer.write(message.encode())
            await writer.drain()

            if message.lower() == "exit":
                break
    except Exception as e:
        print(f"Error sending message: {e}")
    finally:
        writer.close()
        await writer.wait_closed()

asyncio.run(client())

这个聊天室服务器:

  1. 维护一个 clients 列表,保存所有连接的客户端的 writer 对象。
  2. 当有新的客户端连接时,将它的 writer 对象添加到 clients 列表中。
  3. 当收到客户端的消息时,将消息广播给所有其他的客户端。
  4. 当客户端断开连接时,从 clients 列表中移除它的 writer 对象。

客户端:

  1. 使用 asyncio.open_connection() 连接到服务器。
  2. 创建一个独立的 Task 来接收服务器发送的消息。
  3. 允许用户输入消息,并将消息发送给服务器。
  4. 当用户输入 "exit" 时,断开连接。

第五幕:总结与升华

今天,我们一起学习了 asyncioasyncio.open_connection()asyncio.start_server() 的用法。 你现在应该能够使用这两个函数来构建简单的 asyncio 客户端和服务器应用程序了。

记住,asyncio 是一个强大的工具,可以让你构建高性能的网络应用程序。 但是,它也需要你理解事件循环、协程和 await 关键字等概念。 多练习,多实践,你就能掌握 asyncio,写出高效、可扩展的网络应用。

希望这次讲座对你有所帮助! 下次再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注