在 C++ 中实现一个用户态调度器,通常指的是实现“协程”(Coroutines)或“纤程”(Fibers)。这种机制允许在用户空间进行协作式多任务处理,而无需操作系统的直接干预。其核心挑战在于如何保存和恢复执行上下文,尤其是栈指针(Stack Pointer)和指令指针(Instruction Pointer),因为这些是线程或纤程状态的关键组成部分。本讲座将深入探讨如何利用汇编指令接管栈指针,从而实现一个功能完善的用户态调度器。
引言:为何需要用户态调度器?
在深入技术细节之前,我们首先理解为什么我们可能需要一个用户态调度器。
传统的操作系统进程和线程提供了强大的并发能力。然而,它们也伴随着一定的开销:
- 内核态切换开销: 线程调度由操作系统内核完成,每次上下文切换都涉及从用户态到内核态的转换,这包括保存和恢复大量的CPU寄存器、TLB刷新、缓存失效等,这些操作的成本相对较高。
- 内存开销: 每个线程通常需要独立的内核栈和较大的用户态栈(通常数MB),导致大量并发线程的内存占用巨大。
- 编程模型复杂性: 操作系统线程是抢占式的,需要复杂的同步机制(互斥锁、信号量等)来避免竞态条件,这增加了程序设计的复杂性和出错的可能性。
用户态调度器(Fibers/Coroutines)旨在解决这些问题:
- 极致轻量级: Fibers完全运行在用户空间,没有内核开销。它们通常只需要一个独立的栈空间(可以很小,几KB到几十KB),以及保存少量寄存器的上下文结构。
- 协作式调度: Fibers是协作式的,意味着一个Fiber会主动放弃CPU控制权(
yield),而不是被操作系统抢占。这极大地简化了同步问题,因为在yield之前,Fiber可以保证其内部状态的一致性。 - 高效上下文切换: 上下文切换仅涉及保存和恢复少数寄存器,且完全在用户态完成,速度远超内核线程切换。
- 灵活的调度策略: 开发者可以完全控制Fiber的调度逻辑,实现各种定制化的调度策略,例如基于事件、优先级或亲和性等。
应用场景:
- 高性能网络服务器(Nginx、Node.js的事件循环模型)。
- 游戏引擎中的游戏逻辑和AI。
- 异步I/O框架。
- 状态机实现。
- 各种高并发、低延迟的服务。
下表总结了进程、线程和纤程的主要区别:
| 特性 | 进程 (Process) | 线程 (Thread) | 纤程 (Fiber/Coroutine) |
|---|---|---|---|
| 调度方式 | 操作系统抢占式 | 操作系统抢占式 | 用户态协作式 |
| 内存开销 | 独立地址空间,大 | 共享地址空间,中等 | 共享地址空间,小 |
| 上下文切换 | 非常高(内核态) | 高(内核态) | 非常低(用户态) |
| 同步机制 | 进程间通信(IPC) | 互斥锁、信号量等 | 通常不需要(协作式) |
| 创建/销毁 | 慢 | 较慢 | 很快 |
| 隔离性 | 高 | 中 | 低 |
| 错误处理 | 进程崩溃不影响其他进程 | 线程崩溃可能影响进程 | 纤程崩溃影响整个进程 |
执行上下文的本质
要实现用户态调度器,我们首先需要理解一个“正在运行的任务”在CPU层面是如何被定义的。一个任务的执行状态,或者说其“上下文”,主要由以下几个部分组成:
-
CPU寄存器: 这是CPU内部用于存储数据和控制指令执行的少量高速存储单元。其中最关键的包括:
- 指令指针 (Instruction Pointer, RIP/EIP): 指向CPU将要执行的下一条指令的内存地址。
- 栈指针 (Stack Pointer, RSP/ESP): 指向当前线程栈的顶部。栈用于存储函数调用参数、局部变量、函数返回地址等。
- 基指针 (Base Pointer, RBP/EBP): 通常用于标记当前函数栈帧的底部。
- 通用寄存器 (General Purpose Registers, GPRs): 如 RAX, RBX, RCX, RDX, RSI, RDI, R8-R15 等,用于临时存储数据。
- 标志寄存器 (Flags Register, RFLAGS/EFLAGS): 存储CPU操作的状态,如零标志、进位标志等。
- 浮点/向量寄存器 (XMM/YMM/ZMM): 用于浮点运算和SIMD指令。
-
栈 (Stack): 每个执行流(无论是进程、线程还是纤程)都需要一个独立的栈。栈是一个后进先出(LIFO)的数据结构,它在x86/x64架构上通常从高地址向低地址增长。当调用一个函数时,返回地址、参数和局部变量会被压入栈中;当函数返回时,这些数据会被弹出。
接管栈指针的意义:
在一个多任务环境中,当一个任务暂停执行并让出CPU时,它的所有CPU寄存器(包括RIP和RSP)以及其当前栈上的数据都必须被保存下来。当该任务恢复执行时,这些寄存器和栈数据必须被精确地恢复到之前的状态。
由于RIP和RSP直接控制着程序的执行流程和内存访问,它们的保存和恢复操作必须原子地、精确地完成。C++等高级语言不直接提供对这些底层寄存器的操作接口。因此,为了实现上下文切换,我们必须求助于汇编语言。
核心机制:上下文切换
上下文切换是用户态调度器的核心。它涉及将当前运行的Fiber的执行状态保存到一个数据结构中,然后从另一个Fiber的数据结构中加载其执行状态,从而将CPU控制权转移给新的Fiber。
为了实现高效的上下文切换,我们需要一个 FiberContext 结构体来保存Fiber的寄存器状态,以及一个汇编函数来执行实际的保存和恢复操作。
1. FiberContext 结构体
这个结构体将保存切换时需要的所有寄存器。对于x64架构,通常我们需要保存以下寄存器:
- RSP (Stack Pointer): 栈顶地址,最重要的。
- RIP (Instruction Pointer): 下一条指令地址。当汇编函数通过
ret指令返回时,它会从栈顶弹出 RIP。因此,我们可以在栈上模拟ret行为来设置初始 RIP。 - RBP (Base Pointer): 基指针,用于栈帧管理。
- 通用寄存器 (GPRs): 根据不同的调用约定(Calling Convention),某些通用寄存器需要由被调用者保存(Callee-saved registers),而另一些由调用者保存(Caller-saved registers)。在上下文切换中,我们通常只关心 Callee-saved 寄存器,因为 Caller-saved 寄存器在调用
_switch_context之前可能已经被保存,或者我们假设它们在切换前后不会被破坏。 - 浮点/SIMD寄存器 (XMM/YMM/ZMM): 如果Fiber会进行浮点运算或使用SIMD指令,这些寄存器也需要保存。这会增加上下文切换的开销,但对于正确性是必需的。为了简化,我们暂时只关注GPRs,但会在高级话题中提及FP寄存器。
我们定义一个 FiberContext 结构体,它将存储这些寄存器的值。为了跨平台兼容性,我们将分别处理 Windows x64 和 Linux x64 的调用约定。
// fiber.h
#pragma once
#include <cstdint>
#include <functional>
#include <vector>
#include <memory>
#include <stdexcept>
#include <iostream>
// 定义Fiber状态
enum class FiberState {
READY,
RUNNING,
SUSPENDED,
TERMINATED
};
// Fiber上下文结构体
// 注意:这里只包含必要的通用寄存器,不包括浮点/SIMD寄存器以简化示例
// 实际生产环境中,需要根据ABI(Application Binary Interface)完整保存
struct FiberContext {
uint64_t rsp; // 栈指针
uint64_t rbp; // 基指针
// Callee-saved registers (x64)
// 根据ABI,这些寄存器在函数调用中必须由被调用者保存和恢复
// Windows x64 ABI: RBX, RBP, RDI, RSI, R12, R13, R14, R15
// System V AMD64 ABI (Linux/macOS): RBX, RBP, R12, R13, R14, R15
// 注意:RDI和RSI在System V ABI中是caller-saved,但在Windows ABI中是callee-saved。
// 为了简化和通用性,我们这里暂时都保存。
// 更严谨的做法是根据具体的ABI来决定。
uint64_t rbx;
uint64_t rsi;
uint64_t rdi;
uint64_t r12;
uint64_t r13;
uint64_t r14;
uint64_t r15;
// TODO: Floating point registers (XMM0-XMM15) would also need to be saved for a complete context.
// For simplicity, we omit them here.
};
// 汇编函数声明
// extern "C" 确保C++编译器不会对函数名进行重整 (name mangling)
// 从而使得汇编代码能够正确链接
extern "C" void _switch_context(FiberContext* old_context, FiberContext* new_context);
// Fiber类声明
class Fiber;
// Fiber的入口点包装函数
// 这是一个C++函数,由汇编代码跳转到,然后它会调用用户提供的Fiber函数
void _fiber_entry_point(Fiber* fiber);
class Fiber {
public:
using FiberFunc = std::function<void()>;
Fiber(FiberFunc func, size_t stack_size = 64 * 1024); // 默认64KB栈
~Fiber();
// 禁用拷贝构造和赋值,因为Fiber拥有栈资源
Fiber(const Fiber&) = delete;
Fiber& operator=(const Fiber&) = delete;
// 允许移动构造和赋值
Fiber(Fiber&& other) noexcept;
Fiber& operator=(Fiber&& other) noexcept;
FiberState get_state() const { return state_; }
void set_state(FiberState state) { state_ = state; }
private:
friend class Scheduler; // 允许Scheduler访问私有成员
friend void _fiber_entry_point(Fiber* fiber); // 允许入口函数访问私有成员
FiberContext context_;
FiberFunc func_;
std::unique_ptr<uint8_t[]> stack_buffer_; // 栈内存
size_t stack_size_;
FiberState state_;
// 初始化Fiber的栈,使其准备好被_switch_context恢复
void initialize_stack();
};
class Scheduler {
public:
Scheduler();
~Scheduler();
// 创建一个新的Fiber并将其加入调度器
Fiber* create_fiber(Fiber::FiberFunc func, size_t stack_size = 64 * 1024);
// 暂停当前Fiber的执行,并切换到下一个就绪的Fiber
void yield();
// 恢复指定Fiber的执行
void resume(Fiber* fiber);
// 启动调度器,运行所有Fiber直到完成
void run();
// 获取当前正在运行的Fiber
Fiber* get_current_fiber() const { return current_fiber_; }
private:
std::vector<std::unique_ptr<Fiber>> fibers_;
Fiber* current_fiber_;
FiberContext main_thread_context_; // 主线程的上下文
bool running_; // 调度器是否正在运行
};
2. 汇编指令:_switch_context
这是最核心的部分。_switch_context 函数负责保存当前正在运行的Fiber的上下文,然后加载并恢复目标Fiber的上下文。
理解x64调用约定 (Calling Convention):
在x64架构上,不同的操作系统有不同的调用约定。我们需要根据目标平台来编写汇编代码。
a) System V AMD64 ABI (Linux/macOS)
- 参数传递: 前6个整数或指针参数依次通过
RDI,RSI,RDX,RCX,R8,R9寄存器传递。 - 返回地址: 由
call指令自动压入栈中。 - 返回结果: 整数或指针通过
RAX传递。 - Callee-saved 寄存器:
RBX,RBP,R12,R13,R14,R15。这些寄存器在函数内部被修改时,必须由被调用者保存(压栈)和恢复(出栈)。 - Caller-saved 寄存器:
RAX,RCX,RDX,RSI,RDI,R8,R9,R10,R11。这些寄存器在函数调用前可以由调用者保存,或假设它们在函数返回后可能被修改。 - 栈对齐: 栈指针
RSP必须在call指令执行前是16字节对齐的(RSP % 16 == 8)。在进入函数后,RSP会因为call压入返回地址而变成RSP % 16 == 0。
b) Microsoft x64 Calling Convention (Windows)
- 参数传递: 前4个整数或指针参数依次通过
RCX,RDX,R8,R9寄存器传递。 - 返回地址: 由
call指令自动压入栈中。 - 返回结果: 整数或指针通过
RAX传递。 - Callee-saved 寄存器:
RBX,RBP,RDI,RSI,RSP,R12,R13,R14,R15。 - Caller-saved 寄存器:
RAX,RCX,RDX,R8,R9,R10,R11。 - 栈对齐: 栈指针
RSP必须在call指令执行前是16字节对齐的。 - Shadow Space: 调用者必须为被调用者在栈上预留32字节的“影空间”,即使被调用者不使用这些空间。
_switch_context 汇编实现 (Linux/macOS – System V AMD64 ABI)
我们将使用AT&T语法编写汇编,并通过GCC的内联汇编或单独的.S文件来编译。这里为了清晰,我们假设一个独立的汇编文件 switch_context_asm_linux.s。
// switch_context_asm_linux.s
// extern "C" void _switch_context(FiberContext* old_context, FiberContext* new_context);
// old_context 传入 rdi
// new_context 传入 rsi
.global _switch_context
.type _switch_context, @function
_switch_context:
// --- 保存当前上下文 (old_context) ---
// 保存 Callee-saved registers 到栈上
pushq %rbx
pushq %rbp
pushq %r12
pushq %r13
pushq %r14
pushq %r15
// 注意:RDI和RSI在System V ABI中是caller-saved,
// 但在我们的FiberContext中包含了它们,为了简单和通用性,也在这里保存。
// 如果严格按照ABI,这里可以不保存,但需要确保FiberContext结构体定义匹配。
pushq %rdi // 保存old_context参数 (rdi)
pushq %rsi // 保存new_context参数 (rsi)
// 将当前栈指针 (RSP) 存入 old_context->rsp
movq %rsp, (%rdi) // old_context->rsp = RSP
// 将当前基指针 (RBP) 存入 old_context->rbp
movq %rbp, 8(%rdi) // old_context->rbp = RBP
// 保存其他 Callee-saved 寄存器到 old_context
movq %rbx, 16(%rdi) // old_context->rbx = RBX
movq %rsi, 24(%rdi) // old_context->rsi = RSI (这里是参数rsi的值,不是实际的rsi寄存器值)
movq %rdi, 32(%rdi) // old_context->rdi = RDI (这里是参数rdi的值,不是实际的rdi寄存器值)
movq %r12, 40(%rdi) // old_context->r12 = R12
movq %r13, 48(%rdi) // old_context->r13 = R13
movq %r14, 56(%rdi) // old_context->r14 = R14
movq %r15, 64(%rdi) // old_context->r15 = R15
// --- 恢复新上下文 (new_context) ---
// 从 new_context->rsp 加载新的栈指针
movq (%rsi), %rsp // RSP = new_context->rsp
// 从 new_context->rbp 加载新的基指针
movq 8(%rsi), %rbp // RBP = new_context->rbp
// 恢复其他 Callee-saved 寄存器
movq 16(%rsi), %rbx // RBX = new_context->rbx
movq 24(%rsi), %rsi_tmp // 临时保存,防止覆盖new_context参数
movq 32(%rsi), %rdi_tmp // 临时保存,防止覆盖new_context参数
movq 40(%rsi), %r12 // R12 = new_context->r12
movq 48(%rsi), %r13 // R13 = new_context->r13
movq 56(%rsi), %r14 // R14 = new_context->r14
movq 64(%rsi), %r15 // R15 = new_context->r15
// 将临时保存的RSI和RDI值恢复到对应寄存器
movq %rsi_tmp, %rsi
movq %rdi_tmp, %rdi
// 从栈上恢复 Callee-saved registers (注意顺序与保存时相反)
popq %rsi // 恢复new_context参数 (rsi)
popq %rdi // 恢复old_context参数 (rdi)
popq %r15
popq %r14
popq %r13
popq %r12
popq %rbp
popq %rbx
// 返回。这里的返回地址实际上是新Fiber被切换走时保存的返回地址,
// 它将跳转到新Fiber之前暂停的地方。
// 对于一个新创建的Fiber,其栈上会预先放置_fiber_entry_point的地址作为返回地址。
retq
_switch_context 汇编实现 (Windows x64 – Microsoft x64 Calling Convention)
假设独立的汇编文件 switch_context_asm_windows.asm。使用MASM语法。
; switch_context_asm_windows.asm
; extern "C" void _switch_context(FiberContext* old_context, FiberContext* new_context);
; old_context 传入 rcx
; new_context 传入 rdx
.CODE
_switch_context PROC
; --- 保存当前上下文 (old_context) ---
; RCX = old_context*, RDX = new_context*
; 保存 Callee-saved registers 到栈上
push rbx
push rbp
push rdi
push rsi
push r12
push r13
push r14
push r15
; 将当前栈指针 (RSP) 存入 old_context->rsp
mov [rcx].rsp, rsp
; 将当前基指针 (RBP) 存入 old_context->rbp
mov [rcx].rbp, rbp
; 保存其他 Callee-saved 寄存器到 old_context
mov [rcx].rbx, rbx
mov [rcx].rsi, rsi
mov [rcx].rdi, rdi
mov [rcx].r12, r12
mov [rcx].r13, r13
mov [rcx].r14, r14
mov [rcx].r15, r15
; --- 恢复新上下文 (new_context) ---
; 从 new_context->rsp 加载新的栈指针
mov rsp, [rdx].rsp
; 从 new_context->rbp 加载新的基指针
mov rbp, [rdx].rbp
; 恢复其他 Callee-saved 寄存器
mov rbx, [rdx].rbx
mov rsi, [rdx].rsi
mov rdi, [rdx].rdi
mov r12, [rdx].r12
mov r13, [rdx].r13
mov r14, [rdx].r14
mov r15, [rdx].r15
; 从栈上恢复 Callee-saved registers (注意顺序与保存时相反)
pop r15
pop r14
pop r13
pop r12
pop rsi
pop rdi
pop rbp
pop rbx
; 返回。这里的返回地址是新Fiber被切换走时保存的返回地址,
; 它将跳转到新Fiber之前暂停的地方。
; 对于一个新创建的Fiber,其栈上会预先放置_fiber_entry_point的地址作为返回地址。
ret
_switch_context ENDP
END
汇编代码解释:
push/pop寄存器: 这些指令用于将 Callee-saved 寄存器(在函数调用约定中规定必须由被调用者保存的寄存器)保存到当前栈上,并在函数返回前恢复。这是为了确保_switch_context函数本身不会破坏调用者的这些寄存器状态。movq %rsp, (%rdi)(Linux) /mov [rcx].rsp, rsp(Windows): 将当前栈指针RSP的值保存到old_context结构体的rsp字段中。old_context参数在 Linux 中通过RDI传递,在 Windows 中通过RCX传递。movq (%rsi), %rsp(Linux) /mov rsp, [rdx].rsp(Windows): 从new_context结构体的rsp字段中加载新的栈指针值到RSP寄存器。new_context参数在 Linux 中通过RSI传递,在 Windows 中通过RDX传递。这一步是接管栈指针的关键!movq %rbp, 8(%rdi)等: 保存和恢复其他 Callee-saved 寄存器。retq(Linux) /ret(Windows): 这条指令会从当前栈的顶部弹出RIP(指令指针)的值,然后跳转到该地址。对于一个被暂停的Fiber,其栈顶会是它上次_switch_context调用的返回地址。对于一个新创建的Fiber,其栈顶会是我们预先设置的_fiber_entry_point函数的地址。
栈管理与Fiber初始化
除了上下文切换,Fiber还需要独立的栈空间。
1. 栈分配
Fiber的栈可以动态分配。在x64系统上,栈通常向下增长(从高地址向低地址)。
- Linux/macOS: 使用
mmap系统调用。#include <sys/mmap.h> // ... void* stack_base = mmap(nullptr, stack_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYM, -1, 0); if (stack_base == MAP_FAILED) { // 错误处理 } // 记得在析构时使用 munmap(stack_base, stack_size) 释放 - Windows: 使用
VirtualAllocAPI。#include <windows.h> // ... void* stack_base = VirtualAlloc(nullptr, stack_size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); if (stack_base == nullptr) { // 错误处理 } // 记得在析构时使用 VirtualFree(stack_base, 0, MEM_RELEASE) 释放
2. 栈的初始化
一个新创建的Fiber,在第一次被 _switch_context 恢复时,其栈必须被精心设置,以便 ret 指令能正确地跳转到Fiber的起始函数。
初始化的步骤如下:
- 确定栈顶: 由于栈向下增长,分配的内存区域的最高地址将是其逻辑上的“底部”,而我们要设置的
RSP将是其“顶部”(即最低有效地址)。uint8_t* stack_top = stack_base + stack_size;
- 栈对齐: x64架构要求
RSP始终是16字节对齐的(在call指令之前)。因此,我们需要将stack_top向下调整到16字节对齐的地址。stack_top = reinterpret_cast<uint8_t*>((reinterpret_cast<uintptr_t>(stack_top) & ~0xF));
- 放置返回地址 (
RIP): 在Fiber的栈顶放置_fiber_entry_point函数的地址。当_switch_context中的ret指令执行时,它会弹出这个地址并跳转到_fiber_entry_point。*(--stack_top) = reinterpret_cast<uintptr_t>(&_fiber_entry_point);
- 放置Fiber指针作为参数:
_fiber_entry_point函数需要知道它属于哪个Fiber对象。我们将Fiber*指针作为参数传递。在x64 System V ABI中,第一个参数通过RDI传递;在x64 Windows ABI中,第一个参数通过RCX传递。虽然我们的汇编_switch_context不直接设置参数寄存器,但_fiber_entry_point是一个普通的C++函数,当它被ret调用时,它会期望其参数在栈上或寄存器中。最简单的方法是将Fiber*放在_fiber_entry_point期望的栈位置或寄存器位置。- 为了简单起见,我们可以在栈上预留空间并放置
Fiber*指针,这样_fiber_entry_point就可以通过栈帧访问它。或者,我们可以在_switch_context首次切换到新Fiber时,直接将Fiber*载入RDI/RCX。然而,_switch_context的设计是通用的,它只处理上下文结构体。 - 一个更常见且简单的方法是,将
Fiber*作为_fiber_entry_point的“伪参数”压入栈中,_fiber_entry_point会在运行时获取它。或者,我们将Fiber*作为FiberContext结构体的一部分,并在_fiber_entry_point中通过全局/静态变量或current_fiber指针获取。 - 更优雅的方案: 将
Fiber*压入栈作为_fiber_entry_point的唯一参数,并让_fiber_entry_point从栈上取出。*(--stack_top) = reinterpret_cast<uintptr_t>(this); // this 指向当前Fiber对象- 注意:这里需要确保
_fiber_entry_point函数的签名与栈上参数的布局匹配。如果_fiber_entry_point接收一个Fiber*参数,且在第一次ret到它时,这个Fiber*位于栈顶。 - 然而,x64的调用约定通常是参数通过寄存器传递。所以,我们不能简单地压入栈就指望它被
_fiber_entry_point识别为参数。 - 最稳妥的方案:
_fiber_entry_point不带参数,它通过Scheduler::get_current_fiber()获取当前正在执行的Fiber。这需要Scheduler在切换前设置current_fiber_。
- 为了简单起见,我们可以在栈上预留空间并放置
- 设置初始
RSP和RBP: 将最终的stack_top设置为FiberContext中的rsp。将RBP设置为与RSP相同的值(或稍高,以创建空的栈帧)。
// 在 Fiber::initialize_stack() 中
void Fiber::initialize_stack() {
// 栈从高地址向低地址增长
// stack_buffer_指向分配内存的起始地址(最低地址)
// 栈的顶部应该在内存的最高地址附近
uint8_t* stack_ptr = stack_buffer_.get() + stack_size_;
// 确保栈指针16字节对齐 (x64 ABI要求)
// RSP在call指令前必须是16字节对齐。call指令会push返回地址(8字节),
// 使得RSP变成16字节对齐-8。
// 所以我们初始设置的RSP应该指向一个地址,使得在ret之前,
// 它的上一个地址(也就是返回地址)的RSP是16字节对齐的。
// 通常做法是,将RSP对齐到16的倍数,然后减去8,以模拟call后的状态。
// 但是,最简单的做法是直接对齐到16,然后将_fiber_entry_point地址放在其上,
// _switch_context会将其作为返回地址pop出来。
// 所以,我们确保栈顶指针是16字节对齐的。
stack_ptr = reinterpret_cast<uint8_t*>((reinterpret_cast<uintptr_t>(stack_ptr) & ~0xF));
// 为返回地址预留空间,并设置初始RIP为_fiber_entry_point
// 当_switch_context中的retq指令执行时,它会从这个位置弹出地址并跳转
// 注意:这里我们将_fiber_entry_point的地址直接压入栈中,
// 当_switch_context被调用时,它会保存当前的RSP,然后加载新的RSP,
// 最后执行retq。这个retq会弹出这里压入的地址,从而跳转到_fiber_entry_point。
//
// 为确保栈对齐,我们可能需要额外填充。
// 如果_fiber_entry_point是唯一一个被压入栈的元素,
// 并且我们希望它的参数在寄存器中,那么初始的RSP应该指向这个地址。
//
// 实际操作时,为了确保ret指令后的RSP是16字节对齐的,
// 我们需要确保在压入_fiber_entry_point地址后,RSP是16字节对齐的。
// 由于我们压入了一个8字节的地址,如果stack_ptr已经是16字节对齐,
// 那么压入后会变成16字节对齐-8。
// 因此,我们应该在压入前,将stack_ptr调整到 (16的倍数 + 8) 的位置。
//
// 示例:
// 原始 stack_ptr = 0x...000 (16字节对齐)
// 调整:stack_ptr = 0x...008
// 压入地址:*stack_ptr = _fiber_entry_point_addr; stack_ptr -= 8; // RSP现在是0x...000 (16字节对齐)
//
// 但更简单的方法是,直接对齐到16字节,然后压入。
// 汇编的ret指令会弹出这个值并跳转。
// 栈帧的建立由_fiber_entry_point本身负责。
//
// 我们将_fiber_entry_point的地址压入栈,作为初始的返回地址
// 这样当_switch_context第一次切换到这个Fiber时,执行ret指令就会跳到_fiber_entry_point
stack_ptr -= sizeof(uintptr_t);
*reinterpret_cast<uintptr_t*>(stack_ptr) = reinterpret_cast<uintptr_t>(&_fiber_entry_point);
// 设置FiberContext的初始RSP和RBP
// RSP指向_fiber_entry_point地址的下方(栈顶)
// RBP通常设置为RSP或更低,这里我们简单设为RSP
context_.rsp = reinterpret_cast<uint64_t>(stack_ptr);
context_.rbp = context_.rsp; // 初始时RBP可以和RSP相同,或指向栈帧底部
}
C++集成与Fiber生命周期
现在我们将所有部分整合到C++类中。
1. Fiber 类实现
// fiber.cpp (部分实现)
#include "fiber.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <sys/mman.h>
#include <unistd.h> // For sysconf
#endif
// Fiber的入口点包装函数实现
// 这个函数会在新Fiber第一次运行时被调用
void _fiber_entry_point(Fiber* fiber_ptr_arg) {
// 理论上,_fiber_entry_point的参数应该通过寄存器传递
// 但由于我们是在栈上模拟返回地址,所以这里接收一个假参数。
// 实际的Fiber*应该从Scheduler获取。
// 但是,为了简化和避免过度依赖Scheduler的内部状态,
// 我们也可以在_fiber_entry_point被调用时,通过某种方式获取当前Fiber。
// 最简单的方法是在initialize_stack时,将this指针压入栈,
// 然后_fiber_entry_point从栈上pop出来。
// 但这与x64 ABI的参数传递不符。
//
// 更合理的做法是:_fiber_entry_point不带参数,它通过Scheduler::get_current_fiber()获取当前Fiber。
// 因此,我们需要确保在调用_switch_context切换到新Fiber之前,Scheduler已经正确设置了current_fiber_。
// 假设Scheduler已经设置了当前的Fiber
// Fiber* current_fiber = Scheduler::get_current_fiber(); // 需要Scheduler提供静态方法或通过TLS
// 或者,将Fiber*作为_fiber_entry_point的"隐藏"参数,通过FiberContext来传递。
// 最简单的是,让Scheduler直接在resume之前更新current_fiber。
// 假设我们能正确获取到当前正在运行的Fiber实例
// 这里我们直接使用传递进来的fiber_ptr_arg,虽然其来源可能不完全符合ABI
// 但在我们的协作式调度器中,通常可以通过调度器来管理当前Fiber。
// 为了让这个例子能跑起来,我们临时使用一个传递进来的指针,
// 但请注意,在真实的ABI下,参数传递是严格的。
Fiber* current_fiber = fiber_ptr_arg; // 假设这个参数是有效的
try {
if (current_fiber && current_fiber->func_) {
current_fiber->set_state(FiberState::RUNNING);
current_fiber->func_(); // 调用用户定义的Fiber函数
}
} catch (const std::exception& e) {
std::cerr << "Fiber " << current_fiber << " exited with exception: " << e.what() << std::endl;
} catch (...) {
std::cerr << "Fiber " << current_fiber << " exited with unknown exception." << std::endl;
}
// Fiber函数执行完毕,设置为TERMINATED状态
if (current_fiber) {
current_fiber->set_state(FiberState::TERMINATED);
}
std::cout << "Fiber " << current_fiber << " terminated. Yielding back to scheduler." << std::endl;
// Fiber执行完毕后,需要将控制权交还给调度器
// 调度器会负责清理或选择下一个Fiber
// 如果没有这一步,程序会尝试从栈上弹出下一个返回地址,可能导致崩溃
// 这里调用一个特殊函数,将控制权交还给调度器
// 通常是调度器的一个成员函数,例如Scheduler::fiber_finished()
// 简化起见,我们假设Scheduler能够处理TERMINATED状态的Fiber,并切换到下一个。
// 临时方案:直接yield,让调度器处理。
// 实际中,调度器需要一个机制来知道某个fiber已经完成。
// 这里我们无法直接调用Scheduler::yield(),因为_fiber_entry_point是全局函数。
// 理想情况下,我们应该有一个 Scheduler::current_scheduler() 静态方法,或者
// 在Fiber对象中保存Scheduler的指针。
// 为了演示,我们假设Scheduler能感知到Fiber的终止。
// 暂时不做任何特殊处理,依赖Scheduler在run()中检查Fiber状态。
}
Fiber::Fiber(FiberFunc func, size_t stack_size)
: func_(std::move(func)), stack_size_(stack_size), state_(FiberState::READY) {
if (stack_size_ < 4096) { // 最小栈大小
stack_size_ = 4096;
}
// 确保栈大小是页面大小的倍数,方便设置保护页
#ifdef _WIN32
SYSTEM_INFO si;
GetSystemInfo(&si);
size_t page_size = si.dwPageSize;
#else
size_t page_size = sysconf(_SC_PAGESIZE);
#endif
stack_size_ = (stack_size_ + page_size - 1) / page_size * page_size;
// 分配栈内存
#ifdef _WIN32
stack_buffer_.reset(reinterpret_cast<uint8_t*>(VirtualAlloc(nullptr, stack_size_, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE)));
if (!stack_buffer_) {
throw std::runtime_error("Failed to allocate fiber stack (VirtualAlloc)");
}
#else
stack_buffer_.reset(reinterpret_cast<uint8_t*>(mmap(nullptr, stack_size_, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0)));
if (stack_buffer_ == MAP_FAILED) {
throw std::runtime_error("Failed to allocate fiber stack (mmap)");
}
#endif
// 初始化栈上下文
// 注意:我们将Fiber*作为参数压入栈中,这样_fiber_entry_point就能接收到它
// 这是一种非标准的参数传递方式,但在用户态调度器中常见
// 更好的做法是_fiber_entry_point通过一个全局的current_scheduler指针来获取current_fiber
uint8_t* stack_top = stack_buffer_.get() + stack_size_;
// 1. 确保栈指针16字节对齐
stack_top = reinterpret_cast<uint8_t*>((reinterpret_cast<uintptr_t>(stack_top) & ~0xF));
// 2. 压入Fiber*指针作为_fiber_entry_point的参数
// 虽然x64 ABI通常通过寄存器传递前几个参数,
// 但对于这种特殊的“首次启动”机制,我们可以模拟栈上的参数。
// _fiber_entry_point将被_switch_context的ret指令调用,
// 其行为类似C函数调用,但参数需要手动放置。
// 为了让_fiber_entry_point能接收到Fiber*,我们把它放在返回地址后面。
// Windows x64 ABI 在call指令前需要预留32字节的shadow space。
// Linux x64 ABI 不需要。
// 为了通用性,我们直接压入,并让_fiber_entry_point知道如何从栈上取。
// 更干净的做法是:_fiber_entry_point不带参数,通过Scheduler维护current_fiber。
// 这里我们采取一种简单但可能不完全符合ABI的栈布局来传递Fiber*。
// 这意味着_fiber_entry_point需要知道如何从栈上获取其参数。
// 例如,如果_fiber_entry_point定义为 `void _fiber_entry_point(Fiber* fiber_ptr)`
// 那么 `fiber_ptr` 就会在返回地址的后面。
// 然而,这与x64 ABI不符,x64参数通常在寄存器。
//
// 最简单的,且与ABI兼容的方式是:
// _fiber_entry_point(void) 不带参数。
// Scheduler在调用resume时,将Scheduler::current_fiber_设置为目标Fiber。
// _fiber_entry_point可以通过 Scheduler::get_current_fiber() 访问。
// 我们这里暂时采取这种方式,避免复杂的栈参数模拟。
// 3. 压入_fiber_entry_point的地址作为返回地址
stack_top -= sizeof(uintptr_t);
*reinterpret_cast<uintptr_t*>(stack_top) = reinterpret_cast<uintptr_t>(&_fiber_entry_point);
// 4. 设置FiberContext的初始RSP和RBP
context_.rsp = reinterpret_cast<uint64_t>(stack_top);
context_.rbp = context_.rsp; // 初始时RBP可以和RSP相同
}
Fiber::~Fiber() {
if (stack_buffer_) {
#ifdef _WIN32
VirtualFree(stack_buffer_.get(), 0, MEM_RELEASE);
#else
munmap(stack_buffer_.get(), stack_size_);
#endif
}
}
Fiber::Fiber(Fiber&& other) noexcept
: context_(other.context_),
func_(std::move(other.func_)),
stack_buffer_(std::move(other.stack_buffer_)),
stack_size_(other.stack_size_),
state_(other.state_) {
other.stack_size_ = 0;
other.state_ = FiberState::TERMINATED; // 移动后原对象无效
}
Fiber& Fiber::operator=(Fiber&& other) noexcept {
if (this != &other) {
// 先释放自己的资源
if (stack_buffer_) {
#ifdef _WIN32
VirtualFree(stack_buffer_.get(), 0, MEM_RELEASE);
#else
munmap(stack_buffer_.get(), stack_size_);
#endif
}
context_ = other.context_;
func_ = std::move(other.func_);
stack_buffer_ = std::move(other.stack_buffer_);
stack_size_ = other.stack_size_;
state_ = other.state_;
other.stack_size_ = 0;
other.state_ = FiberState::TERMINATED;
}
return *this;
}
2. Scheduler 类实现
调度器负责管理所有的Fiber,并提供 yield 和 resume 方法。
// scheduler.cpp
#include "fiber.h"
#include <algorithm> // For std::remove_if
// 静态成员或线程局部存储,用于获取当前调度器
// 真实的调度器可能需要TLS (Thread Local Storage) 来存储当前线程的调度器实例
// 为了简化,我们假设只有一个Scheduler实例,或者通过参数传递
// 但为了_fiber_entry_point能获取到Fiber*,Scheduler::current_fiber_必须是静态或TLS
// 这里我们把它作为Scheduler的成员,并在_fiber_entry_point中通过一个hack来访问。
// 更好的方式是:在_fiber_entry_point函数中,通过一个全局或TLS的Scheduler指针来访问。
// 全局静态变量,用于_fiber_entry_point获取当前调度器实例
// 注意:这不适用于多线程环境,仅用于单线程演示
static Scheduler* s_current_scheduler = nullptr;
// 修改_fiber_entry_point以通过s_current_scheduler获取当前Fiber
void _fiber_entry_point(Fiber* fiber_ptr_dummy) { // 仍然保留参数,但实际不使用
if (!s_current_scheduler) {
std::cerr << "Error: No active scheduler found in _fiber_entry_point." << std::endl;
return;
}
Fiber* current_fiber = s_current_scheduler->get_current_fiber();
if (!current_fiber) {
std::cerr << "Error: No current fiber set in scheduler." << std::endl;
return;
}
try {
current_fiber->set_state(FiberState::RUNNING);
current_fiber->func_(); // 调用用户定义的Fiber函数
} catch (const std::exception& e) {
std::cerr << "Fiber " << current_fiber << " exited with exception: " << e.what() << std::endl;
} catch (...) {
std::cerr << "Fiber " << current_fiber << " exited with unknown exception." << std::endl;
}
if (current_fiber) {
current_fiber->set_state(FiberState::TERMINATED);
}
std::cout << "Fiber " << current_fiber << " terminated. Yielding back to scheduler." << std::endl;
// Fiber执行完毕,将控制权交还给调度器
s_current_scheduler->yield(); // 这会切换回调度器的主循环或下一个Fiber
}
Scheduler::Scheduler() : current_fiber_(nullptr), running_(false) {
// 保存主线程的上下文,作为调度器自己的上下文
// 在第一次_switch_context调用时,main_thread_context_会被填充
// 之后每次从Fiber切换回主线程或Scheduler,都会恢复这个上下文
s_current_scheduler = this; // 设置全局调度器指针
}
Scheduler::~Scheduler() {
// 释放所有Fiber资源,unique_ptr会自动处理
s_current_scheduler = nullptr; // 清除全局调度器指针
}
Fiber* Scheduler::create_fiber(Fiber::FiberFunc func, size_t stack_size) {
fibers_.push_back(std::make_unique<Fiber>(std::move(func), stack_size));
return fibers_.back().get();
}
void Scheduler::yield() {
if (!current_fiber_) {
// 如果当前没有运行的Fiber(例如在_fiber_entry_point第一次启动时)
// 或者调度器尚未开始运行,则不执行切换。
return;
}
Fiber* old_fiber = current_fiber_;
// 寻找下一个就绪的Fiber
Fiber* next_fiber = nullptr;
if (!fibers_.empty()) {
auto it = std::find(fibers_.begin(), fibers_.end(), std::unique_ptr<Fiber>(old_fiber));
if (it != fibers_.end()) {
size_t current_idx = std::distance(fibers_.begin(), it);
for (size_t i = 1; i <= fibers_.size(); ++i) {
size_t next_idx = (current_idx + i) % fibers_.size();
if (fibers_[next_idx]->get_state() == FiberState::READY ||
fibers_[next_idx]->get_state() == FiberState::SUSPENDED) {
next_fiber = fibers_[next_idx].get();
break;
}
}
}
}
if (next_fiber) {
std::cout << "Yielding from Fiber " << old_fiber << " to Fiber " << next_fiber << std::endl;
current_fiber_ = next_fiber;
if (old_fiber->get_state() == FiberState::RUNNING) {
old_fiber->set_state(FiberState::READY); // 切换前将当前Fiber设置为就绪
}
next_fiber->set_state(FiberState::RUNNING); // 设置新Fiber为运行
_switch_context(&old_fiber->context_, &next_fiber->context_);
} else {
// 没有其他就绪的Fiber,切换回主线程 (调度器自身)
std::cout << "No other ready fibers. Yielding from Fiber " << old_fiber << " back to scheduler." << std::endl;
if (old_fiber->get_state() == FiberState::RUNNING) {
old_fiber->set_state(FiberState::READY);
}
current_fiber_ = nullptr; // 调度器自身没有Fiber
_switch_context(&old_fiber->context_, &main_thread_context_);
}
}
void Scheduler::resume(Fiber* fiber) {
if (!fiber || fiber->get_state() == FiberState::TERMINATED) {
std::cerr << "Cannot resume a null or terminated fiber." << std::endl;
return;
}
if (fiber->get_state() == FiberState::RUNNING) {
std::cerr << "Fiber is already running." << std::endl;
return;
}
Fiber* old_fiber = current_fiber_;
current_fiber_ = fiber; // 设置将要运行的Fiber
std::cout << "Resuming Fiber " << fiber << std::endl;
fiber->set_state(FiberState::RUNNING);
if (old_fiber) {
// 从一个Fiber切换到另一个Fiber
old_fiber->set_state(FiberState::READY); // 旧Fiber暂停
_switch_context(&old_fiber->context_, &fiber->context_);
} else {
// 从主线程/调度器切换到第一个Fiber
// 此时main_thread_context_是主线程的上下文
_switch_context(&main_thread_context_, &fiber->context_);
}
}
void Scheduler::run() {
if (running_) {
std::cerr << "Scheduler is already running." << std::endl;
return;
}
running_ = true;
std::cout << "Scheduler started." << std::endl;
while (running_) {
// 查找下一个要运行的Fiber
Fiber* next_fiber = nullptr;
for (const auto& f_ptr : fibers_) {
if (f_ptr->get_state() == FiberState::READY || f_ptr->get_state() == FiberState::SUSPENDED) {
next_fiber = f_ptr.get();
break;
}
}
if (next_fiber) {
resume(next_fiber); // 恢复该Fiber
} else {
// 没有可运行的Fiber了,检查是否有已终止的Fiber需要移除
fibers_.erase(std::remove_if(fibers_.begin(), fibers_.end(),
[](const std::unique_ptr<Fiber>& f) {
return f->get_state() == FiberState::TERMINATED;
}),
fibers_.end());
if (fibers_.empty()) {
std::cout << "All fibers terminated. Scheduler stopping." << std::endl;
running_ = false;
} else {
// 如果还有非READY/SUSPENDED状态的Fiber(如被阻塞),等待
std::cout << "No ready fibers. Waiting for events or busy-looping (in real app, would block)." << std::endl;
// 在真实的异步I/O调度器中,这里会进入事件循环(epoll_wait, GetQueuedCompletionStatus等)
// 暂时简单地休眠一下,避免空转
#ifdef _WIN32
Sleep(10);
#else
usleep(10000); // 10ms
#endif
}
}
}
std::cout << "Scheduler stopped." << std::endl;
}
3. 示例代码
// main.cpp
#include "fiber.h"
#include <iostream>
#include <chrono>
void fiber_func_A() {
std::cout << "Fiber A: Started." << std::endl;
Scheduler* s = s_current_scheduler; // 获取当前调度器
for (int i = 0; i < 3; ++i) {
std::cout << "Fiber A: Iteration " << i << std::endl;
s->yield(); // 放弃CPU
}
std::cout << "Fiber A: Finished." << std::endl;
}
void fiber_func_B() {
std::cout << "Fiber B: Started." << std::endl;
Scheduler* s = s_current_scheduler;
for (int i = 0; i < 5; ++i) {
std::cout << "Fiber B: Iteration " << i << std::endl;
s->yield(); // 放弃CPU
}
std::cout << "Fiber B: Finished." << std::endl;
}
int main() {
std::cout << "Main: Initializing scheduler." << std::endl;
Scheduler scheduler;
Fiber* fiberA = scheduler.create_fiber(fiber_func_A);
Fiber* fiberB = scheduler.create_fiber(fiber_func_B);
std::cout << "Main: Starting scheduler run loop." << std::endl;
scheduler.run();
std::cout << "Main: Scheduler finished. Exiting." << std::endl;
return 0;
}
编译指令:
Linux/macOS (GCC/Clang):
g++ -std=c++17 -Wall -c fiber.cpp -o fiber.o
g++ -std=c++17 -Wall -c scheduler.cpp -o scheduler.o
as switch_context_asm_linux.s -o switch_context_asm_linux.o
g++ fiber.o scheduler.o switch_context_asm_linux.o main.cpp -o my_fiber_app
Windows (MSVC):
cl /c /std:c++17 /EHsc fiber.cpp
cl /c /std:c++17 /EHsc scheduler.cpp
ml64 /c switch_context_asm_windows.asm
cl /std:c++17 /EHsc fiber.obj scheduler.obj switch_context_asm_windows.obj main.cpp /Fe:my_fiber_app.exe
高级考虑
1. 浮点/SIMD寄存器 (XMM/YMM/ZMM)
我们的 FiberContext 结构体目前只保存了通用寄存器。如果Fiber会执行浮点运算或使用SIMD指令集(如AVX),那么 XMM0-XMM15 (128位)、YMM0-YMM15 (256位) 甚至 ZMM0-ZMM31 (512位) 寄存器也需要保存和恢复。这会显著增加 FiberContext 的大小和上下文切换的开销。
- 需要使用
FXSAVE/FXRSTOR(保存/恢复XMM/MMX寄存器) 或XSAVE/XSTOR(更现代,保存/恢复所有扩展寄存器状态,包括YMM/ZMM) 指令。 XSAVE/XSTOR需要更复杂的设置,因为它涉及一个控制寄存器XCR0来指定要保存哪些扩展状态。
2. 栈溢出检测 (Stack Guard Pages)
Fiber的栈通常比操作系统线程的栈小。为了防止栈溢出,可以在栈的末尾(通常是分配内存的最低地址)设置一个不可读/不可写的“保护页”(Guard Page)。
- Linux/macOS: 使用
mprotect(stack_base, page_size, PROT_NONE)将栈底部的一页设置为不可访问。当Fiber尝试写入或读取这个保护页时,会触发一个分段错误(Segmentation Fault),从而及时发现栈溢出。 - Windows:
VirtualAlloc允许分配MEM_COMMIT和MEM_RESERVE,可以将最后一页设置为PAGE_GUARD属性。
3. 线程局部存储 (TLS)
如果Fiber在C++代码中使用了 thread_local 变量,那么在Fiber切换时,这些变量的状态不会自动随Fiber切换。thread_local 变量是与操作系统线程关联的,而不是与用户态Fiber关联的。
- 要使
thread_local变量在Fiber之间独立,需要实现一个Fiber-local storage (FLS) 机制,这比TLS更复杂,通常需要手动管理。 - 通常的做法是避免在Fiber中使用
thread_local,或者明确知道其行为并手动管理状态。
4. 调试挑战
用户态调度器使得调试变得更加困难:
- 栈回溯: 调试器通常依赖于标准的栈帧结构来回溯函数调用。由于Fiber的栈是被手动管理的,并且上下文切换不涉及标准的函数调用,调试器可能无法正确显示Fiber的调用栈。
- 断点: 在一个Fiber中设置的断点,当切换到另一个Fiber时,可能无法按预期工作。
- 工具: 大多数性能分析工具和调试器都是为操作系统线程设计的,对Fiber的支持有限。
5. 对称与非对称协程
- 非对称协程 (Asymmetric Coroutines): 就像我们实现的
yield和resume,有一个明确的“调用者”(调度器)和一个“被调用者”(Fiber)。Fiber只能yield回到它的调度器,然后调度器再决定resume哪个Fiber。这是我们当前实现的模型。 - 对称协程 (Symmetric Coroutines): 任何一个协程都可以直接
yield或resume任何其他协程,它们之间没有明确的父子关系。这通常需要更复杂的上下文切换机制,允许直接指定目标协程。
总结
本文深入探讨了在 C++ 中实现用户态调度器(Fiber)的核心机制,重点在于如何利用汇编指令接管栈指针。我们通过 FiberContext 结构体保存执行上下文,并编写了平台(Linux/Windows)相关的 _switch_context 汇编函数来精确地保存和恢复 CPU 寄存器,尤其是栈指针 RSP 和指令指针 RIP。同时,详细讨论了 Fiber 栈的分配、初始化以及 C++ 类的集成,构建了一个功能演示性的用户态调度器。
理解并实现用户态调度器需要深厚的底层知识,包括 CPU 架构、内存管理和操作系统 ABI。尽管存在调试困难和与现有工具集成的问题,但 Fibers 在高并发、低延迟和定制化调度场景下展现出无与伦比的优势。掌握这一技术,能够为构建高性能的现代 C++ 应用提供强大的工具。