实战:利用 C++ 实现一个支持 Backpressure(背压)的异步反应式流框架

各位技术同仁,大家好!

今天,我们将深入探讨一个在现代高并发、高吞吐量系统中至关重要的设计模式:反应式流(Reactive Streams),并亲自动手,利用 C++ 实现一个支持背压(Backpressure)的异步反应式流框架。作为一名编程专家,我将带领大家从零开始,理解其核心原理,并逐步构建一个功能完备但又足够简洁的框架。

1. 引言:为什么我们需要反应式流与背压?

在当今的软件世界中,我们经常面临处理大量数据流、I/O密集型操作、事件驱动系统以及微服务间通信的挑战。传统的回调机制或阻塞式编程模型在处理这些场景时,往往会导致以下问题:

  1. 资源管理困难: 生产者可能以远超消费者处理能力的速度生成数据,导致内存溢出、延迟增加或系统崩溃。
  2. 复杂性增加: 嵌套回调(Callback Hell)使得代码难以阅读、维护和调试。
  3. 并发管理: 手动管理线程、锁和同步原语,容易出错且效率低下。
  4. 统一抽象缺失: 不同数据源(数据库、网络、文件、UI事件)的处理方式各异,缺乏统一的编程模型。

反应式流(Reactive Streams)规范正是为了解决这些问题而生。它提供了一套标准化的接口和协议,用于处理异步数据流,并特别强调了背压(Backpressure)机制。

什么是背压?

背压是一种流量控制机制。它允许消费者(Subscriber)向上游生产者(Publisher)发出信号,告知其能够处理多少数据。如果消费者处理速度慢于生产者,它会请求较少的数据,从而有效地减慢生产者的速度,防止系统过载。这就像一个水龙头(生产者)和水桶(消费者):如果水桶满了,我们会关小水龙头,而不是让水溢出来。

为什么是 C++?

C++ 以其卓越的性能、对系统资源的精细控制以及丰富的并发编程工具而闻名。虽然 Java、Scala 等语言有成熟的 ReactiveX 实现(如 Reactor、RxJava),但 C++ 在需要极致性能、低延迟或与现有 C/C++ 基础设施集成的场景中,依然是不可替代的选择。通过 C++ 实现,我们可以更深刻地理解底层机制,并对其进行高度优化。

我们的目标是构建一个轻量级、高性能、易于扩展的 C++ 反应式流框架,它将包含以下核心组件:

  • Publisher (发布者): 产生数据流。
  • Subscriber (订阅者): 消费数据流。
  • Subscription (订阅): 连接发布者和订阅者,管理数据请求和取消。
  • Processor (处理器): 既是订阅者又是发布者,用于转换数据流(例如 map, filter 等操作符)。
  • Backpressure 机制: 通过 Subscription 实现请求-应答模式。
  • Asynchronicity (异步性): 利用线程池实现非阻塞操作。

2. 核心接口设计:遵循 Reactive Streams 规范

Reactive Streams 规范定义了四个核心接口,我们将它们映射到 C++ 的抽象基类。

2.1 Publisher 接口

Publisher 是数据流的源头。它只有一个方法:subscribe,用于接收 Subscriber 对象。

// Publisher.h
#pragma once
#include <memory> // For std::shared_ptr

template <typename T>
class Subscriber; // Forward declaration

class Subscription; // Forward declaration

template <typename T>
class Publisher {
public:
    virtual ~Publisher() = default;
    // 订阅者通过此方法订阅发布者
    // 发布者在收到订阅后,会调用 subscriber->onSubscribe(std::shared_ptr<Subscription>)
    virtual void subscribe(std::shared_ptr<Subscriber<T>> subscriber) = 0;
};

2.2 Subscriber 接口

Subscriber 是数据流的终点,或者说是数据流的消费者。它有四个方法来处理不同的事件:

  • onSubscribe: 当 Publisher 接受订阅时调用,并提供一个 Subscription 对象。Subscriber 通过 Subscription 来请求数据和取消订阅。
  • onNext: 当 Publisher 发送新数据时调用。
  • onError: 当数据流中发生错误时调用,并终止流。
  • onComplete: 当数据流正常结束时调用,并终止流。
// Subscriber.h
#pragma once
#include <memory> // For std::shared_ptr

class Subscription; // Forward declaration

template <typename T>
class Subscriber {
public:
    virtual ~Subscriber() = default;

    // 当发布者接受订阅时调用。订阅者应在此方法中保存Subscription对象
    // 并通过它请求数据或取消订阅。
    virtual void onSubscribe(std::shared_ptr<Subscription> subscription) = 0;

    // 发布者发出下一个数据项时调用。
    virtual void onNext(const T& item) = 0;

    // 发布者遇到错误并终止流时调用。
    virtual void onError(const std::exception_ptr& error) = 0;

    // 发布者完成数据流发送并终止流时调用。
    virtual void onComplete() = 0;
};

2.3 Subscription 接口

SubscriptionPublisherSubscriber 之间的桥梁。它是实现背压的关键。

  • request(n): Subscriber 调用此方法来请求 n 个数据项。
  • cancel(): Subscriber 调用此方法来取消订阅,表示不再需要数据。
// Subscription.h
#pragma once
#include <atomic> // For std::atomic_long

class Subscription {
public:
    virtual ~Subscription() = default;

    // 订阅者通过此方法请求N个数据项。N必须是正数。
    // 请求的数量是累积的。
    virtual void request(long n) = 0;

    // 订阅者通过此方法取消订阅。
    // 取消后,发布者不应再发送任何数据。
    virtual void cancel() = 0;
};

关于 request(long n) 的重要说明:

  • n 必须是正数。如果为零或负数,应视为无效请求,可能抛出异常或导致 onError
  • long 类型是为了支持非常大的请求数量,理论上可以请求 Long.MAX_VALUE
  • request(n) 是累积的。例如,先请求 10 个,再请求 5 个,总共是请求了 15 个。Publisher 应该跟踪这个累积值。

2.4 Processor 接口

Processor 既是 Subscriber 又是 Publisher。它接收上游数据,对其进行处理或转换,然后将结果发布到下游。

// Processor.h
#pragma once
#include "Publisher.h"
#include "Subscriber.h"

template <typename T, typename R>
class Processor : public Publisher<R>, public Subscriber<T> {
public:
    virtual ~Processor() = default;
};

至此,我们的核心接口已经定义完毕,它们构成了框架的基础骨架。

3. 实现基础设施:线程池与调度器

为了支持异步操作,我们需要一个线程池来执行任务。一个简单的固定大小线程池足以满足我们的需求。

3.1 ThreadPool 实现

// ThreadPool.h
#pragma once
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <stdexcept>

class ThreadPool {
public:
    ThreadPool(size_t threads);
    ~ThreadPool();

    // 提交任务到线程池
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;

private:
    // 需要追踪的线程
    std::vector<std::thread> workers;
    // 任务队列
    std::queue<std::function<void()>> tasks;

    // 同步
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

// 构造函数
inline ThreadPool::ThreadPool(size_t threads) : stop(false) {
    if (threads == 0) {
        throw std::invalid_argument("ThreadPool must have at least one thread.");
    }
    for (size_t i = 0; i < threads; ++i) {
        workers.emplace_back(
            [this] {
                for (;;) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();
                }
            }
        );
    }
}

// 添加新任务到队列
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // 不允许在停止的线程池中enqueue
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task](){ (*task)(); });
    }
    condition.notify_one();
    return res;
}

// 析构函数
inline ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(std::thread &worker : workers)
        worker.join();
}

这个线程池允许我们提交任何可调用对象,并在池中的一个线程上异步执行它们。

4. 核心实现:Subscription 与 Backpressure 机制

背压的核心在于 Subscription 的具体实现。它需要管理订阅者的请求数量,并在数据可用时将数据发送给订阅者。

4.1 BaseSubscription 抽象基类

为了方便实现,我们定义一个 BaseSubscription,它包含了一些通用的状态管理,比如请求计数和取消状态。

// BaseSubscription.h
#pragma once
#include "Subscription.h"
#include <atomic>
#include <mutex> // For potential use in derived classes

class BaseSubscription : public Subscription {
public:
    BaseSubscription() : requested_items_(0), cancelled_(false) {}
    virtual ~BaseSubscription() = default;

    void request(long n) override {
        if (n <= 0) {
            // 根据Reactive Streams规范,无效请求应报错
            // 这里简化处理,实际中可能需要通过onError向下游传播错误
            // std::cerr << "Warning: request(n) must be positive. Received: " << n << std::endl;
            return;
        }
        requested_items_.fetch_add(n, std::memory_order_relaxed);
        // 通常,这里会触发Publisher开始或继续发送数据
        // 具体触发机制由派生类实现
        signal_producer_to_produce();
    }

    void cancel() override {
        cancelled_.store(true, std::memory_order_relaxed);
        // 释放资源或通知生产者停止
        on_cancel();
    }

    bool is_cancelled() const {
        return cancelled_.load(std::memory_order_relaxed);
    }

    long get_requested() const {
        return requested_items_.load(std::memory_order_relaxed);
    }

    // 尝试减少请求计数。如果请求计数足够,则减少并返回true。
    // 否则返回false。
    bool try_decrement_requested() {
        long current = requested_items_.load(std::memory_order_acquire);
        while (current > 0) {
            if (requested_items_.compare_exchange_weak(current, current - 1,
                                                         std::memory_order_release,
                                                         std::memory_order_relaxed)) {
                return true;
            }
            current = requested_items_.load(std::memory_order_acquire);
        }
        return false;
    }

protected:
    // 派生类需要实现这个方法,以通知生产者开始或继续发送数据
    virtual void signal_producer_to_produce() = 0;
    // 派生类需要实现这个方法,在取消时执行清理
    virtual void on_cancel() = 0;

private:
    std::atomic<long> requested_items_; // 累积的请求数量
    std::atomic<bool> cancelled_;       // 订阅是否已取消
};

BaseSubscription 提供了原子操作来管理 requested_items_cancelled_ 状态。signal_producer_to_produce() 是一个虚函数,因为不同类型的 Publisher 可能会有不同的触发数据生成的方式。

5. 实现 Publisher:从源头生成数据

让我们实现一个最简单的 Publisher,它生成一个整数序列。

5.1 RangePublisher

RangePublisher 会从 startend 生成整数。

// RangePublisher.h
#pragma once
#include "Publisher.h"
#include "Subscriber.h"
#include "BaseSubscription.h"
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>

template <typename T>
class RangePublisher : public Publisher<T> {
private:
    T start_;
    T end_;
    std::shared_ptr<Subscriber<T>> current_subscriber_; // 暂时只支持一个订阅者
    std::shared_ptr<BaseSubscription> current_subscription_;

public:
    RangePublisher(T start, T end) : start_(start), end_(end) {}

    void subscribe(std::shared_ptr<Subscriber<T>> subscriber) override {
        if (current_subscriber_) {
            // 规范要求Publisher只支持一次订阅,后续订阅应调用onError
            // 但这里简化处理,直接返回或抛出错误
            // 实际应用中可能需要更复杂的错误处理或支持多播
            subscriber->onError(std::make_exception_ptr(
                std::runtime_error("RangePublisher only supports one subscriber at a time.")));
            return;
        }
        current_subscriber_ = subscriber;
        // 创建一个RangeSubscription实例
        current_subscription_ = std::make_shared<RangeSubscription>(this, subscriber);
        subscriber->onSubscribe(current_subscription_);
    }

    // RangePublisher特有的方法,用于通知Subscription开始生产
    void start_producing(std::shared_ptr<Subscriber<T>> subscriber, std::shared_ptr<BaseSubscription> subscription) {
        // 在新线程中执行生产逻辑,避免阻塞 subscribe() 调用
        std::thread([this, subscriber, subscription]() {
            T current_value = start_;
            while (current_value <= end_ && !subscription->is_cancelled()) {
                // 等待有请求
                // 使用RangeSubscription的wait_for_request方法
                std::static_pointer_cast<RangeSubscription>(subscription)->wait_for_request();

                if (subscription->is_cancelled()) {
                    break;
                }

                // 尝试发送数据,并减少请求计数
                if (subscription->try_decrement_requested()) {
                    subscriber->onNext(current_value);
                    current_value++;
                } else {
                    // 如果 try_decrement_requested 返回 false,说明请求计数已为0
                    // 此时应该再次等待,而不是继续尝试发送
                    // continue; // 循环会再次进入wait_for_request
                }
            }

            if (!subscription->is_cancelled()) {
                if (current_value > end_) {
                    subscriber->onComplete();
                } else {
                    // 如果是因取消而退出循环,则不调用onComplete
                }
            }
            // 流结束,清理资源
            current_subscriber_.reset();
            current_subscription_.reset();
        }).detach(); // 使用detach简单化,实际生产中应管理线程生命周期
    }

private:
    // RangeSubscription 是 RangePublisher 内部的具体 Subscription 实现
    class RangeSubscription : public BaseSubscription {
    private:
        RangePublisher<T>* parent_publisher_;
        std::shared_ptr<Subscriber<T>> downstream_subscriber_;
        std::mutex mutex_;
        std::condition_variable cv_;

    public:
        RangeSubscription(RangePublisher<T>* publisher, std::shared_ptr<Subscriber<T>> subscriber)
            : parent_publisher_(publisher), downstream_subscriber_(subscriber) {}

        void signal_producer_to_produce() override {
            // 通知生产线程,有新的请求到来
            cv_.notify_one();
        }

        void on_cancel() override {
            // 当取消发生时,通知生产线程停止
            cv_.notify_all(); // 确保正在等待的线程被唤醒
            // 清理下游订阅者引用
            downstream_subscriber_.reset();
        }

        // 供生产线程调用,等待请求
        void wait_for_request() {
            std::unique_lock<std::mutex> lock(mutex_);
            cv_.wait(lock, [this]() {
                return get_requested() > 0 || is_cancelled();
            });
        }
    };
};

RangePublisher 内部定义了一个 RangeSubscription 类。当 Subscriber 调用 request(n) 时,RangeSubscriptionsignal_producer_to_produce() 方法会被调用,它会通过条件变量 cv_ 唤醒正在等待的生产线程。生产线程在每次发送数据前,都会调用 try_decrement_requested() 来检查是否有足够的请求。

6. 实现 Subscriber:消费数据流

接下来,我们实现一个简单的 Subscriber,它接收数据并打印出来。

6.1 PrintSubscriber

// PrintSubscriber.h
#pragma once
#include "Subscriber.h"
#include "Subscription.h"
#include <iostream>
#include <string>
#include <atomic>
#include <thread> // For sleep_for

template <typename T>
class PrintSubscriber : public Subscriber<T> {
private:
    std::shared_ptr<Subscription> subscription_;
    std::string name_;
    long request_batch_size_;
    std::atomic<long> received_count_;

public:
    PrintSubscriber(const std::string& name = "PrintSubscriber", long batch_size = 5)
        : name_(name), request_batch_size_(batch_size), received_count_(0) {}

    void onSubscribe(std::shared_ptr<Subscription> subscription) override {
        subscription_ = subscription;
        std::cout << name_ << ": Subscribed! Requesting " << request_batch_size_ << " items." << std::endl;
        subscription_->request(request_batch_size_); // 首次请求N个数据
    }

    void onNext(const T& item) override {
        if (subscription_ == nullptr) {
            // 应该在onSubscribe之后才收到onNext
            return;
        }
        received_count_++;
        std::cout << name_ << ": Received: " << item << std::endl;

        // 模拟处理耗时
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

        // 每处理完一个批次,就再次请求
        if (received_count_.load() % request_batch_size_ == 0) {
            std::cout << name_ << ": Processed " << received_count_.load() << " items. Requesting another " << request_batch_size_ << " items." << std::endl;
            subscription_->request(request_batch_size_);
        }
    }

    void onError(const std::exception_ptr& error) override {
        try {
            if (error) {
                std::rethrow_exception(error);
            }
        } catch (const std::exception& e) {
            std::cerr << name_ << ": Error: " << e.what() << std::endl;
        }
        subscription_.reset(); // 释放Subscription
    }

    void onComplete() override {
        std::cout << name_ << ": Completed! Total received: " << received_count_.load() << std::endl;
        subscription_.reset(); // 释放Subscription
    }
};

PrintSubscriberonSubscribe 中首次请求数据,并在 onNext 中模拟处理耗时,然后每隔 request_batch_size_ 个数据再次请求,从而实现背压。

7. 实现 Processor:操作符(Operators)

操作符是反应式流框架的核心,它们允许我们转换、过滤或组合数据流。操作符本身就是 Processor,既是上游的 Subscriber 又是下游的 Publisher

7.1 MapOperator

MapOperator 将数据流中的每个元素通过一个函数进行转换。

// MapOperator.h
#pragma once
#include "Processor.h"
#include "BaseSubscription.h"
#include "ThreadPool.h"
#include <functional>
#include <queue>
#include <mutex>

template <typename T, typename R>
class MapOperator : public Processor<T, R> {
private:
    std::function<R(T)> mapper_func_;
    std::shared_ptr<Subscriber<R>> downstream_subscriber_;
    std::shared_ptr<Subscription> upstream_subscription_; // 对上游的订阅
    std::shared_ptr<MapSubscription> map_subscription_; // 给下游的订阅

    // 内部缓冲区,用于存储上游onNext的数据,待下游请求时再发送
    std::queue<R> buffer_;
    std::mutex buffer_mutex_;
    std::condition_variable buffer_cv_; // 用于通知有数据可发送

public:
    MapOperator(std::function<R(T)> mapper_func) : mapper_func_(std::move(mapper_func)) {}

    // ------------------- Subscriber<T> 接口实现 (接收上游数据) -------------------
    void onSubscribe(std::shared_ptr<Subscription> s) override {
        upstream_subscription_ = s;
        // 如果下游已经订阅了此MapOperator,则立即传递onSubscribe
        if (downstream_subscriber_) {
            map_subscription_ = std::make_shared<MapSubscription>(this, downstream_subscriber_);
            downstream_subscriber_->onSubscribe(map_subscription_);
        } else {
            // 如果下游还未订阅,MapOperator会等待下游订阅后再传递onSubscribe
            // 此时不立即请求数据,等待下游Subscriber的request()
        }
    }

    void onNext(const T& item) override {
        if (upstream_subscription_->is_cancelled()) return;

        try {
            R result = mapper_func_(item);
            {
                std::unique_lock<std::mutex> lock(buffer_mutex_);
                buffer_.push(result);
            }
            // 通知下游可能有数据可以发送了
            buffer_cv_.notify_one();
        } catch (...) {
            onError(std::current_exception());
            upstream_subscription_->cancel(); // 映射失败,取消上游
        }
    }

    void onError(const std::exception_ptr& error) override {
        if (downstream_subscriber_) {
            downstream_subscriber_->onError(error);
        }
        // 清理资源
        upstream_subscription_.reset();
        downstream_subscriber_.reset();
        map_subscription_.reset();
    }

    void onComplete() override {
        // 等待所有缓冲数据发送完毕
        {
            std::unique_lock<std::mutex> lock(buffer_mutex_);
            buffer_cv_.wait(lock, [this]{ return map_subscription_ == nullptr || buffer_.empty() || map_subscription_->is_cancelled(); });
        }

        if (downstream_subscriber_) {
            downstream_subscriber_->onComplete();
        }
        // 清理资源
        upstream_subscription_.reset();
        downstream_subscriber_.reset();
        map_subscription_.reset();
    }

    // ------------------- Publisher<R> 接口实现 (发布给下游) -------------------
    void subscribe(std::shared_ptr<Subscriber<R>> subscriber) override {
        if (downstream_subscriber_) {
            subscriber->onError(std::make_exception_ptr(
                std::runtime_error("MapOperator only supports one downstream subscriber.")));
            return;
        }
        downstream_subscriber_ = subscriber;
        // 如果上游已经onSubscribe了,则立即传递给下游
        if (upstream_subscription_) {
            map_subscription_ = std::make_shared<MapSubscription>(this, downstream_subscriber_);
            downstream_subscriber_->onSubscribe(map_subscription_);
        }
        // 否则,等待上游onSubscribe
    }

private:
    // MapSubscription 是 MapOperator 内部的 Subscription 实现
    class MapSubscription : public BaseSubscription {
    private:
        MapOperator<T, R>* parent_operator_;
        std::shared_ptr<Subscriber<R>> downstream_subscriber_;
        std::mutex production_mutex_; // 保护发送数据逻辑
        std::thread production_thread_; // 专门用于处理数据发送的线程
        std::atomic<bool> producer_active_;

    public:
        MapSubscription(MapOperator<T, R>* op, std::shared_ptr<Subscriber<R>> subscriber)
            : parent_operator_(op), downstream_subscriber_(subscriber), producer_active_(true) {
            // 启动一个独立的线程来处理数据发送,以避免阻塞下游的request()调用
            production_thread_ = std::thread(&MapSubscription::produce_items, this);
        }

        ~MapSubscription() {
            producer_active_.store(false);
            signal_producer_to_produce(); // 唤醒生产线程
            if (production_thread_.joinable()) {
                production_thread_.join();
            }
        }

        void signal_producer_to_produce() override {
            // 通知生产线程,有新的请求或新的数据
            parent_operator_->buffer_cv_.notify_one();
        }

        void on_cancel() override {
            producer_active_.store(false);
            parent_operator_->upstream_subscription_->cancel(); // 取消上游订阅
            parent_operator_->buffer_cv_.notify_all(); // 唤醒等待的onComplete
            downstream_subscriber_.reset();
        }

    private:
        void produce_items() {
            while (producer_active_.load() && !is_cancelled()) {
                std::unique_lock<std::mutex> lock(parent_operator_->buffer_mutex_);
                parent_operator_->buffer_cv_.wait(lock, [this] {
                    return get_requested() > 0 && !parent_operator_->buffer_.empty() || is_cancelled() || !producer_active_.load();
                });

                if (is_cancelled() || !producer_active_.load()) {
                    break;
                }

                if (get_requested() > 0 && !parent_operator_->buffer_.empty()) {
                    // 尝试发送数据
                    if (try_decrement_requested()) {
                        R item = parent_operator_->buffer_.front();
                        parent_operator_->buffer_.pop();
                        lock.unlock(); // 立即释放锁,允许onNext并行处理

                        downstream_subscriber_->onNext(item);
                    }
                }
            }
        }
    };
};

MapOperator 的实现相对复杂一些,因为它需要协调上游 onNext、下游 request 和自身的处理逻辑。它使用一个内部 buffer_ 来暂存上游发来的数据。当下游调用 request(n) 时,MapSubscription 会在一个独立的 production_thread_ 中从 buffer_ 中取出数据并发送给下游。这种分离可以更好地实现异步和背压。

7.2 FilterOperator

FilterOperator 根据一个谓词函数过滤数据流中的元素。

// FilterOperator.h
#pragma once
#include "Processor.h"
#include "BaseSubscription.h"
#include <functional>
#include <queue>
#include <mutex>
#include <thread>

template <typename T>
class FilterOperator : public Processor<T, T> {
private:
    std::function<bool(T)> predicate_func_;
    std::shared_ptr<Subscriber<T>> downstream_subscriber_;
    std::shared_ptr<Subscription> upstream_subscription_;
    std::shared_ptr<FilterSubscription> filter_subscription_;

    std::queue<T> buffer_; // 内部缓冲区
    std::mutex buffer_mutex_;
    std::condition_variable buffer_cv_;

public:
    FilterOperator(std::function<bool(T)> predicate_func) : predicate_func_(std::move(predicate_func)) {}

    // ------------------- Subscriber<T> 接口实现 (接收上游数据) -------------------
    void onSubscribe(std::shared_ptr<Subscription> s) override {
        upstream_subscription_ = s;
        if (downstream_subscriber_) {
            filter_subscription_ = std::make_shared<FilterSubscription>(this, downstream_subscriber_);
            downstream_subscriber_->onSubscribe(filter_subscription_);
            // 立即向上游请求一批数据,因为过滤器可能会丢弃大部分数据
            upstream_subscription_->request(filter_subscription_->get_request_batch_size() * 2); // 预请求更多
        }
    }

    void onNext(const T& item) override {
        if (upstream_subscription_->is_cancelled()) return;

        try {
            if (predicate_func_(item)) {
                std::unique_lock<std::mutex> lock(buffer_mutex_);
                buffer_.push(item);
                buffer_cv_.notify_one(); // 通知下游有数据
            } else {
                // 如果数据被过滤,那么这个“请求”就被消耗了,需要向上游再请求一个
                // 确保下游请求的数量能被满足
                if (filter_subscription_ && filter_subscription_->get_requested() > 0) {
                     upstream_subscription_->request(1);
                }
            }
        } catch (...) {
            onError(std::current_exception());
            upstream_subscription_->cancel();
        }
    }

    void onError(const std::exception_ptr& error) override {
        if (downstream_subscriber_) {
            downstream_subscriber_->onError(error);
        }
        upstream_subscription_.reset();
        downstream_subscriber_.reset();
        filter_subscription_.reset();
    }

    void onComplete() override {
        // 等待所有缓冲数据发送完毕
        {
            std::unique_lock<std::mutex> lock(buffer_mutex_);
            buffer_cv_.wait(lock, [this]{ return filter_subscription_ == nullptr || buffer_.empty() || filter_subscription_->is_cancelled(); });
        }

        if (downstream_subscriber_) {
            downstream_subscriber_->onComplete();
        }
        upstream_subscription_.reset();
        downstream_subscriber_.reset();
        filter_subscription_.reset();
    }

    // ------------------- Publisher<T> 接口实现 (发布给下游) -------------------
    void subscribe(std::shared_ptr<Subscriber<T>> subscriber) override {
        if (downstream_subscriber_) {
            subscriber->onError(std::make_exception_ptr(
                std::runtime_error("FilterOperator only supports one downstream subscriber.")));
            return;
        }
        downstream_subscriber_ = subscriber;
        if (upstream_subscription_) {
            filter_subscription_ = std::make_shared<FilterSubscription>(this, downstream_subscriber_);
            downstream_subscriber_->onSubscribe(filter_subscription_);
            // 下游订阅后,立即向上游请求数据
            upstream_subscription_->request(filter_subscription_->get_request_batch_size() * 2); // 预请求
        }
    }

private:
    class FilterSubscription : public BaseSubscription {
    private:
        FilterOperator<T>* parent_operator_;
        std::shared_ptr<Subscriber<T>> downstream_subscriber_;
        std::thread production_thread_;
        std::atomic<bool> producer_active_;
        long initial_request_batch_size_ = 5; // 用于判断在onNext中是否需要额外请求

    public:
        FilterSubscription(FilterOperator<T>* op, std::shared_ptr<Subscriber<T>> subscriber)
            : parent_operator_(op), downstream_subscriber_(subscriber), producer_active_(true) {
            production_thread_ = std::thread(&FilterSubscription::produce_items, this);
        }

        ~FilterSubscription() {
            producer_active_.store(false);
            signal_producer_to_produce();
            if (production_thread_.joinable()) {
                production_thread_.join();
            }
        }

        void signal_producer_to_produce() override {
            parent_operator_->buffer_cv_.notify_one();
        }

        void on_cancel() override {
            producer_active_.store(false);
            parent_operator_->upstream_subscription_->cancel();
            parent_operator_->buffer_cv_.notify_all();
            downstream_subscriber_.reset();
        }

        long get_request_batch_size() const { return initial_request_batch_size_; }

    private:
        void produce_items() {
            while (producer_active_.load() && !is_cancelled()) {
                std::unique_lock<std::mutex> lock(parent_operator_->buffer_mutex_);
                parent_operator_->buffer_cv_.wait(lock, [this] {
                    return get_requested() > 0 && !parent_operator_->buffer_.empty() || is_cancelled() || !producer_active_.load();
                });

                if (is_cancelled() || !producer_active_.load()) {
                    break;
                }

                if (get_requested() > 0 && !parent_operator_->buffer_.empty()) {
                    if (try_decrement_requested()) {
                        T item = parent_operator_->buffer_.front();
                        parent_operator_->buffer_.pop();
                        lock.unlock();

                        downstream_subscriber_->onNext(item);
                        // 在FilterOperator中,如果一个元素被过滤,它不会被发送到下游,
                        // 但它消耗了上游的一个“潜在”请求。
                        // 为了确保下游请求的N个元素最终能收到N个,
                        // 当下游请求的元素被发出后,需要向上游请求更多元素来填充“空缺”。
                        // 这里简化处理:每发送一个,就向上游请求一个,以维持上游的发送速度。
                        // 实际需要更精细的策略,例如批量请求或基于比率的请求。
                        parent_operator_->upstream_subscription_->request(1);
                    }
                }
            }
        }
    };
};

FilterOperatorMapOperator 类似,也使用内部缓冲区和独立线程来处理数据发送。特别之处在于,当一个元素被过滤掉时,它不会被发送给下游,但这个“请求”实际上被消耗了(因为上游发了一个)。为了保持背压平衡,FilterOperator 会在发送一个元素后,立即向上游请求一个新元素,以补充因过滤而产生的空缺。

7.3 ObserveOnOperator (调度器)

ObserveOnOperator 是实现异步流处理的关键。它将下游 SubscriberonNext, onError, onComplete 事件调度到指定的 ThreadPool 中执行。

// ObserveOnOperator.h
#pragma once
#include "Processor.h"
#include "BaseSubscription.h"
#include "ThreadPool.h"
#include <queue>
#include <mutex>

template <typename T>
class ObserveOnOperator : public Processor<T, T> {
private:
    std::shared_ptr<ThreadPool> thread_pool_;
    std::shared_ptr<Subscriber<T>> downstream_subscriber_;
    std::shared_ptr<Subscription> upstream_subscription_;
    std::shared_ptr<ObserveOnSubscription> observe_on_subscription_;

    // 内部队列用于存储待处理的事件
    std::queue<std::function<void()>> event_queue_;
    std::mutex queue_mutex_;
    std::condition_variable queue_cv_;
    std::atomic<bool> active_; // 用于控制调度线程的生命周期

public:
    ObserveOnOperator(std::shared_ptr<ThreadPool> pool) : thread_pool_(std::move(pool)), active_(true) {}
    ~ObserveOnOperator() {
        active_.store(false);
        queue_cv_.notify_all(); // 唤醒等待中的调度线程
    }

    // ------------------- Subscriber<T> 接口实现 (接收上游数据) -------------------
    void onSubscribe(std::shared_ptr<Subscription> s) override {
        upstream_subscription_ = s;
        if (downstream_subscriber_) {
            observe_on_subscription_ = std::make_shared<ObserveOnSubscription>(this, downstream_subscriber_);
            downstream_subscriber_->onSubscribe(observe_on_subscription_);
        }
    }

    void onNext(const T& item) override {
        if (!active_.load() || upstream_subscription_->is_cancelled()) return;
        enqueue_event([this, item]() {
            if (!observe_on_subscription_->is_cancelled()) {
                downstream_subscriber_->onNext(item);
                // 每处理一个onNext,就向上游请求一个。
                // 确保观察线程处理速度和上游请求速度一致。
                upstream_subscription_->request(1);
            }
        });
    }

    void onError(const std::exception_ptr& error) override {
        if (!active_.load()) return;
        enqueue_event([this, error]() {
            if (!observe_on_subscription_->is_cancelled()) {
                downstream_subscriber_->onError(error);
            }
            cleanup();
        });
    }

    void onComplete() override {
        if (!active_.load()) return;
        enqueue_event([this]() {
            if (!observe_on_subscription_->is_cancelled()) {
                downstream_subscriber_->onComplete();
            }
            cleanup();
        });
    }

    // ------------------- Publisher<T> 接口实现 (发布给下游) -------------------
    void subscribe(std::shared_ptr<Subscriber<T>> subscriber) override {
        if (downstream_subscriber_) {
            subscriber->onError(std::make_exception_ptr(
                std::runtime_error("ObserveOnOperator only supports one downstream subscriber.")));
            return;
        }
        downstream_subscriber_ = subscriber;
        if (upstream_subscription_) {
            observe_on_subscription_ = std::make_shared<ObserveOnSubscription>(this, downstream_subscriber_);
            downstream_subscriber_->onSubscribe(observe_on_subscription_);
        }
    }

private:
    void enqueue_event(std::function<void()> event_task) {
        if (!active_.load()) return;
        thread_pool_->enqueue([this, event_task = std::move(event_task)]() mutable {
            // 在线程池线程中执行事件,并处理背压逻辑
            std::unique_lock<std::mutex> lock(queue_mutex_);
            queue_cv_.wait(lock, [this] {
                return !active_.load() || observe_on_subscription_->get_requested() > 0 || observe_on_subscription_->is_cancelled();
            });

            if (!active_.load() || observe_on_subscription_->is_cancelled()) {
                return;
            }

            if (observe_on_subscription_->try_decrement_requested()) {
                lock.unlock(); // 释放锁,允许事件执行
                event_task();
            } else {
                // 如果请求计数为0,则重新入队或等待
                // 这里简化处理,直接让当前任务等待,直到有请求
                // 实际生产中可能需要更复杂的任务调度或缓冲机制
                // 暂时这里不会发生,因为我们会在onNext中立即请求下一个
            }
        });
        // 每次有事件进入队列,都会通知一下调度器,但调度器会等待请求
        queue_cv_.notify_one();
    }

    void cleanup() {
        active_.store(false);
        upstream_subscription_.reset();
        downstream_subscriber_.reset();
        observe_on_subscription_.reset();
        queue_cv_.notify_all();
    }

private:
    class ObserveOnSubscription : public BaseSubscription {
    private:
        ObserveOnOperator<T>* parent_operator_;
        std::shared_ptr<Subscriber<T>> downstream_subscriber_;

    public:
        ObserveOnSubscription(ObserveOnOperator<T>* op, std::shared_ptr<Subscriber<T>> subscriber)
            : parent_operator_(op), downstream_subscriber_(subscriber) {}

        void signal_producer_to_produce() override {
            // 当下游请求数据时,通知ObserveOnOperator的调度器
            parent_operator_->queue_cv_.notify_one();
        }

        void on_cancel() override {
            // 取消上游,并通知调度器停止
            parent_operator_->upstream_subscription_->cancel();
            parent_operator_->active_.store(false);
            parent_operator_->queue_cv_.notify_all();
            downstream_subscriber_.reset();
        }
    };
};

ObserveOnOperator 是异步的关键。当上游调用 onNext 时,ObserveOnOperator 会将实际的 downstream_subscriber_->onNext(item) 调用封装成一个任务,并提交到其内部的 ThreadPool 中。这个任务在执行前会检查 ObserveOnSubscriptionrequested_items_。只有当有请求时,任务才会被执行,并且 requested_items_ 减一。一旦 onNext 被处理,它会立即向上游 upstream_subscription_ 请求一个新数据项,以维持流的活性。

8. 整合与示例

现在我们有了 PublisherSubscriberProcessor,可以构建一个完整的反应式流管道了。

8.1 框架文件结构

.
├── BaseSubscription.h
├── Processor.h
├── Publisher.h
├── Subscriber.h
├── Subscription.h
├── ThreadPool.h
├── RangePublisher.h
├── PrintSubscriber.h
├── MapOperator.h
├── FilterOperator.h
└── ObserveOnOperator.h

8.2 完整的示例代码 main.cpp

#include <iostream>
#include <memory>
#include <functional>
#include <string>
#include <thread>
#include <chrono>

// Include all necessary headers
#include "ThreadPool.h"
#include "Publisher.h"
#include "Subscriber.h"
#include "Subscription.h"
#include "Processor.h"
#include "BaseSubscription.h"

// Concrete implementations
#include "RangePublisher.h"
#include "PrintSubscriber.h"
#include "MapOperator.h"
#include "FilterOperator.h"
#include "ObserveOnOperator.h"

int main() {
    std::cout << "Starting Reactive Stream Example with Backpressure and Asynchronicity!" << std::endl;

    // 1. 创建线程池用于异步调度
    std::shared_ptr<ThreadPool> thread_pool = std::make_shared<ThreadPool>(4); // 4个工作线程

    // 2. 创建发布者: 从1生成到10的整数序列
    std::shared_ptr<RangePublisher<int>> publisher = std::make_shared<RangePublisher<int>>(1, 20);

    // 3. 创建操作符链
    // Map: 将每个整数乘以2
    std::shared_ptr<MapOperator<int, int>> map_operator =
        std::make_shared<MapOperator<int, int>>([](int x) {
            std::cout << "  Map: " << x << " -> " << x * 2 << " (thread: " << std::this_thread::get_id() << ")" << std::endl;
            // 模拟耗时操作
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            return x * 2;
        });

    // Filter: 过滤掉偶数 (只保留奇数)
    std::shared_ptr<FilterOperator<int>> filter_operator =
        std::make_shared<FilterOperator<int>>([](int x) {
            bool keep = (x % 2 != 0);
            std::cout << "    Filter: " << x << (keep ? " -> Kept" : " -> Dropped") << " (thread: " << std::this_thread::get_id() << ")" << std::endl;
            return keep;
        });

    // ObserveOn: 将后续操作调度到线程池中执行
    std::shared_ptr<ObserveOnOperator<int>> observe_on_operator =
        std::make_shared<ObserveOnOperator<int>>(thread_pool);

    // 4. 创建订阅者: 打印接收到的数据,并模拟慢速消费者
    std::shared_ptr<PrintSubscriber<int>> subscriber =
        std::make_shared<PrintSubscriber<int>>("MyPrintSubscriber", 3); // 每次请求3个数据

    // 5. 组装流管道
    // publisher -> map_operator -> filter_operator -> observe_on_operator -> subscriber
    publisher->subscribe(map_operator);             // RangePublisher 订阅 MapOperator
    map_operator->subscribe(filter_operator);       // MapOperator 订阅 FilterOperator
    filter_operator->subscribe(observe_on_operator); // FilterOperator 订阅 ObserveOnOperator
    observe_on_operator->subscribe(subscriber);     // ObserveOnOperator 订阅 PrintSubscriber

    std::cout << "nStream pipeline assembled. Waiting for completion..." << std::endl;

    // 让主线程等待一段时间,确保异步流有足够时间运行
    // 实际应用中,你可能需要更复杂的同步机制(如Future/Promise或屏障)来等待流的完成
    std::this_thread::sleep_for(std::chrono::seconds(10));

    std::cout << "nMain thread finished waiting. Stream should be completing or cancelled." << std::endl;

    // 线程池会在析构时join所有线程,所以这里不需要手动停止。
    // 如果流没有正常完成,可以在这里手动取消订阅
    // if (subscriber->get_subscription()) {
    //     std::cout << "Cancelling subscription from main thread." << std::endl;
    //     subscriber->get_subscription()->cancel();
    // }

    return 0;
}

编译命令示例 (假设所有 .h 文件都在当前目录,并且使用 C++17 或更高版本):

g++ -std=c++17 -O2 -Wall -pthread main.cpp -o reactive_stream_demo
./reactive_stream_demo

运行结果会清晰地展示背压和异步性:

  • PrintSubscriber 会先请求3个数据。
  • RangePublisher 收到请求后开始生产。
  • 数据流经 MapOperatorFilterOperator。注意 MapFilter 操作可能在不同的线程中执行(如果它们内部也使用了线程,这里简化为同步执行,但 ObserveOn 确保了 PrintSubscriberonNext 在线程池中)。
  • ObserveOnOperatoronNext 调度到线程池中的某个线程。
  • PrintSubscriber 收到并处理数据,模拟耗时,处理完3个后再次请求3个。
  • 整个过程生产者会根据消费者的速度自动调整。

9. 进阶考量与未来展望

我们构建的框架虽然功能完整,但仍有许多可以优化和扩展的地方:

  1. 多播(Multicast)与单播(Unicast): 当前的 PublisherProcessor 默认是单播的(只支持一个 Subscriber)。在真实世界中,一个 Publisher 可能需要被多个 Subscriber 订阅(例如,一个事件源)。这需要引入 ConnectablePublisherSubject 的概念。
  2. 错误处理策略: 当前的 onError 只是简单地终止流。更健壮的框架可能需要错误重试、错误恢复(例如,跳过错误项)等策略。
  3. 调度器(Scheduler)抽象: ObserveOnOperator 直接依赖于 ThreadPool。可以进一步抽象出 Scheduler 接口,允许用户指定不同的调度策略(例如,单线程调度、I/O线程调度、计算线程调度)。
  4. 更多操作符: 实现更多 Rx 风格的操作符,如 zip, merge, flatMap, buffer, throttle 等。其中 flatMap 是最复杂的,因为它涉及到将多个内部流扁平化,并正确管理它们的背压。
  5. 资源管理: 确保在流终止(onCompleteonError)或取消(cancel)时,所有中间资源(如线程、缓冲区)都能被正确释放。std::shared_ptr 提供了一定的帮助,但复杂场景仍需小心。
  6. 测试: 严格的单元测试和集成测试对于确保背压机制的正确性和并发安全性至关重要。
  7. 性能优化: 针对高吞吐量场景,可能需要更高效的无锁队列、更轻量级的同步原语或零拷贝技术。
  8. 冷流(Cold Stream)与热流(Hot Stream): 当前的 RangePublisher 是冷流(每次订阅都会重新开始生产)。热流(例如,鼠标点击事件)则无论何时订阅,都从当前时间点开始接收事件。

10. 总结

通过本次讲座,我们深入探讨了反应式流和背压的核心概念,并亲手使用 C++ 实现了一个支持异步和背压的轻量级框架。我们定义了 PublisherSubscriberSubscriptionProcessor 四大核心接口,并构建了 ThreadPool 作为异步执行的基础。随后,我们实现了 RangePublisher 作为数据源,PrintSubscriber 作为数据消费者,以及 MapOperatorFilterOperatorObserveOnOperator 作为流转换和调度工具。

这个框架展示了如何通过 request(n) 机制实现生产者与消费者之间的流量控制,有效防止系统过载。同时,ObserveOnOperator 的引入使得数据处理可以跨线程异步进行,极大地提升了系统的响应性和吞吐量。虽然这是一个简化版,但它包含了反应式流框架的所有基本要素,为构建更复杂、更健壮的异步数据处理系统奠定了坚实的基础。希望这次实践能让大家对 C++ 在现代并发编程中的应用有更深刻的理解。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注