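Alright, folks, today let's talk about fault tolerance and circuit breaking for distributed C++. Think of it as putting armor on your code, so that one unexpected error can't take it down in a single blow. In the Java world, Hystrix and Resilience4j are household names built for exactly this job. C++ has no equally well-known off-the-shelf counterpart, but that's fine: we can roll up our sleeves and build a similar mechanism ourselves.
What are fault tolerance and circuit breaking, and why do we need them?
Imagine your program is a restaurant and user requests are the customers. If one service (say, the waiter who serves roast leg of lamb) suddenly goes on strike, should the whole restaurant grind to a halt? Of course not! Fault tolerance gives you backup plans: send in another waiter (retry), or offer a substitute dish (fallback, also called degradation).
Circuit breaking goes a step further, like a fuse in an electrical circuit. If the roast-lamb waiter walks out several times in a row, you simply strike roast lamb off the menu (the breaker opens), so customers aren't disappointed after ordering it and no resources are wasted. Once the waiter seems to have recovered, you quietly put roast lamb back on the menu on a trial basis (half-open).
In a distributed system, call chains between services are long, and a problem in any single link can set off a cascading failure (an "avalanche"). Fault tolerance and circuit breaking are the tools for preventing that cascade.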
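Implementing it in C++: the core components
Our C++ fault-tolerance and circuit-breaking mechanism needs at least the following core components:
- Command: wraps a call to an external service.
- Circuit Breaker: monitors how commands are executing and decides whether to trip (open).
- Fallback: the backup plan used when a command fails.
- Retry: automatically retries failed commands.
- Timeout: limits how long a command may run.
- Metrics: collects execution data that the circuit breaker uses to make its decisions.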
Code example: Command and Timeout
Let's start simple and implement a `Command` class and a timeout mechanism.
```cpp
#include <chrono>
#include <future>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>

// Abstract command: wraps one call to an external service
class Command {
public:
    virtual std::string execute() = 0;
    virtual ~Command() = default;
};

// Timeout decorator: stops waiting for the wrapped command after m_timeout
class TimeoutCommand : public Command {
public:
    TimeoutCommand(Command* command, std::chrono::milliseconds timeout)
        : m_command(command), m_timeout(timeout) {}

    std::string execute() override {
        // Run the wrapped command on another thread
        std::future<std::string> future = std::async(std::launch::async, [this]() {
            return m_command->execute();
        });
        if (future.wait_for(m_timeout) == std::future_status::timeout) {
            // Caveat: a future returned by std::async blocks in its destructor until the
            // task finishes, so this exception reaches the caller only after the wrapped
            // call has actually returned. Standard C++ cannot forcibly cancel a thread.
            throw std::runtime_error("Command timed out");
        }
        return future.get();  // also rethrows any exception from the wrapped command
    }

private:
    Command* m_command;  // not owned
    std::chrono::milliseconds m_timeout;
};

// Simulated external service call
class ExternalServiceCommand : public Command {
public:
    std::string execute() override {
        // Simulate a slow (500 ms) operation
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
        return "Data from external service";
    }
};

int main() {
    ExternalServiceCommand externalService;
    TimeoutCommand timeoutCommand(&externalService, std::chrono::milliseconds(200));
    try {
        std::string result = timeoutCommand.execute();
        std::cout << "Result: " << result << std::endl;
    } catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << std::endl;
    }
    return 0;
}
```
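In this example, `ExternalServiceCommand` simulates an external service call that takes 500 ms. `TimeoutCommand` uses `std::async` to start an asynchronous task and waits on the resulting `std::future` for at most 200 ms; if the task has not finished by then, it throws a `std::runtime_error`. One caveat: a `std::future` obtained from `std::async` blocks in its destructor until the task completes, so the exception actually reaches the caller only after the full 500 ms. The timeout detects the overrun but does not cut the call short, and standard C++ offers no way to forcibly cancel a running thread.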
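If the caller really must get control back after the time limit, one option is to run the work on a detached thread whose shared state is owned by a `std::shared_ptr`, so nothing blocks when the caller stops waiting. The sketch below uses a hypothetical free function, `callWithTimeout`, which is not part of the classes above. Note that the abandoned work keeps running in the background until it finishes on its own, and anything it touches must stay alive that long.

```cpp
#include <chrono>
#include <future>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

// Hypothetical helper: run `work` on a detached thread and stop waiting after `timeout`.
// The packaged_task is shared via shared_ptr, so the worker can safely outlive this call.
template <typename F>
std::string callWithTimeout(F work, std::chrono::milliseconds timeout) {
    auto task = std::make_shared<std::packaged_task<std::string()>>(std::move(work));
    std::future<std::string> result = task->get_future();
    std::thread([task] { (*task)(); }).detach();  // the caller never joins this thread

    if (result.wait_for(timeout) == std::future_status::timeout) {
        // Unlike a future from std::async, this future does not block in its destructor,
        // so the exception reaches the caller right away
        throw std::runtime_error("Command timed out");
    }
    return result.get();  // rethrows if the work itself threw
}
```

For example, wrapping a 500 ms call with a 100 ms limit throws `std::runtime_error` after roughly 100 ms. A call that finishes in time simply returns its result.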
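Circuit Breaker: the core logic
The circuit breaker is the main event. It records how commands have been doing and applies a set of rules to decide whether to open.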
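```cpp
#include <chrono>
#include <mutex>

enum class CircuitBreakerState {
    CLOSED,    // closed: all requests pass through
    OPEN,      // open: all requests are rejected
    HALF_OPEN  // half-open: one trial request is let through to probe recovery
};

class CircuitBreaker {
public:
    CircuitBreaker(int failureThreshold, int retryTimeoutMs, int slidingWindowSize)
        : m_failureThreshold(failureThreshold),
          m_retryTimeoutMs(retryTimeoutMs),
          m_slidingWindowSize(slidingWindowSize),
          m_state(CircuitBreakerState::CLOSED),
          m_failureCount(0),
          m_trialInFlight(false),
          m_lastFailureTime(std::chrono::steady_clock::now()) {}

    bool allowRequest() {
        std::lock_guard<std::mutex> lock(m_mutex);
        switch (m_state) {
        case CircuitBreakerState::CLOSED:
            return true;
        case CircuitBreakerState::OPEN:
            // Once the retry timeout has elapsed, go half-open and admit one trial request
            if (std::chrono::steady_clock::now() - m_lastFailureTime >=
                std::chrono::milliseconds(m_retryTimeoutMs)) {
                m_state = CircuitBreakerState::HALF_OPEN;
                m_trialInFlight = true;
                return true;
            }
            return false;
        case CircuitBreakerState::HALF_OPEN:
            // Only one trial request at a time; reject the rest until it completes
            if (m_trialInFlight) {
                return false;
            }
            m_trialInFlight = true;
            return true;
        }
        return false;  // unreachable
    }

    void onSuccess() {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_state == CircuitBreakerState::HALF_OPEN) {
            // Trial request succeeded: the service has recovered, close the breaker
            m_state = CircuitBreakerState::CLOSED;
            m_trialInFlight = false;
        }
        // Count consecutive failures only; without a sliding window,
        // old failures would otherwise accumulate forever
        m_failureCount = 0;
    }

    void onFailure() {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_failureCount++;
        m_lastFailureTime = std::chrono::steady_clock::now();
        if (m_state == CircuitBreakerState::HALF_OPEN) {
            // Trial request failed: reopen and restart the retry timer
            m_state = CircuitBreakerState::OPEN;
            m_trialInFlight = false;
        } else if (m_failureCount >= m_failureThreshold) {
            m_state = CircuitBreakerState::OPEN;
        }
        // Sliding-window bookkeeping (dropping old failure records) would go here
    }

    CircuitBreakerState getState() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_state;
    }

private:
    int m_failureThreshold;       // consecutive failures that trip the breaker
    int m_retryTimeoutMs;         // how long to stay open before a trial request (ms)
    int m_slidingWindowSize;      // sliding window size (not used yet; left to implement)
    CircuitBreakerState m_state;  // guarded by m_mutex, so std::atomic is unnecessary
    int m_failureCount;           // consecutive failures so far
    bool m_trialInFlight;         // true while the half-open trial request is running
    std::chrono::steady_clock::time_point m_lastFailureTime;  // time of the last failure
    mutable std::mutex m_mutex;   // mutable so the const getState() can lock it
};
```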
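The key points of this `CircuitBreaker` class:
- State (`CircuitBreakerState`): `CLOSED`, `OPEN`, or `HALF_OPEN`.
- `allowRequest()`: decides whether a request may go through.
- `onSuccess()`: called when a request succeeds.
- `onFailure()`: called when a request fails.
- Threshold (`m_failureThreshold`): once the failure count reaches this value, the breaker opens.
- Retry timeout (`m_retryTimeoutMs`): after the breaker has been open this long, the next request moves it to half-open as a trial.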
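Because `CircuitBreaker` reads the real clock, its transitions are awkward to test. Here is a minimal sketch of a simplified, single-threaded breaker whose caller passes the current time in; the class `SteppableBreaker` and its members are hypothetical. It lets you step through the cycle from closed to open to half-open and back to closed without sleeping. It assumes callers only report results for requests that `allowRequest` admitted.

```cpp
#include <chrono>

enum class State { Closed, Open, HalfOpen };

// Hypothetical single-threaded breaker with an explicit clock, for stepping through states
class SteppableBreaker {
public:
    using Clock = std::chrono::steady_clock;

    SteppableBreaker(int failureThreshold, std::chrono::milliseconds openDuration)
        : m_threshold(failureThreshold), m_openDuration(openDuration) {}

    bool allowRequest(Clock::time_point now) {
        if (m_state == State::Open && now - m_openedAt >= m_openDuration) {
            m_state = State::HalfOpen;  // retry timeout elapsed: allow one trial
            m_trialInFlight = false;
        }
        if (m_state == State::Open) {
            return false;
        }
        if (m_state == State::HalfOpen) {
            if (m_trialInFlight) {
                return false;  // only one trial request at a time
            }
            m_trialInFlight = true;
        }
        return true;
    }

    void onSuccess() {
        m_state = State::Closed;  // success while closed, or a successful trial
        m_failures = 0;
        m_trialInFlight = false;
    }

    void onFailure(Clock::time_point now) {
        ++m_failures;
        if (m_state == State::HalfOpen || m_failures >= m_threshold) {
            m_state = State::Open;  // trial failed, or too many consecutive failures
            m_openedAt = now;
            m_trialInFlight = false;
        }
    }

    State state() const { return m_state; }

private:
    int m_threshold;
    std::chrono::milliseconds m_openDuration;
    State m_state = State::Closed;
    int m_failures = 0;
    bool m_trialInFlight = false;
    Clock::time_point m_openedAt{};
};
```

With a threshold of 3 and a 5000 ms open duration, three failures open the breaker. A request at +4999 ms is rejected, and the request at +5000 ms becomes the half-open trial. A second request during the trial is rejected, and a successful trial closes the breaker again.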
Code example: integrating Command and CircuitBreaker
Now let's combine `Command` and `CircuitBreaker`.
```cpp
#include <chrono>
#include <future>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
// Command, TimeoutCommand, ExternalServiceCommand and CircuitBreaker from above go here

// Command that consults a CircuitBreaker before and after each call
class CircuitBreakerCommand : public Command {
public:
    CircuitBreakerCommand(Command* command, CircuitBreaker* circuitBreaker)
        : m_command(command), m_circuitBreaker(circuitBreaker) {}

    std::string execute() override {
        if (!m_circuitBreaker->allowRequest()) {
            throw std::runtime_error("Circuit breaker is open");  // fail fast
        }
        try {
            std::string result = m_command->execute();
            m_circuitBreaker->onSuccess();
            return result;
        } catch (...) {
            m_circuitBreaker->onFailure();  // any exception counts as a failure
            throw;                          // rethrow to the caller
        }
    }

private:
    Command* m_command;                // not owned
    CircuitBreaker* m_circuitBreaker;  // not owned
};

int main() {
    ExternalServiceCommand externalService;
    // The timeout sits INSIDE the breaker, so timeouts are recorded as failures
    TimeoutCommand timeoutCommand(&externalService, std::chrono::milliseconds(200));
    CircuitBreaker circuitBreaker(3, 5000, 10);  // open after 3 failures, try to recover after 5 s
    CircuitBreakerCommand circuitBreakerCommand(&timeoutCommand, &circuitBreaker);

    for (int i = 0; i < 10; ++i) {
        try {
            std::string result = circuitBreakerCommand.execute();
            std::cout << "Result: " << result << std::endl;
        } catch (const std::exception& e) {
            std::cerr << "Error: " << e.what() << ", Circuit Breaker State: "
                      << static_cast<int>(circuitBreaker.getState()) << std::endl;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    return 0;
}
```
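Here `CircuitBreakerCommand` checks the breaker before running the wrapped command and updates it after the command succeeds or fails. Wrapping order matters: the breaker only sees exceptions thrown by the command it wraps, so a `TimeoutCommand` belongs inside the `CircuitBreakerCommand`. With the timeout on the outside, a slow call that eventually succeeds is recorded as a success, and timeouts never trip the breaker.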
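To see why the order matters, here is a minimal sketch that uses `std::function` wrappers as simplified stand-ins for the classes above. The names `withBreaker`, `withTimeout`, `OutcomeCounter`, and `slowService` are hypothetical. Like the `std::async` version, this timeout layer only reports an overrun after the wrapped call returns.

```cpp
#include <chrono>
#include <functional>
#include <stdexcept>
#include <string>
#include <thread>

using Call = std::function<std::string()>;

// Records what the breaker-like layer observed (stand-in for CircuitBreaker)
struct OutcomeCounter {
    int successes = 0;
    int failures = 0;
};

// Breaker-like layer: counts the outcome of the call it wraps, then passes it on
Call withBreaker(Call inner, OutcomeCounter& seen) {
    return [inner, &seen] {
        try {
            std::string r = inner();
            ++seen.successes;
            return r;
        } catch (...) {
            ++seen.failures;
            throw;
        }
    };
}

// Timeout layer that reports an overrun only after the wrapped call has returned
Call withTimeout(Call inner, std::chrono::milliseconds limit) {
    return [inner, limit] {
        auto start = std::chrono::steady_clock::now();
        std::string r = inner();
        if (std::chrono::steady_clock::now() - start > limit) {
            throw std::runtime_error("Command timed out");
        }
        return r;
    };
}

// A slow call that eventually succeeds
std::string slowService() {
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    return "Data from external service";
}
```

Put a 10 ms limit around the 50 ms `slowService`. With `withTimeout(withBreaker(slowService, counter), ...)`, the caller gets a timeout but the counter records one success. With `withBreaker(withTimeout(slowService, ...), counter)`, the counter records one failure instead.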
Fallback: graceful degradation
When the breaker is open or a command fails, we need a backup plan. That's the fallback.
```cpp
#include <functional>
#include <utility>
// Command, TimeoutCommand, ExternalServiceCommand, CircuitBreaker and CircuitBreakerCommand from above go here

// Fallback decorator: returns the fallback's result when the wrapped command throws
class FallbackCommand : public Command {
public:
    FallbackCommand(Command* command, std::function<std::string()> fallback)
        : m_command(command), m_fallback(std::move(fallback)) {}

    std::string execute() override {
        try {
            return m_command->execute();
        } catch (const std::exception& e) {
            std::cout << "Executing fallback: " << e.what() << std::endl;
            return m_fallback();
        }
    }

private:
    Command* m_command;  // not owned
    std::function<std::string()> m_fallback;
};

// main() extended with a fallback
int main() {
    ExternalServiceCommand externalService;
    TimeoutCommand timeoutCommand(&externalService, std::chrono::milliseconds(200));
    CircuitBreaker circuitBreaker(3, 5000, 10);
    CircuitBreakerCommand circuitBreakerCommand(&timeoutCommand, &circuitBreaker);

    // Fallback function: returns placeholder data
    auto fallbackFunction = []() -> std::string { return "Fallback data"; };
    // Outermost layer, so it also covers requests rejected by the open breaker
    FallbackCommand fallbackCommand(&circuitBreakerCommand, fallbackFunction);

    for (int i = 0; i < 10; ++i) {
        try {
            std::string result = fallbackCommand.execute();
            std::cout << "Result: " << result << std::endl;
        } catch (const std::exception& e) {
            // Only reached if the fallback itself throws
            std::cerr << "Error: " << e.what() << ", Circuit Breaker State: "
                      << static_cast<int>(circuitBreaker.getState()) << std::endl;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    return 0;
}
```
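`FallbackCommand` takes a `std::function` as its fallback and calls it whenever the wrapped command throws. Because it sits outside the `CircuitBreakerCommand`, it also covers requests that the open breaker rejects.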
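A common refinement is to let the fallback see why the call failed. The sketch below assumes a hypothetical `CircuitOpenError` type that a breaker could throw for rejected requests, plus a hypothetical free function `executeWithFallback` whose fallback receives the caught exception. Neither exists in the code above.

```cpp
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical exception a breaker could throw instead of a plain std::runtime_error,
// so callers can tell "rejected by the breaker" apart from "the call failed"
struct CircuitOpenError : std::runtime_error {
    CircuitOpenError() : std::runtime_error("Circuit breaker is open") {}
};

// Fallback variant: the fallback function receives the caught exception
std::string executeWithFallback(const std::function<std::string()>& primary,
                                const std::function<std::string(const std::exception&)>& fallback) {
    try {
        return primary();
    } catch (const std::exception& e) {
        return fallback(e);
    }
}

// Example policy: serve cached data while the breaker is open, a default otherwise
std::string chooseFallback(const std::exception& e) {
    if (dynamic_cast<const CircuitOpenError*>(&e) != nullptr) {
        return "Cached data";
    }
    return "Fallback data";
}
```

`chooseFallback` returns `"Cached data"` for a rejected request and `"Fallback data"` for any other `std::exception`. A successful primary call bypasses it entirely.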
Retry: automatic retries
Sometimes a failed call is only a transient glitch, and simply trying again may succeed.
```cpp
// Command, TimeoutCommand, ExternalServiceCommand, CircuitBreaker,
// CircuitBreakerCommand and FallbackCommand from above go here

// Retry decorator: one initial attempt plus up to maxRetries retries
class RetryCommand : public Command {
public:
    RetryCommand(Command* command, int maxRetries)
        : m_command(command), m_maxRetries(maxRetries) {}

    std::string execute() override {
        for (int i = 0; i <= m_maxRetries; ++i) {
            try {
                return m_command->execute();
            } catch (const std::exception& e) {
                if (i == m_maxRetries) {
                    throw;  // retries exhausted: propagate the last error
                }
                std::cout << "Attempt " << i + 1 << " failed: " << e.what()
                          << ", retrying" << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(100));  // short pause
            }
        }
        throw std::runtime_error("Retry logic failed");  // only reached if m_maxRetries < 0
    }

private:
    Command* m_command;  // not owned
    int m_maxRetries;
};

// main() extended with retries
int main() {
    ExternalServiceCommand externalService;
    TimeoutCommand timeoutCommand(&externalService, std::chrono::milliseconds(200));
    CircuitBreaker circuitBreaker(3, 5000, 10);
    CircuitBreakerCommand circuitBreakerCommand(&timeoutCommand, &circuitBreaker);
    RetryCommand retryCommand(&circuitBreakerCommand, 2);  // retry up to 2 times

    // Fallback function: returns placeholder data
    auto fallbackFunction = []() -> std::string { return "Fallback data"; };
    FallbackCommand fallbackCommand(&retryCommand, fallbackFunction);

    for (int i = 0; i < 10; ++i) {
        try {
            std::string result = fallbackCommand.execute();
            std::cout << "Result: " << result << std::endl;
        } catch (const std::exception& e) {
            std::cerr << "Error: " << e.what() << ", Circuit Breaker State: "
                      << static_cast<int>(circuitBreaker.getState()) << std::endl;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    return 0;
}
```
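`RetryCommand` makes one initial attempt plus up to `m_maxRetries` retries, pausing briefly between attempts. Because it wraps the `CircuitBreakerCommand`, the breaker counts every attempt. Once the breaker opens, the remaining attempts are rejected immediately.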
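A fixed 100 ms pause is the simplest policy. A widely used alternative is exponential backoff with a cap, optionally with random "jitter" so that many clients do not retry in lockstep. Here is a minimal sketch; the helper names `backoffDelay` and `jitteredDelay` are hypothetical.

```cpp
#include <algorithm>
#include <chrono>
#include <random>

// Exponential backoff: delay = min(cap, base * 2^attempt), with attempt counted from 0
std::chrono::milliseconds backoffDelay(int attempt, std::chrono::milliseconds base,
                                       std::chrono::milliseconds cap) {
    std::chrono::milliseconds delay = base;
    for (int i = 0; i < attempt && delay < cap; ++i) {
        delay *= 2;  // doubling in a loop (stopping at the cap) avoids overflow
    }
    return std::min(delay, cap);
}

// "Full jitter" variant: a uniformly random delay in [0, backoffDelay(...)]
std::chrono::milliseconds jitteredDelay(int attempt, std::chrono::milliseconds base,
                                        std::chrono::milliseconds cap, std::mt19937& rng) {
    std::uniform_int_distribution<long long> dist(0, backoffDelay(attempt, base, cap).count());
    return std::chrono::milliseconds(dist(rng));
}
```

With a 100 ms base and a 1000 ms cap, `backoffDelay` yields 100, 200, 400, 800, and then 1000 ms for attempts 0 through 4. Inside `RetryCommand::execute`, sleeping for `backoffDelay(i, ...)` could replace the fixed 100 ms sleep.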
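Metrics: collecting execution data
To make the breaker smarter, we need data about how commands execute, such as success count, failure count, and average response time.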
```cpp
#include <chrono>
#include <mutex>

// Thread-safe counters for command outcomes and latency
class CommandMetrics {
public:
    void recordSuccess(std::chrono::milliseconds latency) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_successCount++;
        m_totalLatency += latency.count();
        m_requestCount++;
    }

    void recordFailure(std::chrono::milliseconds latency) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_failureCount++;
        m_totalLatency += latency.count();
        m_requestCount++;
    }

    int getSuccessCount() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_successCount;
    }

    int getFailureCount() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_failureCount;
    }

    // Average latency in milliseconds over all recorded requests
    double getAverageLatency() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_requestCount == 0) {
            return 0.0;
        }
        return static_cast<double>(m_totalLatency) / m_requestCount;
    }

    int getRequestCount() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_requestCount;
    }

private:
    int m_successCount = 0;
    int m_failureCount = 0;
    long long m_totalLatency = 0;  // sum of latencies (ms)
    int m_requestCount = 0;
    mutable std::mutex m_mutex;    // mutable so the const getters can lock it
};
```
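```cpp
// CircuitBreaker revised to base its decision on CommandMetrics
// (replaces the earlier version; reuses the CircuitBreakerState enum defined above)
class CircuitBreaker {
public:
    CircuitBreaker(int failureThreshold, int retryTimeoutMs, int slidingWindowSize,
                   CommandMetrics& metrics)
        : m_failureThreshold(failureThreshold),
          m_retryTimeoutMs(retryTimeoutMs),
          m_slidingWindowSize(slidingWindowSize),
          m_state(CircuitBreakerState::CLOSED),
          m_lastFailureTime(std::chrono::steady_clock::now()),
          m_metrics(metrics) {}

    bool allowRequest() {
        std::lock_guard<std::mutex> lock(m_mutex);
        switch (m_state) {
        case CircuitBreakerState::CLOSED:
            return true;
        case CircuitBreakerState::OPEN:
            // After the retry timeout, go half-open and admit one trial request
            if (std::chrono::steady_clock::now() - m_lastFailureTime >=
                std::chrono::milliseconds(m_retryTimeoutMs)) {
                m_state = CircuitBreakerState::HALF_OPEN;
                m_trialInFlight = true;
                return true;
            }
            return false;
        case CircuitBreakerState::HALF_OPEN:
            if (m_trialInFlight) {
                return false;  // a trial is already running; reject the rest
            }
            m_trialInFlight = true;
            return true;
        }
        return false;  // unreachable
    }

    void onSuccess(std::chrono::milliseconds latency) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_metrics.recordSuccess(latency);
        if (m_state == CircuitBreakerState::HALF_OPEN) {
            // Trial succeeded: close, and count failures afresh from this point on
            m_state = CircuitBreakerState::CLOSED;
            m_trialInFlight = false;
            m_failureBaseline = m_metrics.getFailureCount();
        }
    }

    void onFailure(std::chrono::milliseconds latency) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_metrics.recordFailure(latency);
        m_lastFailureTime = std::chrono::steady_clock::now();
        if (m_state == CircuitBreakerState::HALF_OPEN) {
            // Trial failed: reopen and restart the retry timer
            m_state = CircuitBreakerState::OPEN;
            m_trialInFlight = false;
            return;
        }
        // The metrics totals are never reset, so compare failures since the last close;
        // otherwise the first failure after recovery would reopen the breaker
        if (m_metrics.getFailureCount() - m_failureBaseline >= m_failureThreshold) {
            m_state = CircuitBreakerState::OPEN;
        }
        // A sliding window would bound these counts by time or call count instead
    }

    CircuitBreakerState getState() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_state;
    }

private:
    int m_failureThreshold;       // failures since the last close that trip the breaker
    int m_retryTimeoutMs;         // how long to stay open before a trial request (ms)
    int m_slidingWindowSize;      // sliding window size (not used yet)
    CircuitBreakerState m_state;  // guarded by m_mutex
    std::chrono::steady_clock::time_point m_lastFailureTime;  // time of the last failure
    CommandMetrics& m_metrics;    // shared metrics object (not owned)
    int m_failureBaseline = 0;    // failure total when the breaker last closed
    bool m_trialInFlight = false; // true while the half-open trial request runs
    mutable std::mutex m_mutex;   // mutable so the const getState() can lock it
};
```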
```cpp
// CircuitBreakerCommand revised to measure latency and pass it to the breaker
class CircuitBreakerCommand : public Command {
public:
    CircuitBreakerCommand(Command* command, CircuitBreaker* circuitBreaker)
        : m_command(command), m_circuitBreaker(circuitBreaker) {}

    std::string execute() override {
        if (!m_circuitBreaker->allowRequest()) {
            throw std::runtime_error("Circuit breaker is open");
        }
        auto start = std::chrono::steady_clock::now();
        auto elapsed = [start] {
            return std::chrono::duration_cast<std::chrono::milliseconds>(
                std::chrono::steady_clock::now() - start);
        };
        try {
            std::string result = m_command->execute();
            m_circuitBreaker->onSuccess(elapsed());
            return result;
        } catch (...) {
            m_circuitBreaker->onFailure(elapsed());  // any exception counts as a failure
            throw;                                   // rethrow to the caller
        }
    }

private:
    Command* m_command;                // not owned
    CircuitBreaker* m_circuitBreaker;  // not owned
};
```
```cpp
// main() extended with metrics
int main() {
    ExternalServiceCommand externalService;
    TimeoutCommand timeoutCommand(&externalService, std::chrono::milliseconds(200));
    CommandMetrics metrics;  // declared before the breaker, so it outlives it
    CircuitBreaker circuitBreaker(3, 5000, 10, metrics);
    CircuitBreakerCommand circuitBreakerCommand(&timeoutCommand, &circuitBreaker);
    RetryCommand retryCommand(&circuitBreakerCommand, 2);

    // Fallback function: returns placeholder data
    auto fallbackFunction = []() -> std::string { return "Fallback data"; };
    FallbackCommand fallbackCommand(&retryCommand, fallbackFunction);

    for (int i = 0; i < 10; ++i) {
        try {
            std::string result = fallbackCommand.execute();
            std::cout << "Result: " << result << std::endl;
        } catch (const std::exception& e) {
            std::cerr << "Error: " << e.what() << ", Circuit Breaker State: "
                      << static_cast<int>(circuitBreaker.getState()) << std::endl;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    std::cout << "Success Count: " << metrics.getSuccessCount() << std::endl;
    std::cout << "Failure Count: " << metrics.getFailureCount() << std::endl;
    std::cout << "Average Latency: " << metrics.getAverageLatency() << " ms" << std::endl;
    std::cout << "Request Count: " << metrics.getRequestCount() << std::endl;
    return 0;
}
```
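`CommandMetrics` records the success count, failure count, request count, and total latency, from which it derives the average latency. `CircuitBreaker` now consults `CommandMetrics` when deciding whether to open.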
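Comparing a raw failure count with a threshold trips the breaker after a handful of failures, no matter how many calls succeeded in between. Resilience4j (`failureRateThreshold`, `minimumNumberOfCalls`) and Hystrix (`circuitBreaker.errorThresholdPercentage`, `circuitBreaker.requestVolumeThreshold`) instead look at a failure percentage once a minimum number of calls has been seen. Here is a minimal sketch of that decision as a hypothetical free function, `shouldOpen`, that the `CommandMetrics` counters could feed.

```cpp
// Hypothetical trip rule: open only when enough calls have been seen AND the
// failure rate (a percentage, 0-100) reaches the threshold
bool shouldOpen(int successCount, int failureCount, int minimumCalls,
                double failureRateThreshold) {
    int total = successCount + failureCount;
    if (total < minimumCalls) {
        return false;  // too few samples to judge, e.g. 1 failure out of 1 call
    }
    double failureRate = 100.0 * failureCount / total;
    return failureRate >= failureRateThreshold;
}
```

For example, `shouldOpen(metrics.getSuccessCount(), metrics.getFailureCount(), 10, 50.0)` keeps the breaker closed until at least 10 calls have been recorded. After that, it opens the breaker once half or more of them have failed.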
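Summary: the core pieces of C++ fault tolerance and circuit breaking

| Component | Role |
|---|---|
| Command | Wraps a call to an external service so that policies can be layered on top. |
| CircuitBreaker | Monitors command outcomes and decides whether to open based on metrics such as success/failure counts or error rate. |
| Fallback | Provides a backup result when a command fails or the breaker is open, so the failure does not take the program down. |
| Retry | Automatically retries failed commands, improving reliability against transient errors. |
| Timeout | Limits how long a command may run, so the program is not blocked for long. |
| Metrics | Collects execution data that the circuit breaker uses to make its decisions. |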
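Going further: advanced features
- Sliding window: count successes and failures over a sliding window (for example, the last N calls or the last N seconds) instead of keeping running totals; this reflects the service's current health more accurately.
- Configuration center: keep the breaker settings (thresholds, timeouts, and so on) in a configuration center so they can be adjusted at runtime.
- Event notification: emit an event whenever the breaker changes state, to drive monitoring and alerting.
- Thread-pool isolation: give calls to different services their own thread pools, so that one failing service cannot starve the others.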
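For the sliding-window item: Resilience4j, for instance, offers both count-based and time-based windows. A count-based window can be a fixed-size ring buffer of recent outcomes. Here is a minimal, single-threaded sketch; the class name `CountSlidingWindow` is hypothetical, and a real breaker would call it while holding its own mutex.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical count-based sliding window: remembers the outcomes of the last
// `size` calls (size must be > 0) in a ring buffer, overwriting the oldest first
class CountSlidingWindow {
public:
    explicit CountSlidingWindow(std::size_t size) : m_outcomes(size, false) {}

    void record(bool failed) {
        if (m_count == m_outcomes.size()) {
            // Window full: the outcome being overwritten drops out of the totals
            if (m_outcomes[m_next]) {
                --m_failures;
            }
        } else {
            ++m_count;
        }
        m_outcomes[m_next] = failed;
        if (failed) {
            ++m_failures;
        }
        m_next = (m_next + 1) % m_outcomes.size();
    }

    std::size_t calls() const { return m_count; }
    std::size_t failures() const { return m_failures; }

    // Percentage of failed calls currently in the window
    double failureRate() const {
        return m_count == 0 ? 0.0
                            : 100.0 * static_cast<double>(m_failures) / static_cast<double>(m_count);
    }

private:
    std::vector<bool> m_outcomes;  // true = failed
    std::size_t m_next = 0;        // slot for the next outcome
    std::size_t m_count = 0;       // outcomes recorded so far, capped at size
    std::size_t m_failures = 0;    // failures currently inside the window
};
```

Its `failureRate()` could feed a percentage-based trip rule. Because old outcomes are overwritten, failures from long ago automatically stop counting.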
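Caveats
- Thread safety: in multithreaded code, take particular care to protect shared state, for example with mutexes.
- Performance: fault tolerance and circuit breaking add overhead; choose strategies that fit the actual workload and avoid over-protecting.
- Testing: test the fault-tolerance and circuit-breaking logic thoroughly to make sure it behaves as intended.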
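For the testing item, a deterministic fault-injection command makes retry, fallback, and breaker behavior easy to check. Here is a minimal single-threaded sketch. `FlakyCommand` is a hypothetical test double, and `Command` is redeclared so the snippet compiles on its own.

```cpp
#include <stdexcept>
#include <string>

// Same interface as the Command class above
class Command {
public:
    virtual std::string execute() = 0;
    virtual ~Command() = default;
};

// Hypothetical test double: fails the first `failuresBeforeSuccess` calls, then succeeds
class FlakyCommand : public Command {
public:
    explicit FlakyCommand(int failuresBeforeSuccess)
        : m_remainingFailures(failuresBeforeSuccess) {}

    std::string execute() override {
        ++m_calls;
        if (m_remainingFailures > 0) {
            --m_remainingFailures;
            throw std::runtime_error("Injected failure");
        }
        return "OK";
    }

    int calls() const { return m_calls; }  // how many times execute() ran

private:
    int m_remainingFailures;
    int m_calls = 0;
};
```

Wrapped in a `RetryCommand` with two retries, `FlakyCommand(2)` should succeed on its third call. `FlakyCommand(3)` should exhaust the retries and reach the fallback.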
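That's all for today's talk. I hope these code examples and ideas help you build your own C++ fault-tolerance and circuit-breaking toolkit. Remember: make your code as sturdy as armor, and it will stand firm on the distributed battlefield!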