C++ Reactor/Proactor 模式:网络编程中的事件驱动架构
各位听众,大家好!今天咱们来聊聊网络编程里两个响当当的人物:Reactor 和 Proactor。这哥俩就像武林高手,一个擅长“借力打力”,一个喜欢“包办一切”。他们都是事件驱动架构的代表,能帮助我们构建高性能、可扩展的网络应用。
一、什么是事件驱动?别跟我说“鼠标点一下”!
首先,咱们得搞清楚什么是“事件驱动”。别一听“事件”,就想到鼠标点击、键盘敲击,虽然它们也是事件,但网络编程里的事件可不止这些。在网络世界里,事件通常指:
- 连接请求 (Connection Request): 有客户端想来跟我服务器“套近乎”。
- 数据到达 (Data Arrival): 客户端发来消息了,要处理一下。
- 数据发送完成 (Data Sent): 我发给客户端的消息已经成功送达。
- 错误发生 (Error Occurred): 哎呀,出错了,可能是网络断了,也可能是客户端发来了不合法的请求。
事件驱动的核心思想就是:程序不是按照预先设定的流程一步一步执行,而是等待事件发生,然后根据事件类型做出相应的处理。 这就像一个服务员,不是傻傻地站在那里等客人来,而是时刻关注着整个餐厅,客人招手了(事件发生),就赶紧过去服务。
二、Reactor:来了来了,我帮你调度!
Reactor 模式就像一个“调度员”,它负责监听各种事件,当事件发生时,它会找到对应的“事件处理器”来处理。 简单来说,Reactor 负责“发现”事件,然后“分发”事件给相应的处理者。
2.1 Reactor 的核心组件
- Handle: 操作系统用来标识 I/O 对象的东西,比如 Socket。你可以把它想象成一个水管的编号,通过这个编号,我们就能找到对应的水管。
- Event Demultiplexer: 这个家伙是 Reactor 的核心,它负责监听 Handle 上发生的事件。在 Linux 上,通常使用
select
、poll
或epoll
等系统调用来实现。你可以把它想象成一个监视器,时刻盯着各个水管,看看有没有水流过来。 - Event Handler: 这是一个抽象类或接口,定义了处理事件的方法。你需要实现这个接口,来处理具体的事件。你可以把它想象成一个水管工,负责修理水管、疏通水管等等。
- Concrete Event Handler: 这是 Event Handler 的具体实现,负责处理特定类型的事件。你可以把它想象成一个专门修理热水管的水管工。
- Reactor: 它是整个模式的中心,负责注册 Event Handler,并驱动 Event Demultiplexer 监听事件。你可以把它想象成一个调度中心,负责协调各个水管工的工作。
2.2 Reactor 的工作流程
- 注册事件处理器: 将需要监听的 Handle 和对应的 Event Handler 注册到 Reactor 中。相当于告诉调度中心,哪些水管需要特别关注,以及一旦水管出问题,应该找哪个水管工。
- 事件循环: Reactor 进入事件循环,不断调用 Event Demultiplexer 监听事件。相当于调度中心让监视器开始工作,时刻盯着各个水管。
- 事件发生: 当某个 Handle 上发生事件时,Event Demultiplexer 会通知 Reactor。相当于监视器发现某个水管漏水了,赶紧报告给调度中心。
- 分发事件: Reactor 根据事件类型,找到对应的 Event Handler,并调用其处理方法。相当于调度中心找到对应的水管工,让他去修理漏水的水管。
- 处理事件: Event Handler 处理事件。相当于水管工修理漏水的水管。
2.3 Reactor 模式的优点
- 简单易懂: Reactor 模式的结构比较简单,容易理解和实现。
- 可扩展性好: 可以很容易地添加新的事件处理器,来处理新的事件类型。
- 资源利用率高: 通过事件驱动的方式,可以避免不必要的轮询,提高 CPU 的利用率。
2.4 Reactor 模式的缺点
- 所有 I/O 操作都需要非阻塞: 为了避免阻塞 Reactor 的事件循环,所有的 I/O 操作都必须是非阻塞的。这意味着你需要使用非阻塞 I/O API,并处理
EAGAIN
或EWOULDBLOCK
错误。 - 事件分发开销: Reactor 需要负责事件的监听和分发,这会带来一定的开销。
2.5 Reactor 模式的代码示例
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <vector>
#include <algorithm>
#include <stdexcept>
// 抽象事件处理器
class EventHandler {
public:
virtual ~EventHandler() {}
virtual int handle_event(int fd) = 0; // 处理事件,返回0表示继续监听,-1表示关闭连接
};
// 具体事件处理器 - 处理客户端连接
class AcceptHandler : public EventHandler {
public:
AcceptHandler(int listen_fd) : listen_fd_(listen_fd) {}
int handle_event(int fd) override {
sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int client_fd = accept(listen_fd_, (sockaddr*)&client_addr, &client_addr_len);
if (client_fd < 0) {
perror("accept");
return -1; // 出错,关闭连接
}
// 设置为非阻塞
int flags = fcntl(client_fd, F_GETFL, 0);
if (flags == -1) {
perror("fcntl F_GETFL");
close(client_fd);
return -1;
}
if (fcntl(client_fd, F_SETFL, flags | O_NONBLOCK) == -1) {
perror("fcntl F_SETFL O_NONBLOCK");
close(client_fd);
return -1;
}
std::cout << "Accepted connection from " << inet_ntoa(client_addr.sin_addr) << ":" << ntohs(client_addr.sin_port) << " fd: " << client_fd << std::endl;
// 创建新的 ReadHandler 来处理该客户端的读取事件
ReadHandler* read_handler = new ReadHandler(client_fd);
Reactor::instance().register_handler(client_fd, read_handler); // 注册 read 事件
return 0; // 继续监听
}
private:
int listen_fd_;
};
// 具体事件处理器 - 处理客户端读取
class ReadHandler : public EventHandler {
public:
ReadHandler(int client_fd) : client_fd_(client_fd) {}
int handle_event(int fd) override {
char buffer[1024];
ssize_t bytes_read = recv(client_fd_, buffer, sizeof(buffer), 0);
if (bytes_read > 0) {
buffer[bytes_read] = '';
std::cout << "Received from client fd " << client_fd_ << ": " << buffer << std::endl;
// Echo back to the client
send(client_fd_, buffer, bytes_read, 0); // 简单回显
return 0; // 继续监听
} else if (bytes_read == 0) {
std::cout << "Client fd " << client_fd_ << " disconnected." << std::endl;
close(client_fd_);
Reactor::instance().remove_handler(client_fd_);
delete this; // 释放handler
return -1; // 关闭连接
} else {
perror("recv");
close(client_fd_);
Reactor::instance().remove_handler(client_fd_);
delete this; // 释放handler
return -1; // 出错,关闭连接
}
}
private:
int client_fd_;
};
// Reactor 类 - 单例模式
class Reactor {
public:
static Reactor& instance() {
static Reactor reactor;
return reactor;
}
void register_handler(int fd, EventHandler* handler) {
handlers_[fd] = handler;
fds_.push_back(fd);
}
void remove_handler(int fd) {
handlers_.erase(fd);
fds_.erase(std::remove(fds_.begin(), fds_.end(), fd), fds_.end());
}
void run() {
while (true) {
fd_set read_fds;
FD_ZERO(&read_fds);
int max_fd = 0;
for (int fd : fds_) {
FD_SET(fd, &read_fds);
max_fd = std::max(max_fd, fd);
}
if (select(max_fd + 1, &read_fds, nullptr, nullptr, nullptr) > 0) {
for (int fd : fds_) {
if (FD_ISSET(fd, &read_fds)) {
if (handlers_.count(fd)) {
if(handlers_[fd]->handle_event(fd) == -1) { // 如果handler返回-1,则删除
//remove_handler(fd); // handle_event内部已经调用了remove_handler和delete this
}
}
}
}
} else {
perror("select");
break; // 退出循环
}
}
}
private:
Reactor() {} // 私有构造函数,防止外部实例化
std::vector<int> fds_;
std::unordered_map<int, EventHandler*> handlers_;
};
int main() {
// 创建监听 Socket
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd < 0) {
perror("socket");
return 1;
}
// 设置端口复用
int optval = 1;
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
perror("setsockopt SO_REUSEADDR");
close(listen_fd);
return 1;
}
// 绑定地址
sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(8080);
if (bind(listen_fd, (sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
perror("bind");
close(listen_fd);
return 1;
}
// 监听
if (listen(listen_fd, 10) < 0) {
perror("listen");
close(listen_fd);
return 1;
}
// 设置为非阻塞
int flags = fcntl(listen_fd, F_GETFL, 0);
if (flags == -1) {
perror("fcntl F_GETFL");
close(listen_fd);
return 1;
}
if (fcntl(listen_fd, F_SETFL, flags | O_NONBLOCK) == -1) {
perror("fcntl F_SETFL O_NONBLOCK");
close(listen_fd);
return 1;
}
std::cout << "Server listening on port 8080..." << std::endl;
// 注册 AcceptHandler 处理连接请求
AcceptHandler* accept_handler = new AcceptHandler(listen_fd);
Reactor::instance().register_handler(listen_fd, accept_handler);
// 运行 Reactor
Reactor::instance().run();
close(listen_fd);
return 0;
}
代码解释:
- EventHandler: 抽象基类,定义了
handle_event
方法,所有具体的事件处理器都需要继承它。 - AcceptHandler: 负责处理客户端的连接请求。当有新的连接请求到达时,它会调用
accept
函数接受连接,并将新的 Socket 注册到 Reactor 中,同时创建一个ReadHandler
来处理该客户端的数据读取事件。注意,acceptHandler 负责创建新的ReadHandler实例,并将其注册到reactor中。 - ReadHandler: 负责处理客户端发送的数据。当客户端有数据到达时,它会调用
recv
函数读取数据,并进行处理 (这里只是简单地回显)。如果客户端断开连接,它会关闭 Socket,并从 Reactor 中移除。ReadHandler负责处理一个socket上的读事件,读取数据,并将其回显。 - Reactor: 单例类,负责注册事件处理器,并运行事件循环。它使用
select
函数监听所有注册的 Socket 上的事件,当有事件发生时,它会调用对应的handle_event
方法来处理事件。reactor 负责维护所有handler,并运行select循环,当事件发生时,调用对应的handler。 - 单例模式: Reactor 使用单例模式,保证全局只有一个 Reactor 实例,方便访问和管理。
三、Proactor:啥都交给我,你躺平就好!
Proactor 模式就像一个“包工头”,它不仅负责监听事件,还负责发起 I/O 操作,并将 I/O 操作的结果通知给“完成处理器”。 简单来说,Proactor 负责“发起” I/O 操作,然后“通知”结果。
3.1 Proactor 的核心组件
- Asynchronous Operation Processor: 操作系统提供的异步 I/O 接口,比如 Windows 上的 IOCP (I/O Completion Port) 或 Linux 上的 AIO (Asynchronous I/O)。你可以把它想象成一个自动化的流水线,你只需要把原材料放进去,它就会自动完成加工,并把成品送出来。
- Completion Handler: 这是一个接口,定义了 I/O 操作完成后的回调方法。你需要实现这个接口,来处理 I/O 操作的结果。你可以把它想象成一个质检员,负责检查流水线送出来的成品是否合格。
- Concrete Completion Handler: 这是 Completion Handler 的具体实现,负责处理特定类型的 I/O 操作结果。你可以把它想象成一个专门检查热水壶的质检员。
- Proactor: 它是整个模式的中心,负责发起异步 I/O 操作,并将 Completion Handler 注册到 Asynchronous Operation Processor 中。你可以把它想象成一个生产经理,负责安排生产计划,并协调各个质检员的工作。
3.2 Proactor 的工作流程
- 发起异步 I/O 操作: Proactor 调用 Asynchronous Operation Processor 发起异步 I/O 操作,并将 Completion Handler 注册到 Asynchronous Operation Processor 中。相当于生产经理把原材料放到自动化流水线上,并告诉质检员,一旦流水线送出成品,就让他来检查。
- I/O 操作完成: 当 I/O 操作完成时,Asynchronous Operation Processor 会通知 Proactor。相当于自动化流水线完成了加工,并把成品送到了质检员面前。
- 调用 Completion Handler: Proactor 调用 Completion Handler 的回调方法,处理 I/O 操作的结果。相当于质检员检查流水线送出来的成品是否合格。
3.3 Proactor 模式的优点
- 真正的异步: Proactor 模式是真正的异步 I/O,I/O 操作的执行完全由操作系统负责,应用程序不需要阻塞等待。
- 更高的性能: 由于 I/O 操作是异步的,因此可以更好地利用 CPU 和 I/O 资源,提高程序的性能。
3.4 Proactor 模式的缺点
- 实现复杂: Proactor 模式的实现比较复杂,需要使用异步 I/O API,并处理各种回调函数。
- 平台依赖性强: 不同的操作系统提供的异步 I/O API 不同,因此 Proactor 模式的平台依赖性比较强。
3.5 Proactor 模式的代码示例 (Windows IOCP)
由于 Linux 上的 AIO 支持不够完善,这里我们使用 Windows IOCP 来演示 Proactor 模式。
#include <iostream>
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#pragma comment(lib, "ws2_32.lib")
// Completion Handler 结构体
struct CompletionData {
OVERLAPPED overlapped;
SOCKET socket;
WSABUF wsa_buf;
char buffer[1024];
int bytes_received;
};
// 全局 IOCP 句柄
HANDLE iocp_handle;
// 完成端口回调函数
void CALLBACK WorkerThread(ULONG_PTR completion_key, DWORD bytes_transferred, LPOVERLAPPED overlapped) {
CompletionData* completion_data = (CompletionData*)overlapped;
SOCKET socket = completion_data->socket;
if (bytes_transferred == 0) {
// 客户端断开连接
std::cout << "Client disconnected." << std::endl;
closesocket(socket);
delete completion_data;
return;
}
completion_data->bytes_received = bytes_transferred;
std::cout << "Received " << bytes_transferred << " bytes from client." << std::endl;
std::cout << "Data: " << completion_data->buffer << std::endl;
// Echo back to the client
WSABUF wsa_buf;
wsa_buf.buf = completion_data->buffer;
wsa_buf.len = bytes_transferred;
DWORD bytes_sent;
DWORD flags = 0;
if (WSASend(socket, &wsa_buf, 1, &bytes_sent, flags, &(completion_data->overlapped), NULL) == SOCKET_ERROR) {
if (WSAGetLastError() != WSA_IO_PENDING) {
std::cerr << "WSASend failed with error: " << WSAGetLastError() << std::endl;
closesocket(socket);
delete completion_data;
return;
}
}
// 再次投递接收请求
DWORD flags_recv = 0;
completion_data->wsa_buf.buf = completion_data->buffer;
completion_data->wsa_buf.len = sizeof(completion_data->buffer);
completion_data->bytes_received = 0;
ZeroMemory(&(completion_data->overlapped), sizeof(OVERLAPPED));
if (WSARecv(socket, &(completion_data->wsa_buf), 1, NULL, &flags_recv, &(completion_data->overlapped), NULL) == SOCKET_ERROR) {
if (WSAGetLastError() != WSA_IO_PENDING) {
std::cerr << "WSARecv failed with error: " << WSAGetLastError() << std::endl;
closesocket(socket);
delete completion_data;
return;
}
}
}
int main() {
// 初始化 Winsock
WSADATA wsa_data;
if (WSAStartup(MAKEWORD(2, 2), &wsa_data) != 0) {
std::cerr << "WSAStartup failed." << std::endl;
return 1;
}
// 创建监听 Socket
SOCKET listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (listen_socket == INVALID_SOCKET) {
std::cerr << "socket failed with error: " << WSAGetLastError() << std::endl;
WSACleanup();
return 1;
}
// 绑定地址
sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(8080);
if (bind(listen_socket, (sockaddr*)&server_addr, sizeof(server_addr)) == SOCKET_ERROR) {
std::cerr << "bind failed with error: " << WSAGetLastError() << std::endl;
closesocket(listen_socket);
WSACleanup();
return 1;
}
// 监听
if (listen(listen_socket, SOMAXCONN) == SOCKET_ERROR) {
std::cerr << "listen failed with error: " << WSAGetLastError() << std::endl;
closesocket(listen_socket);
WSACleanup();
return 1;
}
std::cout << "Server listening on port 8080..." << std::endl;
// 创建 IOCP
iocp_handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (iocp_handle == NULL) {
std::cerr << "CreateIoCompletionPort failed with error: " << GetLastError() << std::endl;
closesocket(listen_socket);
WSACleanup();
return 1;
}
// 创建工作线程
SYSTEM_INFO system_info;
GetSystemInfo(&system_info);
int num_threads = system_info.dwNumberOfProcessors * 2; // 创建双倍于 CPU 核心数的工作线程
HANDLE worker_threads[num_threads];
for (int i = 0; i < num_threads; ++i) {
worker_threads[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)[](LPVOID lpParameter) -> DWORD {
while (true) {
DWORD bytes_transferred;
ULONG_PTR completion_key;
LPOVERLAPPED overlapped;
BOOL result = GetQueuedCompletionStatus(iocp_handle, &bytes_transferred, &completion_key, &overlapped, INFINITE);
if (!result) {
std::cerr << "GetQueuedCompletionStatus failed with error: " << GetLastError() << std::endl;
break; // 退出线程
}
if (overlapped == NULL) {
break; // 线程退出信号
}
WorkerThread(completion_key, bytes_transferred, overlapped);
}
return 0;
}, NULL, 0, NULL);
if (worker_threads[i] == NULL) {
std::cerr << "CreateThread failed with error: " << GetLastError() << std::endl;
CloseHandle(iocp_handle);
closesocket(listen_socket);
WSACleanup();
return 1;
}
}
// 接受连接并投递接收请求
while (true) {
sockaddr_in client_addr;
int client_addr_len = sizeof(client_addr);
SOCKET client_socket = accept(listen_socket, (sockaddr*)&client_addr, &client_addr_len);
if (client_socket == INVALID_SOCKET) {
std::cerr << "accept failed with error: " << WSAGetLastError() << std::endl;
continue; // 继续接受其他连接
}
std::cout << "Accepted connection from " << inet_ntoa(client_addr.sin_addr) << ":" << ntohs(client_addr.sin_port) << std::endl;
// 将 Socket 关联到 IOCP
if (CreateIoCompletionPort((HANDLE)client_socket, iocp_handle, (ULONG_PTR)client_socket, 0) == NULL) {
std::cerr << "CreateIoCompletionPort failed with error: " << GetLastError() << std::endl;
closesocket(client_socket);
continue; // 继续接受其他连接
}
// 创建 CompletionData 结构体
CompletionData* completion_data = new CompletionData;
ZeroMemory(completion_data, sizeof(CompletionData));
completion_data->socket = client_socket;
completion_data->wsa_buf.buf = completion_data->buffer;
completion_data->wsa_buf.len = sizeof(completion_data->buffer);
// 投递异步接收请求
DWORD flags = 0;
if (WSARecv(client_socket, &(completion_data->wsa_buf), 1, NULL, &flags, &(completion_data->overlapped), NULL) == SOCKET_ERROR) {
if (WSAGetLastError() != WSA_IO_PENDING) {
std::cerr << "WSARecv failed with error: " << WSAGetLastError() << std::endl;
closesocket(client_socket);
delete completion_data;
continue; // 继续接受其他连接
}
}
}
// 关闭监听 Socket 和 IOCP
closesocket(listen_socket);
CloseHandle(iocp_handle);
WSACleanup();
return 0;
}
代码解释:
- CompletionData: 结构体,用于存储 I/O 操作的相关信息,包括 Socket、缓冲区、OVERLAPPED 结构体等。
- iocp_handle: 全局 IOCP 句柄。
- WorkerThread: 工作线程函数,负责从 IOCP 中获取完成的 I/O 操作,并调用 Completion Handler 处理结果。
- main: 主函数,负责初始化 Winsock、创建监听 Socket、创建 IOCP、创建工作线程、接受连接并投递接收请求。
四、Reactor vs Proactor:到底选哪个?
Reactor 和 Proactor 都是事件驱动架构的优秀代表,但它们的应用场景有所不同。
特性 | Reactor | Proactor |
---|---|---|
I/O 操作 | 由应用程序发起 | 由操作系统发起 |
异步程度 | 伪异步 (需要非阻塞 I/O) | 真异步 |
实现难度 | 相对简单 | 复杂 |
平台依赖性 | 较低 | 较高 |
性能 | 中等 | 较高 |
适用场景 | I/O 操作比较简单,或者平台不支持真异步 I/O | I/O 操作比较复杂,且平台支持真异步 I/O |
总结:
- Reactor: 如果你需要一个简单易懂、平台兼容性好的事件驱动框架,并且 I/O 操作比较简单,那么 Reactor 是一个不错的选择。
- Proactor: 如果你需要更高的性能,并且平台支持真异步 I/O,那么 Proactor 可能是更好的选择。
五、总结
今天我们一起学习了 Reactor 和 Proactor 两种事件驱动架构模式,了解了它们的核心组件、工作流程、优缺点以及适用场景。希望通过今天的讲解,大家能够对 Reactor 和 Proactor 有更深入的理解,并在实际项目中灵活运用,构建出高性能、可扩展的网络应用。
记住,没有银弹,只有最适合你的解决方案。选择哪种模式,取决于你的具体需求和平台环境。
好了,今天的分享就到这里,谢谢大家!