C++ 基于并发的命令模式:解耦请求发送者与接收者

好的,我们今天来聊聊C++里如何用并发的方式玩转命令模式,让请求发送者和接收者彻底解耦,就像男女朋友分手后,各自美丽,互不打扰。

什么是命令模式?

想象一下,你坐在餐厅里,拿着菜单(Command),对着服务员(Invoker)说:“我要一份宫保鸡丁(Concrete Command)!” 服务员收到菜单后,通知厨房(Receiver):“做一份宫保鸡丁!”

在这个例子里:

  • Command (命令接口): 菜单,定义了要做什么。
  • Concrete Command (具体命令): 宫保鸡丁,具体要做的菜。
  • Invoker (调用者): 服务员,负责接收命令,并传递给接收者。
  • Receiver (接收者): 厨房,真正执行命令的人。

命令模式的核心思想就是将请求封装成一个对象,从而允许你使用不同的请求、队列请求或日志请求来参数化客户端。它也支持可撤销的操作。

为什么要并发?

在单线程的世界里,服务员必须等厨房做好宫保鸡丁才能服务下一位顾客。这效率太低了! 如果厨房可以并行处理多个订单,服务员也可以同时服务多个顾客,那效率就蹭蹭往上涨。

在软件开发中,很多操作可能比较耗时,比如数据库查询、网络请求、图像处理等等。如果这些操作都放在主线程执行,会导致界面卡顿,用户体验极差。 使用并发技术,可以将这些耗时操作放在后台线程执行,主线程继续响应用户操作,从而提高程序的响应速度和流畅性。

C++并发利器:线程和任务

C++11引入了 std::threadstd::future,让我们更容易地编写并发程序。

  • std::thread: 代表一个独立的执行线程。
  • std::future: 代表一个异步操作的结果。 你可以把它理解为“未来的结果”,当异步操作完成时,你就可以从 std::future 对象中获取结果。

并发命令模式的实现

我们来模拟一个图像处理的例子,用户点击“锐化”按钮后,程序在后台线程对图像进行锐化处理。

#include <iostream>
#include <thread>
#include <future>
#include <vector>
#include <algorithm>

// 1. Command Interface
class Command {
public:
    virtual void execute() = 0;
    virtual ~Command() = default;
};

// 2. Receiver (图像处理器)
class ImageProcessor {
public:
    ImageProcessor(std::string filename) : filename_(filename) {}

    void sharpen() {
        std::cout << "线程 " << std::this_thread::get_id() << ": 对图像 " << filename_ << " 进行锐化处理...n";
        // 模拟耗时操作
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::cout << "线程 " << std::this_thread::get_id() << ": 图像 " << filename_ << " 锐化完成!n";
    }

private:
    std::string filename_;
};

// 3. Concrete Command (锐化命令)
class SharpenCommand : public Command {
public:
    SharpenCommand(ImageProcessor* processor) : processor_(processor) {}

    void execute() override {
        processor_->sharpen();
    }

private:
    ImageProcessor* processor_;
};

// 4. Invoker (命令队列)
class CommandQueue {
public:
    void addCommand(Command* command) {
        commands_.push_back(command);
    }

    void run() {
        //使用线程池来执行命令
        std::vector<std::future<void>> futures;
        for (Command* command : commands_) {
            futures.push_back(std::async(std::launch::async, [command](){ command->execute(); }));
        }

        //等待所有任务完成
        for (auto& future : futures) {
            future.get(); //这会阻塞,直到future准备好。
        }

        // Cleanup commands
        for (Command* command : commands_) {
            delete command;
        }
        commands_.clear();
    }

private:
    std::vector<Command*> commands_;
};

int main() {
    ImageProcessor image1("image1.jpg");
    ImageProcessor image2("image2.png");

    SharpenCommand* sharpen1 = new SharpenCommand(&image1);
    SharpenCommand* sharpen2 = new SharpenCommand(&image2);

    CommandQueue queue;
    queue.addCommand(sharpen1);
    queue.addCommand(sharpen2);

    std::cout << "主线程: 添加锐化命令到队列...n";

    queue.run();

    std::cout << "主线程: 所有锐化命令执行完毕!n";

    return 0;
}

代码解释:

  1. Command 接口: 定义了 execute() 方法,所有命令都要实现这个方法。
  2. ImageProcessor 类: 是接收者,负责真正执行图像锐化操作。
  3. SharpenCommand 类: 是具体命令,封装了对 ImageProcessorsharpen() 方法的调用。
  4. CommandQueue 类: 是调用者,它维护一个命令队列,并将命令分发到后台线程执行。
    • addCommand() 方法:将命令添加到队列中。
    • run() 方法: 遍历命令队列,使用 std::async 将每个命令的 execute() 方法提交到后台线程执行。 std::launch::async 强制在独立的线程中执行任务。
    • std::future 用于获取异步操作的结果,future.get() 会阻塞当前线程,直到异步操作完成。
    • 清理命令: 释放commands_中的Command对象,防止内存泄漏。

运行结果:

你将会看到类似这样的输出:

主线程: 添加锐化命令到队列...
主线程: 所有锐化命令执行完毕!
线程 140735641436928: 对图像 image1.jpg 进行锐化处理...
线程 140735633043200: 对图像 image2.png 进行锐化处理...
线程 140735641436928: 图像 image1.jpg 锐化完成!
线程 140735633043200: 图像 image2.png 锐化完成!

注意线程ID可能不同,但关键在于两个图像处理操作是并发执行的。

优点:

  • 解耦: 请求发送者(CommandQueue)和接收者(ImageProcessor)完全解耦。 发送者不需要知道接收者是谁,也不需要知道接收者如何执行操作。
  • 并发性: 通过使用 std::async,可以将命令分发到后台线程执行,提高程序的响应速度和流畅性。
  • 可扩展性: 可以很容易地添加新的命令,而不需要修改现有的代码。
  • 灵活性: 可以将命令排队、记录日志、撤销等等。

更进一步:线程池优化

上面的例子中,每次调用 std::async 都会创建一个新的线程。 如果命令很多,频繁创建和销毁线程会带来性能开销。 可以使用线程池来优化性能。

#include <iostream>
#include <thread>
#include <future>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>

class ThreadPool {
public:
    ThreadPool(size_t numThreads) : numThreads_(numThreads), stop_(false) {
        threads_.reserve(numThreads_);
        for (size_t i = 0; i < numThreads_; ++i) {
            threads_.emplace_back([this]() {
                while (true) {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(queueMutex_);
                        condition_.wait(lock, [this]() { return stop_ || !tasks_.empty(); });
                        if (stop_ && tasks_.empty()) {
                            return;
                        }
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }

                    task();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (std::thread& thread : threads_) {
            thread.join();
        }
    }

    template<typename F>
    std::future<typename std::result_of<F()>::type> enqueue(F&& task) {
        using return_type = typename std::result_of<F()>::type;

        auto packaged_task = std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(task));

        std::future<return_type> res = packaged_task->get_future();
        {
            std::unique_lock<std::mutex> lock(queueMutex_);
            tasks_.emplace([packaged_task]() { (*packaged_task)(); });
        }
        condition_.notify_one();
        return res;
    }

private:
    size_t numThreads_;
    std::vector<std::thread> threads_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queueMutex_;
    std::condition_variable condition_;
    bool stop_;
};

// 1. Command Interface
class Command {
public:
    virtual void execute() = 0;
    virtual ~Command() = default;
};

// 2. Receiver (图像处理器)
class ImageProcessor {
public:
    ImageProcessor(std::string filename) : filename_(filename) {}

    void sharpen() {
        std::cout << "线程 " << std::this_thread::get_id() << ": 对图像 " << filename_ << " 进行锐化处理...n";
        // 模拟耗时操作
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::cout << "线程 " << std::this_thread::get_id() << ": 图像 " << filename_ << " 锐化完成!n";
    }

private:
    std::string filename_;
};

// 3. Concrete Command (锐化命令)
class SharpenCommand : public Command {
public:
    SharpenCommand(ImageProcessor* processor) : processor_(processor) {}

    void execute() override {
        processor_->sharpen();
    }

private:
    ImageProcessor* processor_;
};

// 4. Invoker (命令队列)
class CommandQueue {
public:
    CommandQueue(ThreadPool& pool) : pool_(pool) {}

    void addCommand(Command* command) {
        commands_.push_back(command);
    }

    void run() {
        std::vector<std::future<void>> futures;
        for (Command* command : commands_) {
             futures.push_back(pool_.enqueue([command]() { command->execute(); }));
        }

        //等待所有任务完成
        for (auto& future : futures) {
            future.get();
        }

        // Cleanup commands
        for (Command* command : commands_) {
            delete command;
        }
        commands_.clear();
    }

private:
    std::vector<Command*> commands_;
    ThreadPool& pool_;
};

int main() {
    ThreadPool pool(4); // 创建一个包含 4 个线程的线程池

    ImageProcessor image1("image1.jpg");
    ImageProcessor image2("image2.png");
    ImageProcessor image3("image3.bmp");
    ImageProcessor image4("image4.gif");
    ImageProcessor image5("image5.jpeg");

    SharpenCommand* sharpen1 = new SharpenCommand(&image1);
    SharpenCommand* sharpen2 = new SharpenCommand(&image2);
    SharpenCommand* sharpen3 = new SharpenCommand(&image3);
    SharpenCommand* sharpen4 = new SharpenCommand(&image4);
    SharpenCommand* sharpen5 = new SharpenCommand(&image5);

    CommandQueue queue(pool);
    queue.addCommand(sharpen1);
    queue.addCommand(sharpen2);
    queue.addCommand(sharpen3);
    queue.addCommand(sharpen4);
    queue.addCommand(sharpen5);

    std::cout << "主线程: 添加锐化命令到队列...n";

    queue.run();

    std::cout << "主线程: 所有锐化命令执行完毕!n";

    return 0;
}

代码解释:

  1. ThreadPool 类:
    • 构造函数: 创建指定数量的线程,每个线程都在循环中等待任务。
    • enqueue() 方法: 将任务添加到任务队列中,并通知一个等待的线程。 使用 std::packaged_task 将函数包装起来,以便获取返回值。
    • 析构函数: 设置 stop_ 标志,通知所有线程退出循环,并等待所有线程结束。
    • 使用 std::mutexstd::condition_variable 实现线程同步和互斥。
  2. CommandQueue 类: 修改 CommandQueue 的构造函数,使其接收一个 ThreadPool 对象。 使用 pool_.enqueue() 将命令提交到线程池执行。

表格总结:

特性 简单并发命令模式 (使用 std::async) 线程池并发命令模式 (使用 ThreadPool)
线程创建销毁 每次执行命令都创建和销毁线程 线程池预先创建,重复利用线程
性能 线程开销大,不适合大量并发任务 线程开销小,适合大量并发任务
资源管理 资源管理简单 资源管理复杂,需要考虑线程同步和互斥
适用场景 并发任务数量较少,对响应时间要求不高 并发任务数量较多,对响应时间要求较高

注意事项:

  • 线程安全: 在并发环境下,需要特别注意线程安全问题。 例如,多个线程同时访问共享资源时,需要使用互斥锁(std::mutex)来保护共享资源。
  • 死锁: 要避免死锁的发生。 死锁是指两个或多个线程互相等待对方释放资源,导致所有线程都无法继续执行。
  • 异常处理: 在多线程环境下,异常处理更加复杂。 需要在每个线程中捕获异常,并进行适当的处理。
  • 资源竞争: 需要避免资源竞争。 多个线程同时访问同一资源时,可能会导致数据不一致或其他问题。 可以使用互斥锁、原子操作等技术来避免资源竞争。

总结:

并发命令模式是一种强大的设计模式,可以有效地解耦请求发送者和接收者,并提高程序的并发性和响应速度。 通过使用 C++11 提供的并发特性,我们可以很容易地实现并发命令模式。 但需要注意线程安全、死锁、异常处理和资源竞争等问题。 选择合适的并发模型(如使用 std::async 还是线程池)取决于具体的应用场景和性能需求。

希望今天的讲解能够帮助你更好地理解和应用并发命令模式!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注