探讨 ‘The Philosophy of Control’:在全自动化与完全受控之间,如何通过 LangGraph 寻找中间地带?

驾驭混沌与秩序:LangGraph 在全自动化与人类监督之间寻找控制的哲学

各位同仁,下午好!

今天,我们齐聚一堂,探讨一个在现代技术领域日益凸显的哲学与工程学交叉难题:在追求极致自动化效率的同时,如何保持对复杂系统的有效控制?我们面临的挑战并非简单地选择“完全自动化”或“完全受控”,而是在这两极之间,如何开辟一条既能利用人工智能的强大能力,又能确保人类智能始终处于关键决策与监督地位的中间地带。我们将深入剖析这一“控制的哲学”,并重点探讨 LangGraph 这一强大工具如何为我们构建这种智能、可控的自动化系统提供坚实的基础。

1. 控制的哲学困境:效率与审慎的平衡 (The Philosophical Dilemma of Control: Balancing Efficiency and Prudence)

当我们谈论自动化时,脑海中浮现的往往是机器高速、无疲惫地执行任务的场景。这无疑带来了效率的飞跃、成本的降低和规模的扩展。然而,随着自动化程度的提升,尤其是当人工智能(AI)开始承担决策角色时,我们面临的不仅仅是技术实现的问题,更是深刻的哲学拷问:

  • 完全自动化 (Full Automation):在此模式下,AI系统被赋予了极高的自主权,它能够独立地进行感知、分析、决策并执行操作,人类可能只负责设定高层目标或在系统出现极端异常时介入。
    • 优点:极致的效率、24/7无间断运行、处理复杂数据和模式的能力远超人类。
    • 缺点:缺乏透明度(“黑箱”问题)、难以追溯责任、在未知或异常情境下可能做出非预期或灾难性决策、对人类的失能风险。
  • 完全受控 (Complete Human Control):这代表了传统模式,即所有关键决策和大部分执行操作都由人类完成,AI系统更多地扮演辅助工具的角色,提供信息、建议或执行简单的、预设好的子任务。
    • 优点:高透明度、责任明确、灵活性强、在复杂伦理或非结构化问题上表现优异。
    • 缺点:效率低下、易受人为错误影响、难以扩展、不适用于高并发或大数据场景。

这两个极端都存在明显的局限性。完全自动化可能导致我们失去对系统的洞察和干预能力,一旦出现偏差,后果不堪设想。而完全受控则让我们无法充分利用AI的潜力,在日益复杂和动态的环境中,人类自身的速度和认知带宽已成为瓶颈。

因此,我们的目标是寻找一个“中间地带”,一个能够将AI的计算能力、模式识别能力与人类的常识、伦理判断和高级推理能力相结合的范式。这个中间地带需要我们能够设计出:

  1. 可解释性 (Interpretability):我们能理解AI为什么做出某个决策。
  2. 可审计性 (Auditability):我们能追踪AI的每一步操作和状态变化。
  3. 可干预性 (Intervenability):我们能在必要时暂停、修正或重定向AI的流程。
  4. 适应性 (Adaptability):系统能根据环境变化或人类反馈调整其自动化水平。

而 LangGraph,正是为我们构建这种智能、灵活且可控的系统提供了强大的抽象和工具集。

2. 自动化光谱:从完全受控到完全自主 (The Automation Spectrum: From Fully Controlled to Fully Autonomous)

为了更好地理解中间地带,我们可以将自动化视为一个连续的光谱,而非简单的二元选择。以下是一个简化的自动化级别模型:

自动化级别 人类角色 AI/系统角色 风险暴露 效率增益 典型应用场景
0级:手动 执行所有操作和决策 无或仅提供原始数据 手工数据录入、传统生产线
1级:工具辅助 做出所有决策,执行大部分操作 提供信息、建议或预警 拼写检查、推荐系统、辅助驾驶
2级:功能自动化 监控、审批,在关键点介入 自动执行特定功能或子任务 中低 中高 自动报告生成、数据清洗脚本、部分HR流程
3级:任务自动化 监督、异常处理,设置目标 自动完成一系列相关任务,形成工作流 客户服务聊天机器人、DevOps自动化部署
4级:情境自动化 设置策略、高层目标,处理复杂异常 根据情境动态决策和执行,处理大部分异常 中高 智能工厂调度、金融交易算法、高级网络安全
5级:完全自主 仅设定宏观目标或愿景 自主感知、决策、执行,自我修复 极高 无人驾驶出租车、太空探测器、完全自动驾驶

我们所寻求的“中间地带”,通常落在 2级到4级 之间。在这个区间内,AI系统能够承担相当大的工作量,显著提升效率,但人类仍然是系统的最终仲裁者和监督者,负责处理AI无法解决的复杂问题、伦理困境或系统边界外的异常情况。这种模式,我们称之为“人机协作”或“增强智能”。

3. LangGraph:构建智能代理工作流的基石 (LangGraph: The Foundation for Building Intelligent Agent Workflows)

LangGraph 是 LangChain 生态系统中的一个核心组件,它允许我们通过构建有向图(Directed Graph)来编排复杂的语言模型(LLM)驱动的代理(Agent)工作流。它的设计理念非常适合解决我们前面提到的控制哲学问题。

3.1. 为什么选择 LangGraph 来实现控制?

  • 显式状态管理 (Explicit State Management):LangGraph 的核心是一个共享的、可变的 State 对象。这意味着整个工作流中的所有节点都可以访问和修改这个统一的状态,从而实现信息共享和上下文传递。这对于构建可审计、可回溯的系统至关重要。
  • 节点化执行 (Node-based Execution):每个节点代表工作流中的一个独立步骤或一个代理的决策。这种模块化设计使得我们可以清晰地定义每个步骤的功能、输入和输出,便于理解、测试和维护。
  • 条件边缘 (Conditional Edges):这是 LangGraph 实现动态控制的关键。我们可以根据当前 State 中的条件或某个节点的输出,动态地决定下一个执行的节点。这使得工作流不再是线性的,而是能够根据实际情况进行分支、循环,甚至引入人类决策。
  • 人类在环模式 (Human-in-the-Loop Patterns):LangGraph 可以轻松集成自定义工具,包括那些需要人类输入或批准的工具。结合条件边缘,我们可以设计出在特定条件下暂停自动化流程,等待人类介入的模式。
  • 代理编排能力 (Agentic Capabilities):LangGraph 是为多代理系统而生。我们可以让不同的LLM代理扮演不同的角色,例如一个“规划代理”、一个“执行代理”和一个“审查代理”,并通过图结构协调它们的交互,实现复杂的协作行为。

3.2. LangGraph 的基本概念

让我们快速回顾一下 LangGraph 的几个核心概念:

  • State: 定义工作流的共享状态模式。通常是一个 TypedDictdict,包含所有节点可能需要访问或修改的数据。
  • Node: 图中的一个基本处理单元。它可以是一个 Python 函数,一个 LangChain Runnable,或一个完整的代理。
  • Edge: 连接两个节点的有向边。可以是普通边 (normal edge),表示无条件地从一个节点流向下一个;也可以是条件边 (conditional edge),表示根据某个函数的输出(通常是当前状态的判断)来选择下一个节点。
  • Graph: 整个工作流的蓝图,由节点和边构成。
  • add_node(node_name, node_function): 添加一个命名节点。
  • add_edge(source_node, target_node): 添加一个普通边。
  • add_conditional_edges(source_node, condition_function, branch_map): 添加条件边。condition_function 接收当前状态并返回一个字符串,branch_map 将字符串映射到目标节点。
  • set_entry_point(node_name): 设置图的起始节点。
  • set_finish_point(node_name): 设置图的结束节点。

现在,让我们深入探讨如何利用这些机制来设计我们的“中间地带”。

4. LangGraph 中的控制机制:设计中间地带 (Control Mechanisms in LangGraph: Designing the Middle Ground)

我们将通过一系列代码示例来展示 LangGraph 如何帮助我们实现不同程度的控制。

为了简化演示,我们假设已经配置了 OpenAI 或其他 LLM 客户端。

import os
from typing import TypedDict, Annotated, List
import operator
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_core.tools import tool
from langgraph.graph import StateGraph, END, START
from langgraph.prebuilt import ToolExecutor, ToolNode

# 假设已经配置了环境变量,例如 OPENAI_API_KEY
# from langchain_openai import ChatOpenAI
# llm = ChatOpenAI(model="gpt-4o", temperature=0)

# 实际项目中,LLM的初始化会更复杂,这里仅为示例
class MockLLM:
    def __init__(self, responses=None):
        self.responses = responses if responses is not None else []
        self._call_count = 0

    def invoke(self, messages, **kwargs):
        # 简单模拟LLM的回复
        if self._call_count < len(self.responses):
            response_content = self.responses[self._call_count]
            self._call_count += 1
            return AIMessage(content=response_content)
        return AIMessage(content=f"Default AI response for: {messages[-1].content}")

    def bind_tools(self, tools):
        # 模拟LLM绑定工具,这里不实现实际工具调用逻辑
        return self

# 共享状态定义
class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add] # 聊天历史
    proposal: str # 提案内容
    approved: bool # 审批状态
    review_feedback: str # 审查反馈
    status: str # 流程状态
    confidence: float # AI决策的置信度
    error: str # 错误信息
    plan: str # 计划内容
    code: str # 代码内容
    test_results: str # 测试结果
    deployment_status: str # 部署状态

# 模拟一个LLM,实际项目中会用真正的LLM
# llm = ChatOpenAI(model="gpt-4o", temperature=0)
llm = MockLLM()

4.1. 显式决策节点与条件路由 (Explicit Decision Nodes and Conditional Routing)

这是 LangGraph 实现控制的基础。我们可以让 LLM 代理作为决策者,根据其分析结果或生成的内容,通过条件边缘引导工作流走向不同的分支。

示例1:简单提案审批工作流

假设我们有一个自动化系统,需要生成一个提案,并根据提案的内容决定是否需要人工审批。

# 工具定义 (这里只是模拟,实际会调用外部系统)
@tool
def send_proposal_for_approval(proposal: str) -> str:
    """Sends the generated proposal to a human for approval."""
    print(f"n--- 发送提案等待人工审批 ---n提案内容:n{proposal}n")
    # 实际场景会触发邮件、通知或API调用
    return "Proposal sent for human review."

@tool
def finalize_proposal(proposal: str) -> str:
    """Finalizes the proposal and takes further action."""
    print(f"n--- 提案已获批,开始执行后续操作 ---n提案内容:n{proposal}n")
    # 实际场景会执行部署、发布等
    return "Proposal finalized and executed."

# LLM模拟响应,用于示例
llm_for_proposal = MockLLM(
    responses=[
        "基于当前市场数据,我建议将营销预算增加20%,重点投放社交媒体广告。此提案风险较低,预计回报率高。",
        "这是一个高风险的投资提案,建议投入100万美元购买XYZ公司的股票,存在较大波动性。"
    ]
)

# 节点函数
def generate_proposal(state: AgentState) -> AgentState:
    """Agent generates a proposal."""
    # 实际LLM调用会更复杂,这里简化
    messages = [HumanMessage(content="请根据当前情况生成一个业务提案。")]
    response = llm_for_proposal.invoke(messages)
    print(f"AI 生成提案: {response.content}")
    return {"proposal": response.content, "status": "提案生成"}

def decide_next_step_after_proposal(state: AgentState) -> str:
    """Decides whether to seek human approval or proceed based on proposal content."""
    # 这是一个关键的控制点。LLM或预设规则可以判断。
    proposal = state["proposal"]
    # 假设我们根据关键词判断风险,或者LLM可以输出一个风险评估
    if "高风险" in proposal or "100万美元" in proposal:
        print("提案被识别为高风险,需要人工审批。")
        return "needs_approval"
    else:
        print("提案被识别为低风险,可以自动执行。")
        return "auto_proceed"

def request_human_approval_node(state: AgentState) -> AgentState:
    """Requests human approval."""
    # 模拟人工审批过程,实际会通过UI或API交互
    print(send_proposal_for_approval.invoke({"proposal": state["proposal"]}))
    # 假设人工审批结果通过外部系统回写到状态
    return {"status": "等待人工审批", "approved": False} # 初始设置为未审批

def get_human_approval_result(state: AgentState) -> str:
    """Simulates getting human approval result."""
    # 在实际系统中,这会是一个外部触发器,或者是一个查询数据库的工具。
    # 为了演示,我们直接模拟审批结果
    if "高风险" in state["proposal"]:
        human_decision = input(f"提案: '{state['proposal']}' 风险较高,是否批准?(y/n): ").lower()
        if human_decision == 'y':
            print("人工批准通过。")
            return "approved"
        else:
            print("人工拒绝。")
            return "rejected"
    else:
        # 低风险提案通常无需此节点,但为了演示完整性,可以假设快速自动批准
        print("低风险提案自动批准。")
        return "approved"

def execute_approved_proposal(state: AgentState) -> AgentState:
    """Executes the proposal if approved."""
    print(finalize_proposal.invoke({"proposal": state["proposal"]}))
    return {"status": "提案已执行", "approved": True}

def handle_rejected_proposal(state: AgentState) -> AgentState:
    """Handles a rejected proposal."""
    print(f"n--- 提案 '{state['proposal']}' 已被拒绝,需要重新评估或终止流程 ---n")
    return {"status": "提案被拒绝"}

# 构建图
workflow = StateGraph(AgentState)

workflow.add_node("generate_proposal", generate_proposal)
workflow.add_node("request_human_approval", request_human_approval_node)
workflow.add_node("get_human_approval_result", get_human_approval_result)
workflow.add_node("execute_approved_proposal", execute_approved_proposal)
workflow.add_node("handle_rejected_proposal", handle_rejected_proposal)

workflow.set_entry_point("generate_proposal")

# 定义条件路由
workflow.add_conditional_edges(
    "generate_proposal",
    decide_next_step_after_proposal,
    {
        "needs_approval": "request_human_approval",
        "auto_proceed": "execute_approved_proposal",
    },
)

workflow.add_edge("request_human_approval", "get_human_approval_result")

workflow.add_conditional_edges(
    "get_human_approval_result",
    lambda state: "approved" if state["approved"] else "rejected", # 这里简化,实际应根据get_human_approval_result的输出
    {
        "approved": "execute_approved_proposal",
        "rejected": "handle_rejected_proposal",
    },
)

workflow.add_edge("execute_approved_proposal", END)
workflow.add_edge("handle_rejected_proposal", END)

app = workflow.compile()

# 运行示例 1: 低风险提案 (自动执行)
print("n--- 运行示例 1: 低风险提案 (预期自动执行) ---")
result1 = app.invoke({"messages": [], "proposal": "", "approved": False, "status": ""}, config={"configurable": {"thread_id": "thread-low-risk"}})
print(f"最终状态 (低风险): {result1['status']}")
print("n" + "="*80 + "n")

# 运行示例 2: 高风险提案 (预期需要人工审批)
print("n--- 运行示例 2: 高风险提案 (预期需要人工审批) ---")
# 重新初始化LLM,确保它能给出高风险的响应
llm_for_proposal = MockLLM(
    responses=[
        "这是一个高风险的投资提案,建议投入100万美元购买XYZ公司的股票,存在较大波动性。"
    ]
)
result2 = app.invoke({"messages": [], "proposal": "", "approved": False, "status": ""}, config={"configurable": {"thread_id": "thread-high-risk"}})
print(f"最终状态 (高风险): {result2['status']}")
print("n" + "="*80 + "n")

代码解析:

  • generate_proposal 节点负责生成提案。
  • decide_next_step_after_proposal 函数是关键的决策点。它根据 proposal 的内容(这里是简单的关键词匹配,实际可以是更复杂的LLM判断)返回 "needs_approval""auto_proceed"
  • add_conditional_edges 根据 decide_next_step_after_proposal 的返回结果,将流程导向 request_human_approvalexecute_approved_proposal
  • request_human_approval_node 模拟发送审批请求。
  • get_human_approval_result 模拟获取人工审批结果。此节点会根据用户输入来决定 approved 状态,从而通过另一个条件边导向 execute_approved_proposalhandle_rejected_proposal

这个示例清晰地展示了如何通过LLM的决策能力和LangGraph的条件路由,在自动化流程中嵌入动态的人工审批点。

4.2. 人工干预点:工具与暂停 (Human Intervention Points: Tools and Pauses)

LangGraph 不直接提供“暂停”图的功能,但我们可以通过设计来模拟这种行为:

  1. 显式的人工输入工具:创建一个工具,它会等待人类的输入。
  2. 外部状态更新:图中的一个节点可以检查外部系统(如数据库、消息队列)中是否有新的“人工反馈”或“审批结果”,如果没有则流程停止,直到外部更新发生。
  3. 循环等待模式:一个节点可以不断检查某个条件,直到满足条件(例如,一个标记 human_input_received 变为 True),然后才继续。

示例2:动态人类审查与代理修订

一个代理生成一个计划,然后请求人类审查。人类提供反馈后,代理根据反馈修订计划。

# 模拟一个外部系统来存储人类反馈
human_feedback_store = {}

@tool
def get_human_feedback(thread_id: str) -> str:
    """Retrieves human feedback for a given thread."""
    feedback = human_feedback_store.get(thread_id, "No feedback yet.")
    print(f"检索到人工反馈: {feedback}")
    return feedback

@tool
def provide_human_feedback(thread_id: str, feedback: str) -> str:
    """Allows a human to provide feedback for a given thread."""
    human_feedback_store[thread_id] = feedback
    print(f"人类为线程 {thread_id} 提供了反馈: {feedback}")
    return "Feedback recorded."

# LLM 模拟
llm_for_planning = MockLLM(
    responses=[
        "初始计划:我们首先需要收集用户需求,然后设计数据库结构,最后开发前端界面。",
        "根据反馈,修订计划:第一步,与用户召开会议收集详细需求;第二步,基于需求设计可扩展数据库架构并进行原型开发;第三步,迭代开发前端界面并进行用户测试。"
    ]
)

# 节点函数
def generate_initial_plan(state: AgentState) -> AgentState:
    """Agent generates an initial plan."""
    messages = [HumanMessage(content="请为一个新功能开发项目生成一个初步计划。")]
    response = llm_for_planning.invoke(messages)
    print(f"AI 生成初始计划: {response.content}")
    return {"plan": response.content, "status": "初始计划生成"}

def request_human_review(state: AgentState) -> AgentState:
    """Requests human review for the generated plan."""
    print(f"n--- 计划已生成,等待人工审查与反馈 ---n计划内容:n{state['plan']}n")
    # 在这里,我们可以调用一个工具来通知人类,并等待反馈
    # 实际场景会触发一个外部事件或UI界面
    return {"status": "等待人工反馈", "review_feedback": ""} # 清空旧反馈

def check_for_human_feedback(state: AgentState, config: dict) -> str:
    """Checks if human feedback has been provided."""
    thread_id = config["configurable"]["thread_id"]
    feedback = get_human_feedback.invoke({"thread_id": thread_id})
    if feedback != "No feedback yet.":
        print(f"收到人工反馈。")
        return "feedback_received"
    else:
        print(f"暂无人工反馈,继续等待。")
        return "waiting_for_feedback" # 这是一个循环点,实际上需要外部机制触发重试或暂停

def process_review_feedback(state: AgentState) -> AgentState:
    """Agent processes human feedback and revises the plan."""
    feedback = state["review_feedback"] # 假设反馈已通过外部机制更新到state
    if not feedback:
        feedback = get_human_feedback.invoke({"thread_id": "current_thread"}) # 再次尝试获取

    if feedback and feedback != "No feedback yet.":
        print(f"AI 正在处理人工反馈: {feedback}")
        messages = [
            HumanMessage(content=f"原始计划: {state['plan']}"),
            HumanMessage(content=f"人工反馈: {feedback}"),
            HumanMessage(content="请根据反馈修订计划。")
        ]
        response = llm_for_planning.invoke(messages)
        print(f"AI 修订计划: {response.content}")
        return {"plan": response.content, "status": "计划已修订", "review_feedback": ""}
    else:
        print("无有效反馈,跳过修订。")
        return {"status": "计划未修订", "review_feedback": ""}

def finalize_plan(state: AgentState) -> AgentState:
    """Finalizes the plan and proceeds."""
    print(f"n--- 计划已最终确定,准备执行 ---n最终计划:n{state['plan']}n")
    return {"status": "计划已最终确定"}

# 构建图
workflow_review = StateGraph(AgentState)

workflow_review.add_node("generate_initial_plan", generate_initial_plan)
workflow_review.add_node("request_human_review", request_human_review)
# workflow_review.add_node("check_for_human_feedback", check_for_human_feedback) # 这通常是一个外部触发或循环点
workflow_review.add_node("process_review_feedback", process_review_feedback)
workflow_review.add_node("finalize_plan", finalize_plan)

workflow_review.set_entry_point("generate_initial_plan")
workflow_review.add_edge("generate_initial_plan", "request_human_review")

# 这里我们不直接用 LangGraph 循环等待,而是模拟外部更新后再次调用
# 实际场景中,request_human_review 节点会返回一个状态,表示“等待中”,
# 直到外部事件(如用户在Web界面提交反馈)发生,再触发图的下一次调用。

# 假设人类反馈是外部更新后,我们再调用 process_review_feedback
# 简化起见,这里直接连接,但请注意实际流程的差异

workflow_review.add_edge("request_human_review", "process_review_feedback") # 假设反馈会很快到来,或者外部服务会触发下一次invoke
workflow_review.add_edge("process_review_feedback", "finalize_plan")
workflow_review.add_edge("finalize_plan", END)

app_review = workflow_review.compile()

print("n--- 运行示例 3: 动态人类审查 ---")
thread_id_example = "plan-review-123"
initial_state = {"messages": [], "proposal": "", "approved": False, "review_feedback": "", "status": "", "plan": ""}
# 第一次调用,生成计划并请求审查
result3_step1 = app_review.invoke(initial_state, config={"configurable": {"thread_id": thread_id_example}})
print(f"步骤1状态: {result3_step1['status']}")

# 模拟人类提供反馈
provide_human_feedback.invoke({"thread_id": thread_id_example, "feedback": "计划很好,但能否更详细地说明需求收集和数据库设计步骤?"})
result3_step1['review_feedback'] = human_feedback_store.get(thread_id_example) # 更新状态,模拟外部回写

# 第二次调用,处理反馈并修订计划 (这里是连续调用,实际可能隔一段时间)
result3_step2 = app_review.invoke(result3_step1, config={"configurable": {"thread_id": thread_id_example}}) # 重新从当前状态启动
print(f"最终计划: {result3_step2['plan']}")
print(f"最终状态: {result3_step2['status']}")
print("n" + "="*80 + "n")

代码解析:

  • get_human_feedbackprovide_human_feedback 工具模拟了与人类交互的外部系统。
  • request_human_review 节点通知人类需要审查。
  • 关键在于 process_review_feedback 节点。在实际应用中,当 request_human_review 节点执行完毕后,图会暂停。只有当外部系统接收到人类反馈并更新了 LangGraph 的 State(例如,通过一个 Webhook 或 API 调用)之后,我们才会再次调用 app_review.invoke(),从上次暂停的状态继续执行,并进入 process_review_feedback 节点。
  • 这里为了演示方便,我模拟了在一个 invoke 后立即提供反馈,并更新了状态,然后再次 invoke。但核心思想是:LangGraph 允许你设计出在特定节点等待外部输入,然后在收到输入后继续执行的流程。

4.3. 监督与审计:透明度和可解释性 (Supervision and Auditing: Transparency and Explainability)

LangGraph 的状态管理和节点化设计天然支持审计和监督。

  • 状态历史记录:每次 invoke 调用都会返回最终状态,但 LangGraph 内部会保留每次节点执行后的状态快照,这对于调试和审计非常有用。
  • 节点日志:我们可以在每个节点函数中添加日志输出,记录其输入、输出、决策逻辑和状态变化。

示例3:日志记录与决策路径追踪

修改之前的节点,加入详细的日志输出。

# 增强日志输出的节点函数
def generate_proposal_with_log(state: AgentState) -> AgentState:
    """Agent generates a proposal with logging."""
    print(f"[LOG] 进入 generate_proposal_with_log 节点。当前状态: {state['status']}")
    messages = [HumanMessage(content="请生成一个业务提案。")]
    response = llm_for_proposal.invoke(messages) # 假设llm_for_proposal已重置
    print(f"[LOG] AI 生成提案: {response.content}")
    new_state = {"proposal": response.content, "status": "提案生成"}
    print(f"[LOG] generate_proposal_with_log 节点完成。更新状态: {new_state}")
    return new_state

def decide_next_step_after_proposal_with_log(state: AgentState) -> str:
    """Decides whether to seek human approval or proceed based on proposal content with logging."""
    print(f"[LOG] 进入 decide_next_step_after_proposal_with_log 节点。当前提案: {state['proposal']}")
    proposal = state["proposal"]
    if "高风险" in proposal or "100万美元" in proposal:
        decision = "needs_approval"
        print(f"[LOG] 决策: 提案识别为高风险,需要人工审批。")
    else:
        decision = "auto_proceed"
        print(f"[LOG] 决策: 提案识别为低风险,可以自动执行。")
    return decision

# 重建带有日志的图 (只做示意,不重复所有代码)
workflow_audit = StateGraph(AgentState)
workflow_audit.add_node("generate_proposal", generate_proposal_with_log)
workflow_audit.add_node("decide_next_step", decide_next_step_after_proposal_with_log)
# ... 添加其他节点和边
# workflow_audit.set_entry_point("generate_proposal")
# workflow_audit.add_conditional_edges("generate_proposal", decide_next_step_after_proposal_with_log, {"needs_approval": "request_human_approval", "auto_proceed": "execute_approved_proposal"})
# ...

# 运行时的输出将包含详细的 [LOG] 信息,便于追踪。
# 此外,LangGraph 的 `invoke` 方法返回的最终状态中包含了整个流程的 `messages` 历史,
# 可以用来回溯所有 AI 的思考和操作步骤。

通过这种方式,我们可以建立一个完整的审计链,记录AI的每一次决策、每一次状态转换以及人类的每一次干预。这对于遵守法规、进行事后分析和提高系统可信度至关重要。

4.4. 权限与角色:限制代理能力 (Permissions and Roles: Limiting Agent Capabilities)

在多代理系统中,不同的代理通常拥有不同的权限和职责。LangGraph 允许我们通过以下方式实现角色和权限控制:

  • 工具集限制:为不同的代理节点绑定不同的工具集。例如,“代码生成代理”可以访问 write_code 工具,而“代码审查代理”只能访问 review_code 工具。
  • LLM 行为约束:通过为不同角色的代理使用不同的系统提示(system prompt)来约束其行为和思考模式。

示例4:角色分工的代码审查流程

假设我们有一个代码提交流程,由一个“代码作者代理”生成代码,一个“代码审查代理”进行审查。

# 工具定义
@tool
def write_code(file_path: str, code_content: str) -> str:
    """Writes code to a specified file path."""
    print(f"n[Author Agent] 正在写入文件: {file_path}")
    # with open(file_path, "w") as f:
    #     f.write(code_content)
    return f"Code written to {file_path} successfully."

@tool
def review_code(code_content: str) -> str:
    """Reviews the provided code content for quality, bugs, and best practices."""
    print(f"n[Reviewer Agent] 正在审查代码...")
    # 模拟LLM审查
    if "buggy_logic" in code_content:
        return "发现潜在bug:`buggy_logic`。建议修正。"
    elif "insecure_api_key" in code_content:
        return "安全警告:发现硬编码的API密钥。请使用环境变量。"
    else:
        return "代码质量良好,无明显问题。"

# 不同的LLM实例,模拟不同的角色和Prompt
# 实际会用不同的prompt template或LLM chain
llm_author = MockLLM(
    responses=[AIMessage(content="def calculate_sum(a, b):n    return a + b")]
).bind_tools([write_code]) # 只有作者代理能写代码

llm_reviewer = MockLLM(
    responses=[AIMessage(content="Looks good. No issues.")]
).bind_tools([review_code]) # 只有审查代理能审查代码

# 节点函数
def generate_code_node(state: AgentState) -> AgentState:
    """Code author agent generates code."""
    messages = [HumanMessage(content="请生成一个 Python 函数来计算两个数的和。")]
    # 这里LLM会决定是否调用 write_code 工具
    response = llm_author.invoke(messages)
    # 简化:直接假设LLM生成了代码
    code_content = response.content
    print(f"代码作者生成代码:n{code_content}")
    return {"code": code_content, "status": "代码生成"}

def review_code_node(state: AgentState) -> AgentState:
    """Code reviewer agent reviews the generated code."""
    messages = [HumanMessage(content=f"请审查以下代码:n{state['code']}")]
    # 这里LLM会决定是否调用 review_code 工具
    response = llm_reviewer.invoke(messages)
    review_result = response.content # 假设LLM直接输出审查结果
    print(f"代码审查代理审查结果: {review_result}")
    return {"review_feedback": review_result, "status": "代码审查完成"}

def decide_after_review(state: AgentState) -> str:
    """Decides based on review feedback."""
    feedback = state["review_feedback"]
    if "bug" in feedback or "警告" in feedback:
        print("审查发现问题,需要修改。")
        return "needs_revision"
    else:
        print("审查通过,可以部署。")
        return "approved_for_deployment"

def revise_code_node(state: AgentState) -> AgentState:
    """Code author agent revises code based on feedback."""
    print(f"n[Author Agent] 收到反馈,开始修订代码...")
    messages = [
        HumanMessage(content=f"原始代码:n{state['code']}"),
        HumanMessage(content=f"审查反馈: {state['review_feedback']}"),
        HumanMessage(content="请根据反馈修订代码。")
    ]
    response = llm_author.invoke(messages)
    revised_code = response.content
    print(f"代码作者修订代码:n{revised_code}")
    return {"code": revised_code, "status": "代码已修订", "review_feedback": ""} # 清空反馈

def deploy_code_node(state: AgentState) -> AgentState:
    """Deploys the approved code."""
    print(f"n--- 代码已通过审查,正在部署 ---n代码:n{state['code']}n")
    return {"status": "代码已部署"}

# 构建图
workflow_roles = StateGraph(AgentState)

workflow_roles.add_node("generate_code", generate_code_node)
workflow_roles.add_node("review_code", review_code_node)
workflow_roles.add_node("revise_code", revise_code_node)
workflow_roles.add_node("deploy_code", deploy_code_node)

workflow_roles.set_entry_point("generate_code")
workflow_roles.add_edge("generate_code", "review_code")

workflow_roles.add_conditional_edges(
    "review_code",
    decide_after_review,
    {
        "needs_revision": "revise_code",
        "approved_for_deployment": "deploy_code",
    },
)
workflow_roles.add_edge("revise_code", "review_code") # 重新审查修订后的代码
workflow_roles.add_edge("deploy_code", END)

app_roles = workflow_roles.compile()

print("n--- 运行示例 4: 角色分工与权限控制 (模拟发现问题并修订) ---")
# 模拟LLM审查时发现问题
llm_reviewer = MockLLM(
    responses=[
        AIMessage(content="审查发现:函数参数命名不规范,且缺少异常处理。建议修改。"),
        AIMessage(content="修订后的代码看起来更规范了,异常处理也已添加。审查通过。")
    ]
).bind_tools([review_code])

# 模拟LLM作者修订代码
llm_author = MockLLM(
    responses=[
        AIMessage(content="def calculate_sum(num1: int, num2: int) -> int:n    try:n        return num1 + num2n    except TypeError as e:n        raise ValueError(f'Invalid input types: {e}')")
    ]
).bind_tools([write_code])

result4 = app_roles.invoke({"messages": [], "code": "", "review_feedback": "", "status": ""}, config={"configurable": {"thread_id": "code-flow-1"}})
print(f"最终状态: {result4['status']}")
print(f"最终代码:n{result4['code']}")
print("n" + "="*80 + "n")

代码解析:

  • 我们创建了两个不同的 MockLLM 实例,llm_authorllm_reviewer,并为它们绑定了不同的工具。
  • generate_code_node 使用 llm_author,而 review_code_node 使用 llm_reviewer
  • decide_after_review 根据审查结果(假设 review_code 会在 review_feedback 中写入审查结果)决定是进入 revise_code 节点还是 deploy_code 节点。
  • 如果需要修订,revise_code 节点会再次调用 llm_author 来修改代码,然后流程会循环回到 review_code 节点进行重新审查,形成一个迭代的审查-修改循环。

这种模式非常强大,它允许我们构建复杂的代理团队,每个代理都在其权限和专业领域内工作,并通过 LangGraph 进行协调。

5. 高级控制模式与实践 (Advanced Control Patterns and Practices)

5.1. 自适应自动化 (Adaptive Automation)

自适应自动化意味着系统能够根据运行时的条件、置信度或环境变化,动态地调整其自动化级别。例如,当AI对某个决策的置信度较低时,它可以自动将控制权交还给人类。

示例5:置信度驱动的决策升级

假设LLM在做出决策时可以输出一个置信度分数。

# LLM模拟,可以输出置信度
class ConfidentMockLLM:
    def __init__(self, responses_with_confidence=None):
        self.responses_with_confidence = responses_with_confidence if responses_with_confidence is not None else []
        self._call_count = 0

    def invoke(self, messages, **kwargs):
        if self._call_count < len(self.responses_with_confidence):
            content, confidence = self.responses_with_confidence[self._call_count]
            self._call_count += 1
            return AIMessage(content=content, additional_kwargs={"confidence": confidence})
        return AIMessage(content="Default AI response.", additional_kwargs={"confidence": 0.5})

llm_confident = ConfidentMockLLM(
    responses_with_confidence=[
        ("建议A,这是基于大量数据分析的可靠结论。", 0.95),
        ("建议B,但数据存在不确定性,需要进一步验证。", 0.60),
        ("建议C,这是一个高度不确定的领域,我需要更多信息。", 0.30)
    ]
)

def make_decision_with_confidence(state: AgentState) -> AgentState:
    """Agent makes a decision and reports confidence."""
    messages = [HumanMessage(content="请针对当前市场趋势,提出一个投资建议。")]
    response = llm_confident.invoke(messages)
    confidence = response.additional_kwargs.get("confidence", 0.5)
    decision = response.content
    print(f"AI 决策: {decision} (置信度: {confidence:.2f})")
    return {"proposal": decision, "confidence": confidence, "status": "AI决策完成"}

def check_confidence_and_escalate(state: AgentState) -> str:
    """Checks AI's confidence and decides whether to escalate to human."""
    confidence = state["confidence"]
    if confidence < 0.7: # 设定一个阈值
        print(f"AI 置信度 ({confidence:.2f}) 低于阈值 (0.7),升级至人工审查。")
        return "escalate_to_human"
    else:
        print(f"AI 置信度 ({confidence:.2f}) 足够,自动执行。")
        return "auto_execute"

def human_escalation_node(state: AgentState) -> AgentState:
    """Node for human intervention when confidence is low."""
    print(f"n--- AI 置信度低,需要人工干预 ---nAI 建议: {state['proposal']}n置信度: {state['confidence']:.2f}n")
    # 实际会启动一个人工审查流程
    return {"status": "等待人工审查"}

def execute_ai_decision_node(state: AgentState) -> AgentState:
    """Executes the AI's decision autonomously."""
    print(f"n--- AI 决策已自动执行 ---n决策内容: {state['proposal']}n")
    return {"status": "AI决策已执行"}

# 构建图
workflow_adaptive = StateGraph(AgentState)
workflow_adaptive.add_node("make_decision", make_decision_with_confidence)
workflow_adaptive.add_node("human_escalation", human_escalation_node)
workflow_adaptive.add_node("execute_ai_decision", execute_ai_decision_node)

workflow_adaptive.set_entry_point("make_decision")
workflow_adaptive.add_conditional_edges(
    "make_decision",
    check_confidence_and_escalate,
    {
        "escalate_to_human": "human_escalation",
        "auto_execute": "execute_ai_decision",
    },
)
workflow_adaptive.add_edge("human_escalation", END)
workflow_adaptive.add_edge("execute_ai_decision", END)

app_adaptive = workflow_adaptive.compile()

print("n--- 运行示例 5.1: 高置信度 (自动执行) ---")
result5_1 = app_adaptive.invoke({"messages": [], "confidence": 0.0, "status": ""}, config={"configurable": {"thread_id": "adaptive-1"}})
print(f"最终状态: {result5_1['status']}")
print("n" + "="*80 + "n")

print("n--- 运行示例 5.2: 中置信度 (人工升级) ---")
llm_confident = ConfidentMockLLM(
    responses_with_confidence=[
        ("建议B,但数据存在不确定性,需要进一步验证。", 0.60)
    ]
) # 重置LLM响应
result5_2 = app_adaptive.invoke({"messages": [], "confidence": 0.0, "status": ""}, config={"configurable": {"thread_id": "adaptive-2"}})
print(f"最终状态: {result5_2['status']}")
print("n" + "="*80 + "n")

代码解析:

  • ConfidentMockLLM 模拟了一个能够输出置信度分数的LLM。
  • make_decision_with_confidence 节点调用LLM生成决策并获取置信度。
  • check_confidence_and_escalate 函数根据置信度分数决定流程走向:如果低于阈值,则路由到 human_escalation_node;否则,路由到 execute_ai_decision_node

这种模式实现了更智能的自动化,它知道何时该信任自己,何时该寻求人类帮助,从而在效率和风险之间取得平衡。

5.2. 层次化控制 (Hierarchical Control)

对于非常复杂的系统,一个单一的 LangGraph 可能会变得难以管理。我们可以通过将一个 LangGraph 嵌入到另一个 LangGraph 的节点中,实现层次化控制。一个“主图”负责高层任务调度,而“子图”负责处理特定子任务的详细流程。

示例6:简单的嵌套图概念

# 定义子图的状态和节点 (简化)
class SubGraphState(TypedDict):
    task_name: str
    sub_status: str
    result: str

def sub_task_processor(state: SubGraphState) -> SubGraphState:
    """Processes a sub-task."""
    print(f"t[SubGraph] 正在处理子任务: {state['task_name']}")
    processed_result = f"Processed {state['task_name']} successfully."
    return {"sub_status": "完成", "result": processed_result}

# 构建子图
sub_workflow = StateGraph(SubGraphState)
sub_workflow.add_node("process_sub_task", sub_task_processor)
sub_workflow.set_entry_point("process_sub_task")
sub_workflow.add_edge("process_sub_task", END)
sub_app = sub_workflow.compile()

# 主图的节点
def main_task_dispatcher(state: AgentState) -> AgentState:
    """Dispatches a main task, possibly involving sub-graphs."""
    print("[MainGraph] 正在调度主任务: 准备调用子图。")
    # 假设主任务需要处理一个子任务
    sub_task_input = {"task_name": "数据预处理", "sub_status": "", "result": ""}
    sub_result = sub_app.invoke(sub_task_input, config={"configurable": {"thread_id": "sub-thread-1"}})
    print(f"[MainGraph] 子图处理结果: {sub_result['result']}")
    return {"status": f"主任务完成,子任务结果: {sub_result['result']}"}

# 构建主图
main_workflow = StateGraph(AgentState)
main_workflow.add_node("dispatch_main_task", main_task_dispatcher)
main_workflow.set_entry_point("dispatch_main_task")
main_workflow.add_edge("dispatch_main_task", END)
main_app = main_workflow.compile()

print("n--- 运行示例 6: 层次化控制 (主图调用子图) ---")
result6 = main_app.invoke({"messages": [], "status": ""}, config={"configurable": {"thread_id": "main-thread-1"}})
print(f"最终主图状态: {result6['status']}")
print("n" + "="*80 + "n")

代码解析:

  • 我们定义了一个 SubGraphState 和一个简单的 sub_workflow,它包含一个 sub_task_processor 节点。
  • 在主图的 main_task_dispatcher 节点中,我们直接调用了 sub_app.invoke() 来执行子图。
  • 子图的执行结果会被返回并更新到主图的状态中(或作为主图状态的一部分)。

这种方法使得我们可以将复杂问题分解为更小的、可管理的子问题,每个子问题都可以由一个独立的 LangGraph 来处理,从而降低了整体系统的复杂性,并提高了模块化程度。

5.3. 故障处理与回滚 (Error Handling and Rollback)

健壮的自动化系统必须能够优雅地处理错误。LangGraph 可以通过以下方式实现:

  • 错误检测节点:专门的节点来检查前一个节点的输出或状态中是否存在错误。
  • 条件路由到错误处理分支:如果检测到错误,通过条件边缘将流程导向一个错误处理子图或人工干预节点。
  • 状态回滚:虽然 LangGraph 本身不提供自动回滚机制,但我们可以通过在状态中保留“上一个已知良好状态”的快照,并在错误发生时恢复到该状态。

示例7:基本的错误处理

@tool
def risky_operation(input_data: str) -> str:
    """A risky operation that might fail."""
    if "fail" in input_data.lower():
        raise ValueError("Simulated failure during risky operation!")
    return f"Risky operation succeeded with: {input_data}"

def perform_risky_action(state: AgentState) -> AgentState:
    """Attempts a risky action."""
    print("[Action Node] 尝试执行高风险操作...")
    try:
        result = risky_operation.invoke({"input_data": "数据A"})
        return {"status": "高风险操作成功", "error": "", "result": result}
    except Exception as e:
        print(f"[Action Node] 高风险操作失败: {e}")
        return {"status": "高风险操作失败", "error": str(e)}

def check_for_errors(state: AgentState) -> str:
    """Checks if an error occurred in the previous step."""
    if state["error"]:
        print(f"[Error Handler] 检测到错误: {state['error']}")
        return "error_detected"
    else:
        print("[Error Handler] 未检测到错误。")
        return "no_error"

def human_investigation_node(state: AgentState) -> AgentState:
    """Node for human investigation after an error."""
    print(f"n--- 检测到自动化流程错误,需要人工调查 ---n错误信息: {state['error']}n")
    # 实际会触发警报、创建工单等
    return {"status": "等待人工调查"}

def continue_on_success(state: AgentState) -> AgentState:
    """Continues the process after a successful operation."""
    print(f"n--- 操作成功,继续后续流程 ---n结果: {state.get('result')}n")
    return {"status": "流程继续"}

# 构建图
workflow_error = StateGraph(AgentState)
workflow_error.add_node("perform_risky_action", perform_risky_action)
workflow_error.add_node("human_investigation", human_investigation_node)
workflow_error.add_node("continue_on_success", continue_on_success)

workflow_error.set_entry_point("perform_risky_action")
workflow_error.add_conditional_edges(
    "perform_risky_action",
    check_for_errors,
    {
        "error_detected": "human_investigation",
        "no_error": "continue_on_success",
    },
)
workflow_error.add_edge("human_investigation", END)
workflow_error.add_edge("continue_on_success", END)

app_error = workflow_error.compile()

print("n--- 运行示例 7.1: 无错误路径 ---")
result7_1 = app_error.invoke({"messages": [], "error": "", "status": ""}, config={"configurable": {"thread_id": "error-flow-1"}})
print(f"最终状态: {result7_1['status']}")
print("n" + "="*80 + "n")

print("n--- 运行示例 7.2: 错误路径 (触发人工调查) ---")
# 模拟工具失败
@tool
def risky_operation_fail(input_data: str) -> str:
    """A risky operation that will fail."""
    raise ValueError("Simulated failure during risky operation (forced)!")

def perform_risky_action_fail(state: AgentState) -> AgentState:
    """Attempts a risky action that will fail."""
    print("[Action Node] 尝试执行高风险操作 (模拟失败)...")
    try:
        result = risky_operation_fail.invoke({"input_data": "fail_data"})
        return {"status": "高风险操作成功", "error": "", "result": result}
    except Exception as e:
        print(f"[Action Node] 高风险操作失败: {e}")
        return {"status": "高风险操作失败", "error": str(e)}

# 临时替换节点函数
workflow_error_fail = StateGraph(AgentState)
workflow_error_fail.add_node("perform_risky_action", perform_risky_action_fail) # 使用会失败的节点
workflow_error_fail.add_node("human_investigation", human_investigation_node)
workflow_error_fail.add_node("continue_on_success", continue_on_success)

workflow_error_fail.set_entry_point("perform_risky_action")
workflow_error_fail.add_conditional_edges(
    "perform_risky_action",
    check_for_errors,
    {
        "error_detected": "human_investigation",
        "no_error": "continue_on_success",
    },
)
workflow_error_fail.add_edge("human_investigation", END)
workflow_error_fail.add_edge("continue_on_success", END)
app_error_fail = workflow_error_fail.compile()

result7_2 = app_error_fail.invoke({"messages": [], "error": "", "status": ""}, config={"configurable": {"thread_id": "error-flow-2"}})
print(f"最终状态: {result7_2['status']}")
print("n" + "="*80 + "n")

代码解析:

  • risky_operation 工具模拟了一个可能失败的操作。
  • perform_risky_action 节点尝试执行此操作,并捕获任何异常,将错误信息写入 Stateerror 字段。
  • check_for_errors 函数根据 state["error"] 是否为空来判断是否存在错误。
  • 如果检测到错误,流程将通过条件边缘路由到 human_investigation_node,通知人类介入。

这种错误处理机制确保了自动化系统在遇到意料之外的问题时不会简单崩溃,而是能够以可控的方式暂停并寻求人类帮助,从而提高了系统的鲁棒性。

6. 案例分析:一个混合控制的DevOps流程 (Case Study: A Hybrid Control DevOps Process)

现在,让我们把这些概念整合到一个更实际的场景中:一个具有人类监督的自动化代码部署管道。

场景描述:

当开发人员提交代码到版本控制系统后,我们希望一个自动化流程能自动执行大部分步骤,但在关键环节,如代码审查和生产部署前,需要人类的确认。当出现问题时,系统应能自动尝试回滚,并通知人类。

DevOps流程阶段:

  1. 代码变更检测 (Code Change Detection):当新的代码提交到特定分支时触发。
  2. 自动化静态分析与单元测试 (Automated Linting & Unit Testing):AI自动执行代码质量检查和单元测试。
  3. LLM辅助代码审查 (LLM-based Code Review & Suggestion):LLM代理审查代码,提出改进建议。
  4. 人工代码审查与批准 (Human Code Review & Approval):人类开发人员审查LLM的建议和代码本身,并决定是否批准。
  5. 自动化集成测试 (Automated Integration Testing):代码通过审查后,自动进行集成测试。
  6. 人工UAT/最终确认 (Human Manual Sanity Check/UAT):在部署到生产环境前,进行最终的人工确认或用户验收测试。
  7. 自动化部署 (Automated Deployment):如果所有检查和批准通过,自动部署代码。
  8. 自动化监控与回滚 (Automated Monitoring & Rollback):部署后持续监控,如果发现异常,尝试自动回滚并通知人类。

LangGraph 映射与核心逻辑:

LangGraph 节点/功能 对应的 DevOps 阶段 控制级别 核心 LangGraph 机制
detect_code_change 代码变更检测 0级(触发器) 外部触发器,更新 LangGraph 状态
run_static_analysis 自动化静态分析与单元测试 3级(任务自动化) LangChain Runnable 或 ToolExecutor
llm_code_review LLM辅助代码审查 2级(功能自动化) LLM代理,输出审查报告到状态
human_review_approval 人工代码审查与批准 1级(工具辅助/人机协作) HumanInputTool 或外部交互,条件路由
run_integration_tests 自动化集成测试 3级(任务自动化) LangChain Runnable 或 ToolExecutor
human_uat_confirm 人工UAT/最终确认 1级(工具辅助/人机协作) HumanInputTool 或外部交互,条件路由
deploy_to_production 自动化部署 3级(任务自动化) LangChain Runnable 或 ToolExecutor
monitor_post_deployment 自动化监控与回滚(初级) 4级(情境自动化) 异步任务,更新状态,条件路由到 rollback
rollback_deployment 自动化回滚 3级(任务自动化) LangChain Runnable 或 ToolExecutor
条件边缘 基于测试结果、审查结果、批准状态 动态控制 add_conditional_edges
共享状态 追踪代码、测试结果、审查反馈、审批状态、部署状态、错误信息 透明度/审计 AgentState

简化版代码示例:


# 扩展 AgentState 以适应 DevOps 流程
class DevOpsAgentState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]
    code_changes: str # 提交的代码内容
    lint_status: str # Linting 结果
    unit_test_status: str # 单元测试结果
    llm_review_report: str # LLM 代码审查报告
    human_review_approved: bool # 人工审查是否批准
    integration_test_status: str # 集成测试结果
    human_uat_approved: bool # 人工UAT是否批准
    deployment_status: str # 部署状态 (pending, deploying, deployed, failed, rolled_back)
    error_message: str # 错误信息
    current_action: str # 当前正在执行的动作

# 模拟工具
@tool
def run_linter(code: str) -> str:
    """Runs a linter on the provided code."""
    print("[Tool] 运行代码质量检查...")
    if "bad_indent" in code:
        return "Linter failed: Bad indentation detected."
    return "Linter passed."

@tool
def run_unit_tests(code: str) -> str:
    """Runs unit tests on the provided code."""
    print("[Tool] 运行单元测试...")
    if "test_failure_case" in code:
        return "Unit tests failed: test_failure_case failed."
    return "Unit tests passed."

@tool
def run_integration_tests_tool(code: str) -> str:
    """Runs integration tests."""
    print("[Tool] 运行集成测试...")
    if "integration_issue" in code:
        return "Integration tests failed: database connection issue."
    return "Integration tests passed."

@tool
def deploy_code_tool(code: str) -> str:
    """Deploys the code to production."""
    print("[Tool] 部署代码到生产环境...")
    if "deployment_error_case" in code:
        raise Exception("Simulated deployment error!")
    return "Code deployed successfully."

@tool
def rollback_deployment_tool() -> str:
    """Rolls back the last deployment."""
    print("[Tool] 执行部署回滚...")
    return "Deployment rolled back successfully."

# LLM 代理 (简化)
llm_devops = MockLLM(
    responses=[
        AIMessage(content="LLM审查报告:代码逻辑清晰,但建议添加更多注释。"),
        AIMessage(content="LLM审查报告:代码存在潜在性能问题,建议优化循环结构。"),
    ]
)

# 节点函数
def detect_code_change(state: DevOpsAgentState) -> DevOpsAgentState:
    """Simulates detecting a code change."""
    print("[Node] 检测到新的代码提交。")
    # 实际会从Git Hook等获取
    return {"code_changes": "def new_feature():n    print('Hello, DevOps!')", "current_action": "代码变更检测", "deployment_status": "pending"}

def run_pre_checks(state: DevOpsAgentState) -> DevOpsAgentState:
    """Runs static analysis and unit tests."""
    print("[Node] 运行预检查 (静态分析和单元测试)...")
    lint_result = run_linter.invoke({"code": state["code_changes"]})
    unit_test_result = run_unit_tests.invoke({"code": state["code_changes"]})
    print(f"Linter: {lint_result}, Unit Tests: {unit_test_result}")
    return {
        "lint_status": lint_result,
        "unit_test_status": unit_test_result,
        "current_action": "预检查完成"
    }

def llm_review_code_node(state: DevOpsAgentState) -> DevOpsAgentState:
    """LLM agent performs code review."""
    print("[Node] LLM代理进行代码审查...")
    messages = [HumanMessage(content=f"请审查以下代码:n{state['code_changes']}n并提供改进建议。")]
    response = llm_devops.invoke(messages)
    print(f"LLM审查报告: {response.content}")
    return {"llm_review_report": response.content, "current_action": "LLM代码审查完成"}

def human_review_approval_node(state: DevOpsAgentState) -> DevOpsAgentState:
    """Requests human approval for code changes and LLM review."""
    print(f"n--- 等待人工代码审查与批准 ---")
    print(f"代码变更:n{state['code_changes']}")
    print(f"LLM审查报告:n{state['llm_review_report']}")
    # 实际会通过UI/API等待人类输入
    user_input = input("请审查代码和LLM报告,是否批准部署?(y/n): ").lower()
    approved = (user_input == 'y')
    print(f"人工审查结果: {'批准' if approved else '拒绝'}")
    return {"human_review_approved": approved, "current_action": "人工代码审查完成"}

def run_integration_tests_node(state: DevOpsAgentState) -> DevOpsAgentState:
    """Runs integration tests."""
    print("[Node] 运行集成测试...")
    test_result = run_integration_tests_tool.invoke({"code": state["code_changes"]})
    print(f"集成测试结果: {test_result}")
    return {"integration_test_status": test_result, "current_action": "集成测试完成"}

def human_uat_confirm_node(state: DevOpsAgentState) -> DevOpsAgentState:
    """Requests human UAT/final confirmation."""
    print(f"n--- 等待人工UAT/最终确认 ---")
    print(f"代码已通过所有自动化测试。准备部署到生产环境。")
    user_input = input("请进行最终人工确认,是否部署?(y/n): ").lower()
    approved = (user_input == 'y')
    print(f"人工UAT结果: {'批准' if approved else '拒绝'}")
    return {"human_uat_approved": approved, "current_action": "人工UAT完成"}

def deploy_to_production_node(state: DevOpsAgentState) -> DevOpsAgentState:
    """Deploys the code to production."""
    print("[Node] 部署代码到生产环境...")
    try:
        deploy_result = deploy_code_tool.invoke({"code": state["code_changes"]})
        print(f"部署结果: {deploy_result}")
        return {"deployment_status": "deployed", "current_action": "部署完成"}
    except Exception as e:
        print(f"部署失败: {e}")
        return {"deployment_status": "failed", "error_message": str(e), "current_action": "部署失败"}

def monitor_post_deployment_node(state: DevOpsAgentState) -> DevOpsAgentState:
    """Simulates post-deployment monitoring."""
    print("[Node] 部署后监控中...")
    # 实际会是一个持续运行的监控系统,这里模拟一个瞬时检查
    if "deployment_error_case" in state["code_changes"]: # 假设这个字符串代表部署后会出问题
        print("监控系统检测到异常!")
        return {"deployment_status": "monitoring_failed", "error_message": "Post-deployment error detected!", "current_action": "监控失败"}
    print("监控正常。")
    return {"deployment_status": "deployed", "current_action": "监控正常"}

def rollback_deployment_node(state: DevOpsAgentState) -> DevOpsAgentState:
    """Rolls back the deployment."""
    print("[Node] 执行回滚操作...")
    rollback_result = rollback_deployment_tool.invoke({})
    print(f"回滚结果: {rollback_result}")
    return {"deployment_status": "rolled_back", "current_action": "回滚完成"}

# 定义条件路由函数
def decide_pre_checks_outcome(state: DevOpsAgentState) -> str:
    if "passed" in state["lint_status"] and "passed" in state["unit_test_status"]:
        return "passed"
    return "failed"

def decide_human_review_outcome(state: DevOpsAgentState) -> str:
    if state["human_review_approved"]:
        return "approved"
    return "rejected"

def decide_integration_tests_outcome(state: DevOpsAgentState) -> str:
    if "passed" in state["integration_test_status"]:
        return "passed"
    return "failed"

def decide_uat_outcome(state: DevOpsAgentState) -> str:
    if state["human_uat_approved"]:
        return "approved"
    return "rejected"

def decide_deployment_outcome(state: DevOpsAgentState) -> str:
    if state["deployment_status"] == "deployed":
        return "success"
    return "failed"

def decide_monitoring_outcome(state: DevOpsAgentState) -> str:
    if state["deployment_status"] == "monitoring_failed":
        return "error_detected"
    return "no_error"

# 构建 DevOps 工作流图
devops_workflow = StateGraph(DevOpsAgentState)

# 添加节点
devops_workflow.add_node("detect_code_change", detect_code_change)
devops_workflow.add_node("run_pre_checks", run_pre_checks)
devops_workflow.add_node("llm_code_review", llm_code_review_node)
devops_workflow.add_node("human_review_approval", human_review_approval_node)
devops_workflow.add_node("run_integration_tests", run_integration_tests_node)
devops_workflow.add_node("human_uat_confirm", human_uat_confirm_node)
devops_workflow.add_node("deploy_to_production", deploy_to_production_node)
devops_workflow.add_node("monitor_post_deployment", monitor_post_deployment_node)
devops_workflow.add_node("rollback_deployment", rollback_deployment_node)

# 设置入口点
devops_workflow.set_entry_point("detect_code_change")

# 定义边
devops_workflow.add_edge("detect_code_change", "run_pre_checks")
devops_workflow.add_conditional_edges(
    "run_pre_checks",
    decide_pre_checks_outcome,
    {
        "passed": "llm_code_review",
        "failed": END, # 预检查失败,流程结束
    },
)
devops_workflow.add_edge("llm_code_review", "human_review_approval")
devops_workflow.add_conditional_edges(
    "human_review_approval",
    decide_human_review_outcome,
    {
        "approved": "run_integration_tests",
        "rejected": END, # 人工审查拒绝,流程结束
    },
)
devops_workflow.add_conditional_edges(
    "run_integration_tests",
    decide_integration_tests_outcome,
    {
        "passed": "human_uat_confirm",
        "failed": END, # 集成测试失败,流程结束
    },
)
devops_workflow.add_conditional_edges(
    "human_uat_confirm",
    decide_uat_outcome,
    {
        "approved": "deploy_to_production",
        "rejected": END, # UAT拒绝,流程结束
    },
)
devops_workflow.add_conditional_edges(
    "deploy_to_production",
    decide_deployment_outcome,
    {
        "success": "monitor_post_deployment",
        "failed": "rollback_deployment", # 部署失败,尝试回滚
    },
)
devops_workflow.add_conditional_edges(
    "monitor_post_deployment",
    decide_monitoring_outcome,
    {
        "error_detected": "rollback_deployment", # 监控发现问题,触发回滚
        "no_error": END, # 监控正常,流程成功结束
    },
)
devops_workflow.add_edge("rollback_deployment", END) # 回滚后流程结束

devops_app = devops_workflow.compile()

# --- 运行 DevOps 流程示例 ---
print("n" + "="*80 + "n")
print("--- 运行 DevOps 流程示例: 成功路径 ---")
# 模拟LLM审查时发现问题
llm_devops = MockLLM(
    responses=[
        AIMessage(content="LLM审查报告:代码逻辑清晰,但建议添加更多注释。"),
    ]
)

# 临时替换 detect_code_change 模拟成功代码
def detect_code_change_success(state: DevOpsAgentState) -> DevOpsAgentState:
    print("[Node] 检测到新的代码提交 (成功代码)。")
    return {"code_changes": "def new_feature():n    return True", "current_action": "代码变更检测", "deployment_status": "pending"}

# 重新编译图以使用新的 detect_code_change_success
devops_workflow_success = StateGraph(DevOpsAgentState)
devops_workflow_success.add_node("detect_code_change", detect_code_change_success)
devops_workflow_success.add_node("run_pre_checks", run_pre_checks)
devops_workflow_success.add_node("llm_code_review", llm_code_review_node)
devops_workflow_success.add_node("human_review_approval", human_review_approval_node)
devops_workflow_success.add_node("run_integration_tests", run_integration_tests_node)
devops_workflow_success.add_node("human_uat_confirm", human_uat_confirm_node)
devops_workflow_success.add_node("deploy_to_production", deploy_to_production_node)
devops_workflow_success.add_node("monitor_post_deployment", monitor_post_deployment_node)
devops_workflow_success.add_node("rollback_deployment", rollback_deployment_node)

devops_workflow_success.set_entry_point("detect_code_change")
devops_workflow_success.add_edge("detect_code_change", "run_pre_checks")
devops_workflow_success.add_conditional_edges("run_pre_checks", decide_pre_checks_outcome, {"passed": "llm_code_review", "failed": END})
devops_workflow_success.add_edge("llm_code_review", "human_review_approval")
devops_workflow_success.add_conditional_edges("human_review_approval", decide_human_review_outcome, {"approved": "run_integration_tests", "rejected": END})
devops_workflow_success.add_conditional_edges("run_integration_tests", decide_integration_tests_outcome, {"passed": "human_uat_confirm", "failed": END})
devops_workflow_success.add_conditional_edges("human_uat_confirm", decide_uat_outcome, {"approved": "deploy_to_production", "rejected": END})
devops_workflow_success.add_conditional_edges("deploy_to_production", decide_deployment_outcome, {"success": "monitor_post_deployment", "failed": "rollback_deployment"})
devops_workflow_success.add_conditional_edges("monitor_post_deployment", decide_monitoring_outcome, {"error_detected": "rollback_deployment", "no_error": END})
devops_workflow_success.add_edge("rollback_deployment", END)
devops_app_success = devops_workflow_success.compile()

initial_devops_state = DevOpsAgentState(
    messages=[], code_changes="", lint_status="", unit_test_status="",
    llm_review_report="", human_review_approved=False, integration_test_status="",
    human_uat_approved=False, deployment_status="", error_message="", current_action=""
)
# 注意:这里需要用户在终端输入 'y' 两次来批准
final_state_

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注