C++实现对操作系统的`epoll`/`kqueue`封装:实现高性能异步I/O事件循环

C++ 实现高性能异步 I/O 事件循环:epoll/kqueue 封装

大家好,今天我们来聊聊如何在 C++ 中封装操作系统的 epollkqueue,实现一个高性能的异步 I/O 事件循环。这对于构建高并发、低延迟的网络应用至关重要。

异步 I/O 的重要性

传统的同步 I/O 模型下,一个线程在等待 I/O 操作完成时会被阻塞,无法执行其他任务。这在处理大量并发连接时会造成资源的极大浪费,导致性能瓶颈。异步 I/O 允许线程发起 I/O 操作后立即返回,无需等待 I/O 完成。当 I/O 操作完成后,操作系统会通知线程,线程再进行后续处理。

这种非阻塞、事件驱动的模式能够充分利用 CPU 资源,提高并发处理能力。像 Nginx、Redis 等高性能服务器都依赖于异步 I/O 模型。

epollkqueue 简介

epoll (Linux) 和 kqueue (FreeBSD, macOS) 是操作系统提供的两种高性能 I/O 事件通知机制,它们允许应用程序监视多个文件描述符 (file descriptor) 上的事件,并在事件发生时得到通知。

  • epoll (Linux): 基于事件通知,当文件描述符上的事件发生时,epoll 会将该文件描述符加入就绪列表。epoll 支持 LT (Level Triggered) 和 ET (Edge Triggered) 两种触发模式。
  • kqueue (FreeBSD, macOS): 基于事件过滤,可以监视多种事件类型,包括文件描述符、信号、定时器等。kqueue 提供了更加灵活的事件过滤和处理机制。

选择哪个取决于你的目标平台。本文会展示两种机制的封装,并提供一个通用的接口。

设计目标

我们的目标是创建一个易于使用、高性能的异步 I/O 事件循环库,该库应具备以下特点:

  • 跨平台: 支持 Linux (epoll) 和 FreeBSD/macOS (kqueue)。
  • 易于使用: 提供简洁的 API,方便用户注册和处理事件。
  • 高性能: 充分利用 epollkqueue 的优势,减少系统调用开销。
  • 可扩展性: 方便用户添加自定义事件类型和处理逻辑。
  • 线程安全: 保证在多线程环境下的正确性。

核心组件设计

为了实现上述目标,我们需要设计以下核心组件:

  1. EventLoop: 事件循环类,负责管理事件监听器,处理事件循环,以及调度事件处理器。
  2. EventHandler: 事件处理器基类,定义了事件处理接口。
  3. Channel: 封装文件描述符和事件类型,负责注册和取消事件监听。
  4. Poller: 抽象类,定义了 epollkqueue 的通用接口。
  5. EpollPoller: Pollerepoll 实现。
  6. KqueuePoller: Pollerkqueue 实现。

代码实现

接下来,我们逐步实现上述组件。

1. EventHandler 基类

#include <functional>

class EventHandler {
public:
    virtual ~EventHandler() = default;

    virtual void handleEvent() = 0;
};

// 也可以使用 std::function,更加灵活
// typedef std::function<void()> EventHandlerFunc;

这个基类定义了一个纯虚函数 handleEvent(),所有的事件处理器都必须继承自该类并实现该函数。

2. Channel

#include <sys/epoll.h> // For epoll defines
#include <sys/event.h> // For kqueue defines

class Channel {
public:
    using EventCallback = std::function<void()>;

    Channel(int fd) : fd_(fd) {}
    ~Channel() = default;

    int fd() const { return fd_; }

    void setReadCallback(EventCallback cb) { readCallback_ = cb; }
    void setWriteCallback(EventCallback cb) { writeCallback_ = cb; }
    void setErrorCallback(EventCallback cb) { errorCallback_ = cb; }

    void enableReading() { events_ |= EPOLLIN; } // or EVFILT_READ in kqueue
    void disableReading() { events_ &= ~EPOLLIN; } // or EVFILT_READ in kqueue
    void enableWriting() { events_ |= EPOLLOUT; } // or EVFILT_WRITE in kqueue
    void disableWriting() { events_ &= ~EPOLLOUT; } // or EVFILT_WRITE in kqueue
    void disableAll() { events_ = 0; }

    bool isNoneEvent() const { return events_ == 0; }
    bool isReading() const { return events_ & EPOLLIN; } // or EVFILT_READ in kqueue
    bool isWriting() const { return events_ & EPOLLOUT; } // or EVFILT_WRITE in kqueue

    int events() const { return events_; }
    void set_revents(int revents) { revents_ = revents; }
    int revents() const { return revents_; }

    void handleEvent() {
        if (revents_ & (EPOLLERR | EPOLLHUP)) { // or EV_EOF in kqueue
            if (errorCallback_) errorCallback_();
        }

        if (revents_ & EPOLLIN) { // or EVFILT_READ in kqueue
            if (readCallback_) readCallback_();
        }

        if (revents_ & EPOLLOUT) { // or EVFILT_WRITE in kqueue
            if (writeCallback_) writeCallback_();
        }
    }

private:
    int fd_;
    int events_ = 0; // The events that we're interested in
    int revents_ = 0; // The events that actually happened

    EventCallback readCallback_;
    EventCallback writeCallback_;
    EventCallback errorCallback_;
};

Channel 类封装了文件描述符和事件类型,负责注册和取消事件监听。它还保存了事件回调函数,并在事件发生时调用相应的回调函数。 events_ 成员变量存储了我们感兴趣的事件类型,例如 EPOLLIN (可读) 和 EPOLLOUT (可写)。 revents_ 成员变量存储了实际发生的事件类型。

3. Poller 抽象类

#include <vector>
#include <map>

class Channel;

class Poller {
public:
    using ChannelList = std::vector<Channel*>;

    virtual ~Poller() = default;

    virtual void poll(int timeoutMs, ChannelList* activeChannels) = 0;
    virtual void updateChannel(Channel* channel) = 0;
    virtual void removeChannel(Channel* channel) = 0;

    // Helper function to get the Poller implementation based on the OS
    static Poller* newDefaultPoller();

protected:
    Poller() = default;
};

Poller 抽象类定义了 poll(), updateChannel(), 和 removeChannel() 三个纯虚函数,分别用于:

  • poll(): 等待事件发生,并将发生的事件添加到 activeChannels 列表中。
  • updateChannel(): 更新文件描述符上的事件监听。
  • removeChannel(): 移除文件描述符上的事件监听。

newDefaultPoller() 是一个静态工厂方法,用于根据操作系统选择合适的 Poller 实现。

4. EpollPoller

#include <sys/epoll.h>
#include <unistd.h>
#include <cerrno>

#include <iostream> // For debugging

class EpollPoller : public Poller {
public:
    EpollPoller() : epollfd_(epoll_create1(EPOLL_CLOEXEC)), events_(kInitialEventListSize) {
        if (epollfd_ < 0) {
            perror("epoll_create1");
            abort();
        }
    }

    ~EpollPoller() override {
        close(epollfd_);
    }

    void poll(int timeoutMs, ChannelList* activeChannels) override {
        int numEvents = epoll_wait(epollfd_, events_.data(), static_cast<int>(events_.size()), timeoutMs);
        int savedErrno = errno;

        if (numEvents > 0) {
            for (int i = 0; i < numEvents; ++i) {
                Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
                channel->set_revents(events_[i].events);
                activeChannels->push_back(channel);
            }

            if (static_cast<size_t>(numEvents) == events_.size()) {
                events_.resize(events_.size() * 2); // Double the size if needed
            }
        } else if (numEvents == 0) {
            // Timeout
        } else {
            if (savedErrno != EINTR) {
                errno = savedErrno;
                perror("epoll_wait");
            }
        }
    }

    void updateChannel(Channel* channel) override {
        int fd = channel->fd();
        int events = channel->events();

        struct epoll_event ev;
        ev.events = events;
        ev.data.ptr = channel;

        if (channels_.find(fd) == channels_.end()) {
            // New channel
            if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, fd, &ev) < 0) {
                perror("epoll_ctl (add)");
            } else {
                channels_[fd] = channel;
            }
        } else {
            // Existing channel
            if (epoll_ctl(epollfd_, EPOLL_CTL_MOD, fd, &ev) < 0) {
                perror("epoll_ctl (mod)");
            }
        }
    }

    void removeChannel(Channel* channel) override {
        int fd = channel->fd();
        channels_.erase(fd);

        struct epoll_event ev;
        ev.events = 0; // Not interested in any events
        ev.data.ptr = channel;

        if (epoll_ctl(epollfd_, EPOLL_CTL_DEL, fd, &ev) < 0) {
             perror("epoll_ctl (del)");
        }
    }

private:
    static const int kInitialEventListSize = 16;

    int epollfd_;
    std::vector<epoll_event> events_;
    std::map<int, Channel*> channels_;
};

EpollPoller 类是 Pollerepoll 实现。它使用 epoll_create1(), epoll_wait(), 和 epoll_ctl() 等系统调用来实现事件监听和通知。 events_ 成员变量是一个 std::vector<epoll_event>,用于存储就绪的事件。为了避免频繁的内存分配,我们预先分配了一定数量的 epoll_event 结构体,并在需要时动态扩容。 channels_ 是一个 std::map<int, Channel*>,用于存储所有注册的 Channel 对象,方便在 poll() 函数中根据文件描述符找到对应的 Channel 对象。

5. KqueuePoller

#ifdef __APPLE__ || __FreeBSD__
#include <sys/event.h>
#include <sys/time.h>
#include <unistd.h>
#include <cerrno>

#include <iostream> // For debugging

class KqueuePoller : public Poller {
public:
    KqueuePoller() : kq_(kqueue()), events_(kInitialEventListSize) {
        if (kq_ < 0) {
            perror("kqueue");
            abort();
        }
    }

    ~KqueuePoller() override {
        close(kq_);
    }

    void poll(int timeoutMs, ChannelList* activeChannels) override {
        struct timespec timeout;
        timeout.tv_sec = timeoutMs / 1000;
        timeout.tv_nsec = (timeoutMs % 1000) * 1000000;

        int numEvents = kevent(kq_, nullptr, 0, events_.data(), static_cast<int>(events_.size()), timeoutMs == -1 ? nullptr : &timeout);
        int savedErrno = errno;

        if (numEvents > 0) {
            for (int i = 0; i < numEvents; ++i) {
                Channel* channel = static_cast<Channel*>(events_[i].udata);
                int revents = 0;

                if (events_[i].flags & EV_ERROR) {
                    revents |= EPOLLERR; // Map kqueue error to epoll error
                }

                if (events_[i].filter == EVFILT_READ) {
                    revents |= EPOLLIN;
                }

                if (events_[i].filter == EVFILT_WRITE) {
                    revents |= EPOLLOUT;
                }

                channel->set_revents(revents);
                activeChannels->push_back(channel);
            }

            if (static_cast<size_t>(numEvents) == events_.size()) {
                events_.resize(events_.size() * 2); // Double the size if needed
            }
        } else if (numEvents == 0) {
            // Timeout
        } else {
            if (savedErrno != EINTR) {
                errno = savedErrno;
                perror("kevent");
            }
        }
    }

    void updateChannel(Channel* channel) override {
        int fd = channel->fd();

        struct kevent changes[2];
        int nchanges = 0;

        if (channel->isReading()) {
            EV_SET(&changes[nchanges++], fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, channel);
        } else {
            EV_SET(&changes[nchanges++], fd, EVFILT_READ, EV_DELETE, 0, 0, channel);
        }

        if (channel->isWriting()) {
            EV_SET(&changes[nchanges++], fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, channel);
        } else {
            EV_SET(&changes[nchanges++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, channel);
        }

        kevent(kq_, changes, nchanges, nullptr, 0, nullptr);
    }

    void removeChannel(Channel* channel) override {
        int fd = channel->fd();

        struct kevent changes[2];
        int nchanges = 0;

        EV_SET(&changes[nchanges++], fd, EVFILT_READ, EV_DELETE, 0, 0, channel);
        EV_SET(&changes[nchanges++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, channel);

        kevent(kq_, changes, nchanges, nullptr, 0, nullptr);
    }

private:
    static const int kInitialEventListSize = 16;

    int kq_;
    std::vector<struct kevent> events_;
};

#endif

KqueuePoller 类是 Pollerkqueue 实现。它使用 kqueue(), kevent() 等系统调用来实现事件监听和通知。 为了跨平台,我们在KqueuePoller中将kqueue返回的事件转换成epoll定义的事件。

6. Poller::newDefaultPoller() 工厂方法

#include <cstdlib> // For getenv

Poller* Poller::newDefaultPoller() {
#ifdef __linux__
    return new EpollPoller();
#elif defined(__APPLE__) || defined(__FreeBSD__)
    return new KqueuePoller();
#else
    return nullptr; // Or throw an exception
#endif
}

这个静态工厂方法根据操作系统选择合适的 Poller 实现。

7. EventLoop

#include <memory>
#include <thread>
#include <chrono>
#include <iostream>

class EventLoop {
public:
    EventLoop() : poller_(Poller::newDefaultPoller()), quit_(false) {}
    ~EventLoop() { delete poller_; }

    void loop();

    void updateChannel(Channel* channel) {
        poller_->updateChannel(channel);
    }

    void removeChannel(Channel* channel) {
        poller_->removeChannel(channel);
    }

    void quit() {
        quit_ = true;
    }

private:
    Poller* poller_;
    bool quit_;
};

void EventLoop::loop() {
    while (!quit_) {
        Poller::ChannelList activeChannels;
        poller_->poll(10000, &activeChannels); // 10 seconds timeout

        for (Channel* channel : activeChannels) {
            channel->handleEvent();
        }
    }
}

EventLoop 类是事件循环的核心。它包含一个 Poller 对象,负责监听事件。loop() 函数是事件循环的主体,它不断地调用 poller_->poll() 等待事件发生,然后处理发生的事件。

使用示例

#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <string.h>

// Include the classes defined above
// EventHandler, Channel, Poller, EpollPoller, KqueuePoller, EventLoop

class Acceptor : public EventHandler {
public:
    Acceptor(EventLoop* loop, int port) : loop_(loop), listenFd_(socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0)), channel_(listenFd_) {
        if (listenFd_ < 0) {
            perror("socket");
            abort();
        }

        struct sockaddr_in addr;
        memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = INADDR_ANY;
        addr.sin_port = htons(port);

        if (bind(listenFd_, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
            perror("bind");
            abort();
        }

        if (listen(listenFd_, 128) < 0) {
            perror("listen");
            abort();
        }

        channel_.setReadCallback(std::bind(&Acceptor::handleAccept, this));
        channel_.enableReading();
        loop_->updateChannel(&channel_);
    }

    ~Acceptor() override {
        close(listenFd_);
    }

private:
    void handleAccept() {
        struct sockaddr_in clientAddr;
        socklen_t clientLen = sizeof(clientAddr);
        int connFd = accept4(listenFd_, (struct sockaddr*)&clientAddr, &clientLen, SOCK_NONBLOCK | SOCK_CLOEXEC);

        if (connFd < 0) {
            perror("accept4");
            return;
        }

        std::cout << "New connection from " << inet_ntoa(clientAddr.sin_addr) << ":" << ntohs(clientAddr.sin_port) << std::endl;

        // Create a new connection handler (not implemented here)
        // You would typically create a new Channel and EventHandler for the new connection
    }

    EventLoop* loop_;
    int listenFd_;
    Channel channel_;
};

int main() {
    EventLoop loop;
    Acceptor acceptor(&loop, 8080); // Listen on port 8080
    loop.loop();

    return 0;
}

这个示例创建了一个 Acceptor 类,用于监听新的连接。当有新的连接到达时,Acceptor::handleAccept() 函数会被调用。 这个例子演示了如何使用 EventLoop, Channel, 和 Poller 来实现一个简单的服务器。

线程安全

在多线程环境下使用事件循环需要考虑线程安全问题。 以下是一些需要注意的点:

  • 事件循环的生命周期: 确保事件循环在所有线程退出之前都处于活动状态。
  • Channel 的更新: 多个线程可能同时尝试更新同一个 Channel,需要使用锁来保护 Channel 的状态。
  • 事件处理: 事件处理函数可能会被多个线程同时调用,需要保证事件处理函数的线程安全。

一种常见的做法是将事件循环绑定到特定的线程,并使用线程间通信机制 (例如消息队列) 来将任务提交到事件循环所在的线程执行。

性能优化

为了进一步提高事件循环的性能,可以考虑以下优化措施:

  • 减少系统调用: 尽量减少 epoll_ctl()kevent() 等系统调用的次数。 例如,可以批量更新事件监听。
  • 避免内存分配: 尽量避免在事件循环中进行内存分配。 可以使用对象池来重用对象。
  • 使用缓存: 可以使用缓存来存储经常访问的数据,减少 I/O 操作。
  • 选择合适的触发模式: epoll 支持 LT 和 ET 两种触发模式。 ET 模式可以减少事件通知的次数,但需要更加小心地处理事件。
  • CPU 亲和性: 将事件循环线程绑定到特定的 CPU 核心,可以提高缓存命中率。

总结:构建高性能 I/O 事件循环的关键要素

我们深入探讨了如何使用 C++ 封装 epollkqueue,构建一个高性能的异步 I/O 事件循环。 从 EventHandlerEventLoop,再到平台相关的 Poller 实现,每一步都至关重要。 掌握这些技术,你就能开发出能够处理高并发、低延迟的网络应用。

更多IT精英技术系列讲座,到智猿学院

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注