C++ 状态模式:状态机在并发系统中的应用

好的,没问题,我们直接开始。

各位听众,大家好!我是今天的讲者,咱们今天聊聊C++状态模式,尤其是在并发系统里它怎么大显身手。状态模式这东西,听起来好像很高大上,其实本质上就是把一堆if-else或者switch-case语句,变成更优雅、更易于维护的玩意儿。尤其是在处理复杂的并发状态转换的时候,它能让你少掉头发,多活几年。

状态模式:化繁为简的利器

首先,咱们来复习一下状态模式的核心思想。简单来说,就是:

  • 定义一系列状态类: 每个类代表对象可能处于的一种状态。
  • 状态类封装状态相关的行为: 状态不同,行为也不同。
  • Context类持有当前状态: Context类知道对象目前处于什么状态,并把请求委托给当前状态对象处理。
  • 状态转换: 当对象的状态需要改变时,Context类会切换到新的状态对象。

用人话说,就是把一个对象的所有可能的状态都分门别类地装进不同的“盒子”(状态类)里。对象当前用哪个盒子,就执行哪个盒子里面的操作。想换状态,就换个盒子呗。

举个栗子:并发状态下的文件下载器

咱们来个实际点的例子。假设我们要实现一个文件下载器,它可能处于以下几种状态:

状态 描述
Idle 空闲,等待开始下载
Connecting 正在连接服务器
Downloading 正在下载数据
Paused 下载暂停
Resuming 正在恢复下载
Completed 下载完成
Error 下载出错

如果我们不用状态模式,而是直接用if-else或者switch-case,代码可能长这样:

class FileDownloader {
public:
    enum State {
        IDLE,
        CONNECTING,
        DOWNLOADING,
        PAUSED,
        RESUMING,
        COMPLETED,
        ERROR
    };

    FileDownloader() : state_(IDLE) {}

    void startDownload() {
        if (state_ == IDLE) {
            std::cout << "Connecting to server...n";
            // 启动连接的逻辑
            state_ = CONNECTING;
        } else {
            std::cout << "Invalid state to start download.n";
        }
    }

    void receiveData(const char* data, size_t size) {
        if (state_ == DOWNLOADING) {
            std::cout << "Receiving data...n";
            // 处理接收数据的逻辑
        } else {
            std::cout << "Invalid state to receive data.n";
        }
    }

    void pauseDownload() {
        if (state_ == DOWNLOADING) {
            std::cout << "Pausing download...n";
            // 暂停下载的逻辑
            state_ = PAUSED;
        } else {
            std::cout << "Invalid state to pause download.n";
        }
    }

    void resumeDownload() {
        if (state_ == PAUSED) {
            std::cout << "Resuming download...n";
            // 恢复下载的逻辑
            state_ = RESUMING;
        } else {
            std::cout << "Invalid state to resume download.n";
        }
    }

    void completeDownload() {
        if (state_ == DOWNLOADING || state_ == RESUMING) {
            std::cout << "Download completed.n";
            // 完成下载的逻辑
            state_ = COMPLETED;
        } else {
            std::cout << "Invalid state to complete download.n";
        }
    }

    void handleError() {
        std::cout << "An error occurred.n";
        state_ = ERROR;
    }

    State getState() const {
        return state_;
    }

private:
    State state_;
};

这段代码还能凑合看,但是如果状态再多一些,状态之间的转换逻辑再复杂一些,那代码就变成一坨意大利面了。难以维护,难以测试,而且容易出错。

状态模式的优雅解法

现在,咱们用状态模式来重构这段代码。首先,定义一个抽象的状态类:

#include <iostream>

class DownloadState {
public:
    virtual void startDownload() {
        std::cout << "Invalid operation in current state.n";
    }
    virtual void receiveData(const char* data, size_t size) {
        std::cout << "Invalid operation in current state.n";
    }
    virtual void pauseDownload() {
        std::cout << "Invalid operation in current state.n";
    }
    virtual void resumeDownload() {
        std::cout << "Invalid operation in current state.n";
    }
    virtual void completeDownload() {
        std::cout << "Invalid operation in current state.n";
    }
    virtual void handleError() {
        std::cout << "An error occurred in current state.n";
    }
    virtual ~DownloadState() {}
};

这个抽象类定义了所有状态类都需要实现的基本操作。如果某个状态不支持某个操作,就输出一个错误信息。

接下来,定义具体的状态类:

class IdleState : public DownloadState {
public:
    void startDownload() override;
};

class ConnectingState : public DownloadState {
public:
};

class DownloadingState : public DownloadState {
public:
    void receiveData(const char* data, size_t size) override;
    void pauseDownload() override;
    void completeDownload() override;
};

class PausedState : public DownloadState {
public:
    void resumeDownload() override;
};

class ResumingState : public DownloadState {
public:
    void completeDownload() override;
};

class CompletedState : public DownloadState {
public:
};

class ErrorState : public DownloadState {
public:
};

这些类分别代表了下载器的不同状态。注意,每个类只需要实现它所支持的操作。

最后,定义Context类:

#include <iostream>
#include <string>

class FileDownloader; // Forward declaration

class DownloadState {
public:
    virtual void startDownload(FileDownloader* downloader) {
        std::cout << "Invalid operation in current state.n";
    }
    virtual void receiveData(FileDownloader* downloader, const char* data, size_t size) {
        std::cout << "Invalid operation in current state.n";
    }
    virtual void pauseDownload(FileDownloader* downloader) {
        std::cout << "Invalid operation in current state.n";
    }
    virtual void resumeDownload(FileDownloader* downloader) {
        std::cout << "Invalid operation in current state.n";
    }
    virtual void completeDownload(FileDownloader* downloader) {
        std::cout << "Invalid operation in current state.n";
    }
    virtual void handleError(FileDownloader* downloader) {
        std::cout << "An error occurred in current state.n";
    }
    virtual ~DownloadState() {}
};

class FileDownloader {
public:
    FileDownloader();
    ~FileDownloader();

    void setState(DownloadState* state);

    void startDownload() {
        current_state_->startDownload(this);
    }
    void receiveData(const char* data, size_t size) {
        current_state_->receiveData(this, data, size);
    }
    void pauseDownload() {
        current_state_->pauseDownload(this);
    }
    void resumeDownload() {
        current_state_->resumeDownload(this);
    }
    void completeDownload() {
        current_state_->completeDownload(this);
    }
    void handleError() {
        current_state_->handleError(this);
    }

    DownloadState* getCurrentState() const { return current_state_; }

    // Helper methods to change state (can be made private/protected)
    class IdleState* getIdleState();
    class ConnectingState* getConnectingState();
    class DownloadingState* getDownloadingState();
    class PausedState* getPausedState();
    class ResumingState* getResumingState();
    class CompletedState* getCompletedState();
    class ErrorState* getErrorState();

private:
    DownloadState* current_state_;

    // Singleton instances of states
    class IdleState* idle_state_;
    class ConnectingState* connecting_state_;
    class DownloadingState* downloading_state_;
    class PausedState* paused_state_;
    class ResumingState* resuming_state_;
    class CompletedState* completed_state_;
    class ErrorState* error_state_;
};

class IdleState : public DownloadState {
public:
    void startDownload(FileDownloader* downloader) override;
};

class ConnectingState : public DownloadState {
public:
};

class DownloadingState : public DownloadState {
public:
    void receiveData(FileDownloader* downloader, const char* data, size_t size) override;
    void pauseDownload(FileDownloader* downloader) override;
    void completeDownload(FileDownloader* downloader) override;
};

class PausedState : public DownloadState {
public:
    void resumeDownload(FileDownloader* downloader) override;
};

class ResumingState : public DownloadState {
public:
    void completeDownload(FileDownloader* downloader) override;
};

class CompletedState : public DownloadState {
public:
};

class ErrorState : public DownloadState {
public:
};

// Implementation of FileDownloader and its states
FileDownloader::FileDownloader() : current_state_(nullptr),
                                   idle_state_(new IdleState()),
                                   connecting_state_(new ConnectingState()),
                                   downloading_state_(new DownloadingState()),
                                   paused_state_(new PausedState()),
                                   resuming_state_(new ResumingState()),
                                   completed_state_(new CompletedState()),
                                   error_state_(new ErrorState())
{
    current_state_ = idle_state_; // Initial state
}

FileDownloader::~FileDownloader() {
    delete idle_state_;
    delete connecting_state_;
    delete downloading_state_;
    delete paused_state_;
    delete resuming_state_;
    delete completed_state_;
    delete error_state_;
}

void FileDownloader::setState(DownloadState* state) {
    current_state_ = state;
}

// Helper methods to access state instances
IdleState* FileDownloader::getIdleState() { return idle_state_; }
ConnectingState* FileDownloader::getConnectingState() { return connecting_state_; }
DownloadingState* FileDownloader::getDownloadingState() { return downloading_state_; }
PausedState* FileDownloader::getPausedState() { return paused_state_; }
ResumingState* FileDownloader::getResumingState() { return resuming_state_; }
CompletedState* FileDownloader::getCompletedState() { return completed_state_; }
ErrorState* FileDownloader::getErrorState() { return error_state_; }

// Implementations of state methods

void IdleState::startDownload(FileDownloader* downloader) {
    std::cout << "Connecting to server...n";
    // Start connecting logic here (e.g., create a thread)
    downloader->setState(downloader->getConnectingState());
}

void DownloadingState::receiveData(FileDownloader* downloader, const char* data, size_t size) {
    std::cout << "Receiving data: " << size << " bytesn";
    // Process the received data (e.g., write to file)
}

void DownloadingState::pauseDownload(FileDownloader* downloader) {
    std::cout << "Pausing download...n";
    // Pause download logic here
    downloader->setState(downloader->getPausedState());
}

void DownloadingState::completeDownload(FileDownloader* downloader) {
    std::cout << "Download completed successfully!n";
    // Handle completion logic (e.g., close file, cleanup)
    downloader->setState(downloader->getCompletedState());
}

void PausedState::resumeDownload(FileDownloader* downloader) {
    std::cout << "Resuming download...n";
    // Resume download logic here
    downloader->setState(downloader->getResumingState());
}

void ResumingState::completeDownload(FileDownloader* downloader) {
    std::cout << "Download completed after resuming!n";
    downloader->setState(downloader->getCompletedState());
}

int main() {
    FileDownloader downloader;

    downloader.startDownload(); // Connects to the server
    downloader.receiveData("dummy data", 10); // Does nothing, still connecting

    downloader.setState(&downloader.getDownloadingState()); //Manually set to downloading state for testing purposes

    downloader.receiveData("More dummy data", 15); // Receives data
    downloader.pauseDownload(); // Pauses the download
    downloader.resumeDownload(); // Resumes the download
    downloader.completeDownload(); // Completes the download

    return 0;
}

Context类持有当前的状态对象,并把请求委托给当前状态对象处理。当状态需要改变时,Context类会切换到新的状态对象。

并发环境下的状态模式

好了,现在咱们把这个文件下载器放到并发环境里试试。想象一下,有多个线程同时访问下载器,可能会发生什么?

  • 数据竞争: 多个线程可能同时修改下载器的状态,导致状态不一致。
  • 死锁: 多个线程可能互相等待对方释放资源,导致程序卡死。
  • 活锁: 多个线程不断尝试改变状态,但总是失败,导致程序一直在忙碌,但没有任何进展。

为了解决这些问题,我们需要对状态模式进行一些改进。

1. 线程安全的状态切换

首先,我们需要确保状态切换是线程安全的。这可以通过使用互斥锁(mutex)来实现。

#include <mutex>

class FileDownloader {
public:
    FileDownloader();
    ~FileDownloader();

    void setState(DownloadState* state);

    void startDownload() {
        std::lock_guard<std::mutex> lock(mutex_);
        current_state_->startDownload(this);
    }
    void receiveData(const char* data, size_t size) {
        std::lock_guard<std::mutex> lock(mutex_);
        current_state_->receiveData(this, data, size);
    }
    void pauseDownload() {
        std::lock_guard<std::mutex> lock(mutex_);
        current_state_->pauseDownload(this);
    }
    void resumeDownload() {
        std::lock_guard<std::mutex> lock(mutex_);
        current_state_->resumeDownload(this);
    }
    void completeDownload() {
        std::lock_guard<std::mutex> lock(mutex_);
        current_state_->completeDownload(this);
    }
    void handleError() {
        std::lock_guard<std::mutex> lock(mutex_);
        current_state_->handleError(this);
    }

    DownloadState* getCurrentState() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return current_state_;
    }

    // Helper methods to change state (can be made private/protected)
    class IdleState* getIdleState();
    class ConnectingState* getConnectingState();
    class DownloadingState* getDownloadingState();
    class PausedState* getPausedState();
    class ResumingState* getResumingState();
    class CompletedState* getCompletedState();
    class ErrorState* getErrorState();

private:
    DownloadState* current_state_;

    // Singleton instances of states
    class IdleState* idle_state_;
    class ConnectingState* connecting_state_;
    class DownloadingState* downloading_state_;
    class PausedState* paused_state_;
    class ResumingState* resuming_state_;
    class CompletedState* completed_state_;
    class ErrorState* error_state_;

    std::mutex mutex_; // 互斥锁
};

void FileDownloader::setState(DownloadState* state) {
    std::lock_guard<std::mutex> lock(mutex_);
    current_state_ = state;
}

我们在FileDownloader类的每个公共方法里都加了一个互斥锁。这样,同一时刻只有一个线程可以访问下载器的状态。

2. 原子操作

对于一些简单的状态,我们可以使用原子操作来代替互斥锁。原子操作是不可分割的操作,可以保证在并发环境下数据的完整性。

例如,如果我们的状态只是一个简单的枚举值,我们可以使用std::atomic来保证状态切换的原子性。

#include <atomic>

class FileDownloader {
public:
    enum class State {
        IDLE,
        CONNECTING,
        DOWNLOADING,
        PAUSED,
        RESUMING,
        COMPLETED,
        ERROR
    };

    FileDownloader() : state_(State::IDLE) {}

    void startDownload() {
        if (state_.load() == State::IDLE) {
            std::cout << "Connecting to server...n";
            // 启动连接的逻辑
            state_.store(State::CONNECTING);
        } else {
            std::cout << "Invalid state to start download.n";
        }
    }

private:
    std::atomic<State> state_;
};

3. 使用消息队列

在某些情况下,直接修改状态可能不是最好的选择。例如,如果状态切换涉及到复杂的逻辑,或者需要通知其他组件,我们可以使用消息队列来异步地处理状态切换。

#include <queue>
#include <condition_variable>

class FileDownloader {
public:
    enum class Event {
        START_DOWNLOAD,
        RECEIVE_DATA,
        PAUSE_DOWNLOAD,
        RESUME_DOWNLOAD,
        COMPLETE_DOWNLOAD,
        HANDLE_ERROR
    };

    FileDownloader() : running_(true) {
        worker_thread_ = std::thread([this]() { processEvents(); });
    }

    ~FileDownloader() {
        running_ = false;
        event_queue_cv_.notify_one();
        worker_thread_.join();
    }

    void postEvent(Event event) {
        {
            std::lock_guard<std::mutex> lock(event_queue_mutex_);
            event_queue_.push(event);
        }
        event_queue_cv_.notify_one();
    }

private:
    void processEvents() {
        while (running_) {
            std::unique_lock<std::mutex> lock(event_queue_mutex_);
            event_queue_cv_.wait(lock, [this]() { return !event_queue_.empty() || !running_; });

            if (!running_ && event_queue_.empty()) {
                break;
            }

            Event event = event_queue_.front();
            event_queue_.pop();
            lock.unlock();

            switch (event) {
                case Event::START_DOWNLOAD:
                    startDownloadInternal();
                    break;
                case Event::RECEIVE_DATA:
                    //receiveDataInternal();
                    break;
                // ... other event handlers
                default:
                    std::cout << "Unhandled event.n";
            }
        }
    }

    void startDownloadInternal() {
        if (state_.load() == State::IDLE) {
            std::cout << "Connecting to server...n";
            // 启动连接的逻辑
            state_.store(State::CONNECTING);
        } else {
            std::cout << "Invalid state to start download.n";
        }
    }

private:
    std::atomic<State> state_;
    std::queue<Event> event_queue_;
    std::mutex event_queue_mutex_;
    std::condition_variable event_queue_cv_;
    std::thread worker_thread_;
    std::atomic<bool> running_;
};

在这个例子中,我们使用了一个消息队列来异步地处理状态切换。当需要切换状态时,我们不是直接修改状态,而是把一个事件放入消息队列。一个专门的worker线程会从消息队列中取出事件,并执行相应的操作。

总结

状态模式是一种强大的设计模式,可以帮助我们管理复杂的状态转换逻辑。在并发环境下,我们需要特别注意线程安全问题。可以使用互斥锁、原子操作或者消息队列来保证状态切换的线程安全。

总而言之,状态模式在并发系统中的应用,就像给混乱的战场安排了一个靠谱的指挥官,让各种状态转换井然有序,避免了各种并发问题。希望今天的讲解能帮助大家在实际项目中更好地运用状态模式,写出更健壮、更易于维护的并发程序。

谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注