C++ 协程与推理流水线:告别 std::thread,拥抱异步之美
各位同学,大家好!
欢迎来到今天的“C++ 极客大会:如何用协程把 LLM 推理跑出光速”专场。
坐在我旁边的这位老兄(假设他是一块显卡),他脾气很暴躁。如果你让他做一件事,他就要花 10 毫秒。如果你让他做十件事,他就得花 100 毫秒。如果你让他同时做十件事,他……他只会生气,因为他的算力是有限的。
而在我们写代码的时候,往往需要同时做三件事:把你的文本变成数字(预处理),让显卡算出下一个字(推理),再把数字变回文本(后处理)。如果用老派的写法,就像是用一只手同时剥十个橘子,汁水四溅,最后还得自己擦桌子。
今天,我们要讲的就是怎么用 C++ 协程,像指挥交响乐团一样指挥这三个阶段。我们要构建一个高并发、异步编排的推理流水线。
准备好了吗?让我们开始这场“异步之旅”。
第一回:同步代码的“便秘”时刻
在 C++ 里,如果你不想用协程,最常用的手段就是 std::thread。这就像是让厨师A炒菜,厨师A炒完一道菜,必须喊一声“好了”,厨师B才能开始炒下一道。
问题来了:
- 资源浪费: 厨师A(CPU)在等待厨师C切菜(I/O)的时候,手里拿着铲子没事干。厨师C在等待厨师B洗碗(计算)的时候,切菜刀在发呆。
- 上下文切换开销: 在操作系统层面,从厨师A切换到厨师B,需要记笔记、擦黑板,这都要花时间。
- 代码地狱: 你得用
std::mutex保护锅,用std::condition_variable通知厨师B。一旦逻辑稍微复杂一点,代码就像意大利面一样缠在一起。
举个栗子。这是典型的“阻塞式”写法,我们假设每个函数都是耗时的:
// 这里的代码读起来很顺畅,但运行起来很慢,因为你在"等待"
void processRequest(std::string input) {
// 阶段1:预处理
std::vector<int> tokens = tokenize(input); // 假设这里耗时 5ms,CPU在空转
std::cout << "Tokenized!" << std::endl;
// 阶段2:推理
std::vector<int> output_ids = modelInference(tokens); // 假设这里耗时 50ms,CPU在空转
std::cout << "Inference done!" << std::endl;
// 阶段3:后处理
std::string response = decode(output_ids); // 假设这里耗时 5ms
std::cout << "Response: " << response << std::endl;
}
如果我们要并发处理 100 个请求,用这种同步写法,你需要循环调用 100 次 processRequest。这就像是一个人排队买 100 张电影票,每张票都要在售票窗口停 5 分钟。结果就是,第 100 个人拿到票时,电影都散场了。
C++20 协程 是怎么做的呢?它不是让你去排队,而是给你一张“VIP卡”。你走到窗口(预处理),递进钱(输入),窗口说“好,稍等”,然后你转身去旁边的沙发上喝咖啡(挂起)。等钱收好了,服务员喊你一声,你走回去继续付钱(恢复)。
第二回:协程——不仅仅是“暂停”按钮
很多同学对协程的理解停留在“挂起和恢复”上。其实,协程的核心在于控制流的转移,而不是单纯的“暂停”。
在 C++20 中,协程是一个状态机。编译器(或者运行时)会悄悄地帮你把你的函数变成一个包含 resume() 和 suspend() 方法的类。当你写 co_await 时,你并没有把控制权交给操作系统,而是交给了协程调度器。
让我们看一个最简单的协程例子,感受一下它的优雅:
#include <iostream>
#include <coroutine>
// 定义一个简单的任务类型
struct Task {
struct promise_type {
Task get_return_object() {
return Task{ std::coroutine_handle<promise_type>::from_promise(*this) };
}
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() { std::terminate(); }
};
std::coroutine_handle<promise_type> handle;
Task(std::coroutine_handle<promise_type> h) : handle(h) {}
~Task() { if (handle) handle.destroy(); }
};
// 一个简单的异步任务
Task myAsyncTask() {
std::cout << "Step 1: Starting..." << std::endl;
co_await std::suspend_always{}; // 暂停一下,假装在干别的事
std::cout << "Step 2: Resumed!" << std::endl;
co_return;
}
int main() {
Task t = myAsyncTask();
// 启动任务
std::cout << "Task started, but control returned immediately." << std::endl;
t.handle.resume(); // 手动恢复(生产环境中通常由调度器接管)
t.handle.resume(); // 再次恢复,结束
return 0;
}
在这个例子里,co_await 就是一个暂停点。我们可以在一个函数里写多个 co_await,就像写普通的 if 和 for 一样,但逻辑却是完全异步的。这比写回调函数(Callback hell)要干净得多,比写 Future/Promise 要直观得多。
第三回:推理流水线的“三段论”
现在,让我们把目光聚焦到 LLM 推理上。一个完整的推理流水线,通常分为三个部分:
- 预处理: 把你的人类语言(文本)变成机器能听懂的语言(Token IDs)。这里涉及复杂的正则、分词器加载、KV Cache 的填充。
- 模型计算: 这是重头戏。把 Token IDs 扔进 Transformer 架构里,进行矩阵乘法。这是 GPU 密集型操作。
- 后处理: 把模型吐出的 Token IDs 转回文本,处理 Stop Token(停止符),格式化输出。
我们的目标是用协程把这三个阶段串联起来,让它们像流水线一样并行工作。
架构图解(脑补一下)
- 输入队列: 放着待处理的文本。
- 阶段1(Preprocess): 从队列拿文本 -> 处理 -> 放入“推理队列”。
- 阶段2(Inference): 从“推理队列”拿数据 -> 调用 CUDA Kernel -> 放入“后处理队列”。
- 阶段3(Postprocess): 从“后处理队列”拿数据 -> 转文本 -> 返回给用户。
关键点: 阶段1不应该等阶段2。阶段1处理完一个请求,立刻去拿下一个,不管阶段2有没有准备好。
第四回:代码实战——编写第一个流水线
为了演示,我们不会真的去写 CUDA 代码,而是用 std::this_thread::sleep_for 来模拟耗时操作。
我们需要一个调度器。在工业界,我们通常使用 boost::asio,它的 use_awaitable 特性是协程的强力助手。不过为了保持代码的纯粹性和可移植性(不需要安装 Boost),我们这里手动实现一个简单的调度器。
1. 定义流水线阶段
首先,我们定义一个 PipelineStage 模板,它负责消费一个输入,处理它,然后产出输出。
#include <iostream>
#include <vector>
#include <queue>
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
// 模拟输入输出类型
using Token = int;
using InputData = std::string;
using OutputData = std::vector<Token>;
// 简单的队列,用于流水线传递数据
template <typename T>
class SafeQueue {
private:
std::queue<T> queue;
std::mutex mtx;
std::condition_variable cv;
bool done = false;
public:
void push(T item) {
std::lock_guard<std::mutex> lock(mtx);
queue.push(std::move(item));
cv.notify_one();
}
bool try_pop(T& item) {
std::lock_guard<std::mutex> lock(mtx);
if (queue.empty()) return false;
item = std::move(queue.front());
queue.pop();
return true;
}
void set_done() {
std::lock_guard<std::mutex> lock(mtx);
done = true;
cv.notify_all();
}
bool is_done() const { return done && queue.empty(); }
};
// 阶段1:预处理
class PreprocessStage {
public:
PreprocessStage(SafeQueue<InputData>& input, SafeQueue<OutputData>& output)
: inputQueue(input), outputQueue(output) {}
void operator()() {
std::cout << "[Preprocess] Thread started." << std::endl;
InputData data;
while (inputQueue.try_pop(data) || !inputQueue.is_done()) {
// 模拟耗时:Tokenize
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::cout << "[Preprocess] Processing: " << data << std::endl;
// 这里模拟生成一些 token
OutputData tokens = {1, 2, 3};
outputQueue.push(std::move(tokens));
}
outputQueue.set_done();
std::cout << "[Preprocess] Thread finished." << std::endl;
}
private:
SafeQueue<InputData>& inputQueue;
SafeQueue<OutputData>& outputQueue;
};
// 阶段2:模型推理
class InferenceStage {
public:
InferenceStage(SafeQueue<OutputData>& input, SafeQueue<OutputData>& output)
: inputQueue(input), outputQueue(output) {}
void operator()() {
std::cout << "[Inference] Thread started." << std::endl;
OutputData tokens;
while (inputQueue.try_pop(tokens) || !inputQueue.is_done()) {
// 模拟耗时:GPU 推理
std::this_thread::sleep_for(std::chrono::milliseconds(50));
std::cout << "[Inference] Running model on tokens..." << std::endl;
// 这里模拟推理结果
OutputData next_tokens = {4, 5, 6};
outputQueue.push(std::move(next_tokens));
}
outputQueue.set_done();
std::cout << "[Inference] Thread finished." << std::endl;
}
private:
SafeQueue<OutputData>& inputQueue;
SafeQueue<OutputData>& outputQueue;
};
// 阶段3:后处理
class PostprocessStage {
public:
PostprocessStage(SafeQueue<OutputData>& input)
: inputQueue(input) {}
void operator()() {
std::cout << "[Postprocess] Thread started." << std::endl;
OutputData tokens;
while (inputQueue.try_pop(tokens) || !inputQueue.is_done()) {
// 模拟耗时:解码
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::cout << "[Postprocess] Decoding tokens to text..." << std::endl;
}
std::cout << "[Postprocess] Thread finished." << std::endl;
}
private:
SafeQueue<OutputData>& inputQueue;
};
2. 编排器
现在,我们需要一个“导演”来启动这三个线程,并控制它们的启动顺序。
void runPipeline() {
// 1. 定义队列
SafeQueue<InputData> inputQueue;
SafeQueue<OutputData> middleQueue; // Pre -> Inf
SafeQueue<OutputData> outputQueue; // Inf -> Post
// 2. 启动线程
std::thread preprocessThread(PreprocessStage(inputQueue, middleQueue));
std::thread inferenceThread(InferenceStage(middleQueue, outputQueue));
std::thread postprocessThread(PostprocessStage(outputQueue));
// 3. 模拟源源不断的输入
for (int i = 0; i < 5; ++i) {
inputQueue.push("User Query " + std::to_string(i));
std::this_thread::sleep_for(std::chrono::milliseconds(5)); // 快速塞入
}
// 4. 发送结束信号
inputQueue.set_done();
// 5. 等待所有线程结束
preprocessThread.join();
inferenceThread.join();
postprocessThread.join();
}
int main() {
runPipeline();
return 0;
}
运行结果分析:
你会发现,输出日志是混杂的。
[Preprocess] Processing: User Query 0
[Inference] Running model...
[Preprocess] Processing: User Query 1
[Inference] Running model...
这就是流水线并行的魅力。Preprocess 正在处理 Query 1 的时候,Inference 正在处理 Query 0。它们互不干扰,各司其职。
第五回:C++20 协程进阶——真正的异步编排
上面的例子用的是 std::thread,这其实还没完全发挥协程的威力。协程最大的好处是无栈(或者轻量级栈),我们可以把成千上万个协程放在一个线程里调度,而不是创建成千上万个操作系统线程。
让我们用 C++20 协程重写这个逻辑。这将是本次讲座的“高潮”部分。
核心挑战:如何用协程实现流水线?
在协程世界里,我们不能用 while 循环阻塞线程,因为线程必须被释放去做别的事。我们需要一个调度器来决定什么时候恢复协程。
为了简化,我们使用 boost::asio(或者类似的调度器)。这里我们手写一个极简的调度器来展示原理。
1. 定义协程任务
#include <coroutine>
#include <atomic>
// 定义 Task 类型
template <typename T>
struct Task {
struct promise_type {
T value;
std::exception_ptr exception;
Task get_return_object() {
return Task(std::coroutine_handle<promise_type>::from_promise(*this));
}
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_value(T v) { value = std::move(v); }
void unhandled_exception() { exception = std::current_exception(); }
T get_value() {
if (exception) std::rethrow_exception(exception);
return std::move(value);
}
};
std::coroutine_handle<promise_type> handle;
Task(std::coroutine_handle<promise_type> h) : handle(h) {}
// 转换为 void Task
operator std::coroutine_handle<>() const { return handle; }
T get() { return handle.promise().get_value(); }
~Task() { if (handle) handle.destroy(); }
};
2. 定义异步操作(模拟 I/O)
我们需要模拟 co_await 的行为。在实际的 boost::asio 中,co_await socket.read_some 会自动挂起。在这里,我们写一个简单的 AsyncOperation 类。
struct AsyncOperation {
bool done = false;
int result = 0;
// 模拟异步操作完成
void complete(int r) { result = r; done = true; }
};
// 等待器
struct AsyncWaiter {
AsyncOperation& op;
bool await_ready() const { return op.done; }
void await_suspend(std::coroutine_handle<> handle) {
// 在真实场景中,这里会注册回调
// 这里我们直接模拟完成,为了演示流程
std::cout << "[Scheduler] Suspending task, registering callback..." << std::endl;
// 假设 100ms 后完成
std::thread([this, h = handle]() {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
this->op.complete(42);
std::cout << "[Scheduler] Resuming task!" << std::endl;
h.resume();
}).detach();
}
int await_resume() { return op.result; }
};
3. 编写流水线阶段的协程
现在,我们用协程语法来写这三个阶段。
// 阶段1:Preprocess
Task<int> preprocess_stage() {
std::cout << "[Preprocess] Started." << std::endl;
// 模拟异步 Tokenize
AsyncOperation op;
AsyncWaiter waiter{op};
int token_id = co_await waiter; // 挂起,等待结果
std::cout << "[Preprocess] Got token: " << token_id << std::endl;
// 模拟异步发送给下一阶段
// 在真实代码中,这里 co_await send_to_inference(token_id);
// 这里我们直接返回
co_return token_id + 100; // 假设推理阶段需要加 100
}
// 阶段2:Inference
Task<int> inference_stage(int input) {
std::cout << "[Inference] Started with input: " << input << std::endl;
// 模拟异步推理
AsyncOperation op;
AsyncWaiter waiter{op};
int next_token = co_await waiter;
std::cout << "[Inference] Got next token: " << next_token << std::endl;
co_return next_token * 2;
}
// 阶段3:Postprocess
void postprocess_stage(int input) {
std::cout << "[Postprocess] Processing final result: " << input << std::endl;
}
// 主调度逻辑
void run_coroutine_pipeline() {
std::cout << "=== Starting Coroutine Pipeline ===" << std::endl;
// 启动阶段1
auto task1 = preprocess_stage();
// 启动阶段2(这里为了演示简单,我们手动控制流程,实际应该是 event loop)
// 在真实架构中,阶段1完成后,阶段2自动被调度器唤醒
// 这里我们模拟一种“回调式”的协程调用
// ... 实际上,为了实现真正的流水线,我们需要一个协程调度器
// 这里我们简化演示:直接在主线程中手动调度(虽然失去了并发意义,但展示了逻辑流)
// 假设 preprocess_stage 完成了
int pre_result = task1.get();
// 启动阶段2
auto task2 = inference_stage(pre_result);
int inf_result = task2.get();
// 启动阶段3
postprocess_stage(inf_result);
}
4. 真正的异步编排(Event Loop 模式)
上面的代码虽然用了协程,但在主线程里 get() 了,这就变成了同步。真正的异步编排需要 Event Loop(事件循环)。
让我们用 boost::asio 的 awaitable 来写一个更接近生产环境的例子。为了节省篇幅,我们假设你已经引入了 boost/asio.hpp。
#include <boost/asio.hpp>
#include <iostream>
#include <vector>
#include <string>
namespace asio = boost::asio;
using asio::awaitable;
using asio::detached;
using asio::steady_timer;
// 模拟 Tokenizer
awaitable<int> tokenize(std::string text) {
// 模拟异步工作
co_await asio::post(co_await asio::this_coro::executor,
[text]() {
std::cout << "Tokenizing: " << text << std::endl;
});
co_return 42; // 返回一个 token ID
}
// 模拟 Model Inference
awaitable<int> run_inference(int token_id) {
co_await asio::post(co_await asio::this_coro::executor,
[token_id]() {
std::cout << "Running inference for token: " << token_id << std::endl;
});
co_return token_id * 2;
}
// 模拟 Decoder
awaitable<void> decode(int token_id) {
co_await asio::post(co_await asio::this_coro::executor,
[token_id]() {
std::cout << "Decoding result: " << token_id << std::endl;
});
}
// 流水线编排器
awaitable<void> pipeline_worker(std::string input) {
auto executor = co_await asio::this_coro::executor;
// 1. 预处理
int token_id = co_await tokenize(input);
// 2. 模型计算
int next_token = co_await run_inference(token_id);
// 3. 后处理
co_await decode(next_token);
}
// 启动多个并发请求
awaitable<void> main_coro() {
auto executor = co_await asio::this_coro::executor;
asio::thread_pool pool(4); // 模拟 4 个工作线程
for (int i = 0; i < 5; ++i) {
std::string msg = "Request " + std::to_string(i);
asio::co_spawn(pool, pipeline_worker(msg), detached);
}
}
int main() {
boost::asio::io_context io;
asio::co_spawn(io, main_coro(), detached);
io.run();
return 0;
}
这段代码的魔力在哪里?
co_spawn:它将一个协程放入线程池的调度队列中。co_await:当遇到co_await tokenize时,当前协程挂起,控制权回到io_context。- 线程池:
io_context会在后台线程上执行tokenize。当tokenize完成时,它会自动将控制权交还给pipeline_worker。 - 并发:
main_coro启动了 5 个pipeline_worker。它们在不同的线程上运行,但它们共享同一个io_context的执行队列。这比直接用std::thread管理要高效得多,因为避免了频繁的上下文切换。
第六回:深入内存与 KV Cache 管理
讲了这么多架构,我们得谈谈钱(内存)的问题。在 LLM 推理中,KV Cache 是吞金兽。
1. 内存连续性
协程的好处在于它通常运行在栈上(如果是无栈协程或小栈协程)。这保证了缓存局部性。
// 这是一个非常轻量级的协程
Task<void> memory_intensive_task() {
// 这里分配的 vector 会在栈上(如果编译器优化得当)或者作为协程帧的一部分
// 相比于在堆上频繁 malloc/free,这更友好
std::vector<float> buffer(1024 * 1024); // 4MB buffer
// 处理数据...
co_return;
}
2. KV Cache 的流水线
在推理流水线中,KV Cache 的更新是关键。
- Preprocess:加载 Prompt 的 KV Cache 到 GPU 显存。
- Inference:计算新 Token 的 KV Cache,并 Append 到显存中。
- Postprocess:如果遇到 Stop Token,需要把 KV Cache 的这部分清空(或者标记为 invalid)。
使用协程,我们可以让这些操作异步进行。比如,在 Preprocess 等待 GPU 内存分配的时候,我们可以让 CPU 去做其他事情,或者让其他推理请求去抢占 GPU。
3. 避免死锁
协程里最怕的就是“循环等待”。
- 场景:Stage 1 等待 Stage 2 的结果。
- 陷阱:如果 Stage 2 没有被正确调度,Stage 1 就会永远挂起。
解决方案:使用 asio::steady_timer 来实现超时机制。
awaitable<void> safe_task() {
auto executor = co_await asio::this_coro::executor;
steady_timer timer(executor);
timer.expires_after(std::chrono::milliseconds(100));
bool completed = false;
// 启动真正的任务
asio::co_spawn(executor,
[]() -> awaitable<void> {
co_await std::chrono::milliseconds(500); // 假装任务很慢
co_return;
},
[&](std::exception_ptr e) {
completed = true;
});
// 等待任务完成或者超时
timer.async_wait([&](const boost::system::error_code&) {
if (!completed) {
std::cout << "Task timeout! Killing it." << std::endl;
}
});
co_await timer.async_wait(asio::use_awaitable);
}
第七回:从 C++ 到 Python 的桥梁
各位同学,讲到这里,你们可能会问:“这代码太硬核了,我想写个 Python 接口怎么办?”
别担心,C++ 的强类型和协程特性,配合 PyBind11 或 Cython,可以轻松实现 Python 侧的“伪异步”。
想象一下,你的 C++ 流水线是一个黑盒。
// C++ Side
std::vector<std::string> run_pipeline_sync(std::string input) {
// 内部使用上面讲的协程逻辑,但是同步返回
// 实际上,我们可以用 co_await + io_context.run_one() 来模拟同步调用
}
// Python Side
import time
def call_cpp_pipeline(text):
start = time.time()
result = cpp_module.run_pipeline_sync(text)
duration = time.time() - start
print(f"Result: {result}, Time: {duration}")
如果你想在 Python 里也用 async/await,你需要写一个 Python 适配层,它内部持有 C++ 的协程句柄。
import asyncio
# Python async wrapper
async def run_cpp_awaitable(text):
loop = asyncio.get_event_loop()
# 这里的 cpp_awaitable 是一个 C++ 编写的,返回 coroutine_handle 的函数
# 实际实现需要复杂的 C++/Python 绑定,这里仅作概念演示
handle = cpp_module.create_coroutine(text)
while not cpp_module.is_finished(handle):
await asyncio.sleep(0.1) # 让出控制权给其他 Python 任务
cpp_module.resume(handle)
return cpp_module.get_result(handle)
第八回:性能调优与陷阱
最后,我们来聊聊“踩坑”。
1. 协程栈大小
如果你使用的是基于栈的协程(某些编译器或库),默认栈可能只有 1KB 或 4KB。如果你的预处理函数里定义了一个巨大的局部数组,协程就会栈溢出(Stack Overflow)。
对策:使用 boost::asio::co_spawn 默认的堆分配栈,或者显式设置栈大小。
2. co_await 的性能
co_await 的开销其实比函数调用略大(因为有状态机跳转)。如果你在一个循环里频繁进行微小的 co_await,可能会成为瓶颈。
对策:不要过度优化。对于推理这种 I/O 密集型任务,co_await 的开销相对于 GPU 等待时间来说,几乎可以忽略不计。
3. 内存泄漏
协程如果没有正确销毁,会泄漏内存。
对策:确保 Task 对象在离开作用域时被销毁,或者使用 std::unique_ptr 管理。
总结(不是总结,是展望)
好了,各位同学。我们今天从“面条代码”聊到了“异步编排”。
通过 C++ 协程,我们构建了一个预处理 -> 推理 -> 后处理的高效流水线。
- 预处理不再阻塞 CPU,可以并行处理多个请求的 Tokenize。
- 推理阶段可以充分利用 GPU 的计算周期,减少等待时间。
- 后处理可以快速响应,提升用户体验。
这种架构不仅适用于 LLM 推理,也适用于视频流处理、数据库查询优化、网络服务器开发。C++ 协程就像是给了你一把瑞士军刀,让你在处理复杂并发逻辑时,既拥有了 C++ 的性能,又拥有了高级语言的优雅。
记住,好的代码不是写出来的,而是“编排”出来的。别再写那个 std::thread 堆砌的噩梦了,拥抱协程吧!
下课!