C++ 设计一个可伸缩的事件循环:多线程与多路复用结合

好的,没问题,我们直接进入主题。

各位听众,欢迎来到今天的C++进阶讲堂!今天我们要聊一个有点意思的话题:如何用C++打造一个可伸缩的事件循环,让它既能玩转多线程,又能耍得起多路复用。 想象一下,你的程序就像一个餐厅,事件循环就是这个餐厅的调度员,多线程是厨房里的大厨们,而多路复用就像服务员,负责高效地从顾客(外部事件)那里接收订单,并把做好的菜(处理结果)送到顾客手中。

一、什么是事件循环?

首先,咱们得搞明白,什么是事件循环。简单来说,事件循环就是一个不断循环执行以下步骤的程序结构:

  1. 等待事件: 监听各种事件源(比如socket、定时器、信号等)是否有事件发生。
  2. 处理事件: 如果有事件发生,就调用相应的事件处理函数来处理它。
  3. 回到第一步: 继续等待下一个事件。

这就像一个永动机,只要程序活着,它就会不停地转下去。

二、为什么要用多线程和多路复用?

好,现在问题来了,单个事件循环够用吗?答案是:在某些情况下,不够。

  • 单线程的局限性: 如果某个事件处理函数执行时间过长(比如执行了耗时的IO操作),整个事件循环就会被阻塞,无法响应其他事件。这就像餐厅里只有一个厨师,如果他正在做一个复杂的菜,其他顾客就得等着。
  • 多线程的优势: 为了解决这个问题,我们可以引入多线程。把耗时的事件处理函数放到独立的线程中执行,这样就不会阻塞主线程的事件循环。就像餐厅里有了多个厨师,可以同时做不同的菜。
  • 多路复用的必要性: 但是,如果有很多个socket连接需要监听,为每个socket创建一个线程,线程数量会变得非常庞大,导致资源浪费。 这时候,多路复用就派上用场了。它可以让一个线程同时监听多个socket,当某个socket有数据可读时,才去处理它。就像一个服务员可以同时服务多个顾客,而不需要为每个顾客都配备一个服务员。

所以,一个可伸缩的事件循环,应该能够同时利用多线程和多路复用的优势,既能并发处理事件,又能高效地管理大量的连接。

三、设计思路

我们的目标是创建一个既能处理CPU密集型任务(通过多线程),又能处理IO密集型任务(通过多路复用)的事件循环。

1. 核心组件

  • 事件队列 (Event Queue): 用于存放待处理的事件。
  • 事件循环线程 (Event Loop Thread): 主线程,负责从事件队列中取出事件,并分发给相应的处理函数。
  • 线程池 (Thread Pool): 用于执行耗时的CPU密集型任务。
  • IO多路复用器 (IO Multiplexer): 用于监听多个socket连接,当有数据可读时,通知事件循环线程。
  • 事件处理器 (Event Handler): 负责处理特定类型的事件。

2. 工作流程

  1. 注册事件: 将事件和对应的事件处理器注册到事件循环中。
  2. 事件发生: 当事件发生时(比如socket上有数据可读,定时器到期等),将事件添加到事件队列中。
  3. 事件循环线程:
    • 从事件队列中取出事件。
    • 如果事件是CPU密集型的,就提交给线程池执行。
    • 如果事件是IO事件,就调用相应的事件处理器处理。
    • 如果没有事件,就等待一段时间。
  4. IO多路复用器:
    • 监听多个socket连接。
    • 当某个socket上有数据可读时,将事件添加到事件队列中。
  5. 事件处理器: 负责处理特定类型的事件,比如读取socket数据,解析协议,等等。

四、代码实现

接下来,我们用C++代码来实现这个事件循环。 为了便于理解,我们先从一个简单的单线程事件循环开始,然后逐步添加多线程和多路复用功能。

1. 简单的单线程事件循环

#include <iostream>
#include <queue>
#include <functional>
#include <chrono>
#include <thread>

class EventLoop {
public:
    using EventCallback = std::function<void()>;

    void addEvent(EventCallback callback) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            eventQueue_.push(callback);
        }
        cv_.notify_one();
    }

    void run() {
        while (running_) {
            EventCallback callback;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return !eventQueue_.empty() || !running_; });
                if (!running_ && eventQueue_.empty()) break;
                callback = eventQueue_.front();
                eventQueue_.pop();
            }
            callback(); // 执行事件处理函数
        }
    }

    void stop() {
        running_ = false;
        cv_.notify_one();
    }

private:
    std::queue<EventCallback> eventQueue_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool running_ = true;
};

int main() {
    EventLoop loop;
    std::thread loopThread([&loop] { loop.run(); });

    // 添加一些事件
    loop.addEvent([]() {
        std::cout << "Event 1 executed" << std::endl;
    });

    loop.addEvent([]() {
        std::cout << "Event 2 executed" << std::endl;
    });

    std::this_thread::sleep_for(std::chrono::seconds(2));
    loop.stop();
    loopThread.join();

    return 0;
}

这个代码实现了一个最基本的事件循环。它有一个事件队列,一个互斥锁,一个条件变量,和一个run()函数。 addEvent()函数用于向事件队列中添加事件处理函数。 run()函数在一个循环中不断地从事件队列中取出事件处理函数,并执行它。

2. 加入线程池

现在,我们来添加线程池,用于执行耗时的CPU密集型任务。

#include <iostream>
#include <queue>
#include <functional>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <vector>

class ThreadPool {
public:
    ThreadPool(size_t numThreads) : numThreads_(numThreads) {
        threads_.reserve(numThreads_);
        for (size_t i = 0; i < numThreads_; ++i) {
            threads_.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queueMutex_);
                        cv_.wait(lock, [this] { return !taskQueue_.empty() || stop_; });
                        if (stop_ && taskQueue_.empty()) return;
                        task = taskQueue_.front();
                        taskQueue_.pop();
                    }
                    task();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex_);
            stop_ = true;
        }
        cv_.notify_all();
        for (std::thread& thread : threads_) {
            thread.join();
        }
    }

    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queueMutex_);
            if (stop_) throw std::runtime_error("enqueue on stopped ThreadPool");
            taskQueue_.emplace([task]() { (*task)(); });
        }
        cv_.notify_one();
        return res;
    }

private:
    size_t numThreads_;
    std::vector<std::thread> threads_;
    std::queue<std::function<void()>> taskQueue_;
    std::mutex queueMutex_;
    std::condition_variable cv_;
    bool stop_ = false;
};

class EventLoop {
public:
    using EventCallback = std::function<void()>;

    EventLoop(size_t threadPoolSize) : threadPool_(threadPoolSize) {}

    void addEvent(EventCallback callback, bool runInThread = false) {
        if (runInThread) {
            threadPool_.enqueue(callback);
        } else {
            {
                std::lock_guard<std::mutex> lock(mutex_);
                eventQueue_.push(callback);
            }
            cv_.notify_one();
        }
    }

    void run() {
        while (running_) {
            EventCallback callback;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return !eventQueue_.empty() || !running_; });
                if (!running_ && eventQueue_.empty()) break;
                callback = eventQueue_.front();
                eventQueue_.pop();
            }
            callback(); // 执行事件处理函数
        }
    }

    void stop() {
        running_ = false;
        cv_.notify_one();
    }

private:
    std::queue<EventCallback> eventQueue_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool running_ = true;
    ThreadPool threadPool_;
};

int main() {
    EventLoop loop(4); // 创建一个包含4个线程的线程池
    std::thread loopThread([&loop] { loop.run(); });

    // 添加一个CPU密集型任务
    loop.addEvent([]() {
        std::cout << "CPU intensive task started" << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(5)); // 模拟耗时操作
        std::cout << "CPU intensive task finished" << std::endl;
    }, true); // 在线程池中执行

    // 添加一个简单的事件
    loop.addEvent([]() {
        std::cout << "Simple event executed" << std::endl;
    });

    std::this_thread::sleep_for(std::chrono::seconds(2));
    loop.stop();
    loopThread.join();

    return 0;
}

在这个代码中,我们添加了一个ThreadPool类,用于管理线程池。 EventLoop类现在接受一个线程池大小作为参数,并将CPU密集型任务提交给线程池执行。 addEvent()函数现在接受一个runInThread参数,用于指定是否在线程池中执行事件处理函数。

3. 加入IO多路复用 (epoll为例)

现在,我们来添加IO多路复用功能,用于监听多个socket连接。 这里我们以epoll为例,因为它是Linux系统上最常用的多路复用机制。

#include <iostream>
#include <queue>
#include <functional>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <vector>

#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define MAX_EVENTS 10

// ThreadPool 代码 (与之前相同)
class ThreadPool {
public:
    ThreadPool(size_t numThreads) : numThreads_(numThreads) {
        threads_.reserve(numThreads_);
        for (size_t i = 0; i < numThreads_; ++i) {
            threads_.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queueMutex_);
                        cv_.wait(lock, [this] { return !taskQueue_.empty() || stop_; });
                        if (stop_ && taskQueue_.empty()) return;
                        task = taskQueue_.front();
                        taskQueue_.pop();
                    }
                    task();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex_);
            stop_ = true;
        }
        cv_.notify_all();
        for (std::thread& thread : threads_) {
            thread.join();
        }
    }

    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queueMutex_);
            if (stop_) throw std::runtime_error("enqueue on stopped ThreadPool");
            taskQueue_.emplace([task]() { (*task)(); });
        }
        cv_.notify_one();
        return res;
    }

private:
    size_t numThreads_;
    std::vector<std::thread> threads_;
    std::queue<std::function<void()>> taskQueue_;
    std::mutex queueMutex_;
    std::condition_variable cv_;
    bool stop_ = false;
};

class EventLoop {
public:
    using EventCallback = std::function<void()>;

    EventLoop(size_t threadPoolSize) : threadPool_(threadPoolSize), epollFd_(epoll_create1(0)) {
        if (epollFd_ == -1) {
            perror("epoll_create1");
            exit(EXIT_FAILURE);
        }
    }

    ~EventLoop() {
        close(epollFd_);
    }

    void addEvent(EventCallback callback, bool runInThread = false) {
        if (runInThread) {
            threadPool_.enqueue(callback);
        } else {
            {
                std::lock_guard<std::mutex> lock(mutex_);
                eventQueue_.push(callback);
            }
            cv_.notify_one();
        }
    }

    void addSocket(int fd, EventCallback callback) {
        epoll_event event;
        event.data.fd = fd;
        event.events = EPOLLIN; // 监听可读事件
        if (epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event) == -1) {
            perror("epoll_ctl: add");
            close(fd);
            return;
        }
        socketCallbacks_[fd] = callback;
    }

    void removeSocket(int fd) {
        if (epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, nullptr) == -1) {
            perror("epoll_ctl: del");
        }
        socketCallbacks_.erase(fd);
        close(fd);
    }

    void run() {
        epoll_event events[MAX_EVENTS];

        while (running_) {
            {
                std::unique_lock<std::mutex> lock(mutex_);
                if (eventQueue_.empty() && socketCallbacks_.empty()) {
                     cv_.wait_for(lock, std::chrono::milliseconds(10), [this] { return !eventQueue_.empty() || !socketCallbacks_.empty() || !running_; });
                    if (!running_ && eventQueue_.empty() && socketCallbacks_.empty()) break;
                }

            }

            // Process pending events from queue
            while (true) {
                EventCallback callback;
                {
                    std::lock_guard<std::mutex> lock(mutex_);
                    if (eventQueue_.empty()) break;
                    callback = eventQueue_.front();
                    eventQueue_.pop();
                }
                callback();
            }

            // Process epoll events
            int numEvents = epoll_wait(epollFd_, events, MAX_EVENTS, 0); // Non-blocking wait
            if (numEvents == -1) {
                perror("epoll_wait");
                continue; // Or handle error as appropriate
            }

            for (int i = 0; i < numEvents; ++i) {
                int fd = events[i].data.fd;
                auto it = socketCallbacks_.find(fd);
                if (it != socketCallbacks_.end()) {
                    it->second(); // 执行socket事件处理函数
                } else {
                    std::cerr << "Error: socket callback not found for fd " << fd << std::endl;
                    removeSocket(fd);
                }
            }
        }

        // Close all sockets before exiting
        for (auto const& [fd, callback] : socketCallbacks_) {
            close(fd);
        }
        socketCallbacks_.clear();
    }

    void stop() {
        running_ = false;
        cv_.notify_one();
    }

private:
    std::queue<EventCallback> eventQueue_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool running_ = true;
    ThreadPool threadPool_;
    int epollFd_;
    std::unordered_map<int, EventCallback> socketCallbacks_;
};

int main() {
    EventLoop loop(4); // 创建一个包含4个线程的线程池
    std::thread loopThread([&loop] { loop.run(); });

    // 创建一个socket,用于监听连接
    int serverSocket = socket(AF_INET, SOCK_STREAM, 0);
    if (serverSocket == -1) {
        perror("socket");
        return 1;
    }

    sockaddr_in serverAddr;
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_addr.s_addr = INADDR_ANY;
    serverAddr.sin_port = htons(8080);

    if (bind(serverSocket, (sockaddr*)&serverAddr, sizeof(serverAddr)) == -1) {
        perror("bind");
        close(serverSocket);
        return 1;
    }

    if (listen(serverSocket, 10) == -1) {
        perror("listen");
        close(serverSocket);
        return 1;
    }

    // 添加socket到事件循环中
    loop.addSocket(serverSocket, [&]() {
        sockaddr_in clientAddr;
        socklen_t clientAddrLen = sizeof(clientAddr);
        int clientSocket = accept(serverSocket, (sockaddr*)&clientAddr, &clientAddrLen);
        if (clientSocket == -1) {
            perror("accept");
            return;
        }

        std::cout << "Accepted connection from " << inet_ntoa(clientAddr.sin_addr) << ":" << ntohs(clientAddr.sin_port) << std::endl;

        //  为客户端socket也添加到事件循环,处理接收到的数据
        loop.addSocket(clientSocket, [clientSocket, &loop]() {
            char buffer[1024];
            ssize_t bytesRead = recv(clientSocket, buffer, sizeof(buffer), 0);
            if (bytesRead > 0) {
                std::cout << "Received: " << std::string(buffer, bytesRead) << std::endl;
                send(clientSocket, buffer, bytesRead, 0); // Echo back
            } else {
                std::cout << "Client disconnected" << std::endl;
                loop.removeSocket(clientSocket);
            }
        });
    });

    std::this_thread::sleep_for(std::chrono::seconds(10));
    loop.stop();
    loopThread.join();
    close(serverSocket); // Close the server socket

    return 0;
}

这个代码中,我们添加了epollFd_成员变量,用于保存epoll的文件描述符。 addSocket()函数用于将socket添加到epoll中,并注册相应的事件处理函数。 run()函数现在会调用epoll_wait()函数来等待socket事件。 当有socket事件发生时,就调用相应的事件处理函数。

五、总结

通过以上步骤,我们实现了一个基本的可伸缩的事件循环,它既能利用多线程处理CPU密集型任务,又能利用多路复用高效地管理大量的IO连接。

核心思路总结

组件 功能 技术点
事件队列 存放待处理的事件 线程安全队列(互斥锁+条件变量)
线程池 执行耗时的CPU密集型任务 std::thread, std::future, std::packaged_task
IO多路复用器 监听多个socket连接,当有数据可读时,通知事件循环线程 epoll (Linux), kqueue (BSD/macOS), select (跨平台)
事件循环线程 主线程,负责从事件队列中取出事件,并分发给相应的处理函数 std::thread, 循环,条件变量
事件处理器 负责处理特定类型的事件 函数对象, lambda表达式

当然,这只是一个最基本的实现,还有很多可以改进的地方,比如:

  • 错误处理: 添加更完善的错误处理机制。
  • 超时机制: 为socket连接添加超时机制,防止连接一直占用资源。
  • 负载均衡: 根据线程池的负载情况,动态地调整线程数量。
  • 优先级: 为事件添加优先级,让重要的事件优先处理。
  • 跨平台: 可以通过条件编译,使用不同的多路复用机制,实现跨平台支持(例如,Windows下使用IOCP)。

希望今天的讲座能对你有所帮助!下次再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注