各位编程专家、架构师和AI爱好者们,大家好!
今天,我们将深入探讨一个在构建复杂AI应用中至关重要的话题:如何在LangGraph框架中,巧妙地平衡并利用“确定性算法”、“概率性模型预测”以及“人类不确定性输入”这三股力量。这不仅仅是技术整合的问题,更是一种构建智能系统哲学的体现。我们将把LangGraph视为一个舞台,这三方玩家在其上进行一场精妙的博弈,最终达到一个动态的平衡点,共同驱动应用的智能。
LangGraph:三方博弈的舞台
在深入探讨三方博弈之前,我们首先需要理解LangGraph为何能成为这场博弈的理想舞台。LangGraph是LangChain生态系统中的一个强大扩展,它允许我们通过图形化的方式来定义和执行复杂的、有状态的LLM(大型语言模型)应用。它的核心优势在于:
- 状态管理 (State Management):LangGraph通过定义一个共享的
State对象来维护整个应用会话的上下文,这使得在不同节点之间传递信息、跟踪决策和用户意图变得异常简单。 - 节点与边 (Nodes and Edges):我们将不同的操作(如调用LLM、执行工具、处理数据或请求用户输入)封装成节点,并通过有向边连接它们,形成一个执行流。
- 条件路由 (Conditional Routing):这是LangGraph最强大的特性之一,允许我们根据当前状态或前一个节点的输出来动态地决定下一个要执行的节点。这为实现复杂的决策逻辑提供了基础。
- 循环与人机交互 (Loops and Human-in-the-Loop):LangGraph能够轻松构建循环,这对于需要迭代、修正或寻求用户反馈的场景至关重要。
正是这些特性,使得LangGraph成为整合确定性、概率性和人类不确定性输入的理想架构。
让我们从一个最简单的LangGraph结构开始,作为我们后续复杂案例的基础:
from typing import TypedDict, Annotated, List, Union
from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.graph import StateGraph, END
# 1. 定义应用的状态
# 这是一个字典,存储了会话中的所有关键信息
class AgentState(TypedDict):
messages: Annotated[List[BaseMessage], lambda x, y: x + y] # 消息历史
# 可以在这里添加更多状态变量,比如 order_id, confirmation_required, etc.
# 2. 初始化图
workflow = StateGraph(AgentState)
# 3. 定义一些空的节点(稍后填充逻辑)
def node_a(state: AgentState):
print("Executing Node A")
return {"messages": [HumanMessage(content="Hello from Node A")]}
def node_b(state: AgentState):
print("Executing Node B")
return {"messages": [HumanMessage(content="Hello from Node B")]}
# 4. 添加节点
workflow.add_node("node_a", node_a)
workflow.add_node("node_b", node_b)
# 5. 定义边(简单的顺序执行)
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END) # 结束图的执行
# 6. 设置入口点
workflow.set_entry_point("node_a")
# 7. 编译图
app = workflow.compile()
# 8. 运行图
# for s in app.stream({"messages": [HumanMessage(content="Start")]}):
# print(s)
这个基础结构将作为我们后续讨论的骨架,现在让我们逐一深入了解这三方玩家。
玩家一:确定性算法(Deterministic Algorithms)
确定性算法是AI应用中最坚实、最可预测的部分。它们是基于明确规则、逻辑和预定义步骤的代码块。给定相同的输入,它们总是产生相同的输出。
在AI应用中的角色:
- 数据验证与清洗:检查输入数据的格式、范围和有效性。
- 业务规则执行:根据既定政策进行决策,例如订单状态检查、权限验证。
- 外部API调用:与数据库、CRM系统或第三方服务进行交互。
- 安全与合规性检查:过滤敏感信息,确保输出符合规范。
- 格式转换与解析:将数据从一种格式转换为另一种,或从非结构化文本中提取结构化信息(如果规则明确)。
LangGraph中的实现方式:
确定性算法通常以普通的Python函数形式存在,作为LangGraph的节点或路由函数。它们直接操作和更新AgentState。
代码示例:数据验证与API模拟
假设我们的应用需要处理用户订单。在进行任何复杂处理之前,我们必须验证用户提供的订单ID是否有效。这是一个典型的确定性任务。
import re
# 1. 更新 AgentState,增加 order_id 和 order_details 字段
class AgentState(TypedDict):
messages: Annotated[List[BaseMessage], lambda x, y: x + y]
order_id: Union[str, None]
order_details: Union[dict, None]
validation_status: Union[str, None]
# 2. 定义确定性节点:订单ID验证
def validate_order_id(state: AgentState):
print("---Executing: validate_order_id---")
last_message = state["messages"][-1].content
# 尝试从消息中提取订单ID
match = re.search(r'order id is (d+)', last_message, re.IGNORECASE)
order_id = match.group(1) if match else None
if order_id and len(order_id) == 6 and order_id.isdigit():
print(f"Order ID '{order_id}' found and looks valid.")
return {"order_id": order_id, "validation_status": "valid"}
else:
print(f"Order ID '{order_id if order_id else 'None'}' is invalid or not found.")
return {"order_id": order_id, "validation_status": "invalid"}
# 3. 定义确定性节点:模拟外部API调用获取订单详情
def fetch_order_details(state: AgentState):
print("---Executing: fetch_order_details---")
order_id = state.get("order_id")
if not order_id or state.get("validation_status") != "valid":
print("Cannot fetch details: Order ID is invalid or missing.")
return {"order_details": None}
# 模拟一个外部API调用
mock_db = {
"123456": {"item": "Laptop", "status": "Shipped", "address": "123 Main St"},
"789012": {"item": "Mouse", "status": "Processing", "address": "456 Oak Ave"},
}
details = mock_db.get(order_id)
if details:
print(f"Fetched details for order {order_id}: {details}")
return {"order_details": details}
else:
print(f"No details found for order {order_id}.")
return {"order_details": {"item": "N/A", "status": "Not Found"}}
# 4. 构建LangGraph
workflow_deterministic = StateGraph(AgentState)
workflow_deterministic.add_node("validate_id", validate_order_id)
workflow_deterministic.add_node("fetch_details", fetch_order_details)
# 5. 定义确定性路由函数
def route_after_validation(state: AgentState):
if state.get("validation_status") == "valid":
return "fetch_details"
else:
# 如果ID无效,我们可能需要提示用户重新输入,或者直接结束
# 这里我们先假设直接结束,后续会引入人类输入
print("Routing: Order ID invalid, ending for now.")
return END
workflow_deterministic.add_edge("validate_id", "fetch_details")
workflow_deterministic.add_conditional_edges(
"validate_id",
route_after_validation # 使用路由函数决定下一个节点
)
workflow_deterministic.add_edge("fetch_details", END) # 假设获取完详情就结束
workflow_deterministic.set_entry_point("validate_id")
app_deterministic = workflow_deterministic.compile()
# # 运行测试
# print("n--- Test Case 1: Valid Order ID ---")
# for s in app_deterministic.stream({"messages": [HumanMessage(content="My order id is 123456.")]}):
# print(s)
# print("n--- Test Case 2: Invalid Order ID ---")
# for s in app_deterministic.stream({"messages": [HumanMessage(content="My order id is abcde.")]}):
# print(s)
# print("n--- Test Case 3: Order ID not found in DB ---")
# for s in app_deterministic.stream({"messages": [HumanMessage(content="My order id is 999999.")]}):
# print(s)
确定性算法的特点:
| 优点 | 缺点 |
|---|---|
| 可预测性高:输出稳定可靠。 | 缺乏灵活性:难以适应未预见的输入或场景。 |
| 效率高:执行速度快。 | 难以处理模糊性:无法理解自然语言的细微差别。 |
| 易于调试和测试:逻辑清晰,边界明确。 | 维护成本高:规则变更需修改代码。 |
| 安全性强:适合处理敏感操作。 |
确定性算法是系统的骨架,提供了稳定性和可靠性。然而,它们在处理复杂、非结构化或模糊信息时显得力不从心,这正是概率性模型发挥作用的地方。
玩家二:概率性模型预测(Probabilistic Model Predictions)
概率性模型,尤其是大型语言模型(LLMs),是AI应用中智能的核心。它们通过学习海量数据,能够理解、生成和推理人类语言,但其输出通常带有不确定性,表现为“最有可能的”预测,而不是绝对的真理。
在AI应用中的角色:
- 自然语言理解 (NLU):解析用户意图、实体提取、情感分析。
- 自然语言生成 (NLG):生成回复、摘要、代码、创意文本。
- 复杂推理:根据上下文进行逻辑判断、问题解决。
- 模糊信息处理:处理非结构化、语法错误或不完整的输入。
- 工具选择与使用 (Tool Calling):根据用户请求智能地选择并调用外部工具。
LangGraph中的实现方式:
LLMs通常作为独立的节点集成到LangGraph中,接收状态中的消息历史作为输入,并生成新的消息或结构化输出,更新回状态。LangGraph的Tool Calling功能与LLM结合得天衣无缝。
代码示例:意图识别与工具调用
我们将引入一个LLM来理解用户的复杂请求,识别其意图,并决定是否需要调用某个工具(比如我们之前定义的 fetch_order_details)。
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
import os
# 确保设置了 OPENAI_API_KEY 环境变量
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
# 定义一个我们希望LLM能够调用的工具
@tool
def get_order_details_tool(order_id: str) -> dict:
"""
Retrieves detailed information for a given order ID from a mock database.
Returns a dictionary with item, status, and address if found, otherwise indicates not found.
"""
print(f"---Tool Called: get_order_details_tool for ID {order_id}---")
mock_db = {
"123456": {"item": "Laptop", "status": "Shipped", "address": "123 Main St"},
"789012": {"item": "Mouse", "status": "Processing", "address": "456 Oak Ave"},
}
details = mock_db.get(order_id)
if details:
return {"status": "success", "data": details}
else:
return {"status": "error", "message": "Order ID not found."}
# 将工具列表化
tools = [get_order_details_tool]
# 初始化LLM,并绑定工具
llm = ChatOpenAI(model="gpt-4o", temperature=0).bind_tools(tools)
# 1. 扩展 AgentState 以处理工具调用和结果
class AgentState(TypedDict):
messages: Annotated[List[BaseMessage], lambda x, y: x + y]
order_id: Union[str, None]
order_details: Union[dict, None]
validation_status: Union[str, None]
tool_calls: Union[list, None] # LLM生成的工具调用
tool_output: Union[dict, None] # 工具执行结果
# 2. 定义概率性节点:调用LLM进行意图识别和工具选择
def call_llm(state: AgentState):
print("---Executing: call_llm (for intent and tool selection)---")
messages = state["messages"]
# 调用LLM
response = llm.invoke(messages)
# 打印LLM的原始响应以便观察
print(f"LLM Raw Response: {response}")
# 如果LLM决定调用工具
if response.tool_calls:
print(f"LLM decided to call tool(s): {response.tool_calls}")
return {"messages": [response], "tool_calls": response.tool_calls}
else:
print(f"LLM response (no tool call): {response.content}")
return {"messages": [response]}
# 3. 定义确定性节点:执行工具
# 注意:虽然工具的*选择*是概率性的,但*执行*工具本身是确定性的。
# LangGraph提供了 `tool_executor` 方便地执行LLM选择的工具。
from langchain_core.runnables import RunnableConfig
from langchain_core.runnables.base import RunnableSequence
from langchain_core.tools import tool
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain.agents.output_parsers.tools import ToolCallingAgentOutputParser
from langchain_core.runnables import RunnablePassthrough
from langgraph.prebuilt import ToolExecutor
tool_executor = ToolExecutor(tools)
def execute_tools(state: AgentState):
print("---Executing: execute_tools---")
tool_calls = state["tool_calls"]
if not tool_calls:
print("No tool calls to execute.")
return state # 没有工具调用,直接返回当前状态
results = []
for tool_call in tool_calls:
print(f"Executing tool: {tool_call.name} with args {tool_call.args}")
try:
result = tool_executor.invoke(tool_call)
results.append(result)
print(f"Tool execution result: {result}")
except Exception as e:
print(f"Error executing tool {tool_call.name}: {e}")
results.append({"error": str(e)})
# 将工具结果添加回消息历史,以便LLM可以看到
tool_messages = [BaseMessage(content=str(r), name=tc.name, tool_call_id=tc.id)
for r, tc in zip(results, tool_calls)]
# 将结果作为 tool_output 存储在状态中,方便后续节点访问
return {"messages": tool_messages, "tool_output": results}
# 4. 构建LangGraph
workflow_probabilistic = StateGraph(AgentState)
workflow_probabilistic.add_node("call_llm", call_llm)
workflow_probabilistic.add_node("execute_tools", execute_tools)
# 5. 定义概率性路由函数:根据LLM是否选择工具来路由
def route_llm_output(state: AgentState):
if state.get("tool_calls"):
return "execute_tools"
else:
# 如果LLM没有选择工具,它可能已经直接给出了答案,或者需要进一步的LLM思考
# 这里我们假设它直接给出了答案,然后结束
return END
workflow_probabilistic.set_entry_point("call_llm")
workflow_probabilistic.add_conditional_edges(
"call_llm",
route_llm_output
)
workflow_probabilistic.add_edge("execute_tools", "call_llm") # 工具执行后,再次调用LLM进行总结或下一步
app_probabilistic = workflow_probabilistic.compile()
# # 运行测试
# print("n--- Test Case 1: LLM calls tool ---")
# for s in app_probabilistic.stream({"messages": [HumanMessage(content="What is the status of order 123456?")]}):
# print(s)
# print("-" * 20)
# print("n--- Test Case 2: LLM answers directly ---")
# for s in app_probabilistic.stream({"messages": [HumanMessage(content="Hello, how are you?")]}):
# print(s)
# print("-" * 20)
概率性模型预测的特点:
| 优点 | 缺点 |
|---|---|
| 高度灵活:能处理复杂、模糊、非结构化输入。 | 不确定性:输出可能不准确、有偏差(幻觉)。 |
| 泛化能力强:能处理未见过的场景和语言表达。 | 成本较高:LLM API调用通常按量计费。 |
| 自然语言交互:提供流畅、智能的对话体验。 | 速度相对较慢:LLM推理通常有延迟。 |
| 涌现能力:展现出复杂的推理和创造性。 | 黑箱特性:决策过程难以完全解释。 |
概率性模型赋予了应用理解和生成智能的能力,但它们的不确定性是其固有的挑战。我们需要一种机制来管理这种不确定性,并提供纠正和指导,这就是人类输入的作用。
玩家三:人类不确定性输入(Human Uncertainty Input)
人类输入是整个系统中最不可预测的变量,但同时也是最宝贵的。人类可以提供上下文、澄清模糊、纠正错误、表达偏好,甚至直接覆盖系统的决策。
在AI应用中的角色:
- 初始指令与查询:启动整个工作流。
- 澄清与消歧:当系统不确定时,请求用户提供更多信息。
- 纠正错误:修正LLM的幻觉或确定性规则的误判。
- 表达偏好与约束:指导系统做出符合用户需求的选择。
- 确认与批准:在执行关键操作前,寻求用户的最终确认。
- 反馈与学习:帮助系统改进其模型和规则。
- 专家干预/人工审核:在复杂或高风险场景下进行人工介入。
LangGraph中的实现方式:
人类输入通过app.stream()或app.invoke()的初始messages参数进入系统。更重要的是,LangGraph可以通过暂停执行、生成特定消息来提示用户,并在接收到用户的新输入后继续执行,从而实现人机交互循环。
代码示例:用户确认与澄清
我们将结合前两个部分,构建一个更完整的场景:用户查询订单状态,如果订单ID无效,系统会请求用户澄清;如果系统认为需要执行某个高风险操作(例如取消订单),会请求用户确认。
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 1. 再次扩展 AgentState
class AgentState(TypedDict):
messages: Annotated[List[BaseMessage], lambda x, y: x + y]
order_id: Union[str, None]
order_details: Union[dict, None]
validation_status: Union[str, None]
tool_calls: Union[list, None]
tool_output: Union[dict, None]
user_clarification_needed: bool # 是否需要用户澄清
user_confirmation_needed: bool # 是否需要用户确认
action_to_confirm: Union[str, None] # 待确认的动作
# 2. 定义确定性节点:请求用户澄清
def request_clarification(state: AgentState):
print("---Executing: request_clarification---")
response_message = HumanMessage(content="I couldn't find a valid order ID. Could you please provide the 6-digit order ID again?")
return {"messages": [response_message], "user_clarification_needed": True}
# 3. 定义确定性节点:请求用户确认
def request_confirmation(state: AgentState):
print("---Executing: request_confirmation---")
action = state.get("action_to_confirm", "an important action")
response_message = HumanMessage(content=f"I'm about to perform '{action}'. Do you confirm this action? Please reply 'yes' or 'no'.")
return {"messages": [response_message], "user_confirmation_needed": True}
# 4. 定义处理用户后续输入的节点 (确定性)
def process_user_input(state: AgentState):
print("---Executing: process_user_input---")
last_message = state["messages"][-1].content.lower()
if state.get("user_clarification_needed"):
# 尝试从用户的新输入中提取订单ID
match = re.search(r'b(d{6})b', last_message)
new_order_id = match.group(1) if match else None
if new_order_id:
print(f"User provided new order ID: {new_order_id}")
return {"order_id": new_order_id, "validation_status": "pending", "user_clarification_needed": False}
else:
print("User did not provide a valid order ID in clarification.")
return {"messages": [HumanMessage(content="I still can't understand the order ID. Please try again or type 'cancel'.")],
"user_clarification_needed": True} # 继续请求澄清
if state.get("user_confirmation_needed"):
if "yes" in last_message:
print("User confirmed the action.")
return {"user_confirmation_needed": False, "confirmed_action": True}
elif "no" in last_message:
print("User denied the action.")
return {"user_confirmation_needed": False, "confirmed_action": False}
else:
print("User did not provide a clear 'yes' or 'no'.")
return {"messages": [HumanMessage(content="Please reply 'yes' or 'no' to confirm or deny the action.")],
"user_confirmation_needed": True} # 继续请求确认
# 如果不是澄清或确认,则只更新消息历史
return {"messages": [HumanMessage(content=last_message)]}
# 5. 定义一个模拟的高风险操作节点 (确定性)
def perform_risky_action(state: AgentState):
print("---Executing: perform_risky_action (e.g., cancelling order)---")
order_id = state.get("order_id")
if order_id and state.get("confirmed_action"):
print(f"Order {order_id} has been cancelled (simulated).")
return {"messages": [HumanMessage(content=f"Order {order_id} has been successfully cancelled.")],
"action_completed": True}
elif not state.get("confirmed_action"):
print("Action denied by user.")
return {"messages": [HumanMessage(content="Action cancelled by user.")]}
else:
print("Cannot perform risky action without valid order ID.")
return {"messages": [HumanMessage(content="Cannot perform action: missing order ID or confirmation.")]}
# 6. 构建LangGraph
workflow_human = StateGraph(AgentState)
workflow_human.add_node("call_llm", call_llm) # 复用之前的LLM节点
workflow_human.add_node("execute_tools", execute_tools) # 复用之前的工具执行节点
workflow_human.add_node("validate_id", validate_order_id) # 复用之前的ID验证节点
workflow_human.add_node("request_clarification", request_clarification)
workflow_human.add_node("process_user_input", process_user_input)
workflow_human.add_node("request_confirmation", request_confirmation)
workflow_human.add_node("perform_risky_action", perform_risky_action)
# 定义路由函数
# 路由:LLM输出后
def route_after_llm_output(state: AgentState):
if state.get("tool_calls"):
return "execute_tools"
# 如果LLM判断意图是“取消订单”,并且没有明确的订单ID,我们可能需要确认或澄清
# 这里简化为如果LLM直接回复取消相关的,且没有工具调用,就请求确认
llm_response_content = state["messages"][-1].content
if "cancel order" in llm_response_content.lower() and not state.get("order_id"):
return "request_confirmation" # 假设LLM直接回复了取消意图
return END # LLM直接给出回复或未识别意图,结束
# 路由:工具执行后,再次调用LLM进行总结或下一步
workflow_human.add_edge("execute_tools", "call_llm")
# 路由:订单ID验证后
def route_after_id_validation(state: AgentState):
if state.get("validation_status") == "valid":
# 如果ID有效,且有待确认的动作 (例如取消订单),则请求确认
if state.get("action_to_confirm"):
return "request_confirmation"
return "fetch_details" # 否则获取订单详情
else:
return "request_clarification" # ID无效,请求澄清
# 路由:处理用户输入后
def route_after_user_input(state: AgentState):
if state.get("user_clarification_needed"):
return "request_clarification" # 用户输入后仍需澄清,回到澄清节点
if state.get("user_confirmation_needed"):
return "request_confirmation" # 用户输入后仍需确认,回到确认节点
# 如果澄清或确认流程完成,且有新提供的order_id,则重新验证
if state.get("order_id") and state.get("validation_status") == "pending":
return "validate_id"
# 如果是确认流程完成,且用户确认了操作,则执行高风险操作
if state.get("confirmed_action"):
return "perform_risky_action"
# 如果用户否定了操作
if state.get("confirmed_action") is False:
return END # 结束对话
return "call_llm" # 其他情况,回到LLM让它继续处理
# 添加边和条件路由
workflow_human.set_entry_point("call_llm") # 总是从LLM开始处理用户输入
workflow_human.add_conditional_edges("call_llm", route_after_llm_output) # LLM输出后路由
workflow_human.add_edge("execute_tools", "call_llm") # 工具执行后回到LLM
workflow_human.add_conditional_edges("validate_id", route_after_id_validation) # ID验证后路由
workflow_human.add_conditional_edges("request_clarification", route_after_user_input) # 澄清后路由
workflow_human.add_conditional_edges("request_confirmation", route_after_user_input) # 确认后路由
workflow_human.add_edge("process_user_input", "route_after_user_input") # 处理用户输入后路由
workflow_human.add_edge("perform_risky_action", END)
# 编译图
app_human = workflow_human.compile()
# # 运行测试 - 需要交互式环境
# print("n--- Test Case 1: Invalid Order ID, then clarification ---")
# inputs = {"messages": [HumanMessage(content="What's my order status for 123?")]}
# for s in app_human.stream(inputs):
# print(s)
# if "user_clarification_needed" in s.get("request_clarification", {}):
# # 模拟用户输入
# inputs["messages"].append(HumanMessage(content="Oh, sorry, it's 123456."))
# inputs["user_clarification_needed"] = False # 重置状态,避免循环
# print("n--- Test Case 2: Request confirmation for risky action ---")
# inputs = {"messages": [HumanMessage(content="I want to cancel order 789012.")]}
# for s in app_human.stream(inputs):
# print(s)
# if "user_confirmation_needed" in s.get("request_confirmation", {}):
# # 模拟用户输入
# inputs["messages"].append(HumanMessage(content="Yes, please cancel it."))
# inputs["user_confirmation_needed"] = False
人类不确定性输入的特点:
| 优点 | 缺点 |
|---|---|
| 提供最终权威和纠正能力:避免系统犯错。 | 引入延迟:需要等待用户响应。 |
| 处理极端复杂和模糊的场景:AI无法解决时。 | 不一致性:不同用户输入可能差异巨大。 |
| 提供个性化体验和偏好:定制化服务。 | 沟通成本:用户可能不理解系统提示或懒于回复。 |
| 促成学习和系统改进:通过反馈迭代优化。 |
人类输入是系统的“安全阀”和“学习引擎”,它弥补了确定性算法的僵硬和概率性模型的不可靠性。
三方博弈的平衡点:LangGraph中的动态协调
现在,我们已经分别介绍了这三方玩家。真正的艺术在于如何在LangGraph中找到它们的平衡点,让它们协同工作,而不是相互冲突。LangGraph的状态管理和条件路由机制是实现这一平衡的关键。
核心思想:优先级与责任划分
-
确定性算法作为守门员和执行者:
- 高优先级检查:安全、合规、数据完整性等确定性检查应尽可能早地执行。例如,在LLM处理敏感信息前,先用确定性规则进行过滤。
- 可靠的执行:当LLM决定执行某个具体动作(如调用API)时,实际的执行逻辑应由确定性算法负责,确保其正确性和可预测性。
- 状态更新:确定性算法负责清晰、准确地更新共享状态。
-
概率性模型作为理解者和决策者:
- 处理模糊性:LLM负责理解复杂的自然语言,提取意图和实体,弥补确定性规则无法处理的语言多样性。
- 智能路由:根据用户意图和当前状态,LLM可以建议下一个执行路径(通过Tool Calling或直接生成决策)。
- 总结与生成:在确定性操作完成后,LLM可以对结果进行自然语言总结,提升用户体验。
-
人类输入作为仲裁者和指导者:
- 最终决策权:在关键或高风险操作(如金融交易、数据删除)前,必须征求用户确认。
- 错误纠正与澄清:当系统(无论是确定性还是概率性部分)遇到不确定性或错误时,应主动向用户寻求帮助。
- 学习信号:用户的反馈可以被收集起来,用于改进确定性规则或微调LLM。
LangGraph如何协调这一切?
- 共享状态 (AgentState):所有玩家通过修改和读取同一个
AgentState字典进行通信。这是信息流动的核心。 - 条件路由 (Conditional Edges):这是实现动态平衡的“指挥棒”。
- 确定性优先:
if state.get("order_id_valid"): return "fetch_details" - LLM决策:
if llm_response.tool_calls: return "execute_tool" - 人机交互:
if state.get("user_clarification_needed"): return "request_clarification"
- 确定性优先:
- 循环 (Loops):当需要用户澄清或确认时,LangGraph可以形成一个循环,等待用户输入,然后根据新输入重新评估路径。
- 错误处理 (Error Handling):在节点中捕获异常,并在状态中记录错误信息,然后通过路由将控制权转交给LLM进行错误解释,或转交给用户寻求帮助。
综合案例:智能订单助手
让我们构建一个更完整的LangGraph应用,它结合了我们之前讨论的所有元素,演示这三方如何协同工作以处理用户查询。
场景描述:
一个智能订单助手,能够:
- 理解用户关于订单状态、修改或取消的意图(概率性)。
- 验证订单ID的格式和是否存在(确定性)。
- 查询订单详情(确定性,通过工具)。
- 提供订单摘要或建议(概率性)。
- 在订单ID无效时请求用户澄清(人类输入)。
- 在执行高风险操作(如取消订单)前请求用户确认(人类输入)。
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List, Union
import re
import os
# 确保设置了 OPENAI_API_KEY 环境变量
# os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
# 1. 定义扩展后的 AgentState
class AgentState(TypedDict):
messages: Annotated[List[BaseMessage], lambda x, y: x + y]
order_id: Union[str, None]
order_details: Union[dict, None]
validation_status: Union[str, None] # "valid", "invalid", "pending"
tool_calls: Union[list, None]
tool_output: Union[dict, None]
requires_clarification: bool # 是否需要用户澄清订单ID
requires_confirmation: bool # 是否需要用户确认高风险操作
action_type: Union[str, None] # LLM识别出的动作类型,例如 "cancel_order", "check_status"
user_confirmed: Union[bool, None] # 用户是否已确认
# 2. 定义工具
@tool
def get_order_details_tool(order_id: str) -> dict:
"""
Retrieves detailed information for a given order ID from a mock database.
Returns a dictionary with item, status, and address if found, otherwise indicates not found.
"""
print(f"n--- Tool Called: get_order_details_tool for ID {order_id} ---")
mock_db = {
"123456": {"item": "Laptop", "status": "Shipped", "address": "123 Main St", "user_id": "user123"},
"789012": {"item": "Mouse", "status": "Processing", "address": "456 Oak Ave", "user_id": "user123"},
"333444": {"item": "Keyboard", "status": "Delivered", "address": "789 Pine Ln", "user_id": "user456"},
}
details = mock_db.get(order_id)
if details:
return {"status": "success", "data": details}
else:
return {"status": "error", "message": "Order ID not found."}
@tool
def cancel_order_tool(order_id: str) -> dict:
"""
Simulates canceling an order. This is a high-risk operation.
"""
print(f"n--- Tool Called: cancel_order_tool for ID {order_id} ---")
# In a real system, this would interact with an actual order management system
# For this simulation, we just mark it as cancelled.
if order_id in ["123456", "789012"]: # Only cancellable if exists
return {"status": "success", "message": f"Order {order_id} has been successfully cancelled (simulated)."}
else:
return {"status": "error", "message": f"Could not cancel order {order_id}: not found or not cancellable."}
tools = [get_order_details_tool, cancel_order_tool]
llm = ChatOpenAI(model="gpt-4o", temperature=0).bind_tools(tools)
tool_executor = ToolExecutor(tools)
# 3. 定义节点函数
# 概率性节点:LLM代理,理解意图并决定工具调用
def call_llm_agent(state: AgentState):
print("n--- NODE: call_llm_agent ---")
messages = state["messages"]
response = llm.invoke(messages)
# 尝试提取 order_id,即使LLM没有直接调用工具
order_id_match = re.search(r'b(d{6})b', messages[-1].content)
extracted_order_id = order_id_match.group(1) if order_id_match else None
# 简单意图识别 (概率性)
action_type = None
if "cancel" in messages[-1].content.lower():
action_type = "cancel_order"
elif "status" in messages[-1].content.lower() or "details" in messages[-1].content.lower():
action_type = "check_status"
print(f"LLM Raw Response: {response}")
print(f"Extracted Order ID by regex: {extracted_order_id}")
print(f"Identified Action Type: {action_type}")
return {
"messages": [response],
"tool_calls": response.tool_calls,
"order_id": extracted_order_id if extracted_order_id else state.get("order_id"), # 优先使用正则提取的
"action_type": action_type
}
# 确定性节点:执行LLM选择的工具
def execute_llm_tools(state: AgentState):
print("n--- NODE: execute_llm_tools ---")
tool_calls = state["tool_calls"]
if not tool_calls:
print("No tool calls to execute.")
return state
results = []
for tool_call in tool_calls:
print(f"Executing tool: {tool_call.name} with args {tool_call.args}")
try:
result = tool_executor.invoke(tool_call)
results.append(result)
print(f"Tool execution result: {result}")
except Exception as e:
print(f"Error executing tool {tool_call.name}: {e}")
results.append({"error": str(e)})
tool_messages = [ToolMessage(content=str(r), name=tc.name, tool_call_id=tc.id)
for r, tc in zip(results, tool_calls)]
return {"messages": tool_messages, "tool_output": results}
# 确定性节点:订单ID验证
def validate_order_id_node(state: AgentState):
print("n--- NODE: validate_order_id_node ---")
order_id = state.get("order_id")
if order_id and len(order_id) == 6 and order_id.isdigit():
print(f"Order ID '{order_id}' is valid format.")
return {"validation_status": "valid"}
else:
print(f"Order ID '{order_id if order_id else 'None'}' is invalid format.")
return {"validation_status": "invalid", "requires_clarification": True} # 需要澄清
# 确定性节点:请求用户澄清
def request_user_clarification(state: AgentState):
print("n--- NODE: request_user_clarification ---")
response_message = AIMessage(content="I couldn't find a valid 6-digit order ID. Could you please provide it again?")
return {"messages": [response_message], "requires_clarification": True}
# 确定性节点:请求用户确认(高风险操作)
def request_user_confirmation(state: AgentState):
print("n--- NODE: request_user_confirmation ---")
action = state.get("action_type", "an important action")
order_id = state.get("order_id", "the specified order")
response_message = AIMessage(content=f"You've requested to {action} for order {order_id}. This is a high-risk operation. Do you confirm? Please reply 'yes' or 'no'.")
return {"messages": [response_message], "requires_confirmation": True}
# 确定性节点:处理用户后续输入
def process_human_input(state: AgentState):
print("n--- NODE: process_human_input ---")
last_message_content = state["messages"][-1].content.lower()
# 清除之前的请求状态
current_state = {"requires_clarification": False, "requires_confirmation": False, "user_confirmed": None}
if state.get("requires_clarification"):
match = re.search(r'b(d{6})b', last_message_content)
new_order_id = match.group(1) if match else None
if new_order_id:
print(f"User provided new order ID: {new_order_id}")
current_state["order_id"] = new_order_id
current_state["validation_status"] = "pending" # 重新验证
current_state["messages"] = [AIMessage(content=f"Got it, checking order ID {new_order_id}.")]
else:
print("User did not provide a valid order ID in clarification.")
current_state["messages"] = [AIMessage(content="I still can't find a 6-digit order ID. Can you try again?")]
current_state["requires_clarification"] = True # 保持澄清状态
return current_state
if state.get("requires_confirmation"):
if "yes" in last_message_content:
print("User confirmed the action.")
current_state["user_confirmed"] = True
current_state["messages"] = [AIMessage(content="Confirmation received. Proceeding with the action.")]
elif "no" in last_message_content:
print("User denied the action.")
current_state["user_confirmed"] = False
current_state["messages"] = [AIMessage(content="Action denied. What else can I help you with?")]
else:
print("User did not provide a clear 'yes' or 'no'.")
current_state["messages"] = [AIMessage(content="Please reply 'yes' or 'no' to confirm or deny.")]
current_state["requires_confirmation"] = True # 保持确认状态
return current_state
# 如果不是澄清或确认,则只更新消息历史(通常不会走到这里,因为路由会处理)
return state # 理论上这里不会被触发
# 确定性节点:执行确认后的高风险操作
def execute_confirmed_action(state: AgentState):
print("n--- NODE: execute_confirmed_action ---")
if state.get("user_confirmed") and state.get("action_type") == "cancel_order":
order_id = state.get("order_id")
# 实际调用 cancel_order_tool
result = cancel_order_tool.invoke({"order_id": order_id})
if result.get("status") == "success":
return {"messages": [AIMessage(content=result["message"])]}
else:
return {"messages": [AIMessage(content=f"Failed to cancel order {order_id}: {result['message']}")]}
else:
return {"messages": [AIMessage(content="Action not confirmed or not a recognized high-risk action.")]}
# 确定性节点:生成最终回复
def generate_final_response(state: AgentState):
print("n--- NODE: generate_final_response ---")
# 如果LLM已经直接回复了,或者工具执行后LLM已经总结了,则直接返回
if isinstance(state["messages"][-1], AIMessage) and not state.get("tool_calls"):
return state
# 否则,让LLM总结工具结果或提供通用回复
summary_llm = ChatOpenAI(model="gpt-4o", temperature=0)
prompt_template = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant. Summarize the conversation and any tool outputs to provide a concise and polite final response to the user."),
("human", "{messages}")
])
chain = prompt_template | summary_llm | StrOutputParser()
summary = chain.invoke({"messages": state["messages"]})
return {"messages": [AIMessage(content=summary)]}
# 4. 构建LangGraph图
workflow = StateGraph(AgentState)
workflow.add_node("call_llm_agent", call_llm_agent)
workflow.add_node("execute_llm_tools", execute_llm_tools)
workflow.add_node("validate_order_id_node", validate_order_id_node)
workflow.add_node("request_user_clarification", request_user_clarification)
workflow.add_node("request_user_confirmation", request_user_confirmation)
workflow.add_node("process_human_input", process_human_input)
workflow.add_node("execute_confirmed_action", execute_confirmed_action)
workflow.add_node("generate_final_response", generate_final_response)
# 5. 定义路由函数
def route_agent_output(state: AgentState):
# LLM决定调用工具
if state.get("tool_calls"):
return "execute_llm_tools"
# LLM没有工具调用,但我们可能需要验证订单ID
if state.get("order_id") and state.get("validation_status") != "valid":
return "validate_order_id_node"
# LLM识别出高风险动作,需要确认
if state.get("action_type") == "cancel_order" and not state.get("user_confirmed"):
return "request_user_confirmation"
# 其他情况,LLM直接回复,或等待后续处理
return "generate_final_response"
def route_after_tool_execution(state: AgentState):
# 工具执行后,如果LLM有新的工具调用,回到LLM让它继续处理
if state.get("tool_calls"): # LLM可能会根据工具结果进行二次工具调用
return "call_llm_agent"
# 如果工具执行后,LLM已经生成了最终回复,或者不需要进一步的工具调用
# 检查是否需要确认或澄清
if state.get("requires_confirmation"):
return "request_user_confirmation"
if state.get("requires_clarification"):
return "request_user_clarification"
# 如果有订单详情,让LLM总结
if state.get("order_details"):
return "generate_final_response"
return "generate_final_response" # 否则直接生成最终回复
def route_after_validation(state: AgentState):
if state.get("validation_status") == "valid":
# 如果订单ID有效,且需要执行取消操作,则请求确认
if state.get("action_type") == "cancel_order" and not state.get("user_confirmed"):
return "request_user_confirmation"
# 否则,回到LLM让它根据有效ID继续处理 (可能会调用 get_order_details_tool)
return "call_llm_agent"
else: # ID无效
return "request_user_clarification"
def route_after_human_input(state: AgentState):
if state.get("requires_clarification"):
return "request_user_clarification" # 用户输入后仍需澄清,回到澄清节点
if state.get("requires_confirmation"):
return "request_user_confirmation" # 用户输入后仍需确认,回到确认节点
# 如果澄清流程完成,且有新提供的order_id,则重新验证
if state.get("order_id") and state.get("validation_status") == "pending":
return "validate_order_id_node"
# 如果确认流程完成,且用户确认了操作
if state.get("user_confirmed") is True:
return "execute_confirmed_action"
# 如果用户否定了操作
if state.get("user_confirmed") is False:
return "generate_final_response" # 结束对话
return "call_llm_agent" # 其他情况,回到LLM让它继续处理
# 添加边和条件路由
workflow.set_entry_point("call_llm_agent") # 总是从LLM开始处理用户输入
workflow.add_conditional_edges("call_llm_agent", route_agent_output)
workflow.add_conditional_edges("execute_llm_tools", route_after_tool_execution)
workflow.add_conditional_edges("validate_order_id_node", route_after_validation)
workflow.add_conditional_edges("request_user_clarification", route_after_human_input)
workflow.add_conditional_edges("request_user_confirmation", route_after_human_input)
workflow.add_conditional_edges("process_human_input", route_after_human_input) # 处理用户输入后的通用路由
workflow.add_edge("execute_confirmed_action", "generate_final_response")
workflow.add_edge("generate_final_response", END) # 最终回复后结束
# 编译图
app_full = workflow.compile()
# 模拟交互式运行
def run_interactive_assistant(app_instance):
print("Welcome to the Interactive Order Assistant! Type 'exit' to quit.")
inputs = {"messages": []}
while True:
user_input = input("nYou: ")
if user_input.lower() == 'exit':
break
inputs["messages"].append(HumanMessage(content=user_input))
# 清除上次的确认/澄清状态,因为用户输入代表新的一轮
inputs["requires_clarification"] = False
inputs["requires_confirmation"] = False
inputs["user_confirmed"] = None
inputs["validation_status"] = None # 重新验证
for s in app_instance.stream(inputs):
# 打印当前节点的状态变化
# print(s)
# 找到AI的最新回复
if "__end__" in s:
final_state = s["__end__"]
last_ai_message = None
for msg in reversed(final_state["messages"]):
if isinstance(msg, AIMessage):
last_ai_message = msg.content
break
if last_ai_message:
print(f"Assistant: {last_ai_message}")
inputs["messages"] = final_state["messages"] # 更新历史
# 检查是否需要用户进一步输入
if final_state.get("requires_clarification") or final_state.get("requires_confirmation"):
break # 等待用户新输入
elif final_state.get("user_confirmed") is False:
# 用户拒绝了高风险操作,结束当前流程
inputs["messages"] = [] # 清空消息历史,开始新对话
break
# 如果流程结束,但没有特殊指示,也清空历史
if "__end__" in s and not final_state.get("requires_clarification") and not final_state.get("requires_confirmation"):
inputs["messages"] = []
break
elif "process_human_input" in s:
# 如果进入了process_human_input节点,说明之前有请求,等待用户新输入
# 这里我们假设process_human_input会直接更新状态并路由
pass
# 如果是请求澄清或确认的节点,直接从该节点获取消息
if "request_user_clarification" in s and s["request_user_clarification"].get("requires_clarification"):
inputs["messages"] = s["request_user_clarification"]["messages"]
break
if "request_user_confirmation" in s and s["request_user_confirmation"].get("requires_confirmation"):
inputs["messages"] = s["request_user_confirmation"]["messages"]
break
# 运行完整的交互式助手
# run_interactive_assistant(app_full)
运行测试用例(需要实际运行并输入):
- 用户:
What is the status of my order 123?- 预期: LLM尝试提取ID,ID验证失败,
request_user_clarification节点被触发。 - 助手:
I couldn't find a valid 6-digit order ID. Could you please provide it again?
- 预期: LLM尝试提取ID,ID验证失败,
- 用户:
Oh, sorry, it's 123456.- 预期:
process_human_input节点处理新ID,validate_order_id_node再次验证,然后call_llm_agent再次调用LLM(这次可能会调用get_order_details_tool)。 - 助手:
Confirmation received. Proceeding with the action. Order 123456 details: Item: Laptop, Status: Shipped, Address: 123 Main St.
- 预期:
- 用户:
I want to cancel order 789012.- 预期: LLM识别出“cancel_order”意图和订单ID,由于是高风险操作,
request_user_confirmation节点被触发。 - 助手:
You've requested to cancel_order for order 789012. This is a high-risk operation. Do you confirm? Please reply 'yes' or 'no'.
- 预期: LLM识别出“cancel_order”意图和订单ID,由于是高风险操作,
- 用户:
No.- 预期:
process_human_input节点处理否定,流程结束。 - 助手:
Action denied. What else can I help you with?
- 预期:
- 用户:
Yes, please cancel it.- 预期:
process_human_input节点处理肯定,execute_confirmed_action节点调用cancel_order_tool。 - 助手:
Order 789012 has been successfully cancelled (simulated).
- 预期:
设计原则与实践考量
在构建这种三方博弈系统时,有一些关键的设计原则和实践考量:
- 明确的责任边界:确定性算法负责什么,概率性模型负责什么,人类在哪里介入。避免职责重叠导致混乱。
- 状态的原子性与一致性:确保每个节点对状态的修改是原子性的,并且状态在不同节点间保持一致,防止竞态条件或数据不一致。
- 优雅的错误处理:预见到所有可能出错的地方(LLM幻觉、API失败、用户输入无效),并设计相应的回退机制。通常,人类是最终的错误恢复机制。
- 用户体验优先:在需要用户输入时,提供清晰、简洁、无歧义的提示。让用户知道系统正在做什么,以及为什么需要他们的输入。
- 性能与成本优化:LLM调用是昂贵的且有延迟。尽可能在LLM之前使用确定性规则进行过滤和预处理。只在真正需要理解复杂语义或生成创意内容时才调用LLM。
- 可观测性:记录每个节点的输入、输出和决策过程,以便调试、监控和改进系统。LangGraph的流式输出非常有助于此。
- 持续迭代:这类系统是动态的,需要根据用户反馈和实际运行数据不断调整确定性规则、LLM提示词,甚至图的结构。
结语
在LangGraph的舞台上,确定性算法提供了稳定性与可靠性,概率性模型赋予了系统理解与生成智能的能力,而人类不确定性输入则扮演了引导、纠正和最终裁决的关键角色。通过精心设计状态、节点和条件路由,我们能够实现这三方力量的动态平衡,构建出既强大又灵活、既智能又可靠的下一代AI应用。这是一个持续进化的过程,要求我们不断学习、迭代,并始终将用户置于设计的核心。