深入 ‘Automated Prompt Optimization’:利用 APE (Automatic Prompt Engineer) 在图中持续微调节点指令

各位专家、同仁,下午好!

今天,我们齐聚一堂,共同探讨一个前沿且极具实践意义的话题:深入“Automated Prompt Optimization”(自动化提示词优化),特别是如何利用APE(Automatic Prompt Engineer)在复杂的图结构中,对节点指令进行持续的微调。在AI技术飞速发展的今天,大型语言模型(LLM)已成为我们构建智能系统的核心组件。然而,要充分发挥LLM的潜力,仅仅调用API是远远不够的,精妙的提示词(Prompt)设计至关重要。

引言:提示词工程的挑战与自动化需求

我们都曾是“提示词工程师”。为了让LLM完成特定任务,我们绞尽脑汁地构造指令,尝试各种措辞、格式、示例,甚至魔法咒语般的关键词。这门艺术被称为“提示词工程”(Prompt Engineering)。它要求我们对LLM的行为模式有深刻的理解,对领域知识有扎实的掌握,并且需要大量的试错和经验积累。

然而,手动提示词工程面临着诸多挑战:

  1. 效率低下与可扩展性差: 针对每一个新任务、新场景,都需要从头开始设计和优化提示词,耗时耗力。当系统包含成百上千个LLM调用点时,手动维护和优化这些提示词几乎是不可能完成的任务。
  2. 次优解风险: 人的认知和尝试空间是有限的。我们很难穷尽所有可能的提示词组合,因此很可能错过更优的解决方案。
  3. 主观性与不稳定性: 不同的工程师可能会写出不同的提示词,导致输出结果的不一致性。同时,LLM本身也在不断迭代,一个在旧模型上表现良好的提示词,在新模型上可能效果不佳。
  4. 复杂系统中的依赖性: 在多步骤、多代理或图结构的工作流中,一个节点的输出是另一个节点的输入。一个节点的提示词优化不仅影响自身性能,还可能级联影响整个工作流的最终效果。

正是这些挑战催生了对“自动化提示词优化”(Automated Prompt Optimization, APO)的迫切需求。APO的核心思想是:让LLM自己去优化提示词。这听起来有些元编程的味道——我们用LLM来指导LLM更好地工作。而APE(Automatic Prompt Engineer)正是APO领域的一个杰出代表,它提供了一套系统化的方法来自动化这一过程。

今天,我们将更进一步,探讨如何将APE的强大能力融入到我们日益复杂的图结构系统中,实现对图中节点指令的持续微调。想象一下,您的AI工作流不再是静态的,而是能够根据实际运行数据和反馈,智能地、持续地优化每一个执行步骤的指令,这将是何等强大的能力!

第一部分:Automated Prompt Optimization (APO) 核心理念与 APE 机制解析

1.1 APO 的基本原理

APO 的核心在于构建一个反馈循环。它不再依赖人类工程师的直觉,而是通过以下步骤来迭代改进提示词:

  1. 提示词生成(Prompt Generation): 利用一个“元LLM”(Meta-LLM)来生成候选提示词。这个元LLM的输入是原始任务描述和一些初始的、可能不够完善的提示词。
  2. 任务执行(Task Execution): 将生成的候选提示词应用到目标LLM上,并结合实际的任务输入数据来执行任务。
  3. 效果评估(Performance Evaluation): 根据任务的特定指标(例如准确率、F1分数、ROUGE分数、人类偏好等)来评估目标LLM的输出质量。
  4. 优化与选择(Optimization & Selection): 根据评估结果,选择表现最佳的提示词,或者利用评估结果来指导元LLM生成下一轮更优的提示词。

这个循环可以持续进行,直到达到满意的性能,或者达到预设的迭代次数。

1.2 APE (Automatic Prompt Engineer) 机制详解

APE 是由 Google DeepMind 提出的一种具体的 APO 框架。它的关键创新在于,它将“生成提示词”这个任务本身也交给了LLM。APE 的主要步骤如下:

  1. 种子提示词(Seed Prompts)初始化: 从一个或几个简单的、人工编写的提示词开始。这些提示词可能很粗糙,但能大致描述任务。
  2. 提示词变异(Prompt Mutation)/生成: 利用一个强大的LLM(我们称之为“提示词生成器”或“元LLM”)来生成新的、多样化的候选提示词。它接收当前表现较好的提示词作为输入,并被指示去“生成更有效、更清晰、更能引导LLM完成任务的替代提示词”。
    • 指令示例: "Given the following prompt: ‘{current_prompt}’, generate 5 alternative prompts that could lead to better performance for the task of {task_description}. Focus on clarity, conciseness, and specific instructions."
  3. 任务执行与评估: 对于每一个生成的候选提示词,将其应用到目标任务的测试数据集上。目标LLM执行任务,并计算其性能指标。
  4. 排序与选择: 根据评估指标对所有候选提示词进行排序。选择性能最佳的提示词作为下一轮的“种子提示词”,或者作为最终的优化结果。
  5. 迭代: 重复步骤2-4,直到收敛或达到预设的停止条件。

表1: 手动提示词工程与APE的对比

特性 手动提示词工程 APE (Automatic Prompt Engineer)
设计主体 人类工程师 LLM (元LLM)
迭代方式 经验、直觉、A/B测试 系统化、数据驱动的反馈循环
效率 低,耗时耗力 高,可自动化大规模优化
可扩展性 差,难以应对大量任务和提示词 强,可并行优化多个提示词
优化质量 受限于人类认知,可能次优 有潜力发现人类难以想象的优化方案,趋近最优
所需资源 工程师时间、专业知识 计算资源(LLM API调用成本)、高质量评估数据
适用场景 简单、小规模任务;原型开发 复杂、大规模任务;需要持续优化的生产环境

1.3 APE 的基本实现结构(Python 示例)

为了更好地理解 APE 的工作原理,我们先看一个简化的 Python 骨架,它模拟了一个基础的 APE 优化流程。这里我们假设有一个 MockLLM 类来模拟实际的语言模型调用,以及一个 evaluate_output 函数来模拟任务评估。

import os
import random
from typing import List, Dict, Any, Callable, Tuple

# 假设我们有一个Mock LLM来模拟OpenAI/Anthropic等API
class MockLLM:
    """
    模拟一个LLM,可以根据提示词生成响应。
    在实际应用中,这里会集成OpenAI, Anthropic, HuggingFace Transformers等。
    """
    def __init__(self, model_name: str = "mock-gpt-3.5-turbo"):
        self.model_name = model_name
        self.call_count = 0

    def generate(self, prompt: str, temperature: float = 0.7, max_tokens: int = 150) -> str:
        self.call_count += 1
        print(f"MockLLM ({self.model_name}) call {self.call_count} with prompt fragment: '{prompt[:50]}...'")
        # 简单模拟不同的prompt可能导致不同的输出
        if "summarize" in prompt.lower():
            return f"This is a mock summary generated by '{prompt}' for the input. (Call {self.call_count})"
        elif "classify" in prompt.lower():
            return f"This is a mock classification output for '{prompt}'. (Call {self.call_count})"
        elif "refine" in prompt.lower() or "improve" in prompt.lower():
            # 模拟提示词生成器的响应
            candidate_prompts = [
                f"Please provide a concise summary of the following text, focusing on key facts. V{self.call_count}",
                f"Extract the main ideas from the text below in bullet points. V{self.call_count}",
                f"Summarize the essence of the document in exactly 50 words. V{self.call_count}",
                f"Rewrite the following text into a professional summary. V{self.call_count}",
                f"Given the text, identify and list the most important information. V{self.call_count}"
            ]
            return "n".join(candidate_prompts)
        return f"Mock response for '{prompt}'. (Call {self.call_count})"

# 假设一个评估函数,用于评估LLM输出的质量
# 在实际中,这可能是ROUGE, BLEU, F1-score, BERTScore, 或基于人类反馈的评估
def evaluate_summary(llm_output: str, ground_truth: str) -> float:
    """
    模拟摘要任务的评估函数。
    这里使用一个非常简化的方法:检查关键词匹配和长度。
    """
    score = 0.0
    # 假设ground_truth是期望的关键词列表或者一个参考摘要
    if "key facts" in ground_truth and "key facts" in llm_output:
        score += 0.3
    if "concise" in ground_truth and "concise" in llm_output:
        score += 0.2

    # 模拟与长度相关的评估
    if len(llm_output) > 50 and len(llm_output) < 100:
        score += 0.4 # 假设这是比较好的长度

    # 模拟随机性,让分数有所浮动
    score += random.uniform(-0.1, 0.1)
    return max(0.0, min(1.0, score)) # 分数限制在0到1之间

class APEOptimizer:
    def __init__(self,
                 target_llm: MockLLM,
                 prompt_generator_llm: MockLLM,
                 task_description: str,
                 evaluation_function: Callable[[str, str], float],
                 num_candidate_prompts: int = 5,
                 num_iterations: int = 5):
        self.target_llm = target_llm # 用于执行任务的LLM
        self.prompt_generator_llm = prompt_generator_llm # 用于生成新提示词的LLM
        self.task_description = task_description
        self.evaluation_function = evaluation_function
        self.num_candidate_prompts = num_candidate_prompts
        self.num_iterations = num_iterations

    def _generate_candidate_prompts(self, current_best_prompt: str) -> List[str]:
        """
        使用prompt_generator_llm生成新的候选提示词。
        """
        generation_prompt = (
            f"Given the task: '{self.task_description}'. "
            f"The current best performing prompt is: '{current_best_prompt}'. "
            f"Suggest {self.num_candidate_prompts} alternative prompts that could lead to better performance. "
            "Each prompt should be on a new line."
        )
        raw_response = self.prompt_generator_llm.generate(generation_prompt, temperature=0.9, max_tokens=300)
        # 简单地按行分割,并过滤空行
        candidates = [p.strip() for p in raw_response.split('n') if p.strip()]
        # 确保有足够的候选,不足时可以重复或使用默认
        if len(candidates) < self.num_candidate_prompts:
            print(f"Warning: Prompt generator returned {len(candidates)} candidates, expected {self.num_candidate_prompts}. Filling with duplicates or defaults.")
            while len(candidates) < self.num_candidate_prompts:
                candidates.append(current_best_prompt + f" (Variant {len(candidates)})")
        return candidates[:self.num_candidate_prompts]

    def optimize(self, initial_prompt: str, test_data: List[Dict[str, str]]) -> Tuple[str, float]:
        """
        执行APE优化循环。
        :param initial_prompt: 初始的种子提示词。
        :param test_data: 包含 'input' 和 'ground_truth' 的测试数据集。
        :return: 优化后的最佳提示词及其评估分数。
        """
        current_best_prompt = initial_prompt
        current_best_score = -float('inf')

        print(f"n--- Starting APE Optimization for task: {self.task_description} ---")
        print(f"Initial Prompt: '{initial_best_prompt}'")

        for iteration in range(self.num_iterations):
            print(f"n--- Iteration {iteration + 1}/{self.num_iterations} ---")

            # 1. 生成候选提示词
            candidate_prompts = self._generate_candidate_prompts(current_best_prompt)
            print(f"Generated {len(candidate_prompts)} candidate prompts.")

            iteration_scores = []
            for prompt in candidate_prompts:
                total_score_for_prompt = 0.0
                # 2. 对每个候选提示词,在测试数据上执行并评估
                for example in test_data:
                    llm_input = example['input']
                    ground_truth = example['ground_truth']

                    # 实际调用目标LLM
                    full_llm_prompt = f"{prompt}nnText: {llm_input}"
                    llm_output = self.target_llm.generate(full_llm_prompt, temperature=0.5)

                    # 评估LLM输出
                    score = self.evaluation_function(llm_output, ground_truth)
                    total_score_for_prompt += score

                avg_score = total_score_for_prompt / len(test_data)
                iteration_scores.append((prompt, avg_score))
                print(f"  - Prompt: '{prompt[:70]}...' -> Avg Score: {avg_score:.4f}")

            # 3. 选择本轮最佳提示词
            best_candidate_prompt_this_iter, best_candidate_score_this_iter = max(iteration_scores, key=lambda x: x[1])

            if best_candidate_score_this_iter > current_best_score:
                current_best_prompt = best_candidate_prompt_this_iter
                current_best_score = best_candidate_score_this_iter
                print(f"nNew best prompt found in iteration {iteration + 1}: '{current_best_prompt[:70]}...' with score {current_best_score:.4f}")
            else:
                print(f"nNo improvement in iteration {iteration + 1}. Current best remains: '{current_best_prompt[:70]}...' with score {current_best_score:.4f}")
                # 可以在这里增加提前停止的逻辑

        print(f"n--- APE Optimization Finished ---")
        print(f"Final Best Prompt: '{current_best_prompt}'")
        print(f"Final Best Score: {current_best_score:.4f}")
        return current_best_prompt, current_best_score

# --- 运行示例 ---
if __name__ == "__main__":
    # 初始化模拟LLM
    target_llm = MockLLM(model_name="target-llm-summarizer")
    prompt_generator_llm = MockLLM(model_name="prompt-generator-llm")

    # 定义任务描述
    task_description = "Summarize the provided text concisely and accurately."

    # 准备测试数据 (在实际中,这应该是一个包含大量高质量标注数据的测试集)
    test_data = [
        {'input': 'The quick brown fox jumps over the lazy dog. This is a classic sentence for testing typing speed.',
         'ground_truth': 'A quick brown fox sentence for typing tests.'},
        {'input': 'Artificial intelligence (AI) is intelligence demonstrated by machines, as opposed to the natural intelligence displayed by animals including humans.',
         'ground_truth': 'AI is machine intelligence, contrasting with natural intelligence.'},
        {'input': 'Prompt engineering is a concept in artificial intelligence, particularly in the context of natural language processing (NLP), that focuses on designing and refining the input "prompts" given to an AI model.',
         'ground_truth': 'Prompt engineering refines AI model inputs in NLP.'},
    ]

    # 初始的种子提示词
    initial_best_prompt = "Summarize the following text."

    # 创建并运行APE优化器
    ape_optimizer = APEOptimizer(
        target_llm=target_llm,
        prompt_generator_llm=prompt_generator_llm,
        task_description=task_description,
        evaluation_function=evaluate_summary,
        num_candidate_prompts=3,
        num_iterations=3
    )

    final_prompt, final_score = ape_optimizer.optimize(initial_best_prompt, test_data)
    print(f"nOptimization complete. The best prompt found is: '{final_prompt}' with an average score of {final_score:.4f}")

在这个示例中,我们展示了 APE 的核心循环:生成候选提示词、使用这些提示词执行任务、评估结果,然后根据评估结果选择最佳提示词进行下一轮迭代。这是我们深入到图结构中进行优化的基石。

第二部分:将 APE 扩展到图结构中的节点指令微调

现在,我们把视角从单一任务的提示词优化,扩展到一个更宏大、更复杂的场景:图结构中的节点指令持续微调

2.1 为什么是图结构?

在现代AI系统中,尤其是处理复杂工作流、多代理协作、知识图谱推理或数据处理管道时,图结构是一种非常自然且强大的表示方式。

  • 多步骤工作流: 例如,一个文档处理管道可能包括:文档解析 -> 实体抽取 -> 关系识别 -> 摘要生成 -> 情感分析。每个步骤都可以是一个图节点。
  • 多代理系统: 不同的AI代理(Agent)负责不同的子任务,并通过消息传递在图上协作。每个代理的行为由其“指令”定义。
  • 知识图谱推理: 节点可以是知识图谱中的实体或概念,边表示它们之间的关系。对节点进行推理或生成描述可能需要特定的指令。
  • 数据转换管道: 数据从一个格式转换到另一个,经过清洗、聚合、丰富等步骤。每个转换操作都可以看作一个节点。

在这些场景中,每个节点执行特定的任务,而这些任务的“指令”往往就是LLM的提示词。

2.2 图中“节点指令”的定义

在我们的语境中,“节点指令”特指驱动图中某个节点执行其特定任务的LLM提示词。

例如:

  • 文档摘要节点: 指令可能是 "请用50字以内总结以下文章的核心内容。"
  • 实体抽取节点: 指令可能是 "从以下文本中抽取出所有人名、地名和组织名。"
  • 情感分析节点: 指令可能是 "判断以下评论的情感是积极、消极还是中性。"
  • 数据清洗节点: 指令可能是 "标准化以下地址信息,确保格式为:[省][市][区][街道][门牌号]。"

这些指令的质量直接决定了单个节点的输出质量,进而影响整个图的端到端性能。

2.3 持续微调的必要性

“持续微调”是这里的核心概念。它意味着优化不是一次性的,而是随着时间的推移、数据的变化、业务需求迭代而不断进行的。

  • 数据漂移(Data Drift): 现实世界的数据是动态变化的。新出现的术语、表达方式、领域趋势都可能导致现有提示词效果下降。
  • 任务演变: 业务需求可能发生变化,对节点任务的精度、侧重点、输出格式提出新要求。
  • 模型更新: 底层LLM可能升级,新的模型版本可能对提示词的敏感性不同,需要重新优化。
  • 级联效应: 上游节点的优化可能改变其输出,从而对下游节点的最佳提示词产生影响。

因此,我们需要一个机制,能够让图中的节点指令“活”起来,能够根据实际运行情况,自动地进行学习和调整。APE正是实现这一目标的理想工具。

第三部分:图结构中 APE 节点指令微调的架构设计

为了在图结构中应用 APE 进行持续微调,我们需要一个更加复杂的架构,它能够管理图、执行图、并对图中的特定节点进行优化。

3.1 核心组件

  1. Node (节点):

    • 表示图中的一个操作单元。
    • 包含其当前执行任务的LLM prompt
    • 拥有一个 task_description
    • 可能有一个 evaluation_metricevaluation_function 来评估其自身的输出。
    • 存储其输入和输出数据。
    • node_idnode_type (e.g., "LLM_CALL", "DATA_TRANSFORM", "MERGE").
    • dependencies (上游节点的ID列表)。
  2. Graph (图):

    • 存储节点及其连接关系(有向无环图 DAG)。
    • 可以使用邻接列表或邻接矩阵表示。
    • 提供遍历和执行节点的方法。
  3. GraphExecutor (图执行器):

    • 负责按照拓扑排序执行图中的节点。
    • 管理节点之间的数据流。
    • 收集每个节点的输入、输出和执行状态。
    • 能够执行整个图,或从某个节点开始执行子图。
  4. APENodeOptimizer (节点优化器):

    • 封装了前面介绍的 APE 优化逻辑。
    • 针对图中的单个可优化节点进行提示词优化。
    • 需要能够从 GraphExecutor 获取该节点的输入和预期输出(或通过整个图的端到端评估间接获取)。
  5. APEOrchestrator (APE 编排器):

    • 这是核心的调度单元。
    • 它监控图的运行状况,触发优化过程。
    • 决定何时哪个节点需要优化。
    • 调用 APENodeOptimizer 对选定节点进行优化。
    • 更新图中的节点提示词。
    • 可能需要管理优化数据集(用于节点评估)和生产数据集(用于实际运行)。

3.2 数据流与评估策略

  • 节点级评估: 对于每个可优化的LLM节点,我们需要一个针对其特定任务的评估函数和相应的“黄金标准”(ground truth)数据。例如,摘要节点的ROUGE分数,分类节点的F1分数。
  • 图级评估(端到端评估): 整个图的最终输出可能有一个更高层次的业务指标。例如,整个文档处理管道的最终报告质量。
  • 优化数据流:APENodeOptimizer 优化某个节点时,它需要模拟该节点在图中的实际运行环境。这意味着它需要从上游节点接收“模拟输入”,这些模拟输入应该尽可能真实地反映生产数据经过上游节点处理后的状态。这通常通过在优化过程中运行一遍(或部分)图来生成。

3.3 持续微调的触发机制

  • 定时触发: 每隔一段时间(例如每天、每周)对所有可优化节点进行一次检查或重新优化。
  • 性能下降触发: 监控图的端到端性能或关键节点的性能。当性能跌破某个阈值时,触发相关节点的优化。
  • 数据漂移检测: 当检测到输入数据分布发生显著变化时,触发可能受影响的节点的优化。
  • 人工触发: 工程师手动启动对特定节点的优化。

第四部分:实践中的代码实现与复杂性考量

我们将通过一个更详细的 Python 示例来展示如何在图结构中实现 APE 驱动的节点指令微调。

4.1 定义图节点与图

首先,我们定义 Node 类和 Graph 类。

from abc import ABC, abstractmethod
import networkx as nx # 使用networkx库来管理图结构

# 抽象基类,定义可执行的图节点接口
class GraphNode(ABC):
    def __init__(self, node_id: str, task_description: str, initial_prompt: str = ""):
        self.node_id = node_id
        self.task_description = task_description
        self._current_prompt = initial_prompt
        self.output_data = None # 存储节点执行后的输出
        self.input_data = None  # 存储节点接收到的输入

    @property
    def current_prompt(self) -> str:
        return self._current_prompt

    @current_prompt.setter
    def current_prompt(self, new_prompt: str):
        print(f"Node '{self.node_id}' prompt updated from '{self._current_prompt[:30]}...' to '{new_prompt[:30]}...'")
        self._current_prompt = new_prompt

    @abstractmethod
    def execute(self, input_data: Any, llm_instance: MockLLM) -> Any:
        """
        执行节点任务,返回输出。
        input_data: 来自上游节点的输入。
        llm_instance: 用于执行任务的LLM实例。
        """
        pass

    @abstractmethod
    def evaluate(self, actual_output: Any, ground_truth: Any) -> float:
        """
        评估节点输出的质量。
        """
        pass

    def __repr__(self):
        return f"GraphNode(id='{self.node_id}', task='{self.task_description[:30]}...', prompt='{self.current_prompt[:30]}...')"

# 具体的LLM调用节点
class LLMCallNode(GraphNode):
    def __init__(self, node_id: str, task_description: str, initial_prompt: str, evaluation_function: Callable[[str, str], float]):
        super().__init__(node_id, task_description, initial_prompt)
        self.evaluation_function = evaluation_function

    def execute(self, input_data: Any, llm_instance: MockLLM) -> Any:
        """
        执行LLM调用任务。
        """
        self.input_data = input_data
        full_llm_prompt = f"{self.current_prompt}nnInput: {input_data}"
        self.output_data = llm_instance.generate(full_llm_prompt, temperature=0.5)
        return self.output_data

    def evaluate(self, actual_output: Any, ground_truth: Any) -> float:
        """
        评估LLM输出。
        """
        return self.evaluation_function(actual_output, ground_truth)

# 示例:一个简单的数据处理节点,不涉及LLM,但可以作为图的一部分
class DataTransformNode(GraphNode):
    def __init__(self, node_id: str, task_description: str, transform_func: Callable[[Any], Any]):
        super().__init__(node_id, task_description, "") # 不需要LLM prompt
        self.transform_func = transform_func

    def execute(self, input_data: Any, llm_instance: MockLLM = None) -> Any:
        self.input_data = input_data
        # 假设transform_func是一个简单的字符串大写转换
        self.output_data = self.transform_func(input_data)
        print(f"Node '{self.node_id}' executed. Input: {input_data[:20]}..., Output: {self.output_data[:20]}...")
        return self.output_data

    def evaluate(self, actual_output: Any, ground_truth: Any) -> float:
        # 对于非LLM节点,评估可能更直接,例如精确匹配
        return 1.0 if actual_output == ground_truth else 0.0

class WorkflowGraph:
    def __init__(self):
        self.graph = nx.DiGraph() # 使用networkx构建有向无环图
        self.nodes_map: Dict[str, GraphNode] = {}

    def add_node(self, node: GraphNode):
        if node.node_id in self.nodes_map:
            raise ValueError(f"Node with ID '{node.node_id}' already exists.")
        self.nodes_map[node.node_id] = node
        self.graph.add_node(node.node_id)

    def add_edge(self, upstream_node_id: str, downstream_node_id: str):
        if upstream_node_id not in self.nodes_map or downstream_node_id not in self.nodes_map:
            raise ValueError("Both upstream and downstream nodes must exist in the graph.")
        self.graph.add_edge(upstream_node_id, downstream_node_id)

    def get_node(self, node_id: str) -> GraphNode:
        return self.nodes_map.get(node_id)

    def get_upstream_nodes(self, node_id: str) -> List[GraphNode]:
        return [self.nodes_map[pred] for pred in self.graph.predecessors(node_id)]

    def topological_sort(self) -> List[GraphNode]:
        """按拓扑顺序返回所有节点"""
        return [self.nodes_map[node_id] for node_id in nx.topological_sort(self.graph)]

    def __repr__(self):
        return f"WorkflowGraph with {len(self.nodes_map)} nodes and {self.graph.number_of_edges()} edges."

4.2 图执行器 GraphExecutor

GraphExecutor 负责按正确顺序运行图中的节点,并管理数据流。

class GraphExecutor:
    def __init__(self, graph: WorkflowGraph, llm_instance: MockLLM):
        self.graph = graph
        self.llm_instance = llm_instance
        self.node_outputs: Dict[str, Any] = {} # 存储每个节点的输出

    def execute_graph(self, initial_input: Any, start_node_id: str = None) -> Dict[str, Any]:
        """
        执行整个图或从指定节点开始执行子图。
        initial_input: 整个图的初始输入。
        start_node_id: 如果指定,则从该节点开始执行子图,并将其作为第一个节点的输入。
        """
        sorted_nodes = self.graph.topological_sort()

        # 确定实际的起始节点和输入
        if start_node_id:
            start_index = next((i for i, node in enumerate(sorted_nodes) if node.node_id == start_node_id), -1)
            if start_index == -1:
                raise ValueError(f"Start node '{start_node_id}' not found in graph.")
            nodes_to_execute = sorted_nodes[start_index:]
            # 假设start_node_id的输入就是initial_input,或者需要特殊处理
            self.node_outputs[start_node_id] = initial_input # 临时设置为输入
        else:
            nodes_to_execute = sorted_nodes
            # 找到没有上游节点的起始节点,将initial_input传递给它们
            for node in nodes_to_execute:
                if not list(self.graph.graph.predecessors(node.node_id)):
                    self.node_outputs[node.node_id] = initial_input # 默认将初始输入给所有无上游节点

        for node in nodes_to_execute:
            inputs_for_node = []
            predecessor_ids = list(self.graph.graph.predecessors(node.node_id))

            if not predecessor_ids and not start_node_id: # 针对没有上游的图的起始节点
                 inputs_for_node.append(initial_input)
            elif node.node_id == start_node_id: # 如果是子图的起始节点
                 inputs_for_node.append(initial_input)
            else:
                for pred_id in predecessor_ids:
                    if pred_id not in self.node_outputs:
                        # 这通常不应该发生在一个正确的拓扑排序中,除非是子图执行且上游不在子图内
                        raise RuntimeError(f"Missing output from upstream node '{pred_id}' for node '{node.node_id}'.")
                    inputs_for_node.append(self.node_outputs[pred_id])

            # 聚合输入。这里简化为直接传递第一个输入,实际可能需要合并
            if inputs_for_node:
                aggregated_input = inputs_for_node[0] # 简化处理,实际可能需要更复杂的合并逻辑
            else:
                aggregated_input = "" # 如果没有上游,且不是初始节点,则输入为空

            print(f"Executing node: {node.node_id} with input: '{str(aggregated_input)[:50]}...'")
            node_output = node.execute(aggregated_input, self.llm_instance)
            self.node_outputs[node.node_id] = node_output
            print(f"Node {node.node_id} output: '{str(node_output)[:50]}...'")

        return self.node_outputs

    def get_node_output(self, node_id: str) -> Any:
        return self.node_outputs.get(node_id)

4.3 APE 节点优化器 APENodeOptimizer

这个类将 APE 的核心逻辑与 GraphExecutor 结合起来,以便优化图中的特定节点。

class APENodeOptimizer:
    def __init__(self,
                 target_graph: WorkflowGraph,
                 target_node_id: str,
                 llm_instance: MockLLM, # 用于执行目标节点的LLM
                 prompt_generator_llm: MockLLM, # 用于生成提示词的LLM
                 optimization_data: List[Dict[str, Any]], # 用于优化的数据,包含 'graph_initial_input' 和 'node_ground_truth'
                 num_candidate_prompts: int = 5,
                 num_iterations: int = 5):
        self.target_graph = target_graph
        self.target_node_id = target_node_id
        self.target_node = self.target_graph.get_node(target_node_id)
        if not isinstance(self.target_node, LLMCallNode):
            raise TypeError(f"Node '{target_node_id}' is not an LLMCallNode and cannot be optimized by APE.")

        self.llm_instance = llm_instance
        self.prompt_generator_llm = prompt_generator_llm
        self.optimization_data = optimization_data
        self.num_candidate_prompts = num_candidate_prompts
        self.num_iterations = num_iterations
        self.graph_executor = GraphExecutor(target_graph, llm_instance)

    def _generate_candidate_prompts(self, current_best_prompt: str) -> List[str]:
        """
        使用prompt_generator_llm生成新的候选提示词。
        """
        generation_prompt = (
            f"Given the task: '{self.target_node.task_description}'. "
            f"The current best performing prompt for this task is: '{current_best_prompt}'. "
            f"Suggest {self.num_candidate_prompts} alternative prompts that could lead to better performance. "
            "Each prompt should be concise and on a new line."
        )
        raw_response = self.prompt_generator_llm.generate(generation_prompt, temperature=0.9, max_tokens=300)
        candidates = [p.strip() for p in raw_response.split('n') if p.strip()]
        if len(candidates) < self.num_candidate_prompts:
            print(f"Warning: Prompt generator returned {len(candidates)} candidates, expected {self.num_candidate_prompts}. Filling with current best.")
            while len(candidates) < self.num_candidate_prompts:
                candidates.append(current_best_prompt + f" (Var {len(candidates)})")
        return candidates[:self.num_candidate_prompts]

    def optimize_node_prompt(self) -> Tuple[str, float]:
        """
        对指定节点进行APE优化。
        """
        initial_prompt = self.target_node.current_prompt
        current_best_prompt = initial_prompt
        current_best_score = -float('inf')

        print(f"n--- Starting APE Node Optimization for Node: '{self.target_node_id}' (Task: '{self.target_node.task_description}') ---")
        print(f"Initial Prompt: '{initial_prompt}'")

        for iteration in range(self.num_iterations):
            print(f"n--- Iteration {iteration + 1}/{self.num_iterations} for Node '{self.target_node_id}' ---")

            candidate_prompts = self._generate_candidate_prompts(current_best_prompt)
            iteration_scores = []

            for prompt_candidate in candidate_prompts:
                total_score_for_candidate = 0.0

                # 临时更新节点的提示词进行测试
                original_prompt = self.target_node.current_prompt
                self.target_node.current_prompt = prompt_candidate

                for example_data in self.optimization_data:
                    graph_initial_input = example_data['graph_initial_input']
                    node_ground_truth = example_data['node_ground_truth']

                    # 运行图直到目标节点,获取其输入
                    # 关键步骤:执行子图以获取目标节点的真实输入
                    self.graph_executor.execute_graph(graph_initial_input) # 运行整个图,或到目标节点
                    target_node_input = self.graph_executor.get_node_output(self.target_node_id)

                    # 如果目标节点有上游,且其输入不是graph_initial_input,则需要从上游获取
                    # 这里简化为直接使用GraphExecutor的输出,实际需要精确到目标节点的前一个输出

                    if target_node_input is None:
                        # 如果是第一个节点,它的输入就是graph_initial_input
                        if not list(self.target_graph.graph.predecessors(self.target_node_id)):
                            target_node_input = graph_initial_input
                        else:
                            print(f"Warning: Could not get input for node {self.target_node_id}. Skipping example.")
                            continue

                    # 执行目标节点并评估
                    actual_output = self.target_node.execute(target_node_input, self.llm_instance)
                    score = self.target_node.evaluate(actual_output, node_ground_truth)
                    total_score_for_candidate += score

                # 恢复节点的原始提示词
                self.target_node.current_prompt = original_prompt

                avg_score = total_score_for_candidate / len(self.optimization_data)
                iteration_scores.append((prompt_candidate, avg_score))
                print(f"  - Candidate Prompt: '{prompt_candidate[:70]}...' -> Avg Score: {avg_score:.4f}")

            best_candidate_prompt_this_iter, best_candidate_score_this_iter = max(iteration_scores, key=lambda x: x[1])

            if best_candidate_score_this_iter > current_best_score:
                current_best_prompt = best_candidate_prompt_this_iter
                current_best_score = best_candidate_score_this_iter
                print(f"nNew best prompt for node '{self.target_node_id}' found in iteration {iteration + 1}: '{current_best_prompt[:70]}...' with score {current_best_score:.4f}")
            else:
                print(f"nNo improvement for node '{self.target_node_id}' in iteration {iteration + 1}. Current best remains: '{current_best_prompt[:70]}...' with score {current_best_score:.4f}")
                # 可以在这里增加提前停止的逻辑

        # 最终将最佳提示词更新回目标节点
        self.target_node.current_prompt = current_best_prompt
        print(f"n--- APE Node Optimization for Node '{self.target_node_id}' Finished ---")
        print(f"Final Best Prompt: '{current_best_prompt}'")
        print(f"Final Best Score: {current_best_score:.4f}")
        return current_best_prompt, current_best_score

4.4 APE 编排器 APEOrchestrator (概念与简化实现)

APEOrchestrator 是整个系统的智能大脑,负责决定何时、何地进行优化。

class APEOrchestrator:
    def __init__(self,
                 workflow_graph: WorkflowGraph,
                 llm_instance: MockLLM,
                 prompt_generator_llm: MockLLM,
                 global_optimization_data: List[Dict[str, Any]]):
        self.workflow_graph = workflow_graph
        self.llm_instance = llm_instance
        self.prompt_generator_llm = prompt_generator_llm
        self.global_optimization_data = global_optimization_data
        self.graph_executor = GraphExecutor(workflow_graph, llm_instance)

    def _get_node_optimization_data(self, node_id: str) -> List[Dict[str, Any]]:
        """
        为特定节点生成优化数据。
        这可能需要从global_optimization_data中提取与该节点相关的ground truth。
        """
        node_data = []
        for example in self.global_optimization_data:
            # 假设global_optimization_data的结构是:
            # {'graph_initial_input': '...', 'node_id_1_ground_truth': '...', 'node_id_2_ground_truth': '...'}
            node_gt_key = f"{node_id}_ground_truth"
            if node_gt_key in example:
                node_data.append({
                    'graph_initial_input': example['graph_initial_input'],
                    'node_ground_truth': example[node_gt_key]
                })
            else:
                print(f"Warning: No ground truth found for node '{node_id}' in some optimization examples.")
        return node_data

    def trigger_continuous_optimization(self, nodes_to_optimize: List[str] = None):
        """
        触发图中的节点进行持续优化。
        nodes_to_optimize: 如果指定,则只优化这些节点;否则优化所有LLMCallNode。
        """
        print("n--- APE Orchestrator: Starting Continuous Optimization Cycle ---")

        if nodes_to_optimize is None:
            nodes_to_optimize = [node.node_id for node in self.workflow_graph.nodes_map.values() if isinstance(node, LLMCallNode)]

        for node_id in nodes_to_optimize:
            node = self.workflow_graph.get_node(node_id)
            if not isinstance(node, LLMCallNode):
                print(f"Skipping optimization for non-LLM node: {node_id}")
                continue

            print(f"nOrchestrator optimizing node: {node_id}")
            node_optimization_data = self._get_node_optimization_data(node_id)
            if not node_optimization_data:
                print(f"No optimization data available for node '{node_id}'. Skipping.")
                continue

            node_optimizer = APENodeOptimizer(
                target_graph=self.workflow_graph,
                target_node_id=node_id,
                llm_instance=self.llm_instance,
                prompt_generator_llm=self.prompt_generator_llm,
                optimization_data=node_optimization_data,
                num_candidate_prompts=3,
                num_iterations=2
            )

            optimized_prompt, final_score = node_optimizer.optimize_node_prompt()
            print(f"Orchestrator finished optimizing node '{node_id}'. Best prompt: '{optimized_prompt[:50]}...', Score: {final_score:.4f}")
            # 这里的node_optimizer已经自动更新了self.workflow_graph中的节点prompt

        print("n--- APE Orchestrator: Optimization Cycle Finished ---")

        # 可以在这里添加图的全局评估逻辑
        overall_performance = self._evaluate_overall_graph_performance()
        print(f"Overall graph performance after optimization: {overall_performance:.4f}")

        # 持久化优化后的提示词
        self._persist_prompts()

    def _evaluate_overall_graph_performance(self) -> float:
        """
        评估整个图的端到端性能。
        这可能需要一个全局的评估函数和相应的ground truth。
        """
        total_graph_score = 0.0
        for example in self.global_optimization_data:
            graph_initial_input = example['graph_initial_input']
            # 假设有一个全局的ground truth,例如最终节点的期望输出
            final_node_id = self.workflow_graph.topological_sort()[-1].node_id
            final_node_ground_truth_key = f"{final_node_id}_ground_truth"

            if final_node_ground_truth_key not in example:
                print(f"Warning: No global ground truth for final node '{final_node_id}' in some examples. Skipping.")
                continue

            self.graph_executor.execute_graph(graph_initial_input)
            final_output = self.graph_executor.get_node_output(final_node_id)

            final_node = self.workflow_graph.get_node(final_node_id)
            if isinstance(final_node, LLMCallNode):
                score = final_node.evaluate(final_output, example[final_node_ground_truth_key])
            else: # 对于非LLM最终节点,使用简单评估
                score = 1.0 if final_output == example[final_node_ground_truth_key] else 0.0

            total_graph_score += score
        return total_graph_score / len(self.global_optimization_data) if self.global_optimization_data else 0.0

    def _persist_prompts(self):
        """
        将优化后的提示词持久化到存储(例如数据库、文件)。
        """
        print("Persisting optimized prompts...")
        for node_id, node in self.workflow_graph.nodes_map.items():
            if isinstance(node, LLMCallNode):
                print(f"  - Node '{node_id}': '{node.current_prompt}'")
                # 实际中会写入数据库或配置文件
        print("Prompts persisted.")

4.5 运行一个图优化示例

现在,我们将这些组件组合起来,运行一个简单的三节点工作流优化示例。

if __name__ == "__main__":
    print("n--- Setting up the Graph Workflow and APE Orchestrator ---")

    # 初始化LLM实例
    main_llm = MockLLM(model_name="main-workflow-llm")
    prompt_gen_llm = MockLLM(model_name="prompt-generator-llm")

    # 定义评估函数(同上)
    def evaluate_output_simple(llm_output: str, ground_truth: str) -> float:
        score = 0.0
        if ground_truth.lower() in llm_output.lower():
            score += 0.7
        if len(llm_output) > 20 and len(llm_output) < 80:
            score += 0.3
        score += random.uniform(-0.1, 0.1) # 引入随机性
        return max(0.0, min(1.0, score))

    def evaluate_sentiment(llm_output: str, ground_truth: str) -> float:
        if ground_truth.lower() in llm_output.lower():
            return 1.0
        return 0.0

    # 创建图节点
    node_extract = LLMCallNode(
        node_id="extract_keywords",
        task_description="Extract key keywords from the text.",
        initial_prompt="List the 3 most important keywords from the following text, separated by commas.",
        evaluation_function=evaluate_output_simple
    )

    node_transform = DataTransformNode(
        node_id="to_uppercase",
        task_description="Convert text to uppercase.",
        transform_func=lambda x: str(x).upper()
    )

    node_summarize = LLMCallNode(
        node_id="summarize_sentiment",
        task_description="Summarize the sentiment of the text.",
        initial_prompt="Analyze the overall sentiment of the text (positive, negative, neutral) and provide a one-sentence summary.",
        evaluation_function=evaluate_sentiment
    )

    # 构建图
    workflow_graph = WorkflowGraph()
    workflow_graph.add_node(node_extract)
    workflow_graph.add_node(node_transform)
    workflow_graph.add_node(node_summarize)

    workflow_graph.add_edge("extract_keywords", "to_uppercase")
    workflow_graph.add_edge("to_uppercase", "summarize_sentiment")

    print(workflow_graph)

    # 准备全局优化数据
    # 每个示例包含:图的初始输入,以及每个可优化节点的ground truth
    global_optimization_data = [
        {
            'graph_initial_input': "The new product launch was a huge success, exceeding all expectations. Customers love its innovative features.",
            'extract_keywords_ground_truth': "product launch, success, innovative features",
            'summarize_sentiment_ground_truth': "positive"
        },
        {
            'graph_initial_input': "Customer service was slow and unhelpful. Many users complained about bugs in the latest update.",
            'extract_keywords_ground_truth': "customer service, bugs, complained",
            'summarize_sentiment_ground_truth': "negative"
        },
        {
            'graph_initial_input': "The report details quarterly earnings. Sales remained stable, with no significant changes from previous periods.",
            'extract_keywords_ground_truth': "quarterly earnings, sales, stable",
            'summarize_sentiment_ground_truth': "neutral"
        }
    ]

    # 运行一次图,看看初始性能
    print("n--- Initial Graph Execution Performance ---")
    initial_executor = GraphExecutor(workflow_graph, main_llm)
    initial_executor.execute_graph(global_optimization_data[0]['graph_initial_input'])

    # 初始化并运行APE编排器
    orchestrator = APEOrchestrator(
        workflow_graph=workflow_graph,
        llm_instance=main_llm,
        prompt_generator_llm=prompt_gen_llm,
        global_optimization_data=global_optimization_data
    )

    orchestrator.trigger_continuous_optimization()

    # 优化后再次运行图,观察变化
    print("n--- Graph Execution Performance AFTER Optimization ---")
    final_executor = GraphExecutor(workflow_graph, main_llm)
    final_executor.execute_graph(global_optimization_data[0]['graph_initial_input'])

    print("nFinal state of prompts:")
    for node_id, node in workflow_graph.nodes_map.items():
        if isinstance(node, LLMCallNode):
            print(f"Node '{node_id}' final prompt: '{node.current_prompt}'")

4.6 复杂性考量与高级主题

  1. 计算成本: APE 涉及大量的 LLM 调用,尤其是在图结构中,每个节点的优化都需要多次图执行(或子图执行),这会显著增加 API 调用成本和时间。
    • 策略: 缓存 LLM 响应、限制迭代次数、并行化优化过程、使用更小的测试集进行快速迭代、使用更便宜的 LLM 作为提示词生成器。
  2. 评估数据质量: 优化效果严重依赖于评估函数的准确性和“黄金标准”数据的质量。在多步骤任务中,为中间节点提供高质量的 ground truth 尤其困难。
    • 策略: 采用基于人类偏好的评估、弱监督学习、启发式评估、或仅依赖端到端指标进行优化(但会增加优化难度)。
  3. 局部最优与全局最优: 单个节点的最优提示词可能不是整个图全局最优的一部分。
    • 策略: 引入图级评估指标,在节点优化后重新评估整个图。考虑“多目标优化”,平衡节点性能和全局性能。
    • 级联优化: 当上游节点优化后,其输出可能发生变化,下游节点可能需要重新优化。可以设计一个链式或依赖图感知的优化调度。
  4. Prompt 版本的管理: 优化后的提示词需要版本控制和回滚机制。
  5. LLM 作为评估器: 在某些情况下,LLM 也可以用于评估其他 LLM 的输出,尤其是在缺乏明确指标的创造性任务中。但这需要精心设计的评估提示词。
  6. 强化学习集成: 可以将整个图的执行视为一个马尔可夫决策过程,提示词的生成和选择可以由强化学习代理来完成,以最大化长期奖励(即图的端到端性能)。
  7. 自适应优化: 不仅仅是持续优化,还可以是“自适应”优化,即系统能智能判断何时启动优化,优化哪个节点,以及使用何种优化策略,从而减少不必要的资源消耗。

结语

Automated Prompt Optimization,特别是 APE,为我们提供了一个强大的工具,将提示词工程从一门艺术提升为一门科学。当我们将 APE 的能力与图结构的工作流相结合,实现对节点指令的持续微调时,我们便能够构建出更具韧性、更自适应、更高性能的智能系统。这不仅仅是效率的提升,更是AI系统迈向真正自主和智能化的关键一步。通过这种方式,我们的AI系统将不再是固定指令的执行者,而能像活的有机体一样,不断学习、进化,以适应不断变化的世界。

这个领域充满挑战,但也蕴藏着巨大的潜力。我期待看到更多创新性的应用和技术突破,共同推动自动化提示词优化在复杂AI系统中的广泛实践。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注