各位编程专家,下午好!
今天,我们将深入探讨一个在大型语言模型(LLM)驱动的复杂系统中至关重要且极具挑战性的主题:如何物理实现“条件边缘”(Conditional Edges),并根据LLM的输出概率动态计算下一跳路径。 这不仅仅是一个理论概念,更是构建智能、自适应、少人工干预的AI系统所必须掌握的核心工程实践。
在传统的软件开发中,我们的程序流程通常是预先定义好的,通过if/else、switch/case或状态机来明确控制。然而,当我们将LLM引入工作流时,其强大的自然语言理解和生成能力,使得我们能够基于非结构化或半结构化的输入,做出更加灵活和智能的决策。这就引出了一个关键问题:我们如何将LLM的“模糊”判断(即概率性输出)转化为“清晰”的流程控制指令,从而动态地选择工作流中的下一条路径?这就是“条件边缘”的物理实现所要解决的核心问题。
我们将从LLM输出概率的本质入手,逐步构建起一个能够理解、解析并利用这些概率来驱动复杂工作流的系统。这趟旅程将涵盖多种策略、详细的代码示例以及关键的架构考量。
一、 LLM与动态路径的必要性
大型语言模型(LLM)已经彻底改变了我们与计算机交互的方式。它们能够理解、生成、总结和转换自然语言,从而为构建更智能、更人性化的应用打开了大门。然而,LLM本身并非一个工作流引擎。它们擅长的是文本处理,而非流程编排。
考虑一个复杂的业务场景:
- 智能客服:用户提出问题,LLM首先理解意图,然后根据意图将请求路由到技术支持、账单查询、产品咨询或售后服务等不同部门。
- 内容生成与审核:LLM生成文章草稿,然后根据草稿的质量、是否包含敏感信息、是否符合品牌调性等,决定是直接发布、发送给人工编辑、还是打回重写。
- 自动化数据处理:LLM从非结构化文档中提取信息,然后根据提取信息的类型和置信度,决定是将数据存入数据库、触发API调用、还是标记为需要人工复核。
在这些场景中,单一的LLM调用不足以完成整个任务。我们需要一个由多个LLM调用、API调用、数据库操作和人工干预组成的复杂序列。更重要的是,这个序列不应该是固定的,而应该根据LLM的实时输出动态调整。这就是“条件边缘”发挥作用的地方。它允许我们的工作流根据LLM的判断结果,在不同的路径之间进行选择,从而实现真正的自适应和智能。
二、 条件边缘的本质与传统实现
在深入LLM相关的实现之前,我们先回顾一下“条件边缘”在传统计算图或工作流系统中的定义。
条件边缘(Conditional Edge)是指在有向图(Directed Graph)中,连接两个节点(或任务)的边,其激活(即允许流程沿着这条边前进)取决于某个预设条件的评估结果。
传统实现方式:
- 布尔表达式:最常见的方式。例如,
if (variable > 10),如果条件为真,则激活相应的边缘。 - 状态机:在有限状态机(FSM)中,从一个状态到另一个状态的转换(边缘)由特定的事件或条件触发。
- 规则引擎:更复杂的系统使用规则引擎,定义一组“当…则…”的规则,根据输入数据评估这些规则来决定下一步动作。
这些传统方法的核心是确定性。条件通常是明确的、可量化的,评估结果是真或假。然而,当我们将LLM引入决策循环时,这种确定性变得模糊。LLM的输出,尤其是当它被要求做出分类或选择时,往往伴随着概率。我们不能简单地说“LLM说A就是A”,而更应该问“LLM有多大把握说A?”。这个“把握”就是我们进行动态路径计算的基础。
三、 LLM输出概率的深度解析
要利用LLM的输出概率,我们首先需要理解这些概率是如何产生的,以及如何有效地获取和解释它们。
3.1 LLM生成概率的机制
当LLM生成文本时,它实际上是在为词汇表中的每一个可能的下一个词(或子词,即token)计算一个概率分布。这个过程通常涉及以下步骤:
- 输入编码:用户的输入文本首先被分词器(Tokenizer)转换为一系列的token ID。
- 嵌入层:这些token ID被映射到高维向量空间中的嵌入(Embeddings)。
- Transformer模型:嵌入向量通过Transformer模型的多个层,进行复杂的注意力计算和前馈网络处理,以理解上下文并预测下一个token。
- Logits输出:Transformer的最后一层通常会输出一个与词汇表大小相等的向量,这个向量的每个元素代表了对应token的“对数几率”(Logits)。Logits是未经归一化的分数,可以直接反映模型对每个token的偏好。
-
Softmax层:为了将Logits转换为可解释的概率分布,通常会应用Softmax函数。Softmax函数将任意实数向量压缩成一个和为1的正数向量,每个元素都在0到1之间,可以被解释为概率。
$$ P(token_i | context) = frac{e^{logiti}}{sum{j=1}^{V} e^{logit_j}} $$
其中 $V$ 是词汇表的大小,$logit_i$ 是token $i$ 的对数几率。
3.2 获取LLM输出概率的方法
不同的LLM服务和库提供了不同的方式来访问这些概率。
a) Hugging Face transformers库(本地模型)
对于本地运行的Hugging Face模型,我们可以直接访问模型的输出Logits。
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
# 假设我们使用一个小型模型作为示例
model_name = "gpt2" # 或者 "bert-base-uncased" for masked language modeling
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)
model.eval() # 设置为评估模式
def get_token_probabilities(text: str, target_tokens: list[str]):
"""
获取给定文本后,特定目标token的生成概率。
注意:这通常是针对下一个token的概率。
对于多个token的序列,需要迭代计算或使用更复杂的策略。
"""
inputs = tokenizer(text, return_tensors="pt")
with torch.no_grad():
outputs = model(**inputs)
# Logits for the last token in the input sequence
# Shape: [batch_size, sequence_length, vocab_size]
last_token_logits = outputs.logits[0, -1, :]
# Apply softmax to get probabilities
probabilities = torch.softmax(last_token_logits, dim=-1)
result = {}
for target_token_text in target_tokens:
# Tokenize the target word/phrase.
# For single words, this is usually straightforward.
# For multi-word phrases, we need to consider how the model tokenizes them.
# Here, we assume target_token_text is a single token or the first token of a desired sequence.
# Ensure the token is in the vocabulary
token_ids = tokenizer.encode(target_token_text, add_special_tokens=False)
if not token_ids:
print(f"Warning: '{target_token_text}' could not be tokenized.")
result[target_token_text] = 0.0
continue
# If a phrase tokenizes to multiple IDs, we might take the first one's probability,
# or calculate a joint probability (more complex).
target_token_id = token_ids[0]
if target_token_id in tokenizer.vocab.values(): # Check if ID is valid
prob = probabilities[target_token_id].item()
result[target_token_text] = prob
else:
print(f"Warning: Token ID {target_token_id} for '{target_token_text}' not found in vocabulary.")
result[target_token_text] = 0.0
return result
# 示例使用
context = "The customer is asking about their"
keywords = ["billing", "technical", "shipping"]
probs = get_token_probabilities(context, keywords)
print(f"Probabilities for keywords after '{context}': {probs}")
# 示例输出可能为: {'billing': 0.0012, 'technical': 0.0008, 'shipping': 0.0005}
b) OpenAI API(或其他商业API)
OpenAI的Completion API(对于旧模型)和Chat Completion API(对于新模型如GPT-3.5/4)都支持获取logprobs。Logprobs是概率的自然对数,它们在数值上比原始概率更稳定,并且可以直接相加来计算序列的联合对数概率。
import openai
import os
# openai.api_key = os.getenv("OPENAI_API_KEY") # 确保设置了API Key
def get_openai_logprobs(prompt: str, choices: list[str], model: str = "gpt-3.5-turbo-instruct"):
"""
使用OpenAI Completion API获取特定token的logprobs。
注意:Chat Completion API的logprobs支持更为复杂,这里以Completion API为例。
对于Chat Completion API,需要关注 'logprobs' 参数和 'top_logprobs' 字段。
"""
try:
response = openai.Completion.create(
model=model,
prompt=prompt,
max_tokens=1, # 只生成一个token
logprobs=5, # 返回top 5个token的logprobs
echo=False # 不回显prompt文本
)
# 提取第一个生成的token的logprobs
if response.choices and response.choices[0].logprobs:
top_logprobs = response.choices[0].logprobs.top_logprobs[0]
result = {}
for choice_text in choices:
# OpenAI返回的token可能包含前导空格,需要处理
# 例如," billing" vs "billing"
normalized_choice = choice_text
# 查找匹配的token及其logprob
# 遍历top_logprobs字典,寻找与我们的choice匹配的key
found = False
for token_str, logprob_val in top_logprobs.items():
# 考虑到分词器可能添加前导空格,尝试多种匹配方式
if token_str.strip().lower() == normalized_choice.strip().lower():
result[choice_text] = torch.exp(torch.tensor(logprob_val)).item()
found = True
break
# 有时候,模型会预测带空格的token,例如 " billing"
if token_str.startswith(' ') and token_str[1:].strip().lower() == normalized_choice.strip().lower():
result[choice_text] = torch.exp(torch.tensor(logprob_val)).item()
found = True
break
if not found:
result[choice_text] = 0.0 # 如果不在top_logprobs中,则概率视为0
return result
except openai.error.OpenAIError as e:
print(f"OpenAI API error: {e}")
return {}
except Exception as e:
print(f"An unexpected error occurred: {e}")
return {}
# 示例使用 (需要有效的OpenAI API Key)
# prompt_text = "The customer is asking about their"
# keywords_openai = ["billing", "technical", "shipping"]
# probs_openai = get_openai_logprobs(prompt_text, keywords_openai)
# print(f"OpenAI Probabilities for keywords after '{prompt_text}': {probs_openai}")
3.3 概率的挑战与解释
- Token-level vs. Concept-level:LLM提供的概率是针对单个token的。一个概念(如“技术支持”)可能由多个token组成(例如“technical”、“ support”)。要获取概念的概率,需要更复杂的逻辑,如计算构成概念的所有token的联合概率,或将LLM作为一个分类器使用。
- 概率校准:LLM的原始Softmax概率不总是完美校准的。这意味着一个模型报告的80%概率,可能在实际中并不总是代表80%的置信度。在关键应用中,可能需要对概率进行校准(如使用Platt scaling)。
- 阈值设定:如何设定一个合理的概率阈值来触发条件边缘?这通常需要经验、实验和对业务风险的理解。
- 长序列概率:计算整个句子或短语的精确联合概率非常复杂,且通常不是必需的。对于条件边缘,我们通常更关注LLM对某个关键决策词或短语的直接概率。
四、 设计’条件边缘’机制:策略与实现
有了对LLM概率的理解,现在我们可以着手设计如何利用这些概率来驱动条件边缘。我们将探讨几种核心策略。
4.1 策略1:关键词/短语概率匹配
思想:引导LLM在输出中包含特定的关键词或短语,这些词语与不同的路径直接关联。然后,我们提取这些关键词的概率,并根据预设的阈值选择路径。
优点:
- 直观易懂,实现相对简单。
- 适用于LLM被明确指示输出特定词语的场景。
缺点:
- 依赖LLM准确生成关键词。
- 如果关键词本身是多义的,可能导致混淆。
- 可能需要处理LLM输出中关键词的变体(如单数/复数,同义词)。
实现步骤:
- Prompt Engineering:设计Prompt,明确要求LLM在输出中包含或暗示特定的关键词。例如:“根据上述用户查询,请判断其意图是’技术支持’、’账单查询’还是’通用问题’。请直接输出最可能的意图关键词。”
- 概率提取:使用前述方法(如Hugging Face
get_token_probabilities或 OpenAIlogprobs)提取目标关键词的概率。 - 阈值判断:比较提取的概率与预设阈值,选择概率最高的且超过阈值的路径。
代码示例:关键词概率匹配的决策引擎
import torch
import math
class KeywordProbabilityDecisionEngine:
def __init__(self, llm_client, threshold: float = 0.5):
self.llm_client = llm_client # 传入一个LLM客户端实例
self.threshold = threshold
def make_decision(self, context: str, possible_paths: dict[str, list[str]]) -> tuple[str | None, dict[str, float]]:
"""
根据LLM对关键词的概率,选择下一跳路径。
Args:
context (str): LLM做决策的上下文文本。
possible_paths (dict): 字典,键是路径名称(例如"technical_support"),
值是与该路径关联的关键词列表(例如["technical", "support"])。
LLM可能会输出这些词来表示意图。
Returns:
tuple[str | None, dict]: 选定的路径名称(如果没有路径超过阈值则为None)
以及所有路径关键词的最高概率。
"""
all_keywords = []
path_keyword_map = {}
for path_name, keywords in possible_paths.items():
all_keywords.extend(keywords)
for keyword in keywords:
path_keyword_map[keyword] = path_name
# 获取所有关键词在LLM下一个token中的概率
# 注意:这里假设llm_client.get_token_probabilities 能够处理我们传入的all_keywords
# 并且返回的是LLM对这些关键词作为下一个token的概率。
# 实际实现中,这可能是对一个短句进行预测,然后提取其中关键词的概率。
# 简单起见,我们假设LLM直接预测一个关键词
prompt_for_keywords = f"{context}nBased on the above, the most likely intent is:"
# 模拟LLM客户端获取下一个token的概率
# 实际这里会调用LLM API或本地模型
keyword_probs = self.llm_client.get_token_probabilities(prompt_for_keywords, all_keywords)
print(f"Keyword probabilities: {keyword_probs}")
path_scores = {path_name: 0.0 for path_name in possible_paths.keys()}
path_details_probs = {}
for path_name, keywords in possible_paths.items():
max_keyword_prob_for_path = 0.0
for keyword in keywords:
prob = keyword_probs.get(keyword, 0.0)
if prob > max_keyword_prob_for_path:
max_keyword_prob_for_path = prob
path_scores[path_name] = max_keyword_prob_for_path
path_details_probs[path_name] = max_keyword_prob_for_path # 记录每个路径的最高关键词概率
selected_path = None
max_score = 0.0
for path_name, score in path_scores.items():
if score > max_score and score >= self.threshold:
max_score = score
selected_path = path_name
return selected_path, path_details_probs
# 模拟LLM客户端
class MockLLMClient:
def get_token_probabilities(self, prompt: str, target_tokens: list[str]) -> dict[str, float]:
# 这是一个模拟,实际会调用真实的LLM
# 为了演示,我们根据prompt内容返回一些假数据
if "technical" in prompt:
return {"technical": 0.85, "billing": 0.1, "general": 0.05, "support": 0.7}
elif "invoice" in prompt:
return {"technical": 0.1, "billing": 0.8, "general": 0.1, "invoice": 0.75}
else:
return {"technical": 0.2, "billing": 0.2, "general": 0.6}
# 示例使用
mock_llm = MockLLMClient()
decision_engine = KeywordProbabilityDecisionEngine(mock_llm, threshold=0.6)
customer_query_1 = "My internet is not working, I need help with connectivity."
paths_1 = {
"technical_support": ["technical", "support", "connectivity", "internet"],
"billing_inquiry": ["billing", "invoice", "payment"],
"general_question": ["general", "question", "info"]
}
selected_path_1, probs_1 = decision_engine.make_decision(customer_query_1, paths_1)
print(f"Query 1: '{customer_query_1}' -> Selected Path: {selected_path_1}, Probabilities: {probs_1}")
# 预期输出: Selected Path: technical_support
customer_query_2 = "I have a question about my last invoice."
paths_2 = {
"technical_support": ["technical", "support"],
"billing_inquiry": ["billing", "invoice", "payment"],
"general_question": ["general", "question"]
}
selected_path_2, probs_2 = decision_engine.make_decision(customer_query_2, paths_2)
print(f"Query 2: '{customer_query_2}' -> Selected Path: {selected_path_2}, Probabilities: {probs_2}")
# 预期输出: Selected Path: billing_inquiry
customer_query_3 = "What are your business hours?"
paths_3 = {
"technical_support": ["technical", "support"],
"billing_inquiry": ["billing", "invoice"],
"general_question": ["general", "question", "hours"]
}
selected_path_3, probs_3 = decision_engine.make_decision(customer_query_3, paths_3)
print(f"Query 3: '{customer_query_3}' -> Selected Path: {selected_path_3}, Probabilities: {probs_3}")
# 预期输出: Selected Path: general_question
# 如果没有达到阈值
customer_query_4 = "I'm not sure what I need."
selected_path_4, probs_4 = decision_engine.make_decision(customer_query_4, paths_3)
print(f"Query 4: '{customer_query_4}' -> Selected Path: {selected_path_4}, Probabilities: {probs_4}")
# 预期输出: Selected Path: None (因为模拟的概率都不高)
4.2 策略2:结构化输出解析(JSON/YAML)
思想:要求LLM输出结构化的数据(如JSON),其中包含明确的决策字段(如decision_path)和可选的置信度分数(confidence)。我们解析这个结构化输出,并根据decision_path和confidence来选择路径。
优点:
- 决策信息明确,易于编程解析。
- 可以获取LLM的显式置信度,而不仅仅是单个token的概率。
- 便于验证和错误处理。
缺点:
- 对Prompt Engineering要求更高,需要确保LLM能够持续生成有效的结构化输出。
- LLM偶尔会生成格式错误或不完整的JSON。
实现步骤:
- Prompt Engineering:明确指示LLM以JSON格式输出,并定义好JSON的结构。
{ "decision_path": "technical_support", "confidence": 0.92, "reason": "User mentioned 'internet not working' and 'connectivity'." } - LLM调用:进行LLM调用以获取结构化输出。
- JSON解析与验证:使用Python的
json库解析输出,并使用Pydantic等库进行数据模型验证。 - 决策逻辑:根据
decision_path和confidence字段进行路径选择和阈值判断。
代码示例:结构化输出解析的决策引擎
import json
from pydantic import BaseModel, Field, ValidationError
from typing import Optional
class LLMDecisionOutput(BaseModel):
decision_path: str = Field(..., description="The name of the next path to take.")
confidence: float = Field(..., ge=0.0, le=1.0, description="Confidence score for the decision.")
reason: Optional[str] = Field(None, description="Explanation for the decision.")
class StructuredOutputDecisionEngine:
def __init__(self, llm_client, threshold: float = 0.7):
self.llm_client = llm_client
self.threshold = threshold
def make_decision(self, context: str, possible_path_names: list[str]) -> tuple[str | None, float]:
"""
根据LLM的结构化JSON输出选择下一跳路径。
Args:
context (str): LLM做决策的上下文文本。
possible_path_names (list): 所有可能的路径名称列表。
Returns:
tuple[str | None, float]: 选定的路径名称(如果没有路径超过阈值则为None)
以及LLM报告的置信度。
"""
prompt = (
f"Given the following user query:n'{context}'nn"
f"Please analyze the intent and provide your decision in JSON format. "
f"The 'decision_path' field must be one of {possible_path_names}. "
f"Include a 'confidence' score (0.0 to 1.0) and a 'reason'.n"
f"JSON Output:"
)
# 模拟LLM客户端生成JSON文本
# 实际这里会调用LLM API,例如 OpenAI Chat Completion API
# prompt_response = self.llm_client.generate_text(prompt)
# 模拟LLM返回的JSON字符串
if "internet" in context.lower() or "connectivity" in context.lower():
mock_json_output = '{"decision_path": "technical_support", "confidence": 0.95, "reason": "User reported internet issues."}'
elif "invoice" in context.lower() or "bill" in context.lower():
mock_json_output = '{"decision_path": "billing_inquiry", "confidence": 0.88, "reason": "User mentioned invoice."}'
elif "hours" in context.lower() or "open" in context.lower():
mock_json_output = '{"decision_path": "general_question", "confidence": 0.75, "reason": "User asked about business hours."}'
else:
mock_json_output = '{"decision_path": "unclear_intent", "confidence": 0.45, "reason": "Intent is ambiguous."}'
print(f"LLM Raw JSON Output: {mock_json_output}")
try:
parsed_data = json.loads(mock_json_output)
decision = LLMDecisionOutput.parse_obj(parsed_data)
if decision.decision_path in possible_path_names and decision.confidence >= self.threshold:
return decision.decision_path, decision.confidence
else:
print(f"Decision '{decision.decision_path}' (confidence: {decision.confidence}) not valid or below threshold.")
return None, decision.confidence
except json.JSONDecodeError as e:
print(f"Failed to decode JSON from LLM: {e}")
return None, 0.0
except ValidationError as e:
print(f"LLM output validation error: {e}")
return None, 0.0
except Exception as e:
print(f"An unexpected error occurred during decision parsing: {e}")
return None, 0.0
# 模拟LLM客户端(此处只模拟generate_text方法)
class MockLLMClientForJSON:
def generate_text(self, prompt: str) -> str:
# 实际会调用LLM API
return "..." # 真实LLM会返回JSON字符串
mock_llm_json = MockLLMClientForJSON()
decision_engine_json = StructuredOutputDecisionEngine(mock_llm_json, threshold=0.7)
possible_paths = ["technical_support", "billing_inquiry", "general_question", "unclear_intent"]
query_1 = "My WiFi is down, can you help?"
selected_path_1, conf_1 = decision_engine_json.make_decision(query_1, possible_paths)
print(f"Query 1: '{query_1}' -> Selected Path: {selected_path_1}, Confidence: {conf_1}")
query_2 = "Where can I find my latest bill?"
selected_path_2, conf_2 = decision_engine_json.make_decision(query_2, possible_paths)
print(f"Query 2: '{query_2}' -> Selected Path: {selected_path_2}, Confidence: {conf_2}")
query_3 = "What's the weather like tomorrow?" # 模拟LLM返回低置信度或未知路径
selected_path_3, conf_3 = decision_engine_json.make_decision(query_3, possible_paths)
print(f"Query 3: '{query_3}' -> Selected Path: {selected_path_3}, Confidence: {conf_3}")
4.3 策略3:语义相似度/嵌入比较
思想:LLM输出一段自然语言描述,我们将其转换为向量嵌入(Embeddings)。然后,我们将这个输出嵌入与预定义的“意图”或“路径”的嵌入进行比较(例如使用余弦相似度),选择最相似的路径。
优点:
- 不要求LLM严格输出特定关键词或格式,更自然。
- 对LLM输出的细微变化具有鲁棒性。
- 可以处理更复杂的意图匹配。
缺点:
- 需要额外的嵌入模型。
- 相似度阈值的设定同样具有挑战性。
- 计算成本可能更高,尤其是在大量意图需要比较时。
实现步骤:
- 预定义意图嵌入:为每个可能的路径创建代表性的文本描述,并使用嵌入模型(如Sentence Transformers)将其转换为嵌入向量。
- LLM输出嵌入:让LLM生成一个简短的意图总结或决策描述,并将其转换为嵌入向量。
- 相似度计算:计算LLM输出嵌入与所有预定义意图嵌入之间的余弦相似度。
- 决策:选择相似度最高且超过阈值的路径。
代码示例:语义相似度决策引擎
from sentence_transformers import SentenceTransformer, util
import numpy as np
class SemanticSimilarityDecisionEngine:
def __init__(self, llm_client, embedding_model_name: str = 'all-MiniLM-L6-v2', threshold: float = 0.7):
self.llm_client = llm_client
self.embedding_model = SentenceTransformer(embedding_model_name)
self.threshold = threshold
self.path_embeddings = {} # 存储预定义的路径意图嵌入
def preprocess_paths(self, path_intents: dict[str, str]):
"""
为每个路径意图生成嵌入向量。
Args:
path_intents (dict): 键是路径名称,值是代表该路径意图的描述文本。
"""
for path_name, intent_description in path_intents.items():
self.path_embeddings[path_name] = self.embedding_model.encode(intent_description, convert_to_tensor=True)
print(f"Preprocessed {len(self.path_embeddings)} path embeddings.")
def make_decision(self, context: str) -> tuple[str | None, float]:
"""
根据LLM的语义输出与预定义意图的相似度选择下一跳路径。
Args:
context (str): LLM做决策的上下文文本。
Returns:
tuple[str | None, float]: 选定的路径名称(如果没有路径超过阈值则为None)
以及最高相似度分数。
"""
if not self.path_embeddings:
raise ValueError("Path intents must be preprocessed first using preprocess_paths().")
# 模拟LLM客户端生成一个意图总结
# 实际这里会调用LLM API,比如让LLM总结用户意图
# prompt_for_summary = f"Summarize the user's intent from this query: '{context}'"
# llm_summary = self.llm_client.generate_text(prompt_for_summary)
if "internet" in context.lower() or "connectivity" in context.lower():
llm_summary = "The user needs assistance with a technical problem related to their internet connection."
elif "invoice" in context.lower() or "bill" in context.lower():
llm_summary = "The user has a question regarding their billing statement or a specific invoice."
elif "hours" in context.lower() or "open" in context.lower():
llm_summary = "The user is asking about the company's operating hours."
else:
llm_summary = "The user's intent is unclear or general."
print(f"LLM Summary of intent: '{llm_summary}'")
# 将LLM的总结转换为嵌入向量
llm_summary_embedding = self.embedding_model.encode(llm_summary, convert_to_tensor=True)
max_similarity = -1.0
selected_path = None
all_similarities = {}
for path_name, path_embed in self.path_embeddings.items():
similarity = util.cos_sim(llm_summary_embedding, path_embed).item()
all_similarities[path_name] = similarity
if similarity > max_similarity:
max_similarity = similarity
selected_path = path_name
print(f"All similarities: {all_similarities}")
if max_similarity >= self.threshold:
return selected_path, max_similarity
else:
print(f"Highest similarity ({max_similarity}) for path '{selected_path}' is below threshold ({self.threshold}).")
return None, max_similarity
# 模拟LLM客户端(此处只模拟generate_text方法)
class MockLLMClientForSemantic:
def generate_text(self, prompt: str) -> str:
return "..." # 真实LLM会返回文本
mock_llm_semantic = MockLLMClientForSemantic()
decision_engine_semantic = SemanticSimilarityDecisionEngine(mock_llm_semantic, threshold=0.6) # 阈值可以根据模型和数据调整
# 定义路径意图
path_intents_map = {
"technical_support": "User is experiencing a technical issue, bug, or needs help with product functionality.",
"billing_inquiry": "User has questions about their bill, invoice, payment, or subscription.",
"general_question": "User is asking a general question, looking for information, or company details.",
"sales_inquiry": "User is interested in purchasing a product or service, or wants to know more about offerings."
}
decision_engine_semantic.preprocess_paths(path_intents_map)
query_1 = "My software crashed, I need urgent help debugging."
selected_path_1, sim_1 = decision_engine_semantic.make_decision(query_1)
print(f"Query 1: '{query_1}' -> Selected Path: {selected_path_1}, Similarity: {sim_1}")
query_2 = "I want to know how much your premium plan costs."
selected_path_2, sim_2 = decision_engine_semantic.make_decision(query_2)
print(f"Query 2: '{query_2}' -> Selected Path: {selected_path_2}, Similarity: {sim_2}")
query_3 = "Tell me about your company's history."
selected_path_3, sim_3 = decision_engine_semantic.make_decision(query_3)
print(f"Query 3: '{query_3}' -> Selected Path: {selected_path_3}, Similarity: {sim_3}")
query_4 = "I'm just browsing." # 模拟低相似度
selected_path_4, sim_4 = decision_engine_semantic.make_decision(query_4)
print(f"Query 4: '{query_4}' -> Selected Path: {selected_path_4}, Similarity: {sim_4}")
4.4 阈值与决策逻辑
无论采用哪种策略,阈值设定都是关键。
- 静态阈值:最简单,但可能不够灵活。
- 动态阈值:根据上下文、用户重要性、业务风险等因素动态调整。例如,对于涉及金钱交易的路径,阈值可以设得更高。
- 回退路径:当没有任何路径达到阈值时,需要有一个明确的回退策略,例如:
- 发送给人工审核。
- 选择一个默认的“通用”路径。
- 请求LLM澄清或重新生成。
- 多路径情景:如果多个路径都超过了阈值,如何选择?
- 选择概率/相似度最高的。
- 根据预设优先级。
- 触发一个“多意图”流程,并行处理或请求用户进一步澄清。
五、 架构组件与工作流编排
为了将上述决策机制集成到一个健壮的系统中,我们需要一个整体的架构。
5.1 核心组件
一个支持LLM驱动条件边缘的工作流系统通常包含以下核心组件:
- LLM服务接口(LLM Service Interface):封装对底层LLM(无论是本地模型还是云API)的调用,提供统一的接口来获取文本生成、Logprobs、嵌入等功能。
- Prompt工程模块(Prompt Engineering Module):管理和生成LLM的Prompt。根据当前工作流节点和所需决策类型,动态构建最优Prompt。
- 输出解析器(Output Parser):负责解析LLM的原始输出。这可能包括JSON解析、关键词提取、语义摘要等。
- 决策引擎(Decision Engine):这是我们前面讨论的核心部分,负责根据解析后的LLM输出(特别是概率或置信度)和预设逻辑,评估条件边缘并选择下一跳路径。
- 工作流编排器(Workflow Orchestrator):核心的流程控制组件。它维护工作流的图结构(节点和边缘),驱动工作流从一个节点到下一个节点的执行。
- 状态存储(State Store):持久化工作流的当前状态和上下文数据,确保工作流在不同节点之间可以传递信息,并在中断后恢复。
- 任务执行器(Task Executor):负责执行工作流中的非LLM任务,如数据库操作、API调用、发送通知等。
5.2 工作流图的数据结构
工作流编排器需要一种方式来表示工作流的结构,通常是基于有向无环图(DAG)的概念。
节点(Node):工作流中的一个步骤或任务。
id: 唯一标识符。name: 节点名称。type: 节点类型(例如 "llm_decision", "api_call", "human_review", "start", "end")。config: 节点特定的配置,例如:- LLM决策节点:
model_name,prompt_template_id,decision_strategy(keyword, json, semantic),threshold。 - API调用节点:
api_endpoint,request_payload_template。
- LLM决策节点:
边缘(Edge):连接两个节点的有向连接。
source_node_id: 起始节点的ID。target_node_id: 目标节点的ID。condition: 激活此边缘的条件。这正是“条件边缘”的核心。type: 条件类型(例如 "probability_threshold", "json_field_match", "semantic_similarity")。params: 条件参数,例如:probability_threshold:{ "keyword": "technical_support", "min_prob": 0.7 }json_field_match:{ "field_path": "$.decision_path", "expected_value": "billing_inquiry", "min_confidence": 0.8 }semantic_similarity:{ "intent_name": "technical_support", "min_similarity": 0.6 }
is_default: (可选) 如果其他条件边缘未满足,则激活此边缘。
表格:工作流组件与数据结构概览
| 组件名称 | 职责 | 关键数据结构/配置 |
|---|---|---|
| LLM服务接口 | 统一LLM调用,处理API通信/本地模型加载 | LLM模型配置,API Key/Endpoint |
| Prompt工程模块 | 根据上下文和任务动态生成Prompt | Prompt模板,变量映射 |
| 输出解析器 | 解析LLM输出,提取结构化信息或关键数据 | JSON Schema,Regex模式,关键词列表 |
| 决策引擎 | 评估LLM输出概率/置信度,选择下一跳路径 | 决策策略(关键词、JSON、语义),阈值,回退逻辑 |
| 工作流编排器 | 管理工作流图,驱动节点执行,传递状态 | 节点(Node),边缘(Edge),图(Graph) |
| 状态存储 | 持久化工作流上下文和中间结果 | 键值存储,数据库,内存缓存 |
| 任务执行器 | 执行非LLM任务(API调用、DB操作) | 任务函数,API客户端 |
5.3 概念性工作流执行流程
- 启动工作流:接收初始输入,从
Start节点开始。 - 执行当前节点:
- 如果当前是
LLM Decision Node:- Prompt工程模块生成Prompt。
- LLM服务接口调用LLM。
- 输出解析器解析LLM输出。
- 决策引擎评估LLM输出(如概率、置信度),并根据连接到当前节点的所有条件边缘进行判断。
- 如果当前是
API Call Node:- 任务执行器调用外部API。
- 如果当前是
Human Review Node:- 将任务发送给人工,等待反馈。
- 如果当前是
- 选择下一跳边缘:
- 决策引擎根据评估结果,找到第一个满足条件的边缘。
- 如果没有条件边缘满足,则选择标记为
is_default的边缘,或触发错误/回退。
- 更新工作流状态:将当前节点输出和选择的路径记录到状态存储中。
- 切换到下一节点:流程沿着选定的边缘,移动到目标节点。
- 重复2-5步,直到到达
End节点或发生错误/回退。
六、 实践:一个完整的Python工作流示例
我们来构建一个简化的、但功能完整的Python工作流编排器,它能利用LLM的结构化输出和置信度来动态路由客户查询。
场景:客户支持系统,根据用户查询将请求路由到:
technical_supportbilling_inquirygeneral_questionescalate_to_human(回退路径)
import json
import logging
from typing import Dict, Any, Optional, Callable, List
from pydantic import BaseModel, Field, ValidationError
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- 1. LLM服务接口 (Mock for demonstration) ---
class MockLLMClient:
"""
模拟LLM客户端,实际会集成OpenAI, Hugging Face等。
"""
def generate_json_response(self, prompt: str, possible_paths: List[str]) -> str:
logging.info(f"MockLLMClient: Generating JSON for prompt: {prompt[:100]}...")
# 模拟LLM根据Prompt内容生成JSON
if "internet" in prompt.lower() or "connectivity" in prompt.lower() or "technical" in prompt.lower():
return '{"decision_path": "technical_support", "confidence": 0.92, "reason": "User reported a technical issue."}'
elif "invoice" in prompt.lower() or "bill" in prompt.lower() or "payment" in prompt.lower():
return '{"decision_path": "billing_inquiry", "confidence": 0.85, "reason": "User inquired about billing."}'
elif "hours" in prompt.lower() or "contact" in prompt.lower():
return '{"decision_path": "general_question", "confidence": 0.78, "reason": "User asked a general question."}'
else:
return '{"decision_path": "unclear_intent", "confidence": 0.40, "reason": "Intent is ambiguous."}'
# --- 2. Prompt工程模块 ---
class PromptGenerator:
"""
根据节点类型和上下文生成LLM Prompt。
"""
def generate_decision_prompt(self, query: str, possible_paths: List[str]) -> str:
return (
f"Given the customer query: '{query}'nn"
f"Please identify the most appropriate department or action. "
f"Your options are: {', '.join(possible_paths)}.n"
f"Respond in JSON format with 'decision_path' (one of the options), "
f"'confidence' (0.0-1.0), and 'reason'.n"
f"JSON Output:"
)
# --- 3. 输出解析器 & Pydantic模型 ---
class LLMDecisionOutput(BaseModel):
decision_path: str = Field(..., description="The name of the next path to take.")
confidence: float = Field(..., ge=0.0, le=1.0, description="Confidence score for the decision.")
reason: Optional[str] = Field(None, description="Explanation for the decision.")
# --- 4. 决策引擎 ---
class DecisionEngine:
"""
评估LLM的结构化输出,并根据置信度选择路径。
"""
def __init__(self, llm_client: MockLLMClient, prompt_generator: PromptGenerator, default_threshold: float = 0.7):
self.llm_client = llm_client
self.prompt_generator = prompt_generator
self.default_threshold = default_threshold
def evaluate_decision(self, query: str, possible_paths: List[str], threshold: float = None) -> tuple[str | None, float, str]:
"""
基于LLM的结构化输出进行决策。
返回 (selected_path, confidence, reason)
"""
actual_threshold = threshold if threshold is not None else self.default_threshold
prompt = self.prompt_generator.generate_decision_prompt(query, possible_paths)
raw_llm_output = self.llm_client.generate_json_response(prompt, possible_paths)
try:
parsed_data = json.loads(raw_llm_output)
decision = LLMDecisionOutput.parse_obj(parsed_data)
logging.info(f"LLM Decision: Path='{decision.decision_path}', Confidence={decision.confidence}, Reason='{decision.reason}'")
if decision.decision_path in possible_paths and decision.confidence >= actual_threshold:
return decision.decision_path, decision.confidence, decision.reason
else:
logging.warning(f"LLM decision '{decision.decision_path}' (confidence: {decision.confidence}) not valid or below threshold {actual_threshold}.")
# 如果LLM选择了不在possible_paths里的路径,或者置信度不够,则返回None
return None, decision.confidence, decision.reason
except (json.JSONDecodeError, ValidationError) as e:
logging.error(f"Failed to parse or validate LLM JSON output: {e} | Raw output: {raw_llm_output}")
return None, 0.0, f"Parsing/Validation Error: {e}"
except Exception as e:
logging.error(f"Unexpected error during decision evaluation: {e}")
return None, 0.0, f"Unexpected Error: {e}"
# --- 5. 工作流编排器 (Workflow Orchestrator) ---
# 定义节点和边缘的数据结构
class WorkflowNode(BaseModel):
id: str
name: str
type: str # e.g., "start", "llm_decision", "task", "end", "human_review"
config: Dict[str, Any] = Field(default_factory=dict) # Node-specific configuration
class WorkflowEdge(BaseModel):
source_node_id: str
target_node_id: str
condition_type: str # e.g., "llm_decision_path", "default"
condition_params: Dict[str, Any] = Field(default_factory=dict) # Parameters for the condition
class Workflow:
def __init__(self, nodes: List[WorkflowNode], edges: List[WorkflowEdge]):
self.nodes = {node.id: node for node in nodes}
self.edges = edges
self.adj_list: Dict[str, List[WorkflowEdge]] = self._build_adjacency_list()
def _build_adjacency_list(self) -> Dict[str, List[WorkflowEdge]]:
adj = {node_id: [] for node_id in self.nodes}
for edge in self.edges:
if edge.source_node_id in adj:
adj[edge.source_node_id].append(edge)
else:
logging.warning(f"Edge source node {edge.source_node_id} not found in nodes.")
return adj
def get_node(self, node_id: str) -> Optional[WorkflowNode]:
return self.nodes.get(node_id)
def get_outgoing_edges(self, node_id: str) -> List[WorkflowEdge]:
return self.adj_list.get(node_id, [])
class WorkflowOrchestrator:
def __init__(self, workflow: Workflow, decision_engine: DecisionEngine):
self.workflow = workflow
self.decision_engine = decision_engine
self.current_context: Dict[str, Any] = {} # Stores workflow state
def _execute_llm_decision_node(self, node: WorkflowNode) -> str | None:
"""
执行LLM决策节点。
"""
customer_query = self.current_context.get("initial_query")
if not customer_query:
logging.error("No 'initial_query' found in context for LLM decision.")
return None
possible_paths = node.config.get("possible_paths", [])
threshold = node.config.get("threshold")
selected_path, confidence, reason = self.decision_engine.evaluate_decision(
query=customer_query,
possible_paths=possible_paths,
threshold=threshold
)
# 将LLM的决策结果保存到上下文中
self.current_context["llm_decision"] = {
"path": selected_path,
"confidence": confidence,
"reason": reason
}
return selected_path # 返回LLM选择的路径名称
def _execute_task_node(self, node: WorkflowNode):
"""
模拟执行一个通用任务节点。
"""
task_name = node.config.get("task_name", "Unknown Task")
logging.info(f"Executing task: {task_name} at node {node.id}. Context: {self.current_context}")
# 实际这里会调用外部服务,执行业务逻辑
self.current_context[f"task_result_{node.id}"] = f"Successfully completed {task_name}"
def _execute_human_review_node(self, node: WorkflowNode):
"""
模拟人工审核节点。
"""
review_reason = self.current_context.get("llm_decision", {}).get("reason", "No specific reason provided.")
logging.warning(f"Human review required at node {node.id}. Reason: {review_reason}. Initial query: {self.current_context.get('initial_query')}")
self.current_context[f"review_status_{node.id}"] = "Pending Human Review"
# 在实际系统中,这里会触发一个通知或创建一个人机交互界面
def run_workflow(self, initial_query: str, start_node_id: str = "start"):
"""
运行工作流。
"""
self.current_context = {"initial_query": initial_query}
current_node_id = start_node_id
max_iterations = 20 # 防止无限循环
iteration_count = 0
while current_node_id:
iteration_count += 1
if iteration_count > max_iterations:
logging.error("Workflow exceeded max iterations, possibly infinite loop detected.")
break
current_node = self.workflow.get_node(current_node_id)
if not current_node:
logging.error(f"Node '{current_node_id}' not found in workflow.")
break
logging.info(f"Currently at node: {current_node.name} (ID: {current_node.id}, Type: {current_node.type})")
# 执行当前节点逻辑
if current_node.type == "start":
# Start node simply moves to the next
pass
elif current_node.type == "llm_decision":
llm_selected_path = self._execute_llm_decision_node(current_node)
# LLM决策节点会根据其结果来选择后续边缘
# 我们需要将LLM的实际选择与边缘条件进行匹配
elif current_node.type == "task":
self._execute_task_node(current_node)
elif current_node.type == "human_review":
self._execute_human_review_node(current_node)
elif current_node.type == "end":
logging.info(f"Workflow finished at node: {current_node.name}. Final context: {self.current_context}")
break
else:
logging.warning(f"Unknown node type: {current_node.type}. Skipping execution.")
# 评估出站边缘,选择下一跳
outgoing_edges = self.workflow.get_outgoing_edges(current_node_id)
next_edge: Optional[WorkflowEdge] = None
# 优先匹配具体条件边缘
for edge in outgoing_edges:
if edge.condition_type == "llm_decision_path":
# 对于LLM决策节点,我们检查LLM的输出是否匹配边缘期望的路径
if current_node.type == "llm_decision" and self.current_context.get("llm_decision", {}).get("path") == edge.condition_params.get("expected_path"):
next_edge = edge
break
# 可以添加其他条件类型,例如基于任务结果的条件
# elif edge.condition_type == "task_result_check":
# if self.current_context.get(edge.condition_params.get("result_key")) == edge.condition_params.get("expected_value"):
# next_edge = edge
# break
# 如果没有匹配到具体条件边缘,尝试寻找默认边缘
if not next_edge:
for edge in outgoing_edges:
if edge.condition_type == "default":
next_edge = edge
break
if next_edge:
current_node_id = next_edge.target_node_id
else:
logging.error(f"No valid outgoing edge found from node '{current_node_id}'. Workflow halted.")
break
return self.current_context
# --- 6. 定义工作流图 ---
nodes = [
WorkflowNode(id="start", name="Start", type="start"),
WorkflowNode(id="llm_route_query", name="LLM Query Router", type="llm_decision",
config={"possible_paths": ["technical_support", "billing_inquiry", "general_question", "unclear_intent"], "threshold": 0.7}),
WorkflowNode(id="handle_tech_support", name="Handle Technical Support", type="task", config={"task_name": "Open Tech Support Ticket"}),
WorkflowNode(id="handle_billing", name="Handle Billing Inquiry", type="task", config={"task_name": "Lookup Billing Info"}),
WorkflowNode(id="handle_general_q", name="Handle General Question", type="task", config={"task_name": "Provide FAQ Answer"}),
WorkflowNode(id="escalate_human", name="Escalate to Human", type="human_review"),
WorkflowNode(id="end_workflow", name="End", type="end")
]
edges = [
WorkflowEdge(source_node_id="start", target_node_id="llm_route_query", condition_type="default"),
# LLM决策后的条件边缘
WorkflowEdge(source_node_id="llm_route_query", target_node_id="handle_tech_support",
condition_type="llm_decision_path", condition_params={"expected_path": "technical_support"}),
WorkflowEdge(source_node_id="llm_route_query", target_node_id="handle_billing",
condition_type="llm_decision_path", condition_params={"expected_path": "billing_inquiry"}),
WorkflowEdge(source_node_id="llm_route_query", target_node_id="handle_general_q",
condition_type="llm_decision_path", condition_params={"expected_path": "general_question"}),
# 默认或低置信度回退
WorkflowEdge(source_node_id="llm_route_query", target_node_id="escalate_human",
condition_type="default"), # 如果LLM没有明确选择路径,或选择"unclear_intent",则走此默认路径
# 任务完成后的默认边缘
WorkflowEdge(source_node_id="handle_tech_support", target_node_id="end_workflow", condition_type="default"),
WorkflowEdge(source_node_id="handle_billing", target_node_id="end_workflow", condition_type="default"),
WorkflowEdge(source_node_id="handle_general_q", target_node_id="end_workflow", condition_type="default"),
WorkflowEdge(source_node_id="escalate_human", target_node_id="end_workflow", condition_type="default"),
]
# --- 7. 运行工作流 ---
llm_client = MockLLMClient()
prompt_generator = PromptGenerator()
decision_engine = DecisionEngine(llm_client, prompt_generator)
workflow_instance = Workflow(nodes, edges)
orchestrator = WorkflowOrchestrator(workflow_instance, decision_engine)
print("n--- Running Workflow for 'My internet is not working' ---")
final_context_1 = orchestrator.run_workflow("My internet is not working, I need help with connectivity.")
print(f"Final Context 1: {json.dumps(final_context_1, indent=2)}")
print("n--- Running Workflow for 'I have a question about my latest invoice' ---")
final_context_2 = orchestrator.run_workflow("I have a question about my latest invoice.")
print(f"Final Context 2: {json.dumps(final_context_2, indent=2)}")
print("n--- Running Workflow for 'What are your business hours?' ---")
final_context_3 = orchestrator.run_workflow("What are your business hours?")
print(f"Final Context 3: {json.dumps(final_context_3, indent=2)}")
print("n--- Running Workflow for 'I am completely lost and don't know what to do.' (Low Confidence/Unclear) ---")
final_context_4 = orchestrator.run_workflow("I am completely lost and don't know what to do.")
print(f"Final Context 4: {json.dumps(final_context_4, indent=2)}")
print("n--- Running Workflow for 'Can you tell me a joke?' (Unrecognized Intent/Low Confidence) ---")
final_context_5 = orchestrator.run_workflow("Can you tell me a joke?")
print(f"Final Context 5: {json.dumps(final_context_5, indent=2)}")
这个示例展示了如何将LLM的概率性(在此通过置信度体现)输出与结构化的工作流相结合。WorkflowOrchestrator根据LLMDecisionOutput中的decision_path和confidence来选择下一跳。当LLM的置信度低于预设阈值或LLM选择了一个未被明确定义的路径时,工作流会自动回退到escalate_to_human节点,确保了系统的鲁棒性。
七、 进阶考量
构建一个生产级的LLM驱动的动态工作流系统,还需要考虑以下进阶问题:
- 概率校准与不确定性量化:LLM报告的置信度可能不总是与真实世界事件的概率完美匹配。使用校准技术(如 Platt Scaling、isotonic regression)可以改善模型的概率预测。此外,探索模型的不确定性量化(Uncertainty Quantification, UQ)方法,可以更全面地理解模型何时“不知道”。
- 多模态决策:结合多种决策策略(关键词、JSON、语义相似度)形成一个决策委员会,可以提高决策的准确性和鲁棒性。例如,如果JSON输出的置信度很高,但关键词概率很低,系统可以标记为需要额外审查。
- 人机协作(Human-in-the-Loop, HITL):设计明确的人工干预点。当LLM的决策置信度低于某个关键阈值,或遇到无法处理的异常情况时,应自动将任务路由给人类专家进行审查和处理。这不仅提高了系统的可靠性,也为模型提供了宝贵的反馈数据。
- 性能与成本:频繁调用大型LLM会带来显著的延迟和成本。
- 缓存机制:对重复的LLM调用结果进行缓存。
- 异步调用:使用异步IO处理LLM调用,避免阻塞。
- 模型选择:根据任务复杂度和重要性,选择大小合适的模型。对于简单的分类任务,小型、快速的模型可能就足够了。
- 批处理(Batching):在可能的情况下,将多个LLM请求批处理发送。
- 安全性与合规性:
- 输入输出过滤:对LLM的输入和输出进行严格的过滤和验证,防止注入攻击(Prompt Injection)和不当内容的生成。
- 隐私保护:确保敏感数据在LLM处理过程中得到妥善保护,遵循数据隐私法规。
- 可解释性:尽可能让LLM提供决策理由(如JSON输出中的
reason字段),这有助于审计和调试。
- 可观测性(Observability):
- 日志记录:详细记录工作流的执行路径、LLM的输入输出、决策结果和置信度,以及任何错误。
- 监控:实时监控工作流的性能、LLM的API调用成功率、决策引擎的准确性。
- 追踪:实现端到端的工作流追踪,方便问题诊断和性能优化。
- 迭代与优化:LLM和工作流是不断演进的。通过收集真实世界的数据,持续优化Prompt,微调LLM,并调整决策阈值和策略,以提高系统的整体表现。
尾声
通过对LLM输出概率的深入理解和灵活运用,我们能够为工作流系统构建出强大的“条件边缘”机制。这使得我们的应用程序不再是僵硬的线性流程,而是能够根据LLM的智能判断,实时调整其执行路径,实现前所未有的自适应能力。从简单的关键词匹配到复杂的语义理解,再到健壮的结构化输出解析,每一种策略都为我们提供了将概率性洞察转化为确定性行动的工具。通过精心设计的架构和工程实践,我们能够构建出高效、可靠且高度智能的下一代AI应用。