Python的`IO多路复用`:如何使用`select`、`poll`和`epoll`实现高并发的网络服务。

Python IO多路复用:构建高性能网络服务

大家好,今天我们来深入探讨Python中实现高并发网络服务的关键技术:IO多路复用。我们将重点讲解selectpollepoll这三种机制,通过理论结合实际代码示例,帮助大家理解它们的工作原理和适用场景。

1. IO多路复用的概念

在传统的阻塞IO模型中,一个线程只能处理一个连接。当连接上没有数据可读或无法写入时,线程会被阻塞,导致服务器并发能力受限。为了解决这个问题,IO多路复用应运而生。

IO多路复用允许一个线程同时监听多个文件描述符(file descriptor, fd)。当其中任何一个fd准备好进行IO操作(读或写)时,selectpollepoll会通知应用程序,从而避免了线程阻塞在等待IO上。

换句话说,IO多路复用允许一个线程同时处理多个socket连接,显著提高了服务器的并发能力。

2. select模块

select是Python中最早提供的IO多路复用机制,它通过select.select()函数来实现。

2.1 select.select()函数

select.select()函数接受三个列表作为输入:rlist(读列表)、wlist(写列表)和xlist(异常列表)。函数会阻塞,直到其中一个或多个fd准备好进行相应的IO操作。

import select
import socket

def select_example():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(('localhost', 12345))
    server_socket.listen(5)
    server_socket.setblocking(False) # 设置为非阻塞

    inputs = [server_socket] # 监听server_socket的连接请求
    outputs = []

    print("Server listening on port 12345...")

    try:
        while inputs:
            readable, writable, exceptional = select.select(inputs, outputs, inputs)

            for s in readable:
                if s is server_socket:
                    # 有新的连接
                    connection, client_address = s.accept()
                    connection.setblocking(False)
                    inputs.append(connection)
                    print(f"New connection from {client_address}")
                else:
                    # 有数据可读
                    try:
                        data = s.recv(1024)
                        if data:
                            print(f"Received {data.decode()} from {s.getpeername()}")
                            # 将socket添加到writable列表,准备回显数据
                            if s not in outputs:
                                outputs.append(s)
                        else:
                            # 连接关闭
                            print(f"Closing connection from {s.getpeername()}")
                            if s in outputs:
                                outputs.remove(s)
                            inputs.remove(s)
                            s.close()
                    except ConnectionResetError:
                        print(f"Connection reset by peer: {s.getpeername()}")
                        if s in outputs:
                            outputs.remove(s)
                        inputs.remove(s)
                        s.close()

            for s in writable:
                # 可以写入数据
                try:
                    message = "Echo: " + s.recv(1024).decode() #读取并回显
                    s.sendall(message.encode()) #回显数据
                    outputs.remove(s) #从可写列表中移除,等待下次可读
                except BlockingIOError:
                    pass # 避免非阻塞socket下立即重试导致CPU占用过高
                except ConnectionResetError:
                    print(f"Connection reset by peer during write: {s.getpeername()}")
                    if s in outputs:
                        outputs.remove(s)
                    inputs.remove(s)
                    s.close()

            for s in exceptional:
                print(f"Exception condition on {s.getpeername()}")
                inputs.remove(s)
                if s in outputs:
                    outputs.remove(s)
                s.close()

    except KeyboardInterrupt:
        print("nShutting down server...")
    finally:
        for s in inputs:
            s.close()

if __name__ == "__main__":
    select_example()

代码解释:

  1. 初始化: 创建一个socket,绑定到地址和端口,并设置为非阻塞模式。
  2. 监听列表: inputs列表用于存储需要监听读事件的socket,初始时只包含server socket。
  3. 主循环: select.select()函数监听inputsoutputsinputs(异常)列表。
  4. 处理可读事件:
    • 如果可读的socket是server socket,则表示有新的连接请求,调用accept()接受连接,并将新的连接socket添加到inputs列表。
    • 如果可读的socket是已连接的socket,则尝试接收数据。如果接收到数据,则打印消息并将该socket添加到outputs列表,准备回显数据。如果接收到空数据,则表示连接已关闭,从inputsoutputs列表中移除该socket,并关闭连接。
  5. 处理可写事件:
    • 如果可写的socket在outputs列表中,则表示可以发送数据。尝试发送数据,如果发送成功,则从outputs列表中移除该socket,等待下次可读。如果发生BlockingIOError,则忽略,因为socket是非阻塞的。
  6. 处理异常事件:
    • 如果socket发生异常,则从inputsoutputs列表中移除该socket,并关闭连接。
  7. 关闭连接: 在程序退出时,关闭所有socket。

2.2 select的缺点

  • 最大连接数限制: select能监听的文件描述符的数量受系统限制,通常为1024。
  • 线性扫描: select需要遍历整个fd集合来确定哪个fd准备好,时间复杂度为O(n)。
  • 复制开销: 每次调用select都需要将fd集合从用户空间复制到内核空间,开销较大。
  • 缺乏边缘触发: 只能工作在水平触发模式。

2.3 水平触发和边缘触发

  • 水平触发 (Level Triggered, LT): 只要文件描述符可以非阻塞地执行 I/O 操作(例如,有数据可读,缓冲区可写),就会一直通知应用程序。
  • 边缘触发 (Edge Triggered, ET): 只有当文件描述符的状态发生变化时(例如,从不可读变为可读,从不可写变为可写),才会通知应用程序一次。如果应用程序没有立即处理该事件,后续就不会再收到通知,除非状态再次发生变化。

3. poll模块

poll是对select的改进,它使用poll对象来管理文件描述符,克服了select的一些限制。

3.1 poll.poll()函数

import select
import socket

def poll_example():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(('localhost', 12345))
    server_socket.listen(5)
    server_socket.setblocking(False)

    poller = select.poll()
    poller.register(server_socket, select.POLLIN) # 监听可读事件

    connections = {server_socket.fileno(): server_socket} # 使用fd作为key

    print("Server listening on port 12345...")

    try:
        while True:
            events = poller.poll() # 阻塞,直到有事件发生

            for fd, event in events:
                s = connections[fd]

                if s is server_socket:
                    # 新的连接
                    connection, client_address = s.accept()
                    connection.setblocking(False)
                    poller.register(connection, select.POLLIN) # 监听新连接的可读事件
                    connections[connection.fileno()] = connection
                    print(f"New connection from {client_address}")
                elif event & select.POLLIN:
                    # 有数据可读
                    try:
                        data = s.recv(1024)
                        if data:
                            print(f"Received {data.decode()} from {s.getpeername()}")
                            poller.modify(s, select.POLLIN | select.POLLOUT) # 监听可写事件
                        else:
                            # 连接关闭
                            print(f"Closing connection from {s.getpeername()}")
                            poller.unregister(s)
                            s.close()
                            del connections[fd]
                    except ConnectionResetError:
                        print(f"Connection reset by peer: {s.getpeername()}")
                        poller.unregister(s)
                        s.close()
                        del connections[fd]
                elif event & select.POLLOUT:
                    # 可以写入数据
                    try:
                        message = "Echo: " + s.recv(1024).decode() # 读取并回显
                        s.sendall(message.encode())  # 回显数据
                        poller.modify(s, select.POLLIN)  # 监听可读事件,等待下次可读
                    except BlockingIOError:
                        pass # 避免非阻塞socket下立即重试导致CPU占用过高
                    except ConnectionResetError:
                         print(f"Connection reset by peer during write: {s.getpeername()}")
                         poller.unregister(s)
                         s.close()
                         del connections[fd]
                elif event & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
                    print(f"Exception condition on {s.getpeername()}")
                    poller.unregister(s)
                    s.close()
                    del connections[fd]

    except KeyboardInterrupt:
        print("nShutting down server...")
    finally:
        for fd, s in connections.items():
            s.close()

if __name__ == "__main__":
    poll_example()

代码解释:

  1. 初始化: 创建一个socket,绑定到地址和端口,并设置为非阻塞模式。
  2. 创建poll对象: 创建一个select.poll()对象,用于管理文件描述符。
  3. 注册socket: 使用poller.register()函数注册server socket,并监听select.POLLIN事件(可读事件)。
  4. 主循环: poller.poll()函数监听注册的socket,返回一个事件列表。
  5. 处理事件:
    • 如果事件来自server socket,则表示有新的连接请求,调用accept()接受连接,并将新的连接socket注册到poller对象,监听select.POLLIN事件。
    • 如果事件是select.POLLIN,则表示socket可读,尝试接收数据。如果接收到数据,则打印消息,并使用poller.modify()函数修改socket监听的事件,添加select.POLLOUT事件(可写事件)。如果接收到空数据,则表示连接已关闭,使用poller.unregister()函数取消注册该socket,并关闭连接。
    • 如果事件是select.POLLOUT,则表示socket可写,尝试发送数据。如果发送成功,使用poller.modify()函数修改socket监听的事件,移除select.POLLOUT事件,等待下次可读。
    • 如果事件是select.POLLHUPselect.POLLERRselect.POLLNVAL,则表示socket发生异常,使用poller.unregister()函数取消注册该socket,并关闭连接。
  6. 关闭连接: 在程序退出时,关闭所有socket。

3.2 poll的优点

  • 突破最大连接数限制: poll使用链表存储fd,不再受系统最大文件描述符数量的限制。
  • 更高的效率: poll只需要复制发生事件的fd到用户空间,避免了复制整个fd集合。

3.3 poll的缺点

  • 线性扫描: poll仍然需要遍历整个fd集合来确定哪个fd准备好,时间复杂度为O(n)。
  • 缺乏边缘触发: 只能工作在水平触发模式。

4. epoll模块

epoll是Linux特有的IO多路复用机制,是目前最高效的IO多路复用技术。它通过内核中的红黑树和就绪链表来管理文件描述符,实现了O(1)的时间复杂度。

4.1 epoll.epoll()函数

import select
import socket

def epoll_example():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(('localhost', 12345))
    server_socket.listen(5)
    server_socket.setblocking(False)

    epoll = select.epoll()
    epoll.register(server_socket.fileno(), select.EPOLLIN) # 监听可读事件

    connections = {} # 使用fd作为key

    print("Server listening on port 12345...")

    try:
        while True:
            events = epoll.poll() # 阻塞,直到有事件发生

            for fd, event in events:
                if fd == server_socket.fileno():
                    # 新的连接
                    connection, client_address = server_socket.accept()
                    connection.setblocking(False)
                    epoll.register(connection.fileno(), select.EPOLLIN) # 监听新连接的可读事件
                    connections[connection.fileno()] = connection
                    print(f"New connection from {client_address}")
                elif event & select.EPOLLIN:
                    # 有数据可读
                    try:
                        data = connections[fd].recv(1024)
                        if data:
                            print(f"Received {data.decode()} from {connections[fd].getpeername()}")
                            epoll.modify(fd, select.EPOLLIN | select.EPOLLOUT) # 监听可写事件
                        else:
                            # 连接关闭
                            print(f"Closing connection from {connections[fd].getpeername()}")
                            epoll.unregister(fd)
                            connections[fd].close()
                            del connections[fd]
                    except ConnectionResetError:
                        print(f"Connection reset by peer: {connections[fd].getpeername()}")
                        epoll.unregister(fd)
                        connections[fd].close()
                        del connections[fd]
                elif event & select.EPOLLOUT:
                    # 可以写入数据
                    try:
                        message = "Echo: " + connections[fd].recv(1024).decode() # 读取并回显
                        connections[fd].sendall(message.encode())  # 回显数据
                        epoll.modify(fd, select.EPOLLIN)  # 监听可读事件,等待下次可读
                    except BlockingIOError:
                        pass # 避免非阻塞socket下立即重试导致CPU占用过高
                    except ConnectionResetError:
                        print(f"Connection reset by peer during write: {connections[fd].getpeername()}")
                        epoll.unregister(fd)
                        connections[fd].close()
                        del connections[fd]

                elif event & (select.EPOLLHUP | select.EPOLLERR):
                    print(f"Exception condition on {connections[fd].getpeername()}")
                    epoll.unregister(fd)
                    connections[fd].close()
                    del connections[fd]

    except KeyboardInterrupt:
        print("nShutting down server...")
    finally:
        for fd, s in connections.items():
            s.close()

if __name__ == "__main__":
    epoll_example()

代码解释:

  1. 初始化: 创建一个socket,绑定到地址和端口,并设置为非阻塞模式。
  2. 创建epoll对象: 创建一个select.epoll()对象,用于管理文件描述符。
  3. 注册socket: 使用epoll.register()函数注册server socket,并监听select.EPOLLIN事件(可读事件)。
  4. 主循环: epoll.poll()函数监听注册的socket,返回一个事件列表。
  5. 处理事件:
    • 如果事件来自server socket,则表示有新的连接请求,调用accept()接受连接,并将新的连接socket注册到epoll对象,监听select.EPOLLIN事件。
    • 如果事件是select.EPOLLIN,则表示socket可读,尝试接收数据。如果接收到数据,则打印消息,并使用epoll.modify()函数修改socket监听的事件,添加select.EPOLLOUT事件(可写事件)。如果接收到空数据,则表示连接已关闭,使用epoll.unregister()函数取消注册该socket,并关闭连接。
    • 如果事件是select.EPOLLOUT,则表示socket可写,尝试发送数据。如果发送成功,使用epoll.modify()函数修改socket监听的事件,移除select.EPOLLOUT事件,等待下次可读。
    • 如果事件是select.EPOLLHUPselect.EPOLLERR,则表示socket发生异常,使用epoll.unregister()函数取消注册该socket,并关闭连接。
  6. 关闭连接: 在程序退出时,关闭所有socket。

4.2 epoll的优点

  • 高效率: epoll使用红黑树和就绪链表,只需要O(1)的时间复杂度即可确定哪个fd准备好。
  • 支持边缘触发: epoll支持边缘触发模式,可以减少不必要的事件通知,提高效率。
  • 无最大连接数限制: epoll可以监听大量的文件描述符。

4.3 epoll的模式:LT和ET

  • 水平触发 (Level Triggered, LT): 只要文件描述符可以非阻塞地执行 I/O 操作(例如,有数据可读,缓冲区可写),就会一直通知应用程序。如果没有一次性把数据读完,下次还会通知。
  • 边缘触发 (Edge Triggered, ET): 只有当文件描述符的状态发生变化时(例如,从不可读变为可读,从不可写变为可写),才会通知应用程序一次。如果应用程序没有立即处理该事件,后续就不会再收到通知,除非状态再次发生变化。 使用ET模式需要确保一次性读取所有数据,否则可能导致数据丢失。通常与非阻塞socket一起使用,配合循环读取,直到EAGAINEWOULDBLOCK错误发生。

代码示例(边缘触发模式):

import select
import socket

def epoll_et_example():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(('localhost', 12345))
    server_socket.listen(5)
    server_socket.setblocking(False)

    epoll = select.epoll()
    epoll.register(server_socket.fileno(), select.EPOLLIN | select.EPOLLET) # 监听可读事件,使用边缘触发

    connections = {} # 使用fd作为key

    print("Server listening on port 12345...")

    try:
        while True:
            events = epoll.poll() # 阻塞,直到有事件发生

            for fd, event in events:
                if fd == server_socket.fileno():
                    # 新的连接
                    connection, client_address = server_socket.accept()
                    connection.setblocking(False)
                    epoll.register(connection.fileno(), select.EPOLLIN | select.EPOLLET) # 监听新连接的可读事件,使用边缘触发
                    connections[connection.fileno()] = connection
                    print(f"New connection from {client_address}")
                elif event & select.EPOLLIN:
                    # 有数据可读
                    try:
                        while True: # 循环读取,直到没有数据可读
                            data = connections[fd].recv(1024)
                            if data:
                                print(f"Received {data.decode()} from {connections[fd].getpeername()}")
                                # 处理数据
                            else:
                                # 连接关闭或者没有更多数据
                                if data == b"":
                                    print(f"Closing connection from {connections[fd].getpeername()}")
                                    epoll.unregister(fd)
                                    connections[fd].close()
                                    del connections[fd]
                                break  # 退出循环读取
                    except (BlockingIOError, ConnectionResetError) as e:
                        if isinstance(e, BlockingIOError):
                            # 没有更多数据可读
                            pass
                        elif isinstance(e, ConnectionResetError):
                             print(f"Connection reset by peer: {connections[fd].getpeername()}")
                             epoll.unregister(fd)
                             connections[fd].close()
                             del connections[fd]

    except KeyboardInterrupt:
        print("nShutting down server...")
    finally:
        for fd, s in connections.items():
            s.close()

if __name__ == "__main__":
    epoll_et_example()

代码解释:

  1. 注册socket: 使用epoll.register()函数注册server socket,并监听select.EPOLLIN | select.EPOLLET事件,即监听可读事件,并使用边缘触发模式。
  2. 循环读取: 在处理可读事件时,使用while True循环读取数据,直到recv()函数返回空数据或者抛出BlockingIOError异常。
  3. 错误处理: 捕获BlockingIOError异常,表示没有更多数据可读,退出循环。

5. selectpollepoll的比较

特性 select poll epoll
最大连接数限制
时间复杂度 O(n) O(n) O(1)
数据复制 复制整个fd集合 复制事件fd 只复制事件fd
触发模式 水平触发 水平触发 水平/边缘触发
平台兼容性 良好 良好 Linux特有

6. 如何选择合适的IO多路复用机制

  • 并发量较低: 如果并发量较低,selectpoll都可以满足需求。
  • 并发量较高: 如果并发量较高,建议使用epoll,可以获得更好的性能。
  • 平台兼容性: 如果需要跨平台,selectpoll是更好的选择。
  • Linux平台: 如果只在Linux平台上运行,epoll是最佳选择。

7. 总结

IO多路复用是构建高性能网络服务的关键技术。selectpollepoll各有优缺点,需要根据实际情况选择合适的机制。epoll在性能方面具有显著优势,尤其是在高并发场景下。掌握这些技术,可以帮助我们构建出更加高效、稳定的网络应用。

8. 快速回顾

我们讨论了IO多路复用的概念,介绍了selectpollepoll的使用方法,并比较了它们的优缺点和适用场景。通过代码示例,我们了解了如何在Python中实现这些IO多路复用机制。理解并正确应用IO多路复用技术,能显著提高网络服务的并发能力和性能。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注