C++实现高性能异步网络编程:利用操作系统epoll/kqueue实现I/O多路复用

C++ 高性能异步网络编程:Epoll/Kqueue I/O 多路复用

各位朋友,大家好!今天我们来聊聊 C++ 中实现高性能异步网络编程的关键技术:利用操作系统提供的 Epoll (Linux) 或 Kqueue (BSD/macOS) 进行 I/O 多路复用。 I/O 多路复用是构建高并发网络服务的基础,它允许单个线程同时监视多个文件描述符(Sockets),并在其中任何一个准备好进行读写操作时通知应用程序。

1. 同步阻塞 I/O 的瓶颈

传统的同步阻塞 I/O 模型在处理大量并发连接时会遇到显著的瓶颈。每个连接通常需要一个独立的线程来处理。当连接数量增加时,线程创建和管理的开销变得巨大,同时频繁的上下文切换也会消耗大量的 CPU 资源。 想象一个 Web 服务器,每个请求都需要一个线程阻塞等待数据,在高并发场景下,系统资源很快就会耗尽。

2. I/O 多路复用:解决方案

I/O 多路复用通过允许单个线程同时监视多个文件描述符来解决这个问题。当某个文件描述符准备好进行 I/O 操作时,操作系统会通知应用程序,然后应用程序可以对该文件描述符执行相应的操作。 这样,单个线程就可以处理多个并发连接,显著提高了服务器的吞吐量和响应速度。 简单来说,一个线程可以同时监听多个 Socket,哪个 Socket 有数据可读,就处理哪个。

3. Epoll 和 Kqueue 简介

Epoll (Linux) 和 Kqueue (BSD/macOS) 是操作系统提供的两种高效的 I/O 多路复用机制。 它们都提供了类似的接口,允许应用程序注册感兴趣的文件描述符和事件,并在事件发生时收到通知。

  • Epoll (Linux): Epoll 使用基于事件驱动的方式,只有在文件描述符状态发生变化时才会通知应用程序。 它使用红黑树来管理文件描述符,因此添加、删除和查找操作的时间复杂度为 O(log N),其中 N 是被监视的文件描述符的数量。 Epoll 支持边缘触发 (ET) 和水平触发 (LT) 两种模式。

  • Kqueue (BSD/macOS): Kqueue 是一个更通用的事件通知机制,不仅可以用于 I/O,还可以用于文件系统事件、信号等。 Kqueue 使用内核事件队列来管理事件,并提供了丰富的事件过滤和处理选项。 Kqueue 通常被认为比 Epoll 更灵活和强大。

4. Epoll 的使用

下面我们通过一个简单的例子来说明如何在 C++ 中使用 Epoll 实现一个简单的 TCP 服务器。

#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>

const int MAX_EVENTS = 10;
const int PORT = 8080;
const int BUFFER_SIZE = 1024;

int main() {
    int server_fd, new_socket, epoll_fd;
    struct sockaddr_in address;
    socklen_t addrlen = sizeof(address);
    struct epoll_event event, events[MAX_EVENTS];
    char buffer[BUFFER_SIZE];

    // 1. 创建 socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("Socket creation failed");
        return 1;
    }

    // 2. 设置 socket 选项,允许地址重用
    int opt = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("Setsockopt failed");
        return 1;
    }

    // 3. 绑定 socket 到指定地址和端口
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("Bind failed");
        return 1;
    }

    // 4. 监听连接
    if (listen(server_fd, 3) < 0) {
        perror("Listen failed");
        return 1;
    }

    // 5. 创建 epoll 实例
    if ((epoll_fd = epoll_create1(0)) == -1) {
        perror("Epoll creation failed");
        return 1;
    }

    // 6. 将 server_fd 添加到 epoll 监听列表
    event.events = EPOLLIN; // 监听读事件
    event.data.fd = server_fd;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) == -1) {
        perror("Epoll control failed");
        return 1;
    }

    std::cout << "Server listening on port " << PORT << std::endl;

    // 7. 事件循环
    while (true) {
        // 等待事件发生
        int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (nfds == -1) {
            perror("Epoll wait failed");
            return 1;
        }

        // 处理发生的事件
        for (int i = 0; i < nfds; ++i) {
            if (events[i].data.fd == server_fd) {
                // 新连接到来
                if ((new_socket = accept(server_fd, (struct sockaddr *)&address, &addrlen)) < 0) {
                    perror("Accept failed");
                    continue; // 继续监听,不退出
                }

                // 设置非阻塞
                int flags = fcntl(new_socket, F_GETFL, 0);
                fcntl(new_socket, F_SETFL, flags | O_NONBLOCK);

                // 将新连接添加到 epoll 监听列表
                event.events = EPOLLIN | EPOLLET; // 边缘触发模式
                event.data.fd = new_socket;
                if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &event) == -1) {
                    perror("Epoll control failed");
                    close(new_socket); // 关闭socket
                    continue; // 继续监听,不退出
                }

                std::cout << "New connection accepted" << std::endl;
            } else {
                // 处理已连接的 socket 上的数据
                int client_fd = events[i].data.fd;
                memset(buffer, 0, BUFFER_SIZE);
                ssize_t bytes_received = recv(client_fd, buffer, BUFFER_SIZE - 1, 0);

                if (bytes_received > 0) {
                    std::cout << "Received: " << buffer << std::endl;

                    // 回显数据
                    send(client_fd, buffer, bytes_received, 0);
                } else if (bytes_received == 0) {
                    // 连接关闭
                    std::cout << "Connection closed" << std::endl;
                    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, NULL);
                    close(client_fd);
                } else {
                    // 错误处理
                    if (errno != EAGAIN && errno != EWOULDBLOCK) {
                        perror("Recv failed");
                        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, NULL);
                        close(client_fd);
                    }
                }
            }
        }
    }

    close(server_fd);
    close(epoll_fd);

    return 0;
}

这个例子展示了如何使用 Epoll 创建一个简单的 TCP 服务器。 关键步骤包括:

  1. 创建 Socket: 使用 socket() 函数创建一个 socket。
  2. 绑定地址和端口: 使用 bind() 函数将 socket 绑定到指定的地址和端口。
  3. 监听连接: 使用 listen() 函数开始监听连接。
  4. 创建 Epoll 实例: 使用 epoll_create1() 函数创建一个 Epoll 实例。
  5. 将 Server Socket 添加到 Epoll 监听列表: 使用 epoll_ctl() 函数将 server socket 添加到 Epoll 监听列表,监听读事件 (EPOLLIN)。
  6. 事件循环:
    • 使用 epoll_wait() 函数等待事件发生。
    • 当有新连接到来时 (server socket 上的 EPOLLIN 事件),使用 accept() 函数接受连接,并将新 socket 添加到 Epoll 监听列表。 注意:要将新socket设置为非阻塞模式。
    • 当已连接的 socket 上有数据可读时 (client socket 上的 EPOLLIN 事件),使用 recv() 函数接收数据,并进行处理。
    • 当连接关闭时,从 Epoll 监听列表中删除该 socket,并关闭 socket。

5. Kqueue 的使用

Kqueue 的使用方式与 Epoll 类似,但有一些细微的差别。 下面是一个使用 Kqueue 实现的类似 TCP 服务器的例子:

#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <sys/event.h>
#include <sys/time.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>

const int MAX_EVENTS = 10;
const int PORT = 8080;
const int BUFFER_SIZE = 1024;

int main() {
    int server_fd, new_socket, kq;
    struct sockaddr_in address;
    socklen_t addrlen = sizeof(address);
    struct kevent events[MAX_EVENTS], change;
    char buffer[BUFFER_SIZE];

    // 1. 创建 socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("Socket creation failed");
        return 1;
    }

    // 2. 设置 socket 选项,允许地址重用
    int opt = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("Setsockopt failed");
        return 1;
    }

    // 3. 绑定 socket 到指定地址和端口
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("Bind failed");
        return 1;
    }

    // 4. 监听连接
    if (listen(server_fd, 3) < 0) {
        perror("Listen failed");
        return 1;
    }

    // 5. 创建 kqueue 实例
    if ((kq = kqueue()) == -1) {
        perror("Kqueue creation failed");
        return 1;
    }

    // 6. 将 server_fd 添加到 kqueue 监听列表
    EV_SET(&change, server_fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
    if (kevent(kq, &change, 1, NULL, 0, NULL) == -1) {
        perror("Kevent control failed");
        return 1;
    }

    std::cout << "Server listening on port " << PORT << std::endl;

    // 7. 事件循环
    while (true) {
        // 等待事件发生
        int nev = kevent(kq, NULL, 0, events, MAX_EVENTS, NULL);
        if (nev == -1) {
            perror("Kevent wait failed");
            return 1;
        }

        // 处理发生的事件
        for (int i = 0; i < nev; ++i) {
            if (events[i].ident == server_fd) {
                // 新连接到来
                if ((new_socket = accept(server_fd, (struct sockaddr *)&address, &addrlen)) < 0) {
                    perror("Accept failed");
                    continue;
                }

                // 设置非阻塞
                int flags = fcntl(new_socket, F_GETFL, 0);
                fcntl(new_socket, F_SETFL, flags | O_NONBLOCK);

                // 将新连接添加到 kqueue 监听列表
                EV_SET(&change, new_socket, EVFILT_READ, EV_ADD | EV_ONESHOT, 0, 0, NULL); //EV_ONESHOT只触发一次
                if (kevent(kq, &change, 1, NULL, 0, NULL) == -1) {
                    perror("Kevent control failed");
                    close(new_socket);
                    continue;
                }

                std::cout << "New connection accepted" << std::endl;
            } else if (events[i].filter == EVFILT_READ) {
                // 处理已连接的 socket 上的数据
                int client_fd = events[i].ident;
                memset(buffer, 0, BUFFER_SIZE);
                ssize_t bytes_received = recv(client_fd, buffer, BUFFER_SIZE - 1, 0);

                if (bytes_received > 0) {
                    std::cout << "Received: " << buffer << std::endl;

                    // 回显数据
                    send(client_fd, buffer, bytes_received, 0);

                    //重新注册读事件
                    EV_SET(&change, client_fd, EVFILT_READ, EV_ADD | EV_ONESHOT, 0, 0, NULL);
                    if (kevent(kq, &change, 1, NULL, 0, NULL) == -1) {
                        perror("Kevent control failed");
                        close(client_fd);
                        continue;
                    }

                } else if (bytes_received == 0) {
                    // 连接关闭
                    std::cout << "Connection closed" << std::endl;
                    EV_SET(&change, client_fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
                    kevent(kq, &change, 1, NULL, 0, NULL);
                    close(client_fd);
                } else {
                    // 错误处理
                    if (errno != EAGAIN && errno != EWOULDBLOCK) {
                        perror("Recv failed");
                        EV_SET(&change, client_fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
                         kevent(kq, &change, 1, NULL, 0, NULL);
                        close(client_fd);
                    }
                }
            }
        }
    }

    close(server_fd);
    close(kq);

    return 0;
}

Kqueue 的关键步骤包括:

  1. 创建 Socket, 绑定,监听:与 Epoll 例子相同。
  2. 创建 Kqueue 实例: 使用 kqueue() 函数创建一个 Kqueue 实例。
  3. 将 Server Socket 添加到 Kqueue 监听列表: 使用 EV_SET 宏初始化一个 kevent 结构体,然后使用 kevent() 函数将 server socket 添加到 Kqueue 监听列表,监听读事件 (EVFILT_READ)。
  4. 事件循环:
    • 使用 kevent() 函数等待事件发生。
    • 当有新连接到来时 (server socket 上的 EVFILT_READ 事件),使用 accept() 函数接受连接,并将新 socket 添加到 Kqueue 监听列表。同样需要设置非阻塞。
    • 当已连接的 socket 上有数据可读时 (client socket 上的 EVFILT_READ 事件),使用 recv() 函数接收数据,并进行处理。
    • 当连接关闭时,从 Kqueue 监听列表中删除该 socket,并关闭 socket。

6. Epoll 和 Kqueue 的比较

Epoll 和 Kqueue 都是高性能的 I/O 多路复用机制,但它们之间也存在一些差异。

特性 Epoll Kqueue
操作系统 Linux BSD/macOS
事件类型 主要用于 I/O 事件 支持 I/O、文件系统、信号等多种事件
触发模式 支持边缘触发 (ET) 和水平触发 (LT) 边缘触发 (需要手动重新注册)
数据结构 红黑树 内核事件队列
性能 在高并发场景下性能通常优于 Select 和 Poll 在某些场景下可能比 Epoll 更灵活和高效
API epoll_create, epoll_ctl, epoll_wait kqueue, kevent

7. 边缘触发 (ET) 和水平触发 (LT)

  • 水平触发 (LT): 只要文件描述符上的数据可读,epoll_waitkevent 就会持续通知应用程序。 应用程序可以随时读取数据,即使没有新的数据到达。 LT 模式更容易使用,但效率相对较低。
  • 边缘触发 (ET): 只有当文件描述符的状态发生变化时(例如,有新的数据到达),epoll_wait 才会通知应用程序。 应用程序必须立即读取所有可用数据,否则可能会错过后续的事件。 ET 模式效率更高,但编程复杂度也更高。 在上面的 Epoll 示例中,我们使用了 ET 模式,并设置了非阻塞 Socket。 如果不设置为非阻塞,在高并发场景下,可能会因为 Socket 缓冲区的数据没有完全读取完,导致 recv 函数阻塞,影响性能。

8. 最佳实践

  • 使用非阻塞 Socket: 在使用 Epoll 或 Kqueue 时,务必将 Socket 设置为非阻塞模式。
  • 合理选择触发模式: 根据应用程序的需求选择合适的触发模式。 如果对性能要求较高,可以选择 ET 模式,但需要注意处理所有可用数据。
  • 避免 busy-waiting: 不要在事件循环中执行耗时的操作,否则会导致其他事件无法及时处理。
  • 错误处理: 仔细处理各种错误情况,例如连接关闭、读取错误等。

9. 高性能网络库

许多现有的 C++ 高性能网络库都基于 Epoll 或 Kqueue 实现,例如:

  • Boost.Asio: 一个跨平台的 C++ 库,提供了异步 I/O、定时器、Sockets 等功能。
  • libevent: 一个轻量级的事件通知库,支持多种 I/O 多路复用机制。
  • libuv: Node.js 的底层 I/O 库,也支持 Epoll 和 Kqueue。
  • Muduo: 陈硕大佬的 C++ 网络库,基于 Reactor 模式和 Epoll 实现。

学习和使用这些库可以帮助您更高效地构建高性能的网络应用程序。

系统调用和事件循环,网络编程的重要基础

今天我们讨论了如何使用 Epoll 和 Kqueue 在 C++ 中实现高性能的异步网络编程。 I/O 多路复用是构建高并发网络服务的关键技术,理解和掌握它可以帮助您构建更高效、更可扩展的应用程序。掌握底层原理,才能更好的使用现成的网络库。

更多IT精英技术系列讲座,到智猿学院

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注