好的,没问题,我们直接开始。
各位听众,大家好!我是今天的讲者,咱们今天聊聊C++状态模式,尤其是在并发系统里它怎么大显身手。状态模式这东西,听起来好像很高大上,其实本质上就是把一堆if-else或者switch-case语句,变成更优雅、更易于维护的玩意儿。尤其是在处理复杂的并发状态转换的时候,它能让你少掉头发,多活几年。
状态模式:化繁为简的利器
首先,咱们来复习一下状态模式的核心思想。简单来说,就是:
- 定义一系列状态类: 每个类代表对象可能处于的一种状态。
- 状态类封装状态相关的行为: 状态不同,行为也不同。
- Context类持有当前状态: Context类知道对象目前处于什么状态,并把请求委托给当前状态对象处理。
- 状态转换: 当对象的状态需要改变时,Context类会切换到新的状态对象。
用人话说,就是把一个对象的所有可能的状态都分门别类地装进不同的“盒子”(状态类)里。对象当前用哪个盒子,就执行哪个盒子里面的操作。想换状态,就换个盒子呗。
举个栗子:并发状态下的文件下载器
咱们来个实际点的例子。假设我们要实现一个文件下载器,它可能处于以下几种状态:
状态 | 描述 |
---|---|
Idle | 空闲,等待开始下载 |
Connecting | 正在连接服务器 |
Downloading | 正在下载数据 |
Paused | 下载暂停 |
Resuming | 正在恢复下载 |
Completed | 下载完成 |
Error | 下载出错 |
如果我们不用状态模式,而是直接用if-else或者switch-case,代码可能长这样:
class FileDownloader {
public:
enum State {
IDLE,
CONNECTING,
DOWNLOADING,
PAUSED,
RESUMING,
COMPLETED,
ERROR
};
FileDownloader() : state_(IDLE) {}
void startDownload() {
if (state_ == IDLE) {
std::cout << "Connecting to server...n";
// 启动连接的逻辑
state_ = CONNECTING;
} else {
std::cout << "Invalid state to start download.n";
}
}
void receiveData(const char* data, size_t size) {
if (state_ == DOWNLOADING) {
std::cout << "Receiving data...n";
// 处理接收数据的逻辑
} else {
std::cout << "Invalid state to receive data.n";
}
}
void pauseDownload() {
if (state_ == DOWNLOADING) {
std::cout << "Pausing download...n";
// 暂停下载的逻辑
state_ = PAUSED;
} else {
std::cout << "Invalid state to pause download.n";
}
}
void resumeDownload() {
if (state_ == PAUSED) {
std::cout << "Resuming download...n";
// 恢复下载的逻辑
state_ = RESUMING;
} else {
std::cout << "Invalid state to resume download.n";
}
}
void completeDownload() {
if (state_ == DOWNLOADING || state_ == RESUMING) {
std::cout << "Download completed.n";
// 完成下载的逻辑
state_ = COMPLETED;
} else {
std::cout << "Invalid state to complete download.n";
}
}
void handleError() {
std::cout << "An error occurred.n";
state_ = ERROR;
}
State getState() const {
return state_;
}
private:
State state_;
};
这段代码还能凑合看,但是如果状态再多一些,状态之间的转换逻辑再复杂一些,那代码就变成一坨意大利面了。难以维护,难以测试,而且容易出错。
状态模式的优雅解法
现在,咱们用状态模式来重构这段代码。首先,定义一个抽象的状态类:
#include <iostream>
class DownloadState {
public:
virtual void startDownload() {
std::cout << "Invalid operation in current state.n";
}
virtual void receiveData(const char* data, size_t size) {
std::cout << "Invalid operation in current state.n";
}
virtual void pauseDownload() {
std::cout << "Invalid operation in current state.n";
}
virtual void resumeDownload() {
std::cout << "Invalid operation in current state.n";
}
virtual void completeDownload() {
std::cout << "Invalid operation in current state.n";
}
virtual void handleError() {
std::cout << "An error occurred in current state.n";
}
virtual ~DownloadState() {}
};
这个抽象类定义了所有状态类都需要实现的基本操作。如果某个状态不支持某个操作,就输出一个错误信息。
接下来,定义具体的状态类:
class IdleState : public DownloadState {
public:
void startDownload() override;
};
class ConnectingState : public DownloadState {
public:
};
class DownloadingState : public DownloadState {
public:
void receiveData(const char* data, size_t size) override;
void pauseDownload() override;
void completeDownload() override;
};
class PausedState : public DownloadState {
public:
void resumeDownload() override;
};
class ResumingState : public DownloadState {
public:
void completeDownload() override;
};
class CompletedState : public DownloadState {
public:
};
class ErrorState : public DownloadState {
public:
};
这些类分别代表了下载器的不同状态。注意,每个类只需要实现它所支持的操作。
最后,定义Context类:
#include <iostream>
#include <string>
class FileDownloader; // Forward declaration
class DownloadState {
public:
virtual void startDownload(FileDownloader* downloader) {
std::cout << "Invalid operation in current state.n";
}
virtual void receiveData(FileDownloader* downloader, const char* data, size_t size) {
std::cout << "Invalid operation in current state.n";
}
virtual void pauseDownload(FileDownloader* downloader) {
std::cout << "Invalid operation in current state.n";
}
virtual void resumeDownload(FileDownloader* downloader) {
std::cout << "Invalid operation in current state.n";
}
virtual void completeDownload(FileDownloader* downloader) {
std::cout << "Invalid operation in current state.n";
}
virtual void handleError(FileDownloader* downloader) {
std::cout << "An error occurred in current state.n";
}
virtual ~DownloadState() {}
};
class FileDownloader {
public:
FileDownloader();
~FileDownloader();
void setState(DownloadState* state);
void startDownload() {
current_state_->startDownload(this);
}
void receiveData(const char* data, size_t size) {
current_state_->receiveData(this, data, size);
}
void pauseDownload() {
current_state_->pauseDownload(this);
}
void resumeDownload() {
current_state_->resumeDownload(this);
}
void completeDownload() {
current_state_->completeDownload(this);
}
void handleError() {
current_state_->handleError(this);
}
DownloadState* getCurrentState() const { return current_state_; }
// Helper methods to change state (can be made private/protected)
class IdleState* getIdleState();
class ConnectingState* getConnectingState();
class DownloadingState* getDownloadingState();
class PausedState* getPausedState();
class ResumingState* getResumingState();
class CompletedState* getCompletedState();
class ErrorState* getErrorState();
private:
DownloadState* current_state_;
// Singleton instances of states
class IdleState* idle_state_;
class ConnectingState* connecting_state_;
class DownloadingState* downloading_state_;
class PausedState* paused_state_;
class ResumingState* resuming_state_;
class CompletedState* completed_state_;
class ErrorState* error_state_;
};
class IdleState : public DownloadState {
public:
void startDownload(FileDownloader* downloader) override;
};
class ConnectingState : public DownloadState {
public:
};
class DownloadingState : public DownloadState {
public:
void receiveData(FileDownloader* downloader, const char* data, size_t size) override;
void pauseDownload(FileDownloader* downloader) override;
void completeDownload(FileDownloader* downloader) override;
};
class PausedState : public DownloadState {
public:
void resumeDownload(FileDownloader* downloader) override;
};
class ResumingState : public DownloadState {
public:
void completeDownload(FileDownloader* downloader) override;
};
class CompletedState : public DownloadState {
public:
};
class ErrorState : public DownloadState {
public:
};
// Implementation of FileDownloader and its states
FileDownloader::FileDownloader() : current_state_(nullptr),
idle_state_(new IdleState()),
connecting_state_(new ConnectingState()),
downloading_state_(new DownloadingState()),
paused_state_(new PausedState()),
resuming_state_(new ResumingState()),
completed_state_(new CompletedState()),
error_state_(new ErrorState())
{
current_state_ = idle_state_; // Initial state
}
FileDownloader::~FileDownloader() {
delete idle_state_;
delete connecting_state_;
delete downloading_state_;
delete paused_state_;
delete resuming_state_;
delete completed_state_;
delete error_state_;
}
void FileDownloader::setState(DownloadState* state) {
current_state_ = state;
}
// Helper methods to access state instances
IdleState* FileDownloader::getIdleState() { return idle_state_; }
ConnectingState* FileDownloader::getConnectingState() { return connecting_state_; }
DownloadingState* FileDownloader::getDownloadingState() { return downloading_state_; }
PausedState* FileDownloader::getPausedState() { return paused_state_; }
ResumingState* FileDownloader::getResumingState() { return resuming_state_; }
CompletedState* FileDownloader::getCompletedState() { return completed_state_; }
ErrorState* FileDownloader::getErrorState() { return error_state_; }
// Implementations of state methods
void IdleState::startDownload(FileDownloader* downloader) {
std::cout << "Connecting to server...n";
// Start connecting logic here (e.g., create a thread)
downloader->setState(downloader->getConnectingState());
}
void DownloadingState::receiveData(FileDownloader* downloader, const char* data, size_t size) {
std::cout << "Receiving data: " << size << " bytesn";
// Process the received data (e.g., write to file)
}
void DownloadingState::pauseDownload(FileDownloader* downloader) {
std::cout << "Pausing download...n";
// Pause download logic here
downloader->setState(downloader->getPausedState());
}
void DownloadingState::completeDownload(FileDownloader* downloader) {
std::cout << "Download completed successfully!n";
// Handle completion logic (e.g., close file, cleanup)
downloader->setState(downloader->getCompletedState());
}
void PausedState::resumeDownload(FileDownloader* downloader) {
std::cout << "Resuming download...n";
// Resume download logic here
downloader->setState(downloader->getResumingState());
}
void ResumingState::completeDownload(FileDownloader* downloader) {
std::cout << "Download completed after resuming!n";
downloader->setState(downloader->getCompletedState());
}
int main() {
FileDownloader downloader;
downloader.startDownload(); // Connects to the server
downloader.receiveData("dummy data", 10); // Does nothing, still connecting
downloader.setState(&downloader.getDownloadingState()); //Manually set to downloading state for testing purposes
downloader.receiveData("More dummy data", 15); // Receives data
downloader.pauseDownload(); // Pauses the download
downloader.resumeDownload(); // Resumes the download
downloader.completeDownload(); // Completes the download
return 0;
}
Context类持有当前的状态对象,并把请求委托给当前状态对象处理。当状态需要改变时,Context类会切换到新的状态对象。
并发环境下的状态模式
好了,现在咱们把这个文件下载器放到并发环境里试试。想象一下,有多个线程同时访问下载器,可能会发生什么?
- 数据竞争: 多个线程可能同时修改下载器的状态,导致状态不一致。
- 死锁: 多个线程可能互相等待对方释放资源,导致程序卡死。
- 活锁: 多个线程不断尝试改变状态,但总是失败,导致程序一直在忙碌,但没有任何进展。
为了解决这些问题,我们需要对状态模式进行一些改进。
1. 线程安全的状态切换
首先,我们需要确保状态切换是线程安全的。这可以通过使用互斥锁(mutex)来实现。
#include <mutex>
class FileDownloader {
public:
FileDownloader();
~FileDownloader();
void setState(DownloadState* state);
void startDownload() {
std::lock_guard<std::mutex> lock(mutex_);
current_state_->startDownload(this);
}
void receiveData(const char* data, size_t size) {
std::lock_guard<std::mutex> lock(mutex_);
current_state_->receiveData(this, data, size);
}
void pauseDownload() {
std::lock_guard<std::mutex> lock(mutex_);
current_state_->pauseDownload(this);
}
void resumeDownload() {
std::lock_guard<std::mutex> lock(mutex_);
current_state_->resumeDownload(this);
}
void completeDownload() {
std::lock_guard<std::mutex> lock(mutex_);
current_state_->completeDownload(this);
}
void handleError() {
std::lock_guard<std::mutex> lock(mutex_);
current_state_->handleError(this);
}
DownloadState* getCurrentState() const {
std::lock_guard<std::mutex> lock(mutex_);
return current_state_;
}
// Helper methods to change state (can be made private/protected)
class IdleState* getIdleState();
class ConnectingState* getConnectingState();
class DownloadingState* getDownloadingState();
class PausedState* getPausedState();
class ResumingState* getResumingState();
class CompletedState* getCompletedState();
class ErrorState* getErrorState();
private:
DownloadState* current_state_;
// Singleton instances of states
class IdleState* idle_state_;
class ConnectingState* connecting_state_;
class DownloadingState* downloading_state_;
class PausedState* paused_state_;
class ResumingState* resuming_state_;
class CompletedState* completed_state_;
class ErrorState* error_state_;
std::mutex mutex_; // 互斥锁
};
void FileDownloader::setState(DownloadState* state) {
std::lock_guard<std::mutex> lock(mutex_);
current_state_ = state;
}
我们在FileDownloader类的每个公共方法里都加了一个互斥锁。这样,同一时刻只有一个线程可以访问下载器的状态。
2. 原子操作
对于一些简单的状态,我们可以使用原子操作来代替互斥锁。原子操作是不可分割的操作,可以保证在并发环境下数据的完整性。
例如,如果我们的状态只是一个简单的枚举值,我们可以使用std::atomic来保证状态切换的原子性。
#include <atomic>
class FileDownloader {
public:
enum class State {
IDLE,
CONNECTING,
DOWNLOADING,
PAUSED,
RESUMING,
COMPLETED,
ERROR
};
FileDownloader() : state_(State::IDLE) {}
void startDownload() {
if (state_.load() == State::IDLE) {
std::cout << "Connecting to server...n";
// 启动连接的逻辑
state_.store(State::CONNECTING);
} else {
std::cout << "Invalid state to start download.n";
}
}
private:
std::atomic<State> state_;
};
3. 使用消息队列
在某些情况下,直接修改状态可能不是最好的选择。例如,如果状态切换涉及到复杂的逻辑,或者需要通知其他组件,我们可以使用消息队列来异步地处理状态切换。
#include <queue>
#include <condition_variable>
class FileDownloader {
public:
enum class Event {
START_DOWNLOAD,
RECEIVE_DATA,
PAUSE_DOWNLOAD,
RESUME_DOWNLOAD,
COMPLETE_DOWNLOAD,
HANDLE_ERROR
};
FileDownloader() : running_(true) {
worker_thread_ = std::thread([this]() { processEvents(); });
}
~FileDownloader() {
running_ = false;
event_queue_cv_.notify_one();
worker_thread_.join();
}
void postEvent(Event event) {
{
std::lock_guard<std::mutex> lock(event_queue_mutex_);
event_queue_.push(event);
}
event_queue_cv_.notify_one();
}
private:
void processEvents() {
while (running_) {
std::unique_lock<std::mutex> lock(event_queue_mutex_);
event_queue_cv_.wait(lock, [this]() { return !event_queue_.empty() || !running_; });
if (!running_ && event_queue_.empty()) {
break;
}
Event event = event_queue_.front();
event_queue_.pop();
lock.unlock();
switch (event) {
case Event::START_DOWNLOAD:
startDownloadInternal();
break;
case Event::RECEIVE_DATA:
//receiveDataInternal();
break;
// ... other event handlers
default:
std::cout << "Unhandled event.n";
}
}
}
void startDownloadInternal() {
if (state_.load() == State::IDLE) {
std::cout << "Connecting to server...n";
// 启动连接的逻辑
state_.store(State::CONNECTING);
} else {
std::cout << "Invalid state to start download.n";
}
}
private:
std::atomic<State> state_;
std::queue<Event> event_queue_;
std::mutex event_queue_mutex_;
std::condition_variable event_queue_cv_;
std::thread worker_thread_;
std::atomic<bool> running_;
};
在这个例子中,我们使用了一个消息队列来异步地处理状态切换。当需要切换状态时,我们不是直接修改状态,而是把一个事件放入消息队列。一个专门的worker线程会从消息队列中取出事件,并执行相应的操作。
总结
状态模式是一种强大的设计模式,可以帮助我们管理复杂的状态转换逻辑。在并发环境下,我们需要特别注意线程安全问题。可以使用互斥锁、原子操作或者消息队列来保证状态切换的线程安全。
总而言之,状态模式在并发系统中的应用,就像给混乱的战场安排了一个靠谱的指挥官,让各种状态转换井然有序,避免了各种并发问题。希望今天的讲解能帮助大家在实际项目中更好地运用状态模式,写出更健壮、更易于维护的并发程序。
谢谢大家!