C++ Reactor/Proactor 模式:网络编程中的事件驱动架构

C++ Reactor/Proactor 模式:网络编程中的事件驱动架构

各位听众,大家好!今天咱们来聊聊网络编程里两个响当当的人物:Reactor 和 Proactor。这哥俩就像武林高手,一个擅长“借力打力”,一个喜欢“包办一切”。他们都是事件驱动架构的代表,能帮助我们构建高性能、可扩展的网络应用。

一、什么是事件驱动?别跟我说“鼠标点一下”!

首先,咱们得搞清楚什么是“事件驱动”。别一听“事件”,就想到鼠标点击、键盘敲击,虽然它们也是事件,但网络编程里的事件可不止这些。在网络世界里,事件通常指:

  • 连接请求 (Connection Request): 有客户端想来跟我服务器“套近乎”。
  • 数据到达 (Data Arrival): 客户端发来消息了,要处理一下。
  • 数据发送完成 (Data Sent): 我发给客户端的消息已经成功送达。
  • 错误发生 (Error Occurred): 哎呀,出错了,可能是网络断了,也可能是客户端发来了不合法的请求。

事件驱动的核心思想就是:程序不是按照预先设定的流程一步一步执行,而是等待事件发生,然后根据事件类型做出相应的处理。 这就像一个服务员,不是傻傻地站在那里等客人来,而是时刻关注着整个餐厅,客人招手了(事件发生),就赶紧过去服务。

二、Reactor:来了来了,我帮你调度!

Reactor 模式就像一个“调度员”,它负责监听各种事件,当事件发生时,它会找到对应的“事件处理器”来处理。 简单来说,Reactor 负责“发现”事件,然后“分发”事件给相应的处理者。

2.1 Reactor 的核心组件

  • Handle: 操作系统用来标识 I/O 对象的东西,比如 Socket。你可以把它想象成一个水管的编号,通过这个编号,我们就能找到对应的水管。
  • Event Demultiplexer: 这个家伙是 Reactor 的核心,它负责监听 Handle 上发生的事件。在 Linux 上,通常使用 selectpollepoll 等系统调用来实现。你可以把它想象成一个监视器,时刻盯着各个水管,看看有没有水流过来。
  • Event Handler: 这是一个抽象类或接口,定义了处理事件的方法。你需要实现这个接口,来处理具体的事件。你可以把它想象成一个水管工,负责修理水管、疏通水管等等。
  • Concrete Event Handler: 这是 Event Handler 的具体实现,负责处理特定类型的事件。你可以把它想象成一个专门修理热水管的水管工。
  • Reactor: 它是整个模式的中心,负责注册 Event Handler,并驱动 Event Demultiplexer 监听事件。你可以把它想象成一个调度中心,负责协调各个水管工的工作。

2.2 Reactor 的工作流程

  1. 注册事件处理器: 将需要监听的 Handle 和对应的 Event Handler 注册到 Reactor 中。相当于告诉调度中心,哪些水管需要特别关注,以及一旦水管出问题,应该找哪个水管工。
  2. 事件循环: Reactor 进入事件循环,不断调用 Event Demultiplexer 监听事件。相当于调度中心让监视器开始工作,时刻盯着各个水管。
  3. 事件发生: 当某个 Handle 上发生事件时,Event Demultiplexer 会通知 Reactor。相当于监视器发现某个水管漏水了,赶紧报告给调度中心。
  4. 分发事件: Reactor 根据事件类型,找到对应的 Event Handler,并调用其处理方法。相当于调度中心找到对应的水管工,让他去修理漏水的水管。
  5. 处理事件: Event Handler 处理事件。相当于水管工修理漏水的水管。

2.3 Reactor 模式的优点

  • 简单易懂: Reactor 模式的结构比较简单,容易理解和实现。
  • 可扩展性好: 可以很容易地添加新的事件处理器,来处理新的事件类型。
  • 资源利用率高: 通过事件驱动的方式,可以避免不必要的轮询,提高 CPU 的利用率。

2.4 Reactor 模式的缺点

  • 所有 I/O 操作都需要非阻塞: 为了避免阻塞 Reactor 的事件循环,所有的 I/O 操作都必须是非阻塞的。这意味着你需要使用非阻塞 I/O API,并处理 EAGAINEWOULDBLOCK 错误。
  • 事件分发开销: Reactor 需要负责事件的监听和分发,这会带来一定的开销。

2.5 Reactor 模式的代码示例

#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <vector>
#include <algorithm>
#include <stdexcept>

// 抽象事件处理器
class EventHandler {
public:
    virtual ~EventHandler() {}
    virtual int handle_event(int fd) = 0; // 处理事件,返回0表示继续监听,-1表示关闭连接
};

// 具体事件处理器 - 处理客户端连接
class AcceptHandler : public EventHandler {
public:
    AcceptHandler(int listen_fd) : listen_fd_(listen_fd) {}

    int handle_event(int fd) override {
        sockaddr_in client_addr;
        socklen_t client_addr_len = sizeof(client_addr);
        int client_fd = accept(listen_fd_, (sockaddr*)&client_addr, &client_addr_len);

        if (client_fd < 0) {
            perror("accept");
            return -1; // 出错,关闭连接
        }

        // 设置为非阻塞
        int flags = fcntl(client_fd, F_GETFL, 0);
        if (flags == -1) {
            perror("fcntl F_GETFL");
            close(client_fd);
            return -1;
        }
        if (fcntl(client_fd, F_SETFL, flags | O_NONBLOCK) == -1) {
            perror("fcntl F_SETFL O_NONBLOCK");
            close(client_fd);
            return -1;
        }

        std::cout << "Accepted connection from " << inet_ntoa(client_addr.sin_addr) << ":" << ntohs(client_addr.sin_port) << " fd: " << client_fd << std::endl;

        // 创建新的 ReadHandler 来处理该客户端的读取事件
        ReadHandler* read_handler = new ReadHandler(client_fd);
        Reactor::instance().register_handler(client_fd, read_handler); // 注册 read 事件
        return 0; // 继续监听
    }

private:
    int listen_fd_;
};

// 具体事件处理器 - 处理客户端读取
class ReadHandler : public EventHandler {
public:
    ReadHandler(int client_fd) : client_fd_(client_fd) {}

    int handle_event(int fd) override {
        char buffer[1024];
        ssize_t bytes_read = recv(client_fd_, buffer, sizeof(buffer), 0);

        if (bytes_read > 0) {
            buffer[bytes_read] = '';
            std::cout << "Received from client fd " << client_fd_ << ": " << buffer << std::endl;

            // Echo back to the client
            send(client_fd_, buffer, bytes_read, 0); // 简单回显
            return 0; // 继续监听
        } else if (bytes_read == 0) {
            std::cout << "Client fd " << client_fd_ << " disconnected." << std::endl;
            close(client_fd_);
            Reactor::instance().remove_handler(client_fd_);
            delete this; // 释放handler
            return -1; // 关闭连接
        } else {
            perror("recv");
            close(client_fd_);
             Reactor::instance().remove_handler(client_fd_);
            delete this; // 释放handler
            return -1; // 出错,关闭连接
        }
    }

private:
    int client_fd_;
};

// Reactor 类 - 单例模式
class Reactor {
public:
    static Reactor& instance() {
        static Reactor reactor;
        return reactor;
    }

    void register_handler(int fd, EventHandler* handler) {
        handlers_[fd] = handler;
        fds_.push_back(fd);
    }

    void remove_handler(int fd) {
        handlers_.erase(fd);
        fds_.erase(std::remove(fds_.begin(), fds_.end(), fd), fds_.end());
    }

    void run() {
        while (true) {
            fd_set read_fds;
            FD_ZERO(&read_fds);
            int max_fd = 0;

            for (int fd : fds_) {
                FD_SET(fd, &read_fds);
                max_fd = std::max(max_fd, fd);
            }

            if (select(max_fd + 1, &read_fds, nullptr, nullptr, nullptr) > 0) {
                for (int fd : fds_) {
                    if (FD_ISSET(fd, &read_fds)) {
                        if (handlers_.count(fd)) {
                            if(handlers_[fd]->handle_event(fd) == -1) { // 如果handler返回-1,则删除
                                //remove_handler(fd); // handle_event内部已经调用了remove_handler和delete this
                            }
                        }
                    }
                }
            } else {
                perror("select");
                break; // 退出循环
            }
        }
    }

private:
    Reactor() {} // 私有构造函数,防止外部实例化

    std::vector<int> fds_;
    std::unordered_map<int, EventHandler*> handlers_;
};

int main() {
    // 创建监听 Socket
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_fd < 0) {
        perror("socket");
        return 1;
    }

    // 设置端口复用
    int optval = 1;
    if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
        perror("setsockopt SO_REUSEADDR");
        close(listen_fd);
        return 1;
    }

    // 绑定地址
    sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(8080);

    if (bind(listen_fd, (sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        perror("bind");
        close(listen_fd);
        return 1;
    }

    // 监听
    if (listen(listen_fd, 10) < 0) {
        perror("listen");
        close(listen_fd);
        return 1;
    }

    // 设置为非阻塞
    int flags = fcntl(listen_fd, F_GETFL, 0);
    if (flags == -1) {
        perror("fcntl F_GETFL");
        close(listen_fd);
        return 1;
    }
    if (fcntl(listen_fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        perror("fcntl F_SETFL O_NONBLOCK");
        close(listen_fd);
        return 1;
    }

    std::cout << "Server listening on port 8080..." << std::endl;

    // 注册 AcceptHandler 处理连接请求
    AcceptHandler* accept_handler = new AcceptHandler(listen_fd);
    Reactor::instance().register_handler(listen_fd, accept_handler);

    // 运行 Reactor
    Reactor::instance().run();

    close(listen_fd);

    return 0;
}

代码解释:

  • EventHandler: 抽象基类,定义了 handle_event 方法,所有具体的事件处理器都需要继承它。
  • AcceptHandler: 负责处理客户端的连接请求。当有新的连接请求到达时,它会调用 accept 函数接受连接,并将新的 Socket 注册到 Reactor 中,同时创建一个 ReadHandler 来处理该客户端的数据读取事件。注意,acceptHandler 负责创建新的ReadHandler实例,并将其注册到reactor中。
  • ReadHandler: 负责处理客户端发送的数据。当客户端有数据到达时,它会调用 recv 函数读取数据,并进行处理 (这里只是简单地回显)。如果客户端断开连接,它会关闭 Socket,并从 Reactor 中移除。ReadHandler负责处理一个socket上的读事件,读取数据,并将其回显。
  • Reactor: 单例类,负责注册事件处理器,并运行事件循环。它使用 select 函数监听所有注册的 Socket 上的事件,当有事件发生时,它会调用对应的 handle_event 方法来处理事件。reactor 负责维护所有handler,并运行select循环,当事件发生时,调用对应的handler。
  • 单例模式: Reactor 使用单例模式,保证全局只有一个 Reactor 实例,方便访问和管理。

三、Proactor:啥都交给我,你躺平就好!

Proactor 模式就像一个“包工头”,它不仅负责监听事件,还负责发起 I/O 操作,并将 I/O 操作的结果通知给“完成处理器”。 简单来说,Proactor 负责“发起” I/O 操作,然后“通知”结果。

3.1 Proactor 的核心组件

  • Asynchronous Operation Processor: 操作系统提供的异步 I/O 接口,比如 Windows 上的 IOCP (I/O Completion Port) 或 Linux 上的 AIO (Asynchronous I/O)。你可以把它想象成一个自动化的流水线,你只需要把原材料放进去,它就会自动完成加工,并把成品送出来。
  • Completion Handler: 这是一个接口,定义了 I/O 操作完成后的回调方法。你需要实现这个接口,来处理 I/O 操作的结果。你可以把它想象成一个质检员,负责检查流水线送出来的成品是否合格。
  • Concrete Completion Handler: 这是 Completion Handler 的具体实现,负责处理特定类型的 I/O 操作结果。你可以把它想象成一个专门检查热水壶的质检员。
  • Proactor: 它是整个模式的中心,负责发起异步 I/O 操作,并将 Completion Handler 注册到 Asynchronous Operation Processor 中。你可以把它想象成一个生产经理,负责安排生产计划,并协调各个质检员的工作。

3.2 Proactor 的工作流程

  1. 发起异步 I/O 操作: Proactor 调用 Asynchronous Operation Processor 发起异步 I/O 操作,并将 Completion Handler 注册到 Asynchronous Operation Processor 中。相当于生产经理把原材料放到自动化流水线上,并告诉质检员,一旦流水线送出成品,就让他来检查。
  2. I/O 操作完成: 当 I/O 操作完成时,Asynchronous Operation Processor 会通知 Proactor。相当于自动化流水线完成了加工,并把成品送到了质检员面前。
  3. 调用 Completion Handler: Proactor 调用 Completion Handler 的回调方法,处理 I/O 操作的结果。相当于质检员检查流水线送出来的成品是否合格。

3.3 Proactor 模式的优点

  • 真正的异步: Proactor 模式是真正的异步 I/O,I/O 操作的执行完全由操作系统负责,应用程序不需要阻塞等待。
  • 更高的性能: 由于 I/O 操作是异步的,因此可以更好地利用 CPU 和 I/O 资源,提高程序的性能。

3.4 Proactor 模式的缺点

  • 实现复杂: Proactor 模式的实现比较复杂,需要使用异步 I/O API,并处理各种回调函数。
  • 平台依赖性强: 不同的操作系统提供的异步 I/O API 不同,因此 Proactor 模式的平台依赖性比较强。

3.5 Proactor 模式的代码示例 (Windows IOCP)

由于 Linux 上的 AIO 支持不够完善,这里我们使用 Windows IOCP 来演示 Proactor 模式。

#include <iostream>
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#pragma comment(lib, "ws2_32.lib")

// Completion Handler 结构体
struct CompletionData {
    OVERLAPPED overlapped;
    SOCKET socket;
    WSABUF wsa_buf;
    char buffer[1024];
    int bytes_received;
};

// 全局 IOCP 句柄
HANDLE iocp_handle;

// 完成端口回调函数
void CALLBACK WorkerThread(ULONG_PTR completion_key, DWORD bytes_transferred, LPOVERLAPPED overlapped) {
    CompletionData* completion_data = (CompletionData*)overlapped;
    SOCKET socket = completion_data->socket;

    if (bytes_transferred == 0) {
        // 客户端断开连接
        std::cout << "Client disconnected." << std::endl;
        closesocket(socket);
        delete completion_data;
        return;
    }

    completion_data->bytes_received = bytes_transferred;
    std::cout << "Received " << bytes_transferred << " bytes from client." << std::endl;
    std::cout << "Data: " << completion_data->buffer << std::endl;

    // Echo back to the client
    WSABUF wsa_buf;
    wsa_buf.buf = completion_data->buffer;
    wsa_buf.len = bytes_transferred;
    DWORD bytes_sent;
    DWORD flags = 0;

    if (WSASend(socket, &wsa_buf, 1, &bytes_sent, flags, &(completion_data->overlapped), NULL) == SOCKET_ERROR) {
        if (WSAGetLastError() != WSA_IO_PENDING) {
            std::cerr << "WSASend failed with error: " << WSAGetLastError() << std::endl;
            closesocket(socket);
            delete completion_data;
            return;
        }
    }

     // 再次投递接收请求
    DWORD flags_recv = 0;
    completion_data->wsa_buf.buf = completion_data->buffer;
    completion_data->wsa_buf.len = sizeof(completion_data->buffer);
    completion_data->bytes_received = 0;
    ZeroMemory(&(completion_data->overlapped), sizeof(OVERLAPPED));

    if (WSARecv(socket, &(completion_data->wsa_buf), 1, NULL, &flags_recv, &(completion_data->overlapped), NULL) == SOCKET_ERROR) {
        if (WSAGetLastError() != WSA_IO_PENDING) {
            std::cerr << "WSARecv failed with error: " << WSAGetLastError() << std::endl;
            closesocket(socket);
            delete completion_data;
            return;
        }
    }
}

int main() {
    // 初始化 Winsock
    WSADATA wsa_data;
    if (WSAStartup(MAKEWORD(2, 2), &wsa_data) != 0) {
        std::cerr << "WSAStartup failed." << std::endl;
        return 1;
    }

    // 创建监听 Socket
    SOCKET listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (listen_socket == INVALID_SOCKET) {
        std::cerr << "socket failed with error: " << WSAGetLastError() << std::endl;
        WSACleanup();
        return 1;
    }

    // 绑定地址
    sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(8080);

    if (bind(listen_socket, (sockaddr*)&server_addr, sizeof(server_addr)) == SOCKET_ERROR) {
        std::cerr << "bind failed with error: " << WSAGetLastError() << std::endl;
        closesocket(listen_socket);
        WSACleanup();
        return 1;
    }

    // 监听
    if (listen(listen_socket, SOMAXCONN) == SOCKET_ERROR) {
        std::cerr << "listen failed with error: " << WSAGetLastError() << std::endl;
        closesocket(listen_socket);
        WSACleanup();
        return 1;
    }

    std::cout << "Server listening on port 8080..." << std::endl;

    // 创建 IOCP
    iocp_handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (iocp_handle == NULL) {
        std::cerr << "CreateIoCompletionPort failed with error: " << GetLastError() << std::endl;
        closesocket(listen_socket);
        WSACleanup();
        return 1;
    }

    // 创建工作线程
    SYSTEM_INFO system_info;
    GetSystemInfo(&system_info);
    int num_threads = system_info.dwNumberOfProcessors * 2; // 创建双倍于 CPU 核心数的工作线程
    HANDLE worker_threads[num_threads];

    for (int i = 0; i < num_threads; ++i) {
        worker_threads[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)[](LPVOID lpParameter) -> DWORD {
            while (true) {
                DWORD bytes_transferred;
                ULONG_PTR completion_key;
                LPOVERLAPPED overlapped;

                BOOL result = GetQueuedCompletionStatus(iocp_handle, &bytes_transferred, &completion_key, &overlapped, INFINITE);

                if (!result) {
                    std::cerr << "GetQueuedCompletionStatus failed with error: " << GetLastError() << std::endl;
                    break; // 退出线程
                }

                if (overlapped == NULL) {
                    break; // 线程退出信号
                }

                WorkerThread(completion_key, bytes_transferred, overlapped);
            }
            return 0;
        }, NULL, 0, NULL);

        if (worker_threads[i] == NULL) {
            std::cerr << "CreateThread failed with error: " << GetLastError() << std::endl;
            CloseHandle(iocp_handle);
            closesocket(listen_socket);
            WSACleanup();
            return 1;
        }
    }

    // 接受连接并投递接收请求
    while (true) {
        sockaddr_in client_addr;
        int client_addr_len = sizeof(client_addr);
        SOCKET client_socket = accept(listen_socket, (sockaddr*)&client_addr, &client_addr_len);

        if (client_socket == INVALID_SOCKET) {
            std::cerr << "accept failed with error: " << WSAGetLastError() << std::endl;
            continue; // 继续接受其他连接
        }

        std::cout << "Accepted connection from " << inet_ntoa(client_addr.sin_addr) << ":" << ntohs(client_addr.sin_port) << std::endl;

        // 将 Socket 关联到 IOCP
        if (CreateIoCompletionPort((HANDLE)client_socket, iocp_handle, (ULONG_PTR)client_socket, 0) == NULL) {
            std::cerr << "CreateIoCompletionPort failed with error: " << GetLastError() << std::endl;
            closesocket(client_socket);
            continue; // 继续接受其他连接
        }

        // 创建 CompletionData 结构体
        CompletionData* completion_data = new CompletionData;
        ZeroMemory(completion_data, sizeof(CompletionData));
        completion_data->socket = client_socket;
        completion_data->wsa_buf.buf = completion_data->buffer;
        completion_data->wsa_buf.len = sizeof(completion_data->buffer);

        // 投递异步接收请求
        DWORD flags = 0;
        if (WSARecv(client_socket, &(completion_data->wsa_buf), 1, NULL, &flags, &(completion_data->overlapped), NULL) == SOCKET_ERROR) {
            if (WSAGetLastError() != WSA_IO_PENDING) {
                std::cerr << "WSARecv failed with error: " << WSAGetLastError() << std::endl;
                closesocket(client_socket);
                delete completion_data;
                continue; // 继续接受其他连接
            }
        }
    }

    // 关闭监听 Socket 和 IOCP
    closesocket(listen_socket);
    CloseHandle(iocp_handle);
    WSACleanup();

    return 0;
}

代码解释:

  • CompletionData: 结构体,用于存储 I/O 操作的相关信息,包括 Socket、缓冲区、OVERLAPPED 结构体等。
  • iocp_handle: 全局 IOCP 句柄。
  • WorkerThread: 工作线程函数,负责从 IOCP 中获取完成的 I/O 操作,并调用 Completion Handler 处理结果。
  • main: 主函数,负责初始化 Winsock、创建监听 Socket、创建 IOCP、创建工作线程、接受连接并投递接收请求。

四、Reactor vs Proactor:到底选哪个?

Reactor 和 Proactor 都是事件驱动架构的优秀代表,但它们的应用场景有所不同。

特性 Reactor Proactor
I/O 操作 由应用程序发起 由操作系统发起
异步程度 伪异步 (需要非阻塞 I/O) 真异步
实现难度 相对简单 复杂
平台依赖性 较低 较高
性能 中等 较高
适用场景 I/O 操作比较简单,或者平台不支持真异步 I/O I/O 操作比较复杂,且平台支持真异步 I/O

总结:

  • Reactor: 如果你需要一个简单易懂、平台兼容性好的事件驱动框架,并且 I/O 操作比较简单,那么 Reactor 是一个不错的选择。
  • Proactor: 如果你需要更高的性能,并且平台支持真异步 I/O,那么 Proactor 可能是更好的选择。

五、总结

今天我们一起学习了 Reactor 和 Proactor 两种事件驱动架构模式,了解了它们的核心组件、工作流程、优缺点以及适用场景。希望通过今天的讲解,大家能够对 Reactor 和 Proactor 有更深入的理解,并在实际项目中灵活运用,构建出高性能、可扩展的网络应用。

记住,没有银弹,只有最适合你的解决方案。选择哪种模式,取决于你的具体需求和平台环境。

好了,今天的分享就到这里,谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注