利用 ‘Fiber’ 模拟:在 C++ 中通过手动切换汇编栈实现万级轻量级线程的并发调度

尊敬的各位技术同行,大家好。

今天,我们将深入探讨一个在高性能、高并发领域中极具价值的话题:如何利用“Fiber”(协程)机制,在C++中通过手动切换汇编栈的方式,实现万级轻量级线程的并发调度。这不仅仅是一项技术挑战,更是一种对系统底层机制的深刻理解和运用。我们将从概念出发,逐步深入到C++代码实现,并最终触及底层汇编的奥秘。

I. 引言:为什么我们需要轻量级并发?

在现代软件系统中,并发处理已是常态。无论是Web服务器处理海量用户请求,还是科学计算进行大规模数据并行计算,亦或是游戏引擎进行复杂场景渲染,都离不开并发。传统的并发模型主要基于操作系统线程(OS Thread)。

操作系统线程的挑战:

  1. 资源开销大: 每个OS线程通常需要至少几MB的栈空间,加上内核数据结构、线程控制块(TCB)等,内存占用不容小觑。当我们需要创建成千上万个线程时,内存开销会迅速成为瓶颈。
  2. 创建与销毁开销: OS线程的创建和销毁涉及内核调用,耗时相对较长。
  3. 上下文切换开销: OS线程的调度由操作系统内核完成。每次上下文切换,CPU需要保存当前线程的所有寄存器状态、切换内存管理单元(MMU)上下文(如果涉及进程切换)、刷新TLB等,这些操作都会带来显著的性能损耗。内核态切换的开销远高于用户态切换。
  4. 调度复杂性: OS线程的调度是抢占式的,由内核根据调度算法决定。这使得程序的行为更难预测,也增加了同步机制(互斥锁、信号量等)的复杂性。

为了应对这些挑战,人们开始探索更轻量级的并发模型,其中“协程”(Coroutine),或者在特定语境下被称为“Fiber”,应运而生。Go语言的Goroutine、Python的async/await、C#的async/await等都是协程思想的体现。

Fiber的优势:

  • 用户态调度: Fiber的调度完全在用户空间进行,无需陷入内核。这意味着上下文切换的开销极低,通常只涉及少量寄存器的保存和恢复。
  • 内存占用小: 每个Fiber可以只分配几十KB甚至几KB的栈空间,远小于OS线程。
  • 协作式调度: Fiber通常采用协作式调度模型,即一个Fiber主动让出CPU控制权(yield)给另一个Fiber,或者调度器选择下一个Fiber执行。这使得程序逻辑更清晰,避免了抢占式调度带来的竞态条件和死锁问题(但需要程序员自行管理阻塞)。
  • 高密度: 由于其极低的开销,一个进程可以轻松管理数万乃至数十万个Fiber,实现极高的并发密度。

本文的目标,就是带领大家一步步地,利用C++和少量的汇编代码,手动实现一个具备基本功能的Fiber库,从而深刻理解其底层原理。

II. 理解 Fiber 的核心机制:栈与上下文切换

Fiber 的核心思想是:每个 Fiber 拥有独立的执行上下文和栈空间。当一个 Fiber 暂停执行时,它会将其当前的 CPU 寄存器状态(包括栈指针、指令指针等)保存起来;当它需要恢复执行时,这些寄存器状态会被重新加载到 CPU 中,使得 Fiber 能够从上次暂停的地方继续执行。

从本质上讲,Fiber 的上下文切换,就是一次“栈的切换”。操作系统线程的切换是由内核完成的,而 Fiber 的切换则完全在用户态代码中完成。

为什么需要汇编?

C++ 作为一种高级语言,通常不允许我们直接访问和操作 CPU 寄存器,也无法直接修改栈指针(RSP/ESP)和指令指针(RIP/EIP)。然而,Fiber 的上下文切换正是通过保存和恢复这些关键寄存器来实现的。因此,我们需要借助汇编语言来完成这部分底层操作。

我们将主要关注 x86-64 架构,因为它在现代服务器和个人电脑上广泛使用。在 x86-64 架构下,我们需要保存的寄存器包括:

  • 通用寄存器 (General Purpose Registers):
    • RBX, RBP, R12, R13, R14, R15:这些是 ABI (Application Binary Interface) 规定为“被调用者保存”(Callee-saved)的寄存器。这意味着如果函数使用了它们,必须在函数返回前恢复它们的原值。因此,在 Fiber 切换时,它们必须被保存。
    • RSP (Stack Pointer): 栈指针,指向当前栈顶。这是切换 Fiber 栈的关键。
    • RIP (Instruction Pointer): 指令指针,指向下一条要执行的指令地址。这是恢复执行位置的关键。
  • 标志寄存器 (Flags Register):
    • RFLAGS: 包含了各种 CPU 状态标志,如零标志、进位标志等。保存它可以确保上下文的完整性。

我们不需要保存像 RAX, RCX, RDX, RSI, RDI, R8R11 这些“调用者保存”(Caller-saved)寄存器,因为根据 ABI,函数调用者负责在调用函数之前保存它们,如果它需要这些值在函数返回后仍然有效。在 Fiber 切换的语境下,switch_context 函数本身就是被调用的,所以它不需要关心这些寄存器。

III. 栈管理:为 Fiber 分配内存

每个 Fiber 都需要独立的栈空间来存储局部变量、函数参数和返回地址。栈的分配和管理是 Fiber 实现的基础。

在 x86-64 架构下,栈通常是向下增长的(从高地址向低地址增长)。这意味着当我们为 Fiber 分配一块内存作为其栈时,栈顶指针(RSP)将指向这块内存的最高地址,然后随着函数调用和局部变量的创建,RSP 会逐渐减小。

栈的分配方式:

我们可以使用标准库函数 malloc 或操作系统特定的内存分配函数来分配 Fiber 的栈空间。

  • malloc (C/C++ 标准库): 简单易用,但无法提供页对齐或保护页等高级功能。
  • VirtualAlloc (Windows): 允许分配特定属性的内存,例如指定起始地址、保护页(Guard Page)等。
  • mmap (Linux/Unix): 功能类似于 VirtualAlloc,可以用来分配匿名内存,并设置内存保护。

为了简化,我们的示例将主要使用 malloc,但会提到更健壮的方案。

#include <cstddef> // For size_t
#include <cstdlib> // For malloc, free
#include <stdexcept> // For std::runtime_error
#include <iostream>

// 默认栈大小,例如 64KB
// 实际应用中,根据 Fiber 的复杂度和调用深度调整
const size_t DEFAULT_STACK_SIZE = 64 * 1024; // 64KB

class StackAllocator {
public:
    // 分配栈内存,返回栈顶(高地址)
    void* allocate(size_t size) {
        // x86-64 栈是向下增长的,所以我们返回分配区域的最高地址
        // 确保栈空间是页面对齐的,虽然 malloc 通常已经对齐,但 mmap/VirtualAlloc 更能保证
        void* mem = malloc(size);
        if (!mem) {
            throw std::runtime_error("Failed to allocate stack memory.");
        }
        // 返回栈的“顶部”(最高地址),因为栈是向下增长的
        return static_cast<char*>(mem) + size;
    }

    // 释放栈内存
    void deallocate(void* stack_top, size_t size) {
        // 释放时需要传入原始分配的起始地址
        void* mem = static_cast<char*>(stack_top) - size;
        free(mem);
    }
};

栈溢出保护(Guard Page):

在生产环境中,简单的 malloc 并不足以保护 Fiber 免受栈溢出。当 Fiber 的栈使用超出了分配的空间,它会侵入相邻的内存区域,导致数据损坏甚至程序崩溃。为了防止这种情况,可以采用“栈溢出保护页”(Guard Page)技术:

  • 在分配 Fiber 栈时,额外分配一个页面(通常是4KB)作为保护页。
  • 将保护页放在 Fiber 栈的“底部”(低地址端)。
  • 设置保护页为不可读写(PROT_NONEPAGE_NOACCESS)。
  • 当 Fiber 栈溢出并尝试访问保护页时,会触发操作系统级别的访问冲突(Segmentation Fault 或 Access Violation),从而及时发现并处理栈溢出错误。

这种技术通常需要使用 mmap (Linux) 或 VirtualAlloc (Windows) 来实现。在本文的示例中,为了聚焦核心机制,我们将暂不深入实现保护页,但请务必记住其重要性。

IV. 核心:上下文切换的汇编艺术

上下文切换是 Fiber 机制的“心脏”。我们将创建一个结构体来存储 Fiber 的上下文,并编写一个汇编函数来完成实际的保存和恢复操作。

1. FiberContext 结构体

这个结构体将保存所有需要切换的 CPU 寄存器。

// fiber_context.h
#pragma once

#include <cstdint> // For uint64_t

// 保存 Fiber 上下文的结构体
// 针对 x86-64 架构
// 我们需要保存被调用者保存的寄存器 (RBX, RBP, R12-R15)
// 以及栈指针 RSP 和指令指针 RIP
// RFLAGS 也可以保存以确保更完整的上下文,但有些实现会省略
struct FiberContext {
    uint64_t rbx;
    uint64_t rbp;
    uint64_t r12;
    uint64_t r13;
    uint64_t r14;
    uint64_t r15;
    uint64_t rsp; // Stack Pointer
    uint64_t rip; // Instruction Pointer

    // 构造函数,初始化为0
    FiberContext() : rbx(0), rbp(0), r12(0), r13(0), r14(0), r15(0), rsp(0), rip(0) {}
};

// 汇编函数声明
// extern "C" 确保 C++ 编译器使用 C 链接约定
// 第一个参数是保存当前上下文的指针
// 第二个参数是加载目标上下文的指针
extern "C" void _switch_context(FiberContext* current_ctx, const FiberContext* target_ctx);

// 另一个辅助函数,用于新 Fiber 的首次进入。
// 它会设置栈并调用一个函数
extern "C" void _setup_fiber_entry(FiberContext* ctx, void* stack_top, void (*entry_func)());

2. 汇编代码 (switch_context.S)

我们将使用 AT&T 语法(Linux/GCC 常用)编写汇编代码。文件名为 switch_context.S

_switch_context 函数:

这个函数的核心逻辑是:

  1. 保存当前 Fiber 的被调用者保存寄存器和 RSPcurrent_ctx
  2. target_ctx 加载被调用者保存寄存器和 RSP
  3. 通过 ret 指令跳转到 target_ctx 中保存的 RIP
// switch_context.S
// x86-64 Linux (System V ABI)

.global _switch_context
.global _setup_fiber_entry

// void _switch_context(FiberContext* current_ctx, const FiberContext* target_ctx);
// rdi: current_ctx
// rsi: target_ctx
_switch_context:
    // 保存当前上下文到 current_ctx
    // current_ctx->rbx = rbx;
    movq %rbx, (%rdi)
    // current_ctx->rbp = rbp;
    movq %rbp, 8(%rdi)
    // current_ctx->r12 = r12;
    movq %r12, 16(%rdi)
    // current_ctx->r13 = r13;
    movq %r13, 24(%rdi)
    // current_ctx->r14 = r14;
    movq %r14, 32(%rdi)
    // current_ctx->r15 = r15;
    movq %r15, 40(%rdi)
    // current_ctx->rsp = rsp;
    movq %rsp, 48(%rdi)

    // 将当前指令指针(_switch_context的返回地址)保存到 current_ctx->rip
    // ret 指令会从栈顶弹出 RIP。这里我们通过压栈和弹出到寄存器来获取它
    // 另一种方法是使用 call 指令,然后立即 pop,但这里需要确保返回地址在栈上
    // 假设 _switch_context 是被 call 调用的,那么返回地址已经在栈顶
    // 我们需要把这个返回地址保存到 current_ctx->rip
    // 这里我们不能直接 pop %rip, 而是要保存当前栈顶的返回地址
    movq (%rsp), %rax // 获取 _switch_context 的返回地址 (即 C++ resume/yield 调用的下一条指令)
    movq %rax, 56(%rdi) // 保存到 current_ctx->rip

    // 恢复目标上下文
    // rbx = target_ctx->rbx;
    movq (%rsi), %rbx
    // rbp = target_ctx->rbp;
    movq 8(%rsi), %rbp
    // r12 = target_ctx->r12;
    movq 16(%rsi), %r12
    // r13 = target_ctx->r13;
    movq 24(%rsi), %r13
    // r14 = target_ctx->r14;
    movq 32(%rsi), %r14
    // r15 = target_ctx->r15;
    movq 40(%rsi), %r15
    // rsp = target_ctx->rsp;
    movq 48(%rsi), %rsp

    // 跳转到目标 Fiber 的指令指针
    // rip = target_ctx->rip;
    // 我们不能直接 movq %rip。而是将目标 RIP 压栈,然后 ret
    movq 56(%rsi), %rax // 获取目标 RIP
    pushq %rax          // 压入目标 RIP 到新栈
    ret                 // 弹出 RIP 并跳转

// -----------------------------------------------------------------------------
// void _setup_fiber_entry(FiberContext* ctx, void* stack_top, void (*entry_func)());
// rdi: ctx
// rsi: stack_top
// rdx: entry_func
//
// 这个函数用于初始化一个新的 FiberContext,使其在第一次被 _switch_context 切换时,
// 能够正确地从 entry_func 开始执行。
// 主要思想是:将 entry_func 的地址伪装成一个返回地址,
// 并设置 rsp 指向新栈的底部,使得 _switch_context 恢复后执行 ret 时,
// 能跳到 entry_func。
//
// x86-64 ABI 要求栈在调用函数前必须 16 字节对齐。
// 因此,在设置栈顶时,需要确保其满足对齐要求。
// 首次进入时,ret 会从栈顶弹出 RIP。所以我们需要将 entry_func 压入栈顶。
// 还需要考虑一个 dummy 返回地址,因为 entry_func 可能会返回。
_setup_fiber_entry:
    // 设置 RSP 为新的栈顶。
    // stack_top (rsi) 是分配内存的最高地址。
    // 我们需要减去 sizeof(uint64_t) * N 来确保栈对齐和足够的空间。
    // 至少需要为 entry_func 的返回地址预留空间。
    // System V ABI 要求栈在 `call` 指令前 16 字节对齐。
    // `ret` 指令之后,`rsp` 应该是 16 字节对齐的。
    //
    // 假设 stack_top 是 16 字节对齐的(通常 malloc 分配的内存起始地址是 8 或 16 字节对齐)
    // 我们要放置两个 8 字节的值在栈顶:
    // 1. entry_func 的地址 (作为 RIP)
    // 2. 一个虚拟的返回地址,以防 entry_func 返回 (例如,指向一个清理函数或者直接退出)
    //
    // 实际的栈布局 (从高地址到低地址):
    // stack_top (rsi)
    // ...
    // [dummy_return_address] <- rsp - 8
    // [entry_func_address]   <- rsp - 16 (这将是第一次 ret 后的 RIP)

    // 首先,确保 rsp 是 16 字节对齐的。
    // 如果 stack_top 已经是 16 字节对齐,那么 (stack_top - 16) 也是 16 字节对齐。
    movq %rsi, %rax       // rax = stack_top
    subq $16, %rax        // rax = stack_top - 16 (为两个 qword 预留空间)
    andq $-16, %rax       // 确保 rax 是 16 字节对齐的

    // 将 entry_func 地址写入 rsp - 8 的位置 (这将是第一次 ret 后的 RIP)
    movq %rdx, 8(%rax)    // *(rax + 8) = entry_func

    // 写入一个虚拟的返回地址到 rsp (如果 entry_func 返回,会跳到这里)
    // 我们可以让它指向一个终止 Fiber 的函数,或者只是一个无限循环。
    // 这里我们简单地让它指向一个会终止当前 Fiber 的 C++ 函数。
    // 假设我们有一个叫做 _fiber_exit_stub 的 C++ 函数,它的地址需要被放置在这里。
    // 为了简化,我们暂时用 0 表示,实际中需要一个合适的清理/退出函数。
    // 注意:这里我们不能直接引用 C++ 符号,需要在 C++ 代码中提供一个 stub 函数的地址。
    // 暂时用一个不会被执行到的地址代替,或者假设 entry_func 不会返回。
    // 更好的做法是,Fiber 封装类提供一个静态成员函数作为退出点。
    xorq %rcx, %rcx       // 或者直接 movq $0x0, (%rax)
    movq %rcx, (%rax)     // *(rax) = 0 (或者 _fiber_exit_stub 的地址)

    // 保存设置好的 RSP 和 RIP 到 ctx
    movq %rax, 48(%rdi)   // ctx->rsp = rax (新的栈顶)
    movq 8(%rax), %rax    // 获取 entry_func 地址
    movq %rax, 56(%rdi)   // ctx->rip = entry_func (第一次切换后,ret 会跳到这里)

    // 清零其他寄存器,或者保持不变。对于新 Fiber,通常清零是安全的。
    // 或者直接让 C++ 构造函数初始化为0
    xorq %rbx, %rbx
    movq %rbx, (%rdi)     // ctx->rbx = 0
    xorq %rbp, %rbp
    movq %rbp, 8(%rdi)    // ctx->rbp = 0
    xorq %r12, %r12
    movq %r12, 16(%rdi)   // ctx->r12 = 0
    xorq %r13, %r13
    movq %r13, 24(%rdi)   // ctx->r13 = 0
    xorq %r14, %r14
    movq %r14, 32(%rdi)   // ctx->r14 = 0
    xorq %r15, %r15
    movq %r15, 40(%rdi)   // ctx->r15 = 0

    ret                   // 返回到 C++ 调用者

编译汇编代码:

在 Linux 上,你可以使用 gccas 来编译汇编文件:

gcc -c switch_context.S -o switch_context.o

然后链接到你的 C++ 程序中。

Windows x64 注意事项:

  • Calling Convention: Windows x64 使用 __fastcall 约定,前四个参数通过 RCX, RDX, R8, R9 传递。我们的 _switch_context_setup_fiber_entry 函数需要相应调整。
  • 寄存器保存: 需要保存的被调用者保存寄存器包括 RBX, RBP, RDI, RSI, R12, R13, R14, R15
  • 栈帧对齐: Windows x64 约定要求在函数调用前,RSP 必须是 16 字节对齐的。
  • Inline Assembly: MSVC 不支持 AT&T 语法的内联汇编。你需要使用 Intel 语法,或者将汇编代码放在单独的 .asm 文件中,并用 ml64.exe 编译。

由于篇幅限制和跨平台复杂性,我们主要聚焦于 x86-64 Linux 的实现。

V. C++ 封装:构建 Fiber 库

现在,我们有了底层的栈分配和上下文切换机制,可以开始构建 C++ 层的 Fiber 类和调度器。

1. Fiber 类设计

Fiber 类代表一个独立的执行单元。

FiberState 枚举:

一个 Fiber 可以有多种状态:

状态 描述
INITIAL 刚创建,尚未运行
READY 已准备好运行,等待调度
RUNNING 正在 CPU 上执行
SUSPENDED 暂停执行,等待被 resume
TERMINATED 执行完毕
// fiber.h
#pragma once

#include <functional> // For std::function
#include <memory>     // For std::unique_ptr
#include "fiber_context.h"
#include "stack_allocator.h" // 假设 StackAllocator 在这里

class Fiber; // 前置声明

// Fiber 的状态
enum class FiberState {
    INITIAL,
    READY,
    RUNNING,
    SUSPENDED,
    TERMINATED
};

class Fiber {
public:
    // 构造函数:创建一个新的 Fiber
    // entry_func: Fiber 的入口函数
    // stack_size: 分配的栈大小
    Fiber(std::function<void()> entry_func, size_t stack_size = DEFAULT_STACK_SIZE);

    // 析构函数:释放栈内存
    ~Fiber();

    // 禁用拷贝构造和赋值操作
    Fiber(const Fiber&) = delete;
    Fiber& operator=(const Fiber&) = delete;

    // 移动构造和赋值
    Fiber(Fiber&& other) noexcept;
    Fiber& operator=(Fiber&& other) noexcept;

    // 让出 CPU 控制权给调度器
    void yield();

    // 恢复 Fiber 的执行
    void resume();

    // 获取 Fiber 的状态
    FiberState get_state() const { return state_; }

    // 设置 Fiber 的状态
    void set_state(FiberState state) { state_ = state; }

    // 获取 FiberContext 指针
    FiberContext* get_context() { return &context_; }

    // 获取 Fiber 的 ID (可选)
    uint64_t get_id() const { return id_; }

private:
    uint64_t id_;               // Fiber 的唯一标识符
    static uint64_t next_id_;   // 下一个 Fiber 的 ID

    FiberContext context_;      // Fiber 的 CPU 上下文
    std::function<void()> entry_func_; // Fiber 的入口函数

    void* stack_base_;          // 栈内存的起始地址 (低地址)
    void* stack_top_;           // 栈内存的顶部地址 (高地址)
    size_t stack_size_;         // 栈的大小

    FiberState state_;          // Fiber 当前状态

    // 静态成员,用于保存当前正在运行的 Fiber
    // 这将由 Scheduler 管理,但 Fiber 也需要知道谁是当前运行的
    static thread_local Fiber* current_fiber_;

    // 静态成员,用于保存调度器的 FiberContext,以便 Fiber yield 时可以切换回调度器
    // 理论上应该由 Scheduler 管理,但为了简化 Fiber 内部的 yield 实现,
    // 可以让 Fiber 知道调度器的上下文。
    static thread_local FiberContext* scheduler_context_;

    // 实际执行 Fiber 逻辑的静态成员函数
    // 在新 Fiber 首次启动时,_setup_fiber_entry 会跳转到这里
    static void _fiber_entry_stub();

    // 静态成员,用于保存当前 Fiber 实例的指针,供 _fiber_entry_stub 使用
    static thread_local Fiber* _initial_fiber_ptr_;

    // 用于 Fiber 正常结束时的清理和返回调度器的 stub
    static void _fiber_exit_stub();

    friend class Scheduler; // 允许调度器访问私有成员
};

2. Fiber 类实现

// fiber.cpp
#include "fiber.h"
#include <iostream>
#include <stdexcept>
#include <cassert>

// 静态成员初始化
uint64_t Fiber::next_id_ = 0;
thread_local Fiber* Fiber::current_fiber_ = nullptr;
thread_local FiberContext* Fiber::scheduler_context_ = nullptr;
thread_local Fiber* Fiber::_initial_fiber_ptr_ = nullptr;

StackAllocator g_stack_allocator; // 全局栈分配器

// Fiber 构造函数
Fiber::Fiber(std::function<void()> entry_func, size_t stack_size)
    : id_(next_id_++),
      entry_func_(std::move(entry_func)),
      stack_size_(stack_size),
      state_(FiberState::INITIAL)
{
    // 分配栈内存
    stack_top_ = g_stack_allocator.allocate(stack_size_);
    // stack_base_ 指向分配内存的低地址
    stack_base_ = static_cast<char*>(stack_top_) - stack_size_;

    // 初始化 FiberContext
    // _setup_fiber_entry 函数会设置好 rsp 和 rip,使其在第一次 resume 时能执行 entry_func_
    // 为了让 _fiber_entry_stub 能找到正确的 Fiber 实例,我们先将其保存到 thread_local 变量
    _initial_fiber_ptr_ = this;
    _setup_fiber_entry(&context_, stack_top_, _fiber_entry_stub);
    _initial_fiber_ptr_ = nullptr; // 用完后清空
}

// Fiber 析构函数
Fiber::~Fiber() {
    if (stack_top_ != nullptr) {
        g_stack_allocator.deallocate(stack_top_, stack_size_);
        stack_top_ = nullptr;
        stack_base_ = nullptr;
    }
}

// 移动构造函数
Fiber::Fiber(Fiber&& other) noexcept
    : id_(other.id_),
      context_(other.context_),
      entry_func_(std::move(other.entry_func_)),
      stack_base_(other.stack_base_),
      stack_top_(other.stack_top_),
      stack_size_(other.stack_size_),
      state_(other.state_)
{
    other.id_ = 0; // Prevent double free/double id for simplicity
    other.stack_base_ = nullptr;
    other.stack_top_ = nullptr;
    other.stack_size_ = 0;
    other.state_ = FiberState::TERMINATED; // Mark old fiber as terminated or invalid
}

// 移动赋值运算符
Fiber& Fiber::operator=(Fiber&& other) noexcept {
    if (this != &other) {
        // 释放当前 Fiber 的资源
        if (stack_top_ != nullptr) {
            g_stack_allocator.deallocate(stack_top_, stack_size_);
        }

        // 复制 other 的资源
        id_ = other.id_;
        context_ = other.context_;
        entry_func_ = std::move(other.entry_func_);
        stack_base_ = other.stack_base_;
        stack_top_ = other.stack_top_;
        stack_size_ = other.stack_size_;
        state_ = other.state_;

        // 清空 other 的资源
        other.id_ = 0;
        other.stack_base_ = nullptr;
        other.stack_top_ = nullptr;
        other.stack_size_ = 0;
        other.state_ = FiberState::TERMINATED;
    }
    return *this;
}

// 静态入口点 stub
void Fiber::_fiber_entry_stub() {
    // 这个函数在新 Fiber 第一次被 resume 时执行
    // 此时 _initial_fiber_ptr_ 应该指向当前 Fiber 实例
    assert(_initial_fiber_ptr_ != nullptr);
    Fiber* self = _initial_fiber_ptr_;

    // 清空 _initial_fiber_ptr_,防止后续 Fiber 错误使用
    _initial_fiber_ptr_ = nullptr;

    // 设置当前 Fiber 为正在运行
    current_fiber_ = self;
    self->set_state(FiberState::RUNNING);

    // 调用 Fiber 的实际入口函数
    self->entry_func_();

    // 如果 entry_func_ 返回,说明 Fiber 正常结束
    // 切换回调度器,并标记当前 Fiber 为 TERMINATED
    self->set_state(FiberState::TERMINATED);
    _switch_context(&self->context_, scheduler_context_); // 切换回调度器
    // 注意:_switch_context 不会返回,因为已经切换到另一个 Fiber
}

// 让出 CPU 控制权给调度器
void Fiber::yield() {
    assert(current_fiber_ == this && scheduler_context_ != nullptr);
    set_state(FiberState::SUSPENDED); // 标记为暂停
    _switch_context(&context_, scheduler_context_); // 切换到调度器上下文
    // 当 Fiber 被 Scheduler 再次 resume 时,会从这里继续执行
    set_state(FiberState::RUNNING); // 恢复后标记为运行
}

// 恢复 Fiber 的执行
void Fiber::resume() {
    assert(get_state() == FiberState::READY || get_state() == FiberState::SUSPENDED);
    // 保存当前 Fiber (通常是调度器) 的上下文
    FiberContext* prev_ctx = current_fiber_ ? current_fiber_->get_context() : scheduler_context_;

    // 设置当前正在运行的 Fiber
    Fiber* prev_fiber = current_fiber_;
    current_fiber_ = this;

    // 切换到目标 Fiber
    set_state(FiberState::RUNNING);
    _switch_context(prev_ctx, &context_);

    // 当目标 Fiber yield 或 terminate 时,会切换回这里
    current_fiber_ = prev_fiber; // 恢复之前的 Fiber 为当前 Fiber
}

3. Scheduler 类设计

Scheduler 负责管理所有 Fiber 的生命周期和调度。

// scheduler.h
#pragma once

#include <vector>
#include <queue>
#include <memory>
#include <stdexcept>
#include "fiber.h"

class Scheduler {
public:
    Scheduler();
    ~Scheduler();

    // 创建一个新的 Fiber
    std::shared_ptr<Fiber> create_fiber(std::function<void()> entry_func, size_t stack_size = DEFAULT_STACK_SIZE);

    // 开始调度循环
    void run();

    // 停止调度循环
    void stop();

    // 获取当前正在运行的 Fiber (如果没有则返回 nullptr)
    static Fiber* get_current_fiber() { return Fiber::current_fiber_; }

private:
    // 调度器自身的上下文
    FiberContext scheduler_ctx_;

    // 管理所有 Fiber 的容器
    std::vector<std::shared_ptr<Fiber>> all_fibers_;

    // 就绪队列:存放 READY 状态的 Fiber
    std::queue<std::shared_ptr<Fiber>> ready_queue_;

    bool running_; // 调度器是否正在运行

    // 在调度器初始化时,将主线程的上下文保存到 scheduler_ctx_
    // 并在 Fiber 切换回主线程时,恢复这个上下文
    void init_main_fiber_context();

    // 查找并调度下一个 Fiber
    void schedule_next_fiber();
};

4. Scheduler 类实现

// scheduler.cpp
#include "scheduler.h"
#include <algorithm> // For std::remove_if
#include <chrono>    // For std::this_thread::sleep_for
#include <thread>    // For std::this_thread

// 全局静态调度器实例,以便 Fiber::yield 能够找到它
// 或者通过参数传递,这里简化处理
thread_local Scheduler* g_current_scheduler = nullptr;

Scheduler::Scheduler() : running_(false) {
    // 初始化调度器上下文。
    // 第一次调用 _switch_context 时,current_ctx 会保存主线程的上下文。
    // 我们需要确保 Fiber::scheduler_context_ 指向这个上下文。
    Fiber::scheduler_context_ = &scheduler_ctx_;
    g_current_scheduler = this; // 设置当前调度器

    // 主线程自身也是一个“Fiber”,它的上下文由 scheduler_ctx_ 表示。
    // 这里不需要特别初始化 scheduler_ctx_ 的 rsp/rip,
    // 因为 _switch_context 会在第一次被调用时保存当前(主线程)的上下文。
}

Scheduler::~Scheduler() {
    stop(); // 确保停止调度
    all_fibers_.clear(); // 释放所有 Fiber
    if (g_current_scheduler == this) {
        g_current_scheduler = nullptr;
    }
}

// 创建一个新的 Fiber
std::shared_ptr<Fiber> Scheduler::create_fiber(std::function<void()> entry_func, size_t stack_size) {
    auto fiber = std::make_shared<Fiber>(std::move(entry_func), stack_size);
    all_fibers_.push_back(fiber);
    ready_queue_.push(fiber); // 将新 Fiber 加入就绪队列
    fiber->set_state(FiberState::READY);
    return fiber;
}

// 调度循环
void Scheduler::run() {
    running_ = true;
    std::cout << "Scheduler started." << std::endl;

    // 这是一个简单的轮询调度器
    while (running_) {
        // 移除已终止的 Fiber
        all_fibers_.erase(
            std::remove_if(all_fibers_.begin(), all_fibers_.end(),
                           [](const std::shared_ptr<Fiber>& f) {
                               return f->get_state() == FiberState::TERMINATED;
                           }),
            all_fibers_.end()
        );

        if (ready_queue_.empty() && all_fibers_.empty()) {
            // 如果没有就绪 Fiber 且所有 Fiber 都已终止,则停止调度
            running_ = false;
            break;
        }

        if (ready_queue_.empty()) {
            // 如果没有就绪 Fiber,但还有未终止的 Fiber (可能处于 SUSPENDED 状态)
            // 此时可以考虑等待事件,或者简单地让出时间片 (例如 sleep 一小段时间)
            std::cout << "Scheduler: No ready fibers, sleeping briefly..." << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            // 也可以检查是否有 SUSPENDED 的 Fiber 可以重新变为 READY
            // (例如,如果 Fiber 正在等待某个外部事件完成,需要在这里检查并将其移回 ready_queue_)
            // For now, we only process ready_queue_, so if all are suspended, it will eventually stop.
            continue;
        }

        // 从就绪队列中取出一个 Fiber
        std::shared_ptr<Fiber> next_fiber = ready_queue_.front();
        ready_queue_.pop();

        // 检查 Fiber 状态,确保是 READY 或 SUSPENDED
        if (next_fiber->get_state() == FiberState::READY || next_fiber->get_state() == FiberState::SUSPENDED) {
            // 切换到这个 Fiber
            next_fiber->resume();

            // 当 Fiber 切换回调度器时,我们在这里继续执行
            // 检查 Fiber 的状态,决定下一步怎么做
            if (next_fiber->get_state() == FiberState::SUSPENDED) {
                // 如果 Fiber 是主动 yield 的,将其放回就绪队列
                ready_queue_.push(next_fiber);
            } else if (next_fiber->get_state() == FiberState::TERMINATED) {
                // 如果 Fiber 已经终止,则不需要做任何事,它会在下次循环中被清理
                std::cout << "Fiber " << next_fiber->get_id() << " terminated." << std::endl;
            }
            // 如果 Fiber 状态变为 RUNNING (不应该发生,因为 resume 之后会立即切换),
            // 或者其他未知状态,可以加入错误处理。
        } else {
            // Fiber 状态不正确 (例如,已经 TERMINATED 但还在队列里)
            std::cerr << "Scheduler: Skipping invalid fiber " << next_fiber->get_id() << " with state " << (int)next_fiber->get_state() << std::endl;
        }
    }

    std::cout << "Scheduler stopped." << std::endl;
}

void Scheduler::stop() {
    running_ = false;
}

VI. 示例与测试

现在我们已经构建了 Fiber 库,是时候来测试它了。我们将创建一个简单的场景,模拟万级 Fiber 的并发调度。

// main.cpp
#include <iostream>
#include <vector>
#include <chrono>
#include <numeric> // For std::iota
#include "fiber.h"
#include "scheduler.h"

// 示例 Fiber 入口函数
void my_fiber_task(int id) {
    std::cout << "Fiber " << id << ": Starting." << std::endl;
    int counter = 0;
    for (int i = 0; i < 5; ++i) {
        counter++;
        std::cout << "Fiber " << id << ": Running, counter = " << counter << std::endl;
        // 让出 CPU,模拟I/O等待或长时间计算
        Scheduler::get_current_fiber()->yield();
    }
    std::cout << "Fiber " << id << ": Finished." << std::endl;
}

// 另一个 Fiber 任务,稍微复杂一点
void another_fiber_task(int id, int iterations) {
    std::cout << "Fiber " << id << " (Complex): Starting with " << iterations << " iterations." << std::endl;
    long long sum = 0;
    for (int i = 0; i < iterations; ++i) {
        sum += i;
        if (i % 1000 == 0) { // 每隔一段时间 yield
             // std::cout << "Fiber " << id << " (Complex): Iteration " << i << ", sum = " << sum << std::endl;
            Scheduler::get_current_fiber()->yield();
        }
    }
    std::cout << "Fiber " << id << " (Complex): Finished. Final sum = " << sum << std::endl;
}

int main() {
    std::cout << "Starting Fiber concurrency example." << std::endl;

    Scheduler scheduler;

    const int num_fibers_simple = 5;
    const int num_fibers_complex = 10000; // 尝试万级 Fiber

    std::cout << "Creating " << num_fibers_simple << " simple fibers..." << std::endl;
    for (int i = 0; i < num_fibers_simple; ++i) {
        // 使用 lambda 捕获 i
        scheduler.create_fiber([i]() { my_fiber_task(i); });
    }

    std::cout << "Creating " << num_fibers_complex << " complex fibers..." << std::endl;
    auto start_create = std::chrono::high_resolution_clock::now();
    for (int i = 0; i < num_fibers_complex; ++i) {
        // 传递不同的迭代次数
        scheduler.create_fiber([i, iterations = i * 10]() { another_fiber_task(i + num_fibers_simple, iterations); }, DEFAULT_STACK_SIZE / 2); // 适当减小栈大小
    }
    auto end_create = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> create_duration = end_create - start_create;
    std::cout << "Created " << num_fibers_complex << " complex fibers in " << create_duration.count() << " ms." << std::endl;

    std::cout << "Starting scheduler run..." << std::endl;
    auto start_run = std::chrono::high_resolution_clock::now();
    scheduler.run(); // 启动调度器
    auto end_run = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> run_duration = end_run - start_run;
    std::cout << "Scheduler finished running all fibers in " << run_duration.count() << " ms." << std::endl;

    std::cout << "Fiber concurrency example finished." << std::endl;

    return 0;
}

编译与运行:

假设你的文件结构如下:

.
├── fiber_context.h
├── fiber.h
├── fiber.cpp
├── scheduler.h
├── scheduler.cpp
├── stack_allocator.h
├── switch_context.S
└── main.cpp

使用 g++ 编译:

g++ -std=c++17 -Wall -g main.cpp fiber.cpp scheduler.cpp switch_context.S -o fiber_example
./fiber_example

你将看到 Fiber 按照调度器的规则交替执行。通过调整 num_fibers_complex 的值,你可以观察到创建和运行大量 Fiber 的效率。万级 Fiber 的创建和调度过程会非常迅速,远超同等数量的 OS 线程。

VII. 进阶话题与注意事项

1. 栈溢出检测

如前所述,简单的 malloc 不提供栈溢出保护。对于生产系统,务必使用 mmap (Linux) 或 VirtualAlloc (Windows) 分配栈,并在栈的低地址端设置一个不可访问的“保护页”(guard page)。当 Fiber 栈使用超出其界限时,会触发硬件异常,可以被捕获并妥善处理。

2. 内存对齐

x86-64 ABI 对栈帧有严格的 16 字节对齐要求。这意味着在任何 call 指令执行前,RSP 必须是 16 字节对齐的。我们的 _setup_fiber_entry 函数已经考虑了这一点,通过 andq $-16, %rax 确保了栈指针的对齐。不遵循此规则可能导致程序崩溃或不可预测的行为,尤其是在使用 SSE/AVX 指令集时。

3. 信号处理与异常处理

  • 信号处理: 由于 Fiber 是在用户态调度的,操作系统并不知道 Fiber 的存在。当进程接收到信号时,信号处理函数会在当前运行的 Fiber 上下文(即 current_fiber_)中执行。如果信号处理函数需要切换 Fiber,可能会导致复杂的问题。通常需要特殊的处理,例如将信号处理从 Fiber 切换到主线程或一个专门的 OS 线程。
  • C++ 异常: C++ 异常处理机制依赖于栈展开。当 Fiber 内部抛出异常时,如果该异常在当前 Fiber 内部被捕获,则没有问题。但如果异常跨越了 Fiber 边界(即在一个 Fiber 中抛出,希望在另一个 Fiber 中捕获),则这是不允许的,因为栈是独立的。

4. 阻塞系统调用

我们实现的 Fiber 是协作式的。这意味着如果一个 Fiber 执行了阻塞式系统调用(如 read() 从网络套接字读取数据,或者 sleep()),那么整个进程都会被阻塞,直到该系统调用完成。这会丧失并发性。

要解决这个问题,通常有两种方法:

  • 异步 I/O: 使用非阻塞 I/O (如 epoll 在 Linux, IOCP 在 Windows)。当 Fiber 遇到一个可能阻塞的 I/O 操作时,它会发起非阻塞调用,然后 yield,让调度器去运行其他 Fiber。当 I/O 操作完成时,系统会通知调度器,调度器再将等待该 I/O 的 Fiber 重新标记为 READY
  • 线程池 + 阻塞调用: 启动一个 OS 线程池,将阻塞式系统调用提交给线程池中的 OS 线程执行。当 OS 线程完成任务后,通过回调或事件机制通知 Fiber 调度器,唤醒相应的 Fiber。这种方法结合了 OS 线程和 Fiber 的优点。

5. 调试

调试 Fiber 比调试 OS 线程更困难。因为调试器通常只认识 OS 线程,而 Fiber 的栈切换在用户态进行,对调试器是透明的。你可能需要:

  • 使用自定义的调试工具或日志。
  • 在关键切换点设置断点。
  • 在调试时,Fiber 的栈信息可能不会被调试器正确解析,需要手动查看 RSPRIP

6. 平台兼容性

汇编代码是高度平台相关的。我们的 switch_context.S 是为 x86-64 Linux System V ABI 编写的。要移植到 Windows 或其他架构(如 ARM),你需要重写汇编部分,并根据目标平台的 ABI 调整 C++ 接口。

7. 调度策略

我们实现了一个简单的轮询调度器。在实际应用中,可以根据需求实现更复杂的调度策略,例如:

  • 优先级调度: 允许 Fiber 具有不同的优先级。
  • 时间片轮转: 强制 Fiber 在执行一段时间后 yield,防止单个 Fiber 霸占 CPU。这需要修改 yield 机制,或者通过外部定时器触发。
  • 事件驱动调度: Fiber 等待特定事件(如 I/O 完成、定时器到期)发生时才被唤醒。

VIII. 总结

通过手动管理汇编栈和 CPU 寄存器,我们成功地在 C++ 中构建了一个轻量级的 Fiber 调度系统。这种用户态的并发模型,以其极低的资源开销和高效的上下文切换,为实现万级并发提供了强大的基础。理解其底层机制,不仅能帮助我们构建高性能系统,更能加深对计算机系统工作原理的理解。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注