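What Is a "Fully Automated Financial Audit Agent"? Using LangGraph to Process Tens of Thousands of Invoices and Automatically Check Them Against Tax Regulations for Anomalies

Colleagues and fellow technology enthusiasts, hello everyone!

Today we are here to explore an exciting and genuinely challenging topic: how to build a "fully automated financial audit Agent" that uses a framework such as LangGraph to process tens of thousands of invoices, check them against complex tax regulations, and flag anomalous items.

In today's fast-changing business environment, the efficiency and accuracy of financial auditing bear directly on a company's healthy operation and compliance. Traditional audit workflows are slow and labor-intensive, and they are prone to human error. Faced with massive transaction volumes and an increasingly complex body of regulations, we need an intelligent solution. Today I will show how combining the language understanding of large language models (LLMs), the workflow orchestration of LangGraph, and a set of engineering practices can turn that vision into a working system.

1. The Pain Points of Traditional Auditing and the Promise of Intelligent Auditing

First, let us review the challenges that traditional financial auditing faces:

  1. Explosive data growth: as the business scales, a company produces tens or even hundreds of thousands of invoices and transaction vouchers every day. Reviewing them one by one by hand is inefficient and prone to omissions.
  2. Complex, changing rules: tax regulations and accounting standards are updated continually and contain many provisions; understanding and applying them takes professional knowledge and experience.
  3. Hard pattern recognition: anomalous transactions hide among a mass of normal ones, and finding them takes sharp insight and experience on the auditor's part.
  4. High repetition: most basic data reconciliation work is highly repetitive and consumes a large share of auditors' time.
  5. Subjectivity risk: human judgment can carry bias, which affects the objectivity of the audit result.

Against this background, intelligent auditing, and in particular audit Agents built on large language models and graph-based orchestration, shows considerable potential. Our goal is an automated system that can "understand" invoice content, "learn" tax regulations, and "reason" about anomalous behavior.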

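2. Why the Agent Paradigm and LangGraph?

2.1 The Agent Paradigm: The Core of the Intelligence

By "Agent" we mean more than a simple script or tool: it is an autonomous entity with perception, reasoning, action, and memory.

  • Perception: it can extract structured information from raw data (such as invoice images or PDFs).
  • Reasoning: it can make logical judgments and inferences from the extracted information and predefined rules (tax regulations).
  • Action: it can carry out concrete operations such as querying a database, generating a report, or flagging an anomaly.
  • Memory: it can store audit history and lessons learned, and use them in later tasks.

With this paradigm, the system is no longer a machine that passively executes instructions but an agent that actively works toward solving a problem.

2.2 LangGraph: A Tool for Orchestrating Agents

LangGraph is a library in the LangChain ecosystem that models an Agent's workflow as a graph of nodes and edges. The graph can be a simple directed acyclic graph (DAG), but it may also contain cycles, which is what makes retry and review loops possible. Why is LangGraph such a good fit for a fully automated financial audit Agent?

  1. State management: while processing an invoice, the audit Agent has to keep track of the current invoice, the data extracted so far, and the judgments already made. LangGraph provides a State mechanism that defines the Agent's internal state explicitly and passes it between nodes.
  2. Orchestration of complex logic: an audit flow has multiple steps (data extraction, rule matching, anomaly judgment, human review) with dependencies and conditional branches between them. LangGraph lets us define these flows as a graph, including conditional routing (conditional edges) and loops.
  3. Tool integration: the Agent has to interact with external tools such as optical character recognition (OCR) services, databases, a tax regulation knowledge base, and LLM APIs. Nodes can call these tools directly, and an LLM-driven node can choose which tool to use.
  4. Observability and debugging: the graph structure makes the Agent's execution path explicit, so its decisions are easier to understand, debug, and optimize.
  5. Robustness: with clearly separated nodes and managed state, errors, retry mechanisms, and human intervention points are easier to handle.

Next, we look in detail at how to build this audit Agent with LangGraph.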

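3. Agent Architecture: Modularity and Extensibility

An efficient audit Agent needs a robust, modular architecture. We can divide it into the following core components:

Component | Responsibility | Key technologies
Invoice ingestion module | Receives raw invoice files (PDF, JPG), runs OCR, and extracts key fields. | OCR engines (PaddleOCR, Google Vision API), PDF parsing libraries
Data normalization and validation module | Cleans and formats the extracted data and performs basic structural checks. | Pydantic, data-cleaning scripts
Tax regulation knowledge base | Stores structured and unstructured tax regulations, accounting standards, and historical audit cases. | Vector database (Chroma, Weaviate), RDBMS
LLM reasoning engine | The core decision unit; uses an LLM for text understanding, rule matching, and anomaly reasoning. | OpenAI GPT series, Anthropic Claude, Llama
Tooling | Lets the Agent interact with external systems, for example database queries and API calls. | LangChain Tools, custom Python functions
Audit report and persistence module | Generates audit reports and stores audit results (anomalies, review suggestions) in a database. | RDBMS (PostgreSQL), report generation libraries
Human review and feedback interface | Provides a UI where auditors review the anomalies flagged by the Agent and submit feedback. | Web UI (Streamlit, Flask), API

These modules appear as nodes in the LangGraph graph.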
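To make the data normalization and validation module a little more concrete, here is a minimal sketch of an amount consistency check. It uses `decimal.Decimal` instead of floats because binary floating point can introduce rounding noise into monetary sums. The function names and the one-cent tolerance are assumptions made for illustration, not part of any library.

```python
from decimal import Decimal

TOLERANCE = Decimal("0.01")  # assumed rounding tolerance: one cent


def to_decimal(value) -> Decimal:
    """Convert via str() so 0.1 becomes Decimal('0.1') rather than its binary float expansion."""
    return Decimal(str(value))


def check_invoice_totals(invoice: dict) -> list[str]:
    """Return human-readable validation errors; an empty list means the amounts are consistent."""
    errors = []
    total = to_decimal(invoice["total_amount"])
    tax = to_decimal(invoice["tax_amount"])
    subtotal = sum((to_decimal(item["total"]) for item in invoice["items"]), Decimal("0"))
    if total <= 0:
        errors.append("total_amount must be positive")
    if tax < 0:
        errors.append("tax_amount must not be negative")
    if abs(subtotal + tax - total) > TOLERANCE:
        errors.append(f"items ({subtotal}) + tax ({tax}) != total ({total})")
    return errors


sample = {
    "total_amount": 12720.0,
    "tax_amount": 720.0,
    "items": [{"total": 10000.0}, {"total": 2000.0}],
}
print(check_invoice_totals(sample))  # []
```

Returning a list of errors rather than a single boolean lets a downstream node record exactly why an invoice was sent to human review.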

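4. LangGraph Implementation: From State to Workflow

4.1 Defining the Agent State

In LangGraph, the State is the core of the Agent's memory. It is a TypedDict or a Pydantic model that carries data between the nodes of the graph and is updated along the way. For our audit Agent, the state needs to hold the following key information: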

from typing import List, Dict, Union, TypedDict, Optional
from langchain_core.messages import BaseMessage

class InvoiceData(TypedDict):
    invoice_id: str
    supplier_name: str
    purchaser_name: str
    issue_date: str
    total_amount: float
    tax_amount: float
    items: List[Dict[str, Union[str, float]]]
    is_valid_format: bool
    ocr_confidence: float

class TaxRegulation(TypedDict):
    rule_id: str
    category: str
    description: str
    conditions: str # natural-language description, or a parseable expression
    action: str # e.g. "不允许抵扣" (not deductible), "需特殊备案" (special filing required)

class Anomaly(TypedDict):
    anomaly_id: str
    invoice_id: str
    rule_id: str
    description: str
    severity: str # "高" (high), "中" (medium), "低" (low)
    suggested_action: str
    status: str # "待复核" (pending review), "已确认" (confirmed), "已驳回" (rejected)

class AuditState(TypedDict):
    """
    Represents the state of our financial audit agent.
    """
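    # Path or identifier of the invoice file currently being processed
    current_invoice_path: Optional[str]
    # Structured data extracted from the invoice
    extracted_invoice_data: Optional[InvoiceData]
    # Relevant tax regulations
    relevant_tax_regulations: List[TaxRegulation]
    # Anomalies found so far
    anomalies: List[Anomaly]
    # Audit history messages, used as LLM context
    messages: List[BaseMessage]
    # Flow-control flag
    needs_human_review: bool
    # Batch ID, used when processing tens of thousands of invoices
    batch_id: Optional[str]
    # Current processing progress
    progress: Optional[str]

This AuditState is passed along and updated throughout the audit workflow.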
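One design option worth knowing about: LangGraph lets a state key be declared as `Annotated[type, reducer]`, in which case the value a node returns for that key is merged into the existing value by the reducer instead of replacing it. The sketch below declares a hypothetical `ReducedAuditState` in that style and emulates the merge with a small helper, `apply_update`, which is a toy written for this illustration and not LangGraph's implementation.

```python
import operator
from typing import Annotated, List, TypedDict, get_type_hints


class ReducedAuditState(TypedDict):
    # Hypothetical variant of AuditState: list fields carry a reducer.
    anomalies: Annotated[List[dict], operator.add]
    messages: Annotated[List[str], operator.add]
    needs_human_review: bool  # no reducer: the last write wins


def apply_update(state: dict, update: dict) -> dict:
    """Toy emulation of reducer-based merging, for illustration only."""
    hints = get_type_hints(ReducedAuditState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata:  # a reducer is attached: combine old and new values
            merged[key] = metadata[0](state[key], value)
        else:  # plain field: overwrite
            merged[key] = value
    return merged


state = {"anomalies": [], "messages": ["start"], "needs_human_review": False}
state = apply_update(state, {"messages": ["extracted"], "needs_human_review": True})
print(state["messages"])  # ['start', 'extracted']
```

With reducers, a node can return only the keys it changed (for example `{"messages": [...]}`) rather than mutating and returning the whole state, which tends to make nodes easier to test in isolation.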

4.2 Defining Tools

The Agent needs a set of tools to carry out concrete tasks. A tool can be an API call to an external service or an internal Python function.

from langchain_core.tools import tool
import json
import random
import datetime

# Assume an OCR service is available
@tool
def perform_ocr_and_extract(invoice_path: str) -> InvoiceData:
    """
    Performs OCR on an invoice file (PDF/image) and extracts structured data.
    Returns the extracted fields as an InvoiceData dict.
    """
    print(f"Executing OCR and extraction for: {invoice_path}")
    # Simulate the OCR and extraction process
    if "error" in invoice_path:
        raise ValueError("Simulated OCR error for this invoice.")

    # Simulated extraction result. The amounts are kept self-consistent
    # (item totals + tax == total) so the mock can pass basic validation.
    items = [
        {"description": "软件服务费", "quantity": 1, "unit_price": 10000.0, "total": 10000.0},
        {"description": "技术支持费", "quantity": 1, "unit_price": 2000.0, "total": 2000.0}
    ]
    subtotal = sum(item["total"] for item in items)
    tax_amount = round(subtotal * 0.06, 2)  # assumed 6% rate, for the mock only
    mock_data = {
        "invoice_id": f"INV-{random.randint(10000, 99999)}",
        "supplier_name": "创新科技股份有限公司",
        "purchaser_name": "智慧财务有限公司",
        "issue_date": datetime.date.today().strftime("%Y-%m-%d"),
        "total_amount": round(subtotal + tax_amount, 2),
        "tax_amount": tax_amount,
        "items": items,
        "is_valid_format": True,
        "ocr_confidence": round(random.uniform(0.8, 0.99), 2)
    }
    return mock_data

# Assume a tax regulation knowledge base is available
@tool
def retrieve_tax_regulations(category: str, keywords: List[str]) -> List[TaxRegulation]:
    """
    Retrieves relevant tax regulations from the knowledge base based on category and keywords.
    Returns a list of TaxRegulation objects.
    """
    print(f"Retrieving tax regulations for category: {category}, keywords: {keywords}")
    # Simulate retrieval from a vector database or a structured database
    mock_regulations = [
        {"rule_id": "TR-001", "category": "增值税", "description": "餐饮娱乐支出不得抵扣增值税进项税额。", "conditions": "支出类别属于餐饮或娱乐", "action": "不允许抵扣"},
        {"rule_id": "TR-002", "category": "企业所得税", "description": "研发费用可享受加计扣除政策。", "conditions": "费用属于研发活动范畴,并符合备案要求", "action": "可加计扣除"},
        {"rule_id": "TR-003", "category": "增值税", "description": "纳税人取得的虚开增值税专用发票,不得抵扣进项税额,并需进行税务处理。", "conditions": "发票被认定为虚开", "action": "不允许抵扣,需税务处理"},
        {"rule_id": "TR-004", "category": "增值税", "description": "购买固定资产用于非应税项目,其进项税额不得抵扣。", "conditions": "固定资产用于非应税项目", "action": "不允许抵扣"}
    ]

    # Simple filtering for the simulation
    filtered_regs = []
    for reg in mock_regulations:
        if category and reg["category"] != category:
            continue
        if keywords:
            if not any(k.lower() in reg["description"].lower() for k in keywords):
                continue
        filtered_regs.append(reg)
    return filtered_regs

# Assume a tool that saves anomalies to the database
@tool
def save_anomaly_to_db(anomaly: Anomaly) -> str:
    """
    Saves a detected anomaly to the audit database.
    Returns the ID of the saved anomaly.
    """
    print(f"Saving anomaly to DB: {anomaly['description']}")
    # Simulate the database operation
    anomaly_id = f"ANOMALY-{random.randint(1000, 9999)}"
    # print(f"Anomaly saved with ID: {anomaly_id}")
    return anomaly_id

# Collect all tools
tools = [
    perform_ocr_and_extract,
    retrieve_tax_regulations,
    save_anomaly_to_db
]

4.3 Defining the Graph Nodes

Each node is a Python function or a LangChain Runnable that receives the current State and returns an update to it.

from langchain_core.runnables import RunnableLambda
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field

# Assumed LLM model
llm = ChatOpenAI(model="gpt-4o", temperature=0)

class ExtractedInvoiceSchema(BaseModel):
    invoice_id: str = Field(description="The unique identifier of the invoice.")
    supplier_name: str = Field(description="The name of the supplier.")
    purchaser_name: str = Field(description="The name of the purchaser.")
    issue_date: str = Field(description="The date the invoice was issued in YYYY-MM-DD format.")
    total_amount: float = Field(description="The total amount of the invoice.")
    tax_amount: float = Field(description="The tax amount on the invoice.")
    items: List[Dict[str, Union[str, float]]] = Field(description="A list of items on the invoice, each with description, quantity, unit_price, and total.")

# A PydanticOutputParser that can enforce structured LLM output (defined here but not wired into the nodes below)
invoice_parser = PydanticOutputParser(pydantic_object=ExtractedInvoiceSchema)

# --- Node function definitions ---

def extract_invoice_data_node(state: AuditState) -> AuditState:
    """
    Node 1: Extracts structured data from the invoice using OCR tool.
    """
    print("\n--- Node: extract_invoice_data_node ---")
    current_invoice_path = state["current_invoice_path"]
    if not current_invoice_path:
        raise ValueError("No invoice path provided in state.")

    try:
        extracted_data_raw = perform_ocr_and_extract.invoke({"invoice_path": current_invoice_path})

        # The mock OCR tool returns an InvoiceData dict directly; a tool that returned a JSON string would need json.loads
        # extracted_data = json.loads(extracted_data_raw)
        extracted_data = extracted_data_raw # the mock tool returns the dict directly

        state["extracted_invoice_data"] = extracted_data
        state["messages"].append(AIMessage(f"Successfully extracted data from invoice {extracted_data['invoice_id']}."))
        state["progress"] = "Invoice data extracted."
        return state
    except Exception as e:
        state["messages"].append(AIMessage(f"Error during OCR and extraction: {e}. Marking for human review."))
        state["needs_human_review"] = True
        state["progress"] = "OCR failed, needs human review."
        return state

def normalize_and_validate_node(state: AuditState) -> AuditState:
    """
    Node 2: Normalizes and performs basic validation on extracted invoice data.
    """
    print("\n--- Node: normalize_and_validate_node ---")
    invoice_data = state["extracted_invoice_data"]
    if not invoice_data:
        state["messages"].append(AIMessage("No invoice data to normalize/validate."))
        state["needs_human_review"] = True
        state["progress"] = "Validation skipped, no data."
        return state

    # Simple validation logic
    is_valid = True
    validation_errors = []

    if invoice_data["total_amount"] <= 0:
        is_valid = False
        validation_errors.append("Total amount is zero or negative.")
    if invoice_data["tax_amount"] < 0:
        is_valid = False
        validation_errors.append("Tax amount is negative.")
    if abs(sum(item["total"] for item in invoice_data["items"]) - (invoice_data["total_amount"] - invoice_data["tax_amount"])) > 0.01:
        is_valid = False
        validation_errors.append("Item totals do not match subtotal.")

    invoice_data["is_valid_format"] = is_valid # update invoice_data held in the state

    if not is_valid:
        state["messages"].append(AIMessage(f"Invoice {invoice_data['invoice_id']} failed basic validation: {'; '.join(validation_errors)}. Needs human review."))
        state["needs_human_review"] = True
        state["progress"] = "Validation failed, needs human review."
    else:
        state["messages"].append(AIMessage(f"Invoice {invoice_data['invoice_id']} passed basic validation."))
        state["progress"] = "Invoice data validated."
    return state

def retrieve_relevant_rules_node(state: AuditState) -> AuditState:
    """
    Node 3: Retrieves relevant tax regulations based on invoice content.
    """
    print("\n--- Node: retrieve_relevant_rules_node ---")
    invoice_data = state["extracted_invoice_data"]
    if not invoice_data:
        state["messages"].append(AIMessage("No invoice data to retrieve rules for."))
        return state

    # Use the LLM to extract keywords or a category to improve retrieval accuracy
    prompt_template = f"""
    Given the following invoice data, identify the main categories and keywords that would be relevant for tax compliance checks.
    Invoice ID: {invoice_data['invoice_id']}
    Supplier: {invoice_data['supplier_name']}
    Purchaser: {invoice_data['purchaser_name']}
    Issue Date: {invoice_data['issue_date']}
    Total Amount: {invoice_data['total_amount']}
    Tax Amount: {invoice_data['tax_amount']}
    Items: {json.dumps(invoice_data['items'], ensure_ascii=False)}

    Provide a JSON object with 'category' (e.g., "增值税", "企业所得税") and 'keywords' (list of strings, e.g., ["餐饮", "研发", "固定资产"]).
    """

    try:
        llm_response = llm.invoke(prompt_template)
        # Try to parse the LLM output
        llm_parsed = json.loads(llm_response.content)
        category = llm_parsed.get("category", "")
        keywords = llm_parsed.get("keywords", [])
    except Exception as e:
        print(f"Warning: LLM failed to extract category/keywords: {e}. Using default.")
        category = ""
        keywords = [item["description"] for item in invoice_data["items"]]

    # Call the tool to retrieve regulations
    relevant_rules = retrieve_tax_regulations.invoke({"category": category, "keywords": keywords})
    state["relevant_tax_regulations"] = relevant_rules
    state["messages"].append(AIMessage(f"Retrieved {len(relevant_rules)} relevant tax regulations."))
    state["progress"] = "Relevant rules retrieved."
    return state

def detect_anomaly_node(state: AuditState) -> AuditState:
    """
    Node 4: Uses LLM to compare invoice data against retrieved regulations and detect anomalies.
    """
    print("\n--- Node: detect_anomaly_node ---")
    invoice_data = state["extracted_invoice_data"]
    regulations = state["relevant_tax_regulations"]

    if not invoice_data or not regulations:
        state["messages"].append(AIMessage("Skipping anomaly detection: missing invoice data or regulations."))
        return state

    # Build the LLM prompt
    prompt = f"""
    You are an expert financial auditor. Your task is to review an invoice and a set of tax regulations,
    then identify any potential anomalies or compliance risks.

    --- Invoice Data ---
    Invoice ID: {invoice_data['invoice_id']}
    Supplier: {invoice_data['supplier_name']}
    Purchaser: {invoice_data['purchaser_name']}
    Issue Date: {invoice_data['issue_date']}
    Total Amount: {invoice_data['total_amount']}
    Tax Amount: {invoice_data['tax_amount']}
    Items: {json.dumps(invoice_data['items'], ensure_ascii=False, indent=2)}

    --- Relevant Tax Regulations ---
    {json.dumps(regulations, ensure_ascii=False, indent=2)}

    --- Instructions ---
    1. Carefully compare the invoice data with each tax regulation.
    2. Identify any specific items or aspects of the invoice that might violate or be inconsistent with the regulations.
    3. For each potential anomaly, provide:
        - `anomaly_id`: A unique identifier for the anomaly.
        - `invoice_id`: The ID of the invoice.
        - `rule_id`: The ID of the regulation that might be violated.
        - `description`: A clear, concise description of the anomaly.
        - `severity`: "高" (High), "中" (Medium), "低" (Low).
        - `suggested_action`: Recommended next steps (e.g., "要求提供证明材料", "驳回抵扣", "人工复核").
    4. If no anomalies are found, return an empty list.
    5. Output the result as a JSON array of Anomaly objects.
    """

    try:
        llm_response = llm.invoke(prompt)
        # print(f"LLM Raw Anomaly Detection Response: {llm_response.content}")
        detected_anomalies_raw = json.loads(llm_response.content)

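        # Make sure the LLM output is a list of anomaly objects
        detected_anomalies = []
        if isinstance(detected_anomalies_raw, list):
            for anomaly_dict in detected_anomalies_raw:
                # Build an Anomaly dict with defaults for missing fields (a TypedDict is not validated at runtime)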
                try:
                    anomaly_obj = Anomaly(
                        anomaly_id=anomaly_dict.get('anomaly_id', f"ANOM-{random.randint(10000,99999)}"),
                        invoice_id=anomaly_dict.get('invoice_id', invoice_data['invoice_id']),
                        rule_id=anomaly_dict.get('rule_id', 'N/A'),
                        description=anomaly_dict.get('description', 'No description provided.'),
                        severity=anomaly_dict.get('severity', '中'),
                        suggested_action=anomaly_dict.get('suggested_action', '人工复核'),
                        status='待复核'
                    )
                    detected_anomalies.append(anomaly_obj)
                except Exception as e:
                    print(f"Error parsing single anomaly from LLM: {e}, raw: {anomaly_dict}")
                    state["messages"].append(AIMessage(f"Error parsing some anomalies from LLM. Needs human review."))
                    state["needs_human_review"] = True

        else:
            print("LLM did not return a list of anomalies.")
            if detected_anomalies_raw: # not a list but non-empty: treat it as a possible anomaly
                state["messages"].append(AIMessage(f"LLM returned unexpected anomaly format: {llm_response.content}. Needs human review."))
                state["needs_human_review"] = True

        state["anomalies"].extend(detected_anomalies) # accumulate anomalies

        if detected_anomalies:
            state["needs_human_review"] = True # anomalies found, human review required
            state["messages"].append(AIMessage(f"Detected {len(detected_anomalies)} potential anomalies. Needs human review."))
            state["progress"] = "Anomalies detected, needs human review."
        else:
            state["messages"].append(AIMessage("No anomalies detected for this invoice."))
            state["progress"] = "No anomalies detected."
        return state
    except json.JSONDecodeError as e:
        state["messages"].append(AIMessage(f"LLM response for anomaly detection was not valid JSON: {e}. Raw: {llm_response.content}. Needs human review."))
        state["needs_human_review"] = True
        state["progress"] = "LLM output error, needs human review."
        return state
    except Exception as e:
        state["messages"].append(AIMessage(f"An unexpected error occurred during anomaly detection: {e}. Needs human review."))
        state["needs_human_review"] = True
        state["progress"] = "Anomaly detection error, needs human review."
        return state

def save_audit_results_node(state: AuditState) -> AuditState:
    """
    Node 5: Saves confirmed anomalies and audit status to the database.
    """
    print("\n--- Node: save_audit_results_node ---")
    if state["anomalies"]:
        for anomaly in state["anomalies"]:
            # Simplification: every detected anomaly is stored with status "待复核" (pending review).
            # A real system could decide, based on severity and type, whether to
            # resolve an anomaly automatically or store it only after human confirmation.
            save_anomaly_to_db.invoke({"anomaly": anomaly})
        state["messages"].append(AIMessage(f"Saved {len(state['anomalies'])} anomalies to database."))
        state["progress"] = "Anomalies saved for review."
    else:
        state["messages"].append(AIMessage("No anomalies to save for this invoice."))
        state["progress"] = "Audit complete, no anomalies."
    return state

def human_review_node(state: AuditState) -> AuditState:
    """
    Node for human review. This node typically pauses the execution or marks for external review.
    For demonstration, we'll just log and potentially clear the 'needs_human_review' flag.
    In a real system, this would trigger an alert/task in a UI.
    """
    print("\n--- Node: human_review_node ---")
    if state["needs_human_review"]:
        print(f"!!! HUMAN REVIEW REQUIRED for invoice {state['extracted_invoice_data']['invoice_id'] if state['extracted_invoice_data'] else 'N/A'} !!!")
        print(f"Reason: {state['messages'][-1].content}") # the last message usually explains why
        # A real system would not set state['needs_human_review'] = False here;
        # it would wait for a callback from the human review system.
        # To let the demo run to completion, a simulated review result could be enabled:
        # state["needs_human_review"] = False # assume the human review is done
        # state["progress"] = "Waiting for human review."
    return state
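The nodes above call `json.loads` directly on the model's reply. In practice a model sometimes wraps its JSON in a Markdown code fence, which makes that call fail. The helper below is a small sketch of a more tolerant parser; `parse_llm_json` is a hypothetical name introduced here, and it handles only the simple case of a single fence around the whole reply.

```python
import json
import re

FENCE = "`" * 3  # three backticks, built this way to keep the example readable
_FENCED = re.compile(rf"^{FENCE}(?:json)?\s*(.*?)\s*{FENCE}$", re.DOTALL)


def parse_llm_json(text: str):
    """Parse JSON from an LLM reply, tolerating one surrounding Markdown code fence."""
    cleaned = text.strip()
    match = _FENCED.match(cleaned)
    if match:
        cleaned = match.group(1)
    return json.loads(cleaned)  # still raises json.JSONDecodeError on invalid JSON


reply = FENCE + 'json\n[{"rule_id": "TR-001", "severity": "高"}]\n' + FENCE
print(parse_llm_json(reply)[0]["rule_id"])  # TR-001
```

A node could call `parse_llm_json(llm_response.content)` in place of `json.loads(llm_response.content)` and keep its existing `except json.JSONDecodeError` branch unchanged.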

4.4 Building the Graph

Now we assemble these nodes and tools into a LangGraph StateGraph.

from langgraph.graph import StateGraph, END

# Define the graph
workflow = StateGraph(AuditState)

# Add nodes
workflow.add_node("extract_data", extract_invoice_data_node)
workflow.add_node("validate_data", normalize_and_validate_node)
workflow.add_node("retrieve_rules", retrieve_relevant_rules_node)
workflow.add_node("detect_anomaly", detect_anomaly_node)
workflow.add_node("save_results", save_audit_results_node)
workflow.add_node("human_review", human_review_node)

# Set the entry point
workflow.set_entry_point("extract_data")

# Add edges
# 1. Extract data -> validate data
workflow.add_edge("extract_data", "validate_data")

# 2. Validate data -> retrieve rules (if validation passed)
#    Validate data -> human review (if validation failed)
workflow.add_conditional_edges(
    "validate_data",
    lambda state: "human_review" if state["needs_human_review"] else "retrieve_rules",
    {"human_review": "human_review", "retrieve_rules": "retrieve_rules"}
)

# 3. Retrieve rules -> detect anomalies
workflow.add_edge("retrieve_rules", "detect_anomaly")

# 4. Detect anomalies -> save results (if no anomaly was found)
#    Detect anomalies -> human review (if an anomaly was found or the LLM failed)
workflow.add_conditional_edges(
    "detect_anomaly",
    lambda state: "human_review" if state["needs_human_review"] else "save_results",
    {"human_review": "human_review", "save_results": "save_results"}
)

# 5. Human review -> save results (after review, results are always saved)
# In practice several paths can follow a review: correct the data and re-enter the flow,
# confirm the anomaly and save, or reject the anomaly and save.
# For simplicity we go straight to saving results here.
# The graph could also contain a loop so that a review re-triggers earlier steps.
workflow.add_edge("human_review", "save_results")

# 6. Save results -> end
workflow.add_edge("save_results", END)

# Compile the graph
app = workflow.compile()

print("LangGraph workflow compiled successfully.")
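The routing decisions in the graph are written as inline lambdas. Because a router is just a function from state to a node name, it can also be written as a named function and unit-tested without building the graph. The sketch below shows that option; the function names `route_after_validation` and `route_after_detection` are introduced here for illustration.

```python
from typing import Literal


def route_after_validation(state: dict) -> Literal["human_review", "retrieve_rules"]:
    """Send invoices that failed extraction or validation to human review."""
    return "human_review" if state.get("needs_human_review") else "retrieve_rules"


def route_after_detection(state: dict) -> Literal["human_review", "save_results"]:
    """Send invoices with anomalies (or LLM errors) to human review, others straight to saving."""
    return "human_review" if state.get("needs_human_review") else "save_results"


print(route_after_validation({"needs_human_review": True}))   # human_review
print(route_after_detection({"needs_human_review": False}))   # save_results
```

Either function can be passed as the second argument of `workflow.add_conditional_edges(...)` in place of the corresponding lambda, with the same mapping dictionary.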

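4.5 Running the Agent

Now we can run the Agent by passing in an initial state.

import os
# ChatOpenAI reads OPENAI_API_KEY when it is constructed (section 4.3), so in a single
# script the key has to be set before the LLM is created. Exporting it in the shell is
# preferable to hard-coding it.
os.environ.setdefault("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY") # replace with your own API key

# A small sample standing in for tens of thousands of invoices
invoice_paths = [
    "invoice_good_001.pdf",
    "invoice_bad_format_002.pdf_error", # simulates an OCR error
    "invoice_normal_003.pdf",
    "invoice_catering_004.pdf", # simulates a catering invoice, not deductible
    "invoice_normal_005.pdf",
    "invoice_suspicious_006.pdf" # simulates an anomaly the LLM might find
]

print("\n--- Starting Audit Agent Processing ---")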

processed_count = 0
anomalies_found_total = 0

for i, invoice_path in enumerate(invoice_paths):
    print(f"\nProcessing invoice {i+1}/{len(invoice_paths)}: {invoice_path}")
    initial_state = AuditState(
        current_invoice_path=invoice_path,
        extracted_invoice_data=None,
        relevant_tax_regulations=[],
        anomalies=[],
        messages=[HumanMessage(content=f"Starting audit for {invoice_path}")],
        needs_human_review=False,
        batch_id="BATCH-20231027",
        progress="Initialized"
    )

    try:
        # Run the graph
        # app.stream() yields updates node by node; app.invoke() returns the final state directly
        final_state = app.invoke(initial_state)

        print(f"\n--- Audit for {invoice_path} Completed ---")
        print(f"Final Progress: {final_state['progress']}")
        if final_state["anomalies"]:
            anomalies_found_total += len(final_state["anomalies"])
            print(f"Detected Anomalies: {json.dumps(final_state['anomalies'], indent=2, ensure_ascii=False)}")
        else:
            print("No anomalies detected.")
        if final_state["needs_human_review"]:
            print("!!! This invoice requires human review !!!")

    except Exception as e:
        print(f"!!! An unhandled error occurred for invoice {invoice_path}: {e} !!!")
        # The failure could be logged here, or the invoice marked as failed and pushed to the human review queue

    processed_count += 1
    if processed_count % 100 == 0:
        print(f"\n--- Processed {processed_count} invoices so far ---")

print(f"\n--- All {len(invoice_paths)} invoices processed ---")
print(f"Total anomalies found across all invoices: {anomalies_found_total}")

Sample run output (abridged and illustrative; LLM-dependent parts will vary between runs):

--- Starting Audit Agent Processing ---

Processing invoice 1/6: invoice_good_001.pdf

--- Node: extract_invoice_data_node ---
Executing OCR and extraction for: invoice_good_001.pdf

--- Node: normalize_and_validate_node ---
Invoice INV-76077 passed basic validation.

--- Node: retrieve_relevant_rules_node ---
Retrieving tax regulations for category: 软件服务, keywords: ['软件服务费', '技术支持费']

--- Node: detect_anomaly_node ---
No anomalies detected for this invoice.

--- Node: save_audit_results_node ---
No anomalies to save for this invoice.

--- Audit for invoice_good_001.pdf Completed ---
Final Progress: Audit complete, no anomalies.
No anomalies detected.

Processing invoice 2/6: invoice_bad_format_002.pdf_error

--- Node: extract_invoice_data_node ---
Executing OCR and extraction for: invoice_bad_format_002.pdf_error
Error during OCR and extraction: Simulated OCR error for this invoice.. Marking for human review.

--- Node: normalize_and_validate_node ---

--- Node: human_review_node ---
!!! HUMAN REVIEW REQUIRED for invoice N/A !!!
Reason: Error during OCR and extraction: Simulated OCR error for this invoice.. Marking for human review.

--- Node: save_audit_results_node ---
No anomalies to save for this invoice.

--- Audit for invoice_bad_format_002.pdf_error Completed ---
Final Progress: OCR failed, needs human review.
No anomalies detected.
!!! This invoice requires human review !!!

Processing invoice 4/6: invoice_catering_004.pdf

--- Node: extract_invoice_data_node ---
Executing OCR and extraction for: invoice_catering_004.pdf

--- Node: normalize_and_validate_node ---
Invoice INV-20625 passed basic validation.

--- Node: retrieve_relevant_rules_node ---
Retrieving tax regulations for category: 餐饮, keywords: ['餐饮服务', '招待费']

--- Node: detect_anomaly_node ---
Detected 1 potential anomalies. Needs human review.

--- Node: human_review_node ---
!!! HUMAN REVIEW REQUIRED for invoice INV-20625 !!!
Reason: Detected 1 potential anomalies. Needs human review.

--- Node: save_audit_results_node ---
Saving anomaly to DB: 发票包含餐饮服务项目,根据TR-001规定,餐饮娱乐支出不得抵扣增值税进项税额。
Saved 1 anomalies to database.

--- Audit for invoice_catering_004.pdf Completed ---
Final Progress: Anomalies detected, needs human review.
Detected Anomalies: [
  {
    "anomaly_id": "ANOMALY-1845",
    "invoice_id": "INV-20625",
    "rule_id": "TR-001",
    "description": "发票包含餐饮服务项目,根据TR-001规定,餐饮娱乐支出不得抵扣增值税进项税额。",
    "severity": "高",
    "suggested_action": "驳回抵扣,并通知相关部门。",
    "status": "待复核"
  }
]
!!! This invoice requires human review !!!
...

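5. Scaling Up: Challenges and Answers for Tens of Thousands of Invoices

Processing tens of thousands of invoices takes more than running one Agent; performance, concurrency, and error recovery all have to be considered.

5.1 Batch and Asynchronous Processing

  • Batch management: organize invoice files into batches and process one batch at a time. The batch_id and progress fields in AuditState make it easy to track batch status.
  • Asynchronous execution: a compiled LangGraph graph can be invoked asynchronously. In a real deployment, asyncio or a task queue such as Celery can submit each invoice as an independent asynchronous task.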

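    # Async run example (a sketch; assumes app, AuditState and invoice_paths defined above)
    import asyncio

    async def run_audit_for_invoice(invoice_path: str):
        initial_state = AuditState(
            current_invoice_path=invoice_path, extracted_invoice_data=None,
            relevant_tax_regulations=[], anomalies=[], messages=[],
            needs_human_review=False, batch_id=None, progress="Initialized",
        )
        return await app.ainvoke(initial_state)

    async def run_batch(paths):
        # return_exceptions=True returns failures as values instead of raising the first one
        return await asyncio.gather(*(run_audit_for_invoice(p) for p in paths), return_exceptions=True)

    # results = asyncio.run(run_batch(invoice_paths))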
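Launching every invoice at once would quickly run into rate limits, so concurrency usually needs a cap. The following is a minimal sketch using `asyncio.Semaphore`. The coroutine `audit_one` is a hypothetical stand-in for a real call such as `await app.ainvoke(initial_state)`, and the limit of 5 is an arbitrary assumption.

```python
import asyncio


async def audit_one(path: str) -> dict:
    """Hypothetical stand-in for running the audit graph on one invoice."""
    await asyncio.sleep(0.01)  # simulate I/O-bound work (OCR, LLM calls)
    return {"path": path, "progress": "done"}


async def audit_all(paths, max_concurrency: int = 5):
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(path):
        async with semaphore:  # at most max_concurrency audits run at the same time
            try:
                return await audit_one(path)
            except Exception as exc:  # record the failure instead of aborting the batch
                return {"path": path, "progress": f"failed: {exc}"}

    # gather preserves input order, so results[i] corresponds to paths[i]
    return await asyncio.gather(*(guarded(p) for p in paths))


results = asyncio.run(audit_all([f"invoice_{i}.pdf" for i in range(20)]))
print(len(results))  # 20
```

Catching exceptions inside `guarded` keeps one bad invoice from hiding the results of the others; the failed entries can then be routed to a retry queue or to human review.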

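5.2 Resource Management

  • LLM API rate limits: calling an LLM at scale will run into rate limits. Implement retries with exponential backoff, or use a concurrency limiter.
  • OCR resources: the OCR service may also have concurrency limits or cost constraints.
  • Database connection pool: keep database operations efficient and avoid leaking connections.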
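As an illustration of the retry point above, here is a small sketch of exponential backoff with jitter. `RateLimitError` is a hypothetical placeholder for whatever exception the LLM or OCR client raises when throttled, and the delay parameters are assumptions to be tuned per provider.

```python
import random
import time


class RateLimitError(Exception):
    """Hypothetical stand-in for a provider's rate-limit exception."""


def call_with_backoff(func, max_attempts=5, base_delay=1.0, max_delay=30.0, sleep=time.sleep):
    """Call func(); on RateLimitError wait base_delay * 2**attempt (capped, plus jitter) and retry."""
    for attempt in range(max_attempts):
        try:
            return func()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the caller handle it
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(delay + random.uniform(0, delay / 2))  # jitter spreads out simultaneous retries


attempts = {"count": 0}


def flaky_llm_call():
    """Fails twice with a rate-limit error, then succeeds."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RateLimitError("simulated throttling")
    return "ok"


# sleep is injected so the demo does not actually wait
print(call_with_backoff(flaky_llm_call, sleep=lambda seconds: None))  # ok
```

Passing `sleep` as a parameter keeps the helper testable; in production the default `time.sleep` (or an async equivalent) would be used.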

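5.3 Error Handling and Retries

  • Node-level error handling: inside each node function, use try-except blocks to catch expected errors and update AuditState, for example by setting needs_human_review = True.
  • Global error handling: for uncaught errors, write a log entry and mark the invoice as "processing failed"; it can be retried in a later batch.
  • Idempotency: design node operations to be idempotent, meaning that running the same operation several times does not produce a different result, so that retries are safe.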
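One common way to get idempotency is to derive a key from the invoice content and skip work that has already been done for that key. The sketch below keeps the registry in memory; `ProcessedRegistry` is a hypothetical class that stands in for a database table with a unique key column.

```python
import hashlib


class ProcessedRegistry:
    """In-memory stand-in for a persistent store keyed by content hash."""

    def __init__(self):
        self._results = {}

    def run_once(self, content: bytes, handler):
        """Run handler(content) only if this exact content has not been processed before."""
        key = hashlib.sha256(content).hexdigest()
        if key not in self._results:
            self._results[key] = handler(content)
        return key, self._results[key]


calls = []


def fake_audit(content: bytes) -> dict:
    """Hypothetical audit function that records how often it is really executed."""
    calls.append(content)
    return {"progress": "done"}


registry = ProcessedRegistry()
key1, _ = registry.run_once(b"invoice bytes", fake_audit)
key2, _ = registry.run_once(b"invoice bytes", fake_audit)  # a retry of the same invoice
print(key1 == key2, len(calls))  # True 1
```

Hashing the file content rather than the file name means a re-uploaded or renamed copy of the same invoice is still recognized as a duplicate.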

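5.4 RAG Optimization for the Knowledge Base

Storing tax regulations in a vector database and combining it with RAG (Retrieval Augmented Generation) can noticeably improve how accurately the relevant regulations are retrieved for the LLM.

  1. Ingestion: split the regulation text into chunks, embed them (with text-embedding-ada-002 or another embedding model), and store them in a vector database such as ChromaDB or Weaviate.
  2. Retrieval: in retrieve_relevant_rules_node, replace simple keyword matching by converting the invoice content (or an LLM-produced summary) into a vector and running a similarity search in the vector database to get the most relevant regulation passages.
  3. Augmentation: pass the retrieved passages as context, together with the invoice data, to the LLM for anomaly detection.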
# RAG-enhanced retrieve_relevant_rules_node (conceptual code)
# from langchain_community.vectorstores import Chroma
# from langchain_openai import OpenAIEmbeddings

# embeddings = OpenAIEmbeddings()
# vectorstore = Chroma(persist_directory="./chroma_db", embedding_function=embeddings)

# def retrieve_relevant_rules_rag_node(state: AuditState) -> AuditState:
#     invoice_data = state["extracted_invoice_data"]
#     # The query stays in Chinese because the regulations in the knowledge base are Chinese
#     query_text = f"请检索与以下发票相关的税务法规:{invoice_data['supplier_name']}向{invoice_data['purchaser_name']}开具的关于{', '.join([item['description'] for item in invoice_data['items']])}的发票,总金额{invoice_data['total_amount']}。"

#     # Retrieve the most similar regulation documents
#     docs = vectorstore.similarity_search(query_text, k=5)
#
#     relevant_rules = []
#     for doc in docs:
#         # Assumes the document metadata carries rule_id, category, and so on
#         rule_info = dict(doc.metadata)  # copy so the stored metadata is not mutated
#         rule_info["description"] = doc.page_content # use the document text as the description
#         relevant_rules.append(TaxRegulation(**rule_info))

#     state["relevant_tax_regulations"] = relevant_rules
#     state["messages"].append(AIMessage(f"Retrieved {len(relevant_rules)} relevant tax regulations using RAG."))
#     return state
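To show what the similarity search in the retrieval step does underneath, here is a dependency-free sketch of top-k retrieval by cosine similarity. The three-dimensional vectors are hand-made toy values, not real embeddings, and `top_k` is a hypothetical helper; a vector database performs the same ranking over high-dimensional embeddings with an index.

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query_vec, indexed_rules, k=2):
    """Return the rule_ids of the k rules whose vectors are most similar to the query."""
    ranked = sorted(indexed_rules, key=lambda rule: cosine(query_vec, rule["vector"]), reverse=True)
    return [rule["rule_id"] for rule in ranked[:k]]


# Toy index: each axis loosely stands for one topic (catering, R&D, fixed assets)
indexed_rules = [
    {"rule_id": "TR-001", "vector": [1.0, 0.0, 0.0]},
    {"rule_id": "TR-002", "vector": [0.0, 1.0, 0.0]},
    {"rule_id": "TR-004", "vector": [0.0, 0.0, 1.0]},
]
query = [0.9, 0.1, 0.0]  # an invoice that is mostly about catering
print(top_k(query, indexed_rules))  # ['TR-001', 'TR-002']
```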

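6. Human-Machine Collaboration and Continuous Improvement

A fully automated audit Agent is not meant to replace people entirely. It is meant to empower auditors so they can focus on more complex and more strategic work.

6.1 Human Review Mechanism

  • Prioritization: rank the anomalies flagged by the Agent so that high-risk, high-value items are reviewed first.
  • Feedback loop: after review, feed the auditor's verdict ("anomaly confirmed", "anomaly rejected", "suggested changes") back into the system for the Agent's continued learning and improvement. This can be done by fine-tuning the LLM or by updating the rule base.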
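The prioritization rule above can be sketched as a simple sort: severity first, then invoice amount in descending order. The mapping `SEVERITY_RANK` and the function `build_review_queue` are names introduced for this illustration, and the sample anomalies are made up.

```python
SEVERITY_RANK = {"高": 0, "中": 1, "低": 2}  # high, medium, low


def build_review_queue(anomalies, invoice_amounts):
    """Order anomalies by severity, then by invoice amount (largest first)."""

    def sort_key(anomaly):
        rank = SEVERITY_RANK.get(anomaly["severity"], len(SEVERITY_RANK))  # unknown severity goes last
        amount = invoice_amounts.get(anomaly["invoice_id"], 0.0)
        return (rank, -amount)

    return sorted(anomalies, key=sort_key)


anomalies = [
    {"anomaly_id": "A1", "invoice_id": "INV-1", "severity": "中"},
    {"anomaly_id": "A2", "invoice_id": "INV-2", "severity": "高"},
    {"anomaly_id": "A3", "invoice_id": "INV-3", "severity": "高"},
]
invoice_amounts = {"INV-1": 90000.0, "INV-2": 1200.0, "INV-3": 45000.0}

queue = build_review_queue(anomalies, invoice_amounts)
print([a["anomaly_id"] for a in queue])  # ['A3', 'A2', 'A1']
```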

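6.2 Performance Monitoring and Observability

  • Logging: record detailed run logs for the Agent, including each node's input, output, and duration, and the details of each LLM call.
  • Metrics: monitor key indicators such as invoice throughput, anomaly detection rate, false positive rate, miss rate, and LLM cost.
  • Visualization: use dashboards (for example Grafana) to show audit progress and an anomaly overview in real time.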
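As a sketch of how the false-alarm and miss indicators could be computed from review feedback, the function below takes the final review statuses of flagged anomalies plus a count of anomalies that auditors found but the Agent missed (for example from a sampled manual audit). The definitions used here, precision and recall over reviewed items, are one reasonable choice and an assumption of this example.

```python
from collections import Counter


def review_metrics(reviewed_statuses, missed_count):
    """Compute precision/recall style metrics from human review outcomes."""
    counts = Counter(reviewed_statuses)
    confirmed = counts["已确认"]  # confirmed by the auditor
    rejected = counts["已驳回"]   # rejected by the auditor (false alarm)
    flagged = confirmed + rejected
    actual = confirmed + missed_count
    return {
        "precision": confirmed / flagged if flagged else None,
        "false_alarm_share": rejected / flagged if flagged else None,
        "recall": confirmed / actual if actual else None,
        "miss_rate": missed_count / actual if actual else None,
    }


statuses = ["已确认"] * 8 + ["已驳回"] * 2
metrics = review_metrics(statuses, missed_count=2)
print(metrics["precision"], metrics["recall"])  # 0.8 0.8
```

Anomalies still in the "待复核" (pending review) state are ignored by this calculation, since their ground truth is not yet known.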

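6.3 Continuous Learning and Model Iteration

  • Rule base updates: update the tax regulation knowledge base regularly so that the Agent's compliance judgments rest on the latest rules.
  • LLM fine-tuning: use human review feedback to fine-tune the LLM and improve its understanding and reasoning in specific audit scenarios. For example, build high-quality question-answer pairs from false-positive and missed cases for supervised fine-tuning.
  • Agent behavior tuning: based on observed results, adjust the LangGraph node logic, conditional branches, and tool usage strategy.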

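7. Challenges and Outlook

Although a fully automated financial audit Agent has great potential, we have to be clear-eyed about the challenges it faces:

  • LLM "hallucination": an LLM may produce information that looks plausible but is wrong, so strict verification is required.
  • Data privacy and security: financial data is highly sensitive, and the Agent's deployment and operation must meet strict data security and privacy standards.
  • Explainability: an LLM's decision process is often a "black box". Making the Agent's decisions easier to explain, so that auditors can understand and trust them, is an important open problem.
  • Changing regulations: tax regulations change frequently, so the knowledge base must be kept up to date and the Agent must adapt quickly.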
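One inexpensive guard against the hallucination problem is to check that every rule an anomaly cites was actually among the regulations retrieved for that invoice. The sketch below does exactly that; `split_grounded` is a hypothetical helper, and it verifies only the citation, not whether the model's reasoning about the rule is correct.

```python
def split_grounded(anomalies, regulations):
    """Separate anomalies that cite a retrieved rule from those that cite an unknown one."""
    known_rule_ids = {rule["rule_id"] for rule in regulations}
    grounded = [a for a in anomalies if a.get("rule_id") in known_rule_ids]
    ungrounded = [a for a in anomalies if a.get("rule_id") not in known_rule_ids]
    return grounded, ungrounded


regulations = [{"rule_id": "TR-001"}, {"rule_id": "TR-003"}]
llm_anomalies = [
    {"anomaly_id": "A1", "rule_id": "TR-001"},
    {"anomaly_id": "A2", "rule_id": "TR-999"},  # cites a rule that was never retrieved
    {"anomaly_id": "A3"},                        # cites no rule at all
]

grounded, ungrounded = split_grounded(llm_anomalies, regulations)
print([a["anomaly_id"] for a in grounded])    # ['A1']
print([a["anomaly_id"] for a in ungrounded])  # ['A2', 'A3']
```

Ungrounded anomalies need not be discarded; routing them to human review with a note about the missing citation keeps a possible true finding while flagging the weak evidence.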

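Looking ahead, the fully automated financial audit Agent need not be limited to after-the-fact auditing. It can be extended to:

  • Early warning: predict potential compliance risks before a transaction takes place.
  • Real-time monitoring: watch the company's financial data stream continuously and surface anomalies as they occur.
  • Intelligent reporting: generate customized audit reports and compliance recommendations automatically.
  • Multi-Agent collaboration: build a multi-Agent system in which each Agent focuses on a different aspect of the audit and they work together to improve overall efficiency and depth.

Closing Remarks

The fully automated financial audit Agent is an exploration of how deeply artificial intelligence can be applied in finance. LangGraph gives us a capable framework for orchestrating complex Agent workflows; combined with the intelligence of LLMs, it lets us build, step by step, a system that handles audit tasks efficiently, accurately, and at scale. This can raise audit efficiency considerably and give companies stronger compliance and healthier financial management. It is a challenging direction with broad prospects, and I look forward to advancing the field together with all of you.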
