C++ 实现高性能异步 I/O 事件循环:epoll/kqueue 封装
大家好,今天我们来聊聊如何在 C++ 中封装操作系统的 epoll 和 kqueue,实现一个高性能的异步 I/O 事件循环。这对于构建高并发、低延迟的网络应用至关重要。
异步 I/O 的重要性
传统的同步 I/O 模型下,一个线程在等待 I/O 操作完成时会被阻塞,无法执行其他任务。这在处理大量并发连接时会造成资源的极大浪费,导致性能瓶颈。异步 I/O 允许线程发起 I/O 操作后立即返回,无需等待 I/O 完成。当 I/O 操作完成后,操作系统会通知线程,线程再进行后续处理。
这种非阻塞、事件驱动的模式能够充分利用 CPU 资源,提高并发处理能力。像 Nginx、Redis 等高性能服务器都依赖于异步 I/O 模型。
epoll 和 kqueue 简介
epoll (Linux) 和 kqueue (FreeBSD, macOS) 是操作系统提供的两种高性能 I/O 事件通知机制,它们允许应用程序监视多个文件描述符 (file descriptor) 上的事件,并在事件发生时得到通知。
epoll(Linux): 基于事件通知,当文件描述符上的事件发生时,epoll会将该文件描述符加入就绪列表。epoll支持 LT (Level Triggered) 和 ET (Edge Triggered) 两种触发模式。kqueue(FreeBSD, macOS): 基于事件过滤,可以监视多种事件类型,包括文件描述符、信号、定时器等。kqueue提供了更加灵活的事件过滤和处理机制。
选择哪个取决于你的目标平台。本文会展示两种机制的封装,并提供一个通用的接口。
设计目标
我们的目标是创建一个易于使用、高性能的异步 I/O 事件循环库,该库应具备以下特点:
- 跨平台: 支持 Linux (
epoll) 和 FreeBSD/macOS (kqueue)。 - 易于使用: 提供简洁的 API,方便用户注册和处理事件。
- 高性能: 充分利用
epoll和kqueue的优势,减少系统调用开销。 - 可扩展性: 方便用户添加自定义事件类型和处理逻辑。
- 线程安全: 保证在多线程环境下的正确性。
核心组件设计
为了实现上述目标,我们需要设计以下核心组件:
EventLoop: 事件循环类,负责管理事件监听器,处理事件循环,以及调度事件处理器。EventHandler: 事件处理器基类,定义了事件处理接口。Channel: 封装文件描述符和事件类型,负责注册和取消事件监听。Poller: 抽象类,定义了epoll和kqueue的通用接口。EpollPoller:Poller的epoll实现。KqueuePoller:Poller的kqueue实现。
代码实现
接下来,我们逐步实现上述组件。
1. EventHandler 基类
#include <functional>
class EventHandler {
public:
virtual ~EventHandler() = default;
virtual void handleEvent() = 0;
};
// 也可以使用 std::function,更加灵活
// typedef std::function<void()> EventHandlerFunc;
这个基类定义了一个纯虚函数 handleEvent(),所有的事件处理器都必须继承自该类并实现该函数。
2. Channel 类
#include <sys/epoll.h> // For epoll defines
#include <sys/event.h> // For kqueue defines
class Channel {
public:
using EventCallback = std::function<void()>;
Channel(int fd) : fd_(fd) {}
~Channel() = default;
int fd() const { return fd_; }
void setReadCallback(EventCallback cb) { readCallback_ = cb; }
void setWriteCallback(EventCallback cb) { writeCallback_ = cb; }
void setErrorCallback(EventCallback cb) { errorCallback_ = cb; }
void enableReading() { events_ |= EPOLLIN; } // or EVFILT_READ in kqueue
void disableReading() { events_ &= ~EPOLLIN; } // or EVFILT_READ in kqueue
void enableWriting() { events_ |= EPOLLOUT; } // or EVFILT_WRITE in kqueue
void disableWriting() { events_ &= ~EPOLLOUT; } // or EVFILT_WRITE in kqueue
void disableAll() { events_ = 0; }
bool isNoneEvent() const { return events_ == 0; }
bool isReading() const { return events_ & EPOLLIN; } // or EVFILT_READ in kqueue
bool isWriting() const { return events_ & EPOLLOUT; } // or EVFILT_WRITE in kqueue
int events() const { return events_; }
void set_revents(int revents) { revents_ = revents; }
int revents() const { return revents_; }
void handleEvent() {
if (revents_ & (EPOLLERR | EPOLLHUP)) { // or EV_EOF in kqueue
if (errorCallback_) errorCallback_();
}
if (revents_ & EPOLLIN) { // or EVFILT_READ in kqueue
if (readCallback_) readCallback_();
}
if (revents_ & EPOLLOUT) { // or EVFILT_WRITE in kqueue
if (writeCallback_) writeCallback_();
}
}
private:
int fd_;
int events_ = 0; // The events that we're interested in
int revents_ = 0; // The events that actually happened
EventCallback readCallback_;
EventCallback writeCallback_;
EventCallback errorCallback_;
};
Channel 类封装了文件描述符和事件类型,负责注册和取消事件监听。它还保存了事件回调函数,并在事件发生时调用相应的回调函数。 events_ 成员变量存储了我们感兴趣的事件类型,例如 EPOLLIN (可读) 和 EPOLLOUT (可写)。 revents_ 成员变量存储了实际发生的事件类型。
3. Poller 抽象类
#include <vector>
#include <map>
class Channel;
class Poller {
public:
using ChannelList = std::vector<Channel*>;
virtual ~Poller() = default;
virtual void poll(int timeoutMs, ChannelList* activeChannels) = 0;
virtual void updateChannel(Channel* channel) = 0;
virtual void removeChannel(Channel* channel) = 0;
// Helper function to get the Poller implementation based on the OS
static Poller* newDefaultPoller();
protected:
Poller() = default;
};
Poller 抽象类定义了 poll(), updateChannel(), 和 removeChannel() 三个纯虚函数,分别用于:
poll(): 等待事件发生,并将发生的事件添加到activeChannels列表中。updateChannel(): 更新文件描述符上的事件监听。removeChannel(): 移除文件描述符上的事件监听。
newDefaultPoller() 是一个静态工厂方法,用于根据操作系统选择合适的 Poller 实现。
4. EpollPoller 类
#include <sys/epoll.h>
#include <unistd.h>
#include <cerrno>
#include <iostream> // For debugging
class EpollPoller : public Poller {
public:
EpollPoller() : epollfd_(epoll_create1(EPOLL_CLOEXEC)), events_(kInitialEventListSize) {
if (epollfd_ < 0) {
perror("epoll_create1");
abort();
}
}
~EpollPoller() override {
close(epollfd_);
}
void poll(int timeoutMs, ChannelList* activeChannels) override {
int numEvents = epoll_wait(epollfd_, events_.data(), static_cast<int>(events_.size()), timeoutMs);
int savedErrno = errno;
if (numEvents > 0) {
for (int i = 0; i < numEvents; ++i) {
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
channel->set_revents(events_[i].events);
activeChannels->push_back(channel);
}
if (static_cast<size_t>(numEvents) == events_.size()) {
events_.resize(events_.size() * 2); // Double the size if needed
}
} else if (numEvents == 0) {
// Timeout
} else {
if (savedErrno != EINTR) {
errno = savedErrno;
perror("epoll_wait");
}
}
}
void updateChannel(Channel* channel) override {
int fd = channel->fd();
int events = channel->events();
struct epoll_event ev;
ev.events = events;
ev.data.ptr = channel;
if (channels_.find(fd) == channels_.end()) {
// New channel
if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, fd, &ev) < 0) {
perror("epoll_ctl (add)");
} else {
channels_[fd] = channel;
}
} else {
// Existing channel
if (epoll_ctl(epollfd_, EPOLL_CTL_MOD, fd, &ev) < 0) {
perror("epoll_ctl (mod)");
}
}
}
void removeChannel(Channel* channel) override {
int fd = channel->fd();
channels_.erase(fd);
struct epoll_event ev;
ev.events = 0; // Not interested in any events
ev.data.ptr = channel;
if (epoll_ctl(epollfd_, EPOLL_CTL_DEL, fd, &ev) < 0) {
perror("epoll_ctl (del)");
}
}
private:
static const int kInitialEventListSize = 16;
int epollfd_;
std::vector<epoll_event> events_;
std::map<int, Channel*> channels_;
};
EpollPoller 类是 Poller 的 epoll 实现。它使用 epoll_create1(), epoll_wait(), 和 epoll_ctl() 等系统调用来实现事件监听和通知。 events_ 成员变量是一个 std::vector<epoll_event>,用于存储就绪的事件。为了避免频繁的内存分配,我们预先分配了一定数量的 epoll_event 结构体,并在需要时动态扩容。 channels_ 是一个 std::map<int, Channel*>,用于存储所有注册的 Channel 对象,方便在 poll() 函数中根据文件描述符找到对应的 Channel 对象。
5. KqueuePoller 类
#ifdef __APPLE__ || __FreeBSD__
#include <sys/event.h>
#include <sys/time.h>
#include <unistd.h>
#include <cerrno>
#include <iostream> // For debugging
class KqueuePoller : public Poller {
public:
KqueuePoller() : kq_(kqueue()), events_(kInitialEventListSize) {
if (kq_ < 0) {
perror("kqueue");
abort();
}
}
~KqueuePoller() override {
close(kq_);
}
void poll(int timeoutMs, ChannelList* activeChannels) override {
struct timespec timeout;
timeout.tv_sec = timeoutMs / 1000;
timeout.tv_nsec = (timeoutMs % 1000) * 1000000;
int numEvents = kevent(kq_, nullptr, 0, events_.data(), static_cast<int>(events_.size()), timeoutMs == -1 ? nullptr : &timeout);
int savedErrno = errno;
if (numEvents > 0) {
for (int i = 0; i < numEvents; ++i) {
Channel* channel = static_cast<Channel*>(events_[i].udata);
int revents = 0;
if (events_[i].flags & EV_ERROR) {
revents |= EPOLLERR; // Map kqueue error to epoll error
}
if (events_[i].filter == EVFILT_READ) {
revents |= EPOLLIN;
}
if (events_[i].filter == EVFILT_WRITE) {
revents |= EPOLLOUT;
}
channel->set_revents(revents);
activeChannels->push_back(channel);
}
if (static_cast<size_t>(numEvents) == events_.size()) {
events_.resize(events_.size() * 2); // Double the size if needed
}
} else if (numEvents == 0) {
// Timeout
} else {
if (savedErrno != EINTR) {
errno = savedErrno;
perror("kevent");
}
}
}
void updateChannel(Channel* channel) override {
int fd = channel->fd();
struct kevent changes[2];
int nchanges = 0;
if (channel->isReading()) {
EV_SET(&changes[nchanges++], fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, channel);
} else {
EV_SET(&changes[nchanges++], fd, EVFILT_READ, EV_DELETE, 0, 0, channel);
}
if (channel->isWriting()) {
EV_SET(&changes[nchanges++], fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, channel);
} else {
EV_SET(&changes[nchanges++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, channel);
}
kevent(kq_, changes, nchanges, nullptr, 0, nullptr);
}
void removeChannel(Channel* channel) override {
int fd = channel->fd();
struct kevent changes[2];
int nchanges = 0;
EV_SET(&changes[nchanges++], fd, EVFILT_READ, EV_DELETE, 0, 0, channel);
EV_SET(&changes[nchanges++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, channel);
kevent(kq_, changes, nchanges, nullptr, 0, nullptr);
}
private:
static const int kInitialEventListSize = 16;
int kq_;
std::vector<struct kevent> events_;
};
#endif
KqueuePoller 类是 Poller 的 kqueue 实现。它使用 kqueue(), kevent() 等系统调用来实现事件监听和通知。 为了跨平台,我们在KqueuePoller中将kqueue返回的事件转换成epoll定义的事件。
6. Poller::newDefaultPoller() 工厂方法
#include <cstdlib> // For getenv
Poller* Poller::newDefaultPoller() {
#ifdef __linux__
return new EpollPoller();
#elif defined(__APPLE__) || defined(__FreeBSD__)
return new KqueuePoller();
#else
return nullptr; // Or throw an exception
#endif
}
这个静态工厂方法根据操作系统选择合适的 Poller 实现。
7. EventLoop 类
#include <memory>
#include <thread>
#include <chrono>
#include <iostream>
class EventLoop {
public:
EventLoop() : poller_(Poller::newDefaultPoller()), quit_(false) {}
~EventLoop() { delete poller_; }
void loop();
void updateChannel(Channel* channel) {
poller_->updateChannel(channel);
}
void removeChannel(Channel* channel) {
poller_->removeChannel(channel);
}
void quit() {
quit_ = true;
}
private:
Poller* poller_;
bool quit_;
};
void EventLoop::loop() {
while (!quit_) {
Poller::ChannelList activeChannels;
poller_->poll(10000, &activeChannels); // 10 seconds timeout
for (Channel* channel : activeChannels) {
channel->handleEvent();
}
}
}
EventLoop 类是事件循环的核心。它包含一个 Poller 对象,负责监听事件。loop() 函数是事件循环的主体,它不断地调用 poller_->poll() 等待事件发生,然后处理发生的事件。
使用示例
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <string.h>
// Include the classes defined above
// EventHandler, Channel, Poller, EpollPoller, KqueuePoller, EventLoop
class Acceptor : public EventHandler {
public:
Acceptor(EventLoop* loop, int port) : loop_(loop), listenFd_(socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0)), channel_(listenFd_) {
if (listenFd_ < 0) {
perror("socket");
abort();
}
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(port);
if (bind(listenFd_, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
perror("bind");
abort();
}
if (listen(listenFd_, 128) < 0) {
perror("listen");
abort();
}
channel_.setReadCallback(std::bind(&Acceptor::handleAccept, this));
channel_.enableReading();
loop_->updateChannel(&channel_);
}
~Acceptor() override {
close(listenFd_);
}
private:
void handleAccept() {
struct sockaddr_in clientAddr;
socklen_t clientLen = sizeof(clientAddr);
int connFd = accept4(listenFd_, (struct sockaddr*)&clientAddr, &clientLen, SOCK_NONBLOCK | SOCK_CLOEXEC);
if (connFd < 0) {
perror("accept4");
return;
}
std::cout << "New connection from " << inet_ntoa(clientAddr.sin_addr) << ":" << ntohs(clientAddr.sin_port) << std::endl;
// Create a new connection handler (not implemented here)
// You would typically create a new Channel and EventHandler for the new connection
}
EventLoop* loop_;
int listenFd_;
Channel channel_;
};
int main() {
EventLoop loop;
Acceptor acceptor(&loop, 8080); // Listen on port 8080
loop.loop();
return 0;
}
这个示例创建了一个 Acceptor 类,用于监听新的连接。当有新的连接到达时,Acceptor::handleAccept() 函数会被调用。 这个例子演示了如何使用 EventLoop, Channel, 和 Poller 来实现一个简单的服务器。
线程安全
在多线程环境下使用事件循环需要考虑线程安全问题。 以下是一些需要注意的点:
- 事件循环的生命周期: 确保事件循环在所有线程退出之前都处于活动状态。
Channel的更新: 多个线程可能同时尝试更新同一个Channel,需要使用锁来保护Channel的状态。- 事件处理: 事件处理函数可能会被多个线程同时调用,需要保证事件处理函数的线程安全。
一种常见的做法是将事件循环绑定到特定的线程,并使用线程间通信机制 (例如消息队列) 来将任务提交到事件循环所在的线程执行。
性能优化
为了进一步提高事件循环的性能,可以考虑以下优化措施:
- 减少系统调用: 尽量减少
epoll_ctl()和kevent()等系统调用的次数。 例如,可以批量更新事件监听。 - 避免内存分配: 尽量避免在事件循环中进行内存分配。 可以使用对象池来重用对象。
- 使用缓存: 可以使用缓存来存储经常访问的数据,减少 I/O 操作。
- 选择合适的触发模式:
epoll支持 LT 和 ET 两种触发模式。 ET 模式可以减少事件通知的次数,但需要更加小心地处理事件。 - CPU 亲和性: 将事件循环线程绑定到特定的 CPU 核心,可以提高缓存命中率。
总结:构建高性能 I/O 事件循环的关键要素
我们深入探讨了如何使用 C++ 封装 epoll 和 kqueue,构建一个高性能的异步 I/O 事件循环。 从 EventHandler 到 EventLoop,再到平台相关的 Poller 实现,每一步都至关重要。 掌握这些技术,你就能开发出能够处理高并发、低延迟的网络应用。
更多IT精英技术系列讲座,到智猿学院