解析 ‘Fair-share Scheduling’:在多代理系统中,如何公平分配 LLM API 配额以防止单个 Agent 霸占资源?

尊敬的各位专家、开发者同仁们,大家好!

在当今人工智能浪潮中,大型语言模型(LLM)API已成为多代理系统(Multi-Agent Systems, MAS)不可或缺的基石。无论是智能客服、自动化内容生成、代码辅助开发,还是复杂的决策支持系统,LLM API都赋予了这些代理无与伦比的“思考”和“表达”能力。然而,这种强大的能力并非没有代价。LLM API调用通常按量计费,资源有限(无论是并发数、速率限制还是总体预算),并且在复杂的MAS中,不同的代理可能具有不同的优先级、重要性或实际需求。

想象一下这样一个场景:一个由数十个甚至上百个智能代理组成的团队,它们共同协作完成一个项目。其中一些代理可能负责核心业务逻辑,需要高频、低延迟地访问LLM;另一些代理可能进行背景研究或辅助性任务,需求相对较低。如果没有一个有效的管理机制,某个“话痨”代理可能会因为频繁调用API而迅速耗尽团队的配额,导致其他关键代理“无话可说”,甚至整个系统瘫痪。这不仅会造成成本失控,更会严重影响系统的稳定性和整体性能。

这正是我们今天讲座的核心议题:如何通过“公平共享调度”(Fair-share Scheduling)的理念和技术,在多代理系统中公平、高效地分配LLM API配额,防止单个代理霸占资源,确保整个系统的健康运行。

我们将深入探讨公平共享调度的原理、挑战、常用算法,并结合具体的代码实现,构建一套实用的LLM API配额管理系统。


第一讲:公平共享调度的核心理念与挑战

1.1 什么是公平共享?为何重要?

公平共享调度是一种资源管理策略,其核心思想是确保系统中的各个“用户”(在我们的场景中是各个代理或代理组)能够根据其预设的“份额”或“权重”,公平地访问和利用共享资源。它不仅仅是简单地平均分配资源,而是更注重按需分配、按权重分配,并防止任何一个用户无限制地占用资源而导致其他用户饥饿。

在LLM API配额分配的语境下,公平共享的目标是:

  • 防止饥饿 (Starvation Prevention):确保每个代理都有机会访问API,即使在资源紧张时也不会被完全排除在外。
  • 资源利用率最大化 (Maximizing Resource Utilization):当某个代理不需要其全部份额时,其剩余份额可以被其他有需求的代理临时借用,提高整体资源利用率。
  • 优先级管理 (Priority Management):通过为不同代理设置不同的权重,实现基于业务重要性的优先级管理。
  • 成本控制 (Cost Control):将API调用与预算挂钩,通过配额限制实现成本预期。
  • 可预测性 (Predictability):代理可以相对准确地预估自己能够获得的API访问量,从而更好地规划任务。

1.2 LLM API配额分配的独特挑战

尽管公平共享调度是一个相对成熟的领域,但在LLM API配额分配场景下,我们面临一些独特的挑战:

  1. 异构的资源计量单位

    • 请求次数 (Requests):最直观的单位,但不同请求的计算成本可能天差地别(短prompt与长prompt)。
    • Token数量 (Tokens):更细粒度的计量,直接反映API的计算量和成本,但统计和管理可能更复杂。
    • 成本 (Cost):最直接的业务指标,但需要实时获取不同模型、不同操作的费率,并进行汇率转换。
    • 并发数 (Concurrency):API提供商通常对并发请求数有限制,这是一种瞬时资源。
      我们可能需要同时管理多个维度的配额。
  2. 动态与突发性需求 (Dynamic & Burstiness)

    • 代理的工作模式可能并非恒定,而是具有高峰和低谷。例如,一个代理在处理批量数据时可能会瞬间发起大量请求,之后又长时间空闲。
    • 系统需要能够平滑这些突发需求,允许一定程度的“超发”,同时又不能打破总体限制。
  3. 长期与短期公平性 (Long-term vs. Short-term Fairness)

    • 在短时间内,某个高优先级的代理可能需要暂时占用更多资源。但从长期来看,所有代理都应该得到其应有的份额。如何在瞬时响应和长期公平之间找到平衡?
  4. 状态管理与持久化 (State Management & Persistence)

    • 每个代理的使用量、剩余配额、权重等信息都需要被准确地追踪和持久化,以应对系统重启或故障。
    • 在分布式系统中,这涉及到一致性问题。
  5. 反馈与调整机制 (Feedback & Adjustment)

    • 配额分配策略可能需要根据实际使用情况、业务优先级变化、甚至LLM API提供商的策略调整而动态优化。

第二讲:基础调度算法及其在配额分配中的应用

我们将从一些基础的调度算法出发,逐步构建起对LLM API配额管理系统的理解。

2.1 简单轮询与加权轮询:起步与局限性

简单轮询 (Round Robin) 是最简单的调度方式,每个代理依次获得一次API调用机会。
加权轮询 (Weighted Round Robin) 允许为不同的代理分配不同的“权重”,权重高的代理在每个周期内获得更多次调用机会。

例如,代理A权重为2,代理B权重为1,则调度序列可能是 A, A, B, A, A, B…

优点:实现简单,易于理解。
局限性

  • 不适合异步、并发环境:LLM API调用通常是异步的,代理不会等待“轮到自己”才发起请求。
  • 无法处理突发请求:如果一个代理在轮到它时没有请求,它的份额就被浪费了;如果它有很多请求,也只能等待下一次轮询。
  • 缺乏长期公平性保障:在动态负载下,一个代理可能因为其他代理的空闲而获得超出其份额的资源,或者因为其他代理的繁忙而被延迟。

因此,虽然加权轮询提供了权重的概念,但它不足以应对LLM API配额分配的复杂性。我们需要更精细的机制。

2.2 令牌桶与漏桶算法:流量整形与突发管理

令牌桶(Token Bucket)和漏桶(Leaky Bucket)是两种常用的限流算法,它们在管理API调用速率和处理突发流量方面非常有效。

2.2.1 令牌桶算法 (Token Bucket Algorithm)

原理
一个“桶”以恒定速率生成“令牌”。每个API请求都需要消耗一个令牌。如果桶里有足够的令牌,请求就被允许;否则,请求被拒绝或排队等待。桶的容量有限,可以存储一定数量的令牌,这允许系统处理短时间的突发请求。

  • 桶容量 (Bucket Capacity):允许的最大突发量。
  • 令牌生成速率 (Token Rate):长期允许的平均请求速率。

优点

  • 允许一定程度的突发流量,提高了API的响应性。
  • 能够控制长期平均速率。

缺点

  • 不保证请求的公平性,只是简单地限制了总速率。
  • 不能直接与代理的“份额”概念结合,需要额外的逻辑。

Python代码示例:一个简单的令牌桶实现

import time
import threading

class TokenBucket:
    def __init__(self, capacity: int, fill_rate: float):
        """
        :param capacity: 令牌桶的最大容量(允许的最大突发请求数)
        :param fill_rate: 令牌生成速率,单位:令牌/秒
        """
        if capacity <= 0 or fill_rate <= 0:
            raise ValueError("Capacity and fill_rate must be positive.")

        self.capacity = capacity
        self.fill_rate = fill_rate
        self.current_tokens = capacity  # 初始时桶是满的
        self.last_fill_time = time.monotonic() # monotonic() 保证时间单调递增,不受系统时间调整影响
        self.lock = threading.Lock()

    def _refill_tokens(self):
        """根据时间间隔和填充速率补充令牌"""
        now = time.monotonic()
        time_passed = now - self.last_fill_time
        tokens_to_add = time_passed * self.fill_rate
        self.current_tokens = min(self.capacity, self.current_tokens + tokens_to_add)
        self.last_fill_time = now

    def try_consume(self, tokens_needed: int = 1) -> bool:
        """尝试消费指定数量的令牌"""
        if tokens_needed <= 0:
            return True # 消耗0个令牌总是成功

        with self.lock:
            self._refill_tokens()
            if self.current_tokens >= tokens_needed:
                self.current_tokens -= tokens_needed
                return True
            return False

    def get_remaining_tokens(self) -> float:
        """获取当前桶中的令牌数量"""
        with self.lock:
            self._refill_tokens()
            return self.current_tokens

# 示例用法
if __name__ == "__main__":
    bucket = TokenBucket(capacity=10, fill_rate=2) # 容量10,每秒生成2个令牌

    print(f"初始令牌数: {bucket.get_remaining_tokens()}")

    # 模拟突发请求
    print("n模拟突发请求:")
    for i in range(1, 15):
        if bucket.try_consume(1):
            print(f"请求 {i}: 成功消费令牌。剩余: {bucket.get_remaining_tokens():.2f}")
        else:
            print(f"请求 {i}: 失败,令牌不足。剩余: {bucket.get_remaining_tokens():.2f}")
        time.sleep(0.1) # 短暂停顿

    print(f"n等待一段时间(补充令牌)...")
    time.sleep(3) # 等待3秒,理论上会补充 3 * 2 = 6 个令牌

    print(f"等待后令牌数: {bucket.get_remaining_tokens():.2f}")

    # 模拟正常速率请求
    print("n模拟正常速率请求:")
    for i in range(1, 5):
        if bucket.try_consume(1):
            print(f"请求 {i}: 成功消费令牌。剩余: {bucket.get_remaining_tokens():.2f}")
        else:
            print(f"请求 {i}: 失败,令牌不足。剩余: {bucket.get_remaining_tokens():.2f}")
        time.sleep(0.4) # 速率小于生成速率,应该都能成功

2.2.2 漏桶算法 (Leaky Bucket Algorithm)

原理
一个“桶”以固定速率流出请求,无论请求进入桶的速度有多快,流出速度是恒定的。如果请求到达时桶已满,那么新请求会被丢弃。

优点

  • 输出速率平滑,非常适合需要稳定流量的场景。

缺点

  • 无法处理突发流量,突发请求会被直接丢弃,响应性较差。
  • 同样不直接提供公平性保障。

在LLM API配额管理中,令牌桶通常更受欢迎,因为它在控制平均速率的同时,允许一定程度的突发,这与实际的代理行为更为匹配。我们可以为每个代理维护一个独立的令牌桶,或者将令牌桶与配额管理系统结合。

2.3 加权公平排队 (WFQ) 思想的借鉴

加权公平排队 (Weighted Fair Queuing, WFQ) 及其理论基础 广义处理器共享 (Generalized Processor Sharing, GPS) 是网络调度领域的核心算法。虽然它们直接应用于包队列调度,但其核心思想——虚拟时间 (Virtual Time)份额 (Share) ——对我们的配额分配非常有启发。

核心思想

  • 权重 (Weight):每个代理被分配一个权重,代表其应得的资源份额。
  • 虚拟时间 (Virtual Time):系统维护一个全局虚拟时间。每个代理也有一个自己的虚拟完成时间。当一个代理使用资源时,它的虚拟完成时间会根据其权重和消耗的资源量向前推进。
  • 调度决策:系统总是选择虚拟完成时间最小的代理来服务。这确保了权重高的代理(其虚拟时间推进较慢,因为资源消耗对其虚拟时间的影响被权重“稀释”了)能够更快地获得资源。

在LLM API配额中的借鉴
我们不直接排队请求,而是将“虚拟时间”映射为代理的“消耗速度”或“成本”。一个代理的“虚拟完成时间”可以代表它在当前配额周期内已经“消耗”了多少相对于其份额的资源。

例如:

  • 每个代理都有一个权重 W_i
  • 每个API调用有成本 C(可以是1个请求,100个token等)。
  • 代理 i 的“虚拟使用量” V_i 初始为0。
  • 当代理 i 消耗 C 资源时,其 V_i 变为 V_i + C / W_i
  • 在任何时候,优先服务那些 V_i 值最小的代理(即那些相对于其份额来说,消耗最少的代理)。

优点

  • 提供了强大的长期公平性保障。
  • 能够动态适应不同代理的请求速率。
  • 允许“借贷”机制的自然实现:如果一个代理的 V_i 远低于其他代理,它可以在短时间内获得更多资源。

局限性

  • 直接实现 WFQ 需要复杂的队列管理和实时计算,对于API调用这种非排队、瞬时决策的场景,可能过于复杂。
  • 我们更倾向于基于配额的准入控制 (Admission Control),而非严格的请求排队。

2.4 比例份额调度 (Proportional Share Scheduling)

比例份额调度是WFQ思想在配额管理中的简化和实用化。其核心是:每个代理在一个预设的周期内,可以消耗的资源总量,应与其权重在所有活跃代理总权重中所占的比例成正比。

核心思想

  • 为每个代理 i 分配一个权重 W_i
  • 在一个配额周期(例如一天、一周)内,总的可用LLM API配额为 TotalQuota
  • 代理 i 的“理论”配额 Quota_i = TotalQuota * (W_i / Sum(W_j))

Python代码示例:一个简单的比例分配器

这个例子演示了如何根据权重计算每个代理在总配额中的初始份额。

class ProportionalQuotaAllocator:
    def __init__(self, total_quota: int, agent_weights: dict[str, int]):
        """
        :param total_quota: 总的API配额(例如,每月100万个token)
        :param agent_weights: 字典,键为代理ID,值为其权重
        """
        if total_quota <= 0:
            raise ValueError("Total quota must be positive.")
        if not agent_weights or any(w <= 0 for w in agent_weights.values()):
            raise ValueError("Agent weights must be provided and positive.")

        self.total_quota = total_quota
        self.agent_weights = agent_weights
        self.total_weight = sum(agent_weights.values())
        if self.total_weight == 0: # 避免除以零
            raise ValueError("Total weight of agents cannot be zero.")

        self.allocated_quotas = {}
        self._calculate_initial_quotas()

    def _calculate_initial_quotas(self):
        """根据权重计算每个代理的初始分配配额"""
        remaining_quota = self.total_quota
        allocated_sum = 0

        # 优先处理整数部分,避免浮点数累积误差
        for agent_id, weight in self.agent_weights.items():
            share = weight / self.total_weight
            quota = int(self.total_quota * share) # 取整数部分
            self.allocated_quotas[agent_id] = quota
            allocated_sum += quota

        # 将剩余的配额(由于取整造成的)按权重从高到低分配给代理
        remainder = self.total_quota - allocated_sum
        sorted_agents = sorted(self.agent_weights.items(), key=lambda item: item[1], reverse=True)

        for agent_id, _ in sorted_agents:
            if remainder > 0:
                self.allocated_quotas[agent_id] += 1
                remainder -= 1
            else:
                break

    def get_allocated_quota(self, agent_id: str) -> int:
        """获取指定代理的分配配额"""
        return self.allocated_quotas.get(agent_id, 0)

    def get_all_allocated_quotas(self) -> dict[str, int]:
        """获取所有代理的分配配额"""
        return self.allocated_quotas

# 示例用法
if __name__ == "__main__":
    total_monthly_tokens = 1_000_000 # 总计100万个token
    agent_weights_config = {
        "Agent_CoreBusiness": 5,  # 核心业务代理,权重最高
        "Agent_Research": 3,      # 研究性代理
        "Agent_Marketing": 2,     # 市场营销代理
        "Agent_InternalTool": 1   # 内部工具代理,权重最低
    }

    allocator = ProportionalQuotaAllocator(total_monthly_tokens, agent_weights_config)

    print(f"总配额: {total_monthly_tokens} tokens")
    print("-----------------------------------")
    for agent_id, quota in allocator.get_all_allocated_quotas().items():
        print(f"代理 '{agent_id}': 分配配额 {quota} tokens")
    print(f"总分配配额校验: {sum(allocator.get_all_allocated_quotas().values())} tokens")

    # 另一个例子:权重变化
    print("n--- 权重变化示例 ---")
    total_daily_requests = 1000
    agent_weights_daily = {
        "Agent_A": 10,
        "Agent_B": 5,
        "Agent_C": 5
    }
    daily_allocator = ProportionalQuotaAllocator(total_daily_requests, agent_weights_daily)
    for agent_id, quota in daily_allocator.get_all_allocated_quotas().items():
        print(f"代理 '{agent_id}': 分配配额 {quota} 请求")
    print(f"总分配配额校验: {sum(daily_allocator.get_all_allocated_quotas().values())} 请求")

这个基础的比例分配器为我们设定了每个代理的理论上限。但在实际运行中,我们还需要一个系统来实时跟踪使用量,并强制执行这些限制,同时还要处理动态需求和借贷机制。这正是我们第三讲的重点。


第三讲:构建实用的LLM API配额管理系统

现在,我们将把上述理念和算法整合起来,设计并实现一个实用的LLM API配额管理系统。

3.1 系统架构概览

为了实现高效、可靠、可扩展的配额管理,我们通常会采用以下架构:

+-------------------+       +--------------------+       +-------------------+
|                   |       |                    |       |                   |
|   Agent N         |<----->|   API Gateway      |<----->|   LLM API         |
|   (LLM Consumer)  |       |   (Nginx/Envoy/Kong)|       |   (OpenAI/Anthropic)|
|                   |       |                    |       |                   |
+-------------------+       +---------^----------+       +-------------------+
                                      |
                                      | (Query Quota)
                                      | (Consume Quota)
                                      v
+----------------------------------------------------------------+
|                                                                |
|                 Fair-Share Quota Manager Service               |
|                                                                |
|  +---------------------+   +---------------------+   +---------------------+
|  | Request Validator   |   | Quota Tracker       |   | Quota Policy Engine |
|  | (Pre-check)         |   | (Usage/Balance)     |   | (Weights/Rules)     |
|  +---------------------+   +---------------------+   +---------------------+
|                                      ^                                       |
|                                      | (Read/Write Quota Data)               |
|                                      v                                       |
|  +-------------------------------------------------------------------------+
|  |                 Persistent Storage (Redis/PostgreSQL)                   |
|  |                 (Agent Quota, Usage, Weights, Configuration)            |
|  +-------------------------------------------------------------------------+
|                                                                |
+----------------------------------------------------------------+

核心组件说明

  1. Agent (LLM Consumer):发起LLM API请求的智能代理或服务。它们需要通过API Gateway访问LLM API。
  2. API Gateway (API 网关):所有LLM API请求的入口点。它负责身份验证、授权、路由,最重要的是,与配额管理服务集成,在请求转发前进行配额检查和扣减。 常见的如Nginx、Envoy、Kong、Apigee等。
  3. Fair-Share Quota Manager Service (公平共享配额管理服务)
    • Request Validator (请求校验器):接收来自API Gateway的请求,根据代理ID和请求类型,向Quota Tracker查询配额。
    • Quota Tracker (配额追踪器):核心组件,负责维护每个代理的当前使用量、剩余配额、令牌桶状态等。
    • Quota Policy Engine (配额策略引擎):负责根据预设的权重、总配额、配额周期等,计算和调整代理的理论配额。
  4. Persistent Storage (持久化存储):存储所有配额相关数据,包括代理配置、权重、当前使用量、配额周期等。
    • Redis:适合存储瞬时令牌桶状态、实时使用量,以及需要快速读写的配额数据。
    • PostgreSQL/MongoDB:适合存储代理的长期配置、历史使用记录、审计日志等。
  5. LLM API:实际的LLM服务提供商(如OpenAI, Anthropic, Google Gemini等)。

3.2 核心组件设计与实现

我们将聚焦于 Fair-Share Quota Manager Service 的设计与实现。

3.2.1 Agent与Group定义

为了进行公平分配,我们首先需要识别请求的来源。

  • Agent ID:每个独立的智能代理都有一个唯一的标识符。
  • Group ID:代理可能属于某个团队、项目或部门。我们可以为这些组分配配额,然后组内的代理再根据其权重共享组的配额,形成分层公平共享 (Hierarchical Fair-share Scheduling)
    • 例如,一个项目组获得了10万tokens的月配额,该组下的Agent A、B、C再按2:2:1的权重分配这10万tokens。

配置示例(YAML格式)

# agents.yaml
agents:
  - id: agent_core_business_1
    group_id: project_alpha
    weight: 5
    initial_quota: 0 # 可以为0,由组配额计算
    rate_limit_rps: 5 # 每秒请求数限制(令牌桶参数)
    burst_capacity: 10 # 突发容量

  - id: agent_core_business_2
    group_id: project_alpha
    weight: 5
    initial_quota: 0
    rate_limit_rps: 5
    burst_capacity: 10

  - id: agent_research_1
    group_id: project_beta
    weight: 3
    initial_quota: 0
    rate_limit_rps: 3
    burst_capacity: 6

  - id: agent_marketing_tool
    group_id: project_beta
    weight: 2
    initial_quota: 0
    rate_limit_rps: 2
    burst_capacity: 4

# groups.yaml
groups:
  - id: project_alpha
    total_quota_monthly_tokens: 500000 # 月度总配额
    reset_interval: monthly # 配额重置周期
  - id: project_beta
    total_quota_monthly_tokens: 300000
    reset_interval: monthly

3.2.2 权重与份额的确定

这通常是一个业务决策过程:

  • 业务重要性:核心业务代理应有更高权重。
  • 预算分配:与项目的预算挂钩。
  • 历史使用模式:分析过往使用数据,预估未来需求。
  • SLA (服务等级协议):对关键代理的性能承诺可能要求更高的配额。

3.2.3 使用量追踪机制

这是系统的核心。我们需要精确追踪每个代理的:

  • 已消耗配额 (Consumed Quota):在当前配额周期内已使用的tokens/requests。
  • 剩余配额 (Remaining Quota)分配配额 - 已消耗配额
  • 令牌桶状态 (Token Bucket State):用于实时速率限制。

计量单位:对于LLM API,通常按token数计费,所以我们的系统应该以token为主要计量单位。

持久化存储选择

  • Redis Hash:非常适合存储代理的实时配额数据,例如 agent:{id}:quota 存储 {total, used, last_reset_time}
  • Redis Sorted Set/ZSET:可以用来实现借贷机制,按代理的“虚拟使用量”排序。

3.2.4 配额核查与强制 (Quota Enforcement)

当一个代理尝试调用LLM API时,请求会首先到达API Gateway。Gateway会调用 Fair-Share Quota Manager Servicecheck_and_consume_quota 方法。

强制逻辑

  1. 速率限制 (Rate Limiting):使用代理独立的令牌桶检查瞬时速率。
  2. 硬配额限制 (Hard Quota Limit):检查代理在当前周期内的总消耗是否超过其分配的硬配额。
  3. 借贷机制 (Borrowing/Lending):如果代理已用完其硬配额,但整个组或系统还有剩余配额,可以允许它临时“借用”。这需要一个策略来决定哪些代理可以借,借多少,以及如何回收。

3.3 代码示例:一个更完整的 FairShareQuotaManager

我们将结合前面提到的概念,构建一个 FairShareQuotaManager 类。它将:

  • 管理代理的权重和分配的配额。
  • 追踪每个代理的实时使用量。
  • 实现配额检查和扣减。
  • 支持配额重置。
  • 集成令牌桶进行实时速率限制。
  • (简化版)实现借贷机制。

为了简化代码,我们将使用一个内存中的字典来模拟Redis存储。在生产环境中,这部分需要替换为真正的Redis客户端。

import time
import threading
from collections import defaultdict
import math

# --------------------------------------------------------------------------------
# 辅助类:TokenBucket
# --------------------------------------------------------------------------------
class TokenBucket:
    def __init__(self, capacity: int, fill_rate: float):
        self.capacity = capacity
        self.fill_rate = fill_rate
        self.current_tokens = capacity
        self.last_fill_time = time.monotonic()
        self.lock = threading.Lock()

    def _refill_tokens(self):
        now = time.monotonic()
        time_passed = now - self.last_fill_time
        tokens_to_add = time_passed * self.fill_rate
        self.current_tokens = min(self.capacity, self.current_tokens + tokens_to_add)
        self.last_fill_time = now

    def try_consume(self, tokens_needed: int = 1) -> bool:
        if tokens_needed <= 0: return True
        with self.lock:
            self._refill_tokens()
            if self.current_tokens >= tokens_needed:
                self.current_tokens -= tokens_needed
                return True
            return False

    def get_remaining_tokens(self) -> float:
        with self.lock:
            self._refill_tokens()
            return self.current_tokens

# --------------------------------------------------------------------------------
# 主类:FairShareQuotaManager
# --------------------------------------------------------------------------------
class FairShareQuotaManager:
    def __init__(self, global_total_quota: int, reset_interval_seconds: int):
        """
        初始化公平共享配额管理器。
        :param global_total_quota: 全局总配额(例如,每月总tokens数)。
        :param reset_interval_seconds: 配额重置周期,单位秒(例如,一个月30天*24小时*3600秒)。
        """
        if global_total_quota <= 0:
            raise ValueError("Global total quota must be positive.")
        if reset_interval_seconds <= 0:
            raise ValueError("Reset interval must be positive.")

        self.global_total_quota = global_total_quota
        self.reset_interval_seconds = reset_interval_seconds

        # 存储代理配置:{agent_id: {'weight': int, 'group_id': str, 'rate_limit_rps': float, 'burst_capacity': int}}
        self.agent_configs = {}
        # 存储组配置:{group_id: {'total_quota': int, 'reset_time': float}}
        self.group_configs = {}
        # 存储每个代理的实际使用量:{agent_id: {'used': int, 'last_reset_time': float, 'allocated_quota': int}}
        self.agent_usage_data = defaultdict(lambda: {'used': 0, 'last_reset_time': time.time(), 'allocated_quota': 0})
        # 存储每个代理的令牌桶实例:{agent_id: TokenBucket}
        self.agent_token_buckets = {}

        self.lock = threading.Lock() # 用于保护共享数据的锁

        # 初始化配额重置时间
        self.last_global_reset_time = time.time()

    def register_group(self, group_id: str, group_total_quota: int):
        """注册一个组,并分配其总配额。"""
        with self.lock:
            if group_id in self.group_configs:
                print(f"警告: 组 '{group_id}' 已存在,将更新其配额。")
            self.group_configs[group_id] = {
                'total_quota': group_total_quota,
                'reset_time': time.time() # 组的配额重置时间
            }
            # 当组配额更新时,需要重新计算组内代理的分配配额
            self._recalculate_group_agent_quotas(group_id)

    def register_agent(self, agent_id: str, group_id: str, weight: int, rate_limit_rps: float, burst_capacity: int):
        """
        注册一个代理。
        :param agent_id: 代理的唯一ID。
        :param group_id: 代理所属的组ID。
        :param weight: 代理在组内的权重。
        :param rate_limit_rps: 代理的每秒请求数限制 (RPS)。
        :param burst_capacity: 代理的令牌桶突发容量。
        """
        with self.lock:
            if group_id not in self.group_configs:
                raise ValueError(f"组 '{group_id}' 未注册。请先注册组。")

            self.agent_configs[agent_id] = {
                'group_id': group_id,
                'weight': weight,
                'rate_limit_rps': rate_limit_rps,
                'burst_capacity': burst_capacity
            }
            self.agent_token_buckets[agent_id] = TokenBucket(burst_capacity, rate_limit_rps)
            # 重新计算组内代理的分配配额
            self._recalculate_group_agent_quotas(group_id)

    def _recalculate_group_agent_quotas(self, group_id: str):
        """
        根据组的总配额和组内代理的权重,重新计算每个代理的分配配额。
        当组或代理注册/更新时调用。
        """
        group_total_quota = self.group_configs.get(group_id, {}).get('total_quota', 0)
        if group_total_quota == 0:
            return

        group_agents = {aid: conf for aid, conf in self.agent_configs.items() if conf['group_id'] == group_id}
        total_group_weight = sum(agent_conf['weight'] for agent_conf in group_agents.values())

        if total_group_weight == 0:
            for agent_id in group_agents:
                self.agent_usage_data[agent_id]['allocated_quota'] = 0
            return

        remaining_quota = group_total_quota
        allocated_sum = 0

        # 优先处理整数部分
        for agent_id, agent_conf in group_agents.items():
            share = agent_conf['weight'] / total_group_weight
            quota = int(group_total_quota * share)
            self.agent_usage_data[agent_id]['allocated_quota'] = quota
            allocated_sum += quota

        # 将剩余配额(取整导致)按权重从高到低分配
        remainder = group_total_quota - allocated_sum
        sorted_agents = sorted(group_agents.items(), key=lambda item: item[1]['weight'], reverse=True)

        for agent_id, _ in sorted_agents:
            if remainder > 0:
                self.agent_usage_data[agent_id]['allocated_quota'] += 1
                remainder -= 1
            else:
                break

        # print(f"DEBUG: 组 '{group_id}' 配额重新计算完成。")
        # for agent_id in group_agents:
        #     print(f"  代理 '{agent_id}': 分配 {self.agent_usage_data[agent_id]['allocated_quota']} tokens")

    def _reset_agent_quota(self, agent_id: str):
        """重置单个代理的已使用配额和令牌桶。"""
        self.agent_usage_data[agent_id]['used'] = 0
        self.agent_usage_data[agent_id]['last_reset_time'] = time.time()
        # 重置令牌桶,使其回到满的状态
        if agent_id in self.agent_token_buckets:
            self.agent_token_buckets[agent_id].current_tokens = self.agent_token_buckets[agent_id].capacity
            self.agent_token_buckets[agent_id].last_fill_time = time.monotonic()

    def _check_and_reset_group_quotas(self, group_id: str):
        """检查并重置组内所有代理的配额(如果重置周期到了)。"""
        group_data = self.group_configs.get(group_id)
        if not group_data:
            return

        if time.time() - group_data['reset_time'] >= self.reset_interval_seconds:
            print(f"--- 组 '{group_id}' 配额重置中 ---")
            group_data['reset_time'] = time.time() # 更新组的重置时间
            for agent_id, config in self.agent_configs.items():
                if config['group_id'] == group_id:
                    self._reset_agent_quota(agent_id)
            # 重新计算组内代理的分配配额 (如果组的总配额是动态的,这里会更复杂)
            self._recalculate_group_agent_quotas(group_id)

    def check_and_consume_quota(self, agent_id: str, tokens_needed: int = 1, allow_borrowing: bool = True) -> bool:
        """
        检查代理是否有足够的配额和速率限制,并在允许时消耗配额。
        :param agent_id: 发起请求的代理ID。
        :param tokens_needed: 本次请求需要消耗的tokens数量。
        :param allow_borrowing: 是否允许从组/全局池借用配额。
        :return: True如果成功消耗配额,False否则。
        """
        if agent_id not in self.agent_configs:
            print(f"错误: 代理 '{agent_id}' 未注册。")
            return False
        if tokens_needed <= 0:
            return True # 消耗0个令牌总是成功

        with self.lock:
            # 1. 检查并重置配额周期
            group_id = self.agent_configs[agent_id]['group_id']
            self._check_and_reset_group_quotas(group_id)

            agent_data = self.agent_usage_data[agent_id]
            allocated_quota = agent_data['allocated_quota']
            current_used = agent_data['used']

            # 2. 检查令牌桶(速率限制)
            if agent_id not in self.agent_token_buckets or not self.agent_token_buckets[agent_id].try_consume(tokens_needed):
                print(f"代理 '{agent_id}' (组: {group_id}): 速率限制,无法消费 {tokens_needed} tokens。")
                return False

            # 3. 检查硬配额
            if current_used + tokens_needed <= allocated_quota:
                agent_data['used'] += tokens_needed
                # print(f"代理 '{agent_id}' (组: {group_id}): 成功消费 {tokens_needed} tokens。剩余个人配额: {allocated_quota - agent_data['used']}")
                return True
            else:
                # 4. 硬配额不足,尝试借用 (简化实现:直接检查组剩余配额)
                if allow_borrowing:
                    group_total_quota = self.group_configs[group_id]['total_quota']
                    group_used = sum(self.agent_usage_data[aid]['used'] for aid, conf in self.agent_configs.items() if conf['group_id'] == group_id)

                    if group_used + tokens_needed <= group_total_quota:
                        # 允许借用
                        agent_data['used'] += tokens_needed
                        print(f"代理 '{agent_id}' (组: {group_id}): 个人配额不足,成功从组借用 {tokens_needed} tokens。")
                        return True
                    else:
                        print(f"代理 '{agent_id}' (组: {group_id}): 个人和组配额均不足,无法消费 {tokens_needed} tokens。")
                        return False
                else:
                    print(f"代理 '{agent_id}' (组: {group_id}): 个人配额不足,不允许借用,无法消费 {tokens_needed} tokens。")
                    return False

    def get_agent_quota_info(self, agent_id: str) -> dict:
        """获取指定代理的配额使用信息。"""
        with self.lock:
            self._check_and_reset_group_quotas(self.agent_configs.get(agent_id, {}).get('group_id'))
            agent_data = self.agent_usage_data.get(agent_id, {})
            config = self.agent_configs.get(agent_id, {})

            allocated_quota = agent_data.get('allocated_quota', 0)
            used_quota = agent_data.get('used', 0)
            remaining_quota = allocated_quota - used_quota

            group_id = config.get('group_id')
            group_info = {}
            if group_id:
                group_total = self.group_configs.get(group_id, {}).get('total_quota', 0)
                group_used = sum(self.agent_usage_data[aid]['used'] for aid, conf in self.agent_configs.items() if conf['group_id'] == group_id)
                group_remaining = group_total - group_used
                group_info = {'id': group_id, 'total': group_total, 'used': group_used, 'remaining': group_remaining}

            token_bucket_info = {}
            if agent_id in self.agent_token_buckets:
                tb = self.agent_token_buckets[agent_id]
                token_bucket_info = {
                    'capacity': tb.capacity,
                    'fill_rate': tb.fill_rate,
                    'current_tokens': tb.get_remaining_tokens()
                }

            return {
                'agent_id': agent_id,
                'weight': config.get('weight', 0),
                'allocated_quota': allocated_quota,
                'used_quota': used_quota,
                'remaining_quota': remaining_quota,
                'group_info': group_info,
                'token_bucket': token_bucket_info
            }

    def get_group_quota_info(self, group_id: str) -> dict:
        """获取指定组的配额使用信息。"""
        with self.lock:
            self._check_and_reset_group_quotas(group_id)
            group_data = self.group_configs.get(group_id, {})
            if not group_data:
                return {}

            group_total = group_data.get('total_quota', 0)
            group_used = sum(self.agent_usage_data[aid]['used'] for aid, conf in self.agent_configs.items() if conf['group_id'] == group_id)
            group_remaining = group_total - group_used

            return {
                'group_id': group_id,
                'total_quota': group_total,
                'used_quota': group_used,
                'remaining_quota': group_remaining,
                'reset_time': group_data.get('reset_time', 0)
            }

# --------------------------------------------------------------------------------
# 示例用法
# --------------------------------------------------------------------------------
if __name__ == "__main__":
    # 模拟一个月(为了测试方便,设为10秒)
    MONTHLY_RESET_SECONDS = 10 

    # 初始化配额管理器
    quota_manager = FairShareQuotaManager(global_total_quota=1_000_000, reset_interval_seconds=MONTHLY_RESET_SECONDS)

    # 注册组和它们的月度总配额
    quota_manager.register_group("project_alpha", 500_000) # Alpha组每月50万tokens
    quota_manager.register_group("project_beta", 300_000)  # Beta组每月30万tokens
    # 剩余20万 tokens 暂时未分配给任何组,可以在需要时动态分配或作为全局共享。这里我们只处理已注册组的配额。

    # 注册代理及其在组内的权重、速率限制
    quota_manager.register_agent("agent_core_1", "project_alpha", weight=5, rate_limit_rps=5, burst_capacity=10)
    quota_manager.register_agent("agent_core_2", "project_alpha", weight=3, rate_limit_rps=3, burst_capacity=6)
    quota_manager.register_agent("agent_research_1", "project_beta", weight=4, rate_limit_rps=4, burst_capacity=8)
    quota_manager.register_agent("agent_marketing_1", "project_beta", weight=1, rate_limit_rps=1, burst_capacity=2)
    quota_manager.register_agent("agent_independent", "project_alpha", weight=2, rate_limit_rps=2, burst_capacity=4) # 属于alpha组但权重较低

    print("--- 初始配额分配 ---")
    print(quota_manager.get_agent_quota_info("agent_core_1"))
    print(quota_manager.get_agent_quota_info("agent_core_2"))
    print(quota_manager.get_agent_quota_info("agent_research_1"))
    print(quota_manager.get_agent_quota_info("agent_marketing_1"))
    print(quota_manager.get_agent_quota_info("agent_independent"))
    print("n")

    # 模拟代理请求
    print("--- 模拟代理请求 ---")
    for i in range(1, 20):
        # agent_core_1 高频请求
        if quota_manager.check_and_consume_quota("agent_core_1", tokens_needed=100):
            pass # print(f"agent_core_1 成功消费 100 tokens.")
        else:
            print(f"agent_core_1 消费失败。")

        # agent_research_1 少量请求
        if i % 3 == 0:
            if quota_manager.check_and_consume_quota("agent_research_1", tokens_needed=50):
                pass # print(f"agent_research_1 成功消费 50 tokens.")
            else:
                print(f"agent_research_1 消费失败。")

        # agent_marketing_1 偶尔请求
        if i % 5 == 0:
            if quota_manager.check_and_consume_quota("agent_marketing_1", tokens_needed=200):
                pass # print(f"agent_marketing_1 成功消费 200 tokens.")
            else:
                print(f"agent_marketing_1 消费失败。")

        # agent_core_2 也请求
        if i % 2 == 0:
            if quota_manager.check_and_consume_quota("agent_core_2", tokens_needed=80):
                pass # print(f"agent_core_2 成功消费 80 tokens.")
            else:
                print(f"agent_core_2 消费失败。")

        time.sleep(0.1) # 模拟请求间隔

    print("n--- 请求后配额状态 ---")
    print(quota_manager.get_agent_quota_info("agent_core_1"))
    print(quota_manager.get_agent_quota_info("agent_core_2"))
    print(quota_manager.get_agent_quota_info("agent_research_1"))
    print(quota_manager.get_agent_quota_info("agent_marketing_1"))
    print(quota_manager.get_agent_quota_info("agent_independent"))

    print("n--- 尝试让 agent_core_1 超出个人配额,从组借用 ---")
    # 让 agent_core_1 耗尽个人配额
    while quota_manager.get_agent_quota_info("agent_core_1")['remaining_quota'] > 0:
        quota_manager.check_and_consume_quota("agent_core_1", tokens_needed=100)

    print(quota_manager.get_agent_quota_info("agent_core_1"))
    print(quota_manager.get_group_quota_info("project_alpha"))

    # 继续让 agent_core_1 消费,此时它会尝试从 project_alpha 组借用
    if quota_manager.check_and_consume_quota("agent_core_1", tokens_needed=5000, allow_borrowing=True):
        print(f"agent_core_1 成功借用 5000 tokens。")
    else:
        print(f"agent_core_1 借用失败。")
    print(quota_manager.get_agent_quota_info("agent_core_1"))
    print(quota_manager.get_group_quota_info("project_alpha"))

    print("n--- 模拟配额重置周期到达 ---")
    print(f"等待 {MONTHLY_RESET_SECONDS + 1} 秒,模拟配额重置...")
    time.sleep(MONTHLY_RESET_SECONDS + 1) # 等待超过重置周期

    # 再次请求,触发重置
    if quota_manager.check_and_consume_quota("agent_core_1", tokens_needed=100):
        print(f"agent_core_1 在重置后成功消费 100 tokens。")
    else:
        print(f"agent_core_1 在重置后消费失败。")

    print("n--- 重置后配额状态 ---")
    print(quota_manager.get_agent_quota_info("agent_core_1"))
    print(quota_manager.get_group_quota_info("project_alpha"))
    print(quota_manager.get_agent_quota_info("agent_research_1"))
    print(quota_manager.get_group_quota_info("project_beta"))

代码解释

  1. TokenBucket:独立的令牌桶实现,用于瞬时速率限制。
  2. FairShareQuotaManager
    • __init__:初始化全局配额和重置周期。
    • register_group / register_agent:用于配置组和代理的权重、配额和速率限制参数。这是系统的静态配置部分。
    • _recalculate_group_agent_quotas:这是实现比例份额调度的核心。当组或代理的配置发生变化时,它会重新计算组内每个代理的理论分配配额。它处理了浮点数取整问题,确保总配额的精确分配。
    • _reset_agent_quota / _check_and_reset_group_quotas:负责周期性地重置代理和组的已使用配额。
    • check_and_consume_quota:这是API Gateway调用的主要方法。它执行以下检查:
      • 配额周期重置:首先检查是否需要重置配额。
      • 令牌桶检查:检查是否违反了瞬时速率限制。
      • 硬配额检查:检查代理是否还有剩余的个人分配配额。
      • 借贷机制(简化版):如果个人配额不足,会检查所属组是否有剩余配额可供借用。
      • 如果所有检查通过,则扣减配额并返回 True;否则返回 False
    • get_agent_quota_info / get_group_quota_info:提供查询当前配额状态的接口。

借贷机制(Borrowing/Lending)的简化与扩展
当前代码中,借贷机制非常简化:如果代理用完了自己的配额,它会尝试从所属组的剩余总配额中借用。

更复杂的借贷策略可以包括

  • 优先级借贷:高优先级的代理可以优先借用。
  • 限制借用量:每个代理可以借用的配额有一个上限,例如不超过其自身配额的X%。
  • 惩罚机制:借用的配额可能在下个周期从该代理的分配中扣除,或者以更高“成本”计算。
  • 全局池:除了组内借贷,还可以有一个全局的、未分配的配额池,供所有代理在紧急情况下借用。
  • 动态调整权重:如果某个代理长期不使用其配额,其权重可以在短时间内降低,以释放资源给其他代理。

3.4 配额重置与周期管理

配额重置是公平共享调度的重要组成部分,它确保了“历史债务”不会无限期累积。常见的重置周期包括:

  • 每日 (Daily)
  • 每周 (Weekly)
  • 每月 (Monthly)
  • 每年 (Annually)

在我们的代码中,reset_interval_seconds 定义了重置周期。当 check_and_consume_quota 被调用时,它会自动检查距离上次重置的时间,如果超过了设定的间隔,就会触发重置操作。

3.5 监控与告警

一个健全的配额管理系统必须提供:

  • 实时监控仪表板:展示每个代理、每个组的当前配额使用率、剩余量、速率限制状态。
  • 告警机制:当某个代理或组的配额即将耗尽(例如达到80%或90%)时,及时通过邮件、短信或Slack通知相关负责人。
  • 历史数据分析:长期跟踪配额使用趋势,为未来的配额规划提供数据支持。

第四讲:进阶策略与系统优化

4.1 动态权重调整

静态的权重配置可能无法完全适应业务的快速变化。考虑以下场景:

  • 一个新项目启动,其核心代理需要临时提升优先级。
  • 某个代理在进行一次性的大规模数据处理,需要临时获得更多配额。
  • 某个代理长期处于低使用率状态,其部分权重可以被回收或临时分配给其他代理。

实现方式

  • 管理API:提供一个HTTP API接口,允许管理员或授权服务动态修改代理或组的权重。
  • 配置热加载:配额管理服务能够不重启地加载新的权重配置。
  • 基于规则的自动化调整:例如,如果一个代理连续三天使用率低于10%,其权重自动下调1个点;如果一个代理响应时间过长,可能需要增加其配额。

4.2 多维度配额管理

如前所述,LLM API的计量单位可以是请求数、token数,甚至成本。一个健壮的系统应能同时管理这些维度。

示例

  • Agent A:每月10万tokens,每秒5请求。
  • Agent B:每月50美元预算,每秒2请求。

这意味着 check_and_consume_quota 方法可能需要接收更复杂的 cost_units 参数,并在内部进行单位转换和多维度的配额检查。

实现挑战

  • 需要维护实时的汇率或不同LLM模型的token价格。
  • 所有计量单位都需要统一到内部的“消耗点数”或“成本单位”。

4.3 跨服务提供商配额集成

在实际场景中,一个多代理系统可能同时使用OpenAI、Anthropic、Google Gemini等多个LLM提供商的API。每个提供商都有自己的配额限制和计费方式。

策略

  • 统一抽象层:在配额管理服务之上,建立一个统一的LLM API调用抽象层。代理通过这个抽象层请求LLM,而不是直接调用特定提供商的API。
  • 多维度配额池:配额管理服务为每个提供商维护独立的配额池,并根据请求的目标API路由到相应的配额检查逻辑。
  • 智能路由:根据代理的配额使用情况、API提供商的当前价格、性能(延迟、成功率)等因素,动态选择最佳的LLM API提供商。

4.4 容错与高可用

配额管理服务是系统的关键组件。如果它发生故障,整个多代理系统可能会停摆。

高可用性策略

  • 冗余部署:部署多个配额管理服务实例,通过负载均衡器分发请求。
  • 持久化存储高可用:Redis集群、PostgreSQL复制等。
  • 故障转移:当主实例失效时,自动切换到备用实例。
  • 缓存机制:在API Gateway侧对配额进行短期缓存,减少对配额管理服务的实时依赖,提高响应速度,并提供一定的容错能力(即使配额服务短暂不可用,系统也能继续运行一段短时间)。

4.5 与成本管理结合

最终,配额管理是为了更好地控制成本。

集成策略

  • 预算分配:将每个组或代理的配额直接与财务预算挂钩。
  • 成本预测:根据当前使用率和配额,预测本周期结束时的总成本。
  • 超预算告警:当预计成本即将超出预算时,发出告警。
  • 账单核对:将配额管理系统记录的使用量与LLM API提供商的实际账单进行核对,发现潜在的差异或异常。

总结与展望

公平共享调度是多代理系统有效管理和利用LLM API资源的关键策略。它不仅仅关乎技术实现,更是一种平衡业务需求、成本控制和系统稳定性的管理哲学。通过精细化的配额分配、实时的使用量追踪、灵活的借贷机制以及多维度的考量,我们可以构建一个既能防止资源滥用,又能最大化资源利用率的智能系统。

未来,随着LLM技术的不断演进和多代理系统复杂性的增加,配额管理将变得更加智能和自适应。例如,基于机器学习的配额预测模型可以根据历史数据和实时负载,动态调整代理的权重和配额;利用区块链技术实现去中心化的配额交易,允许代理之间更灵活地买卖或交换配额。这些前沿探索将进一步提升系统的韧性和效率,为人工智能的广泛应用奠定坚实基础。

感谢大家的聆听!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注