什么是 ‘Sub-graphs’?在大规模项目中如何通过嵌套图实现 Agent 逻辑的模块化拆解?

各位同学,大家好!

今天我们的话题是关于构建复杂系统,特别是大规模AI Agent时的一个核心技术:如何通过 嵌套图(Nested Graphs)子图(Sub-graphs) 的概念,实现Agent逻辑的模块化拆解。在当今AI领域,我们构建的Agent不再是简单的单点决策器,它们往往需要融合多模态感知、复杂推理、工具调用、记忆管理以及人机协作等多种能力。这就导致Agent的内部逻辑变得异常复杂,单一的、扁平化的逻辑流难以维护和扩展。

作为一名编程专家,我将带领大家深入探讨子图的本质,剖析它如何成为解决这一复杂性的利器,并通过大量的代码示例,展示如何在实际项目中构建和运用这种强大的架构模式。


第一章:复杂性挑战与图模型基础

1.1 大规模Agent的复杂度困境

想象一下,一个能够与用户自由对话、理解其意图、查询外部数据库、调用API执行操作,并最终生成自然语言回复的Agent。这个Agent的内部流程可能包括:

  1. 意图识别 (Intent Recognition):用户想做什么?
  2. 槽位填充 (Slot Filling):收集完成意图所需的关键信息。
  3. 知识检索 (Knowledge Retrieval):从内部或外部知识库获取相关信息。
  4. 工具选择 (Tool Selection):决定调用哪个外部工具(如天气API、预订系统)。
  5. 参数构建 (Parameter Generation):为选定的工具生成正确的调用参数。
  6. 工具执行 (Tool Execution):实际调用工具。
  7. 结果解析 (Result Parsing):理解工具返回的结果。
  8. 状态更新 (State Update):根据新的信息更新Agent的内部状态。
  9. 回复生成 (Response Generation):基于所有信息生成用户友好的回复。
  10. 错误处理 (Error Handling):在任何阶段出现问题时进行优雅处理。

如果将所有这些步骤扁平地堆砌在一个巨大的代码块中,或者仅仅通过简单的条件分支来实现,那么代码将变得难以理解、难以测试、难以修改,更不用说在团队协作中的效率问题。这正是我们面临的“复杂度困境”。

1.2 图模型:天生适合流程编排

图(Graph)是一种由节点(Nodes)和边(Edges)组成的数据结构,它天然适合表示各种流程、依赖关系和状态转换。

  • 节点 (Node):代表一个独立的计算单元、一个决策点、一个数据处理步骤或一个状态。
  • 边 (Edge):代表数据流、控制流或依赖关系,连接着不同的节点。通常,我们使用有向图(Directed Graph)来表示流程方向。

为什么图模型适合Agent逻辑?

  • 直观性:流程图是人类理解复杂系统最直观的方式之一。
  • 模块化:每个节点可以是一个独立的、可测试的单元。
  • 可扩展性:通过添加或修改节点和边,可以灵活地改变Agent行为。
  • 可视化:图结构易于渲染,帮助开发者快速理解Agent的整体架构和运行时状态。

让我们从最基本的图结构开始。

import uuid
from collections import deque, defaultdict
from typing import Dict, Any, List, Optional, Callable, Tuple, Type, Union

# --------------------------------------------------------------------------------
# 基础图模型组件
# --------------------------------------------------------------------------------

class Node:
    """
    图中的基本节点。代表一个计算单元或操作。
    每个节点有一个唯一的ID,可以有名称和描述。
    """
    def __init__(self, node_id: Optional[str] = None, name: Optional[str] = None, description: Optional[str] = None):
        self.node_id = node_id if node_id else str(uuid.uuid4())
        self.name = name if name else f"Node_{self.node_id[:8]}"
        self.description = description if description else ""
        self._inputs: Dict[str, Any] = {}  # 节点的输入数据
        self._outputs: Dict[str, Any] = {} # 节点的输出数据

    def set_input(self, key: str, value: Any):
        """设置节点的输入数据"""
        self._inputs[key] = value

    def get_input(self, key: str, default: Any = None) -> Any:
        """获取节点的输入数据"""
        return self._inputs.get(key, default)

    def set_output(self, key: str, value: Any):
        """设置节点的输出数据"""
        self._outputs[key] = value

    def get_output(self, key: str, default: Any = None) -> Any:
        """获取节点的输出数据"""
        return self._outputs.get(key, default)

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        执行节点逻辑。这是抽象方法,子类必须实现。
        接收一个上下文字典作为输入,返回一个结果字典作为输出。
        """
        raise NotImplementedError("Subclasses must implement the 'execute' method.")

    def __repr__(self):
        return f"Node(id='{self.node_id}', name='{self.name}')"

class Edge:
    """
    图中的边。连接两个节点,表示数据或控制流。
    可以指定源节点的哪个输出字段连接到目标节点的哪个输入字段。
    """
    def __init__(self, source_node_id: str, target_node_id: str,
                 source_output_key: Optional[str] = None, target_input_key: Optional[str] = None):
        self.source_node_id = source_node_id
        self.target_node_id = target_node_id
        self.source_output_key = source_output_key
        self.target_input_key = target_input_key

    def __repr__(self):
        return (f"Edge(from='{self.source_node_id}'"
                f"{f':{self.source_output_key}' if self.source_output_key else ''} "
                f"to='{self.target_node_id}'"
                f"{f':{self.target_input_key}' if self.target_input_key else ''})")

class Graph:
    """
    图的抽象表示,包含节点和边。
    负责管理图的结构和执行流。
    """
    def __init__(self, graph_id: Optional[str] = None, name: Optional[str] = None):
        self.graph_id = graph_id if graph_id else str(uuid.uuid4())
        self.name = name if name else f"Graph_{self.graph_id[:8]}"
        self.nodes: Dict[str, Node] = {}
        self.edges: List[Edge] = []
        self._adj_list: Dict[str, List[Edge]] = defaultdict(list) # 邻接表,用于快速查找后续节点
        self._rev_adj_list: Dict[str, List[Edge]] = defaultdict(list) # 反向邻接表,用于查找前驱节点

    def add_node(self, node: Node):
        """添加一个节点到图中"""
        if node.node_id in self.nodes:
            raise ValueError(f"Node with ID {node.node_id} already exists in graph {self.name}.")
        self.nodes[node.node_id] = node

    def add_edge(self, edge: Edge):
        """添加一条边到图中,并更新邻接表"""
        if edge.source_node_id not in self.nodes or edge.target_node_id not in self.nodes:
            raise ValueError("Source or target node for edge does not exist in the graph.")
        self.edges.append(edge)
        self._adj_list[edge.source_node_id].append(edge)
        self._rev_adj_list[edge.target_node_id].append(edge)

    def get_node(self, node_id: str) -> Optional[Node]:
        """根据ID获取节点"""
        return self.nodes.get(node_id)

    def get_successors(self, node_id: str) -> List[Edge]:
        """获取给定节点的所有出边"""
        return self._adj_list.get(node_id, [])

    def get_predecessors(self, node_id: str) -> List[Edge]:
        """获取给定节点的所有入边"""
        return self._rev_adj_list.get(node_id, [])

    def topological_sort(self) -> List[str]:
        """
        对图进行拓扑排序,返回节点ID的有序列表。
        用于确定节点的执行顺序。
        """
        in_degree = {node_id: 0 for node_id in self.nodes}
        for edge in self.edges:
            in_degree[edge.target_node_id] += 1

        queue = deque([node_id for node_id, degree in in_degree.items() if degree == 0])
        sorted_nodes = []

        while queue:
            node_id = queue.popleft()
            sorted_nodes.append(node_id)

            for edge in self._adj_list[node_id]:
                in_degree[edge.target_node_id] -= 1
                if in_degree[edge.target_node_id] == 0:
                    queue.append(edge.target_node_id)

        if len(sorted_nodes) != len(self.nodes):
            raise ValueError("Graph has a cycle, cannot perform topological sort.")
        return sorted_nodes

    def execute(self, initial_context: Dict[str, Any]) -> Dict[str, Any]:
        """
        执行图中的所有节点,按照拓扑排序顺序。
        管理数据在节点间的传递。
        """
        print(f"n--- Executing Graph: {self.name} ---")
        context = initial_context.copy()
        execution_order = self.topological_sort()
        results: Dict[str, Dict[str, Any]] = {} # 存储每个节点执行的输出

        # 将初始上下文作为输入传递给所有入度为0的节点
        for node_id in execution_order:
            node = self.nodes[node_id]
            if not self._rev_adj_list[node_id]: # 如果是入度为0的节点
                for key, value in context.items():
                    node.set_input(key, value)

            # 从前驱节点收集输入
            for incoming_edge in self.get_predecessors(node_id):
                source_node_output = results.get(incoming_edge.source_node_id, {})

                # 如果指定了具体的输出/输入键
                if incoming_edge.source_output_key and incoming_edge.target_input_key:
                    if incoming_edge.source_output_key in source_node_output:
                        node.set_input(incoming_edge.target_input_key, source_node_output[incoming_edge.source_output_key])
                # 否则,将前驱节点的所有输出作为整体输入
                elif not incoming_edge.source_output_key and not incoming_edge.target_input_key:
                    for k, v in source_node_output.items():
                        node.set_input(k, v)
                else:
                    # 混合模式,例如只指定source_output_key,但没指定target_input_key,这通常是不推荐的,或需要额外的默认逻辑
                    if incoming_edge.source_output_key and incoming_edge.source_output_key in source_node_output:
                        node.set_input(incoming_edge.source_output_key, source_node_output[incoming_edge.source_output_key])

            print(f"  Executing Node: {node.name} (ID: {node.node_id[:8]}) with inputs: {node._inputs}")
            try:
                node_output = node.execute(node._inputs)
                for k, v in node_output.items():
                    node.set_output(k, v) # 将输出存储在节点自身
                results[node_id] = node_output # 存储到结果集中
                print(f"    Node '{node.name}' output: {node_output}")
            except Exception as e:
                print(f"    Error executing node '{node.name}': {e}")
                raise # 抛出异常,或进行更复杂的错误处理

            # 将当前节点的输出更新到全局上下文,以便后续节点使用
            context.update(node_output)

        # 收集最终输出(通常是最后一个或特定输出节点的输出)
        final_output = {}
        for node_id in reversed(execution_order): # 从后往前找,或者定义一个明确的输出节点
            if self.nodes[node_id].get_output("final_result"):
                final_output["final_result"] = self.nodes[node_id].get_output("final_result")
                break
            # 如果没有明确的final_result,可以返回所有节点的最终状态
            final_output.update(results.get(node_id, {}))

        print(f"--- Graph '{self.name}' finished. Final Output: {final_output} ---")
        return final_output

1.3 示例:一个简单的计算图

为了演示,我们先定义一些具体的节点类型:

class AddNode(Node):
    def __init__(self, node_id: Optional[str] = None, name: str = "Add"):
        super().__init__(node_id, name)

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        a = context.get("a", 0)
        b = context.get("b", 0)
        result = a + b
        return {"sum": result}

class MultiplyNode(Node):
    def __init__(self, node_id: Optional[str] = None, name: str = "Multiply"):
        super().__init__(node_id, name)

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        x = context.get("x", 1)
        y = context.get("y", 1)
        result = x * y
        return {"product": result}

class OutputNode(Node):
    def __init__(self, node_id: Optional[str] = None, name: str = "Output"):
        super().__init__(node_id, name)

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        final_result = context.get("value")
        print(f"  OutputNode received final value: {final_result}")
        return {"final_result": final_result}

# 构建一个简单的计算图: (A + B) * C
# 节点
node_a = AddNode(name="Add A+B")
node_b = MultiplyNode(name="Multiply by C")
node_c = OutputNode(name="Final Output")

# 图
calc_graph = Graph(name="Simple Calculation")
calc_graph.add_node(node_a)
calc_graph.add_node(node_b)
calc_graph.add_node(node_c)

# 边:连接节点,并指定数据流
calc_graph.add_edge(Edge(node_a.node_id, node_b.node_id, "sum", "x")) # AddNode的'sum'输出作为MultiplyNode的'x'输入
calc_graph.add_edge(Edge(node_b.node_id, node_c.node_id, "product", "value")) # MultiplyNode的'product'输出作为OutputNode的'value'输入

# 执行图
initial_data = {"a": 5, "b": 3, "y": 2} # y是MultiplyNode的另一个输入
final_result = calc_graph.execute(initial_data)
# 预期输出: (5 + 3) * 2 = 16
print(f"Overall Calculation Result: {final_result}")

在这个例子中,我们看到了图模型如何通过节点和边清晰地表示计算流程。然而,当流程变得更复杂时,即使是扁平的图也会变得难以管理。这就是子图发挥作用的地方。


第二章:子图(Sub-graphs)的引入与核心机制

2.1 什么是子图?

子图(Sub-graph) 是一种特殊的节点。它本身是一个计算单元,但这个计算单元的内部又包含了一个完整的图。你可以将其理解为编程语言中的“函数”或“模块”——它封装了一段复杂的逻辑,对外只暴露简洁的输入和输出接口。

子图的核心思想是层次化抽象

  • 高层图:由表示主要业务流程的节点组成,其中一些节点可能是子图。
  • 子图内部:包含更细粒度的节点和边,实现高层图中某个复杂步骤的具体逻辑。

2.2 子图的结构与端口

为了让子图能够像普通节点一样被连接和执行,它需要定义明确的输入端口(Input Ports)输出端口(Output Ports)

  • 输入端口:定义了子图期望从外部接收哪些数据。这些外部数据会映射到子图内部某个(或多个)起始节点的输入。
  • 输出端口:定义了子图完成其内部逻辑后,会向外部提供哪些数据。这些数据来源于子图内部某个(或多个)终止节点的输出。
# --------------------------------------------------------------------------------
# 子图节点实现
# --------------------------------------------------------------------------------

class SubgraphNode(Node):
    """
    子图节点:一个特殊的节点,其内部包含一个完整的Graph。
    它通过 input_map 和 output_map 将外部输入/输出映射到内部图的节点。
    """
    def __init__(self, inner_graph: Graph, node_id: Optional[str] = None, name: Optional[str] = None):
        super().__init__(node_id, name if name else inner_graph.name)
        self.inner_graph = inner_graph
        # 定义子图的输入映射:
        # key: 子图外部输入名称
        # value: (内部目标节点ID, 内部目标节点输入key)
        self.input_map: Dict[str, Tuple[str, str]] = {}

        # 定义子图的输出映射:
        # key: 子图外部输出名称
        # value: (内部源节点ID, 内部源节点输出key)
        self.output_map: Dict[str, Tuple[str, str]] = {}

    def add_input_mapping(self, external_key: str, internal_node_id: str, internal_input_key: str):
        """添加一个外部输入到内部节点输入的映射"""
        if internal_node_id not in self.inner_graph.nodes:
            raise ValueError(f"Internal node {internal_node_id} not found in inner graph {self.inner_graph.name}")
        self.input_map[external_key] = (internal_node_id, internal_input_key)

    def add_output_mapping(self, external_key: str, internal_node_id: str, internal_output_key: str):
        """添加一个内部节点输出到外部输出的映射"""
        if internal_node_id not in self.inner_graph.nodes:
            raise ValueError(f"Internal node {internal_node_id} not found in inner graph {self.inner_graph.name}")
        self.output_map[external_key] = (internal_node_id, internal_output_key)

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        执行子图节点逻辑:
        1. 根据 input_map 将外部上下文映射到内部图的初始上下文。
        2. 执行内部图。
        3. 根据 output_map 将内部图的最终结果映射到子图的外部输出。
        """
        print(f"n--- Entering Subgraph: {self.name} ---")
        inner_initial_context: Dict[str, Any] = {}

        # 1. 应用输入映射:将外部上下文的值传递给内部图的特定节点输入
        for external_key, (internal_node_id, internal_input_key) in self.input_map.items():
            if external_key in context:
                # 这里我们直接将映射的值放入内部图的初始上下文中
                # 内部图的execute方法会负责将这些初始上下文传递给其入度为0的节点
                inner_initial_context[internal_input_key] = context[external_key]

        # 将未映射的外部上下文也传递给内部图,作为通用上下文
        for k, v in context.items():
            if k not in self.input_map:
                inner_initial_context[k] = v

        # 2. 执行内部图
        inner_results = self.inner_graph.execute(inner_initial_context)

        # 3. 应用输出映射:将内部图的结果映射到子图的外部输出
        subgraph_output: Dict[str, Any] = {}
        for external_key, (internal_node_id, internal_output_key) in self.output_map.items():
            # 尝试从内部图的最终结果中获取
            # 注意:这里假设内部图的execute会返回所有相关节点的最终输出,或者我们知道哪个节点是“出口”
            # 更健壮的做法是让内部图有一个明确的“OutputNode”
            target_node = self.inner_graph.get_node(internal_node_id)
            if target_node:
                value = target_node.get_output(internal_output_key)
                if value is not None:
                    subgraph_output[external_key] = value
            # Fallback: 如果映射的输出键直接存在于inner_results中
            elif internal_output_key in inner_results:
                 subgraph_output[external_key] = inner_results[internal_output_key]

        print(f"--- Exiting Subgraph: {self.name}. Output: {subgraph_output} ---n")
        return subgraph_output

2.3 嵌套图的执行流

当执行一个包含子图的顶层图时:

  1. 顶层图的执行引擎按照拓扑顺序遍历节点。
  2. 当遇到一个 SubgraphNode 时,它会暂停当前顶层图的执行。
  3. SubgraphNodeexecute 方法被调用。
  4. SubgraphNode 根据其 input_map,将当前顶层图的上下文数据传递给其内部图。
  5. 内部图开始执行,其自身的执行引擎按照内部节点的拓扑顺序进行。
  6. 内部图执行完毕后,其结果根据 SubgraphNodeoutput_map 被提取。
  7. SubgraphNodeexecute 方法返回提取的结果。
  8. 顶层图的执行引擎恢复,将 SubgraphNode 的输出作为后续节点的输入继续执行。

这个过程可以无限递归,形成任意深度的嵌套。


第三章:Agent逻辑的模块化拆解实践

现在,让我们将子图的概念应用到我们之前讨论的复杂Agent逻辑中。我们将构建一个简单的“预订机票”Agent,并使用子图来模块化其关键功能。

3.1 定义Agent所需的基础节点

除了前面定义的 OutputNode,我们还需要一些针对Agent的专业节点:

class PromptNode(Node):
    """
    用于生成LLM提示的节点。
    """
    def __init__(self, template: str, node_id: Optional[str] = None, name: str = "Prompt"):
        super().__init__(node_id, name)
        self.template = template

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        try:
            # 使用上下文中的数据填充模板
            prompt = self.template.format(**context)
            print(f"    Generated Prompt: {prompt[:100]}...")
            return {"prompt": prompt}
        except KeyError as e:
            raise ValueError(f"Missing context key for prompt template: {e}. Context: {context}")

class LLMNode(Node):
    """
    调用大型语言模型(LLM)的节点。
    模拟LLM调用,实际项目中会集成OpenAI/Anthropic等API。
    """
    def __init__(self, model_name: str = "mock-llm", node_id: Optional[str] = None, name: str = "LLM Call"):
        super().__init__(node_id, name)
        self.model_name = model_name

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        prompt = context.get("prompt")
        if not prompt:
            raise ValueError("LLMNode requires a 'prompt' in context.")

        print(f"    Calling LLM '{self.model_name}' with prompt: {prompt[:100]}...")

        # 模拟LLM响应逻辑
        response = f"LLM responded to: '{prompt[:50]}...' with some generated text."

        # 更复杂的LLM响应模拟,例如意图识别和槽位提取
        if "预订机票" in prompt or "订票" in prompt:
            response = {
                "intent": "book_flight",
                "departure": "北京",
                "destination": "上海",
                "date": "明天" # 假设从prompt中解析出来
            }
        elif "查询天气" in prompt:
            response = {
                "intent": "query_weather",
                "city": "北京"
            }
        else:
            response = {"intent": "general_chat", "text": "好的,我明白了。"}

        return {"llm_output": response}

class ConditionalNode(Node):
    """
    根据某个条件路由执行流的节点。
    """
    def __init__(self, condition_func: Callable[[Dict[str, Any]], str], 
                 node_id: Optional[str] = None, name: str = "Conditional Router"):
        super().__init__(node_id, name)
        self.condition_func = condition_func # 一个函数,接收context,返回一个字符串作为路由键

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        route_key = self.condition_func(context)
        print(f"    ConditionalNode routed to: {route_key}")
        return {"route": route_key}

class DatabaseQueryNode(Node):
    """
    模拟数据库查询的节点。
    """
    def __init__(self, db_name: str = "mock-db", node_id: Optional[str] = None, name: str = "DB Query"):
        super().__init__(node_id, name)
        self.db_name = db_name
        self.mock_data = {
            "flights": {
                "北京-上海-明天": {"flight_no": "CA1881", "price": 850, "time": "10:00 AM"},
                "北京-广州-今天": {"flight_no": "CZ3101", "price": 1200, "time": "08:30 AM"},
            },
            "weather": {
                "北京": {"condition": "晴朗", "temperature": "25°C"},
                "上海": {"condition": "多云", "temperature": "28°C"},
            }
        }

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        query_type = context.get("query_type")
        query_params = context.get("query_params", {})

        print(f"    Querying DB '{self.db_name}' for '{query_type}' with params: {query_params}")

        if query_type == "flights":
            departure = query_params.get("departure")
            destination = query_params.get("destination")
            date = query_params.get("date")
            key = f"{departure}-{destination}-{date}"
            result = self.mock_data["flights"].get(key, {"error": "No flight found"})
        elif query_type == "weather":
            city = query_params.get("city")
            result = self.mock_data["weather"].get(city, {"error": "Weather data not found"})
        else:
            result = {"error": "Unsupported query type"}

        print(f"    DB Query Result: {result}")
        return {"db_result": result}

class ResponseGenerationNode(Node):
    """
    根据Agent的最终状态生成用户回复的节点。
    """
    def __init__(self, node_id: Optional[str] = None, name: str = "Response Generation"):
        super().__init__(node_id, name)

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        final_agent_state = context.get("agent_state", {})
        db_result = context.get("db_result", {})

        response_text = "抱歉,我未能理解您的请求。" # 默认回复

        if final_agent_state.get("intent") == "book_flight":
            if "flight_no" in db_result:
                response_text = (f"好的,为您找到航班 {db_result['flight_no']},"
                                 f"从{final_agent_state.get('departure')}飞往{final_agent_state.get('destination')},"
                                 f"时间:{db_result['time']},票价:{db_result['price']}元。")
            else:
                response_text = "抱歉,未能找到符合条件的航班。"
        elif final_agent_state.get("intent") == "query_weather":
            if "condition" in db_result:
                response_text = (f"{final_agent_state.get('city')}今天{db_result['condition']},"
                                 f"气温{db_result['temperature']}。")
            else:
                response_text = "抱歉,未能查询到该城市的天气信息。"
        elif final_agent_state.get("intent") == "general_chat":
            response_text = final_agent_state.get("text", "好的,我明白了。")

        print(f"    Generated Final Response: {response_text}")
        return {"final_response": response_text}

3.2 构建Agent子图

我们将把意图识别、数据库查询和响应生成等复杂逻辑封装成子图。

子图一:意图识别与槽位填充 (IntentRecognitionSubgraph)

这个子图的职责是接收用户输入,通过LLM判断用户意图并提取关键信息(槽位)。

# 构建 IntentRecognitionSubgraph
intent_prompt_node = PromptNode(name="IntentPrompt", 
                                template="用户输入:{user_query}n请判断用户意图并提取相关实体。例如:{{'intent': 'book_flight', 'departure': '北京', 'destination': '上海'}}")
intent_llm_node = LLMNode(name="IntentLLM")
intent_output_node = OutputNode(name="IntentOutput")

intent_recognition_graph = Graph(name="IntentRecognitionGraph")
intent_recognition_graph.add_node(intent_prompt_node)
intent_recognition_graph.add_node(intent_llm_node)
intent_recognition_graph.add_node(intent_output_node)

intent_recognition_graph.add_edge(Edge(intent_prompt_node.node_id, intent_llm_node.node_id, "prompt", "prompt"))
# LLMNode的llm_output可能是一个字典,我们希望将其全部传递给下一个节点
intent_recognition_graph.add_edge(Edge(intent_llm_node.node_id, intent_output_node.node_id, "llm_output", "value"))

intent_recognition_subgraph_node = SubgraphNode(inner_graph=intent_recognition_graph, name="IntentRecognizer")
# 定义输入映射:外部的'user_query'映射到内部'IntentPrompt'节点的'user_query'输入
intent_recognition_subgraph_node.add_input_mapping("user_query", intent_prompt_node.node_id, "user_query")
# 定义输出映射:内部'IntentOutput'节点的'final_result'输出作为子图的'agent_state'输出
intent_recognition_subgraph_node.add_output_mapping("agent_state", intent_output_node.node_id, "final_result")

子图二:数据库查询 (DatabaseQuerySubgraph)

这个子图的职责是根据意图和槽位信息,构建并执行数据库查询。

# 构建 DatabaseQuerySubgraph
db_query_node = DatabaseQueryNode(name="PerformDBQuery")
db_output_node = OutputNode(name="DBResultOutput")

database_query_graph = Graph(name="DatabaseQueryGraph")
database_query_graph.add_node(db_query_node)
database_query_graph.add_node(db_output_node)

# 假设DBQueryNode可以直接从其输入获取query_type和query_params
# 因此,子图的输入直接映射到DBQueryNode的输入
database_query_graph.add_edge(Edge(db_query_node.node_id, db_output_node.node_id, "db_result", "value"))

database_query_subgraph_node = SubgraphNode(inner_graph=database_query_graph, name="DBQueryExecutor")
# 定义输入映射:外部的'query_type'和'query_params'映射到内部'PerformDBQuery'节点的同名输入
database_query_subgraph_node.add_input_mapping("query_type", db_query_node.node_id, "query_type")
database_query_subgraph_node.add_input_mapping("query_params", db_query_node.node_id, "query_params")
# 定义输出映射:内部'DBResultOutput'节点的'final_result'输出作为子图的'db_result'输出
database_query_subgraph_node.add_output_mapping("db_result", db_output_node.node_id, "final_result")

3.3 组装主Agent图

现在,我们将这些子图和普通节点组合成一个完整的Agent主图。

# --------------------------------------------------------------------------------
# 组装主Agent图
# --------------------------------------------------------------------------------

# 主图中的其他节点
start_node = Node(name="Start Agent") # 只是一个占位符,表示流程开始
response_gen_node = ResponseGenerationNode(name="FinalResponseGenerator")
final_agent_output_node = OutputNode(name="FinalAgentOutput")

# 定义路由函数,根据LLM的意图决定下一个分支
def route_by_intent(context: Dict[str, Any]) -> str:
    agent_state = context.get("agent_state", {})
    intent = agent_state.get("intent", "general_chat")
    print(f"    Routing based on intent: {intent}")
    return intent

intent_router_node = ConditionalNode(name="IntentRouter", condition_func=route_by_intent)

# 主Agent图
main_agent_graph = Graph(name="MainAgentGraph")
main_agent_graph.add_node(start_node)
main_agent_graph.add_node(intent_recognition_subgraph_node) # 加入意图识别子图节点
main_agent_graph.add_node(intent_router_node)
main_agent_graph.add_node(database_query_subgraph_node)   # 加入数据库查询子图节点
main_agent_graph.add_node(response_gen_node)
main_agent_graph.add_node(final_agent_output_node)

# 连接主图中的节点和子图节点
main_agent_graph.add_edge(Edge(start_node.node_id, intent_recognition_subgraph_node.node_id, "user_query", "user_query")) # 初始输入
main_agent_graph.add_edge(Edge(intent_recognition_subgraph_node.node_id, intent_router_node.node_id, "agent_state", "agent_state"))

# 意图路由分支
# 如果是 'book_flight' 意图,则准备参数并进入数据库查询子图
main_agent_graph.add_edge(Edge(intent_router_node.node_id, database_query_subgraph_node.node_id, "route", "query_type")) # 路由结果作为查询类型
main_agent_graph.add_edge(Edge(intent_recognition_subgraph_node.node_id, database_query_subgraph_node.node_id, "agent_state", "query_params")) # 从意图识别子图的输出中提取查询参数

# 将数据库查询的结果和agent_state都传递给回复生成节点
main_agent_graph.add_edge(Edge(database_query_subgraph_node.node_id, response_gen_node.node_id, "db_result", "db_result"))
main_agent_graph.add_edge(Edge(intent_recognition_subgraph_node.node_id, response_gen_node.node_id, "agent_state", "agent_state"))

# 最终回复传递给输出节点
main_agent_graph.add_edge(Edge(response_gen_node.node_id, final_agent_output_node.node_id, "final_response", "value"))

# --------------------------------------------------------------------------------
# 运行Agent
# --------------------------------------------------------------------------------

print("--- Running Agent with a flight booking query ---")
user_input_1 = {"user_query": "我想订一张从北京到上海的机票,明天出发。"}
agent_result_1 = main_agent_graph.execute(user_input_1)
print(f"Agent Final Response (Query 1): {agent_result_1.get('final_result')}n")

print("--- Running Agent with a weather query ---")
user_input_2 = {"user_query": "北京今天天气怎么样?"}
agent_result_2 = main_agent_graph.execute(user_input_2)
print(f"Agent Final Response (Query 2): {agent_result_2.get('final_result')}n")

print("--- Running Agent with a general chat query ---")
user_input_3 = {"user_query": "你好,能帮我做点什么?"}
agent_result_3 = main_agent_graph.execute(user_input_3)
print(f"Agent Final Response (Query 3): {agent_result_3.get('final_result')}n")

通过这个例子,我们可以清晰地看到:

  • 模块化:意图识别、数据库查询等复杂逻辑被封装在独立的子图中,每个子图都可以独立开发、测试和维护。
  • 可读性:主图只关注高层业务流程,其节点可以是抽象的子图,使得整体架构一目了然。
  • 可重用性IntentRecognitionSubgraphDatabaseQuerySubgraph 可以在其他Agent或业务流程中复用,而无需复制代码。
  • 灵活性:如果我们需要更换意图识别模型,只需修改 IntentRecognitionSubgraph 内部的 LLMNode,而不需要触碰主图的其他部分。

第四章:高级主题与考量

4.1 动态图生成与修改

在某些高级Agent场景中,Agent本身可能需要根据运行时的状态或用户指令来动态构建或修改其执行图。例如:

  • RAG (Retrieval Augmented Generation) Agent:根据检索结果动态插入或选择不同的回复生成路径。
  • 规划Agent:LLM生成一个行动计划,这个计划可以被解析成一系列串联或并行的节点和子图。

实现动态图需要图结构支持在运行时添加/删除节点和边,并重新进行拓扑排序。这通常涉及到更复杂的图管理和验证逻辑。

# 动态图的简化示例:根据LLM的意图动态添加DBQueryExecutor或直接Response
class DynamicAgentGraph(Graph):
    def __init__(self, name: str = "DynamicAgent"):
        super().__init__(name)
        self.nodes = {} # 清空父类的节点,我们将动态添加

    def build_dynamic_path(self, user_intent: Dict[str, Any]):
        # 每次构建前清空现有动态部分
        self.nodes = {}
        self.edges = []
        self._adj_list = defaultdict(list)
        self._rev_adj_list = defaultdict(list)

        # 核心节点
        start = Node(name="Dynamic Start")
        intent_recognizer = SubgraphNode(inner_graph=intent_recognition_graph, name="IntentRecognizer") # 复用之前的子图
        response_generator = ResponseGenerationNode(name="Dynamic Response Generator")
        final_output = OutputNode(name="Dynamic Final Output")

        self.add_node(start)
        self.add_node(intent_recognizer)
        self.add_node(response_generator)
        self.add_node(final_output)

        self.add_edge(Edge(start.node_id, intent_recognizer.node_id, "user_query", "user_query"))
        self.add_edge(Edge(intent_recognizer.node_id, response_generator.node_id, "agent_state", "agent_state"))
        self.add_edge(Edge(response_generator.node_id, final_output.node_id, "final_response", "value"))

        # 根据用户意图动态插入DB查询子图
        if user_intent.get("intent") in ["book_flight", "query_weather"]:
            db_query_executor = SubgraphNode(inner_graph=database_query_graph, name="DBQueryExecutor")
            self.add_node(db_query_executor)

            # 重新连接意图识别到DB查询,DB查询再到回复生成
            # 移除旧的连接
            self.edges = [e for e in self.edges if not (e.source_node_id == intent_recognizer.node_id and e.target_node_id == response_generator.node_id)]
            self._adj_list = defaultdict(list) # 重新构建邻接表
            self._rev_adj_list = defaultdict(list)
            for e in self.edges: # 重新填充
                self._adj_list[e.source_node_id].append(e)
                self._rev_adj_list[e.target_node_id].append(e)

            # 添加新的连接
            self.add_edge(Edge(intent_recognizer.node_id, db_query_executor.node_id, "agent_state", "query_params"))
            self.add_edge(Edge(intent_recognizer.node_id, db_query_executor.node_id, "agent_state.intent", "query_type")) # 假设能提取intent作为query_type

            self.add_edge(Edge(db_query_executor.node_id, response_generator.node_id, "db_result", "db_result"))

# --- 运行动态Agent示例 ---
# dynamic_agent = DynamicAgentGraph()
# # 模拟LLM识别出意图
# mock_intent_1 = {"intent": "book_flight", "departure": "北京", "destination": "上海", "date": "明天"}
# dynamic_agent.build_dynamic_path(mock_intent_1)
# print("n--- Running Dynamic Agent with flight booking path ---")
# dynamic_agent.execute({"user_query": "我想订一张从北京到上海的机票,明天出发。"})

# mock_intent_2 = {"intent": "general_chat"}
# dynamic_agent.build_dynamic_path(mock_intent_2)
# print("n--- Running Dynamic Agent with general chat path ---")
# dynamic_agent.execute({"user_query": "你好,天气真好啊。"})

(由于 SubgraphNode 已经将 agent_state 整个传递,"agent_state.intent" 这种键需要更复杂的解析,或者在 SubgraphNodeadd_input_mapping 中支持更灵活的表达式。为了简化,我们暂时保留主图中的 ConditionalNode 来做路由,动态图的例子仅展示了修改图结构的可能性。)

4.2 错误处理与日志

在复杂系统中,错误无处不在。图模型提供了一个结构化的方式来处理错误:

  • 节点级别的错误处理:每个节点 execute 方法内部捕获并处理自己的异常。
  • 边级别的错误处理:可以定义特殊类型的边,例如“失败边”(on-failure edge),当源节点执行失败时,控制流会沿着这条边转移到错误处理节点。
  • 子图级别的错误处理:子图内部的任何错误都可以在子图的 execute 方法中被捕获,并作为子图的特定输出传递给外部,或者直接触发外部的错误处理路径。
  • 全局错误处理:顶层图可以有一个全局的错误处理节点,当任何节点抛出未处理的异常时,流程会跳转到该节点。

完善的日志系统对于理解Agent在图中的执行路径至关重要。每个节点执行前后、数据传递、错误发生时都应记录详细日志。

4.3 状态管理与上下文传递

在Agent的执行过程中,状态(如对话历史、用户偏好、查询结果等)是持续变化的。

  • 局部状态:每个节点可以在其 execute 方法中维护自己的临时状态。
  • 共享上下文Graph.execute 方法中的 context 字典是节点间传递数据的核心机制。它应该包含Agent的全局可访问状态。
  • 记忆模块:对于长期运行的Agent,可能需要专门的记忆模块(如向量数据库、Redis),节点可以访问这些模块来存储和检索信息,而不仅仅依赖于瞬时上下文。

4.4 可视化与调试工具

对于复杂的嵌套图,可视化工具是必不可少的。它可以将图结构渲染出来,帮助开发者理解Agent的整体流程。在运行时,可视化工具还可以高亮当前正在执行的节点、显示数据流、标记错误节点等,极大地提高调试效率。

4.5 持久化与版本控制

Agent的图定义(节点类型、节点实例、边、子图结构)应该能够被序列化(如JSON或YAML格式)存储到文件中,以便:

  • 持久化:Agent可以在重启后恢复其逻辑。
  • 版本控制:图定义可以像代码一样被版本控制,方便回溯和协作。
  • 动态加载:在运行时加载不同的Agent配置。

第五章:优势与挑战

5.1 嵌套图的优势

特性 描述
模块化 将复杂逻辑分解为独立、可管理的子单元,每个子图专注于特定功能。
抽象性 顶层图专注于高层业务逻辑,子图隐藏实现细节,提高可读性和理解性。
可重用性 通用功能子图可以在不同Agent或项目中复用,减少重复开发。
可维护性 局部修改仅影响相关子图,降低整体系统的维护成本和风险。
可测试性 每个子图和节点都可以作为独立单元进行单元测试,提高测试覆盖率和质量。
可视化 图结构天然适合可视化,便于开发者和业务人员理解Agent的行为和数据流。
扩展性 易于添加新功能(新节点、新子图)或修改现有流程,而无需大规模重构。
并行性 理论上,无依赖关系的节点或子图可以在多线程/多进程中并行执行,提高效率(需要额外的调度器支持)。
错误隔离 错误可以被限制在特定的节点或子图内,防止故障扩散,便于定位和恢复。

5.2 挑战与注意事项

挑战 描述 应对策略
过度设计 简单的Agent可能不需要复杂的嵌套图,过度使用会增加不必要的复杂性。 从简单开始,当复杂度上升时再引入子图。遵循“按需抽象”原则。
性能开销 图遍历、上下文传递、节点实例化等操作可能引入运行时开销,尤其是在高频、低延迟场景。 优化图执行引擎,例如缓存拓扑排序结果、减少不必要的数据复制、异步执行。对于性能敏感的节点,可以进行优化或使用更底层的实现。
调试复杂 嵌套层级过深时,追踪数据流和控制流可能变得困难,尤其是在错误发生时。 强大的日志系统(记录节点输入/输出、子图进入/退出)、可视化调试工具、提供断点和单步执行功能。确保错误信息包含足够的上下文。
状态管理 确定哪些数据应该作为全局上下文,哪些作为节点私有状态,以及如何高效、安全地在节点和子图之间传递和更新状态,可能是一个挑战。 明确定义输入/输出端口和数据契约。使用不变性数据结构(Immutability)减少副作用。引入统一的状态管理服务或内存数据库来管理长期状态。
循环依赖 图中出现循环依赖会导致拓扑排序失败,无法执行。 严格的图结构验证。在设计时避免循环。对于需要迭代的场景,应通过特殊的“循环节点”或外部迭代器来管理,而不是直接在图结构中形成循环。
版本控制 图结构和节点代码同时演进,如何有效地版本控制和回溯可能需要专门的工具和流程。 将图定义(如JSON/YAML)与节点实现代码(Python文件)分开管理。使用Git等工具进行版本控制。考虑图迁移工具来处理图结构的变化。
工具集成 将外部工具(如LLM API、数据库、其他微服务)集成到节点中,需要处理API调用、认证、限流、重试等问题。 封装工具调用逻辑到专门的“工具节点”。利用现有的库或框架(如LangChain、LlamaIndex的工具抽象)来简化集成。
抽象泄漏 有时子图的内部实现细节会通过复杂的输入/输出映射泄漏到外部,导致子图的封装性降低。 仔细设计子图的接口,确保其输入/输出是高层次、有意义的。避免直接暴露内部节点的数据结构。

总结与展望

通过本讲座,我们深入探讨了子图和嵌套图在构建大规模AI Agent中的应用。我们从图模型的基础概念出发,逐步构建了一个能够支持复杂Agent逻辑的执行框架,并通过具体的代码示例展示了如何将意图识别、数据库查询等功能模块化为可复用的子图。

子图和嵌套图为我们提供了一种强大的架构模式,用以应对现代AI系统日益增长的复杂性。它提升了代码的模块化、可读性、可维护性和可扩展性,使得构建、理解和调试复杂的Agent逻辑成为可能。虽然它带来了一些新的挑战,但通过合理的设计、严格的工程实践以及适当的工具辅助,这些挑战是完全可以克服的。

未来,随着AI技术的发展,Agent将变得更加智能和自主。嵌套图模型将继续演进,集成更多动态规划、自适应学习和多模态交互的能力。掌握这一范式,将使我们能够更好地驾驭AI的未来,构建出更强大、更灵活、更智能的AI系统。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注