尊敬的各位技术同行,大家好。
今天,我们将深入探讨一个在高性能、高并发领域中极具价值的话题:如何利用“Fiber”(协程)机制,在C++中通过手动切换汇编栈的方式,实现万级轻量级线程的并发调度。这不仅仅是一项技术挑战,更是一种对系统底层机制的深刻理解和运用。我们将从概念出发,逐步深入到C++代码实现,并最终触及底层汇编的奥秘。
I. 引言:为什么我们需要轻量级并发?
在现代软件系统中,并发处理已是常态。无论是Web服务器处理海量用户请求,还是科学计算进行大规模数据并行计算,亦或是游戏引擎进行复杂场景渲染,都离不开并发。传统的并发模型主要基于操作系统线程(OS Thread)。
操作系统线程的挑战:
- 资源开销大: 每个OS线程通常需要至少几MB的栈空间,加上内核数据结构、线程控制块(TCB)等,内存占用不容小觑。当我们需要创建成千上万个线程时,内存开销会迅速成为瓶颈。
- 创建与销毁开销: OS线程的创建和销毁涉及内核调用,耗时相对较长。
- 上下文切换开销: OS线程的调度由操作系统内核完成。每次上下文切换,CPU需要保存当前线程的所有寄存器状态、切换内存管理单元(MMU)上下文(如果涉及进程切换)、刷新TLB等,这些操作都会带来显著的性能损耗。内核态切换的开销远高于用户态切换。
- 调度复杂性: OS线程的调度是抢占式的,由内核根据调度算法决定。这使得程序的行为更难预测,也增加了同步机制(互斥锁、信号量等)的复杂性。
为了应对这些挑战,人们开始探索更轻量级的并发模型,其中“协程”(Coroutine),或者在特定语境下被称为“Fiber”,应运而生。Go语言的Goroutine、Python的async/await、C#的async/await等都是协程思想的体现。
Fiber的优势:
- 用户态调度: Fiber的调度完全在用户空间进行,无需陷入内核。这意味着上下文切换的开销极低,通常只涉及少量寄存器的保存和恢复。
- 内存占用小: 每个Fiber可以只分配几十KB甚至几KB的栈空间,远小于OS线程。
- 协作式调度: Fiber通常采用协作式调度模型,即一个Fiber主动让出CPU控制权(yield)给另一个Fiber,或者调度器选择下一个Fiber执行。这使得程序逻辑更清晰,避免了抢占式调度带来的竞态条件和死锁问题(但需要程序员自行管理阻塞)。
- 高密度: 由于其极低的开销,一个进程可以轻松管理数万乃至数十万个Fiber,实现极高的并发密度。
本文的目标,就是带领大家一步步地,利用C++和少量的汇编代码,手动实现一个具备基本功能的Fiber库,从而深刻理解其底层原理。
II. 理解 Fiber 的核心机制:栈与上下文切换
Fiber 的核心思想是:每个 Fiber 拥有独立的执行上下文和栈空间。当一个 Fiber 暂停执行时,它会将其当前的 CPU 寄存器状态(包括栈指针、指令指针等)保存起来;当它需要恢复执行时,这些寄存器状态会被重新加载到 CPU 中,使得 Fiber 能够从上次暂停的地方继续执行。
从本质上讲,Fiber 的上下文切换,就是一次“栈的切换”。操作系统线程的切换是由内核完成的,而 Fiber 的切换则完全在用户态代码中完成。
为什么需要汇编?
C++ 作为一种高级语言,通常不允许我们直接访问和操作 CPU 寄存器,也无法直接修改栈指针(RSP/ESP)和指令指针(RIP/EIP)。然而,Fiber 的上下文切换正是通过保存和恢复这些关键寄存器来实现的。因此,我们需要借助汇编语言来完成这部分底层操作。
我们将主要关注 x86-64 架构,因为它在现代服务器和个人电脑上广泛使用。在 x86-64 架构下,我们需要保存的寄存器包括:
- 通用寄存器 (General Purpose Registers):
RBX,RBP,R12,R13,R14,R15:这些是 ABI (Application Binary Interface) 规定为“被调用者保存”(Callee-saved)的寄存器。这意味着如果函数使用了它们,必须在函数返回前恢复它们的原值。因此,在 Fiber 切换时,它们必须被保存。RSP(Stack Pointer): 栈指针,指向当前栈顶。这是切换 Fiber 栈的关键。RIP(Instruction Pointer): 指令指针,指向下一条要执行的指令地址。这是恢复执行位置的关键。
- 标志寄存器 (Flags Register):
RFLAGS: 包含了各种 CPU 状态标志,如零标志、进位标志等。保存它可以确保上下文的完整性。
我们不需要保存像 RAX, RCX, RDX, RSI, RDI, R8–R11 这些“调用者保存”(Caller-saved)寄存器,因为根据 ABI,函数调用者负责在调用函数之前保存它们,如果它需要这些值在函数返回后仍然有效。在 Fiber 切换的语境下,switch_context 函数本身就是被调用的,所以它不需要关心这些寄存器。
III. 栈管理:为 Fiber 分配内存
每个 Fiber 都需要独立的栈空间来存储局部变量、函数参数和返回地址。栈的分配和管理是 Fiber 实现的基础。
在 x86-64 架构下,栈通常是向下增长的(从高地址向低地址增长)。这意味着当我们为 Fiber 分配一块内存作为其栈时,栈顶指针(RSP)将指向这块内存的最高地址,然后随着函数调用和局部变量的创建,RSP 会逐渐减小。
栈的分配方式:
我们可以使用标准库函数 malloc 或操作系统特定的内存分配函数来分配 Fiber 的栈空间。
malloc(C/C++ 标准库): 简单易用,但无法提供页对齐或保护页等高级功能。VirtualAlloc(Windows): 允许分配特定属性的内存,例如指定起始地址、保护页(Guard Page)等。mmap(Linux/Unix): 功能类似于VirtualAlloc,可以用来分配匿名内存,并设置内存保护。
为了简化,我们的示例将主要使用 malloc,但会提到更健壮的方案。
#include <cstddef> // For size_t
#include <cstdlib> // For malloc, free
#include <stdexcept> // For std::runtime_error
#include <iostream>
// 默认栈大小,例如 64KB
// 实际应用中,根据 Fiber 的复杂度和调用深度调整
const size_t DEFAULT_STACK_SIZE = 64 * 1024; // 64KB
class StackAllocator {
public:
// 分配栈内存,返回栈顶(高地址)
void* allocate(size_t size) {
// x86-64 栈是向下增长的,所以我们返回分配区域的最高地址
// 确保栈空间是页面对齐的,虽然 malloc 通常已经对齐,但 mmap/VirtualAlloc 更能保证
void* mem = malloc(size);
if (!mem) {
throw std::runtime_error("Failed to allocate stack memory.");
}
// 返回栈的“顶部”(最高地址),因为栈是向下增长的
return static_cast<char*>(mem) + size;
}
// 释放栈内存
void deallocate(void* stack_top, size_t size) {
// 释放时需要传入原始分配的起始地址
void* mem = static_cast<char*>(stack_top) - size;
free(mem);
}
};
栈溢出保护(Guard Page):
在生产环境中,简单的 malloc 并不足以保护 Fiber 免受栈溢出。当 Fiber 的栈使用超出了分配的空间,它会侵入相邻的内存区域,导致数据损坏甚至程序崩溃。为了防止这种情况,可以采用“栈溢出保护页”(Guard Page)技术:
- 在分配 Fiber 栈时,额外分配一个页面(通常是4KB)作为保护页。
- 将保护页放在 Fiber 栈的“底部”(低地址端)。
- 设置保护页为不可读写(
PROT_NONE或PAGE_NOACCESS)。 - 当 Fiber 栈溢出并尝试访问保护页时,会触发操作系统级别的访问冲突(Segmentation Fault 或 Access Violation),从而及时发现并处理栈溢出错误。
这种技术通常需要使用 mmap (Linux) 或 VirtualAlloc (Windows) 来实现。在本文的示例中,为了聚焦核心机制,我们将暂不深入实现保护页,但请务必记住其重要性。
IV. 核心:上下文切换的汇编艺术
上下文切换是 Fiber 机制的“心脏”。我们将创建一个结构体来存储 Fiber 的上下文,并编写一个汇编函数来完成实际的保存和恢复操作。
1. FiberContext 结构体
这个结构体将保存所有需要切换的 CPU 寄存器。
// fiber_context.h
#pragma once
#include <cstdint> // For uint64_t
// 保存 Fiber 上下文的结构体
// 针对 x86-64 架构
// 我们需要保存被调用者保存的寄存器 (RBX, RBP, R12-R15)
// 以及栈指针 RSP 和指令指针 RIP
// RFLAGS 也可以保存以确保更完整的上下文,但有些实现会省略
struct FiberContext {
uint64_t rbx;
uint64_t rbp;
uint64_t r12;
uint64_t r13;
uint64_t r14;
uint64_t r15;
uint64_t rsp; // Stack Pointer
uint64_t rip; // Instruction Pointer
// 构造函数,初始化为0
FiberContext() : rbx(0), rbp(0), r12(0), r13(0), r14(0), r15(0), rsp(0), rip(0) {}
};
// 汇编函数声明
// extern "C" 确保 C++ 编译器使用 C 链接约定
// 第一个参数是保存当前上下文的指针
// 第二个参数是加载目标上下文的指针
extern "C" void _switch_context(FiberContext* current_ctx, const FiberContext* target_ctx);
// 另一个辅助函数,用于新 Fiber 的首次进入。
// 它会设置栈并调用一个函数
extern "C" void _setup_fiber_entry(FiberContext* ctx, void* stack_top, void (*entry_func)());
2. 汇编代码 (switch_context.S)
我们将使用 AT&T 语法(Linux/GCC 常用)编写汇编代码。文件名为 switch_context.S。
_switch_context 函数:
这个函数的核心逻辑是:
- 保存当前 Fiber 的被调用者保存寄存器和
RSP到current_ctx。 - 从
target_ctx加载被调用者保存寄存器和RSP。 - 通过
ret指令跳转到target_ctx中保存的RIP。
// switch_context.S
// x86-64 Linux (System V ABI)
.global _switch_context
.global _setup_fiber_entry
// void _switch_context(FiberContext* current_ctx, const FiberContext* target_ctx);
// rdi: current_ctx
// rsi: target_ctx
_switch_context:
// 保存当前上下文到 current_ctx
// current_ctx->rbx = rbx;
movq %rbx, (%rdi)
// current_ctx->rbp = rbp;
movq %rbp, 8(%rdi)
// current_ctx->r12 = r12;
movq %r12, 16(%rdi)
// current_ctx->r13 = r13;
movq %r13, 24(%rdi)
// current_ctx->r14 = r14;
movq %r14, 32(%rdi)
// current_ctx->r15 = r15;
movq %r15, 40(%rdi)
// current_ctx->rsp = rsp;
movq %rsp, 48(%rdi)
// 将当前指令指针(_switch_context的返回地址)保存到 current_ctx->rip
// ret 指令会从栈顶弹出 RIP。这里我们通过压栈和弹出到寄存器来获取它
// 另一种方法是使用 call 指令,然后立即 pop,但这里需要确保返回地址在栈上
// 假设 _switch_context 是被 call 调用的,那么返回地址已经在栈顶
// 我们需要把这个返回地址保存到 current_ctx->rip
// 这里我们不能直接 pop %rip, 而是要保存当前栈顶的返回地址
movq (%rsp), %rax // 获取 _switch_context 的返回地址 (即 C++ resume/yield 调用的下一条指令)
movq %rax, 56(%rdi) // 保存到 current_ctx->rip
// 恢复目标上下文
// rbx = target_ctx->rbx;
movq (%rsi), %rbx
// rbp = target_ctx->rbp;
movq 8(%rsi), %rbp
// r12 = target_ctx->r12;
movq 16(%rsi), %r12
// r13 = target_ctx->r13;
movq 24(%rsi), %r13
// r14 = target_ctx->r14;
movq 32(%rsi), %r14
// r15 = target_ctx->r15;
movq 40(%rsi), %r15
// rsp = target_ctx->rsp;
movq 48(%rsi), %rsp
// 跳转到目标 Fiber 的指令指针
// rip = target_ctx->rip;
// 我们不能直接 movq %rip。而是将目标 RIP 压栈,然后 ret
movq 56(%rsi), %rax // 获取目标 RIP
pushq %rax // 压入目标 RIP 到新栈
ret // 弹出 RIP 并跳转
// -----------------------------------------------------------------------------
// void _setup_fiber_entry(FiberContext* ctx, void* stack_top, void (*entry_func)());
// rdi: ctx
// rsi: stack_top
// rdx: entry_func
//
// 这个函数用于初始化一个新的 FiberContext,使其在第一次被 _switch_context 切换时,
// 能够正确地从 entry_func 开始执行。
// 主要思想是:将 entry_func 的地址伪装成一个返回地址,
// 并设置 rsp 指向新栈的底部,使得 _switch_context 恢复后执行 ret 时,
// 能跳到 entry_func。
//
// x86-64 ABI 要求栈在调用函数前必须 16 字节对齐。
// 因此,在设置栈顶时,需要确保其满足对齐要求。
// 首次进入时,ret 会从栈顶弹出 RIP。所以我们需要将 entry_func 压入栈顶。
// 还需要考虑一个 dummy 返回地址,因为 entry_func 可能会返回。
_setup_fiber_entry:
// 设置 RSP 为新的栈顶。
// stack_top (rsi) 是分配内存的最高地址。
// 我们需要减去 sizeof(uint64_t) * N 来确保栈对齐和足够的空间。
// 至少需要为 entry_func 的返回地址预留空间。
// System V ABI 要求栈在 `call` 指令前 16 字节对齐。
// `ret` 指令之后,`rsp` 应该是 16 字节对齐的。
//
// 假设 stack_top 是 16 字节对齐的(通常 malloc 分配的内存起始地址是 8 或 16 字节对齐)
// 我们要放置两个 8 字节的值在栈顶:
// 1. entry_func 的地址 (作为 RIP)
// 2. 一个虚拟的返回地址,以防 entry_func 返回 (例如,指向一个清理函数或者直接退出)
//
// 实际的栈布局 (从高地址到低地址):
// stack_top (rsi)
// ...
// [dummy_return_address] <- rsp - 8
// [entry_func_address] <- rsp - 16 (这将是第一次 ret 后的 RIP)
// 首先,确保 rsp 是 16 字节对齐的。
// 如果 stack_top 已经是 16 字节对齐,那么 (stack_top - 16) 也是 16 字节对齐。
movq %rsi, %rax // rax = stack_top
subq $16, %rax // rax = stack_top - 16 (为两个 qword 预留空间)
andq $-16, %rax // 确保 rax 是 16 字节对齐的
// 将 entry_func 地址写入 rsp - 8 的位置 (这将是第一次 ret 后的 RIP)
movq %rdx, 8(%rax) // *(rax + 8) = entry_func
// 写入一个虚拟的返回地址到 rsp (如果 entry_func 返回,会跳到这里)
// 我们可以让它指向一个终止 Fiber 的函数,或者只是一个无限循环。
// 这里我们简单地让它指向一个会终止当前 Fiber 的 C++ 函数。
// 假设我们有一个叫做 _fiber_exit_stub 的 C++ 函数,它的地址需要被放置在这里。
// 为了简化,我们暂时用 0 表示,实际中需要一个合适的清理/退出函数。
// 注意:这里我们不能直接引用 C++ 符号,需要在 C++ 代码中提供一个 stub 函数的地址。
// 暂时用一个不会被执行到的地址代替,或者假设 entry_func 不会返回。
// 更好的做法是,Fiber 封装类提供一个静态成员函数作为退出点。
xorq %rcx, %rcx // 或者直接 movq $0x0, (%rax)
movq %rcx, (%rax) // *(rax) = 0 (或者 _fiber_exit_stub 的地址)
// 保存设置好的 RSP 和 RIP 到 ctx
movq %rax, 48(%rdi) // ctx->rsp = rax (新的栈顶)
movq 8(%rax), %rax // 获取 entry_func 地址
movq %rax, 56(%rdi) // ctx->rip = entry_func (第一次切换后,ret 会跳到这里)
// 清零其他寄存器,或者保持不变。对于新 Fiber,通常清零是安全的。
// 或者直接让 C++ 构造函数初始化为0
xorq %rbx, %rbx
movq %rbx, (%rdi) // ctx->rbx = 0
xorq %rbp, %rbp
movq %rbp, 8(%rdi) // ctx->rbp = 0
xorq %r12, %r12
movq %r12, 16(%rdi) // ctx->r12 = 0
xorq %r13, %r13
movq %r13, 24(%rdi) // ctx->r13 = 0
xorq %r14, %r14
movq %r14, 32(%rdi) // ctx->r14 = 0
xorq %r15, %r15
movq %r15, 40(%rdi) // ctx->r15 = 0
ret // 返回到 C++ 调用者
编译汇编代码:
在 Linux 上,你可以使用 gcc 或 as 来编译汇编文件:
gcc -c switch_context.S -o switch_context.o
然后链接到你的 C++ 程序中。
Windows x64 注意事项:
- Calling Convention: Windows x64 使用
__fastcall约定,前四个参数通过RCX,RDX,R8,R9传递。我们的_switch_context和_setup_fiber_entry函数需要相应调整。 - 寄存器保存: 需要保存的被调用者保存寄存器包括
RBX,RBP,RDI,RSI,R12,R13,R14,R15。 - 栈帧对齐: Windows x64 约定要求在函数调用前,
RSP必须是 16 字节对齐的。 - Inline Assembly: MSVC 不支持 AT&T 语法的内联汇编。你需要使用 Intel 语法,或者将汇编代码放在单独的
.asm文件中,并用ml64.exe编译。
由于篇幅限制和跨平台复杂性,我们主要聚焦于 x86-64 Linux 的实现。
V. C++ 封装:构建 Fiber 库
现在,我们有了底层的栈分配和上下文切换机制,可以开始构建 C++ 层的 Fiber 类和调度器。
1. Fiber 类设计
Fiber 类代表一个独立的执行单元。
FiberState 枚举:
一个 Fiber 可以有多种状态:
| 状态 | 描述 |
|---|---|
INITIAL |
刚创建,尚未运行 |
READY |
已准备好运行,等待调度 |
RUNNING |
正在 CPU 上执行 |
SUSPENDED |
暂停执行,等待被 resume |
TERMINATED |
执行完毕 |
// fiber.h
#pragma once
#include <functional> // For std::function
#include <memory> // For std::unique_ptr
#include "fiber_context.h"
#include "stack_allocator.h" // 假设 StackAllocator 在这里
class Fiber; // 前置声明
// Fiber 的状态
enum class FiberState {
INITIAL,
READY,
RUNNING,
SUSPENDED,
TERMINATED
};
class Fiber {
public:
// 构造函数:创建一个新的 Fiber
// entry_func: Fiber 的入口函数
// stack_size: 分配的栈大小
Fiber(std::function<void()> entry_func, size_t stack_size = DEFAULT_STACK_SIZE);
// 析构函数:释放栈内存
~Fiber();
// 禁用拷贝构造和赋值操作
Fiber(const Fiber&) = delete;
Fiber& operator=(const Fiber&) = delete;
// 移动构造和赋值
Fiber(Fiber&& other) noexcept;
Fiber& operator=(Fiber&& other) noexcept;
// 让出 CPU 控制权给调度器
void yield();
// 恢复 Fiber 的执行
void resume();
// 获取 Fiber 的状态
FiberState get_state() const { return state_; }
// 设置 Fiber 的状态
void set_state(FiberState state) { state_ = state; }
// 获取 FiberContext 指针
FiberContext* get_context() { return &context_; }
// 获取 Fiber 的 ID (可选)
uint64_t get_id() const { return id_; }
private:
uint64_t id_; // Fiber 的唯一标识符
static uint64_t next_id_; // 下一个 Fiber 的 ID
FiberContext context_; // Fiber 的 CPU 上下文
std::function<void()> entry_func_; // Fiber 的入口函数
void* stack_base_; // 栈内存的起始地址 (低地址)
void* stack_top_; // 栈内存的顶部地址 (高地址)
size_t stack_size_; // 栈的大小
FiberState state_; // Fiber 当前状态
// 静态成员,用于保存当前正在运行的 Fiber
// 这将由 Scheduler 管理,但 Fiber 也需要知道谁是当前运行的
static thread_local Fiber* current_fiber_;
// 静态成员,用于保存调度器的 FiberContext,以便 Fiber yield 时可以切换回调度器
// 理论上应该由 Scheduler 管理,但为了简化 Fiber 内部的 yield 实现,
// 可以让 Fiber 知道调度器的上下文。
static thread_local FiberContext* scheduler_context_;
// 实际执行 Fiber 逻辑的静态成员函数
// 在新 Fiber 首次启动时,_setup_fiber_entry 会跳转到这里
static void _fiber_entry_stub();
// 静态成员,用于保存当前 Fiber 实例的指针,供 _fiber_entry_stub 使用
static thread_local Fiber* _initial_fiber_ptr_;
// 用于 Fiber 正常结束时的清理和返回调度器的 stub
static void _fiber_exit_stub();
friend class Scheduler; // 允许调度器访问私有成员
};
2. Fiber 类实现
// fiber.cpp
#include "fiber.h"
#include <iostream>
#include <stdexcept>
#include <cassert>
// 静态成员初始化
uint64_t Fiber::next_id_ = 0;
thread_local Fiber* Fiber::current_fiber_ = nullptr;
thread_local FiberContext* Fiber::scheduler_context_ = nullptr;
thread_local Fiber* Fiber::_initial_fiber_ptr_ = nullptr;
StackAllocator g_stack_allocator; // 全局栈分配器
// Fiber 构造函数
Fiber::Fiber(std::function<void()> entry_func, size_t stack_size)
: id_(next_id_++),
entry_func_(std::move(entry_func)),
stack_size_(stack_size),
state_(FiberState::INITIAL)
{
// 分配栈内存
stack_top_ = g_stack_allocator.allocate(stack_size_);
// stack_base_ 指向分配内存的低地址
stack_base_ = static_cast<char*>(stack_top_) - stack_size_;
// 初始化 FiberContext
// _setup_fiber_entry 函数会设置好 rsp 和 rip,使其在第一次 resume 时能执行 entry_func_
// 为了让 _fiber_entry_stub 能找到正确的 Fiber 实例,我们先将其保存到 thread_local 变量
_initial_fiber_ptr_ = this;
_setup_fiber_entry(&context_, stack_top_, _fiber_entry_stub);
_initial_fiber_ptr_ = nullptr; // 用完后清空
}
// Fiber 析构函数
Fiber::~Fiber() {
if (stack_top_ != nullptr) {
g_stack_allocator.deallocate(stack_top_, stack_size_);
stack_top_ = nullptr;
stack_base_ = nullptr;
}
}
// 移动构造函数
Fiber::Fiber(Fiber&& other) noexcept
: id_(other.id_),
context_(other.context_),
entry_func_(std::move(other.entry_func_)),
stack_base_(other.stack_base_),
stack_top_(other.stack_top_),
stack_size_(other.stack_size_),
state_(other.state_)
{
other.id_ = 0; // Prevent double free/double id for simplicity
other.stack_base_ = nullptr;
other.stack_top_ = nullptr;
other.stack_size_ = 0;
other.state_ = FiberState::TERMINATED; // Mark old fiber as terminated or invalid
}
// 移动赋值运算符
Fiber& Fiber::operator=(Fiber&& other) noexcept {
if (this != &other) {
// 释放当前 Fiber 的资源
if (stack_top_ != nullptr) {
g_stack_allocator.deallocate(stack_top_, stack_size_);
}
// 复制 other 的资源
id_ = other.id_;
context_ = other.context_;
entry_func_ = std::move(other.entry_func_);
stack_base_ = other.stack_base_;
stack_top_ = other.stack_top_;
stack_size_ = other.stack_size_;
state_ = other.state_;
// 清空 other 的资源
other.id_ = 0;
other.stack_base_ = nullptr;
other.stack_top_ = nullptr;
other.stack_size_ = 0;
other.state_ = FiberState::TERMINATED;
}
return *this;
}
// 静态入口点 stub
void Fiber::_fiber_entry_stub() {
// 这个函数在新 Fiber 第一次被 resume 时执行
// 此时 _initial_fiber_ptr_ 应该指向当前 Fiber 实例
assert(_initial_fiber_ptr_ != nullptr);
Fiber* self = _initial_fiber_ptr_;
// 清空 _initial_fiber_ptr_,防止后续 Fiber 错误使用
_initial_fiber_ptr_ = nullptr;
// 设置当前 Fiber 为正在运行
current_fiber_ = self;
self->set_state(FiberState::RUNNING);
// 调用 Fiber 的实际入口函数
self->entry_func_();
// 如果 entry_func_ 返回,说明 Fiber 正常结束
// 切换回调度器,并标记当前 Fiber 为 TERMINATED
self->set_state(FiberState::TERMINATED);
_switch_context(&self->context_, scheduler_context_); // 切换回调度器
// 注意:_switch_context 不会返回,因为已经切换到另一个 Fiber
}
// 让出 CPU 控制权给调度器
void Fiber::yield() {
assert(current_fiber_ == this && scheduler_context_ != nullptr);
set_state(FiberState::SUSPENDED); // 标记为暂停
_switch_context(&context_, scheduler_context_); // 切换到调度器上下文
// 当 Fiber 被 Scheduler 再次 resume 时,会从这里继续执行
set_state(FiberState::RUNNING); // 恢复后标记为运行
}
// 恢复 Fiber 的执行
void Fiber::resume() {
assert(get_state() == FiberState::READY || get_state() == FiberState::SUSPENDED);
// 保存当前 Fiber (通常是调度器) 的上下文
FiberContext* prev_ctx = current_fiber_ ? current_fiber_->get_context() : scheduler_context_;
// 设置当前正在运行的 Fiber
Fiber* prev_fiber = current_fiber_;
current_fiber_ = this;
// 切换到目标 Fiber
set_state(FiberState::RUNNING);
_switch_context(prev_ctx, &context_);
// 当目标 Fiber yield 或 terminate 时,会切换回这里
current_fiber_ = prev_fiber; // 恢复之前的 Fiber 为当前 Fiber
}
3. Scheduler 类设计
Scheduler 负责管理所有 Fiber 的生命周期和调度。
// scheduler.h
#pragma once
#include <vector>
#include <queue>
#include <memory>
#include <stdexcept>
#include "fiber.h"
class Scheduler {
public:
Scheduler();
~Scheduler();
// 创建一个新的 Fiber
std::shared_ptr<Fiber> create_fiber(std::function<void()> entry_func, size_t stack_size = DEFAULT_STACK_SIZE);
// 开始调度循环
void run();
// 停止调度循环
void stop();
// 获取当前正在运行的 Fiber (如果没有则返回 nullptr)
static Fiber* get_current_fiber() { return Fiber::current_fiber_; }
private:
// 调度器自身的上下文
FiberContext scheduler_ctx_;
// 管理所有 Fiber 的容器
std::vector<std::shared_ptr<Fiber>> all_fibers_;
// 就绪队列:存放 READY 状态的 Fiber
std::queue<std::shared_ptr<Fiber>> ready_queue_;
bool running_; // 调度器是否正在运行
// 在调度器初始化时,将主线程的上下文保存到 scheduler_ctx_
// 并在 Fiber 切换回主线程时,恢复这个上下文
void init_main_fiber_context();
// 查找并调度下一个 Fiber
void schedule_next_fiber();
};
4. Scheduler 类实现
// scheduler.cpp
#include "scheduler.h"
#include <algorithm> // For std::remove_if
#include <chrono> // For std::this_thread::sleep_for
#include <thread> // For std::this_thread
// 全局静态调度器实例,以便 Fiber::yield 能够找到它
// 或者通过参数传递,这里简化处理
thread_local Scheduler* g_current_scheduler = nullptr;
Scheduler::Scheduler() : running_(false) {
// 初始化调度器上下文。
// 第一次调用 _switch_context 时,current_ctx 会保存主线程的上下文。
// 我们需要确保 Fiber::scheduler_context_ 指向这个上下文。
Fiber::scheduler_context_ = &scheduler_ctx_;
g_current_scheduler = this; // 设置当前调度器
// 主线程自身也是一个“Fiber”,它的上下文由 scheduler_ctx_ 表示。
// 这里不需要特别初始化 scheduler_ctx_ 的 rsp/rip,
// 因为 _switch_context 会在第一次被调用时保存当前(主线程)的上下文。
}
Scheduler::~Scheduler() {
stop(); // 确保停止调度
all_fibers_.clear(); // 释放所有 Fiber
if (g_current_scheduler == this) {
g_current_scheduler = nullptr;
}
}
// 创建一个新的 Fiber
std::shared_ptr<Fiber> Scheduler::create_fiber(std::function<void()> entry_func, size_t stack_size) {
auto fiber = std::make_shared<Fiber>(std::move(entry_func), stack_size);
all_fibers_.push_back(fiber);
ready_queue_.push(fiber); // 将新 Fiber 加入就绪队列
fiber->set_state(FiberState::READY);
return fiber;
}
// 调度循环
void Scheduler::run() {
running_ = true;
std::cout << "Scheduler started." << std::endl;
// 这是一个简单的轮询调度器
while (running_) {
// 移除已终止的 Fiber
all_fibers_.erase(
std::remove_if(all_fibers_.begin(), all_fibers_.end(),
[](const std::shared_ptr<Fiber>& f) {
return f->get_state() == FiberState::TERMINATED;
}),
all_fibers_.end()
);
if (ready_queue_.empty() && all_fibers_.empty()) {
// 如果没有就绪 Fiber 且所有 Fiber 都已终止,则停止调度
running_ = false;
break;
}
if (ready_queue_.empty()) {
// 如果没有就绪 Fiber,但还有未终止的 Fiber (可能处于 SUSPENDED 状态)
// 此时可以考虑等待事件,或者简单地让出时间片 (例如 sleep 一小段时间)
std::cout << "Scheduler: No ready fibers, sleeping briefly..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1));
// 也可以检查是否有 SUSPENDED 的 Fiber 可以重新变为 READY
// (例如,如果 Fiber 正在等待某个外部事件完成,需要在这里检查并将其移回 ready_queue_)
// For now, we only process ready_queue_, so if all are suspended, it will eventually stop.
continue;
}
// 从就绪队列中取出一个 Fiber
std::shared_ptr<Fiber> next_fiber = ready_queue_.front();
ready_queue_.pop();
// 检查 Fiber 状态,确保是 READY 或 SUSPENDED
if (next_fiber->get_state() == FiberState::READY || next_fiber->get_state() == FiberState::SUSPENDED) {
// 切换到这个 Fiber
next_fiber->resume();
// 当 Fiber 切换回调度器时,我们在这里继续执行
// 检查 Fiber 的状态,决定下一步怎么做
if (next_fiber->get_state() == FiberState::SUSPENDED) {
// 如果 Fiber 是主动 yield 的,将其放回就绪队列
ready_queue_.push(next_fiber);
} else if (next_fiber->get_state() == FiberState::TERMINATED) {
// 如果 Fiber 已经终止,则不需要做任何事,它会在下次循环中被清理
std::cout << "Fiber " << next_fiber->get_id() << " terminated." << std::endl;
}
// 如果 Fiber 状态变为 RUNNING (不应该发生,因为 resume 之后会立即切换),
// 或者其他未知状态,可以加入错误处理。
} else {
// Fiber 状态不正确 (例如,已经 TERMINATED 但还在队列里)
std::cerr << "Scheduler: Skipping invalid fiber " << next_fiber->get_id() << " with state " << (int)next_fiber->get_state() << std::endl;
}
}
std::cout << "Scheduler stopped." << std::endl;
}
void Scheduler::stop() {
running_ = false;
}
VI. 示例与测试
现在我们已经构建了 Fiber 库,是时候来测试它了。我们将创建一个简单的场景,模拟万级 Fiber 的并发调度。
// main.cpp
#include <iostream>
#include <vector>
#include <chrono>
#include <numeric> // For std::iota
#include "fiber.h"
#include "scheduler.h"
// 示例 Fiber 入口函数
void my_fiber_task(int id) {
std::cout << "Fiber " << id << ": Starting." << std::endl;
int counter = 0;
for (int i = 0; i < 5; ++i) {
counter++;
std::cout << "Fiber " << id << ": Running, counter = " << counter << std::endl;
// 让出 CPU,模拟I/O等待或长时间计算
Scheduler::get_current_fiber()->yield();
}
std::cout << "Fiber " << id << ": Finished." << std::endl;
}
// 另一个 Fiber 任务,稍微复杂一点
void another_fiber_task(int id, int iterations) {
std::cout << "Fiber " << id << " (Complex): Starting with " << iterations << " iterations." << std::endl;
long long sum = 0;
for (int i = 0; i < iterations; ++i) {
sum += i;
if (i % 1000 == 0) { // 每隔一段时间 yield
// std::cout << "Fiber " << id << " (Complex): Iteration " << i << ", sum = " << sum << std::endl;
Scheduler::get_current_fiber()->yield();
}
}
std::cout << "Fiber " << id << " (Complex): Finished. Final sum = " << sum << std::endl;
}
int main() {
std::cout << "Starting Fiber concurrency example." << std::endl;
Scheduler scheduler;
const int num_fibers_simple = 5;
const int num_fibers_complex = 10000; // 尝试万级 Fiber
std::cout << "Creating " << num_fibers_simple << " simple fibers..." << std::endl;
for (int i = 0; i < num_fibers_simple; ++i) {
// 使用 lambda 捕获 i
scheduler.create_fiber([i]() { my_fiber_task(i); });
}
std::cout << "Creating " << num_fibers_complex << " complex fibers..." << std::endl;
auto start_create = std::chrono::high_resolution_clock::now();
for (int i = 0; i < num_fibers_complex; ++i) {
// 传递不同的迭代次数
scheduler.create_fiber([i, iterations = i * 10]() { another_fiber_task(i + num_fibers_simple, iterations); }, DEFAULT_STACK_SIZE / 2); // 适当减小栈大小
}
auto end_create = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::milli> create_duration = end_create - start_create;
std::cout << "Created " << num_fibers_complex << " complex fibers in " << create_duration.count() << " ms." << std::endl;
std::cout << "Starting scheduler run..." << std::endl;
auto start_run = std::chrono::high_resolution_clock::now();
scheduler.run(); // 启动调度器
auto end_run = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::milli> run_duration = end_run - start_run;
std::cout << "Scheduler finished running all fibers in " << run_duration.count() << " ms." << std::endl;
std::cout << "Fiber concurrency example finished." << std::endl;
return 0;
}
编译与运行:
假设你的文件结构如下:
.
├── fiber_context.h
├── fiber.h
├── fiber.cpp
├── scheduler.h
├── scheduler.cpp
├── stack_allocator.h
├── switch_context.S
└── main.cpp
使用 g++ 编译:
g++ -std=c++17 -Wall -g main.cpp fiber.cpp scheduler.cpp switch_context.S -o fiber_example
./fiber_example
你将看到 Fiber 按照调度器的规则交替执行。通过调整 num_fibers_complex 的值,你可以观察到创建和运行大量 Fiber 的效率。万级 Fiber 的创建和调度过程会非常迅速,远超同等数量的 OS 线程。
VII. 进阶话题与注意事项
1. 栈溢出检测
如前所述,简单的 malloc 不提供栈溢出保护。对于生产系统,务必使用 mmap (Linux) 或 VirtualAlloc (Windows) 分配栈,并在栈的低地址端设置一个不可访问的“保护页”(guard page)。当 Fiber 栈使用超出其界限时,会触发硬件异常,可以被捕获并妥善处理。
2. 内存对齐
x86-64 ABI 对栈帧有严格的 16 字节对齐要求。这意味着在任何 call 指令执行前,RSP 必须是 16 字节对齐的。我们的 _setup_fiber_entry 函数已经考虑了这一点,通过 andq $-16, %rax 确保了栈指针的对齐。不遵循此规则可能导致程序崩溃或不可预测的行为,尤其是在使用 SSE/AVX 指令集时。
3. 信号处理与异常处理
- 信号处理: 由于 Fiber 是在用户态调度的,操作系统并不知道 Fiber 的存在。当进程接收到信号时,信号处理函数会在当前运行的 Fiber 上下文(即
current_fiber_)中执行。如果信号处理函数需要切换 Fiber,可能会导致复杂的问题。通常需要特殊的处理,例如将信号处理从 Fiber 切换到主线程或一个专门的 OS 线程。 - C++ 异常: C++ 异常处理机制依赖于栈展开。当 Fiber 内部抛出异常时,如果该异常在当前 Fiber 内部被捕获,则没有问题。但如果异常跨越了 Fiber 边界(即在一个 Fiber 中抛出,希望在另一个 Fiber 中捕获),则这是不允许的,因为栈是独立的。
4. 阻塞系统调用
我们实现的 Fiber 是协作式的。这意味着如果一个 Fiber 执行了阻塞式系统调用(如 read() 从网络套接字读取数据,或者 sleep()),那么整个进程都会被阻塞,直到该系统调用完成。这会丧失并发性。
要解决这个问题,通常有两种方法:
- 异步 I/O: 使用非阻塞 I/O (如
epoll在 Linux,IOCP在 Windows)。当 Fiber 遇到一个可能阻塞的 I/O 操作时,它会发起非阻塞调用,然后yield,让调度器去运行其他 Fiber。当 I/O 操作完成时,系统会通知调度器,调度器再将等待该 I/O 的 Fiber 重新标记为READY。 - 线程池 + 阻塞调用: 启动一个 OS 线程池,将阻塞式系统调用提交给线程池中的 OS 线程执行。当 OS 线程完成任务后,通过回调或事件机制通知 Fiber 调度器,唤醒相应的 Fiber。这种方法结合了 OS 线程和 Fiber 的优点。
5. 调试
调试 Fiber 比调试 OS 线程更困难。因为调试器通常只认识 OS 线程,而 Fiber 的栈切换在用户态进行,对调试器是透明的。你可能需要:
- 使用自定义的调试工具或日志。
- 在关键切换点设置断点。
- 在调试时,Fiber 的栈信息可能不会被调试器正确解析,需要手动查看
RSP和RIP。
6. 平台兼容性
汇编代码是高度平台相关的。我们的 switch_context.S 是为 x86-64 Linux System V ABI 编写的。要移植到 Windows 或其他架构(如 ARM),你需要重写汇编部分,并根据目标平台的 ABI 调整 C++ 接口。
7. 调度策略
我们实现了一个简单的轮询调度器。在实际应用中,可以根据需求实现更复杂的调度策略,例如:
- 优先级调度: 允许 Fiber 具有不同的优先级。
- 时间片轮转: 强制 Fiber 在执行一段时间后
yield,防止单个 Fiber 霸占 CPU。这需要修改yield机制,或者通过外部定时器触发。 - 事件驱动调度: Fiber 等待特定事件(如 I/O 完成、定时器到期)发生时才被唤醒。
VIII. 总结
通过手动管理汇编栈和 CPU 寄存器,我们成功地在 C++ 中构建了一个轻量级的 Fiber 调度系统。这种用户态的并发模型,以其极低的资源开销和高效的上下文切换,为实现万级并发提供了强大的基础。理解其底层机制,不仅能帮助我们构建高性能系统,更能加深对计算机系统工作原理的理解。