各位同仁,各位技术爱好者:
今天,我们聚焦一个前沿且极具实践价值的主题:“深入 ‘Knowledge Graph-as-a-State’:将企业内部的关系图谱直接映射为 LangGraph 的动态全局状态”。这是一个将企业核心知识资产——关系图谱——与最新AI编排框架LangGraph深度融合的范式。它超越了传统“图谱检索即上下文”的模式,将图谱本身变为AI代理可感知、可操作、可演进的动态状态,从而赋能更智能、更具洞察力的企业级AI应用。
一、企业知识管理的挑战与AI的期望
在当今复杂多变的企业环境中,知识是核心资产。然而,这些知识往往以碎片化的形式存在于各种系统、文档和人际关系中。我们面临的挑战包括:
- 知识孤岛: 信息分散在CRM、ERP、项目管理、HR系统、代码库等,难以整合。
- 隐性知识: 许多关键信息存在于员工的经验和非正式沟通中,难以显性化。
- 动态变化: 企业内部的实体(员工、项目、产品)和它们之间的关系(从属、合作、依赖)在不断演变。
- AI应用的需求: 现有的LLM虽然强大,但缺乏结构化、实时更新的企业级知识作为其推理的基础,往往需要通过复杂的RAG(Retrieval Augmented Generation)或工具调用来弥补。
我们期望AI能够不仅仅是简单地回答问题,而是能够:
- 理解复杂关系: 识别“谁负责哪个项目?”、“这个项目的关键依赖是什么?”、“如果这个人离职,对哪些项目有影响?”
- 进行多步推理: 从多个事实和关系中推导出新结论。
- 执行动态操作: 根据推理结果,建议或执行对知识图谱的更新(如分配新任务、更新项目状态)。
- 保持状态和上下文: 在长时间的交互中,记住之前的信息和操作,并根据知识图谱的当前状态进行调整。
这正是“Knowledge Graph-as-a-State”范式所要解决的核心问题。
二、核心组件解析:企业关系图谱与LangGraph
为了深入理解这个范式,我们首先需要对两个关键组件有清晰的认识。
2.1 企业内部关系图谱 (Enterprise Internal Relationship Graph, EIRG)
EIRG是企业内部特定领域知识的结构化表示,它以图的形式存储数据,由节点(Entities)和边(Relationships)构成,通常还伴随属性(Properties)。
2.1.1 EIRG的构成要素:
- 节点 (Nodes): 代表企业中的实体,如:
- 人员: 员工、经理、部门负责人、客户。
- 项目: 研发项目、市场活动、内部计划。
- 部门/团队: 研发部、市场部、财务部、产品团队。
- 资源: 服务器、数据库、软件模块、文件、会议室。
- 业务流程: 审批流程、采购流程。
- 政策/规范: 安全策略、合规要求。
- 边 (Edges): 代表节点之间的关系,如:
- 组织关系:
WORKS_FOR(员工->部门),MANAGES(经理->部门/员工),REPORTS_TO(员工->经理)。 - 项目关系:
WORKS_ON(员工->项目),DEPENDS_ON(项目->项目, 任务->任务),OWNS(团队->项目)。 - 资源关系:
USES(项目->资源, 模块->模块),IS_PART_OF(任务->项目, 模块->系统)。 - 业务关系:
FOLLOWS(流程->政策),RELATED_TO(文档->项目)。
- 组织关系:
- 属性 (Properties): 附着在节点或边上的键值对,提供更多细节信息,如:
- 节点属性:
employee_id,project_status,department_head,resource_location,policy_version。 - 边属性:
start_date,end_date,role(在WORKS_ON边上表示员工在项目中的角色),criticality(在DEPENDS_ON边上表示依赖的紧急程度)。
- 节点属性:
2.1.2 EIRG的存储与管理:
通常,EIRG会存储在专门的图数据库(Graph Database)中,例如:
- Neo4j: 业界最流行的原生图数据库,Cypher查询语言强大。
- ArangoDB: 多模型数据库,支持图、文档、键值存储。
- Amazon Neptune: AWS提供的托管图数据库服务。
- Dgraph: 分布式图数据库,使用GraphQL。
在某些情况下,也可以通过关系型数据库(如PostgreSQL)模拟图结构,但其查询效率和表达能力通常不及原生图数据库。
2.1.3 EIRG的价值:
- 统一视图: 将分散的知识汇聚成一个连贯的整体。
- 关系发现: 轻松发现实体间的直接和间接联系。
- 复杂查询: 支持多跳(multi-hop)查询,揭示深层模式。
- 语义丰富: 结合了本体论(Ontology)和语义网技术,使得机器能够更好地理解知识的含义。
2.2 LangGraph 核心概念
LangGraph是LangChain家族的一个重要成员,旨在帮助开发者构建健壮、有状态、多角色的LLM应用程序。它的核心思想是将AI应用设计为一个有向图,其中每个节点代表一个操作(可以是LLM调用、工具使用、自定义函数),边定义了这些操作之间的转换逻辑。
2.2.1 LangGraph的关键要素:
- 图 (Graph): LangGraph的核心。它是一个由节点和边组成的网络,描述了AI代理的决策流和执行路径。
- 状态 (State): 这是LangGraph最强大的特性之一。它是一个在整个图谱执行过程中共享和修改的全局对象。所有节点都通过读取和写入这个状态来进行通信和协作。状态通常是一个
TypedDict,确保类型安全和结构清晰。 - 节点 (Nodes): 图中的基本处理单元。每个节点都是一个Python函数或可调用对象,它接收当前状态作为输入,执行特定逻辑(如调用LLM、执行工具、进行计算),然后返回一个更新状态的字典。
- 边 (Edges): 定义了节点之间的转换。
- 普通边: 从一个节点无条件地转换到另一个节点。
- 条件边 (Conditional Edges): 根据前一个节点的输出或当前状态的某个属性,动态决定下一个要执行的节点。这通过一个“路由器”函数实现,该函数接收前一个节点的输出并返回下一个节点的名字。
- AgentExecutor: LangGraph的运行时引擎,负责初始化状态、按照图的定义执行节点、处理边转换,并管理状态的持久化(通过Checkpoints)。
- Checkpoints: 允许LangGraph保存和恢复执行状态,实现长时间运行的会话或从中断处恢复。
2.2.3 LangGraph的工作流:
- 定义状态: 使用
TypedDict定义共享的全局状态。 - 定义节点: 为每个操作编写一个Python函数,接收状态并返回状态更新。
- 构建图: 使用
StateGraphAPI添加节点和边。 - 编译图: 使用
compile()方法将图转换为可执行的AgentExecutor。 - 运行执行器: 调用
executor.invoke()方法,传入初始状态或用户输入。
LangGraph的这种状态机架构,天然适合于将复杂、动态的企业关系图谱映射为其核心状态,从而实现高度智能化的AI代理。
三、"Knowledge Graph-as-a-State" 范式:核心思想与优势
现在,我们来深入探讨“Knowledge Graph-as-a-State”的核心理念。
3.1 核心思想:图谱作为可操作的动态上下文
传统上,当AI系统需要企业知识图谱时,通常是以下模式:
- 用户查询 -> LLM理解意图 -> 工具/RAG调用 -> 查询知识图谱 -> 获取结果 -> LLM生成答案。
- 在这个过程中,知识图谱扮演的角色更像是一个静态的、只读的外部数据库。AI代理每次需要知识时,都通过查询接口获取一次快照,然后基于这个快照进行推理。
“Knowledge Graph-as-a-State”则打破了这种模式。它的核心思想是:
- 将知识图谱(或其相关的子图)直接嵌入到LangGraph的全局状态中。
- AI代理的各个节点可以直接访问、操作和修改这个图谱状态。
- 图谱不再是外部的、被动的知识源,而是AI代理内在的、可演进的思维模型。
这就像是给AI代理一个动态的、可编辑的“白板”,上面绘制着企业内部的各种实体和关系。AI代理可以在这个白板上进行推理、添加新信息、修改现有关系,并将这些修改作为其后续推理的基础。
3.2 为什么这种范式如此强大?
-
动态上下文与推理:
- 上下文的深度融合: 图谱不再仅仅提供事实片段,而是提供了一个丰富的、相互关联的语义网络作为上下文。
- 多跳推理的内化: LLM可以与图谱操作工具(如NetworkX函数)协作,在图谱状态上进行复杂的路径查找、连通性分析等,而非仅仅依赖LLM自身的幻觉或有限的上下文窗口。
- 状态演进驱动推理: 随着用户交互或代理执行的推进,图谱状态会不断更新。AI代理的后续推理将基于这个最新的图谱状态,实现真正的状态感知。
-
可操作性与行动:
- 图谱的生命周期管理: 代理不再仅限于查询,它可以建议或执行对图谱状态的修改(如“添加新员工”、“更新项目状态”、“建立新的依赖关系”)。
- 行动的直接反馈: 代理执行的动作(如更新任务分配)可以直接反映在图谱状态上,并立即影响后续的推理和决策。
- 代理的“记忆”与“学习”: 通过修改图谱状态,代理实际上在“记住”和“学习”新的企业事实和关系,提升其长期交互的智能性。
-
透明度与可解释性:
- 决策路径可视化: LangGraph的执行路径结合图谱状态的变化,可以清晰地展示AI代理是如何一步步理解问题、提取信息、进行推理并得出结论的。
- 知识来源可追溯: 任何从图谱状态中提取或添加到图谱状态的信息,其来源和上下文都可以在图谱中追溯。
-
与LangGraph的完美契合:
- LangGraph的
State机制天然支持复杂数据结构的传递和修改。 - 节点可以封装图谱操作逻辑(查询、遍历、修改),而条件边可以根据图谱状态的特点(如“是否找到所需实体”、“是否存在冲突关系”)来路由执行流。
- Checkpoints机制可以持久化图谱的动态状态,使得会话可以在不同时间点恢复。
- LangGraph的
3.3 挑战与考量:
当然,这种强大范式也伴随着一些挑战:
- 图谱表示: 如何在Python对象中高效、准确地表示图谱(如使用NetworkX)。
- 子图管理: 针对大型EIRG,不可能将整个图谱加载到内存。需要智能地提取与当前任务相关的“子图”到LangGraph状态中。
- 同步与持久化: LangGraph状态中的图谱修改,如何有效地同步回底层的持久化图数据库。
- 性能开销: 频繁的图谱操作(尤其是涉及LLM与图谱工具的协作)可能带来性能开销。
- LLM的图谱理解能力: LLM在生成图谱查询或解释图谱结构时,其准确性和鲁棒性仍需考量。
克服这些挑战是实现“Knowledge Graph-as-a-State”成功的关键。
四、架构设计与实现细节
现在,我们将深入到具体的架构设计和实现细节,展示如何将EIRG映射到LangGraph的动态全局状态。
4.1 EIRG在LangGraph状态中的表示
我们选择使用Python的networkx库来在LangGraph状态中表示和操作子图。networkx是一个强大的图论库,提供了丰富的图算法和方便的API。对于大型的、持久化的EIRG,LangGraph状态中会包含一个从主图数据库中提取出的相关子图。
首先,定义LangGraph的全局状态GraphState:
from typing import TypedDict, List, Dict, Any, Optional
import networkx as nx
from langchain_core.messages import BaseMessage
# 定义LangGraph的全局状态
class GraphState(TypedDict):
"""
Represents the state of our graph.
Attributes:
query: The user's initial query or current task.
chat_history: A list of messages making up the conversation history.
subgraph: The dynamic NetworkX graph representing the relevant portion of the EIRG.
identified_entities: A list of entities extracted from the query.
identified_relationships: A list of relationships extracted from the query.
graph_query_cql: Generated Cypher query (or similar for other DBs) for the main KG.
graph_query_result: The raw result from the main KG query.
analysis_report: Detailed analysis generated by the agent based on the subgraph.
pending_actions: A list of suggested actions to modify the graph.
final_answer: The final response to the user.
error_message: Any error encountered during processing.
status: Current status of the agent's operation.
"""
query: str
chat_history: List[BaseMessage]
subgraph: Optional[nx.Graph] # The core: NetworkX graph as part of the state
identified_entities: List[str]
identified_relationships: List[str]
graph_query_cql: Optional[str]
graph_query_result: Optional[List[Dict[str, Any]]]
analysis_report: Optional[str]
pending_actions: List[Dict[str, Any]]
final_answer: Optional[str]
error_message: Optional[str]
status: str
# 辅助函数:将Neo4j结果转换为NetworkX图
def neo4j_to_networkx(records: List[Dict[str, Any]]) -> nx.Graph:
"""
Converts Neo4j query results (nodes and relationships) into a NetworkX graph.
Assumes records contain 'node_id', 'node_labels', 'node_properties',
'rel_id', 'rel_type', 'rel_properties', 'start_node_id', 'end_node_id'.
"""
G = nx.MultiDiGraph() # Use MultiDiGraph to allow multiple edges between same nodes
# First pass: add all unique nodes
node_map = {} # Map Neo4j internal ID to NetworkX node
for record in records:
if 'node_id' in record:
node_id = record['node_id']
if node_id not in node_map:
node_map[node_id] = node_id # Simple mapping for now
G.add_node(node_id, **record.get('node_properties', {}), labels=record.get('node_labels', []))
# Handle nodes from relationships
if 'start_node_id' in record and record['start_node_id'] not in node_map:
start_node_id = record['start_node_id']
node_map[start_node_id] = start_node_id
G.add_node(start_node_id, labels=[record.get('start_node_label', 'UNKNOWN')]) # Placeholder label
if 'end_node_id' in record and record['end_node_id'] not in node_map:
end_node_id = record['end_node_id']
node_map[end_node_id] = end_node_id
G.add_node(end_node_id, labels=[record.get('end_node_label', 'UNKNOWN')]) # Placeholder label
# Second pass: add relationships
for record in records:
if 'rel_id' in record:
start_node = record['start_node_id']
end_node = record['end_node_id']
rel_type = record['rel_type']
rel_properties = record.get('rel_properties', {})
G.add_edge(start_node, end_node, key=record['rel_id'], type=rel_type, **rel_properties)
return G
说明:
subgraph: Optional[nx.Graph]是核心,它直接将networkx.Graph对象作为状态的一部分。identified_entities和identified_relationships辅助LLM理解用户意图并构建图查询。graph_query_cql和graph_query_result用于与外部持久化EIRG进行交互。pending_actions是AI代理建议对图谱进行修改的指令列表。neo4j_to_networkx是一个关键的转换函数,它将从Neo4j(或其他图数据库)查询到的结果转换为networkx对象,以便在LangGraph状态中进行操作。实际生产中,这个函数会更复杂,需要处理各种节点和关系的属性映射。
4.2 定义核心节点及其功能
我们将设计一系列节点,每个节点负责LangGraph工作流中的一个特定任务。
为了保持简洁,我们假设已经配置好一个LLM实例,例如ChatOpenAI。
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableLambda
from neo4j import GraphDatabase # Assuming Neo4j for external KG
# Ensure your OpenAI API key is set in environment variables
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# --- 1. Entity and Relationship Extraction Node ---
def extract_info(state: GraphState) -> Dict[str, Any]:
"""
Extracts entities and relationships from the user query using LLM.
"""
print("---EXTRACTING ENTITIES AND RELATIONSHIPS---")
query = state["query"]
prompt = ChatPromptTemplate.from_messages([
("system", """
You are an expert at extracting key entities (people, projects, departments, tasks)
and potential relationships from natural language queries about enterprise knowledge graphs.
Identify all relevant entities and relationships mentioned.
Output a JSON object with 'entities' (list of strings) and 'relationships' (list of strings).
Example: {"entities": ["John Doe", "Aurora Project"], "relationships": ["works on", "manages"]}
"""),
("human", "{query}")
])
extraction_chain = prompt | llm | JsonOutputParser()
extracted_data = extraction_chain.invoke({"query": query})
return {
"identified_entities": extracted_data.get("entities", []),
"identified_relationships": extracted_data.get("relationships", []),
"status": "Entities extracted"
}
# --- 2. Subgraph Retrieval Node (from external EIRG) ---
# This node simulates interaction with a Neo4j database.
# In a real scenario, you'd connect to your Neo4j instance.
# Dummy Neo4j driver for demonstration
class DummyNeo4jDriver:
def __init__(self):
print("Simulating connection to Neo4j...")
# A very simplified, hardcoded graph for demonstration
self.mock_graph_data = [
{'node_id': 'employee_john', 'node_labels': ['Employee'], 'node_properties': {'name': 'John Doe', 'email': '[email protected]'}},
{'node_id': 'employee_jane', 'node_labels': ['Employee'], 'node_properties': {'name': 'Jane Smith', 'email': '[email protected]'}},
{'node_id': 'project_aurora', 'node_labels': ['Project'], 'node_properties': {'name': 'Aurora Project', 'status': 'Active', 'budget': 100000}},
{'node_id': 'project_phoenix', 'node_labels': ['Project'], 'node_properties': {'name': 'Phoenix Initiative', 'status': 'Pending', 'priority': 'High'}},
{'node_id': 'department_rd', 'node_labels': ['Department'], 'node_properties': {'name': 'R&D'}},
{'rel_id': 'rel_1', 'rel_type': 'WORKS_ON', 'rel_properties': {'role': 'Developer', 'start_date': '2023-01-15'},
'start_node_id': 'employee_john', 'end_node_id': 'project_aurora', 'start_node_label': 'Employee', 'end_node_label': 'Project'},
{'rel_id': 'rel_2', 'rel_type': 'MANAGES', 'rel_properties': {},
'start_node_id': 'employee_jane', 'end_node_id': 'department_rd', 'start_node_label': 'Employee', 'end_node_label': 'Department'},
{'rel_id': 'rel_3', 'rel_type': 'WORKS_ON', 'rel_properties': {'role': 'Lead', 'start_date': '2023-03-01'},
'start_node_id': 'employee_jane', 'end_node_id': 'project_aurora', 'start_node_label': 'Employee', 'end_node_label': 'Project'},
{'rel_id': 'rel_4', 'rel_type': 'DEPENDS_ON', 'rel_properties': {'criticality': 'High'},
'start_node_id': 'project_aurora', 'end_node_id': 'project_phoenix', 'start_node_label': 'Project', 'end_node_label': 'Project'},
]
def query_db(self, cql_query: str) -> List[Dict[str, Any]]:
print(f"Executing simulated CQL: {cql_query}")
# In a real scenario, parse CQL and return actual results.
# For simplicity, we just return a relevant subset of mock_graph_data based on keywords.
results = []
# Simple keyword matching for demo purposes
query_lower = cql_query.lower()
if "john doe" in query_lower or "employee_john" in query_lower:
results.extend([d for d in self.mock_graph_data if 'john' in str(d).lower()])
if "aurora project" in query_lower or "project_aurora" in query_lower:
results.extend([d for d in self.mock_graph_data if 'aurora' in str(d).lower()])
if "jane smith" in query_lower or "employee_jane" in query_lower:
results.extend([d for d in self.mock_graph_data if 'jane' in str(d).lower()])
if "phoenix initiative" in query_lower or "project_phoenix" in query_lower:
results.extend([d for d in self.mock_graph_data if 'phoenix' in str(d).lower()])
# Remove duplicates
unique_results = []
seen_ids = set()
for res in results:
item_id = res.get('node_id') or res.get('rel_id')
if item_id and item_id not in seen_ids:
unique_results.append(res)
seen_ids.add(item_id)
# Ensure all nodes mentioned in relationships are also added if not already
all_nodes_in_rels = set()
for res in unique_results:
if 'start_node_id' in res: all_nodes_in_rels.add(res['start_node_id'])
if 'end_node_id' in res: all_nodes_in_rels.add(res['end_node_id'])
for node_id in all_nodes_in_rels:
if node_id not in seen_ids:
node_data = next((d for d in self.mock_graph_data if d.get('node_id') == node_id), None)
if node_data:
unique_results.append(node_data)
seen_ids.add(node_id)
return unique_results
# Initialize dummy driver
dummy_neo4j_driver = DummyNeo4jDriver()
def retrieve_subgraph(state: GraphState) -> Dict[str, Any]:
"""
Generates a Cypher query based on identified entities/relationships
and retrieves a subgraph from the external Neo4j database.
"""
print("---RETRIEVING SUBGRAPH---")
identified_entities = state["identified_entities"]
identified_relationships = state["identified_relationships"]
# LLM helps construct a robust Cypher query
prompt_template = ChatPromptTemplate.from_messages([
("system", """
You are an expert in generating precise Cypher queries for Neo4j.
Based on the identified entities and relationships, generate a Cypher query
to retrieve relevant nodes and their immediate relationships (k-hop=1 or 2)
from the graph. Focus on entities and relationships that might be directly linked.
Return only the Cypher query string, without any additional text or formatting.
Example: MATCH (n)-[r]-(m) WHERE n.name = 'John Doe' OR m.name = 'Aurora Project' RETURN n, r, m
"""),
("human", "Entities: {entities}nRelationships: {relationships}")
])
cql_generation_chain = prompt_template | llm | RunnableLambda(lambda x: x.strip())
# Fallback if no specific entities/relationships are found
if not identified_entities and not identified_relationships:
cql_query = "MATCH (n)-[r]-(m) RETURN n, r, m LIMIT 10" # Default small query
else:
entity_match_clauses = [f"n.name = '{e}'" for e in identified_entities]
relationship_match_clauses = [f"type(r) = '{r_type.upper().replace(' ', '_')}'" for r_type in identified_relationships]
where_clause_parts = []
if entity_match_clauses:
where_clause_parts.append(f"({' OR '.join(entity_match_clauses)})")
if relationship_match_clauses:
where_clause_parts.append(f"({' OR '.join(relationship_match_clauses)})")
where_clause = ""
if where_clause_parts:
where_clause = f" WHERE {' AND '.join(where_clause_parts)}"
cql_query = f"MATCH (n)-[r*0..2]-(m){where_clause} RETURN DISTINCT n AS node_id, labels(n) AS node_labels, properties(n) AS node_properties, id(n) as internal_node_id, id(r) as rel_id, type(r) AS rel_type, properties(r) AS rel_properties, id(startNode(r)) AS start_node_id, id(endNode(r)) AS end_node_id, labels(startNode(r)) AS start_node_label, labels(endNode(r)) AS end_node_label LIMIT 50"
# Or let LLM generate it (more robust but slower)
# cql_query = cql_generation_chain.invoke({
# "entities": ", ".join(identified_entities),
# "relationships": ", ".join(identified_relationships)
# })
# Execute query against Neo4j (simulated)
try:
query_results = dummy_neo4j_driver.query_db(cql_query)
subgraph_nx = neo4j_to_networkx(query_results)
return {
"subgraph": subgraph_nx,
"graph_query_cql": cql_query,
"graph_query_result": query_results,
"status": "Subgraph retrieved"
}
except Exception as e:
print(f"Error querying Neo4j: {e}")
return {"error_message": f"Failed to retrieve subgraph: {e}", "status": "Error"}
# --- 3. Graph Reasoning/Analysis Node ---
def analyze_graph(state: GraphState) -> Dict[str, Any]:
"""
Performs analysis on the current subgraph using LLM and NetworkX algorithms.
"""
print("---ANALYZING SUBGRAPH---")
subgraph = state["subgraph"]
query = state["query"]
if not subgraph or subgraph.number_of_nodes() == 0:
return {"analysis_report": "No relevant subgraph found for analysis.", "status": "Analysis skipped"}
# Convert NetworkX graph to a more LLM-friendly string representation
graph_summary = "Nodes:n"
for node_id, data in subgraph.nodes(data=True):
graph_summary += f"- {data.get('name', node_id)} (Labels: {data.get('labels', ['Unknown'])}, Properties: {data})n"
graph_summary += "nRelationships:n"
for u, v, key, data in subgraph.edges(keys=True, data=True):
u_name = subgraph.nodes[u].get('name', u)
v_name = subgraph.nodes[v].get('name', v)
graph_summary += f"- ({u_name}) -[:{data.get('type', 'UNKNOWN')}]-> ({v_name}) (Properties: {data})n"
prompt = ChatPromptTemplate.from_messages([
("system", """
You are a graph analysis expert. Based on the provided graph summary and the user's query,
perform an in-depth analysis. Identify key entities, relationships, dependencies,
and potential insights. If the query implies a need for action, suggest potential actions
in a JSON list format. Otherwise, provide a comprehensive report.
Graph Summary:
{graph_summary}
User Query:
{query}
Output Format:
{{
"report": "Your detailed analysis report.",
"suggested_actions": [
{{
"action_type": "add_node",
"node_id": "new_node_id",
"labels": ["Task"],
"properties": {{"name": "Risk Assessment", "status": "Pending"}}
}},
{{
"action_type": "add_edge",
"start_node_id": "project_aurora",
"end_node_id": "new_node_id",
"rel_type": "HAS_TASK",
"properties": {{"priority": "High"}}
}},
{{
"action_type": "update_node",
"node_id": "project_aurora",
"properties": {{"status": "Completed"}}
}}
]
}}
If no actions are suggested, the "suggested_actions" list should be empty.
"""),
("human", "Analyze the graph based on the query.")
])
analysis_chain = prompt | llm | JsonOutputParser()
try:
analysis_output = analysis_chain.invoke({"graph_summary": graph_summary, "query": query})
return {
"analysis_report": analysis_output.get("report", "No specific report generated."),
"pending_actions": analysis_output.get("suggested_actions", []),
"status": "Graph analyzed"
}
except Exception as e:
print(f"Error during graph analysis: {e}")
return {"error_message": f"Failed to analyze graph: {e}", "status": "Error"}
# --- 4. Graph Modification/Update Node ---
def modify_graph(state: GraphState) -> Dict[str, Any]:
"""
Applies pending actions to the subgraph in the state and potentially to the external KG.
"""
print("---MODIFYING SUBGRAPH---")
subgraph = state["subgraph"]
pending_actions = state["pending_actions"]
if not subgraph:
return {"error_message": "Cannot modify graph: subgraph is empty.", "status": "Error"}
modified_count = 0
for action in pending_actions:
action_type = action.get("action_type")
if action_type == "add_node":
node_id = action.get("node_id")
labels = action.get("labels", [])
properties = action.get("properties", {})
if node_id and not subgraph.has_node(node_id):
subgraph.add_node(node_id, labels=labels, **properties)
print(f"Added node: {node_id} with labels {labels} and properties {properties}")
modified_count += 1
# In a real system, you'd also send an update to the external Neo4j DB here.
# E.g., driver.query_db(f"CREATE (:{':'.join(labels)} {{name: '{properties.get('name', node_id)}', ...}})")
elif action_type == "add_edge":
start_node_id = action.get("start_node_id")
end_node_id = action.get("end_node_id")
rel_type = action.get("rel_type")
properties = action.get("properties", {})
if start_node_id in subgraph and end_node_id in subgraph and rel_type:
# Add edge, allow multiple edges with different keys
subgraph.add_edge(start_node_id, end_node_id, type=rel_type, **properties)
print(f"Added edge: ({start_node_id}) -[:{rel_type}]-> ({end_node_id}) with properties {properties}")
modified_count += 1
# E.g., driver.query_db(f"MATCH (a), (b) WHERE id(a) = '{start_node_id}' AND id(b) = '{end_node_id}' CREATE (a)-[:{rel_type} {{...}}]->(b)")
elif action_type == "update_node":
node_id = action.get("node_id")
properties_to_update = action.get("properties", {})
if node_id in subgraph:
for prop, value in properties_to_update.items():
subgraph.nodes[node_id][prop] = value
print(f"Updated node: {node_id} with properties {properties_to_update}")
modified_count += 1
# E.g., driver.query_db(f"MATCH (n) WHERE id(n) = '{node_id}' SET n += {{...}}")
# Add logic for delete_node, delete_edge if needed
if modified_count > 0:
return {"subgraph": subgraph, "pending_actions": [], "status": "Graph modified", "analysis_report": f"Graph modified with {modified_count} actions."}
else:
return {"pending_actions": [], "status": "No graph modifications performed."}
# --- 5. Answer Generation Node ---
def generate_answer(state: GraphState) -> Dict[str, Any]:
"""
Generates the final answer to the user based on the analysis report and subgraph.
"""
print("---GENERATING FINAL ANSWER---")
query = state["query"]
analysis_report = state["analysis_report"]
subgraph = state["subgraph"]
# Provide a textual summary of the subgraph for LLM to synthesize
graph_summary = ""
if subgraph and subgraph.number_of_nodes() > 0:
graph_summary = "Current relevant graph structure:n"
for node_id, data in subgraph.nodes(data=True):
graph_summary += f"- {data.get('name', node_id)} (Labels: {data.get('labels', ['Unknown'])})n"
for u, v, data in subgraph.edges(data=True):
u_name = subgraph.nodes[u].get('name', u)
v_name = subgraph.nodes[v].get('name', v)
graph_summary += f"- ({u_name}) -[:{data.get('type', 'UNKNOWN')}]-> ({v_name})n"
prompt = ChatPromptTemplate.from_messages([
("system", """
You are a helpful enterprise assistant. Based on the user's original query,
the detailed analysis report, and the current state of the relevant knowledge graph,
formulate a concise and informative answer.
Original Query: {query}
Analysis Report: {analysis_report}
Current Graph State Summary:
{graph_summary}
If the analysis report suggests actions were taken or are pending, mention them.
"""),
("human", "Please provide the final answer.")
])
answer_chain = prompt | llm
final_answer = answer_chain.invoke({"query": query, "analysis_report": analysis_report, "graph_summary": graph_summary})
return {"final_answer": final_answer.content, "status": "Answer generated"}
# --- 6. Decision Node (Router) ---
def decide_next_step(state: GraphState) -> str:
"""
Decides the next step in the graph based on the current state.
"""
print("---DECIDING NEXT STEP---")
if state["error_message"]:
return "end_error"
if state["final_answer"]:
return "end_success"
if state["pending_actions"]:
return "modify_graph"
if state["analysis_report"] is None:
return "analyze_graph"
if state["subgraph"] is None or state["subgraph"].number_of_nodes() == 0:
return "retrieve_subgraph" # Try to retrieve if subgraph is empty
# Default to analysis if we have a subgraph and haven't analyzed yet
return "analyze_graph"
节点功能总结:
| 节点名称 | 功能概述
"""
You are a specialized agent designed to interact with an Enterprise Internal Relationship Graph (EIRG).
Your primary goal is to answer user queries by:
- Extracting key entities and relationships from the query.
- Translating these into appropriate graph database queries (e.g., Cypher for Neo4j) to fetch a relevant subgraph.
- Performing analysis on this subgraph using graph algorithms (via NetworkX) and LLM reasoning.
- Identifying potential actions (add/update/delete nodes/edges) that might be suggested by the query or analysis.
- Applying these actions to the in-memory subgraph, and potentially queuing them for persistence.
-
Formulating a concise and accurate answer based on the current state of the subgraph and analysis.
The state object `GraphState` contains: - `query`: The user's current request. - `chat_history`: The conversation history. - `subgraph`: The current NetworkX graph relevant to the task. This is what you operate on. - `identified_entities`: Entities extracted from the query. - `identified_relationships`: Relationships extracted from the query. - `graph_query_cql`: The Cypher query used to retrieve the `subgraph`. - `graph_query_result`: Raw results from the graph database. - `analysis_report`: Your detailed analysis of the subgraph. - `pending_actions`: List of actions you've identified to modify the graph. - `final_answer`: The final response to the user. - `error_message`: Any error encountered. - `status`: Current processing status. You will work in a loop, updating the `GraphState` at each step. Be precise in your entity and relationship extraction. When analyzing, consider the implications of the relationships and properties in the subgraph. When suggesting actions, use the specified JSON format for `pending_actions`. ```json [ {{ "action_type": "add_node", "node_id": "unique_id_for_new_node", "labels": ["NodeType"], "properties": {{"name": "Node Name", "attribute": "value"}} }}, {{ "action_type": "add_edge", "start_node_id": "existing_node_id_1", "end_node_id": "existing_node_id_2", "rel_type": "RELATIONSHIP_TYPE", "properties": {{"attribute": "value"}} }}, {{ "action_type": "update_node", "node_id": "existing_node_id", "properties": {{"attribute_to_update": "new_value"}} }} ] ``` Your goal is to be a knowledgeable and active participant in managing enterprise knowledge. """)])
### 4.3 构建LangGraph图
现在,我们将这些节点和决策逻辑组合成一个完整的LangGraph图。
```python
from langgraph.graph import StateGraph, END
# Build the LangGraph
workflow = StateGraph(GraphState)
# Add nodes
workflow.add_node("extract_info", extract_info)
workflow.add_node("retrieve_subgraph", retrieve_subgraph)
workflow.add_node("analyze_graph", analyze_graph)
workflow.add_node("modify_graph", modify_graph)
workflow.add_node("generate_answer", generate_answer)
# Set entry point
workflow.set_entry_point("extract_info")
# Define edges
# After extracting info, always try to retrieve subgraph
workflow.add_edge("extract_info", "retrieve_subgraph")
# After retrieving subgraph, decide next step (analyze, modify, or answer)
workflow.add_conditional_edges(
"retrieve_subgraph",
decide_next_step,
{
"analyze_graph": "analyze_graph",
"modify_graph": "modify_graph", # Potentially bypass analysis if subgraph suggests immediate action
"end_success": "generate_answer", # If subgraph retrieval directly gives enough for answer
"end_error": END # If there was an error
}
)
# After analyzing graph, decide next step (modify or answer)
workflow.add_conditional_edges(
"analyze_graph",
decide_next_step,
{
"modify_graph": "modify_graph",
"end_success": "generate_answer",
"end_error": END
}
)
# After modifying graph, re-analyze (to confirm changes or further insights) or generate answer
workflow.add_conditional_edges(
"modify_graph",
decide_next_step,
{
"analyze_graph": "analyze_graph", # Re-analyze after modification
"end_success": "generate_answer",
"end_error": END
}
)
# After generating answer, end
workflow.add_edge("generate_answer", END)
# Compile the workflow
app = workflow.compile()
# Optional: Visualize the graph (requires graphviz installed)
# from IPython.display import Image, display
# display(Image(app.get_graph().draw_png()))
4.4 流程图概述
这是一个简化的流程图,展示了LangGraph的执行路径。
[Start]
|
v
(extract_info) -- (Identified entities/relationships) -->
|
v
(retrieve_subgraph) -- (Subgraph retrieved) --> (decide_next_step) --
| |
v v
(analyze_graph) -------------------------------------------------> (modify_graph) --
| | |
v v v
(decide_next_step) ------------------------------------------------------------------ (decide_next_step) --
| |
v v
(generate_answer) -------------------------------------------------------------------------------------------> [END]
|
v
[Error END]
关键路由逻辑:
- 从
extract_info到retrieve_subgraph是无条件的。 retrieve_subgraph之后,decide_next_step函数会检查state:- 如果
error_message存在,则直接结束(end_error)。 - 如果
pending_actions存在,说明LLM在分析时建议了修改,跳转到modify_graph。 - 否则,跳转到
analyze_graph进行分析。
- 如果
analyze_graph之后,decide_next_step会检查:- 如果
pending_actions存在,跳转到modify_graph。 - 否则,认为分析完成,跳转到
generate_answer。
- 如果
modify_graph之后,decide_next_step会检查:- 通常会再次回到
analyze_graph,以确认修改后的图谱状态是否产生了新的洞察,或者是否需要进一步操作。这是一个重要的反馈循环。 - 如果修改后认为可以直接回答,也可以直接到
generate_answer。
- 通常会再次回到
generate_answer之后,流程结束。
这种动态路由机制,结合了LLM的推理和图谱的结构化信息,使得AI代理能够根据当前情况灵活地调整其工作流程。
五、示例场景:企业项目管理助手
让我们通过一个具体的企业项目管理场景来演示这个“Knowledge Graph-as-a-State”范式的运作。
场景描述:
假设你是一名项目经理,需要了解一个名为“Aurora Project”的项目进展,并想知道谁是核心参与者,以及该项目是否有关键的依赖。同时,你可能还想为此项目添加一个新的任务。
用户查询:
"Who is working on the ‘Aurora Project’, what are its critical dependencies? Can we add a new task ‘Risk Assessment’ to it, assigned to John Doe?"
预期流程:
- 用户输入: LangGraph接收用户查询。
- 实体提取: LLM识别“Aurora Project”、“John Doe”、“Risk Assessment”为实体,“working on”、“dependencies”、“assigned to”为关系。
- 子图检索: 根据这些实体和关系,LLM生成Cypher查询,从模拟的Neo4j数据库中检索与“Aurora Project”和“John Doe”相关的节点和关系。这个子图会被加载到
subgraph状态。 - 图谱分析: LLM结合NetworkX对
subgraph进行分析:- 识别
WORKS_ON关系,找出参与“Aurora Project”的员工。 - 识别
DEPENDS_ON关系,找出“Aurora Project”的依赖项目。 - 识别用户意图中“添加新任务”的需求。
- 识别
- 图谱修改:
- LLM生成
pending_actions,包含add_node(新任务“Risk Assessment”)、add_edge(“Aurora Project”HAS_TASK “Risk Assessment”)、add_edge(“Risk Assessment”ASSIGNED_TO “John Doe”)。 modify_graph节点执行这些操作,更新subgraph状态。
- LLM生成
- 再次分析/确认: 流程可能再次回到
analyze_graph,确认修改后的图谱状态。 - 答案生成: LLM根据最新的
subgraph状态和分析报告,生成最终答案,包括参与者、依赖,并确认新任务已添加。
六、逐步代码演示
现在,我们运行上述构建好的LangGraph,并观察其状态变化。
# Function to print state changes for better observability
def print_state_change(state: GraphState, node_name: str):
print(f"n--- STATE AFTER NODE: {node_name} ---")
print(f"Query: {state.get('query')}")
print(f"Status: {state.get('status')}")
print(f"Identified Entities: {state.get('identified_entities')}")
print(f"Identified Relationships: {state.get('identified_relationships')}")
if state.get('subgraph'):
print(f"Subgraph Nodes: {state['subgraph'].nodes(data=True)}")
print(f"Subgraph Edges: {state['subgraph'].edges(data=True)}")
print(f"Graph Query CQL: {state.get('graph_query_cql')}")
print(f"Analysis Report: {state.get('analysis_report')}")
print(f"Pending Actions: {state.get('pending_actions')}")
print(f"Final Answer: {state.get('final_answer')}")
print(f"Error Message: {state.get('error_message')}")
print("---------------------------------------")
# Initial state for a new conversation
initial_state = GraphState(
query="",
chat_history=[],
subgraph=None,
identified_entities=[],
identified_relationships=[],
graph_query_cql=None,
graph_query_result=None,
analysis_report=None,
pending_actions=[],
final_answer=None,
error_message=None,
status="Initialized"
)
# User query for the scenario
user_query = "Who is working on the 'Aurora Project', and what are its critical dependencies? Can we add a new task 'Risk Assessment' to it, assigned to John Doe?"
# Invoke the LangGraph app with the initial state and user query
print(f"--- STARTING AGENT WITH QUERY: {user_query} ---")
final_state = initial_state.copy()
final_state["query"] = user_query
final_state["chat_history"].append(BaseMessage(content=user_query, type="human"))
# The LangGraph will automatically run through the nodes
# We'll manually print the state after each step for clarity in this demo
# In a real LangGraph run, you'd get the final state directly
# To observe intermediate states, you can use `stream` or custom callbacks
# Simulating the run to show state changes
current_state = initial_state.copy()
current_state["query"] = user_query
current_state["chat_history"].append(BaseMessage(content=user_query, type="human"))
# Step 1: Extract Info
extracted_info_state = extract_info(current_state)
current_state.update(extracted_info_state)
print_state_change(current_state, "extract_info")
# Step 2: Retrieve Subgraph
retrieved_subgraph_state = retrieve_subgraph(current_state)
current_state.update(retrieved_subgraph_state)
print_state_change(current_state, "retrieve_subgraph")
# Step 3: Decide next step (should be analyze_graph)
next_step = decide_next_step(current_state)
print(f"--- DECISION: {next_step} ---")
# Step 4: Analyze Graph
if next_step == "analyze_graph":
analyzed_graph_state = analyze_graph(current_state)
current_state.update(analyzed_graph_state)
print_state_change(current_state, "analyze_graph")
else:
print("Skipped analyze_graph.")
# Step 5: Decide next step (should be modify_graph due to pending_actions)
next_step = decide_next_step(current_state)
print(f"--- DECISION: {next_step} ---")
# Step 6: Modify Graph
if next_step == "modify_graph":
modified_graph_state = modify_graph(current_state)
current_state.update(modified_graph_state)
print_state_change(current_state, "modify_graph")
else:
print("Skipped modify_graph.")
# Step 7: Decide next step (should be analyze_graph again or generate_answer)
next_step = decide_next_step(current_state)
print(f"--- DECISION: {next_step} ---")
# If it goes back to analyze_graph after modification (feedback loop)
if next_step == "analyze_graph":
re_analyzed_graph_state = analyze_graph(current_state)
current_state.update(re_analyzed_graph_state)
print_state_change(current_state, "re-analyze_graph (after modify)")
next_step = decide_next_step(current_state) # Re-decide after re-analysis
print(f"--- DECISION AFTER RE-ANALYSIS: {next_step} ---")
# Step 8: Generate Answer
if next_step == "generate_answer":
generated_answer_state = generate_answer(current_state)
current_state.update(generated_answer_state)
print_state_change(current_state, "generate_answer")
else:
print("Skipped generate_answer.")
print(f"n--- FINAL AGENT ANSWER ---")
print(current_state.get("final_answer"))
print("--------------------------")
输出分析(简要):
您会观察到如下关键状态变化:
extract_info之后:identified_entities会包含["Aurora Project", "John Doe", "Risk Assessment"],identified_relationships包含["working on", "dependencies", "add new task", "assigned to"]。retrieve_subgraph之后:subgraph不再是None,而是一个networkx.Graph对象,其中包含了"employee_john", "project_aurora", "project_phoenix"等节点及其WORKS_ON,DEPENDS_ON等关系。graph_query_cql会显示LLM或预定义逻辑生成的Cypher查询。analyze_graph之后:analysis_report会包含对子图的洞察(谁在项目上工作,依赖是什么),并且pending_actions可能会填充,例如:[ { "action_type": "add_node", "node_id": "task_risk_assessment", "labels": ["Task"], "properties": {"name": "Risk Assessment", "status": "Pending"} }, { "action_type": "add_edge", "start_node_id": "project_aurora", "end_node_id": "task_risk_assessment", "rel_type": "HAS_TASK", "properties": {"priority": "High"} }, { "action_type": "add_edge", "start_node_id": "task_risk_assessment", "end_node_id": "employee_john", "rel_type": "ASSIGNED_TO", "properties": {"deadline": "2024-07-31"} } ]modify_graph之后:subgraph会更新,新增了"task_risk_assessment"节点,以及"project_aurora"到"task_risk_assessment"的HAS_TASK边,和"task_risk_assessment"到"employee_john"的ASSIGNED_TO边。pending_actions会被清空。generate_answer之后:final_answer会综合所有信息,给出项目参与者、依赖,并确认新任务已添加到“Aurora Project”并分配给John Doe。
这个过程清晰地展示了知识图谱如何从外部检索,进入LangGraph状态,被AI代理动态分析、修改,并最终用于生成智能响应。
七、高级考量与最佳实践
将企业内部关系图谱作为LangGraph状态,虽然功能强大,但在实际部署中需要考虑更多高级因素。
7.1 图谱的Schema管理与验证
- Schema定义: 为EIRG定义严格的节点标签、关系类型和属性Schema。这有助于LLM更准确地理解图谱结构,并生成正确的查询和修改指令。可以使用本体论(Ontology)工具或图数据库自身的Schema管理功能。
- Pydantic模型: 在Python代码中,可以使用Pydantic模型来表示节点和边的结构,确保数据在进入
networkx.Graph之前是验证过的。 - LLM的Schema感知: 在给LLM的Prompt中明确提供图谱的Schema定义,帮助其生成符合Schema的JSON输出或Cypher查询。
7.2 可伸缩性与性能优化
- 智能子图提取: 对于非常大的EIRG,将整个图加载到LangGraph状态是不现实的。需要更智能的子图提取策略:
- K-hop扩展: 从种子实体出发,只提取K跳范围内的邻居。
- 路径查找: 仅提取连接两个或多个关键实体之间的最短路径。
- 语义相关性: 结合节点/边属性的文本嵌入和语义搜索,提取与查询语义最相关的子图。
- 时序/权重过滤: 基于时间戳或重要性权重过滤子图。
- 图数据库的优化: 确保底层的图数据库(如Neo4j)查询高效,建立适当的索引。
- LLM调用优化: 缓存LLM的常见输出,使用更小的模型进行简单的实体提取,仅在复杂推理时使用大型模型。
- 异步处理: LangGraph支持异步节点,可以利用
asyncio并行执行一些图谱操作或LLM调用。
7.3 持久化与状态同步
- LangGraph Checkpoints: LangGraph自带的Checkpointing机制可以保存整个
GraphState,包括networkx.Graph对象。这对于恢复会话非常有用。 - 外部图数据库同步:
modify_graph节点是关键。当LangGraph状态中的subgraph被修改时,这些修改必须通过事务性操作同步回底层的持久化图数据库。这通常涉及:- 生成相应的Cypher(或GQL)
CREATE/MERGE/SET/DELETE语句。 - 通过图数据库驱动执行这些语句。
- 考虑并发控制和冲突解决策略。
- 生成相应的Cypher(或GQL)
- 数据一致性: 确保LangGraph的内存状态与持久化数据库之间的最终一致性。