实战:利用 C++20 协程构建一个异步磁盘 IO 引擎(结合 Linux io_uring)

欢迎来到本次技术讲座,我们将深入探讨如何利用 C++20 协程的强大能力,结合 Linux 内核提供的 io_uring 异步 I/O 接口,构建一个高性能的异步磁盘 I/O 引擎。在现代高并发、低延迟的服务中,I/O 操作往往是性能瓶颈。传统的同步 I/O 会阻塞线程,导致系统吞吐量下降;而基于回调或线程池的异步 I/O 虽然能解决阻塞问题,但代码复杂性高、可读性差。C++20 协程与 io_uring 的结合,为我们提供了一个优雅且高效的解决方案。


引言:异步 I/O 的必要性与挑战

在服务器端应用、数据库系统、高性能文件存储等领域,磁盘 I/O 是核心操作。传统的同步 I/O 模型,如 read()write() 系统调用,会阻塞调用线程,直到数据传输完成。这意味着在一个单线程程序中,进行一次磁盘 I/O 操作时,CPU 将处于空闲状态,无法处理其他任务,极大地浪费了计算资源。即使在多线程环境中,过多的阻塞 I/O 也会导致线程上下文切换开销增大,甚至引发线程池枯竭。

为了解决同步 I/O 的阻塞问题,异步 I/O 应运而生。它允许应用程序在提交 I/O 请求后立即返回,并在 I/O 完成时通过某种机制(如回调函数、事件通知)得到通知。然而,异步 I/O 编程也面临挑战:

  1. 回调地狱 (Callback Hell): 当多个异步操作按顺序或并行执行时,深层嵌套的回调函数会导致代码难以阅读、理解和维护。
  2. 错误处理复杂性: 异步操作的错误通常需要通过回调机制传递,导致错误处理逻辑分散且复杂。
  3. 状态管理: 异步操作的中间状态需要显式地存储和传递,增加了心智负担。
  4. 跨平台兼容性: 不同的操作系统提供不同的异步 I/O API (如 Windows 的 IOCP, Linux 的 aio, epoll)。

C++20 协程为异步编程带来了革命性的语法糖,使得异步代码能够以接近同步的线性方式书写,极大地提升了可读性和可维护性。而 Linux io_uring 则是近年来内核提供的一个高性能异步 I/O 接口,它克服了传统 aio 的诸多局限,提供了真正的异步、零拷贝、批处理能力。将两者结合,我们便能构建出既高效又易于编程的异步 I/O 引擎。


第一部分:异步 I/O 的基石:Linux io_uring 深度解析

在 Linux 系统上,长期以来异步 I/O 方案一直不尽如人意。aio(7) 接口复杂且性能不佳,通常在用户空间模拟异步,或者依赖于内核线程池。epoll(7) 虽然是高效的事件通知机制,但它只能通知文件描述符“可读”或“可写”,实际的 read()/write() 操作仍然是阻塞的。io_uring 的出现彻底改变了这一局面。

io_uring 提供了一种全新的、高效的异步 I/O 机制,它基于共享内存队列(ring buffer)在用户空间和内核空间之间传递 I/O 请求和完成事件,从而极大地减少了系统调用开销和数据拷贝。

1. io_uring 的核心概念

io_uring 主要由两个循环缓冲区(ring buffer)组成:

  • 提交队列 (Submission Queue, SQ): 用户空间通过 SQ 向内核提交 I/O 请求。
  • 完成队列 (Completion Queue, CQ): 内核通过 CQ 向用户空间报告 I/O 请求的完成情况。

每个队列都包含一个头部和尾部指针,以及一个条目数组。

  • SQE (Submission Queue Entry): 提交队列中的每个条目代表一个 I/O 请求,包含了操作类型(如读、写、打开等)、文件描述符、缓冲区地址、长度、偏移量以及一个 user_data 字段。user_data 是一个 64 位的值,由用户程序自由定义,它会在 I/O 完成时原封不动地返回给用户,这是将完成事件与特定协程关联的关键。
  • CQE (Completion Queue Entry): 完成队列中的每个条目代表一个已完成的 I/O 请求,包含了原始请求的 user_data、返回状态码(如 errno)和实际完成的字节数。

工作流程概览:

  1. 初始化: 用户程序调用 io_uring_setup() 创建 io_uring 实例,并映射共享内存区域。
  2. 提交请求: 用户程序获取一个空的 SQE,填充 I/O 请求的详细信息,将其放入 SQ。
  3. 通知内核: 用户程序调用 io_uring_enter() 通知内核有新的请求需要处理。
  4. 内核处理: 内核从 SQ 中取出 SQE,执行相应的 I/O 操作。
  5. 完成通知: I/O 操作完成后,内核将一个 CQE 放入 CQ。
  6. 用户处理: 用户程序从 CQ 中取出 CQE,根据 user_data 识别出对应的请求,并处理结果。

io_uring 的主要优势:

  • 单系统调用: 可以通过一次 io_uring_enter() 系统调用提交多个请求并等待多个完成事件,显著减少系统调用开销。
  • 无锁设计: 队列操作通常采用无锁原子操作,进一步提升效率。
  • 零拷贝 (可选): 支持注册固定缓冲区和文件,允许内核直接访问用户缓冲区,避免数据在用户空间和内核空间之间的拷贝。
  • 批处理 (Batching): 允许一次性提交大量 I/O 请求。
  • 多种操作类型: 不仅支持文件读写,还支持网络 I/O (send/recv), 文件元数据操作 (stat), 异步文件打开/关闭等。
  • 轮询模式 (IORING_SETUP_SQPOLL): 允许内核线程专门处理提交队列,减少用户空间到内核空间的切换开销,适用于极低延迟场景。

2. io_uring 的基本使用流程 (C 语言 API)

尽管我们最终会用 C++20 协程封装,但理解 io_uring 的原生 C API 是基础。

首先,需要包含头文件并链接库:

#include <liburing.h> // liburing 库提供更友好的封装
#include <fcntl.h>    // For open()
#include <unistd.h>   // For close()
#include <sys/stat.h> // For O_DIRECT
#include <string.h>   // For strerror
#include <iostream>

// 实际使用时,需要检测内核版本和liburing版本
// #define HAS_IO_URING_REGISTER_BUFFERS
// #define HAS_IO_URING_OP_CLOSE

初始化 io_uring 实例:

struct io_uring ring;
int ret = io_uring_setup(32, &ring); // 32 是队列深度
if (ret < 0) {
    std::cerr << "io_uring_setup failed: " << strerror(-ret) << std::endl;
    return 1;
}
// 映射共享内存
io_uring_queue_init(32, &ring, 0); // 0 for default flags

提交一个读请求:

int fd = open("test_file.txt", O_RDONLY | O_CREAT, 0644);
if (fd < 0) {
    std::cerr << "open failed: " << strerror(errno) << std::endl;
    return 1;
}

char buffer[512];
// 1. 获取一个 SQE
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
if (!sqe) {
    std::cerr << "io_uring_get_sqe failed" << std::endl;
    return 1;
}

// 2. 填充 SQE
io_uring_prep_read(sqe, fd, buffer, sizeof(buffer), 0); // fd, buffer, len, offset
sqe->user_data = 1234; // 自定义数据,用于在完成时识别请求

// 3. 提交 SQE 到内核
io_uring_submit(&ring);

等待并处理完成事件:

struct io_uring_cqe *cqe;
// 等待一个完成事件
int num_completions = io_uring_wait_cqe(&ring, &cqe);
if (num_completions < 0) {
    std::cerr << "io_uring_wait_cqe failed: " << strerror(-num_completions) << std::endl;
    return 1;
}

// 处理 CQE
if (cqe->user_data == 1234) {
    if (cqe->res < 0) {
        std::cerr << "Read failed: " << strerror(-cqe->res) << std::endl;
    } else {
        std::cout << "Read " << cqe->res << " bytes. Data: " << std::string(buffer, cqe->res) << std::endl;
    }
}
// 4. 标记 CQE 已处理
io_uring_cqe_seen(&ring, cqe);

清理:

close(fd);
io_uring_queue_exit(&ring);

这是一个非常简化的 io_uring 流程。实际应用中,会涉及更多的错误处理、批处理以及更复杂的 I/O 类型。


第二部分:C++20 协程:异步编程的语法糖与执行模型

C++20 协程 (Coroutines) 是一种可暂停和可恢复的函数。它允许函数在执行过程中暂停,将控制权交还给调用者,并在稍后从暂停点恢复执行,而不需要显式地保存和恢复栈帧。这使得异步代码能够以同步的、顺序的风格编写,极大地提升了可读性和可维护性,消除了“回调地狱”。

1. 协程的核心组件

C++20 协程主要由以下关键字和类型构成:

  • co_await: 暂停点操作符。它用于等待一个 awaitable 对象。当 co_await 表达式求值时,如果 awaitable 未准备好结果,协程会暂停,将控制权交还给调用者。当 awaitable 准备好结果时,协程从暂停点恢复。
  • co_return: 结束协程。它用于返回一个值或表示协程完成。
  • co_yield: (主要用于生成器协程) 暂停协程并返回一个值,稍后可恢复以生成下一个值。我们构建 I/O 引擎不会直接使用 co_yield
  • std::coroutine_handle: 一个轻量级的、非拥有型的类型,用于表示一个协程的句柄。通过它可以恢复或销毁协程。
  • promise_type: 协程的核心。每个协程都有一个关联的 promise_type 对象,它负责管理协程的生命周期、结果、异常处理以及与外部世界的交互。它定义了 initial_suspend (协程启动时是否立即暂停)、final_suspend (协程完成时是否暂停)、get_return_object (协程返回类型)、return_value (处理 co_return 值) 等方法。

2. awaitableawaiter 接口

co_await 表达式需要操作一个 awaitable 对象。awaitable 对象本身或者通过 operator co_await() 转换后,必须提供 awaiter 接口:

| 方法签名 | 描述 —

第三部分:融合:构建 io_uring 驱动的 C++20 协程调度器

现在,我们有了 io_uring 作为高性能异步 I/O 的底层基石,以及 C++20 协程作为用户态异步流程控制的利器。接下来,我们将把它们融合起来,构建一个能够将 io_uring 的完成事件映射到协程恢复的调度器。

1. 设计目标

  1. 协程友好: 提供 awaitable 对象,使得用户可以通过 co_await 关键字直接调用异步 I/O 操作。
  2. 事件驱动: io_uring 完成事件驱动协程的恢复。
  3. 单线程事件循环: 一个主线程负责 io_uring 的提交和完成事件的轮询,以及协程的调度和恢复。
  4. 清晰的生命周期管理: 确保文件描述符、缓冲区和协程句柄的正确管理。

2. 核心组件设计

我们将设计以下几个核心组件:

  • IoUringContext: 封装 io_uring 实例的创建、销毁、SQE/CQE 的获取和提交。它将是与内核直接交互的唯一组件。
  • Task<T>: 一个通用的协程返回类型,类似于 std::future<T>,但专门为协程设计。它将持有协程的 promise_type 和结果。
  • IoUringOperation (基类 awaitable): 抽象所有 io_uring 操作的基类 awaitable。它将在 await_suspend 中提交 SQE,并将当前协程句柄存储起来,以便 io_uring 完成后恢复。
  • IoUringRead / IoUringWrite 等 (派生 awaitable): 具体的 I/O 操作,继承自 IoUringOperation
  • Scheduler (事件循环): 这是整个引擎的“心脏”。它包含 IoUringContext,维护一个映射(从 user_datastd::coroutine_handle<>),并在主循环中等待 io_uring 完成事件,然后根据 user_data 恢复相应的协程。

3. 详细代码实现

我们将逐步构建这些组件。

3.1. IoUringContext:io_uring 实例的封装

这个类负责 io_uring 的初始化、资源管理和与内核的低级交互。

#pragma once

#include <liburing.h>
#include <vector>
#include <stdexcept>
#include <string>
#include <system_error> // For std::system_error
#include <fcntl.h>
#include <unistd.h>
#include <iostream>

// C++20 coroutines require specific flags and headers.
// Using GCC 10+ or Clang 10+
// Compile with: g++ -std=c++20 -fcoroutines -luring -o my_app main.cpp

class IoUringContext {
public:
    // 构造函数:初始化 io_uring
    explicit IoUringContext(unsigned int entries) : ring_{} {
        int ret = io_uring_queue_init(entries, &ring_, 0);
        if (ret < 0) {
            throw std::system_error(std::abs(ret), std::system_category(), "io_uring_queue_init failed");
        }
    }

    // 析构函数:清理 io_uring 资源
    ~IoUringContext() {
        io_uring_queue_exit(&ring_);
    }

    // 禁止拷贝和移动,io_uring_ring 结构不适合拷贝
    IoUringContext(const IoUringContext&) = delete;
    IoUringContext& operator=(const IoUringContext&) = delete;
    IoUringContext(IoUringContext&&) = delete;
    IoUringContext& operator=(IoUringContext&&) = delete;

    // 获取一个可用的 SQE
    io_uring_sqe* get_sqe() {
        return io_uring_get_sqe(&ring_);
    }

    // 提交所有已准备好的 SQE 到内核
    // min_complete:至少等待多少个完成事件再返回
    // flags:提交标志,如 IORING_ENTER_GETEVENTS
    int submit(unsigned int min_complete = 0, unsigned int flags = 0) {
        return io_uring_enter(&ring_, io_uring_sq_ready(&ring_), min_complete, flags, NULL);
    }

    // 等待并获取一个或多个完成事件
    // 注意:liburing 的 io_uring_wait_cqe 已经封装了 io_uring_enter
    // 对于批处理,通常使用 io_uring_peek_cqe 和 io_uring_for_each_cqe
    int wait_for_cqe(io_uring_cqe** cqe_ptr) {
        return io_uring_wait_cqe(&ring_, cqe_ptr);
    }

    // 标记 CQE 已处理
    void cqe_seen(io_uring_cqe* cqe) {
        io_uring_cqe_seen(&ring_, cqe);
    }

    // 获取所有可用 CQE 的数量
    unsigned int peek_cqe_count() const {
        return io_uring_cq_ready(&ring_);
    }

    // 批量获取 CQE
    // max_cqes_to_get: 最多获取多少个 CQE
    // cqes: 用于存储获取到的 CQE 指针的vector
    int get_cqes(std::vector<io_uring_cqe*>& cqes, unsigned int max_cqes_to_get) {
        cqes.clear();
        cqes.reserve(max_cqes_to_get);
        unsigned int head;
        unsigned int count = 0;
        io_uring_for_each_cqe(&ring_, head, cqes.emplace_back()) {
            count++;
            if (count >= max_cqes_to_get) {
                break;
            }
        }
        return count;
    }

private:
    io_uring ring_;
};

3.2. Task<T>:协程的返回类型

这是我们异步函数(协程)的返回类型。它将持有协程的 promise_type,并通过它来管理协程的生命周期和结果。

#pragma once

#include <coroutine>
#include <exception>
#include <optional>
#include <iostream>

template<typename T>
struct Task {
    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;

    struct promise_type {
        T value_; // 存储协程的返回值
        std::exception_ptr exception_; // 存储协程中的异常
        handle_type continuation_ = nullptr; // 用于存储恢复当前协程的协程句柄

        Task get_return_object() {
            return Task(handle_type::from_promise(*this));
        }

        // 协程首次启动时,暂停。这样,调用者可以获得 Task 对象,
        // 但协程本体还未执行,允许调用者决定何时启动协程。
        std::suspend_always initial_suspend() { return {}; }

        // 协程结束时,暂停。这样,调用者可以等待协程的结果,
        // 并在协程完全销毁之前获取结果。
        std::suspend_always final_suspend() noexcept { return {}; }

        void unhandled_exception() {
            exception_ = std::current_exception(); // 捕获未处理的异常
        }

        void return_value(T value) {
            value_ = value; // 存储返回值
        }

        // 用于无返回值协程 (Task<void>)
        void return_void() {}
    };

    handle_type coro_handle_;

    // 构造函数
    explicit Task(handle_type h) : coro_handle_(h) {}

    // 析构函数:确保协程句柄被销毁
    ~Task() {
        if (coro_handle_) {
            coro_handle_.destroy();
        }
    }

    // 禁止拷贝,允许移动 (协程句柄是资源)
    Task(const Task&) = delete;
    Task& operator=(const Task&) = delete;
    Task(Task&& other) noexcept : coro_handle_(other.coro_handle_) {
        other.coro_handle_ = nullptr;
    }
    Task& operator=(Task&& other) noexcept {
        if (this != &other) {
            if (coro_handle_) {
                coro_handle_.destroy();
            }
            coro_handle_ = other.coro_handle_;
            other.coro_handle_ = nullptr;
        }
        return *this;
    }

    // 协程是否已完成
    bool done() const { return coro_handle_.done(); }

    // 获取协程结果。如果协程尚未完成,调用此方法是未定义行为。
    T get_result() {
        if (coro_handle_.promise().exception_) {
            std::rethrow_exception(coro_handle_.promise().exception_);
        }
        return coro_handle_.promise().value_;
    }

    // Awaiter 接口,使得 Task 对象可以被 co_await
    struct Awaiter {
        handle_type coro_handle_;

        // 协程是否已经准备好结果?如果已完成,则无需暂停。
        bool await_ready() const {
            return coro_handle_.done();
        }

        // 协程暂停时调用。将当前协程的句柄存储到 Task 的 continuation_ 成员中。
        // 这使得在 Task 内部的 awaiter 完成时,可以恢复这个被暂停的协程。
        void await_suspend(std::coroutine_handle<> awaiting_coro) {
            coro_handle_.promise().continuation_ = awaiting_coro;
        }

        // 协程恢复时调用。返回协程的结果。
        T await_resume() {
            return coro_handle_.promise().get_return_object().get_result();
        }
    };

    Awaiter operator co_await() const {
        return Awaiter{coro_handle_};
    }

    // 立即恢复协程 (用于调度器启动协程)
    void resume() {
        coro_handle_.resume();
    }
};

// 特化 Task<void>
template<>
struct Task<void> {
    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;

    struct promise_type {
        std::exception_ptr exception_;
        handle_type continuation_ = nullptr;

        Task get_return_object() {
            return Task(handle_type::from_promise(*this));
        }

        std::suspend_always initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void unhandled_exception() {
            exception_ = std::current_exception();
        }
        void return_void() {}
    };

    handle_type coro_handle_;

    explicit Task(handle_type h) : coro_handle_(h) {}
    ~Task() {
        if (coro_handle_) {
            coro_handle_.destroy();
        }
    }

    Task(const Task&) = delete;
    Task& operator=(const Task&) = delete;
    Task(Task&& other) noexcept : coro_handle_(other.coro_handle_) {
        other.coro_handle_ = nullptr;
    }
    Task& operator=(Task&& other) noexcept {
        if (this != &other) {
            if (coro_handle_) {
                coro_handle_.destroy();
            }
            coro_handle_ = other.coro_handle_;
            other.coro_handle_ = nullptr;
        }
        return *this;
    }

    bool done() const { return coro_handle_.done(); }

    void get_result() {
        if (coro_handle_.promise().exception_) {
            std::rethrow_exception(coro_handle_.promise().exception_);
        }
    }

    struct Awaiter {
        handle_type coro_handle_;

        bool await_ready() const {
            return coro_handle_.done();
        }

        void await_suspend(std::coroutine_handle<> awaiting_coro) {
            coro_handle_.promise().continuation_ = awaiting_coro;
        }

        void await_resume() {
            coro_handle_.promise().get_return_object().get_result();
        }
    };

    Awaiter operator co_await() const {
        return Awaiter{coro_handle_};
    }

    void resume() {
        coro_handle_.resume();
    }
};

3.3. IoUringOperationio_uring 操作的 awaitable 基类

这是将 io_uring 操作转换为 awaitable 对象的关键。它封装了 io_uring 提交和完成的逻辑。

#pragma once

#include "IoUringContext.h"
#include <coroutine>
#include <map>
#include <mutex>
#include <atomic>

// 协程调度器,用于管理协程的恢复
class Scheduler;

// Base awaiter for io_uring operations
struct IoUringOperation {
    Scheduler& scheduler_;
    std::coroutine_handle<> awaiting_coroutine_; // 暂停的协程句柄
    uint64_t user_data_; // SQE 的 user_data,用于识别完成事件
    int result_; // 操作结果 (例如,读取的字节数或错误码)

    IoUringOperation(Scheduler& scheduler) : scheduler_(scheduler), user_data_(0), result_(0) {}

    // Awaiter 接口方法
    bool await_ready() const { return false; } // 总是返回 false,我们希望暂停并等待 io_uring 完成

    void await_suspend(std::coroutine_handle<> awaiting_coroutine); // 在 Scheduler.cpp 中实现

    int await_resume() const {
        if (result_ < 0) {
            // 将 io_uring 错误码转换为 errno
            throw std::system_error(std::abs(result_), std::system_category(), "io_uring operation failed");
        }
        return result_;
    }

protected:
    // 抽象方法:填充 SQE
    virtual void prepare_sqe(io_uring_sqe* sqe) = 0;
};

3.4. Scheduler:事件循环与协程调度器

这是整个引擎的核心,负责驱动 io_uring 并调度协程。它将 IoUringContext 包装起来,并提供高级的 async_readasync_write 等接口。

#pragma once

#include "IoUringContext.h"
#include "Task.h"
#include <map>
#include <mutex>
#include <atomic>
#include <vector>
#include <thread>
#include <functional>

class Scheduler {
public:
    explicit Scheduler(unsigned int entries = 256) : io_uring_context_(entries), next_user_data_(1) {}

    // 运行事件循环
    void run() {
        is_running_ = true;
        std::cout << "Scheduler started." << std::endl;
        while (is_running_ || !pending_coroutines_.empty()) {
            // 1. 提交所有待处理的 SQE
            if (io_uring_context_.submit() < 0) {
                 std::cerr << "Error submitting I/O: " << strerror(errno) << std::endl;
                 is_running_ = false;
                 break;
            }

            // 2. 等待并处理完成事件
            std::vector<io_uring_cqe*> cqes;
            // 尝试获取所有可用的 CQE,如果队列为空,则阻塞等待一个
            int num_cqes = io_uring_context_.get_cqes(cqes, io_uring_context_.peek_cqe_count());
            if (num_cqes == 0 && !is_running_) { // 如果没有完成事件且调度器停止,则退出
                break;
            } else if (num_cqes == 0 && is_running_) { // 如果没有完成事件但调度器还在运行,阻塞等待一个
                io_uring_cqe* cqe;
                int ret = io_uring_context_.wait_for_cqe(&cqe);
                if (ret < 0) {
                    std::cerr << "Error waiting for CQE: " << strerror(std::abs(ret)) << std::endl;
                    is_running_ = false;
                    break;
                }
                cqes.push_back(cqe);
                num_cqes = 1;
            }

            for (io_uring_cqe* cqe : cqes) {
                uint64_t user_data = cqe->user_data;
                int res = cqe->res;

                std::coroutine_handle<> handle_to_resume;
                {
                    std::lock_guard<std::mutex> lock(mtx_);
                    auto it = pending_coroutines_.find(user_data);
                    if (it != pending_coroutines_.end()) {
                        handle_to_resume = it->second.first; // 获取协程句柄
                        it->second.second->result_ = res; // 设置操作结果
                        pending_coroutines_.erase(it); // 从映射中移除
                    } else {
                        std::cerr << "Warning: CQE with unknown user_data " << user_data << " received." << std::endl;
                    }
                }

                if (handle_to_resume) {
                    handle_to_resume.resume(); // 恢复对应的协程
                }

                io_uring_context_.cqe_seen(cqe); // 标记 CQE 已处理
            }
        }
        std::cout << "Scheduler stopped." << std::endl;
    }

    // 停止事件循环
    void stop() {
        is_running_ = false;
    }

    // 获取下一个可用的 user_data
    uint64_t get_next_user_data() {
        return next_user_data_.fetch_add(1, std::memory_order_relaxed);
    }

    // 注册一个待恢复的协程
    void register_coroutine(uint64_t user_data, std::coroutine_handle<> handle, IoUringOperation* op) {
        std::lock_guard<std::mutex> lock(mtx_);
        pending_coroutines_[user_data] = {handle, op};
    }

    // 辅助函数:提交一个 SQE
    io_uring_sqe* get_sqe() {
        return io_uring_context_.get_sqe();
    }

private:
    IoUringContext io_uring_context_;
    std::atomic<uint64_t> next_user_data_;
    std::map<uint64_t, std::pair<std::coroutine_handle<>, IoUringOperation*>> pending_coroutines_;
    std::mutex mtx_; // 保护 pending_coroutines_
    std::atomic<bool> is_running_{false};
};

// IoUringOperation 的 await_suspend 实现,需要在 Scheduler 定义之后
inline void IoUringOperation::await_suspend(std::coroutine_handle<> awaiting_coroutine) {
    this->awaiting_coroutine_ = awaiting_coroutine;
    this->user_data_ = scheduler_.get_next_user_data();

    io_uring_sqe* sqe = scheduler_.get_sqe();
    if (!sqe) {
        // 如果无法获取 SQE,则无法提交请求。
        // 此时应该立即恢复协程并抛出异常,或者在 await_resume 中处理。
        // 为了简化,这里直接在 await_resume 中处理 result_ < 0 的情况
        this->result_ = -ENOMEM; // 模拟一个内存不足的错误
        awaiting_coroutine.resume(); // 立即恢复,以便 await_resume 抛出异常
        return;
    }
    prepare_sqe(sqe);
    sqe->user_data = this->user_data_;

    scheduler_.register_coroutine(this->user_data_, this->awaiting_coroutine_, this);
}

3.5. 具体的 awaitable I/O 操作

现在我们可以基于 IoUringOperation 创建具体的异步 I/O 操作,如 async_read, async_write, async_open, async_close

#pragma once

#include "IoUringOperation.h"
#include <string_view>

// Async file open operation
struct IoUringOpen : public IoUringOperation {
    const char* path_;
    int flags_;
    mode_t mode_;

    IoUringOpen(Scheduler& scheduler, const char* path, int flags, mode_t mode)
        : IoUringOperation(scheduler), path_(path), flags_(flags), mode_(mode) {}

protected:
    void prepare_sqe(io_uring_sqe* sqe) override {
        io_uring_prep_openat(sqe, AT_FDCWD, path_, flags_, mode_);
    }
};

// Async file read operation
struct IoUringRead : public IoUringOperation {
    int fd_;
    void* buffer_;
    size_t length_;
    off_t offset_;

    IoUringRead(Scheduler& scheduler, int fd, void* buffer, size_t length, off_t offset)
        : IoUringOperation(scheduler), fd_(fd), buffer_(buffer), length_(length), offset_(offset) {}

protected:
    void prepare_sqe(io_uring_sqe* sqe) override {
        io_uring_prep_read(sqe, fd_, buffer_, length_, offset_);
    }
};

// Async file write operation
struct IoUringWrite : public IoUringOperation {
    int fd_;
    const void* buffer_;
    size_t length_;
    off_t offset_;

    IoUringWrite(Scheduler& scheduler, int fd, const void* buffer, size_t length, off_t offset)
        : IoUringOperation(scheduler), fd_(fd), buffer_(buffer), length_(length), offset_(offset) {}

protected:
    void prepare_sqe(io_uring_sqe* sqe) override {
        io_uring_prep_write(sqe, fd_, buffer_, length_, offset_);
    }
};

// Async file close operation
struct IoUringClose : public IoUringOperation {
    int fd_;

    IoUringClose(Scheduler& scheduler, int fd)
        : IoUringOperation(scheduler), fd_(fd) {}

protected:
    void prepare_sqe(io_uring_sqe* sqe) override {
        io_uring_prep_close(sqe, fd_);
    }
};

// 辅助函数,让 Scheduler 提供更简洁的 awaitable 接口
namespace async_io {

inline IoUringOpen open(Scheduler& scheduler, const char* path, int flags, mode_t mode) {
    return IoUringOpen(scheduler, path, flags, mode);
}

inline IoUringRead read(Scheduler& scheduler, int fd, void* buffer, size_t length, off_t offset) {
    return IoUringRead(scheduler, fd, buffer, length, offset);
}

inline IoUringWrite write(Scheduler& scheduler, int fd, const void* buffer, size_t length, off_t offset) {
    return IoUringWrite(scheduler, fd, buffer, length, offset);
}

inline IoUringClose close(Scheduler& scheduler, int fd) {
    return IoUringClose(scheduler, fd);
}

} // namespace async_io

第四部分:实战演练:一个异步文件读写示例

现在我们已经构建了所有基础组件。让我们通过一个实际的例子来演示如何使用这个异步 I/O 引擎。我们将实现一个协程,它异步打开一个文件,读取其内容,然后将内容异步写入另一个文件。

main.cpp:

#include "Scheduler.h"
#include "Task.h"
#include "IoUringOperations.h"
#include <iostream>
#include <vector>
#include <string>
#include <thread>
#include <chrono>

// 定义一个异步任务,用于读取文件并写入另一个文件
Task<void> async_file_copy(Scheduler& scheduler, const char* input_path, const char* output_path) {
    int in_fd = -1;
    int out_fd = -1;
    std::vector<char> buffer(4096); // 4KB buffer

    try {
        std::cout << "Coroutine: Opening input file " << input_path << std::endl;
        in_fd = co_await async_io::open(scheduler, input_path, O_RDONLY | O_CREAT, 0644);
        if (in_fd < 0) {
            throw std::runtime_error("Failed to open input file");
        }
        std::cout << "Coroutine: Input file " << input_path << " opened with fd: " << in_fd << std::endl;

        std::cout << "Coroutine: Opening output file " << output_path << std::endl;
        out_fd = co_await async_io::open(scheduler, output_path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
        if (out_fd < 0) {
            throw std::runtime_error("Failed to open output file");
        }
        std::cout << "Coroutine: Output file " << output_path << " opened with fd: " << out_fd << std::endl;

        off_t current_offset = 0;
        ssize_t bytes_read;
        do {
            std::cout << "Coroutine: Reading from input file at offset " << current_offset << std::endl;
            bytes_read = co_await async_io::read(scheduler, in_fd, buffer.data(), buffer.size(), current_offset);

            if (bytes_read > 0) {
                std::cout << "Coroutine: Read " << bytes_read << " bytes. Writing to output file..." << std::endl;
                ssize_t bytes_written = co_await async_io::write(scheduler, out_fd, buffer.data(), bytes_read, current_offset);
                if (bytes_written != bytes_read) {
                    throw std::runtime_error("Mismatch in bytes written");
                }
                current_offset += bytes_read;
            }
        } while (bytes_read > 0);

        std::cout << "Coroutine: File copy completed. Total bytes copied: " << current_offset << std::endl;

    } catch (const std::system_error& e) {
        std::cerr << "Coroutine Error (system_error): " << e.what() << " (errno: " << e.code().value() << ")" << std::endl;
    } catch (const std::runtime_error& e) {
        std::cerr << "Coroutine Error (runtime_error): " << e.what() << std::endl;
    } catch (const std::exception& e) {
        std::cerr << "Coroutine Error (std::exception): " << e.what() << std::endl;
    }

    // 确保文件描述符被关闭
    if (in_fd != -1) {
        std::cout << "Coroutine: Closing input file (fd: " << in_fd << ")" << std::endl;
        co_await async_io::close(scheduler, in_fd);
    }
    if (out_fd != -1) {
        std::cout << "Coroutine: Closing output file (fd: " << out_fd << ")" << std::endl;
        co_await async_io::close(scheduler, out_fd);
    }

    co_return;
}

int main() {
    // 1. 创建测试文件
    const char* input_file = "input.txt";
    const char* output_file = "output.txt";
    {
        std::ofstream ofs(input_file);
        if (!ofs.is_open()) {
            std::cerr << "Failed to create input.txt" << std::endl;
            return 1;
        }
        for (int i = 0; i < 1000; ++i) {
            ofs << "This is line " << i << " of test data for async I/O engine.n";
        }
        ofs.close();
        std::cout << "Created input.txt" << std::endl;
    }

    // 2. 初始化调度器
    Scheduler scheduler(128); // io_uring 队列深度

    // 3. 启动异步文件拷贝任务
    Task<void> copy_task = async_file_copy(scheduler, input_file, output_file);

    // 4. 在单独的线程中运行调度器
    // 也可以在主线程运行,但为了演示非阻塞性,我们放在单独线程
    std::thread scheduler_thread([&]() {
        scheduler.run();
    });

    // 5. 立即恢复协程,开始执行
    // initial_suspend 使得协程在 get_return_object 之后立即暂停,
    // 需要手动 resume 才能开始执行。
    copy_task.resume();

    // 6. 等待任务完成
    // 主线程可以做其他事情,这里我们简单地等待 copy_task 完成
    while (!copy_task.done()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟做其他事情
        std::cout << "Main thread doing other work..." << std::endl;
    }

    // 7. 任务完成后,停止调度器并等待调度器线程结束
    scheduler.stop();
    if (scheduler_thread.joinable()) {
        scheduler_thread.join();
    }

    std::cout << "All tasks completed. Verifying output.txt..." << std::endl;
    // 可以添加文件内容校验逻辑
    std::ifstream ifs_in(input_file);
    std::ifstream ifs_out(output_file);
    std::string line_in, line_out;
    int mismatches = 0;
    int line_count = 0;
    while (std::getline(ifs_in, line_in) && std::getline(ifs_out, line_out)) {
        line_count++;
        if (line_in != line_out) {
            std::cerr << "Mismatch at line " << line_count << ":nInput: " << line_in << "nOutput: " << line_out << std::endl;
            mismatches++;
        }
    }
    if (mismatches == 0 && ifs_in.eof() && ifs_out.eof()) {
        std::cout << "Verification successful: input.txt and output.txt are identical." << std::endl;
    } else {
        std::cerr << "Verification failed: " << mismatches << " mismatches found or file sizes differ." << std::endl;
    }

    // 清理测试文件
    // remove(input_file);
    // remove(output_file);

    return 0;
}

编译与运行:

# 确保安装了 liburing 库及其开发文件 (例如在 Ubuntu/Debian 上: sudo apt install liburing-dev)
g++ -std=c++20 -fcoroutines -luring -pthread -o async_io_engine main.cpp IoUringContext.cpp Scheduler.cpp
./async_io_engine

输出示例 (部分):

Created input.txt
Scheduler started.
Coroutine: Opening input file input.txt
Main thread doing other work...
Coroutine: Input file input.txt opened with fd: 3
Coroutine: Opening output file output.txt
Main thread doing other work...
Coroutine: Output file output.txt opened with fd: 4
Coroutine: Reading from input file at offset 0
Main thread doing other work...
Coroutine: Read 4096 bytes. Writing to output file...
Main thread doing other work...
Coroutine: Reading from input file at offset 4096
Main thread doing other work...
... (重复读写过程) ...
Coroutine: File copy completed. Total bytes copied: 40960
Coroutine: Closing input file (fd: 3)
Main thread doing other work...
Coroutine: Closing output file (fd: 4)
Main thread doing other work...
Main thread doing other work...
Scheduler stopped.
All tasks completed. Verifying output.txt...
Verification successful: input.txt and output.txt are identical.

从输出中可以看到,主线程在协程进行 I/O 操作时并未阻塞,而是继续执行 Main thread doing other work...,这正是异步 I/O 的核心优势。协程在 co_await 遇到异步 I/O 操作时暂停,将控制权交还给调度器。调度器继续处理 io_uring 事件,当 I/O 完成时,调度器会恢复相应的协程,使其从暂停点继续执行。


第五部分:性能考量与高级话题

我们已经构建了一个功能性的异步 I/O 引擎。然而,对于生产级的应用,还有许多性能优化和高级特性值得探讨。

1. 零拷贝与固定缓冲区

io_uring 支持通过注册固定缓冲区来避免用户空间和内核空间之间的数据拷贝,从而实现真正的零拷贝 I/O。

  • 注册缓冲区:
    // 在 IoUringContext 初始化后,注册一个或多个缓冲区
    std::vector<iovec> iovecs;
    // ... 填充 iovecs ...
    io_uring_register_buffers(&ring_, iovecs.data(), iovecs.size());
  • 使用固定缓冲区:io_uring_prep_read_fixedio_uring_prep_write_fixed 中,指定注册的缓冲区索引。这要求应用程序预先分配并注册 I/O 缓冲区。
  • 注册文件描述符: 类似地,io_uring_register_files 可以注册文件描述符,避免在每个 I/O 请求中传递 fd。
  • IORING_OP_SPLICE / IORING_OP_SEND_FILE: 这些操作可以直接在两个文件描述符之间传输数据(如文件到文件,文件到 socket),完全在内核中完成,避免用户空间拷贝。

2. 批处理与提交策略

  • 批量提交: io_uring 的核心优势之一是批处理。通过在 io_uring_enter 中一次性提交多个 SQE,可以显著减少系统调用次数。我们的 Scheduler::run() 中已经通过 io_uring_submit() 来提交所有 io_uring_sq_ready(&ring_) 的 SQE。
  • SQPOLL 模式: 使用 IORING_SETUP_SQPOLL 标志初始化 io_uring,内核会创建一个线程来轮询 SQ,进一步减少用户空间到内核空间的切换。这对于高 QPS、低延迟的应用非常有用,但会消耗一个内核线程。
  • CQ 轮询: 通过 IORING_SETUP_CQ_POLL 可以在用户空间轮询 CQ,而无需阻塞等待。这通常与 SQPOLL 结合使用。

3. 错误处理与取消操作

  • 完善的错误码: io_uring 在 CQE 的 res 字段中返回操作结果。负值表示错误(通常是负的 errno),正值表示成功传输的字节数。我们的 await_resume 已经包含了基础的错误转换。
  • 取消操作: IORING_OP_ASYNC_CANCEL 可以用于取消一个正在进行的 I/O 请求。这在超时或用户取消等场景下非常有用。协程中实现取消需要更复杂的机制,通常涉及到 std::stop_token 或类似的协调机制。

4. 资源管理与生命周期

  • 文件描述符: 异步文件打开 (IORING_OP_OPENAT) 和关闭 (IORING_OP_CLOSE) 使得文件描述符的生命周期可以完全由 io_uring 管理。
  • 缓冲区管理: 对于读写操作的缓冲区,需要确保在 I/O 完成之前其生命周期有效。这通常通过智能指针 (std::unique_ptr<char[]>) 或 std::vector 来管理。在我们的例子中,buffer 是一个局部变量,但在 co_await 期间会被协程状态捕获,因此是安全的。
  • 协程句柄: Task 类的析构函数负责销毁其持有的 std::coroutine_handle,防止内存泄漏。

5. 多线程与调度策略

当前的调度器是单线程的,所有协程都在一个线程中执行。对于 CPU 密集型任务,或者需要并行处理多个 I/O 队列的场景,可以考虑:

  • 多调度器实例: 为每个 CPU 核心或 I/O 密集型模块创建独立的 Scheduler 实例,每个实例运行在自己的线程中。
  • 工作窃取 (Work-stealing): 实现一个更复杂的调度器,允许空闲线程从繁忙线程的队列中“窃取”待执行的协程。
  • 将计算任务 offload: 对于协程中包含的 CPU 密集型计算,可以使用 co_await 一个线程池任务,将计算从 I/O 调度线程中分流出去。

6. 异步文件操作的封装

除了基本的读写,io_uring 还支持许多其他文件操作,例如:

  • IORING_OP_STATX: 异步获取文件元数据。
  • IORING_OP_FSYNC: 异步刷新文件数据到磁盘。
  • IORING_OP_UNLINKAT: 异步删除文件。
  • IORING_OP_RENAMEAT: 异步重命名文件。

将这些操作也封装成 awaitable 对象,可以构建一个功能更完整的异步文件系统库。

7. C++23 和未来的发展

C++23 引入了 std::generator,它也是基于协程的,主要用于迭代器模式。此外,标准库中关于协程的支持还在不断演进,未来可能会有标准的 TaskExecutor 类型,进一步简化异步编程。


C++20 协程与 Linux io_uring 的结合,为高性能异步 I/O 提供了一个优雅且强大的编程模型。协程的顺序式语法极大地提升了异步代码的可读性,而 io_uring 则在底层提供了无与伦比的性能和灵活性。通过本文的实战演练,我们构建了一个基础的异步 I/O 引擎,展示了如何将这两者无缝融合。这仅仅是冰山一角,基于此模型,您可以进一步扩展,构建出满足各种高性能需求、功能丰富的异步服务。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注