好的,我们今天来聊聊C++里如何用并发的方式玩转命令模式,让请求发送者和接收者彻底解耦,就像男女朋友分手后,各自美丽,互不打扰。
什么是命令模式?
想象一下,你坐在餐厅里,拿着菜单(Command),对着服务员(Invoker)说:“我要一份宫保鸡丁(Concrete Command)!” 服务员收到菜单后,通知厨房(Receiver):“做一份宫保鸡丁!”
在这个例子里:
- Command (命令接口): 菜单,定义了要做什么。
- Concrete Command (具体命令): 宫保鸡丁,具体要做的菜。
- Invoker (调用者): 服务员,负责接收命令,并传递给接收者。
- Receiver (接收者): 厨房,真正执行命令的人。
命令模式的核心思想就是将请求封装成一个对象,从而允许你使用不同的请求、队列请求或日志请求来参数化客户端。它也支持可撤销的操作。
为什么要并发?
在单线程的世界里,服务员必须等厨房做好宫保鸡丁才能服务下一位顾客。这效率太低了! 如果厨房可以并行处理多个订单,服务员也可以同时服务多个顾客,那效率就蹭蹭往上涨。
在软件开发中,很多操作可能比较耗时,比如数据库查询、网络请求、图像处理等等。如果这些操作都放在主线程执行,会导致界面卡顿,用户体验极差。 使用并发技术,可以将这些耗时操作放在后台线程执行,主线程继续响应用户操作,从而提高程序的响应速度和流畅性。
C++并发利器:线程和任务
C++11引入了 std::thread
和 std::future
,让我们更容易地编写并发程序。
std::thread
: 代表一个独立的执行线程。std::future
: 代表一个异步操作的结果。 你可以把它理解为“未来的结果”,当异步操作完成时,你就可以从std::future
对象中获取结果。
并发命令模式的实现
我们来模拟一个图像处理的例子,用户点击“锐化”按钮后,程序在后台线程对图像进行锐化处理。
#include <iostream>
#include <thread>
#include <future>
#include <vector>
#include <algorithm>
// 1. Command Interface
class Command {
public:
virtual void execute() = 0;
virtual ~Command() = default;
};
// 2. Receiver (图像处理器)
class ImageProcessor {
public:
ImageProcessor(std::string filename) : filename_(filename) {}
void sharpen() {
std::cout << "线程 " << std::this_thread::get_id() << ": 对图像 " << filename_ << " 进行锐化处理...n";
// 模拟耗时操作
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "线程 " << std::this_thread::get_id() << ": 图像 " << filename_ << " 锐化完成!n";
}
private:
std::string filename_;
};
// 3. Concrete Command (锐化命令)
class SharpenCommand : public Command {
public:
SharpenCommand(ImageProcessor* processor) : processor_(processor) {}
void execute() override {
processor_->sharpen();
}
private:
ImageProcessor* processor_;
};
// 4. Invoker (命令队列)
class CommandQueue {
public:
void addCommand(Command* command) {
commands_.push_back(command);
}
void run() {
//使用线程池来执行命令
std::vector<std::future<void>> futures;
for (Command* command : commands_) {
futures.push_back(std::async(std::launch::async, [command](){ command->execute(); }));
}
//等待所有任务完成
for (auto& future : futures) {
future.get(); //这会阻塞,直到future准备好。
}
// Cleanup commands
for (Command* command : commands_) {
delete command;
}
commands_.clear();
}
private:
std::vector<Command*> commands_;
};
int main() {
ImageProcessor image1("image1.jpg");
ImageProcessor image2("image2.png");
SharpenCommand* sharpen1 = new SharpenCommand(&image1);
SharpenCommand* sharpen2 = new SharpenCommand(&image2);
CommandQueue queue;
queue.addCommand(sharpen1);
queue.addCommand(sharpen2);
std::cout << "主线程: 添加锐化命令到队列...n";
queue.run();
std::cout << "主线程: 所有锐化命令执行完毕!n";
return 0;
}
代码解释:
Command
接口: 定义了execute()
方法,所有命令都要实现这个方法。ImageProcessor
类: 是接收者,负责真正执行图像锐化操作。SharpenCommand
类: 是具体命令,封装了对ImageProcessor
的sharpen()
方法的调用。CommandQueue
类: 是调用者,它维护一个命令队列,并将命令分发到后台线程执行。addCommand()
方法:将命令添加到队列中。run()
方法: 遍历命令队列,使用std::async
将每个命令的execute()
方法提交到后台线程执行。std::launch::async
强制在独立的线程中执行任务。std::future
用于获取异步操作的结果,future.get()
会阻塞当前线程,直到异步操作完成。- 清理命令: 释放
commands_
中的Command
对象,防止内存泄漏。
运行结果:
你将会看到类似这样的输出:
主线程: 添加锐化命令到队列...
主线程: 所有锐化命令执行完毕!
线程 140735641436928: 对图像 image1.jpg 进行锐化处理...
线程 140735633043200: 对图像 image2.png 进行锐化处理...
线程 140735641436928: 图像 image1.jpg 锐化完成!
线程 140735633043200: 图像 image2.png 锐化完成!
注意线程ID可能不同,但关键在于两个图像处理操作是并发执行的。
优点:
- 解耦: 请求发送者(
CommandQueue
)和接收者(ImageProcessor
)完全解耦。 发送者不需要知道接收者是谁,也不需要知道接收者如何执行操作。 - 并发性: 通过使用
std::async
,可以将命令分发到后台线程执行,提高程序的响应速度和流畅性。 - 可扩展性: 可以很容易地添加新的命令,而不需要修改现有的代码。
- 灵活性: 可以将命令排队、记录日志、撤销等等。
更进一步:线程池优化
上面的例子中,每次调用 std::async
都会创建一个新的线程。 如果命令很多,频繁创建和销毁线程会带来性能开销。 可以使用线程池来优化性能。
#include <iostream>
#include <thread>
#include <future>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
class ThreadPool {
public:
ThreadPool(size_t numThreads) : numThreads_(numThreads), stop_(false) {
threads_.reserve(numThreads_);
for (size_t i = 0; i < numThreads_; ++i) {
threads_.emplace_back([this]() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex_);
condition_.wait(lock, [this]() { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) {
return;
}
task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex_);
stop_ = true;
}
condition_.notify_all();
for (std::thread& thread : threads_) {
thread.join();
}
}
template<typename F>
std::future<typename std::result_of<F()>::type> enqueue(F&& task) {
using return_type = typename std::result_of<F()>::type;
auto packaged_task = std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(task));
std::future<return_type> res = packaged_task->get_future();
{
std::unique_lock<std::mutex> lock(queueMutex_);
tasks_.emplace([packaged_task]() { (*packaged_task)(); });
}
condition_.notify_one();
return res;
}
private:
size_t numThreads_;
std::vector<std::thread> threads_;
std::queue<std::function<void()>> tasks_;
std::mutex queueMutex_;
std::condition_variable condition_;
bool stop_;
};
// 1. Command Interface
class Command {
public:
virtual void execute() = 0;
virtual ~Command() = default;
};
// 2. Receiver (图像处理器)
class ImageProcessor {
public:
ImageProcessor(std::string filename) : filename_(filename) {}
void sharpen() {
std::cout << "线程 " << std::this_thread::get_id() << ": 对图像 " << filename_ << " 进行锐化处理...n";
// 模拟耗时操作
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "线程 " << std::this_thread::get_id() << ": 图像 " << filename_ << " 锐化完成!n";
}
private:
std::string filename_;
};
// 3. Concrete Command (锐化命令)
class SharpenCommand : public Command {
public:
SharpenCommand(ImageProcessor* processor) : processor_(processor) {}
void execute() override {
processor_->sharpen();
}
private:
ImageProcessor* processor_;
};
// 4. Invoker (命令队列)
class CommandQueue {
public:
CommandQueue(ThreadPool& pool) : pool_(pool) {}
void addCommand(Command* command) {
commands_.push_back(command);
}
void run() {
std::vector<std::future<void>> futures;
for (Command* command : commands_) {
futures.push_back(pool_.enqueue([command]() { command->execute(); }));
}
//等待所有任务完成
for (auto& future : futures) {
future.get();
}
// Cleanup commands
for (Command* command : commands_) {
delete command;
}
commands_.clear();
}
private:
std::vector<Command*> commands_;
ThreadPool& pool_;
};
int main() {
ThreadPool pool(4); // 创建一个包含 4 个线程的线程池
ImageProcessor image1("image1.jpg");
ImageProcessor image2("image2.png");
ImageProcessor image3("image3.bmp");
ImageProcessor image4("image4.gif");
ImageProcessor image5("image5.jpeg");
SharpenCommand* sharpen1 = new SharpenCommand(&image1);
SharpenCommand* sharpen2 = new SharpenCommand(&image2);
SharpenCommand* sharpen3 = new SharpenCommand(&image3);
SharpenCommand* sharpen4 = new SharpenCommand(&image4);
SharpenCommand* sharpen5 = new SharpenCommand(&image5);
CommandQueue queue(pool);
queue.addCommand(sharpen1);
queue.addCommand(sharpen2);
queue.addCommand(sharpen3);
queue.addCommand(sharpen4);
queue.addCommand(sharpen5);
std::cout << "主线程: 添加锐化命令到队列...n";
queue.run();
std::cout << "主线程: 所有锐化命令执行完毕!n";
return 0;
}
代码解释:
ThreadPool
类:- 构造函数: 创建指定数量的线程,每个线程都在循环中等待任务。
enqueue()
方法: 将任务添加到任务队列中,并通知一个等待的线程。 使用std::packaged_task
将函数包装起来,以便获取返回值。- 析构函数: 设置
stop_
标志,通知所有线程退出循环,并等待所有线程结束。 - 使用
std::mutex
和std::condition_variable
实现线程同步和互斥。
CommandQueue
类: 修改CommandQueue
的构造函数,使其接收一个ThreadPool
对象。 使用pool_.enqueue()
将命令提交到线程池执行。
表格总结:
特性 | 简单并发命令模式 (使用 std::async ) |
线程池并发命令模式 (使用 ThreadPool ) |
---|---|---|
线程创建销毁 | 每次执行命令都创建和销毁线程 | 线程池预先创建,重复利用线程 |
性能 | 线程开销大,不适合大量并发任务 | 线程开销小,适合大量并发任务 |
资源管理 | 资源管理简单 | 资源管理复杂,需要考虑线程同步和互斥 |
适用场景 | 并发任务数量较少,对响应时间要求不高 | 并发任务数量较多,对响应时间要求较高 |
注意事项:
- 线程安全: 在并发环境下,需要特别注意线程安全问题。 例如,多个线程同时访问共享资源时,需要使用互斥锁(
std::mutex
)来保护共享资源。 - 死锁: 要避免死锁的发生。 死锁是指两个或多个线程互相等待对方释放资源,导致所有线程都无法继续执行。
- 异常处理: 在多线程环境下,异常处理更加复杂。 需要在每个线程中捕获异常,并进行适当的处理。
- 资源竞争: 需要避免资源竞争。 多个线程同时访问同一资源时,可能会导致数据不一致或其他问题。 可以使用互斥锁、原子操作等技术来避免资源竞争。
总结:
并发命令模式是一种强大的设计模式,可以有效地解耦请求发送者和接收者,并提高程序的并发性和响应速度。 通过使用 C++11 提供的并发特性,我们可以很容易地实现并发命令模式。 但需要注意线程安全、死锁、异常处理和资源竞争等问题。 选择合适的并发模型(如使用 std::async
还是线程池)取决于具体的应用场景和性能需求。
希望今天的讲解能够帮助你更好地理解和应用并发命令模式!