好的,没问题,我们直接进入主题。
各位听众,欢迎来到今天的C++进阶讲堂!今天我们要聊一个有点意思的话题:如何用C++打造一个可伸缩的事件循环,让它既能玩转多线程,又能耍得起多路复用。 想象一下,你的程序就像一个餐厅,事件循环就是这个餐厅的调度员,多线程是厨房里的大厨们,而多路复用就像服务员,负责高效地从顾客(外部事件)那里接收订单,并把做好的菜(处理结果)送到顾客手中。
一、什么是事件循环?
首先,咱们得搞明白,什么是事件循环。简单来说,事件循环就是一个不断循环执行以下步骤的程序结构:
- 等待事件: 监听各种事件源(比如socket、定时器、信号等)是否有事件发生。
- 处理事件: 如果有事件发生,就调用相应的事件处理函数来处理它。
- 回到第一步: 继续等待下一个事件。
这就像一个永动机,只要程序活着,它就会不停地转下去。
二、为什么要用多线程和多路复用?
好,现在问题来了,单个事件循环够用吗?答案是:在某些情况下,不够。
- 单线程的局限性: 如果某个事件处理函数执行时间过长(比如执行了耗时的IO操作),整个事件循环就会被阻塞,无法响应其他事件。这就像餐厅里只有一个厨师,如果他正在做一个复杂的菜,其他顾客就得等着。
- 多线程的优势: 为了解决这个问题,我们可以引入多线程。把耗时的事件处理函数放到独立的线程中执行,这样就不会阻塞主线程的事件循环。就像餐厅里有了多个厨师,可以同时做不同的菜。
- 多路复用的必要性: 但是,如果有很多个socket连接需要监听,为每个socket创建一个线程,线程数量会变得非常庞大,导致资源浪费。 这时候,多路复用就派上用场了。它可以让一个线程同时监听多个socket,当某个socket有数据可读时,才去处理它。就像一个服务员可以同时服务多个顾客,而不需要为每个顾客都配备一个服务员。
所以,一个可伸缩的事件循环,应该能够同时利用多线程和多路复用的优势,既能并发处理事件,又能高效地管理大量的连接。
三、设计思路
我们的目标是创建一个既能处理CPU密集型任务(通过多线程),又能处理IO密集型任务(通过多路复用)的事件循环。
1. 核心组件
- 事件队列 (Event Queue): 用于存放待处理的事件。
- 事件循环线程 (Event Loop Thread): 主线程,负责从事件队列中取出事件,并分发给相应的处理函数。
- 线程池 (Thread Pool): 用于执行耗时的CPU密集型任务。
- IO多路复用器 (IO Multiplexer): 用于监听多个socket连接,当有数据可读时,通知事件循环线程。
- 事件处理器 (Event Handler): 负责处理特定类型的事件。
2. 工作流程
- 注册事件: 将事件和对应的事件处理器注册到事件循环中。
- 事件发生: 当事件发生时(比如socket上有数据可读,定时器到期等),将事件添加到事件队列中。
- 事件循环线程:
- 从事件队列中取出事件。
- 如果事件是CPU密集型的,就提交给线程池执行。
- 如果事件是IO事件,就调用相应的事件处理器处理。
- 如果没有事件,就等待一段时间。
- IO多路复用器:
- 监听多个socket连接。
- 当某个socket上有数据可读时,将事件添加到事件队列中。
- 事件处理器: 负责处理特定类型的事件,比如读取socket数据,解析协议,等等。
四、代码实现
接下来,我们用C++代码来实现这个事件循环。 为了便于理解,我们先从一个简单的单线程事件循环开始,然后逐步添加多线程和多路复用功能。
1. 简单的单线程事件循环
#include <iostream>
#include <queue>
#include <functional>
#include <chrono>
#include <thread>
class EventLoop {
public:
using EventCallback = std::function<void()>;
void addEvent(EventCallback callback) {
{
std::lock_guard<std::mutex> lock(mutex_);
eventQueue_.push(callback);
}
cv_.notify_one();
}
void run() {
while (running_) {
EventCallback callback;
{
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return !eventQueue_.empty() || !running_; });
if (!running_ && eventQueue_.empty()) break;
callback = eventQueue_.front();
eventQueue_.pop();
}
callback(); // 执行事件处理函数
}
}
void stop() {
running_ = false;
cv_.notify_one();
}
private:
std::queue<EventCallback> eventQueue_;
std::mutex mutex_;
std::condition_variable cv_;
bool running_ = true;
};
int main() {
EventLoop loop;
std::thread loopThread([&loop] { loop.run(); });
// 添加一些事件
loop.addEvent([]() {
std::cout << "Event 1 executed" << std::endl;
});
loop.addEvent([]() {
std::cout << "Event 2 executed" << std::endl;
});
std::this_thread::sleep_for(std::chrono::seconds(2));
loop.stop();
loopThread.join();
return 0;
}
这个代码实现了一个最基本的事件循环。它有一个事件队列,一个互斥锁,一个条件变量,和一个run()
函数。 addEvent()
函数用于向事件队列中添加事件处理函数。 run()
函数在一个循环中不断地从事件队列中取出事件处理函数,并执行它。
2. 加入线程池
现在,我们来添加线程池,用于执行耗时的CPU密集型任务。
#include <iostream>
#include <queue>
#include <functional>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <vector>
class ThreadPool {
public:
ThreadPool(size_t numThreads) : numThreads_(numThreads) {
threads_.reserve(numThreads_);
for (size_t i = 0; i < numThreads_; ++i) {
threads_.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex_);
cv_.wait(lock, [this] { return !taskQueue_.empty() || stop_; });
if (stop_ && taskQueue_.empty()) return;
task = taskQueue_.front();
taskQueue_.pop();
}
task();
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex_);
stop_ = true;
}
cv_.notify_all();
for (std::thread& thread : threads_) {
thread.join();
}
}
template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queueMutex_);
if (stop_) throw std::runtime_error("enqueue on stopped ThreadPool");
taskQueue_.emplace([task]() { (*task)(); });
}
cv_.notify_one();
return res;
}
private:
size_t numThreads_;
std::vector<std::thread> threads_;
std::queue<std::function<void()>> taskQueue_;
std::mutex queueMutex_;
std::condition_variable cv_;
bool stop_ = false;
};
class EventLoop {
public:
using EventCallback = std::function<void()>;
EventLoop(size_t threadPoolSize) : threadPool_(threadPoolSize) {}
void addEvent(EventCallback callback, bool runInThread = false) {
if (runInThread) {
threadPool_.enqueue(callback);
} else {
{
std::lock_guard<std::mutex> lock(mutex_);
eventQueue_.push(callback);
}
cv_.notify_one();
}
}
void run() {
while (running_) {
EventCallback callback;
{
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return !eventQueue_.empty() || !running_; });
if (!running_ && eventQueue_.empty()) break;
callback = eventQueue_.front();
eventQueue_.pop();
}
callback(); // 执行事件处理函数
}
}
void stop() {
running_ = false;
cv_.notify_one();
}
private:
std::queue<EventCallback> eventQueue_;
std::mutex mutex_;
std::condition_variable cv_;
bool running_ = true;
ThreadPool threadPool_;
};
int main() {
EventLoop loop(4); // 创建一个包含4个线程的线程池
std::thread loopThread([&loop] { loop.run(); });
// 添加一个CPU密集型任务
loop.addEvent([]() {
std::cout << "CPU intensive task started" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5)); // 模拟耗时操作
std::cout << "CPU intensive task finished" << std::endl;
}, true); // 在线程池中执行
// 添加一个简单的事件
loop.addEvent([]() {
std::cout << "Simple event executed" << std::endl;
});
std::this_thread::sleep_for(std::chrono::seconds(2));
loop.stop();
loopThread.join();
return 0;
}
在这个代码中,我们添加了一个ThreadPool
类,用于管理线程池。 EventLoop
类现在接受一个线程池大小作为参数,并将CPU密集型任务提交给线程池执行。 addEvent()
函数现在接受一个runInThread
参数,用于指定是否在线程池中执行事件处理函数。
3. 加入IO多路复用 (epoll为例)
现在,我们来添加IO多路复用功能,用于监听多个socket连接。 这里我们以epoll为例,因为它是Linux系统上最常用的多路复用机制。
#include <iostream>
#include <queue>
#include <functional>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <vector>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#define MAX_EVENTS 10
// ThreadPool 代码 (与之前相同)
class ThreadPool {
public:
ThreadPool(size_t numThreads) : numThreads_(numThreads) {
threads_.reserve(numThreads_);
for (size_t i = 0; i < numThreads_; ++i) {
threads_.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex_);
cv_.wait(lock, [this] { return !taskQueue_.empty() || stop_; });
if (stop_ && taskQueue_.empty()) return;
task = taskQueue_.front();
taskQueue_.pop();
}
task();
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex_);
stop_ = true;
}
cv_.notify_all();
for (std::thread& thread : threads_) {
thread.join();
}
}
template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queueMutex_);
if (stop_) throw std::runtime_error("enqueue on stopped ThreadPool");
taskQueue_.emplace([task]() { (*task)(); });
}
cv_.notify_one();
return res;
}
private:
size_t numThreads_;
std::vector<std::thread> threads_;
std::queue<std::function<void()>> taskQueue_;
std::mutex queueMutex_;
std::condition_variable cv_;
bool stop_ = false;
};
class EventLoop {
public:
using EventCallback = std::function<void()>;
EventLoop(size_t threadPoolSize) : threadPool_(threadPoolSize), epollFd_(epoll_create1(0)) {
if (epollFd_ == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
}
~EventLoop() {
close(epollFd_);
}
void addEvent(EventCallback callback, bool runInThread = false) {
if (runInThread) {
threadPool_.enqueue(callback);
} else {
{
std::lock_guard<std::mutex> lock(mutex_);
eventQueue_.push(callback);
}
cv_.notify_one();
}
}
void addSocket(int fd, EventCallback callback) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN; // 监听可读事件
if (epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event) == -1) {
perror("epoll_ctl: add");
close(fd);
return;
}
socketCallbacks_[fd] = callback;
}
void removeSocket(int fd) {
if (epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, nullptr) == -1) {
perror("epoll_ctl: del");
}
socketCallbacks_.erase(fd);
close(fd);
}
void run() {
epoll_event events[MAX_EVENTS];
while (running_) {
{
std::unique_lock<std::mutex> lock(mutex_);
if (eventQueue_.empty() && socketCallbacks_.empty()) {
cv_.wait_for(lock, std::chrono::milliseconds(10), [this] { return !eventQueue_.empty() || !socketCallbacks_.empty() || !running_; });
if (!running_ && eventQueue_.empty() && socketCallbacks_.empty()) break;
}
}
// Process pending events from queue
while (true) {
EventCallback callback;
{
std::lock_guard<std::mutex> lock(mutex_);
if (eventQueue_.empty()) break;
callback = eventQueue_.front();
eventQueue_.pop();
}
callback();
}
// Process epoll events
int numEvents = epoll_wait(epollFd_, events, MAX_EVENTS, 0); // Non-blocking wait
if (numEvents == -1) {
perror("epoll_wait");
continue; // Or handle error as appropriate
}
for (int i = 0; i < numEvents; ++i) {
int fd = events[i].data.fd;
auto it = socketCallbacks_.find(fd);
if (it != socketCallbacks_.end()) {
it->second(); // 执行socket事件处理函数
} else {
std::cerr << "Error: socket callback not found for fd " << fd << std::endl;
removeSocket(fd);
}
}
}
// Close all sockets before exiting
for (auto const& [fd, callback] : socketCallbacks_) {
close(fd);
}
socketCallbacks_.clear();
}
void stop() {
running_ = false;
cv_.notify_one();
}
private:
std::queue<EventCallback> eventQueue_;
std::mutex mutex_;
std::condition_variable cv_;
bool running_ = true;
ThreadPool threadPool_;
int epollFd_;
std::unordered_map<int, EventCallback> socketCallbacks_;
};
int main() {
EventLoop loop(4); // 创建一个包含4个线程的线程池
std::thread loopThread([&loop] { loop.run(); });
// 创建一个socket,用于监听连接
int serverSocket = socket(AF_INET, SOCK_STREAM, 0);
if (serverSocket == -1) {
perror("socket");
return 1;
}
sockaddr_in serverAddr;
serverAddr.sin_family = AF_INET;
serverAddr.sin_addr.s_addr = INADDR_ANY;
serverAddr.sin_port = htons(8080);
if (bind(serverSocket, (sockaddr*)&serverAddr, sizeof(serverAddr)) == -1) {
perror("bind");
close(serverSocket);
return 1;
}
if (listen(serverSocket, 10) == -1) {
perror("listen");
close(serverSocket);
return 1;
}
// 添加socket到事件循环中
loop.addSocket(serverSocket, [&]() {
sockaddr_in clientAddr;
socklen_t clientAddrLen = sizeof(clientAddr);
int clientSocket = accept(serverSocket, (sockaddr*)&clientAddr, &clientAddrLen);
if (clientSocket == -1) {
perror("accept");
return;
}
std::cout << "Accepted connection from " << inet_ntoa(clientAddr.sin_addr) << ":" << ntohs(clientAddr.sin_port) << std::endl;
// 为客户端socket也添加到事件循环,处理接收到的数据
loop.addSocket(clientSocket, [clientSocket, &loop]() {
char buffer[1024];
ssize_t bytesRead = recv(clientSocket, buffer, sizeof(buffer), 0);
if (bytesRead > 0) {
std::cout << "Received: " << std::string(buffer, bytesRead) << std::endl;
send(clientSocket, buffer, bytesRead, 0); // Echo back
} else {
std::cout << "Client disconnected" << std::endl;
loop.removeSocket(clientSocket);
}
});
});
std::this_thread::sleep_for(std::chrono::seconds(10));
loop.stop();
loopThread.join();
close(serverSocket); // Close the server socket
return 0;
}
这个代码中,我们添加了epollFd_
成员变量,用于保存epoll的文件描述符。 addSocket()
函数用于将socket添加到epoll中,并注册相应的事件处理函数。 run()
函数现在会调用epoll_wait()
函数来等待socket事件。 当有socket事件发生时,就调用相应的事件处理函数。
五、总结
通过以上步骤,我们实现了一个基本的可伸缩的事件循环,它既能利用多线程处理CPU密集型任务,又能利用多路复用高效地管理大量的IO连接。
核心思路总结
组件 | 功能 | 技术点 |
---|---|---|
事件队列 | 存放待处理的事件 | 线程安全队列(互斥锁+条件变量) |
线程池 | 执行耗时的CPU密集型任务 | std::thread , std::future , std::packaged_task |
IO多路复用器 | 监听多个socket连接,当有数据可读时,通知事件循环线程 | epoll (Linux), kqueue (BSD/macOS), select (跨平台) |
事件循环线程 | 主线程,负责从事件队列中取出事件,并分发给相应的处理函数 | std::thread , 循环,条件变量 |
事件处理器 | 负责处理特定类型的事件 | 函数对象, lambda表达式 |
当然,这只是一个最基本的实现,还有很多可以改进的地方,比如:
- 错误处理: 添加更完善的错误处理机制。
- 超时机制: 为socket连接添加超时机制,防止连接一直占用资源。
- 负载均衡: 根据线程池的负载情况,动态地调整线程数量。
- 优先级: 为事件添加优先级,让重要的事件优先处理。
- 跨平台: 可以通过条件编译,使用不同的多路复用机制,实现跨平台支持(例如,Windows下使用IOCP)。
希望今天的讲座能对你有所帮助!下次再见!