C++ 与 推理流水线:基于 C++ 协程实现预处理、模型计算与后处理的高并发异步编排架构

C++ 协程与推理流水线:告别 std::thread,拥抱异步之美

各位同学,大家好!

欢迎来到今天的“C++ 极客大会:如何用协程把 LLM 推理跑出光速”专场。

坐在我旁边的这位老兄(假设他是一块显卡),他脾气很暴躁。如果你让他做一件事,他就要花 10 毫秒。如果你让他做十件事,他就得花 100 毫秒。如果你让他同时做十件事,他……他只会生气,因为他的算力是有限的。

而在我们写代码的时候,往往需要同时做三件事:把你的文本变成数字(预处理),让显卡算出下一个字(推理),再把数字变回文本(后处理)。如果用老派的写法,就像是用一只手同时剥十个橘子,汁水四溅,最后还得自己擦桌子。

今天,我们要讲的就是怎么用 C++ 协程,像指挥交响乐团一样指挥这三个阶段。我们要构建一个高并发、异步编排的推理流水线。

准备好了吗?让我们开始这场“异步之旅”。


第一回:同步代码的“便秘”时刻

在 C++ 里,如果你不想用协程,最常用的手段就是 std::thread。这就像是让厨师A炒菜,厨师A炒完一道菜,必须喊一声“好了”,厨师B才能开始炒下一道。

问题来了:

  1. 资源浪费: 厨师A(CPU)在等待厨师C切菜(I/O)的时候,手里拿着铲子没事干。厨师C在等待厨师B洗碗(计算)的时候,切菜刀在发呆。
  2. 上下文切换开销: 在操作系统层面,从厨师A切换到厨师B,需要记笔记、擦黑板,这都要花时间。
  3. 代码地狱: 你得用 std::mutex 保护锅,用 std::condition_variable 通知厨师B。一旦逻辑稍微复杂一点,代码就像意大利面一样缠在一起。

举个栗子。这是典型的“阻塞式”写法,我们假设每个函数都是耗时的:

// 这里的代码读起来很顺畅,但运行起来很慢,因为你在"等待"
void processRequest(std::string input) {
    // 阶段1:预处理
    std::vector<int> tokens = tokenize(input); // 假设这里耗时 5ms,CPU在空转
    std::cout << "Tokenized!" << std::endl;

    // 阶段2:推理
    std::vector<int> output_ids = modelInference(tokens); // 假设这里耗时 50ms,CPU在空转
    std::cout << "Inference done!" << std::endl;

    // 阶段3:后处理
    std::string response = decode(output_ids); // 假设这里耗时 5ms
    std::cout << "Response: " << response << std::endl;
}

如果我们要并发处理 100 个请求,用这种同步写法,你需要循环调用 100 次 processRequest。这就像是一个人排队买 100 张电影票,每张票都要在售票窗口停 5 分钟。结果就是,第 100 个人拿到票时,电影都散场了。

C++20 协程 是怎么做的呢?它不是让你去排队,而是给你一张“VIP卡”。你走到窗口(预处理),递进钱(输入),窗口说“好,稍等”,然后你转身去旁边的沙发上喝咖啡(挂起)。等钱收好了,服务员喊你一声,你走回去继续付钱(恢复)。


第二回:协程——不仅仅是“暂停”按钮

很多同学对协程的理解停留在“挂起和恢复”上。其实,协程的核心在于控制流的转移,而不是单纯的“暂停”。

在 C++20 中,协程是一个状态机。编译器(或者运行时)会悄悄地帮你把你的函数变成一个包含 resume()suspend() 方法的类。当你写 co_await 时,你并没有把控制权交给操作系统,而是交给了协程调度器

让我们看一个最简单的协程例子,感受一下它的优雅:

#include <iostream>
#include <coroutine>

// 定义一个简单的任务类型
struct Task {
    struct promise_type {
        Task get_return_object() {
            return Task{ std::coroutine_handle<promise_type>::from_promise(*this) };
        }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };

    std::coroutine_handle<promise_type> handle;

    Task(std::coroutine_handle<promise_type> h) : handle(h) {}
    ~Task() { if (handle) handle.destroy(); }
};

// 一个简单的异步任务
Task myAsyncTask() {
    std::cout << "Step 1: Starting..." << std::endl;
    co_await std::suspend_always{}; // 暂停一下,假装在干别的事
    std::cout << "Step 2: Resumed!" << std::endl;
    co_return;
}

int main() {
    Task t = myAsyncTask();
    // 启动任务
    std::cout << "Task started, but control returned immediately." << std::endl;
    t.handle.resume(); // 手动恢复(生产环境中通常由调度器接管)
    t.handle.resume(); // 再次恢复,结束
    return 0;
}

在这个例子里,co_await 就是一个暂停点。我们可以在一个函数里写多个 co_await,就像写普通的 iffor 一样,但逻辑却是完全异步的。这比写回调函数(Callback hell)要干净得多,比写 Future/Promise 要直观得多。


第三回:推理流水线的“三段论”

现在,让我们把目光聚焦到 LLM 推理上。一个完整的推理流水线,通常分为三个部分:

  1. 预处理: 把你的人类语言(文本)变成机器能听懂的语言(Token IDs)。这里涉及复杂的正则、分词器加载、KV Cache 的填充。
  2. 模型计算: 这是重头戏。把 Token IDs 扔进 Transformer 架构里,进行矩阵乘法。这是 GPU 密集型操作。
  3. 后处理: 把模型吐出的 Token IDs 转回文本,处理 Stop Token(停止符),格式化输出。

我们的目标是用协程把这三个阶段串联起来,让它们像流水线一样并行工作。

架构图解(脑补一下)

  • 输入队列: 放着待处理的文本。
  • 阶段1(Preprocess): 从队列拿文本 -> 处理 -> 放入“推理队列”。
  • 阶段2(Inference): 从“推理队列”拿数据 -> 调用 CUDA Kernel -> 放入“后处理队列”。
  • 阶段3(Postprocess): 从“后处理队列”拿数据 -> 转文本 -> 返回给用户。

关键点: 阶段1不应该等阶段2。阶段1处理完一个请求,立刻去拿下一个,不管阶段2有没有准备好。


第四回:代码实战——编写第一个流水线

为了演示,我们不会真的去写 CUDA 代码,而是用 std::this_thread::sleep_for 来模拟耗时操作。

我们需要一个调度器。在工业界,我们通常使用 boost::asio,它的 use_awaitable 特性是协程的强力助手。不过为了保持代码的纯粹性和可移植性(不需要安装 Boost),我们这里手动实现一个简单的调度器。

1. 定义流水线阶段

首先,我们定义一个 PipelineStage 模板,它负责消费一个输入,处理它,然后产出输出。

#include <iostream>
#include <vector>
#include <queue>
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>

// 模拟输入输出类型
using Token = int;
using InputData = std::string;
using OutputData = std::vector<Token>;

// 简单的队列,用于流水线传递数据
template <typename T>
class SafeQueue {
private:
    std::queue<T> queue;
    std::mutex mtx;
    std::condition_variable cv;
    bool done = false;
public:
    void push(T item) {
        std::lock_guard<std::mutex> lock(mtx);
        queue.push(std::move(item));
        cv.notify_one();
    }

    bool try_pop(T& item) {
        std::lock_guard<std::mutex> lock(mtx);
        if (queue.empty()) return false;
        item = std::move(queue.front());
        queue.pop();
        return true;
    }

    void set_done() {
        std::lock_guard<std::mutex> lock(mtx);
        done = true;
        cv.notify_all();
    }

    bool is_done() const { return done && queue.empty(); }
};

// 阶段1:预处理
class PreprocessStage {
public:
    PreprocessStage(SafeQueue<InputData>& input, SafeQueue<OutputData>& output)
        : inputQueue(input), outputQueue(output) {}

    void operator()() {
        std::cout << "[Preprocess] Thread started." << std::endl;
        InputData data;
        while (inputQueue.try_pop(data) || !inputQueue.is_done()) {
            // 模拟耗时:Tokenize
            std::this_thread::sleep_for(std::chrono::milliseconds(10)); 

            std::cout << "[Preprocess] Processing: " << data << std::endl;

            // 这里模拟生成一些 token
            OutputData tokens = {1, 2, 3}; 
            outputQueue.push(std::move(tokens));
        }
        outputQueue.set_done();
        std::cout << "[Preprocess] Thread finished." << std::endl;
    }

private:
    SafeQueue<InputData>& inputQueue;
    SafeQueue<OutputData>& outputQueue;
};

// 阶段2:模型推理
class InferenceStage {
public:
    InferenceStage(SafeQueue<OutputData>& input, SafeQueue<OutputData>& output)
        : inputQueue(input), outputQueue(output) {}

    void operator()() {
        std::cout << "[Inference] Thread started." << std::endl;
        OutputData tokens;
        while (inputQueue.try_pop(tokens) || !inputQueue.is_done()) {
            // 模拟耗时:GPU 推理
            std::this_thread::sleep_for(std::chrono::milliseconds(50)); 

            std::cout << "[Inference] Running model on tokens..." << std::endl;

            // 这里模拟推理结果
            OutputData next_tokens = {4, 5, 6};
            outputQueue.push(std::move(next_tokens));
        }
        outputQueue.set_done();
        std::cout << "[Inference] Thread finished." << std::endl;
    }

private:
    SafeQueue<OutputData>& inputQueue;
    SafeQueue<OutputData>& outputQueue;
};

// 阶段3:后处理
class PostprocessStage {
public:
    PostprocessStage(SafeQueue<OutputData>& input)
        : inputQueue(input) {}

    void operator()() {
        std::cout << "[Postprocess] Thread started." << std::endl;
        OutputData tokens;
        while (inputQueue.try_pop(tokens) || !inputQueue.is_done()) {
            // 模拟耗时:解码
            std::this_thread::sleep_for(std::chrono::milliseconds(10)); 

            std::cout << "[Postprocess] Decoding tokens to text..." << std::endl;
        }
        std::cout << "[Postprocess] Thread finished." << std::endl;
    }

private:
    SafeQueue<OutputData>& inputQueue;
};

2. 编排器

现在,我们需要一个“导演”来启动这三个线程,并控制它们的启动顺序。

void runPipeline() {
    // 1. 定义队列
    SafeQueue<InputData> inputQueue;
    SafeQueue<OutputData> middleQueue; // Pre -> Inf
    SafeQueue<OutputData> outputQueue; // Inf -> Post

    // 2. 启动线程
    std::thread preprocessThread(PreprocessStage(inputQueue, middleQueue));
    std::thread inferenceThread(InferenceStage(middleQueue, outputQueue));
    std::thread postprocessThread(PostprocessStage(outputQueue));

    // 3. 模拟源源不断的输入
    for (int i = 0; i < 5; ++i) {
        inputQueue.push("User Query " + std::to_string(i));
        std::this_thread::sleep_for(std::chrono::milliseconds(5)); // 快速塞入
    }

    // 4. 发送结束信号
    inputQueue.set_done();

    // 5. 等待所有线程结束
    preprocessThread.join();
    inferenceThread.join();
    postprocessThread.join();
}

int main() {
    runPipeline();
    return 0;
}

运行结果分析:

你会发现,输出日志是混杂的。
[Preprocess] Processing: User Query 0
[Inference] Running model...
[Preprocess] Processing: User Query 1
[Inference] Running model...

这就是流水线并行的魅力。Preprocess 正在处理 Query 1 的时候,Inference 正在处理 Query 0。它们互不干扰,各司其职。


第五回:C++20 协程进阶——真正的异步编排

上面的例子用的是 std::thread,这其实还没完全发挥协程的威力。协程最大的好处是无栈(或者轻量级栈),我们可以把成千上万个协程放在一个线程里调度,而不是创建成千上万个操作系统线程。

让我们用 C++20 协程重写这个逻辑。这将是本次讲座的“高潮”部分。

核心挑战:如何用协程实现流水线?

在协程世界里,我们不能用 while 循环阻塞线程,因为线程必须被释放去做别的事。我们需要一个调度器来决定什么时候恢复协程。

为了简化,我们使用 boost::asio(或者类似的调度器)。这里我们手写一个极简的调度器来展示原理。

1. 定义协程任务

#include <coroutine>
#include <atomic>

// 定义 Task 类型
template <typename T>
struct Task {
    struct promise_type {
        T value;
        std::exception_ptr exception;

        Task get_return_object() {
            return Task(std::coroutine_handle<promise_type>::from_promise(*this));
        }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }

        void return_value(T v) { value = std::move(v); }
        void unhandled_exception() { exception = std::current_exception(); }

        T get_value() {
            if (exception) std::rethrow_exception(exception);
            return std::move(value);
        }
    };

    std::coroutine_handle<promise_type> handle;
    Task(std::coroutine_handle<promise_type> h) : handle(h) {}

    // 转换为 void Task
    operator std::coroutine_handle<>() const { return handle; }

    T get() { return handle.promise().get_value(); }
    ~Task() { if (handle) handle.destroy(); }
};

2. 定义异步操作(模拟 I/O)

我们需要模拟 co_await 的行为。在实际的 boost::asio 中,co_await socket.read_some 会自动挂起。在这里,我们写一个简单的 AsyncOperation 类。

struct AsyncOperation {
    bool done = false;
    int result = 0;

    // 模拟异步操作完成
    void complete(int r) { result = r; done = true; }
};

// 等待器
struct AsyncWaiter {
    AsyncOperation& op;

    bool await_ready() const { return op.done; }
    void await_suspend(std::coroutine_handle<> handle) {
        // 在真实场景中,这里会注册回调
        // 这里我们直接模拟完成,为了演示流程
        std::cout << "[Scheduler] Suspending task, registering callback..." << std::endl;
        // 假设 100ms 后完成
        std::thread([this, h = handle]() {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            this->op.complete(42);
            std::cout << "[Scheduler] Resuming task!" << std::endl;
            h.resume();
        }).detach();
    }
    int await_resume() { return op.result; }
};

3. 编写流水线阶段的协程

现在,我们用协程语法来写这三个阶段。

// 阶段1:Preprocess
Task<int> preprocess_stage() {
    std::cout << "[Preprocess] Started." << std::endl;

    // 模拟异步 Tokenize
    AsyncOperation op;
    AsyncWaiter waiter{op};
    int token_id = co_await waiter; // 挂起,等待结果

    std::cout << "[Preprocess] Got token: " << token_id << std::endl;

    // 模拟异步发送给下一阶段
    // 在真实代码中,这里 co_await send_to_inference(token_id);
    // 这里我们直接返回
    co_return token_id + 100; // 假设推理阶段需要加 100
}

// 阶段2:Inference
Task<int> inference_stage(int input) {
    std::cout << "[Inference] Started with input: " << input << std::endl;

    // 模拟异步推理
    AsyncOperation op;
    AsyncWaiter waiter{op};
    int next_token = co_await waiter;

    std::cout << "[Inference] Got next token: " << next_token << std::endl;

    co_return next_token * 2;
}

// 阶段3:Postprocess
void postprocess_stage(int input) {
    std::cout << "[Postprocess] Processing final result: " << input << std::endl;
}

// 主调度逻辑
void run_coroutine_pipeline() {
    std::cout << "=== Starting Coroutine Pipeline ===" << std::endl;

    // 启动阶段1
    auto task1 = preprocess_stage();

    // 启动阶段2(这里为了演示简单,我们手动控制流程,实际应该是 event loop)
    // 在真实架构中,阶段1完成后,阶段2自动被调度器唤醒
    // 这里我们模拟一种“回调式”的协程调用

    // ... 实际上,为了实现真正的流水线,我们需要一个协程调度器
    // 这里我们简化演示:直接在主线程中手动调度(虽然失去了并发意义,但展示了逻辑流)

    // 假设 preprocess_stage 完成了
    int pre_result = task1.get();

    // 启动阶段2
    auto task2 = inference_stage(pre_result);
    int inf_result = task2.get();

    // 启动阶段3
    postprocess_stage(inf_result);
}

4. 真正的异步编排(Event Loop 模式)

上面的代码虽然用了协程,但在主线程里 get() 了,这就变成了同步。真正的异步编排需要 Event Loop(事件循环)。

让我们用 boost::asioawaitable 来写一个更接近生产环境的例子。为了节省篇幅,我们假设你已经引入了 boost/asio.hpp

#include <boost/asio.hpp>
#include <iostream>
#include <vector>
#include <string>

namespace asio = boost::asio;
using asio::awaitable;
using asio::detached;
using asio::steady_timer;

// 模拟 Tokenizer
awaitable<int> tokenize(std::string text) {
    // 模拟异步工作
    co_await asio::post(co_await asio::this_coro::executor, 
        [text]() {
            std::cout << "Tokenizing: " << text << std::endl;
        });
    co_return 42; // 返回一个 token ID
}

// 模拟 Model Inference
awaitable<int> run_inference(int token_id) {
    co_await asio::post(co_await asio::this_coro::executor,
        [token_id]() {
            std::cout << "Running inference for token: " << token_id << std::endl;
        });
    co_return token_id * 2;
}

// 模拟 Decoder
awaitable<void> decode(int token_id) {
    co_await asio::post(co_await asio::this_coro::executor,
        [token_id]() {
            std::cout << "Decoding result: " << token_id << std::endl;
        });
}

// 流水线编排器
awaitable<void> pipeline_worker(std::string input) {
    auto executor = co_await asio::this_coro::executor;

    // 1. 预处理
    int token_id = co_await tokenize(input);

    // 2. 模型计算
    int next_token = co_await run_inference(token_id);

    // 3. 后处理
    co_await decode(next_token);
}

// 启动多个并发请求
awaitable<void> main_coro() {
    auto executor = co_await asio::this_coro::executor;
    asio::thread_pool pool(4); // 模拟 4 个工作线程

    for (int i = 0; i < 5; ++i) {
        std::string msg = "Request " + std::to_string(i);
        asio::co_spawn(pool, pipeline_worker(msg), detached);
    }
}

int main() {
    boost::asio::io_context io;
    asio::co_spawn(io, main_coro(), detached);
    io.run();
    return 0;
}

这段代码的魔力在哪里?

  1. co_spawn:它将一个协程放入线程池的调度队列中。
  2. co_await:当遇到 co_await tokenize 时,当前协程挂起,控制权回到 io_context
  3. 线程池io_context 会在后台线程上执行 tokenize。当 tokenize 完成时,它会自动将控制权交还给 pipeline_worker
  4. 并发main_coro 启动了 5 个 pipeline_worker。它们在不同的线程上运行,但它们共享同一个 io_context 的执行队列。这比直接用 std::thread 管理要高效得多,因为避免了频繁的上下文切换。

第六回:深入内存与 KV Cache 管理

讲了这么多架构,我们得谈谈钱(内存)的问题。在 LLM 推理中,KV Cache 是吞金兽。

1. 内存连续性

协程的好处在于它通常运行在栈上(如果是无栈协程或小栈协程)。这保证了缓存局部性。

// 这是一个非常轻量级的协程
Task<void> memory_intensive_task() {
    // 这里分配的 vector 会在栈上(如果编译器优化得当)或者作为协程帧的一部分
    // 相比于在堆上频繁 malloc/free,这更友好
    std::vector<float> buffer(1024 * 1024); // 4MB buffer

    // 处理数据...
    co_return;
}

2. KV Cache 的流水线

在推理流水线中,KV Cache 的更新是关键。

  • Preprocess:加载 Prompt 的 KV Cache 到 GPU 显存。
  • Inference:计算新 Token 的 KV Cache,并 Append 到显存中。
  • Postprocess:如果遇到 Stop Token,需要把 KV Cache 的这部分清空(或者标记为 invalid)。

使用协程,我们可以让这些操作异步进行。比如,在 Preprocess 等待 GPU 内存分配的时候,我们可以让 CPU 去做其他事情,或者让其他推理请求去抢占 GPU。

3. 避免死锁

协程里最怕的就是“循环等待”。

  • 场景:Stage 1 等待 Stage 2 的结果。
  • 陷阱:如果 Stage 2 没有被正确调度,Stage 1 就会永远挂起。

解决方案:使用 asio::steady_timer 来实现超时机制。

awaitable<void> safe_task() {
    auto executor = co_await asio::this_coro::executor;
    steady_timer timer(executor);
    timer.expires_after(std::chrono::milliseconds(100));

    bool completed = false;

    // 启动真正的任务
    asio::co_spawn(executor, 
        []() -> awaitable<void> {
            co_await std::chrono::milliseconds(500); // 假装任务很慢
            co_return;
        }, 
        [&](std::exception_ptr e) {
            completed = true;
        });

    // 等待任务完成或者超时
    timer.async_wait([&](const boost::system::error_code&) {
        if (!completed) {
            std::cout << "Task timeout! Killing it." << std::endl;
        }
    });

    co_await timer.async_wait(asio::use_awaitable);
}

第七回:从 C++ 到 Python 的桥梁

各位同学,讲到这里,你们可能会问:“这代码太硬核了,我想写个 Python 接口怎么办?”

别担心,C++ 的强类型和协程特性,配合 PyBind11Cython,可以轻松实现 Python 侧的“伪异步”。

想象一下,你的 C++ 流水线是一个黑盒。

// C++ Side
std::vector<std::string> run_pipeline_sync(std::string input) {
    // 内部使用上面讲的协程逻辑,但是同步返回
    // 实际上,我们可以用 co_await + io_context.run_one() 来模拟同步调用
}

// Python Side
import time

def call_cpp_pipeline(text):
    start = time.time()
    result = cpp_module.run_pipeline_sync(text)
    duration = time.time() - start
    print(f"Result: {result}, Time: {duration}")

如果你想在 Python 里也用 async/await,你需要写一个 Python 适配层,它内部持有 C++ 的协程句柄。

import asyncio

# Python async wrapper
async def run_cpp_awaitable(text):
    loop = asyncio.get_event_loop()
    # 这里的 cpp_awaitable 是一个 C++ 编写的,返回 coroutine_handle 的函数
    # 实际实现需要复杂的 C++/Python 绑定,这里仅作概念演示
    handle = cpp_module.create_coroutine(text)

    while not cpp_module.is_finished(handle):
        await asyncio.sleep(0.1) # 让出控制权给其他 Python 任务
        cpp_module.resume(handle)

    return cpp_module.get_result(handle)

第八回:性能调优与陷阱

最后,我们来聊聊“踩坑”。

1. 协程栈大小

如果你使用的是基于栈的协程(某些编译器或库),默认栈可能只有 1KB 或 4KB。如果你的预处理函数里定义了一个巨大的局部数组,协程就会栈溢出(Stack Overflow)。

对策:使用 boost::asio::co_spawn 默认的堆分配栈,或者显式设置栈大小。

2. co_await 的性能

co_await 的开销其实比函数调用略大(因为有状态机跳转)。如果你在一个循环里频繁进行微小的 co_await,可能会成为瓶颈。

对策:不要过度优化。对于推理这种 I/O 密集型任务,co_await 的开销相对于 GPU 等待时间来说,几乎可以忽略不计。

3. 内存泄漏

协程如果没有正确销毁,会泄漏内存。

对策:确保 Task 对象在离开作用域时被销毁,或者使用 std::unique_ptr 管理。


总结(不是总结,是展望)

好了,各位同学。我们今天从“面条代码”聊到了“异步编排”。

通过 C++ 协程,我们构建了一个预处理 -> 推理 -> 后处理的高效流水线。

  • 预处理不再阻塞 CPU,可以并行处理多个请求的 Tokenize。
  • 推理阶段可以充分利用 GPU 的计算周期,减少等待时间。
  • 后处理可以快速响应,提升用户体验。

这种架构不仅适用于 LLM 推理,也适用于视频流处理、数据库查询优化、网络服务器开发。C++ 协程就像是给了你一把瑞士军刀,让你在处理复杂并发逻辑时,既拥有了 C++ 的性能,又拥有了高级语言的优雅。

记住,好的代码不是写出来的,而是“编排”出来的。别再写那个 std::thread 堆砌的噩梦了,拥抱协程吧!

下课!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注