解析 ‘The Token Budgeting Circuit’:如何在图中为每个子任务分配硬性的‘价值配额’以防止失控消耗?

大家好,

今天,我们将深入探讨一个在构建复杂AI系统,特别是基于大型语言模型(LLM)的多智能体或多步骤工作流中日益重要的话题:资源消耗管理。随着AI能力的飞速发展,我们能够构建出越来越智能、越来越自主的系统。然而,这种能力也伴随着潜在的挑战:失控的资源消耗,尤其是计算资源和API调用成本。

我们的主题是解析“The Token Budgeting Circuit”,核心问题是如何在这样的“电路”中,为每个子任务分配硬性的“价值配额”,从而有效防止资源失控消耗。作为一名编程专家,我将带大家从理论到实践,通过严谨的逻辑和丰富的代码示例,构建一个健壮的资源管理框架。

1. 资源管理:复杂AI系统的生命线

在传统的软件开发中,我们对内存、CPU周期、网络带宽等资源有着成熟的管理机制。但在现代AI系统中,特别是涉及到与外部API(如LLM API)交互时,资源的概念变得更加抽象,也更加直接地与成本挂钩。这里的“资源”不仅仅是传统的计算资源,更常常指的是LLM的token消耗、外部API的调用次数、甚至是等待时间或人工审核成本。

一个典型的AI工作流可能由多个相互协作的子任务组成:一个代理负责理解用户意图,另一个代理负责信息检索,接着是数据分析,最后是报告生成。每个子任务都可能涉及多次LLM调用、工具使用或其他计算密集型操作。如果没有有效的管理,这些子任务可能会:

  • 陷入循环: 代理之间无休止地相互提问或澄清。
  • 过度生成: LLM生成过长的文本,包含大量冗余信息。
  • 滥用工具: 工具被调用过于频繁或不必要地调用昂贵操作。
  • 路径探索失控: 尝试过多不必要的路径来解决问题。

这些情况会导致成本飙升性能下降用户体验受损,甚至可能导致系统因超出API限额或预算而崩溃

“The Token Budgeting Circuit” 概念的提出,正是为了应对这一挑战。它将整个AI工作流视为一个电路,其中每个子任务都是电路中的一个组件,而“Token”或其他“价值单位”则是流经电路的能量或货币。我们的目标是为每个组件设置一个“熔断器”——一个硬性的价值配额,确保任何一个组件的故障或过度消耗都不会拖垮整个系统。

2. 理解“价值配额”与“预算电路”

在深入代码之前,我们先明确几个核心概念。

2.1. 什么是“价值配额”(Hard Quota)?

“价值配额”是指为特定任务或操作设定的、不可逾越的资源上限。一旦达到这个上限,该任务必须立即停止其资源消耗行为,或者触发一个预定义的处理机制(例如:寻求额外预算批准、切换到更经济的策略、直接报错)。它与“软限制”(Soft Limit)不同,软限制可能只发出警告,但允许继续操作。硬配额是强制性的,旨在从根本上避免失控。

2.2. “Token Budgeting Circuit”的构成

想象一个复杂的系统,它由以下部分组成:

  • 电路(Circuit): 整个AI工作流或多代理系统,例如一个自动化的研究助手。
  • 节点/子任务(Nodes/Subtasks): 电路中的独立功能单元,可以是:
    • 一个LLM调用(生成文本、总结、翻译)。
    • 一个工具调用(Web搜索、数据库查询、代码执行)。
    • 一个数据处理步骤(解析、过滤)。
    • 一个代理的决策过程。
  • 价值单位(Value Units): 我们要度量的资源。最常见的是LLM的token,但也包括:
    • API调用次数。
    • 外部服务(如图像生成、语音识别)的计费单位。
    • 计算时间(CPU/GPU周期)。
    • 内存使用量。
  • 预算(Budget): 为每个节点或整个电路分配的价值单位上限。

我们的目标是建立一个机制,能够精确地追踪每个节点消耗的价值单位,并在达到配额时强制停止。

3. 构建硬性配额机制的基石:抽象与设计原则

要实现硬性配额,我们需要一套清晰的抽象和严格的设计原则。

3.1. 核心设计原则

  1. 统一的价值单位抽象: 无论是token、API调用还是其他,都应能够被统一地度量和扣除。
  2. 分层预算管理: 存在一个总预算(Circuit Budget),它向下分配给各个子任务(Subtask Budget)。子任务只知道自己的预算,无需感知总预算。
  3. 不可变性与原子性: 一旦预算分配给子任务,其总额在子任务层面是相对固定的。资源消耗扣除应该是原子的。
  4. 前置检查与事后扣除: 在执行任何资源消耗操作之前,必须检查预算是否充足;操作完成后,立即扣除相应资源。
  5. 强制性: 预算耗尽时,必须立即停止操作,并抛出明确的异常或触发回退机制。
  6. 可见性: 预算使用情况应可追踪和报告。

3.2. 核心抽象类

我们将从最基本的单元开始构建:

3.2.1. 价值单位枚举 ValueUnit

定义我们系统中将跟踪的所有资源类型。

from enum import Enum

class ValueUnit(Enum):
    """
    定义系统中可被预算和计量的价值单位。
    """
    LLM_TOKENS = "LLM_Tokens"
    API_CALLS = "API_Calls"
    COMPUTE_CYCLES = "Compute_Cycles"
    DATA_VOLUME_MB = "Data_Volume_MB"
    # 可以根据需要添加更多单位
    # 例如:EXTERNAL_TOOL_CREDITS, DATABASE_QUERIES等

    def __str__(self):
        return self.value

3.2.2. 预算异常 BudgetExceededError

当预算耗尽时,系统应抛出此异常。

class BudgetExceededError(Exception):
    """
    当任务的预算耗尽时抛出的异常。
    """
    def __init__(self, subtask_id: str, unit: ValueUnit, limit: int, current_spent: int, message: str = None):
        self.subtask_id = subtask_id
        self.unit = unit
        self.limit = limit
        self.current_spent = current_spent
        if message is None:
            message = (f"Budget for subtask '{subtask_id}' exceeded for {unit.value}. "
                       f"Limit: {limit}, Current Spent: {current_spent}. "
                       f"Remaining: {limit - current_spent}.")
        super().__init__(message)

class GlobalBudgetExceededError(BudgetExceededError):
    """
    当整个电路的总预算耗尽时抛出的异常。
    """
    def __init__(self, unit: ValueUnit, limit: int, current_spent: int, message: str = None):
        super().__init__("Circuit", unit, limit, current_spent, message)

3.2.3. 预算对象 Budget

这是每个子任务或整个电路持有的实际预算容器。

import threading

class Budget:
    """
    表示一个特定价值单位的预算。
    管理总额、已花费和剩余金额。
    """
    def __init__(self, unit: ValueUnit, total_limit: int):
        if total_limit < 0:
            raise ValueError("Budget total limit cannot be negative.")
        self.unit = unit
        self._total_limit = total_limit
        self._spent = 0
        self._lock = threading.Lock() # 用于线程安全

    @property
    def total_limit(self) -> int:
        return self._total_limit

    @property
    def spent(self) -> int:
        return self._spent

    @property
    def remaining(self) -> int:
        return self._total_limit - self._spent

    def check_and_deduct(self, amount: int, subtask_id: str = "unknown") -> None:
        """
        检查预算是否充足,如果充足则扣除指定金额。
        如果不足,则抛出 BudgetExceededError。
        """
        if amount < 0:
            raise ValueError("Amount to deduct cannot be negative.")

        with self._lock:
            if self.spent + amount > self.total_limit:
                raise BudgetExceededError(
                    subtask_id=subtask_id,
                    unit=self.unit,
                    limit=self.total_limit,
                    current_spent=self.spent
                )
            self._spent += amount
            # print(f"[{subtask_id}] Deducted {amount} {self.unit}. Spent: {self.spent}/{self.total_limit}. Remaining: {self.remaining}")

    def add_to_budget(self, amount: int) -> None:
        """
        增加预算的总额(例如,当回收未使用的子任务预算时)。
        """
        if amount < 0:
            raise ValueError("Amount to add cannot be negative.")
        with self._lock:
            self._total_limit += amount

    def return_unused(self, amount: int) -> int:
        """
        返还一部分已花费的预算(例如,如果操作被提前终止)。
        注意:这通常用于更复杂的场景,或者在分配时犯了错误。
        更常见的是返还未使用的`remaining`部分到父预算。
        这里实现为减少`_spent`,但要注意逻辑上的合理性。
        """
        if amount < 0:
            raise ValueError("Amount to return cannot be negative.")
        with self._lock:
            returned_amount = min(amount, self._spent)
            self._spent -= returned_amount
            # print(f"Returned {returned_amount} {self.unit}. Spent: {self.spent}/{self.total_limit}. Remaining: {self.remaining}")
            return returned_amount

    def is_depleted(self) -> bool:
        """
        检查预算是否已耗尽。
        """
        return self.remaining <= 0

    def __repr__(self):
        return (f"Budget(unit={self.unit.name}, total={self.total_limit}, "
                f"spent={self.spent}, remaining={self.remaining})")

3.2.4. 中央预算管理器 CircuitBudgetManager

负责管理整个电路的总预算,并向下分配给各个子任务。

from collections import defaultdict
from typing import Dict, Optional, List

class CircuitBudgetManager:
    """
    整个AI电路的中央预算管理器。
    管理总预算,并协调子任务预算的分配和回收。
    """
    def __init__(self, global_budgets: Dict[ValueUnit, int]):
        self._global_budgets: Dict[ValueUnit, Budget] = {
            unit: Budget(unit, limit) for unit, limit in global_budgets.items()
        }
        # 存储每个子任务分配到的预算引用
        self._subtask_budgets: Dict[str, Dict[ValueUnit, Budget]] = defaultdict(dict)
        self._lock = threading.Lock() # 用于线程安全

        print("CircuitBudgetManager initialized with global budgets:")
        for unit, budget in self._global_budgets.items():
            print(f"  - {unit}: {budget.total_limit}")

    def allocate_subtask_budget(self, subtask_id: str, unit: ValueUnit, amount: int) -> Budget:
        """
        从全局预算中为指定子任务分配一个特定价值单位的预算。
        返回一个表示该子任务预算的Budget对象。
        """
        if amount < 0:
            raise ValueError("Amount to allocate cannot be negative.")
        if unit not in self._global_budgets:
            raise ValueError(f"ValueUnit '{unit}' not configured in global budgets.")

        with self._lock:
            global_budget = self._global_budgets[unit]
            try:
                # 尝试从全局预算中预扣除这部分金额
                # 注意:这里我们不是真的“扣除”已花费,而是减少`total_limit`,
                # 表示这部分钱已经“分配出去”了。
                # 我们可以修改Budget类,增加一个`reserve`方法,或者在这里直接操作。
                # 为了简化,我们假设`total_limit`代表的是可分配的上限,
                # 这里通过内部逻辑减少`total_limit`并创建新的Budget对象。

                # 更好的方式是,全局预算只跟踪已分配和未分配,而不是直接修改total_limit。
                # 让我们调整一下`Budget`的语义:`total_limit`是其“最大可用量”,
                # `_spent`是其“已使用量”。
                # 对于全局预算,`_spent`可以表示“已分配给子任务的总量”。

                # 确保全局预算有足够的“剩余空间”来分配
                if global_budget.remaining < amount:
                    raise GlobalBudgetExceededError(
                        unit=unit,
                        limit=global_budget.total_limit,
                        current_spent=global_budget.spent + (amount - global_budget.remaining), # 模拟如果分配会超出的情况
                        message=f"Global budget for {unit} insufficient to allocate {amount} for subtask {subtask_id}. "
                                f"Remaining: {global_budget.remaining}."
                    )

                # 从全局预算的“剩余”中扣除,表示这部分已经分配出去了
                global_budget.check_and_deduct(amount, subtask_id="Circuit_Allocation") 

                # 为子任务创建一个新的Budget实例,它的total_limit就是分配的amount
                subtask_specific_budget = Budget(unit, amount)
                self._subtask_budgets[subtask_id][unit] = subtask_specific_budget

                print(f"Allocated {amount} {unit} to subtask '{subtask_id}'. "
                      f"Global {unit} remaining: {global_budget.remaining}")
                return subtask_specific_budget

            except BudgetExceededError as e:
                # 重新抛出,包装为全局预算超出异常
                raise GlobalBudgetExceededError(
                    unit=unit,
                    limit=global_budget.total_limit,
                    current_spent=global_budget.spent,
                    message=f"Failed to allocate {amount} {unit} to subtask '{subtask_id}'. {e.message}"
                ) from e

    def reclaim_unused_subtask_budget(self, subtask_id: str, unit: ValueUnit) -> int:
        """
        回收子任务未使用的特定价值单位预算到全局预算。
        """
        with self._lock:
            if subtask_id not in self._subtask_budgets or unit not in self._subtask_budgets[subtask_id]:
                print(f"Warning: No budget found for subtask '{subtask_id}' and unit '{unit}' to reclaim.")
                return 0

            subtask_budget = self._subtask_budgets[subtask_id][unit]
            unused_amount = subtask_budget.remaining

            if unused_amount > 0:
                global_budget = self._global_budgets[unit]
                # 全局预算的“已分配量”减少 unused_amount
                # 注意:这里调用return_unused是为了减少全局预算的`_spent`,
                # 因为`_spent`代表了已分配出去的量。
                # 这是一个需要仔细考虑的细节,即`Budget`的`_spent`对于全局和子任务有不同的语义。
                # 让我们修改`Budget`,增加一个`release_allocated`方法来清晰地处理。

                # 重构:全局Budget的`_spent`表示已分配出去的量。
                # 那么回收就是减少这个`_spent`。
                global_budget._spent -= unused_amount # 直接操作,因为我们知道语义
                # 也可以实现一个 `release_allocated` 方法在 `Budget` 中。

                print(f"Reclaimed {unused_amount} {unit} from subtask '{subtask_id}'. "
                      f"Global {unit} remaining: {global_budget.remaining}")

            # 从管理器中移除该子任务的此单位预算,表示它不再有此预算所有权
            del self._subtask_budgets[subtask_id][unit]
            if not self._subtask_budgets[subtask_id]: # 如果子任务所有单位预算都回收了
                del self._subtask_budgets[subtask_id]

            return unused_amount

    def get_subtask_budget(self, subtask_id: str, unit: ValueUnit) -> Optional[Budget]:
        """
        获取指定子任务的特定价值单位预算对象。
        """
        return self._subtask_budgets.get(subtask_id, {}).get(unit)

    def get_global_budget_status(self) -> Dict[ValueUnit, dict]:
        """
        获取全局预算的当前状态。
        """
        status = {}
        with self._lock:
            for unit, budget in self._global_budgets.items():
                status[unit] = {
                    "total_limit": budget.total_limit,
                    "spent_allocated": budget.spent, # spent在这里代表已分配出去的量
                    "remaining_allocatable": budget.remaining
                }
        return status

    def get_all_subtask_budgets_status(self) -> Dict[str, Dict[ValueUnit, dict]]:
        """
        获取所有子任务预算的当前状态。
        """
        status = {}
        with self._lock:
            for subtask_id, budgets_by_unit in self._subtask_budgets.items():
                status[subtask_id] = {}
                for unit, budget in budgets_by_unit.items():
                    status[subtask_id][unit] = {
                        "total_limit": budget.total_limit,
                        "spent_consumed": budget.spent, # spent在这里代表子任务已消耗的量
                        "remaining_available": budget.remaining
                    }
        return status

CircuitBudgetManagerBudget 语义的调整说明:

为了更清晰地处理全局预算和子任务预算之间的关系,我调整了 Budget 类的语义和 CircuitBudgetManager 的实现逻辑:

  • Budget 类的 _spent 属性:
    • 当它作为子任务的预算时,_spent 代表该子任务实际消耗的资源量。
    • 当它作为全局预算时,_spent 代表已从全局预算中分配给所有子任务的总资源量。
  • allocate_subtask_budget 方法:
    • 它首先检查全局预算是否有足够的“可分配”量(即 global_budget.remaining)。
    • 然后,它通过调用 global_budget.check_and_deduct(amount, ...) 来增加全局预算的 _spent,表示这部分资源已被分配。
    • 接着,它为子任务创建一个 新的 Budget 实例,这个新实例的 total_limit 就是分配给它的 amount。这样,子任务的预算是独立的,并且由它自己管理 _spent
  • reclaim_unused_subtask_budget 方法:
    • 它获取子任务预算的 remaining 量(即子任务未使用的量)。
    • 然后,它直接从全局预算的 _spent 中减去这个 unused_amount,表示这部分资源被回收,全局预算的“已分配量”减少了。

这种设计使得全局预算和子任务预算的职责分离更加清晰,同时仍然能够实现精确的追踪和强制性的配额管理。

4. 将预算集成到子任务中:LLM代理和工具

现在我们有了预算管理的基础设施,下一步是将其集成到实际的资源消耗者——LLM代理和工具中。

4.1. LLM Token 计数器

LLM的token消耗是核心。我们需要一个可靠的方法来计算输入和输出的token数量。tiktoken 是OpenAI提供的官方工具,非常适合这个目的。

import tiktoken

class TokenCounter:
    """
    用于计算文本中token数量的实用工具类。
    """
    def __init__(self, model_name: str = "gpt-4"):
        try:
            self.encoding = tiktoken.encoding_for_model(model_name)
        except KeyError:
            print(f"Warning: Model '{model_name}' not found for tiktoken, falling back to 'cl100k_base'.")
            self.encoding = tiktoken.get_encoding("cl100k_base")

    def count_tokens(self, text: str) -> int:
        """
        计算给定文本的token数量。
        """
        return len(self.encoding.encode(text))

# 全局或单例TokenCounter实例
GLOBAL_TOKEN_COUNTER = TokenCounter()

4.2. 预算化LLM代理 BudgetedLLMAgent

一个LLM代理在每次调用LLM时都需要检查和扣除预算。

import time
from typing import Dict, Any, Optional

# 模拟LLM API调用
def mock_llm_call(prompt: str, max_tokens: int, model: str) -> str:
    """
    模拟LLM API调用,返回一个随机长度的响应。
    """
    time.sleep(0.1) # 模拟网络延迟
    response_length = min(len(prompt) // 2 + 10, max_tokens) # 简单模拟
    return "This is a simulated LLM response to: " + prompt[:50] + "..." + " " * (response_length - len("This is a simulated LLM response to: " + prompt[:50] + "..."))

class BudgetedLLMAgent:
    """
    一个集成了预算管理的LLM代理。
    每次LLM调用都会检查并扣除LLM_TOKENS预算。
    """
    def __init__(self, agent_id: str,
                 llm_model: str,
                 token_budget: Budget,
                 api_call_budget: Optional[Budget] = None):
        self.agent_id = agent_id
        self.llm_model = llm_model
        self.token_budget = token_budget
        self.api_call_budget = api_call_budget
        self._token_counter = GLOBAL_TOKEN_COUNTER
        print(f"BudgetedLLMAgent '{agent_id}' initialized with token budget: {token_budget.total_limit}")
        if api_call_budget:
            print(f"  and API call budget: {api_call_budget.total_limit}")

    def generate_response(self, prompt: str, max_output_tokens: int = 500) -> str:
        """
        使用LLM生成响应,并管理token和API调用预算。
        """
        prompt_tokens = self._token_counter.count_tokens(prompt)

        # 1. 预估总消耗:prompt tokens + max_output_tokens
        # 实际操作中,max_output_tokens会是LLM API的参数,我们按照这个上限预估。
        estimated_total_tokens = prompt_tokens + max_output_tokens

        # 2. 检查并扣除API调用预算 (如果存在)
        if self.api_call_budget:
            self.api_call_budget.check_and_deduct(1, subtask_id=self.agent_id)

        # 3. 检查并扣除Token预算(先扣除prompt tokens)
        self.token_budget.check_and_deduct(prompt_tokens, subtask_id=self.agent_id)

        # 动态调整max_output_tokens以适应剩余预算
        actual_max_output_tokens = min(max_output_tokens, self.token_budget.remaining)
        if actual_max_output_tokens <= 0:
            raise BudgetExceededError(
                self.agent_id, ValueUnit.LLM_TOKENS,
                self.token_budget.total_limit, self.token_budget.spent,
                f"No tokens left for output generation in agent '{self.agent_id}' after prompt consumption."
            )

        print(f"Agent '{self.agent_id}' calling LLM. Prompt tokens: {prompt_tokens}. "
              f"Max output tokens adjusted to: {actual_max_output_tokens}.")

        try:
            # 模拟LLM调用
            response = mock_llm_call(prompt, actual_max_output_tokens, self.llm_model)

            # 4. 扣除实际响应的token数量
            response_tokens = self._token_counter.count_tokens(response)

            # 再次检查,因为实际响应可能比预估的max_output_tokens少,
            # 但如果mock_llm_call内部逻辑有误,可能也会超出。
            # 这里是扣除实际消耗,而不是预扣。
            # 如果之前已经预扣了max_output_tokens,这里需要更精细的逻辑来返还多余的。
            # 最简单的处理是:在check_and_deduct中只预扣prompt,
            # 然后在收到response后,再扣除response tokens。
            # 这就要求LLM的API在返回时,不能超过我们给定的max_tokens参数。

            # 修正策略:只在调用前扣除prompt tokens,响应回来后再扣除响应tokens。
            # 这样更精确,但需要确保LLM API遵守max_output_tokens限制。
            # 如果LLM API不遵守,我们就需要在收到响应后进行截断或处理。

            # 在当前实现中,`check_and_deduct` 会在不足时抛出异常。
            # 所以,如果 `mock_llm_call` 实际生成的 `response_tokens` 导致超出剩余预算,
            # 这里的扣除就会失败。
            self.token_budget.check_and_deduct(response_tokens, subtask_id=self.agent_id)
            print(f"Agent '{self.agent_id}' received response. Response tokens: {response_tokens}. "
                  f"Total spent by agent: {self.token_budget.spent}/{self.token_budget.total_limit} {ValueUnit.LLM_TOKENS}.")

            return response

        except BudgetExceededError as e:
            print(f"Agent '{self.agent_id}' failed to deduct response tokens. "
                  f"This indicates the response itself was too long for the remaining budget. Error: {e}")
            # 可以选择截断响应,或者直接抛出异常
            raise BudgetExceededError(
                self.agent_id, ValueUnit.LLM_TOKENS,
                self.token_budget.total_limit, self.token_budget.spent,
                f"LLM response from agent '{self.agent_id}' exceeded budget. "
                f"Truncation or alternative strategy needed. Original error: {e.message}"
            ) from e
        except Exception as e:
            print(f"Agent '{self.agent_id}' LLM call failed: {e}")
            # 如果LLM调用失败,并且之前已经扣除了prompt和api_call预算,
            # 可以在这里选择是否返还这些预算。
            # 为了简单,这里不返还,因为失败本身也消耗了资源(尝试)。
            raise

4.3. 预算化工具执行器 BudgetedToolExecutor

工具调用也需要预算。例如,Web搜索可能以调用次数计费,或以数据量计费。

class Tool:
    """
    模拟一个外部工具的接口。
    """
    def __init__(self, name: str, cost_model: Dict[ValueUnit, int]):
        self.name = name
        self.cost_model = cost_model # 定义每次调用预估的成本

    def execute(self, query: str) -> str:
        """
        模拟工具执行,返回结果。
        """
        print(f"  Executing tool '{self.name}' with query: {query}")
        time.sleep(0.05) # 模拟工具执行时间
        return f"Result from {self.name} for '{query}': Data found."

class BudgetedToolExecutor:
    """
    一个集成了预算管理的工具执行器。
    每次工具调用都会检查并扣除相应的预算。
    """
    def __init__(self, executor_id: str,
                 tools: List[Tool],
                 budgets: Dict[ValueUnit, Budget]):
        self.executor_id = executor_id
        self.tools_map = {tool.name: tool for tool in tools}
        self.budgets = budgets
        print(f"BudgetedToolExecutor '{executor_id}' initialized with budgets:")
        for unit, budget in budgets.items():
            print(f"  - {unit}: {budget.total_limit}")

    def use_tool(self, tool_name: str, query: str) -> str:
        """
        使用指定的工具,并管理工具调用的预算。
        """
        tool = self.tools_map.get(tool_name)
        if not tool:
            raise ValueError(f"Tool '{tool_name}' not found.")

        # 检查并扣除工具的预估成本
        for unit, cost in tool.cost_model.items():
            if unit in self.budgets:
                self.budgets[unit].check_and_deduct(cost, subtask_id=self.executor_id)
            else:
                print(f"Warning: Tool '{tool_name}' has cost for {unit}, but no budget configured for it in executor '{self.executor_id}'.")

        print(f"Tool executor '{self.executor_id}' using tool '{tool_name}'.")
        result = tool.execute(query)
        return result

5. 组装电路:一个多代理研究助手示例

现在,我们将以上组件组合起来,构建一个简单的多代理研究助手,并观察预算管理如何工作。

场景: 一个研究助手,它需要执行以下子任务:

  1. 规划(Planner Agent): 理解研究请求,生成搜索查询。
  2. 搜索(Search Tool): 执行Web搜索。
  3. 分析(Analyzer Agent): 分析搜索结果,提取关键信息。
  4. 报告(Reporter Agent): 根据分析结果生成最终报告。

预算分配策略:

  • 整个电路有一个总的LLM token和API调用预算。
  • 每个代理(Planner, Analyzer, Reporter)分配一部分LLM token预算。
  • 搜索工具分配API调用预算。

5.1. 预算配置

我们将使用一个表格来清晰地展示预算分配:

预算类型 价值单位 总额 分配给 Planner 分配给 SearchTool 分配给 Analyzer 分配给 Reporter
LLM Token 总预算 LLM_TOKENS 10000 2000 4000 4000
API 调用总预算 API_CALLS 50 20
Web 搜索工具成本 API_CALLS 1 (每次调用)

5.2. 电路初始化与运行

def run_research_assistant(topic: str, circuit_manager: CircuitBudgetManager):
    print(f"n--- Starting Research Assistant for topic: '{topic}' ---")

    try:
        # 1. 分配子任务预算
        planner_token_budget = circuit_manager.allocate_subtask_budget("planner_agent", ValueUnit.LLM_TOKENS, 2000)
        analyzer_token_budget = circuit_manager.allocate_subtask_budget("analyzer_agent", ValueUnit.LLM_TOKENS, 4000)
        reporter_token_budget = circuit_manager.allocate_subtask_budget("reporter_agent", ValueUnit.LLM_TOKENS, 4000)

        search_api_budget = circuit_manager.allocate_subtask_budget("search_tool_executor", ValueUnit.API_CALLS, 20)

        # 2. 初始化子任务(代理和工具)
        planner_agent = BudgetedLLMAgent("planner_agent", "gpt-3.5-turbo", planner_token_budget)
        analyzer_agent = BudgetedLLMAgent("analyzer_agent", "gpt-4", analyzer_token_budget)
        reporter_agent = BudgetedLLMAgent("reporter_agent", "gpt-3.5-turbo", reporter_token_budget)

        web_search_tool = Tool("web_search", {ValueUnit.API_CALLS: 1})
        tool_executor = BudgetedToolExecutor("search_tool_executor", [web_search_tool], {ValueUnit.API_CALLS: search_api_budget})

        # --- 任务流执行 ---

        # Step 1: Planner Agent - 理解主题并生成搜索查询
        print("n[Planner Agent] Planning research...")
        planner_prompt = f"Understand the topic '{topic}' and generate 3 concise web search queries to gather information. List them line by line."
        search_queries_str = planner_agent.generate_response(planner_prompt, max_output_tokens=500)
        search_queries = [q.strip() for q in search_queries_str.split('n') if q.strip()]
        print(f"[Planner Agent] Generated search queries: {search_queries}")
        if not search_queries:
             raise ValueError("Planner failed to generate search queries.")

        # Step 2: Search Tool - 执行搜索
        print("n[Search Tool] Executing web searches...")
        all_search_results = []
        for i, query in enumerate(search_queries):
            print(f"  Searching for: '{query}' ({i+1}/{len(search_queries)})")
            result = tool_executor.use_tool("web_search", query)
            all_search_results.append(result)
            if i >= search_api_budget.remaining -1: # 防止在下一轮循环中才发现预算耗尽
                print(f"  Search API budget is nearly depleted, stopping further searches.")
                break

        combined_search_results = "n".join(all_search_results)
        print(f"[Search Tool] Combined {len(all_search_results)} search results.")

        # Step 3: Analyzer Agent - 分析结果
        print("n[Analyzer Agent] Analyzing search results...")
        analysis_prompt = (f"Analyze the following search results about '{topic}'. "
                           f"Extract key facts, summarize findings, and identify any contradictions or gaps. "
                           f"Results:n{combined_search_results}")
        analysis_report = analyzer_agent.generate_response(analysis_prompt, max_output_tokens=2000)
        print(f"[Analyzer Agent] Analysis complete. Report snippet: {analysis_report[:200]}...")

        # Step 4: Reporter Agent - 生成最终报告
        print("n[Reporter Agent] Generating final report...")
        report_prompt = (f"Based on the following analysis, write a comprehensive report on '{topic}'. "
                         f"Include an introduction, key findings, and a conclusion. "
                         f"Analysis:n{analysis_report}")
        final_report = reporter_agent.generate_response(report_prompt, max_output_tokens=1500)
        print(f"n--- Final Report for '{topic}' ---n{final_report[:500]}...")
        print("...")

    except BudgetExceededError as e:
        print(f"n!!! Circuit Execution Halted: {e.message}")
        print(f"Subtask '{e.subtask_id}' exceeded budget for {e.unit.value}. Spent: {e.current_spent}/{e.limit}.")
    except Exception as e:
        print(f"n!!! An unexpected error occurred: {e}")
    finally:
        print("n--- Circuit Execution Finished ---")
        # 回收所有未使用的子任务预算
        for subtask_id in list(circuit_manager._subtask_budgets.keys()): # list copy to avoid mutation during iteration
            for unit in list(circuit_manager._subtask_budgets[subtask_id].keys()):
                circuit_manager.reclaim_unused_subtask_budget(subtask_id, unit)

        print("n--- Final Global Budget Status ---")
        for unit, status in circuit_manager.get_global_budget_status().items():
            print(f"  {unit}: Total {status['total_limit']}, Allocated {status['spent_allocated']}, Remaining {status['remaining_allocatable']}")

        print("n--- Final Subtask Budgets Status ---")
        if not circuit_manager.get_all_subtask_budgets_status():
            print("  (All subtask budgets reclaimed or none were left)")
        else:
            for subtask_id, budgets_by_unit in circuit_manager.get_all_subtask_budgets_status().items():
                print(f"  {subtask_id}:")
                for unit, status in budgets_by_unit.items():
                    print(f"    - {unit}: Total allocated {status['total_limit']}, Consumed {status['spent_consumed']}, Available {status['remaining_available']}")

# 初始化全局预算管理器
global_budgets_config = {
    ValueUnit.LLM_TOKENS: 10000,
    ValueUnit.API_CALLS: 50
}
circuit_manager = CircuitBudgetManager(global_budgets_config)

# 运行研究助手
run_research_assistant("The future of AI in personalized education", circuit_manager)

# 尝试一个可能超出预算的场景,例如,给Analyzer Agent一个非常小的预算
print("n" + "="*80)
print("Attempting another run with tighter Analyzer Agent budget...")
global_budgets_config_tight = {
    ValueUnit.LLM_TOKENS: 5000, # 减少总预算
    ValueUnit.API_CALLS: 10
}
circuit_manager_tight = CircuitBudgetManager(global_budgets_config_tight)

def run_research_assistant_tight(topic: str, circuit_manager: CircuitBudgetManager):
    print(f"n--- Starting Research Assistant (Tight Budget) for topic: '{topic}' ---")

    try:
        # 1. 分配子任务预算 (Analyzer预算很小)
        planner_token_budget = circuit_manager.allocate_subtask_budget("planner_agent_tight", ValueUnit.LLM_TOKENS, 1000)
        analyzer_token_budget = circuit_manager.allocate_subtask_budget("analyzer_agent_tight", ValueUnit.LLM_TOKENS, 500) # 故意给少
        reporter_token_budget = circuit_manager.allocate_subtask_budget("reporter_agent_tight", ValueUnit.LLM_TOKENS, 1500)

        search_api_budget = circuit_manager.allocate_subtask_budget("search_tool_executor_tight", ValueUnit.API_CALLS, 5)

        # 2. 初始化子任务(代理和工具)
        planner_agent = BudgetedLLMAgent("planner_agent_tight", "gpt-3.5-turbo", planner_token_budget)
        analyzer_agent = BudgetedLLMAgent("analyzer_agent_tight", "gpt-4", analyzer_token_budget)
        reporter_agent = BudgetedLLMAgent("reporter_agent_tight", "gpt-3.5-turbo", reporter_token_budget)

        web_search_tool = Tool("web_search", {ValueUnit.API_CALLS: 1})
        tool_executor = BudgetedToolExecutor("search_tool_executor_tight", [web_search_tool], {ValueUnit.API_CALLS: search_api_budget})

        # --- 任务流执行 ---
        planner_prompt = f"Understand the topic '{topic}' and generate 3 concise web search queries to gather information. List them line by line."
        search_queries_str = planner_agent.generate_response(planner_prompt, max_output_tokens=500)
        search_queries = [q.strip() for q in search_queries_str.split('n') if q.strip()]

        all_search_results = []
        for i, query in enumerate(search_queries):
            if i >= search_api_budget.total_limit: # 提前检查,防止超出
                print(f"  Search API budget depleted, stopping further searches.")
                break
            result = tool_executor.use_tool("web_search", query)
            all_search_results.append(result)
        combined_search_results = "n".join(all_search_results)

        # 故意让分析任务超出预算
        analysis_prompt = (f"Analyze the following extensive search results about '{topic}'. "
                           f"Provide a very detailed, long, and comprehensive summary, "
                           f"extracting every possible fact and insight, identifying all nuances, "
                           f"and generating a thorough report without any brevity. "
                           f"Results:n{combined_search_results}")
        analysis_report = analyzer_agent.generate_response(analysis_prompt, max_output_tokens=2000) # 即使这里请求2000,但agent的budget只有500,会触发异常

        report_prompt = (f"Based on the following analysis, write a comprehensive report on '{topic}'. "
                         f"Include an introduction, key findings, and a conclusion. "
                         f"Analysis:n{analysis_report}")
        final_report = reporter_agent.generate_response(report_prompt, max_output_tokens=1500)
        print(f"n--- Final Report (Tight Budget) for '{topic}' ---n{final_report[:500]}...")

    except BudgetExceededError as e:
        print(f"n!!! Circuit Execution Halted (Tight Budget): {e.message}")
        print(f"Subtask '{e.subtask_id}' exceeded budget for {e.unit.value}. Spent: {e.current_spent}/{e.limit}.")
    except Exception as e:
        print(f"n!!! An unexpected error occurred (Tight Budget): {e}")
    finally:
        print("n--- Circuit Execution Finished (Tight Budget) ---")
        for subtask_id in list(circuit_manager._subtask_budgets.keys()):
            for unit in list(circuit_manager._subtask_budgets[subtask_id].keys()):
                circuit_manager.reclaim_unused_subtask_budget(subtask_id, unit)

        print("n--- Final Global Budget Status (Tight Budget) ---")
        for unit, status in circuit_manager.get_global_budget_status().items():
            print(f"  {unit}: Total {status['total_limit']}, Allocated {status['spent_allocated']}, Remaining {status['remaining_allocatable']}")

run_research_assistant_tight("The impact of quantum computing on cryptography", circuit_manager_tight)

在上述示例中,我们首先定义了全局预算,然后由 CircuitBudgetManager 将其分配给不同的子任务。每个代理和工具在执行操作前,都会通过它们各自的 Budget 对象进行检查和扣除。当 analyzer_agent_tight 在第二个案例中尝试进行过长的分析时,它会因为其分配到的 LLM_TOKENS 预算不足而抛出 BudgetExceededError,从而阻止进一步的资源消耗。

6. 深入探讨与高级策略

6.1. 动态预算调整与请求

虽然硬配额是防止失控的关键,但在某些复杂场景下,子任务可能确实需要更多的资源才能完成其目标。我们可以引入一个机制,允许子任务在耗尽预算时向 CircuitBudgetManager “请求”额外预算。

  • 请求机制: 子任务抛出 BudgetExceededError,但包含一个 can_request_more=True 标志。
  • 审批逻辑: CircuitBudgetManager 接收请求,根据预设策略(例如:全局预算是否充足、该任务的优先级、是否是关键路径任务)决定是否批准。
  • 重新分配: 如果批准,CircuitBudgetManager 会从全局预算或从其他子任务未使用的预算中重新分配。

这增加了系统的复杂性,但提供了更大的灵活性,可以在保持控制的同时提高成功率。

6.2. 成本模型与预估

精准的预算分配依赖于对未来消耗的准确预估。

价值单位 预估策略 挑战
LLM_TOKENS 基于历史平均值、输入长度、预期输出长度、LLM模型类型(不同模型token成本不同)。 LLM输出长度不确定性高,prompt工程会影响token效率。
API_CALLS 每次调用固定成本、基于请求参数(例如图片尺寸、数据量)的阶梯成本。 外部API定价模型多样,可能随时间变化。
COMPUTE_CYCLES 任务类型(CPU/GPU)、数据量、算法复杂度。 难以精确计量,通常通过经验值或SLA约定。
DATA_VOLUME_MB 输入/输出数据量、存储时间。 流式数据或未知数据源的预估困难。

我们可以为每个工具或LLM模型维护一个成本模型,在分配预算时参考这些模型。

6.3. 优先级与权重

并非所有子任务都同等重要。在预算紧张时,高优先级任务可能被允许获得更多预算,甚至可以从低优先级任务中“借用”未使用的预算。这可以通过在 CircuitBudgetManager 中引入优先级队列或加权分配算法来实现。

6.4. 监控与告警

一个健全的预算管理系统还需要:

  • 实时仪表盘: 可视化地展示每个子任务和整个电路的预算使用情况。
  • 告警机制: 当预算达到某个阈值(例如,剩余20%)时,自动发送通知给相关人员。
  • 日志记录: 详细记录每次预算分配、扣除、回收和超出事件,以便事后分析和审计。

6.5. 故障处理与回滚

当预算耗尽导致任务失败时,系统应能优雅地处理:

  • 回退策略: 尝试使用更便宜的LLM模型、更简单的算法或更有限的工具集。
  • 截断/总结: 如果是文本生成任务,在预算耗尽时截断输出,并添加提示信息。
  • 人工干预: 将问题上报给人,请求人工审查或额外预算。
  • 事务性回滚: 如果整个工作流是事务性的,预算耗尽可能需要回滚之前已完成的步骤。

7. 挑战与权衡

实施硬性价值配额并非没有挑战:

  • 过度预算 vs. 预算不足: 分配过多会浪费资源,分配过少则可能导致任务频繁失败。找到最佳平衡点需要经验和迭代。
  • 复杂性增加: 引入预算管理机制无疑会增加系统的设计、实现和维护复杂性。
  • 预估准确性: 特别是LLM的输出长度,很难精确预估,这使得初始预算分配成为一个挑战。
  • 开发体验: 开发者需要遵循预算集成规范,这可能对开发流程产生影响。

尽管存在这些挑战,但考虑到失控资源消耗可能带来的巨大成本和稳定性风险,投入精力构建一个健壮的预算管理系统是绝对值得的。

8. 总结与展望

在复杂的AI工作流中,对资源消耗进行硬性价值配额管理是确保系统稳定、高效和经济运行的关键。通过建立统一的价值单位抽象、分层的预算管理机制,并将预算检查与扣除集成到每个资源消耗点,我们可以有效地防止失控消耗。这不仅能帮助我们控制成本,还能提高系统的可预测性和可靠性,为构建更先进、更负责任的AI系统奠定坚实基础。

未来的工作可以围绕更智能的动态预算分配、基于机器学习的成本预测模型以及更友好的开发者工具和可视化界面展开,进一步提升预算管理的能力和易用性。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注