各位同仁,各位对复杂系统调试和可观测性充满热情的工程师们,大家好。
今天,我们将深入探讨一个在处理现代复杂软件架构,特别是基于大语言模型(LLM)的应用中变得越来越关键的概念——“图扁平化”(Graph Flattening)。我们将重点关注如何将一个极其复杂的嵌套图扁平化,以显著提高像 LangSmith 这样的追踪工具的可读性。
在构建复杂的 LLM 代理、链和工具集成时,我们经常会发现我们的系统行为追踪变得异常庞大和深邃。LangSmith 作为 LangChain 生态系统中的核心可观测性工具,为我们提供了无与伦比的洞察力,但当一个请求触发了数千个内部运行(Runs),并且这些运行层层嵌套时,即使是 LangSmith 的优秀 UI 也可能显得力不从心。这时,图扁平化就成为了我们理解、调试和优化这些复杂系统的强大武器。
一、理解嵌套图的复杂性及其挑战
在深入图扁平化之前,我们首先需要理解我们正在处理的问题。
1.1 什么是图?
在计算机科学中,图(Graph)是一种抽象数据类型,用于表示对象之间的关系。它由一组节点(Nodes,也称为顶点 Vertices)和连接这些节点的边(Edges)组成。
- 节点(Nodes/Vertices):代表实体,如一个函数调用、一个 LLM 请求、一个数据处理步骤、一个用户输入。
- 边(Edges):代表实体之间的关系,如调用关系、数据流、时间顺序。
在 LangSmith 的语境中,每个 Run 对象(无论是 llm、chain、tool、agent 还是其他类型)都可以被视为一个节点。而 parent_run_id 属性则定义了节点之间的父子关系,形成了有向边。
1.2 什么是嵌套图?
嵌套图(Nested Graph),或者更广义地说,层次图(Hierarchical Graph),是指图中的节点本身可以是另一个图,或者图的结构天然地具有多层级的父子关系。
在 LLM 应用中,嵌套图是常态:
- 一个
Agent(节点A)可能会调用一个Tool(节点B)。 - 这个
Tool(节点B)在执行过程中又可能会调用一个LLM(节点C)进行推理。 - 这个
LLM(节点C)的调用可能又会涉及到Retrieval(节点D)和Parsing(节点E)。 - 甚至
Retrieval(节点D)内部可能又是一个Chain,调用了另一个LLM来重写查询。
这就形成了一个深度嵌套的结构:Agent -> Tool -> LLM -> (Retrieval, Parsing)。在 LangSmith 中,这表现为一系列具有 parent_run_id 链接的 Run 对象,形成了一个深层的树状或有向无环图(DAG)。
1.3 嵌套图带来的挑战
这种深度嵌套的结构,虽然能够忠实地记录系统的每一个细枝末节,但也带来了巨大的可观测性挑战:
- 认知负荷过重:在一个包含数百甚至数千个 Run 的追踪中,工程师很难一眼识别出关键路径、性能瓶颈或错误根源。
- 关键信息淹没:重要的业务逻辑和决策点可能被大量细粒度的底层操作(如每个 token 的生成、每个向量数据库的查询)所淹没。
- 可视化困难:直接渲染一个深层嵌套的图会非常混乱,难以布局和理解。
- 分析复杂性:对原始的嵌套图进行自动化分析(如查找模式、性能热点)需要复杂的遍历和聚合逻辑。
为了解决这些问题,我们需要一种机制来“简化”这个图,将其转换为一个更易于理解和分析的形式——这就是“图扁平化”的核心任务。
二、’Graph Flattening’ 的核心概念与目标
图扁平化,顾名思义,就是将一个具有层次结构或嵌套关系的图,转换为一个更“扁平”的图结构。这里的“扁平”可以有不同的解释,但通常意味着:
- 减少节点数量:将多个相关的、连续的或在逻辑上属于同一高级操作的节点合并为一个新的节点。
- 简化边结构:在节点合并后,重新连接剩余的节点,使得图的连接关系更直接、更清晰。
- 抽象层次提升:将低层次的细节隐藏起来,呈现更高层次的逻辑流。
2.1 扁平化的目标
- 提高可读性:这是最直接的目标。一个扁平化的图能够让工程师更快地理解系统的宏观行为。
- 突出关键路径:通过合并次要节点,使关键的决策点、主要的数据流或性能敏感区域更加突出。
- 便于自动化分析:扁平化后的图通常具有更少的节点和边,使得基于图的算法(如路径查找、社区发现)能够更高效地运行。
- 支持多粒度视图:通过不同的扁平化策略,可以为用户提供不同抽象层次的视图,满足不同场景的需求。
2.2 扁平化的主要策略维度
为了达到上述目标,我们通常会从以下几个维度来设计扁平化策略:
2.2.1 节点合并 (Node Merging)
这是扁平化的核心。它涉及识别哪些节点可以被组合成一个逻辑单元。
- 基于类型合并:例如,将所有属于同一
chain的llm运行合并到chain节点中。 - 基于语义合并:例如,将一个
agent的“思考”(LLM调用)和“工具调用”(tool调用)合并为一个“代理步骤”节点。 - 基于连续性合并:将一系列连续执行的、逻辑相关的简单操作合并为一个复合操作。
- 基于深度/阈值合并:将超出特定深度的所有子节点合并到其父节点中。
2.2.2 边重定向/简化 (Edge Redirection/Simplification)
当节点被合并后,原始的边连接可能会失效。我们需要重新建立合并后的节点之间的关系。
- 继承边:如果节点 A 包含节点 B,节点 B 包含节点 C,并且 A-B-C 是原始路径。当 B 和 C 合并到 A 中时,如果 C 原本有一个到节点 D 的边,那么 A 应该继承这个到 D 的边。
- 过滤冗余边:合并后,一些内部边可能变得多余,需要被移除。
2.2.3 属性聚合 (Attribute Aggregation)
当多个原始节点合并为一个扁平节点时,它们的属性(如输入、输出、时间戳、错误信息、元数据)需要被有意义地聚合。
- 时间戳:合并节点的
start_time通常是其所有原始节点中最早的start_time,end_time是最晚的end_time。 - 输入/输出:可以保留合并前第一个节点的输入和最后一个节点的输出,或者将所有节点的输入/输出进行摘要。
- 错误信息:如果任何内部节点发生错误,合并节点应该标记为错误,并聚合错误信息。
- 元数据:可以继承父节点的元数据,或者合并所有子节点的关键元数据。
2.2.4 层次信息保留 (Hierarchy Preservation)
虽然目标是扁平化,但我们通常不希望完全丢失原始的层次信息。理想的扁平化方案应该允许在需要时“钻取”(drill down)到合并节点的内部细节。这可以通过在扁平节点中存储其包含的原始节点 ID 或一个指向原始子图的引用来实现。
2.2.5 上下文提取 (Context Extraction)
对于像 LLM 的输入/输出这种文本内容,仅仅聚合可能不够。我们需要提取其关键上下文,甚至使用 LLM 本身来总结这些内容,以提高可读性。
三、为什么 LangSmith 需要图扁平化?
LangSmith 是一个强大的开发人员平台,用于调试、测试、评估和监控基于 LLM 的应用程序。它通过记录应用程序中每个组件的执行(即 Run 对象)来提供端到端的可见性。
3.1 LangSmith 的追踪机制
LangSmith 的核心是 Run 对象。每个 Run 都有一个唯一的 id、run_type(如 llm, chain, agent, tool, retriever, prompt, parser)、name、start_time、end_time、inputs、outputs、error 和一个关键的 parent_run_id。这个 parent_run_id 构成了 LangSmith 追踪的层次结构。
例如,一个简单的 Agent 执行可能看起来像这样:
Run 1 (Agent)
|-- Run 2 (Tool: search_tool)
| |-- Run 3 (LLM: OpenAI)
| |-- Run 4 (Parser: OutputParser)
|-- Run 5 (LLM: OpenAI) // Agent's final decision
3.2 复杂 LLM 应用的特点
现代 LLM 应用,特别是那些采用 ReAct、Tree-of-Thought 等高级代理模式的应用,往往会产生极其复杂的 LangSmith 追踪:
- 深度嵌套:一个 Agent 可能连续调用多个工具,每个工具内部又是一个复杂的 Chain,其中包含多次 LLM 调用、检索、解析等。这种嵌套可以达到数十层。
- 大量细粒度步骤:例如,一个流式 LLM 的每次 token 生成,或一个检索器查询的每个子步骤,都可能被记录为一个独立的 Run。
- 循环和条件逻辑:代理在达到目标之前可能会进行多次“思考-行动-观察”循环,导致追踪中出现大量重复的模式。
- 异步并发:多个工具或链可能并行执行,使得追踪图的分支更多,结构更复杂。
3.3 可读性挑战与扁平化的解决之道
当一个 LangSmith 追踪包含成百上千个 Run,并且嵌套层级很深时:
- 用户很难在 LangSmith UI 中快速定位到关键的业务逻辑或性能瓶颈。
- 滚动浏览长长的追踪列表效率低下。
- 理解数据流和控制流变得非常困难。
图扁平化正是为了解决这些挑战而生。它能够将一个庞大的、细粒度的 LangSmith 追踪转化为一个更高级、更易于消化和分析的视图。通过扁平化,我们可以:
- 获得高层概览:快速理解一个 Agent 的主要决策步骤和工具调用序列,而无需深入每个工具的内部实现细节。
- 聚焦关键交互:例如,只显示 Agent 与外部工具和最终 LLM 之间的交互,隐藏所有中间的内部处理。
- 突出性能热点:通过聚合时间数据,扁平化视图能更清晰地展示哪些高级操作耗时最长。
- 简化调试流程:当发现一个高级操作出错时,再“钻取”到其内部查看详细的原始 Run,从而实现从宏观到微观的渐进式调试。
四、扁平化策略与技术:从理论到实践
现在,我们来探讨具体的扁平化策略和实现技术。
4.1 获取 LangSmith 追踪数据
扁平化的第一步是获取原始的 LangSmith 追踪数据。我们可以使用 LangSmith SDK 来完成。
import os
from langsmith import Client
from typing import List, Dict, Any, Optional
import networkx as nx
import matplotlib.pyplot as plt
from datetime import datetime
# 确保设置了 LANGSMITH_API_KEY 和 LANGSMITH_PROJECT
# os.environ["LANGSMITH_API_KEY"] = "YOUR_LANGSMITH_API_KEY"
# os.environ["LANGSMITH_PROJECT"] = "YOUR_LANGSMITH_PROJECT"
client = Client()
def get_run_tree(run_id: str) -> Dict[str, Any]:
"""
递归地获取指定 run_id 及其所有子 run 的详细信息,构建一个树形结构。
"""
main_run = client.read_run(run_id)
run_dict = main_run.dict()
# 递归获取子 run
child_runs = client.list_runs(
project_name=main_run.project_name,
parent_run_id=run_id,
# 可以根据需要添加其他过滤条件,例如 run_type
# run_type=["llm", "chain", "agent", "tool", "retriever", "prompt", "parser"]
)
run_dict["child_runs"] = sorted([get_run_tree(child.id) for child in child_runs],
key=lambda x: x.get("start_time", datetime.min).timestamp())
return run_dict
def build_raw_graph(run_tree: Dict[str, Any]) -> nx.DiGraph:
"""
从 LangSmith run 树形结构构建原始的 NetworkX 有向图。
"""
graph = nx.DiGraph()
def add_nodes_and_edges(node: Dict[str, Any]):
graph.add_node(node["id"], **{k: v for k, v in node.items() if k != "child_runs"})
for child in node.get("child_runs", []):
graph.add_edge(node["id"], child["id"])
add_nodes_and_edges(child)
add_nodes_and_edges(run_tree)
return graph
# 示例:获取一个特定的 run 的追踪
# try:
# example_run_id = "YOUR_EXAMPLE_RUN_ID" # 替换为你的 LangSmith Run ID
# raw_run_tree = get_run_tree(example_run_id)
# raw_graph = build_raw_graph(raw_run_tree)
# print(f"原始图节点数: {raw_graph.number_of_nodes()}, 边数: {raw_graph.number_of_edges()}")
# except Exception as e:
# print(f"获取 LangSmith 数据失败: {e}")
# raw_run_tree = {} # Fallback for demonstration
# raw_graph = nx.DiGraph()
4.2 扁平化数据模型设计
在扁平化过程中,我们需要将多个原始 Run 对象的信息聚合到一个新的扁平节点中。这个新的节点需要包含足够的信息来表示其所代表的聚合操作。
表格:原始 Run 属性 vs. 扁平化节点属性
| 属性名称 | 原始 Run 属性 | 扁平化 Node 属性 | 聚合策略 |
|——————|——————————————————–|—————————————————————-|—————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————- LangZhen 的世界中,时间就是金钱,信息就是生命。对于我们这些软件架构师和开发者而言,理解复杂系统的行为模式、识别性能瓶颈以及追踪错误根源是至关重要的。LangSmith 为我们提供了强大的追踪能力,但当面对极其复杂的、深度嵌套的 LLM 代理或链时,原始的 LangSmith 追踪视图可能会变得过于庞大和细致,反而淹没了我们真正需要关注的关键信息。
今天,我将向大家介绍一种强大的技术——图扁平化(Graph Flattening),以及如何将其应用于 LangSmith 追踪,以显著提高其可读性和分析效率。
1. 问题的根源:复杂LLM应用的嵌套图
1.1 LangSmith追踪的本质:一个有向无环图(DAG)
首先,让我们回顾一下 LangSmith 追踪的结构。每次 LangChain 组件执行时,LangSmith 都会记录一个 Run 对象。这些 Run 对象之间通过 parent_run_id 字段建立父子关系,从而形成一个有向无环图(DAG)。
- 节点(Node):每个
Run对象都是图中的一个节点。它包含id,name,run_type(llm,chain,agent,tool,retriever,prompt,parser等),start_time,end_time,inputs,outputs,error等属性。 - 边(Edge):从父
Run到子Run的有向连接。例如,一个AgentRun 调用了一个ToolRun,那么就有一条从Agent节点到Tool节点的边。
1.2 复杂LLM应用带来的挑战
考虑一个智能体(Agent)的场景,它需要根据用户查询:
- 思考:调用一个
LLM来决定下一步行动。 - 规划:决定使用哪个
Tool。 - 执行:调用
Tool。Tool内部可能又是一个Retrieval链,它会调用另一个LLM来重写查询,然后查询向量数据库。Retrieval结果可能需要Parsing。
- 观察:Agent 接收
Tool的输出。 - 循环:重复上述过程,直到获得最终答案。
这个过程在 LangSmith 中会生成一个深层嵌套的追踪,可能包含:
- 多层级的
chain和agent:Agent 调用 Chain,Chain 调用其他 Chain。 - 大量的
llm调用:每个思考步骤、每个工具内部的推理、每次查询重写都可能是一个llmRun。 - 细粒度的
tool,retriever,parser,prompt等:这些都是构成高级逻辑的基石。
当这些 Run 数量达到数百甚至数千个时,原始的 LangSmith UI 视图会变得非常密集和难以理解。我们看到的是一个庞大的树状结构,其中充满了我们可能暂时不需要关注的细节,而真正关键的决策点和信息流则被淹没其中。
2. 图扁平化:核心概念与目标
图扁平化的核心思想是降低图的复杂性,提高其信息密度和可读性。它通过聚合、抽象和简化图的结构,将一个深层、细粒度的图转换为一个更浅层、更宏观的图。
2.1 扁平化的目标
- 提供高层次概览:快速理解系统的主体流程和关键步骤。
- 减少视觉混乱:消除大量冗余或次要的节点和边,使图更易于可视化。
- 突出关键信息:将注意力引导至重要的业务逻辑、决策点或性能瓶颈。
- 支持多粒度分析:允许用户在不同抽象层次之间切换,按需深入细节。
- 简化自动化处理:扁平化后的图更小,可以更快地进行模式识别、性能分析等自动化任务。
2.2 扁平化的主要策略
图扁平化并非一蹴而就,它通常涉及以下几种策略的组合:
2.2.1 节点合并(Node Merging)
这是扁平化的核心。识别一组逻辑相关的原始节点,将它们合并成一个单一的扁平节点。
- 基于类型和语义:
- 将一个
agent的所有内部llm思考和tool调用合并成一个“Agent Step”。 - 将一个
chain的所有内部llm,retriever,parser调用合并成一个“Chain Execution”。 - 将一个
tool的所有内部操作合并成一个“Tool Call”。
- 将一个
- 基于连续性:如果一系列连续的
Run都是相同类型且逻辑紧密,可以合并。例如,多次连续的llm调用(如流式生成中的多个 chunk)可以合并为一个LLM Generation。 - 基于深度阈值:将任何超过特定深度的节点及其子孙节点,递归地合并到其最近的符合深度的祖先节点中。
2.2.2 边重定向与简化(Edge Redirection & Simplification)
当节点合并发生时,原始的边需要被重新评估和重定向。
- 如果节点 A 包含节点 B,节点 B 包含节点 C。当 B 和 C 被合并到 A 中时,任何从 C 指向外部节点 D 的边,现在都应该从 A 指向 D。
- 所有合并节点内部的边都被视为内部细节而移除。
2.2.3 属性聚合(Attribute Aggregation)
合并节点必须能够代表其所有原始节点的关键信息。
id:生成一个新的唯一id,或使用主导节点的id。name:根据合并规则生成一个描述性名称(例如:"Agent Step: Search & Summarize")。run_type:定义新的扁平化类型,如flat_agent_step,flat_chain_execution。start_time/end_time:取所有原始节点中最早的start_time和最晚的end_time。inputs/outputs:可以保留主导节点的输入/输出,或将所有内部节点的输入/输出进行摘要,特别是对于文本内容。error:如果任何内部节点有错误,则扁平节点标记为错误,并聚合错误信息。original_run_ids:记录所有被合并的原始Run ID,以便将来“钻取”查看细节。
2.2.4 上下文提取与摘要(Context Extraction & Summarization)
对于 inputs 和 outputs 等可能包含大量文本的属性,直接聚合可能仍然过于冗长。这时,我们可以利用 LLM 本身的能力,对这些文本内容进行摘要,提取关键上下文。
3. 实现 LangSmith 追踪的扁平化:一个分步指南
我们将通过一个具体的 Python 代码示例,演示如何实现 LangSmith 追踪的扁平化。这里我们将重点关注基于类型和语义的节点合并。
3.1 核心思路:自底向上或自顶向下
对于 LangSmith 的树状追踪结构,我们可以采用两种主要的遍历策略来扁平化:
- 自底向上(Bottom-Up):从叶子节点开始,逐步向上合并到父节点。这种方法在处理属性聚合时可能更直观,因为子节点的信息已经完全。
- 自顶向下(Top-Down):从根节点开始,递归地处理子节点,并在满足条件时进行合并。这种方法对于定义合并规则和控制深度更方便。
这里我们将采用一种混合自顶向下的策略,即在遍历过程中,根据当前节点及其子节点的信息,决定是否进行合并。
3.2 定义扁平化规则
为了演示,我们定义以下扁平化规则:
- 规则 1:将
tool内部的所有llm、retriever、parser、prompt合并到tool节点中。 这样,一个tool调用将作为一个整体呈现,其内部细节被抽象。 - 规则 2:将
chain内部的连续llm、retriever、parser、prompt合并到chain节点中。 同样,一个chain的执行也作为一个整体。 - 规则 3:将
agent内部的连续llm(思考)、tool(行动)序列合并为单个AgentStep节点。 这将提供一个更高级别的代理决策流程视图。 - 规则 4:任何没有父节点的
llmRun 保持独立。 (例如,一个独立的 LLM 调用)。
3.3 扁平化算法实现
我们将构建一个 flatten_langsmith_trace 函数,它接收一个原始的 LangSmith run_tree(通过 get_run_tree 获取),并返回一个扁平化的 NetworkX 图。
import os
from langsmith import Client
from typing import List, Dict, Any, Optional, Tuple
import networkx as nx
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import json
# 假设 client 和 get_run_tree, build_raw_graph 已经定义并可用
# Helper function to safely parse datetime strings
def parse_datetime(dt_str: Optional[str]) -> Optional[datetime]:
if dt_str:
try:
return datetime.fromisoformat(dt_str.replace("Z", "+00:00"))
except ValueError:
return None
return None
class FlatNode:
"""
表示扁平化图中的一个节点。
"""
def __init__(self, id: str, name: str, flat_type: str, original_run_ids: List[str],
start_time: Optional[datetime], end_time: Optional[datetime],
inputs: Dict[str, Any], outputs: Dict[str, Any], error: Optional[str],
metadata: Dict[str, Any], tags: List[str]):
self.id = id
self.name = name
self.flat_type = flat_type
self.original_run_ids = original_run_ids
self.start_time = start_time
self.end_time = end_time
self.inputs = inputs
self.outputs = outputs
self.error = error
self.metadata = metadata
self.tags = tags
def to_dict(self):
return {
"id": self.id,
"name": self.name,
"flat_type": self.flat_type,
"original_run_ids": self.original_run_ids,
"start_time": self.start_time.isoformat() if self.start_time else None,
"end_time": self.end_time.isoformat() if self.end_time else None,
"duration": (self.end_time - self.start_time).total_seconds() if self.start_time and self.end_time else 0,
"inputs": self.inputs,
"outputs": self.outputs,
"error": self.error,
"metadata": self.metadata,
"tags": self.tags
}
def _aggregate_run_attributes(runs: List[Dict[str, Any]]) -> Tuple[Optional[datetime], Optional[datetime], Dict[str, Any], Dict[str, Any], Optional[str], Dict[str, Any], List[str]]:
"""
聚合一组 LangSmith Run 的属性。
"""
if not runs:
return None, None, {}, {}, None, {}, []
# Initialize with values from the first run or safe defaults
agg_start_time = parse_datetime(runs[0].get("start_time"))
agg_end_time = parse_datetime(runs[0].get("end_time"))
# Inputs: Take the input of the first relevant run (e.g., chain/agent/tool)
# Output: Take the output of the last relevant run
agg_inputs = runs[0].get("inputs", {})
agg_outputs = runs[-1].get("outputs", {})
agg_error = None
agg_metadata = {}
agg_tags = []
for run in runs:
current_start = parse_datetime(run.get("start_time"))
current_end = parse_datetime(run.get("end_time"))
if current_start and (agg_start_time is None or current_start < agg_start_time):
agg_start_time = current_start
if current_end and (agg_end_time is None or current_end > agg_end_time):
agg_end_time = current_end
if run.get("error"):
if agg_error is None:
agg_error = run["error"]
else:
# Concatenate errors if multiple, or just keep the first/last
agg_error += f"; {run['error']}"
# Merge metadata (simple shallow merge for demonstration)
agg_metadata.update(run.get("extra", {}).get("metadata", {}))
agg_metadata.update(run.get("metadata", {})) # LangSmith 1.x moved metadata directly
# Aggregate tags
agg_tags.extend(run.get("tags", []))
return agg_start_time, agg_end_time, agg_inputs, agg_outputs, agg_error, agg_metadata, list(set(agg_tags))
def flatten_langsmith_trace(run_tree: Dict[str, Any]) -> nx.DiGraph:
"""
将 LangSmith 追踪的树形结构扁平化为一个 NetworkX 有向图。
"""
flat_graph = nx.DiGraph()
# 存储扁平化后的节点,以便后续连接边
flat_nodes_map: Dict[str, FlatNode] = {} # original_run_id -> FlatNode
def process_node(current_run: Dict[str, Any], parent_flat_node_id: Optional[str] = None) -> Optional[str]:
"""
递归处理 LangSmith run 节点,应用扁平化规则。
返回当前 run 最终对应的扁平节点的 ID。
"""
current_run_id = current_run["id"]
current_run_type = current_run["run_type"]
current_run_name = current_run["name"]
# Rule 1 & 2: Merge internal llm/retriever/parser/prompt into tool/chain
# Rule 3: Merge agent's thinking (llm) and action (tool) into an AgentStep
# Strategy: For a parent node (e.g., chain, agent, tool),
# we decide if its children should be absorbed or become independent flat nodes.
# Collect children that should be "absorbed" into the current node
absorbed_children_runs = []
independent_children_runs = []
# A list to hold all runs that will form the current flat node
runs_to_merge_into_current_flat_node = [current_run]
# Decide if current_run itself should be an independent flat node or absorbed by its parent
# For now, let's assume top-level 'chain', 'agent', 'tool' will be independent flat nodes
# And their direct internal 'llm', 'retriever', 'parser', 'prompt' will be absorbed.
# Special handling for agent_step:
# An agent step typically involves an LLM "thought" followed by a "tool" action,
# and potentially observation. We want to group these.
if current_run_type == "agent":
# For an agent, we want to group its thought (LLM) and tool calls.
# This is more complex and might require looking ahead/post-processing.
# For simplicity here, let's make agent a flat node, and its direct children too.
# A more sophisticated approach would iterate through agent's children to form 'AgentStep's.
# For this example, we'll keep it simpler: agent is a flat node, its tool calls are flat nodes.
pass # No specific absorption rule for agent's direct children here for simplicity.
elif current_run_type in ["chain", "tool"]:
# For chain/tool, absorb direct 'llm', 'retriever', 'parser', 'prompt' children
for child in current_run.get("child_runs", []):
if child["run_type"] in ["llm", "retriever", "parser", "prompt"]:
absorbed_children_runs.append(child)
runs_to_merge_into_current_flat_node.append(child) # Add to current flat node
else:
independent_children_runs.append(child)
else:
# For llm, retriever, parser, prompt not under chain/tool, they become independent flat nodes
# Or if they are top-level.
independent_children_runs = current_run.get("child_runs", [])
# Create the current flat node
original_ids = [r["id"] for r in runs_to_merge_into_current_flat_node]
agg_start, agg_end, agg_inputs, agg_outputs, agg_error, agg_metadata, agg_tags =
_aggregate_run_attributes(runs_to_merge_into_current_flat_node)
# Define flat_type based on the primary run or a derived type
flat_node_id = f"flat_{current_run_id}"
if current_run_type == "agent":
flat_node_type = "Agent"
elif current_run_type == "tool":
flat_node_type = "ToolCall"
elif current_run_type == "chain":
flat_node_type = "ChainExecution"
elif current_run_type == "llm":
flat_node_type = "LLMCall"
else:
flat_node_type = current_run_type.capitalize()
flat_node_name = current_run_name # Use the primary run's name
flat_node = FlatNode(
id=flat_node_id,
name=flat_node_name,
flat_type=flat_node_type,
original_run_ids=original_ids,
start_time=agg_start,
end_time=agg_end,
inputs=agg_inputs,
outputs=agg_outputs,
error=agg_error,
metadata=agg_metadata,
tags=agg_tags
)
flat_graph.add_node(flat_node.id, **flat_node.to_dict())
flat_nodes_map[current_run_id] = flat_node # Map original ID to the new flat node
for absorbed_run in absorbed_children_runs:
flat_nodes_map[absorbed_run["id"]] = flat_node # Map absorbed children to the same flat node
# Establish edge to parent if exists
if parent_flat_node_id:
# Ensure no self-loops if parent_flat_node_id is the same as current_flat_node_id
if parent_flat_node_id != flat_node.id:
flat_graph.add_edge(parent_flat_node_id, flat_node.id)
# Recursively process independent children
last_child_flat_node_id = flat_node.id # For sequential connections
for child_run in independent_children_runs:
child_flat_node_id = process_node(child_run, flat_node.id)
if child_flat_node_id:
# If we want to enforce sequential flow between sibling flat nodes, add an edge.
# For now, parent-child edges are sufficient.
pass
return flat_node.id
# Start processing from the root of the run tree
process_node(run_tree)
# Post-processing for AgentStep:
# This part is more complex and would typically involve a second pass or a more sophisticated
# state machine during the first pass. For now, the "Agent" flat node contains its direct children
# as independent nodes. To truly create an "AgentStep" (thought + tool + observation),
# we would need to identify sequences of (LLM, ToolCall, LLM/Observation) under an Agent.
# This is left as an advanced exercise but can be approached by:
# 1. Collect all direct children of an 'Agent' flat node.
# 2. Iterate through them, grouping consecutive 'LLMCall' and 'ToolCall' nodes.
# 3. Create new 'AgentStep' flat nodes from these groups.
# 4. Remove the original 'LLMCall'/'ToolCall' nodes and replace them with 'AgentStep's.
# This would require modifying the graph after the initial pass.
return flat_graph
# --- 示例用法 (需要真实的 LangSmith Run ID) ---
# run_id = "YOUR_SPECIFIC_LANGSMITH_RUN_ID"
# try:
# raw_trace_tree = get_run_tree(run_id)
# flattened_graph = flatten_langsmith_trace(raw_trace_tree)
# print(f"扁平化图节点数: {flattened_graph.number_of_nodes()}, 边数: {flattened_graph.number_of_edges()}")
# # 简单的可视化
# plt.figure(figsize=(12, 8))
# pos = nx.spring_layout(flattened_graph, k=0.8, iterations=50) # 可以尝试不同的布局
# node_labels = {node_id: data['name'] for node_id, data in flattened_graph.nodes(data=True)}
# node_colors = [
# 'skyblue' if data['flat_type'] == 'Agent' else
# 'lightcoral' if data['flat_type'] == 'ToolCall' else
# 'lightgreen' if data['flat_type'] == 'ChainExecution' else
# 'lightgoldenrodyellow' if data['flat_type'] == 'LLMCall' else
# 'lightgray'
# for node_id, data in flattened_graph.nodes(data=True)
# ]
# nx.draw(flattened_graph, pos, labels=node_labels, with_labels=True, node_size=3000,
# node_color=node_colors, font_size=8, font_weight='bold', arrowsize=20)
# plt.title(f"Flattened LangSmith Trace for Run ID: {run_id}")
# plt.show()
# # 打印扁平化后的节点信息
# # for node_id, data in flattened_graph.nodes(data=True):
# # print(f"Node ID: {node_id}, Name: {data['name']}, Type: {data['flat_type']}, Duration: {data['duration']:.2f}s")
# # print(f" Original Runs: {data['original_run_ids']}")
# # if data['error']:
# # print(f" Error: {data['error']}")
# # print("-" * 20)
# except Exception as e:
# print(f"扁平化追踪失败: {e}")
代码解释:
FlatNode类:定义了扁平化后的节点结构,包含了聚合后的关键属性。_aggregate_run_attributes函数:负责将一组原始Run对象的start_time,end_time,inputs,outputs,error,metadata,tags进行聚合。这里采取了最简单的时间戳取极值、输入取第一个、输出取最后一个、错误信息合并的策略。在实际应用中,inputs和outputs的聚合可能需要更复杂的逻辑,比如使用 LLM 摘要。flatten_langsmith_trace函数:- 通过递归函数
process_node遍历原始的run_tree。 - 核心逻辑在
process_node中:- 根据
current_run_type判断其子节点是否应该被“吸收”到当前扁平节点中。例如,chain和tool会吸收其直接的llm,retriever,parser,prompt子节点。 runs_to_merge_into_current_flat_node列表收集所有将构成当前扁平节点的原始Run。- 调用
_aggregate_run_attributes聚合属性,创建FlatNode实例,并将其添加到flat_graph中。 flat_nodes_map用于记录原始Run ID到其对应的扁平FlatNode的映射,这对于后续建立正确的边至关重要。- 递归调用
process_node处理那些不被吸收的“独立”子节点,并建立从当前扁平节点到子扁平节点的边。
- 根据
- AgentStep 的高级处理:在示例中,为了简化,Agent 的直接子节点(如
ToolCall)仍然是独立的扁平节点。要实现真正的AgentStep(Thought-Action-Observation 组合),需要更复杂的模式识别逻辑,可能需要对已经创建的扁平图进行第二次遍历和重构。
- 通过递归函数
3.4 示例:AgentStep 的高级扁平化
我们上面实现的 flatten_langsmith_trace 只是一个基础版本。为了更好地处理 Agent 的“思考-行动-观察”循环,我们可以引入一个更高级的 AgentStep 概念。这通常需要在一个初步扁平化之后进行二次处理。
高级 AgentStep 扁平化的伪代码思路:
def post_process_agent_steps(flat_graph: nx.DiGraph) -> nx.DiGraph:
"""
后处理扁平图,将 Agent 的连续 LLM Calls (thought) 和 Tool Calls (action) 合并为 AgentStep。
"""
new_flat_graph = nx.DiGraph()
agent_nodes = [node_id for node_id, data in flat_graph.nodes(data=True) if data.get('flat_type') == 'Agent']
for agent_node_id in agent_nodes:
agent_children_ids = list(flat_graph.successors(agent_node_id))
# 按照时间顺序排序 Agent 的子节点
sorted_children = sorted(
[flat_graph.nodes[cid] for cid in agent_children_ids],
key=lambda x: parse_datetime(x.get('start_time', datetime.min.isoformat()))
)
current_step_runs = []
step_counter = 1
# Iterate through sorted children to form AgentSteps
for i, child_data in enumerate(sorted_children):
if child_data['flat_type'] == 'LLMCall' and (not current_step_runs or current_step_runs[-1]['flat_type'] == 'AgentStep'):
# Start a new AgentStep with an LLMCall (thought)
current_step_runs = [child_data]
elif child_data['flat_type'] == 'ToolCall' and current_step_runs and current_step_runs[-1]['flat_type'] == 'LLMCall':
# Add a ToolCall (action) to the current AgentStep
current_step_runs.append(child_data)
elif child_data['flat_type'] == 'LLMCall' and current_step_runs and current_step_runs[-1]['flat_type'] == 'ToolCall':
# Add another LLMCall (observation/decision) to the current AgentStep
current_step_runs.append(child_data)
else:
# If current_step_runs is not empty, finalize the previous AgentStep
if current_step_runs:
# Create and add the AgentStep node
step_id = f"{agent_node_id}_step_{step_counter}"
agg_start, agg_end, agg_inputs, agg_outputs, agg_error, agg_metadata, agg_tags =
_aggregate_run_attributes([r for r in current_step_runs]) # Need to convert FlatNode to dict for _aggregate
agent_step_node = FlatNode(
id=step_id,
name=f"Agent Step {step_counter}: {'->'.join([r['name'] for r in current_step_runs])}",
flat_type="AgentStep",
original_run_ids=[orig_id for r in current_step_runs for orig_id in r['original_run_ids']],
start_time=agg_start, end_time=agg_end,
inputs=current_step_runs[0]['inputs'], # Take first thought's input
outputs=current_step_runs[-1]['outputs'], # Take last action/observation's output
error=agg_error, metadata=agg_metadata, tags=agg_tags
)
new_flat_graph.add_node(agent_step_node.id, **agent_step_node.to_dict())
new_flat_graph.add_edge(agent_node_id, agent_step_node.id)
step_counter += 1
current_step_runs = [] # Reset for next step
# Add the current child_data as an independent node if it doesn't fit the step pattern
# Or start a new step with it if it's an LLMCall
# This part needs careful logic to avoid losing nodes.
# For simplicity here, if not part of a step, add it directly (or handle it in the main loop).
pass # This is where the complexity lies. For a full implementation, you'd add this child directly
# if it doesn't fit a step, or start a new step if it's an LLMCall.
# Handle any remaining runs in current_step_runs after the loop
if current_step_runs:
step_id = f"{agent_node_id}_step_{step_counter}"
agg_start, agg_end, agg_inputs, agg_outputs, agg_error, agg_metadata, agg_tags =
_aggregate_run_attributes([r for r in current_step_runs])
agent_step_node = FlatNode(
id=step_id,
name=f"Agent Step {step_counter}: {'->'.join([r['name'] for r in current_step_runs])}",
flat_type="AgentStep",
original_run_ids=[orig_id for r in current_step_runs for orig_id in r['original_run_ids']],
start_time=agg_start, end_time=agg_end,
inputs=current_step_runs[0]['inputs'],
outputs=current_step_runs[-1]['outputs'],
error=agg_error, metadata=agg_metadata, tags=agg_tags
)
new_flat_graph.add_node(agent_step_node.id, **agent_step_node.to_dict())
new_flat_graph.add_edge(agent_node_id, agent_step_node.id)
# Copy all non-agent related nodes and edges, and the Agent nodes themselves
for node_id, data in flat_graph.nodes(data=True):
if node_id not in new_flat_graph.nodes:
new_flat_graph.add_node(node_id, **data)
for u, v in flat_graph.edges:
if u not in agent_nodes or v not in agent_children_ids: # Don't copy edges absorbed into AgentStep
new_flat_graph.add_edge(u, v)
return new_flat_graph
# 注意:上面的 post_process_agent_steps 只是一个概念性的伪代码,
# 实际实现需要更严谨地处理所有节点和边的复制、删除以及新节点的创建,
# 确保所有原始节点都被正确地映射到新的扁平图中,且没有遗漏或重复。
# 并且 _aggregate_run_attributes 需要能够处理 FlatNode 的 to_dict() 结果。
这种后处理方法允许我们首先进行通用的扁平化,然后再针对特定模式(如 AgentStep)进行更精细的结构重组。
3.5 上下文敏感的摘要(使用LLM)
对于扁平化节点的 inputs 和 outputs 属性,如果内容过于庞大,我们可以引入一个小的 LLM 模型来对其进行摘要。
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI # 假设使用 OpenAI
def summarize_text_with_llm(text: str, max_length: int = 200) -> str:
"""
使用 LLM 摘要文本,如果文本过长。
"""
if not text or len(text) <= max_length:
return text
# 确保 API KEY 已设置
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
prompt = (
f"请将以下文本内容进行简洁的总结,确保抓住核心要点,总结结果的长度尽量不超过 {max_length} 个字符:nn"
f"文本内容:{text}"
)
try:
response = llm.invoke([HumanMessage(content=prompt)])
return response.content
except Exception as e:
print(f"LLM 摘要失败: {e}")
return text[:max_length] + "..." # 失败时截断
# 在 _aggregate_run_attributes 中使用
# agg_inputs = summarize_text_with_llm(str(runs[0].get("inputs", {})))
# agg_outputs = summarize_text_with_llm(str(runs[-1].get("outputs", {})))
将这个摘要功能集成到 _aggregate_run_attributes 中,可以进一步提高扁平化节点的信息密度,使其 inputs 和 outputs 更易于阅读。
4. 高级扁平化考量与挑战
4.1 动态扁平化与钻取(Drill Down)
理想的扁平化方案应该支持动态粒度调整。用户在 LangSmith UI 中看到扁平化的高层视图,当他们对某个扁平节点感兴趣时,可以点击该节点,“钻取”到其内部,查看构成该扁平节点的原始 LangSmith Runs 及其更详细的嵌套结构。
实现这一点需要:
- 在扁平化节点中保留其
original_run_ids列表。 - UI 层面需要支持点击扁平节点后,根据
original_run_ids重新加载并展示子图。
4.2 性能优化
对于拥有数万甚至数十万个 Run 的超大型追踪,扁平化过程本身也可能成为性能瓶颈。
- 增量更新:如果追踪是实时生成的,考虑增量地扁平化新添加的 Runs。
- 缓存:缓存已扁平化的子图,避免重复计算。
- 并行处理:如果图结构允许,可以并行处理独立子图的扁平化。
4.3 语义一致性与可解释性
扁平化的最大挑战之一是在简化和信息丢失之间取得平衡。过度扁平化可能导致关键细节的丢失,使得扁平图难以解释或无法用于精确定位问题。
- 明确的扁平化规则:规则需要清晰、一致,并与业务逻辑紧密相关。
- 用户可配置性:允许用户自定义扁平化规则或选择不同的预设扁平化策略。
- 提供原始信息入口:确保任何扁平化节点都可以追溯到其原始的 LangSmith Runs。
4.4 循环图的处理
虽然 LangSmith 追踪通常是 DAG(有向无环图),但在某些高级代理模式或错误场景下,理论上可能出现循环(例如,如果一个工具错误地调用了导致自身被再次调用的逻辑)。标准图扁平化算法通常假定 DAG。处理循环需要额外的逻辑来识别和打破循环,或者将其表示为特殊的扁平节点。
4.5 与 LangSmith UI 的集成
最终目标是将扁平化视图无缝集成到 LangSmith 的用户界面中。这可能需要 LangSmith 自身提供 API 或扩展点来支持自定义的可视化和视图切换。一个理想的场景是,用户可以在原始的详细视图和多个扁平化视图之间轻松切换。
5. 展望与总结
图扁平化是应对现代复杂 LLM 应用可观测性挑战的强大技术。通过将 LangSmith 追踪从细粒度的、深度嵌套的视图转换为高层次、易于理解的概览,我们能够显著提升调试效率、加快问题定位,并更好地理解系统行为。
我们探讨了扁平化的核心概念、多种策略以及一个基于 Python 和 NetworkX 的实现示例。从基于类型和语义的节点合并,到利用 LLM 进行上下文摘要,再到高级的 AgentStep 抽象,这些技术为我们提供了一个工具箱,以应对不断增长的复杂性。虽然实现一个健壮、灵活且性能优越的扁平化系统需要细致的工程工作,但其带来的价值——提升的洞察力、更快的迭代速度——是无可估量的。未来,我们期待 LangSmith 等平台能够原生集成更智能、更动态的扁平化能力,让开发者能够以最适合其需求的方式来理解和掌控他们的 LLM 应用。
谢谢大家!