什么是 ‘Graceful Degradation’:当外部 API(如 OpenAI)宕机时,图如何平滑切换到本地 Llama 实例?

各位同仁,下午好。今天,我们将深入探讨一个在现代分布式系统,特别是依赖外部AI服务应用中至关重要的概念——平滑降级(Graceful Degradation)。我们将聚焦于一个具体的场景:当我们的核心功能依赖于外部大型语言模型(LLM)API,如OpenAI时,如何在这种外部服务不可用或性能下降时,平滑地切换到本地部署的Llama实例,以最小化对用户体验的影响。

平滑降级的核心要义

首先,我们来明确“平滑降级”的含义。它指的是系统在部分功能或组件出现故障时,能够保持核心功能可用,通过提供降级服务而非完全崩溃,从而维持用户的基本体验。这与“断路器(Circuit Breaker)”模式紧密相关,但平滑降级是一个更广阔的策略,它关注的是在故障发生后,如何优雅地处理并提供替代方案。

在AI应用中,对外部API的依赖日益增长。OpenAI、Anthropic等提供了强大的模型能力,但它们是外部服务,面临着网络延迟、API限速、服务中断、成本波动甚至数据隐私等挑战。如果我们的应用完全绑定于这些外部服务,一旦它们出现问题,我们的应用将立即面临全面瘫痪的风险。

这就是平滑降级的价值所在。通过预先设计一个本地的Llama实例作为备选,我们可以在外部API不可用时,无缝地将流量切换到本地实例。虽然本地Llama可能在性能、模型规模或特定能力上不及OpenAI,但它能确保应用的核心功能得以延续,用户仍然可以获得一个“足够好”的体验,而不是面对一个完全无响应的界面。

外部API依赖的挑战与风险

在构建依赖于OpenAI等外部API的应用程序时,我们通常会享受到以下便利:

  • 强大的模型能力:无需自行训练或部署庞大模型。
  • 低运维成本:无需管理GPU集群或复杂的模型服务基础设施。
  • 快速迭代:可以直接调用最新模型,享受持续的模型改进。

然而,这些便利背后隐藏着不容忽视的风险:

  1. 服务可用性 (Availability):外部服务可能因自身故障、维护、DDoS攻击或其他原因而宕机或不可用。
  2. 网络延迟 (Latency):跨地域的网络通信引入不确定延迟,影响用户体验。
  3. API限速 (Rate Limiting):外部API通常有严格的请求频率限制,超出后请求会被拒绝。
  4. 成本波动与控制 (Cost):API调用是按量计费的,高并发或意外使用可能导致成本飙升。
  5. 数据隐私与合规 (Data Privacy & Compliance):敏感数据通过外部API处理可能带来合规性挑战。
  6. 厂商锁定 (Vendor Lock-in):深度集成特定API可能导致切换成本高昂。
  7. 性能瓶颈 (Performance):外部API的响应时间可能受其自身负载影响。

这些风险使得我们必须为最坏的情况做好准备,平滑降级正是应对这些挑战的有效策略。

平滑降级核心原则与策略

为了实现从OpenAI到本地Llama的平滑切换,我们需要遵循几个核心原则并采用相应的策略:

  1. 抽象与解耦: 将LLM调用逻辑与具体的提供商(OpenAI或本地Llama)解耦,通过一个统一的接口进行交互。
  2. 故障检测: 准确、及时地检测外部API的故障状态。
  3. 智能切换: 根据故障状态和预设策略,自动或半自动地切换到备用方案。
  4. 功能降级: 在切换到备用方案时,可能需要调整应用的功能或提示用户正在使用受限服务。
  5. 资源管理: 确保本地Llama实例的资源(CPU/GPU、内存)充足且随时可用。
  6. 监控与告警: 持续监控服务状态和切换事件,以便及时响应和分析。

实现平滑降级的关键技术与模式

1. 抽象层与适配器模式

这是实现平滑降级的基础。我们需要定义一个抽象接口,所有LLM提供商都必须实现这个接口。这样,我们的应用代码就可以面向接口编程,无需关心底层是OpenAI还是Llama。

import abc
import os
import time
import httpx
from typing import List, Dict, Any, Optional

# 定义一个通用的LLM消息结构,简化兼容性处理
class Message:
    def __init__(self, role: str, content: str):
        self.role = role
        self.content = content

    def to_dict(self):
        return {"role": self.role, "content": self.content}

    @classmethod
    def from_dict(cls, data: Dict[str, str]):
        return cls(data["role"], data["content"])

class LLMProvider(abc.ABC):
    """
    抽象基类:定义所有LLM提供商必须实现的接口
    """
    @abc.abstractmethod
    async def generate_completion(
        self,
        messages: List[Message],
        model: str = "default",
        temperature: float = 0.7,
        max_tokens: int = 512,
        **kwargs
    ) -> str:
        """
        生成文本补全。
        Args:
            messages: 对话历史消息列表。
            model: 使用的模型名称。
            temperature: 采样温度。
            max_tokens: 生成的最大token数。
            **kwargs: 其他特定于提供商的参数。
        Returns:
            生成的文本内容。
        Raises:
            LLMServiceException: 如果调用失败。
        """
        pass

class LLMServiceException(Exception):
    """自定义LLM服务异常"""
    pass

# --- OpenAI 提供商实现 ---
class OpenAIProvider(LLMProvider):
    def __init__(self, api_key: str, base_url: str = "https://api.openai.com/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self.client = httpx.AsyncClient(
            headers={"Authorization": f"Bearer {self.api_key}"},
            base_url=self.base_url,
            timeout=30.0 # 默认超时时间
        )

    async def generate_completion(
        self,
        messages: List[Message],
        model: str = "gpt-3.5-turbo", # OpenAI默认模型
        temperature: float = 0.7,
        max_tokens: int = 512,
        **kwargs
    ) -> str:
        try:
            payload = {
                "model": model,
                "messages": [m.to_dict() for m in messages],
                "temperature": temperature,
                "max_tokens": max_tokens,
                **kwargs
            }
            response = await self.client.post("/chat/completions", json=payload)
            response.raise_for_status() # 抛出HTTPError如果状态码是4xx/5xx

            data = response.json()
            if "choices" in data and data["choices"]:
                return data["choices"][0]["message"]["content"]
            raise LLMServiceException("OpenAI response missing choices.")
        except httpx.TimeoutException as e:
            raise LLMServiceException(f"OpenAI API request timed out: {e}") from e
        except httpx.RequestError as e:
            raise LLMServiceException(f"OpenAI API request failed: {e}") from e
        except httpx.HTTPStatusError as e:
            # 捕获HTTP错误,例如401, 429, 500等
            status_code = e.response.status_code
            error_msg = e.response.text
            raise LLMServiceException(f"OpenAI API returned HTTP {status_code}: {error_msg}") from e
        except Exception as e:
            raise LLMServiceException(f"An unexpected error occurred with OpenAI API: {e}") from e

# --- 本地Llama提供商实现 (基于Ollama或llama.cpp的HTTP服务) ---
class LocalLlamaProvider(LLMProvider):
    def __init__(self, base_url: str = "http://localhost:11434/api"):
        self.base_url = base_url
        self.client = httpx.AsyncClient(
            base_url=self.base_url,
            timeout=60.0 # 本地Llama可能需要更长的生成时间
        )
        self.default_model = os.getenv("LOCAL_LLAMA_MODEL", "llama2") # 可通过环境变量配置默认模型

    async def generate_completion(
        self,
        messages: List[Message],
        model: str = None, # 覆盖为None,使用内部默认或用户指定
        temperature: float = 0.7,
        max_tokens: int = 512,
        **kwargs
    ) -> str:
        # 如果未指定模型,使用实例的默认模型
        model_to_use = model if model else self.default_model
        if not model_to_use:
            raise ValueError("Local Llama model must be specified or configured.")

        # Ollama API 兼容 OpenAI chat completion 接口
        try:
            payload = {
                "model": model_to_use,
                "messages": [m.to_dict() for m in messages],
                "temperature": temperature,
                "max_tokens": max_tokens,
                "stream": False, # Ollama默认流式,我们这里统一返回完整结果
                **kwargs
            }
            response = await self.client.post("/chat", json=payload) # Ollama的聊天补全接口
            response.raise_for_status()

            data = response.json()
            if "message" in data and "content" in data["message"]:
                return data["message"]["content"]
            # 对于llama.cpp的服务器模式,接口可能略有不同,需要适配
            # 例如:llama.cpp的/completion接口返回的可能是 {"content": "..."}
            # 所以这里需要根据实际情况调整解析逻辑
            # if "content" in data: return data["content"]
            raise LLMServiceException("Local Llama response missing expected content.")
        except httpx.TimeoutException as e:
            raise LLMServiceException(f"Local Llama API request timed out: {e}") from e
        except httpx.RequestError as e:
            raise LLMServiceException(f"Local Llama API request failed: {e}") from e
        except httpx.HTTPStatusError as e:
            status_code = e.response.status_code
            error_msg = e.response.text
            raise LLMServiceException(f"Local Llama API returned HTTP {status_code}: {error_msg}") from e
        except Exception as e:
            raise LLMServiceException(f"An unexpected error occurred with Local Llama API: {e}") from e

2. 故障检测机制

准确、及时地检测外部API的故障是切换策略的基石。

a. 超时 (Timeouts)
这是最基本的故障检测手段。如果一个请求在预设时间内未收到响应,就认为该请求失败。对于外部网络请求,应该设置合理的连接超时和读取超时。

  • 连接超时 (Connect Timeout):建立与服务器的连接所需的最大时间。
  • 读取超时 (Read Timeout):服务器发送响应帧之间允许的最大时间。

在上面的httpx.AsyncClient中,我们已经设置了timeout参数。

b. HTTP状态码与错误类型
不同的HTTP状态码代表不同类型的错误。

  • 4xx 系列(客户端错误):如 401 Unauthorized (API Key错误), 429 Too Many Requests (限速)。
  • 5xx 系列(服务器错误):如 500 Internal Server Error, 502 Bad Gateway, 503 Service Unavailable, 504 Gateway Timeout

我们应该捕获这些错误,并根据其类型决定是否触发降级。对于401等配置错误,不应触发降级,而应立即报告并停止服务。对于4295xx等临时性或服务问题,则可以触发降级。

c. 断路器模式 (Circuit Breaker)
断路器模式是防止应用程序重复尝试执行可能失败的操作的关键模式。它在检测到一系列失败后,会“跳闸”,阻止后续请求发送到故障服务,从而给服务恢复时间,并避免资源浪费和级联故障。

断路器有三种状态:

  • 关闭 (Closed):正常状态,请求直接通过。
  • 打开 (Open):当失败次数达到阈值时,断路器跳闸,所有请求立即失败,不再尝试调用故障服务。
  • 半开 (Half-Open):在断路器打开一段时间后(恢复超时),允许少量请求通过,以测试服务是否已恢复。如果这些测试请求成功,断路器关闭;如果失败,则回到打开状态。

我们可以使用像 pybreaker 这样的库来实现断路器。

import pybreaker
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 定义断路器配置
# fail_max: 失败次数阈值,达到后断路器跳闸
# reset_timeout: 断路器打开后,进入半开状态前等待的时间(秒)
# exclude: 不计入失败的异常类型(例如,业务逻辑错误不应导致断路器跳闸)
# call_timeout: 每次调用允许的最大时间(此超时与httpx的超时共同作用,这里只是断路器层面的额外保护)
openai_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=60, # 60秒后尝试进入半开状态
    exclude=[LLMServiceException], # 假设我们的LLMServiceException包含可降级的错误
    # 如果要精确控制,可以自定义错误处理逻辑,或根据错误类型判断是否计入失败
    # 例如:只有5xx和429才计入失败
)

# 装饰器方式使用断路器
class LLMServiceWithBreaker:
    def __init__(self, primary_provider: LLMProvider, fallback_provider: LLMProvider):
        self.primary_provider = primary_provider
        self.fallback_provider = fallback_provider
        self.current_provider = primary_provider
        self.is_degraded = False # 标记是否处于降级状态

    @openai_breaker
    async def _call_primary_provider(self, *args, **kwargs):
        """
        通过断路器调用主提供商。
        如果主提供商失败并导致断路器跳闸,此方法会抛出 CircuitBreakerError。
        """
        logging.info(f"Attempting to use primary provider: {type(self.primary_provider).__name__}")
        return await self.primary_provider.generate_completion(*args, **kwargs)

    async def generate_completion(self, messages: List[Message], model: str = "default", **kwargs) -> str:
        try:
            # 尝试调用主提供商
            result = await self._call_primary_provider(messages, model, **kwargs)
            if self.is_degraded:
                logging.info("Primary provider recovered. Switching back from fallback.")
                self.is_degraded = False
            self.current_provider = self.primary_provider
            return result
        except pybreaker.CircuitBreakerError:
            # 断路器已打开,直接使用备用提供商
            logging.warning("Primary provider circuit is OPEN. Falling back to local Llama.")
            self.is_degraded = True
            self.current_provider = self.fallback_provider
            return await self.fallback_provider.generate_completion(messages, model, **kwargs)
        except LLMServiceException as e:
            # 主提供商出现LLM服务异常 (例如429, 5xx),断路器会计数
            logging.error(f"Primary provider failed with LLMServiceException: {e}. Attempting fallback.")
            self.is_degraded = True
            self.current_provider = self.fallback_provider
            return await self.fallback_provider.generate_completion(messages, model, **kwargs)
        except Exception as e:
            # 捕获其他未知异常,也尝试降级
            logging.error(f"Unexpected error with primary provider: {e}. Attempting fallback.")
            self.is_degraded = True
            self.current_provider = self.fallback_provider
            return await self.fallback_provider.generate_completion(messages, model, **kwargs)

注意: LLMServiceWithBreaker 这里的逻辑是,如果主提供商抛出 LLMServiceException (例如 4295xx),断路器会根据配置自动计数。如果达到 fail_max,断路器状态会变为 OPEN_call_primary_provider 会立即抛出 CircuitBreakerError。这样,我们就能在这两种情况下都切换到备用提供商。

3. 重试策略 (Retry Mechanisms)

对于瞬时故障(如网络抖动、短暂的服务过载),立即切换到备用方案可能过于激进。在进行降级之前,通常会尝试几次重试。重试策略应包含指数退避 (Exponential Backoff),即每次重试之间等待的时间呈指数增长,以避免对已经过载的服务造成更大的压力。

我们可以使用 tenacity 这样的库来实现重试逻辑。

from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type

# 定义一个针对LLMServiceException的重试装饰器
# 等待策略:指数退避,初始等待0.5秒,最大等待10秒
# 停止策略:最多重试3次
# 重试条件:只有当抛出 LLMServiceException 时才重试
# 如果是 CircuitBreakerError,则不重试,因为这意味着断路器已打开
@retry(
    wait=wait_exponential(multiplier=1, min=0.5, max=10),
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type(LLMServiceException), # 只对LLMServiceException进行重试
    reraise=True # 重试失败后重新抛出异常
)
async def _retryable_llm_call(provider: LLMProvider, messages: List[Message], model: str, **kwargs) -> str:
    """
    一个可重试的LLM调用函数。
    此函数会被断路器装饰器内部调用,或者在降级逻辑中调用备用提供商。
    """
    logging.debug(f"Attempting LLM call with provider {type(provider).__name__}")
    return await provider.generate_completion(messages, model, **kwargs)

# 重新构建 LLMServiceWithBreaker 来集成重试
class LLMServiceWithBreakerAndRetry:
    def __init__(self, primary_provider: LLMProvider, fallback_provider: LLMProvider):
        self.primary_provider = primary_provider
        self.fallback_provider = fallback_provider
        self.current_provider = primary_provider
        self.is_degraded = False

    @openai_breaker # 断路器在最外层包裹,防止对已故障的服务进行重试
    async def _call_primary_with_retry(self, *args, **kwargs):
        """
        尝试通过重试机制调用主提供商。
        如果重试后仍失败,会抛出异常,断路器会捕获并计数。
        """
        logging.info(f"Attempting to use primary provider with retry: {type(self.primary_provider).__name__}")
        return await _retryable_llm_call(self.primary_provider, *args, **kwargs)

    async def generate_completion(self, messages: List[Message], model: str = "default", **kwargs) -> str:
        try:
            result = await self._call_primary_with_retry(messages, model, **kwargs)
            if self.is_degraded:
                logging.info("Primary provider recovered. Switching back from fallback.")
                self.is_degraded = False
            self.current_provider = self.primary_provider
            return result
        except pybreaker.CircuitBreakerError:
            logging.warning("Primary provider circuit is OPEN. Falling back to local Llama.")
            self.is_degraded = True
            self.current_provider = self.fallback_provider
            # 备用提供商也可能失败,这里也应该考虑重试或更进一步的降级
            # 为了简洁,这里直接调用,实际应用中可以给fallback_provider也加重试
            return await _retryable_llm_call(self.fallback_provider, messages, model, **kwargs)
        except LLMServiceException as e:
            logging.error(f"Primary provider failed after retries with LLMServiceException: {e}. Falling back.")
            self.is_degraded = True
            self.current_provider = self.fallback_provider
            return await _retryable_llm_call(self.fallback_provider, messages, model, **kwargs)
        except Exception as e:
            logging.error(f"Unexpected error with primary provider after retries: {e}. Falling back.")
            self.is_degraded = True
            self.current_provider = self.fallback_provider
            return await _retryable_llm_call(self.fallback_provider, messages, model, **kwargs)

在这个结构中,断路器在外层,重试在内层。这意味着:

  1. 正常情况:请求通过断路器(CLOSED状态),然后进入重试逻辑调用主提供商。
  2. 瞬时故障:主提供商偶尔失败,重试逻辑会介入。如果重试成功,断路器不会计数。
  3. 持续故障:主提供商持续失败,重试后仍失败,LLMServiceException会被抛出。断路器捕获到这些失败并计数。
  4. 断路器跳闸:当失败次数达到阈值,断路器变为OPEN状态,_call_primary_with_retry会立即抛出CircuitBreakerError,从而绕过主提供商和重试逻辑,直接切换到备用提供商。

4. 配置管理与动态切换

降级策略应该易于配置和管理。我们可以通过环境变量、配置文件或集中式配置服务来控制:

  • 主/备用提供商的URL和API Key
  • 断路器的参数 ( fail_max, reset_timeout )。
  • 重试策略的参数 ( stop_after_attempt, wait_exponential )。
  • 是否启用降级 (Feature Flag)。
# 示例配置
class AppConfig:
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "sk-your-openai-key")
    OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
    LOCAL_LLAMA_BASE_URL = os.getenv("LOCAL_LLAMA_BASE_URL", "http://localhost:11434/api")
    LOCAL_LLAMA_MODEL = os.getenv("LOCAL_LLAMA_MODEL", "llama2")

    # 断路器配置
    CB_FAIL_MAX = int(os.getenv("CB_FAIL_MAX", "5"))
    CB_RESET_TIMEOUT = int(os.getenv("CB_RESET_TIMEOUT", "60"))

    # 重试配置
    RETRY_ATTEMPTS = int(os.getenv("RETRY_ATTEMPTS", "3"))
    RETRY_MIN_WAIT = float(os.getenv("RETRY_MIN_WAIT", "0.5"))
    RETRY_MAX_WAIT = float(os.getenv("RETRY_MAX_WAIT", "10"))

本地Llama实例的准备与集成

要在生产环境中使用本地Llama作为备用,需要一些准备工作。

1. 选择本地Llama框架

  • Ollama: 这是目前最推荐的方式之一,它提供了一个用户友好的命令行工具,可以轻松下载、运行各种Llama模型,并提供一个兼容OpenAI Chat Completion API的HTTP服务。
  • llama.cpp: 这是Llama模型在C++上的高效实现,可以独立运行,也可以通过其提供的server模式启动一个HTTP API服务。它通常提供最佳性能,但设置可能稍微复杂一些。
  • Hugging Face transformers + text-generation-inference: 对于更大的模型或需要更多自定义的场景,可以考虑使用Hugging Face生态系统,结合text-generation-inference来部署一个高性能的推理服务。

这里我们以Ollama为例,因为它最方便集成且API与OpenAI高度相似。

2. 本地Llama的API接口

Ollama启动后,默认会在 http://localhost:11434 监听,并提供以下关键API:

  • /api/chat: 兼容OpenAI的聊天补全接口。
  • /api/generate: 传统的文本生成接口。
  • /api/tags: 列出本地已下载的模型。

确保你的Llama模型(例如llama2mistral)已通过Ollama下载并可用。
ollama run llama2

3. API兼容层设计

LocalLlamaProvider 已经展示了如何将OpenAI风格的请求映射到Ollama的/api/chat接口。关键在于:

  • 请求体格式: 确保 messages 列表的格式 ({"role": "user", "content": "..."}) 与OpenAI和Ollama都兼容。
  • 响应体解析: 解析Ollama返回的JSON结构以提取生成的文本。Ollama的 /api/chat 响应通常是 {"message": {"role": "assistant", "content": "..."}}
  • 模型名称: 外部API可能使用 gpt-3.5-turbo,本地Llama可能使用 llama2。在 LocalLlamaProvider 中,我们允许通过 model 参数覆盖,或使用一个默认值。

4. 资源管理

运行本地Llama实例需要消耗大量的计算资源(CPU/GPU)和内存。

  • 硬件需求: 对于较大的Llama模型,强烈推荐使用具备足够VRAM的GPU。如果只有CPU,推理速度会慢得多。
  • 模型大小: 不同的Llama模型有不同的参数量(例如7B, 13B, 70B),直接影响资源消耗和推理速度。选择一个适合你硬件和性能要求的模型。
  • 并发: 本地Llama实例通常不如大型云服务那样能高效处理高并发请求。在降级模式下,可能需要限制并发用户数,或者告知用户响应时间会变长。
  • 启动/停止: 确保本地Llama服务在应用启动前就已经运行,或者应用具备按需启动/停止本地Llama实例的能力(这会增加复杂性)。

端到端实现架构

现在,我们将所有组件整合起来,构建一个完整的服务。

# main.py 或 app.py

import asyncio
import os
import logging

# 导入之前定义的类和函数
# from .llm_providers import LLMProvider, OpenAIProvider, LocalLlamaProvider, LLMServiceException, Message
# from .circuit_breaker import LLMServiceWithBreakerAndRetry, openai_breaker, _retryable_llm_call
# from .config import AppConfig

# 假设所有类都在当前文件或已正确导入

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class AppConfig:
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "sk-your-openai-key-here") # 实际应用中请从安全位置加载
    OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
    LOCAL_LLAMA_BASE_URL = os.getenv("LOCAL_LLAMA_BASE_URL", "http://localhost:11434/api")
    LOCAL_LLAMA_MODEL = os.getenv("LOCAL_LLAMA_MODEL", "llama2")

    CB_FAIL_MAX = int(os.getenv("CB_FAIL_MAX", "5"))
    CB_RESET_TIMEOUT = int(os.getenv("CB_RESET_TIMEOUT", "60"))
    RETRY_ATTEMPTS = int(os.getenv("RETRY_ATTEMPTS", "3"))
    RETRY_MIN_WAIT = float(os.getenv("RETRY_MIN_WAIT", "0.5"))
    RETRY_MAX_WAIT = float(os.getenv("RETRY_MAX_WAIT", "10"))

# 重新定义断路器,确保使用AppConfig的参数
openai_breaker = pybreaker.CircuitBreaker(
    fail_max=AppConfig.CB_FAIL_MAX,
    reset_timeout=AppConfig.CB_RESET_TIMEOUT,
    exclude=[LLMServiceException],
)

@retry(
    wait=wait_exponential(multiplier=1, min=AppConfig.RETRY_MIN_WAIT, max=AppConfig.RETRY_MAX_WAIT),
    stop=stop_after_attempt(AppConfig.RETRY_ATTEMPTS),
    retry=retry_if_exception_type(LLMServiceException),
    reraise=True
)
async def _retryable_llm_call(provider: LLMProvider, messages: List[Message], model: str, **kwargs) -> str:
    logging.debug(f"Attempting LLM call with provider {type(provider).__name__}")
    return await provider.generate_completion(messages, model, **kwargs)

class LLMServiceWithBreakerAndRetry:
    def __init__(self, primary_provider: LLMProvider, fallback_provider: LLMProvider):
        self.primary_provider = primary_provider
        self.fallback_provider = fallback_provider
        self.current_provider = primary_provider
        self.is_degraded = False

    @openai_breaker
    async def _call_primary_with_retry(self, *args, **kwargs):
        logging.info(f"Attempting to use primary provider with retry: {type(self.primary_provider).__name__}")
        return await _retryable_llm_call(self.primary_provider, *args, **kwargs)

    async def generate_completion(self, messages: List[Message], model: str = "default", **kwargs) -> str:
        try:
            result = await self._call_primary_with_retry(messages, model, **kwargs)
            if self.is_degraded:
                logging.info("Primary provider recovered. Switching back from fallback.")
                self.is_degraded = False
            self.current_provider = self.primary_provider
            return result
        except pybreaker.CircuitBreakerError:
            logging.warning("Primary provider circuit is OPEN. Falling back to local Llama.")
            self.is_degraded = True
            self.current_provider = self.fallback_provider
            try:
                return await _retryable_llm_call(self.fallback_provider, messages, model, **kwargs)
            except Exception as fe:
                logging.critical(f"Fallback provider also failed after retries: {fe}. No LLM service available.")
                raise LLMServiceException("Both primary and fallback LLM services are unavailable.") from fe
        except LLMServiceException as e:
            logging.error(f"Primary provider failed after retries with LLMServiceException: {e}. Falling back.")
            self.is_degraded = True
            self.current_provider = self.fallback_provider
            try:
                return await _retryable_llm_call(self.fallback_provider, messages, model, **kwargs)
            except Exception as fe:
                logging.critical(f"Fallback provider also failed after retries: {fe}. No LLM service available.")
                raise LLMServiceException("Both primary and fallback LLM services are unavailable.") from fe
        except Exception as e:
            logging.error(f"Unexpected error with primary provider after retries: {e}. Falling back.")
            self.is_degraded = True
            self.current_provider = self.fallback_provider
            try:
                return await _retryable_llm_call(self.fallback_provider, messages, model, **kwargs)
            except Exception as fe:
                logging.critical(f"Fallback provider also failed after retries: {fe}. No LLM service available.")
                raise LLMServiceException("Both primary and fallback LLM services are unavailable.") from fe

async def main():
    # 初始化LLM提供商
    openai_provider = OpenAIProvider(api_key=AppConfig.OPENAI_API_KEY, base_url=AppConfig.OPENAI_BASE_URL)
    local_llama_provider = LocalLlamaProvider(base_url=AppConfig.LOCAL_LLAMA_BASE_URL)

    # 初始化带有断路器和重试逻辑的LLM服务
    llm_service = LLMServiceWithBreakerAndRetry(
        primary_provider=openai_provider,
        fallback_provider=local_llama_provider
    )

    test_messages = [
        Message(role="user", content="请用一句话介绍平滑降级。")
    ]

    print("n--- 第一次尝试 (预期使用OpenAI) ---")
    try:
        response = await llm_service.generate_completion(test_messages, model="gpt-3.5-turbo")
        print(f"LLM Response: {response}")
        print(f"Currently using: {type(llm_service.current_provider).__name__}")
    except LLMServiceException as e:
        print(f"Error during LLM call: {e}")

    # 模拟OpenAI故障,例如通过一个无效的API Key或关闭网络连接,或者让OpenAIProvider直接抛出异常
    # 为了演示,我们可以暂时修改OpenAIProvider的API Key或URL来强制失败
    # 或直接模拟失败场景
    print("n--- 模拟OpenAI故障,触发降级 ---")
    # 强制让OpenAIProvider抛出异常,触发断路器
    # 实际场景中,这将是网络错误、API限速、5xx错误等
    # 这里我们通过在内部修改api_key来模拟认证失败,这将导致401,并触发LLMServiceException
    openai_provider.api_key = "invalid-key" # 模拟API Key失效

    for i in range(AppConfig.CB_FAIL_MAX + 2): # 尝试次数超过fail_max + 预留几次
        print(f"n--- 模拟调用 {i+1} ---")
        try:
            response = await llm_service.generate_completion(test_messages, model="gpt-3.5-turbo")
            print(f"LLM Response: {response}")
            print(f"Currently using: {type(llm_service.current_provider).__name__}")
        except LLMServiceException as e:
            print(f"Error during LLM call: {e}")
        await asyncio.sleep(1) # 模拟每次请求间隔

    print("n--- 断路器打开后,继续调用 (预期直接使用本地Llama) ---")
    try:
        response = await llm_service.generate_completion(test_messages, model="gpt-3.5-turbo")
        print(f"LLM Response: {response}")
        print(f"Currently using: {type(llm_service.current_provider).__name__}")
    except LLMServiceException as e:
        print(f"Error during LLM call: {e}")

    print(f"n--- 等待断路器重置 ({AppConfig.CB_RESET_TIMEOUT} 秒) ---")
    await asyncio.sleep(AppConfig.CB_RESET_TIMEOUT + 5) # 等待超过重置时间

    print("n--- 断路器半开状态,测试OpenAI是否恢复 (预期尝试OpenAI,可能恢复或继续降级) ---")
    # 恢复OpenAI API Key,模拟服务恢复
    openai_provider.api_key = AppConfig.OPENAI_API_KEY # 恢复有效API Key
    try:
        response = await llm_service.generate_completion(test_messages, model="gpt-3.5-turbo")
        print(f"LLM Response: {response}")
        print(f"Currently using: {type(llm_service.current_provider).__name__}")
        if not llm_service.is_degraded:
            print("OpenAI seems to have recovered and circuit breaker is CLOSED.")
    except LLMServiceException as e:
        print(f"Error during LLM call: {e}")
        print(f"Currently using: {type(llm_service.current_provider).__name__}")
        if llm_service.is_degraded:
            print("OpenAI is still failing, remaining in degraded state.")

if __name__ == "__main__":
    # 确保本地Ollama服务正在运行,例如:ollama serve
    # 并且已经拉取了llama2模型:ollama pull llama2
    asyncio.run(main())

架构流程图 (概念性描述,非图像)

用户请求
   |
   V
应用主逻辑 (调用 LLMServiceWithBreakerAndRetry)
   |
   V
LLMServiceWithBreakerAndRetry.generate_completion()
   |
   +-----> 断路器 (CircuitBreaker) <----+
   |          (状态: CLOSED)             |
   V                                     |
_call_primary_with_retry()               |
   |                                     |
   V                                     |
重试装饰器 (Retry)                       |
   |                                     |
   V                                     |
OpenAIProvider.generate_completion() ----+-----> (成功) -> 返回结果
   |                                     |
   | (失败: LLMServiceException, httpx.TimeoutException等)
   V                                     |
(重试,如果仍失败)                         |
   |                                     |
   +-----> (失败) -> 报告失败给断路器 ------+
                   (断路器计数增加)
                   (如果达到阈值,断路器变为 OPEN)
   |
   V
(如果断路器 OPEN 或主服务重试后失败)
   |
   V
LLMServiceWithBreakerAndRetry 捕获异常
   |
   V
切换到 Fallback (LocalLlamaProvider)
   |
   V
重试装饰器 (Retry) (针对 LocalLlamaProvider)
   |
   V
LocalLlamaProvider.generate_completion()
   |
   +-----> (成功) -> 返回降级结果
   |
   | (失败)
   V
抛出最终异常 (表示所有LLM服务均不可用)

断路器周期:
CLOSED --(失败达到阈值)--> OPEN --(等待reset_timeout)--> HALF-OPEN --(少量请求成功)--> CLOSED
                                                         |
                                                         +--(少量请求失败)--> OPEN

平滑降级策略的权衡与考量

在设计和实现平滑降级时,需要仔细权衡以下因素:

  1. 性能差异: 本地Llama实例的性能(推理速度、吞吐量)通常不如OpenAI等云服务,尤其是在CPU上运行大型模型时。这可能意味着用户在降级模式下会感受到更长的响应时间。
  2. 功能差异与模型能力:
    • 模型规模: 本地Llama通常是较小规模的模型(如7B, 13B),可能在复杂推理、知识广度或语言理解方面不如OpenAI的顶级模型(如GPT-4)。
    • 上下文窗口: 本地模型的上下文窗口可能较小,限制了处理长文本的能力。
    • 特定能力: 某些OpenAI特有的功能(如函数调用、DALL-E集成)在本地Llama上可能不存在或需要额外适配。
    • 微调: 如果应用依赖于OpenAI的微调模型,切换到本地Llama意味着可能需要重新进行微调或接受通用模型的表现。
  3. 成本: 运行本地Llama需要投入硬件成本(GPU),虽然单次推理免费,但长期运行和维护仍有成本。
  4. 数据隐私: 本地Llama在数据隐私方面有显著优势,数据不出本地环境。这在某些行业或应用中可能是关键因素。
  5. 复杂性: 引入平滑降级策略会增加系统的复杂性,包括代码逻辑、部署、监控和测试。
  6. 用户体验管理: 在降级模式下,是应该静默切换还是明确告知用户?静默切换可能导致用户对性能或结果质量感到困惑;明确告知则可能影响用户信心,但更透明。通常,对于影响较大的降级,建议在UI上给出提示。
  7. 模型版本管理: 确保本地Llama模型的版本与主服务模型的能力差异在可接受范围内,并定期更新本地模型。

测试与监控

a. 测试

  • 单元测试: 独立测试 OpenAIProviderLocalLlamaProvider 的功能。
  • 集成测试: 模拟OpenAI API的各种故障情况(超时、5xx错误、429限速),验证断路器和降级逻辑是否按预期工作。可以使用 pytesthttpx.mockrespx 来模拟HTTP响应。
  • 负载测试: 测试本地Llama实例在不同负载下的性能极限,了解其可支持的并发用户数和响应时间。
  • 恢复测试: 模拟外部API恢复后,断路器是否能正确关闭并切换回主服务。

b. 监控与告警

  • 服务可用性: 监控OpenAI API的响应时间、错误率。
  • 本地Llama状态: 监控本地Llama实例的CPU/GPU使用率、内存消耗、响应时间,确保其健康运行。
  • 降级事件: 记录并告警何时发生了降级切换,以及切换回主服务的时间。
  • 断路器状态: 监控断路器的状态(CLOSED, OPEN, HALF-OPEN),这能直观反映外部服务的健康状况。
  • 错误日志: 详细记录LLM调用中的错误信息,便于故障排查。

持续演进与最佳实践

平滑降级不是一劳永逸的解决方案,它需要随着应用和外部服务环境的变化而持续演进。

  • 定期审查策略: 随着业务需求和技术发展,定期评估当前的降级策略是否仍然有效和最佳。
  • 自动化部署与测试: 将降级相关的配置和测试纳入CI/CD流程,确保每次部署都能正确处理故障。
  • 文档化: 详细记录降级策略、配置参数、故障处理流程和监控指标,方便团队成员理解和维护。
  • 渐进式部署: 如果要引入新的降级策略或备用模型,可以考虑使用金丝雀发布或A/B测试,逐步验证其效果。
  • 用户反馈: 关注用户在降级模式下的反馈,根据实际体验调整策略,例如改进降级提示语或优化本地模型的表现。

通过精心设计和实施平滑降级策略,我们可以显著提高依赖外部AI服务的应用程序的韧性和可用性,确保在面对不可避免的外部服务故障时,依然能够提供稳定且有价值的用户体验。这是一个对系统架构师和开发者都充满挑战但极具回报的实践。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注