各位同仁,各位技术爱好者,
今天,我们将深入探讨一个在复杂系统设计中至关重要的模式:如何构建一个能够根据用户意图自动选择后续处理分支的动态路由链。我们将围绕一个核心概念——RouterRunnable——来展开我们的讨论。请注意,RouterRunnable并非一个特定框架或库的名称,而是我为了本次讲座所抽象出的一个通用模式,它代表了一个具备路由能力的、可执行的工作单元。
在当今瞬息万变的软件环境中,系统需要具备前所未有的灵活性和适应性。传统的静态、硬编码的业务流程已经难以满足快速迭代和个性化服务的需求。想象一下一个智能客服系统,它需要根据用户输入的自然语言,识别出用户的真实意图,然后动态地选择一系列服务(例如,查询订单、修改地址、报告问题)来响应。这不仅仅是简单的条件判断,而是一条动态生成、自我调整的复杂处理链。
这就是动态路由链的用武之地。它将复杂的业务流程分解为一系列可独立执行、可插拔的步骤,并通过一个智能的“路由器”机制,在运行时根据上下文和用户意图来决定下一步走向。这不仅提高了系统的可维护性和可扩展性,更重要的是,它使得系统能够像一个真正理解用户意图的智能体一样运作。
一、动态路由链:核心理念与设计哲学
1.1 静态路由的局限性
在深入动态路由之前,我们先回顾一下静态路由的不足。在典型的MVC或RESTful架构中,路由通常是URL到控制器动作的映射,或者在内部流程中表现为硬编码的if-else if-else结构或switch语句。
// 静态路由的典型示例
public void processRequest(Request request) {
if ("create_order".equals(request.getIntent())) {
createOrderService.execute(request);
} else if ("check_status".equals(request.getIntent())) {
checkStatusService.execute(request);
} else if ("cancel_order".equals(request.getIntent())) {
cancelOrderService.execute(request);
} else {
fallbackService.execute(request);
}
}
这种模式在业务逻辑简单时尚可接受,但随着系统复杂度的增加,它会迅速暴露出问题:
- 僵硬性: 难以在运行时改变流程,每次业务规则调整都需要修改代码并重新部署。
- 可维护性差: 大量的条件判断导致代码臃肿、难以阅读和理解。
- 扩展性差: 添加新的业务流程需要修改现有代码,违反开闭原则。
- 重用性低: 业务逻辑与流程控制紧密耦合,难以将单个处理单元复用到其他流程中。
1.2 动态路由链的愿景
动态路由链旨在克服上述局限,其核心愿景是:
- 意图驱动: 流程的每一步都能够根据用户(或系统)的当前意图和上下文数据做出智能决策。
- 模块化: 将复杂流程拆解为独立的、可重用的、原子化的处理步骤。
- 可配置性: 路由逻辑可以外部化配置,甚至在运行时动态加载和修改。
- 可观测性: 能够清晰地追踪每个请求在流程中的进展、遇到的决策点和执行路径。
我们将构建一个模型,其中每个“可执行单元”都可以是一个纯粹的处理器(Processor),负责执行具体的业务逻辑;也可以是一个路由器(Router),负责根据上下文数据和预设规则决定下一步的走向。而RouterRunnable正是这个路由器角色的抽象,它是一个能够执行并同时具备路由能力的单元。
二、构建基石:核心组件设计
要实现动态路由链,我们需要定义几个关键的抽象和组件。
2.1 处理上下文(ProcessingContext)
ProcessingContext是整个流程中数据和状态的载体。它承载了用户意图、当前处理状态、业务数据、错误信息以及任何需要在步骤之间传递的信息。它的设计应尽量通用和可扩展。
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID; // For session ID generation
/**
* 封装了整个处理流程中的所有上下文信息,包括用户意图、会话数据、处理状态、步骤输出等。
*/
public class ProcessingContext {
private final String sessionId; // 标识一次完整的会话或请求
private UserIntent userIntent; // 用户或系统初始化的意图
private Map<String, Object> data = new HashMap<>(); // 存储业务数据,可在步骤间传递
private ProcessingState state; // 当前处理链的整体状态
private String currentStepId; // 当前正在执行或刚刚执行完的步骤ID
private String nextStepId; // 路由器决定的下一个步骤ID
private Object stepOutput; // 当前步骤的输出结果
private String errorMessage; // 错误信息,如果处理失败
private Map<String, Object> metadata = new HashMap<>(); // 其他元数据,如请求时间、来源等
public ProcessingContext(UserIntent userIntent) {
this.sessionId = UUID.randomUUID().toString(); // 自动生成会话ID
this.userIntent = userIntent;
this.state = ProcessingState.INITIAL;
// 初始意图也可以存储在data中,但作为核心驱动力,独立字段更清晰
this.data.put("initialUserIntent", userIntent);
}
public ProcessingContext(String sessionId, UserIntent userIntent) {
this.sessionId = sessionId;
this.userIntent = userIntent;
this.state = ProcessingState.INITIAL;
this.data.put("initialUserIntent", userIntent);
}
// --- 数据存取方法 ---
public void putData(String key, Object value) {
if (key == null || value == null) {
System.err.println("Warning: Attempted to put null key or value into context data.");
return;
}
this.data.put(key, value);
}
public <T> Optional<T> getData(String key, Class<T> type) {
Object value = data.get(key);
if (value != null && type.isInstance(value)) {
return Optional.of(type.cast(value));
}
return Optional.empty();
}
public <T> T getRequiredData(String key, Class<T> type) {
return getData(key, type)
.orElseThrow(() -> new IllegalArgumentException("Context data for key '" + key + "' not found or wrong type."));
}
// --- 元数据存取方法 ---
public void putMetadata(String key, Object value) {
if (key == null || value == null) {
System.err.println("Warning: Attempted to put null key or value into context metadata.");
return;
}
this.metadata.put(key, value);
}
public <T> Optional<T> getMetadata(String key, Class<T> type) {
Object value = metadata.get(key);
if (value != null && type.isInstance(value)) {
return Optional.of(type.cast(value));
}
return Optional.empty();
}
// --- 状态和控制流相关方法 ---
public String getSessionId() { return sessionId; }
public UserIntent getUserIntent() { return userIntent; }
public ProcessingState getState() { return state; }
public void setState(ProcessingState state) { this.state = state; }
public String getCurrentStepId() { return currentStepId; }
public void setCurrentStepId(String currentStepId) { this.currentStepId = currentStepId; }
public String getNextStepId() { return nextStepId; }
public void setNextStepId(String nextStepId) { this.nextStepId = nextStepId; }
public Object getStepOutput() { return stepOutput; }
public void setStepOutput(Object stepOutput) { this.stepOutput = stepOutput; }
public String getErrorMessage() { return errorMessage; }
public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
this.state = ProcessingState.FAILED; // 设定错误信息通常意味着失败
}
public boolean hasError() { return errorMessage != null || state == ProcessingState.FAILED; }
@Override
public String toString() {
return "ProcessingContext{" +
"sessionId='" + sessionId + ''' +
", userIntent=" + userIntent.getIntentType() +
", state=" + state +
", currentStepId='" + currentStepId + ''' +
", nextStepId='" + nextStepId + ''' +
", hasError=" + hasError() +
'}';
}
}
2.1.1 用户意图(UserIntent)
UserIntent是ProcessingContext的一个核心组成部分,它代表了用户请求的语义化表达。在实际应用中,这可能来自NLP模型的输出,或者结构化的API请求。
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
/**
* 封装了用户(或系统)的核心意图及其相关参数。
*/
public class UserIntent {
private final String intentType; // 意图类型,例如 "ORDER_PRODUCT", "CHECK_STATUS", "CANCEL_ORDER"
private final Map<String, String> parameters; // 意图相关的参数,例如 product_id, order_id
public UserIntent(String intentType, Map<String, String> parameters) {
if (intentType == null || intentType.trim().isEmpty()) {
throw new IllegalArgumentException("Intent type cannot be null or empty.");
}
this.intentType = intentType;
this.parameters = parameters != null ? Collections.unmodifiableMap(new HashMap<>(parameters)) : Collections.emptyMap();
}
public String getIntentType() { return intentType; }
public Optional<String> getParameter(String key) {
return Optional.ofNullable(parameters.get(key));
}
public String getRequiredParameter(String key) {
return getParameter(key)
.orElseThrow(() -> new IllegalArgumentException("Missing required parameter: " + key + " for intent " + intentType));
}
public Map<String, String> getParameters() {
return parameters; // 返回不可修改的Map
}
@Override
public String toString() {
return "UserIntent{" +
"intentType='" + intentType + ''' +
", parameters=" + parameters +
'}';
}
}
2.1.2 处理状态(ProcessingState)
定义一系列枚举来表示ProcessingContext在整个链条中的状态,有助于追踪和控制流程。
/**
* 定义处理链中可能的整体状态。
*/
public enum ProcessingState {
INITIAL, // 初始状态,刚创建上下文
PENDING, // 等待执行
EXECUTING, // 正在执行某个步骤
ROUTING, // 正在进行路由决策
COMPLETED, // 流程成功完成
FAILED, // 流程执行失败
WAITING_FOR_USER_INPUT, // 流程暂停,等待用户提供更多信息
ABORTED // 流程被主动终止
}
2.2 可路由步骤(RoutableStep)接口
这是所有处理单元的抽象。无论是执行业务逻辑的处理器,还是负责决策路由的路由器,都将实现这个接口。
/**
* 定义了路由链中所有可执行步骤的通用接口。
* 每个步骤都拥有一个唯一的ID,并能够根据给定的上下文执行其逻辑。
*/
public interface RoutableStep {
/**
* 获取步骤的唯一标识符。
* @return 步骤ID
*/
String getId();
/**
* 执行当前步骤的逻辑。
* 步骤执行后,可能会修改ProcessingContext中的数据、状态,
* 特别是Router类型的步骤会设置context.setNextStepId()。
* @param context 当前的处理上下文
* @return 更新后的处理上下文
*/
ProcessingContext execute(ProcessingContext context);
/**
* 判断当前步骤是否应该被执行。
* 允许在执行前进行预检查,例如权限、前置条件等。
* @param context 当前的处理上下文
* @return 如果应该执行返回true,否则返回false。
*/
boolean shouldExecute(ProcessingContext context);
}
2.3 处理器步骤(AbstractProcessorStep)
处理器步骤是执行具体业务逻辑的工作单元。它们不直接负责路由,但可以修改ProcessingContext,从而影响后续的路由决策。
import java.util.function.Function;
/**
* 抽象的处理器步骤基类。
* 处理器步骤主要负责执行具体的业务逻辑,并更新ProcessingContext。
* 它通常不会主动决定下一个步骤,除非是链中最后一个步骤,或者有特定的后续处理逻辑。
*/
public abstract class AbstractProcessorStep implements RoutableStep {
protected final String id;
protected String defaultNextStepId; // 处理器完成后可以指定一个默认的下一个步骤
public AbstractProcessorStep(String id) {
if (id == null || id.trim().isEmpty()) {
throw new IllegalArgumentException("Step ID cannot be null or empty.");
}
this.id = id;
}
public AbstractProcessorStep(String id, String defaultNextStepId) {
this(id);
this.defaultNextStepId = defaultNextStepId;
}
@Override
public String getId() {
return id;
}
/**
* 默认情况下,处理器步骤总是执行。子类可以根据需要重写此方法。
* @param context 当前的处理上下文
* @return 总是返回true
*/
@Override
public boolean shouldExecute(ProcessingContext context) {
// 例如,可以在这里检查context是否有错误状态,如果有则不执行
if (context.hasError()) {
System.out.println(String.format("Skip processor step [%s] due to existing error: %s", id, context.getErrorMessage()));
return false;
}
return true;
}
/**
* 模板方法,封装了执行的通用逻辑。
* 子类需要实现 doExecute 方法来完成具体的业务逻辑。
* @param context 当前的处理上下文
* @return 更新后的处理上下文
*/
@Override
public final ProcessingContext execute(ProcessingContext context) {
context.setCurrentStepId(getId());
context.setState(ProcessingState.EXECUTING);
System.out.println(String.format("[%s] Executing processor step...", getId()));
try {
ProcessingContext resultContext = doExecute(context);
if (!resultContext.hasError() && resultContext.getState() != ProcessingState.WAITING_FOR_USER_INPUT) {
// 如果处理器没有明确设置下一个步骤,并且也没有错误或等待输入,则使用默认的下一个步骤
if (resultContext.getNextStepId() == null && defaultNextStepId != null) {
resultContext.setNextStepId(defaultNextStepId);
}
// 如果没有错误,且不是等待输入,且没有设置下一个步骤,则假定此分支结束
if (resultContext.getNextStepId() == null && resultContext.getState() != ProcessingState.COMPLETED) {
resultContext.setState(ProcessingState.COMPLETED);
}
}
return resultContext;
} catch (Exception e) {
context.setErrorMessage("Error in step " + getId() + ": " + e.getMessage());
System.err.println(String.format("Error executing step [%s]: %s", getId(), e.getMessage()));
e.printStackTrace();
return context;
}
}
/**
* 子类必须实现此方法来包含具体的业务逻辑。
* @param context 当前的处理上下文
* @return 更新后的处理上下文
* @throws Exception 业务逻辑执行中可能抛出的异常
*/
protected abstract ProcessingContext doExecute(ProcessingContext context) throws Exception;
}
2.4 路由器步骤(AbstractRouterStep)—— RouterRunnable的具现
路由器步骤是动态路由链的核心。它根据ProcessingContext中的信息,使用一组预定义的条件和对应的目标步骤ID,来决定下一步的执行路径。这正是我们所指的RouterRunnable模式的体现。
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;
/**
* 抽象的路由器步骤基类。
* 路由器步骤主要负责根据ProcessingContext中的信息,通过一系列条件来决定下一个要执行的步骤ID。
* 它实现了RouterRunnable的核心逻辑。
*/
public abstract class AbstractRouterStep implements RoutableStep {
protected final String id;
// 使用LinkedHashMap保证路由规则的顺序,因为规则可能有优先级
protected final Map<Predicate<ProcessingContext>, String> routes = new LinkedHashMap<>();
protected String defaultNextStepId; // 如果没有条件匹配,则转向的默认步骤
public AbstractRouterStep(String id) {
if (id == null || id.trim().isEmpty()) {
throw new IllegalArgumentException("Router ID cannot be null or empty.");
}
this.id = id;
}
public AbstractRouterStep(String id, String defaultNextStepId) {
this(id);
this.defaultNextStepId = defaultNextStepId;
}
@Override
public String getId() {
return id;
}
/**
* 添加一个路由规则。
* @param condition 判断是否满足路由条件的谓词
* @param nextStepId 如果条件满足,将要跳转到的下一个步骤ID
*/
public void addRoute(Predicate<ProcessingContext> condition, String nextStepId) {
if (condition == null || nextStepId == null || nextStepId.trim().isEmpty()) {
throw new IllegalArgumentException("Condition and nextStepId cannot be null or empty.");
}
this.routes.put(condition, nextStepId);
}
/**
* 默认情况下,路由器步骤总是执行。子类可以根据需要重写此方法。
* @param context 当前的处理上下文
* @return 总是返回true
*/
@Override
public boolean shouldExecute(ProcessingContext context) {
if (context.hasError()) {
System.out.println(String.format("Skip router step [%s] due to existing error: %s", id, context.getErrorMessage()));
return false;
}
return true;
}
/**
* 执行路由逻辑。遍历所有已注册的路由规则,找到第一个满足条件的规则,并设置下一个步骤ID。
* @param context 当前的处理上下文
* @return 更新后的处理上下文
*/
@Override
public final ProcessingContext execute(ProcessingContext context) {
context.setCurrentStepId(getId());
context.setState(ProcessingState.ROUTING);
System.out.println(String.format("[%s] Executing router step, evaluating routes...", getId()));
// 尝试匹配路由规则
for (Map.Entry<Predicate<ProcessingContext>, String> entry : routes.entrySet()) {
if (entry.getKey().test(context)) {
context.setNextStepId(entry.getValue());
System.out.println(String.format("[%s] Route matched! Condition: %s, Next Step: %s",
getId(), entry.getKey().toString(), entry.getValue()));
return context;
}
}
// 如果没有匹配到任何规则,则使用默认的下一个步骤
if (defaultNextStepId != null) {
context.setNextStepId(defaultNextStepId);
System.out.println(String.format("[%s] No specific route matched. Falling back to default next step: %s",
getId(), defaultNextStepId));
} else {
// 如果连默认的下一个步骤都没有,则认为路由失败
context.setErrorMessage("Router " + getId() + " failed to determine a next step (no match and no default).");
System.err.println(String.format("Error: Router [%s] failed to determine a next step.", getId()));
}
return context;
}
}
2.5 动态路由链执行器(DynamicRouterChainExecutor)
DynamicRouterChainExecutor是整个动态路由链的编排者。它负责接收初始请求,根据路由器的决策驱动流程的执行,直到流程完成、失败或暂停。
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 动态路由链的执行器,负责编排和驱动整个处理流程。
* 它根据当前步骤的类型(处理器或路由器)以及路由器决定的下一个步骤ID来推进流程。
*/
public class DynamicRouterChainExecutor {
private final Map<String, RoutableStep> registeredSteps = new ConcurrentHashMap<>();
private final String initialStepId; // 链的起始步骤ID
public DynamicRouterChainExecutor(String initialStepId) {
if (initialStepId == null || initialStepId.trim().isEmpty()) {
throw new IllegalArgumentException("Initial step ID cannot be null or empty.");
}
this.initialStepId = initialStepId;
}
/**
* 注册一个可路由步骤到执行器中。
* @param step 要注册的步骤
*/
public void registerStep(RoutableStep step) {
if (step == null) {
throw new IllegalArgumentException("Cannot register a null step.");
}
if (registeredSteps.containsKey(step.getId())) {
System.err.println("Warning: Overwriting existing step with ID: " + step.getId());
}
registeredSteps.put(step.getId(), step);
System.out.println("Registered step: " + step.getId());
}
/**
* 执行整个动态路由链。
* @param context 初始的处理上下文
* @return 最终的处理上下文,包含执行结果和状态
*/
public ProcessingContext executeChain(ProcessingContext context) {
String currentStepId = initialStepId;
// 如果初始上下文已经有错误,直接返回
if (context.hasError()) {
System.err.println("Chain aborted: Initial context has an error: " + context.getErrorMessage());
return context;
}
while (currentStepId != null && !context.hasError() && context.getState() != ProcessingState.WAITING_FOR_USER_INPUT) {
RoutableStep currentStep = registeredSteps.get(currentStepId);
if (currentStep == null) {
context.setErrorMessage("Unknown step ID encountered: " + currentStepId);
System.err.println("Error: Unknown step ID: " + currentStepId);
context.setState(ProcessingState.FAILED);
break;
}
System.out.println(String.format("n--- Processing Step: %s (Intent: %s, State: %s) ---",
currentStepId, context.getUserIntent().getIntentType(), context.getState()));
if (!currentStep.shouldExecute(context)) {
context.setErrorMessage("Step " + currentStepId + " was skipped due to pre-conditions.");
context.setState(ProcessingState.FAILED); // 如果一个步骤不应该执行,通常意味着流程无法按预期进行
System.err.println(String.format("Chain failed: Step [%s] skipped. Reason: %s", currentStepId, context.getErrorMessage()));
break;
}
// 执行当前步骤
context = currentStep.execute(context);
// 检查执行后的状态
if (context.hasError()) {
System.err.println(String.format("Chain terminated due to error at step %s: %s", currentStepId, context.getErrorMessage()));
break;
}
if (context.getState() == ProcessingState.WAITING_FOR_USER_INPUT) {
System.out.println(String.format("Chain paused at step %s, waiting for user input. Context: %s", currentStepId, context));
break; // 暂停链条,等待外部输入
}
// 获取下一个步骤ID
String nextStepId = context.getNextStepId();
if (nextStepId == null) {
// 如果当前步骤是ProcessorStep,且没有明确指定nextStepId,则视为流程完成
if (currentStep instanceof AbstractProcessorStep) {
context.setState(ProcessingState.COMPLETED);
System.out.println(String.format("Processor step [%s] completed without setting a next step. Chain ends.", currentStepId));
} else {
// 如果是RouterStep,但没有找到下一个步骤ID,则视为路由失败
context.setErrorMessage("Router step " + currentStepId + " failed to determine a next step.");
context.setState(ProcessingState.FAILED);
System.err.println(String.format("Error: Router step [%s] failed to determine a next step.", currentStepId));
}
break; // 结束循环
}
currentStepId = nextStepId;
context.setNextStepId(null); // 清除nextStepId,为下一个路由器做准备
}
System.out.println("n--- Chain execution finished. Final Context State: " + context.getState() + " ---");
return context;
}
}
三、实战演练:一个智能客服场景
让我们通过一个具体的智能客服场景来演示如何使用上述组件构建动态路由链。
场景描述: 一个电商客服系统,需要处理用户以下几种意图:
- 查询商品详情: 用户提供商品ID,系统返回商品信息。
- 下单: 用户提供商品ID和数量,系统进行下单操作。
- 查询订单状态: 用户提供订单ID,系统返回订单状态。
- 取消订单: 用户提供订单ID,系统取消订单。
- 寻求帮助/未知意图: 提供通用帮助或转接人工客服。
3.1 定义具体的处理器步骤
// 1. 查询商品详情处理器
class FetchProductDetailsStep extends AbstractProcessorStep {
public FetchProductDetailsStep() { super("FetchProductDetails"); }
@Override
protected ProcessingContext doExecute(ProcessingContext context) {
String productId = context.getUserIntent().getParameter("product_id")
.orElseThrow(() -> new IllegalArgumentException("Product ID is required for fetching details."));
System.out.println(String.format(" -> Simulating fetching details for product: %s", productId));
// 模拟数据库查询
String productInfo = "Product " + productId + " - Price: $19.99, Stock: 100";
context.putData("productDetails", productInfo);
context.setStepOutput("Successfully fetched product details: " + productInfo);
context.setState(ProcessingState.COMPLETED);
return context;
}
}
// 2. 验证用户登录状态处理器 (模拟)
class VerifyUserLoginStep extends AbstractProcessorStep {
public VerifyUserLoginStep() { super("VerifyUserLogin"); }
@Override
protected ProcessingContext doExecute(ProcessingContext context) {
// 模拟用户已登录
boolean isLoggedIn = context.getData("isLoggedIn", Boolean.class).orElse(false);
if (!isLoggedIn) {
System.out.println(" -> User not logged in. Requesting login.");
context.setStepOutput("Please log in to proceed.");
context.setState(ProcessingState.WAITING_FOR_USER_INPUT); // 暂停,等待用户登录
context.putData("requiredAction", "login"); // 提示前端需要登录
// 这里通常不会直接失败,而是等待用户输入,然后重新进入流程
} else {
System.out.println(" -> User is already logged in. Proceeding.");
context.setStepOutput("User login verified.");
context.setState(ProcessingState.COMPLETED);
}
return context;
}
}
// 3. 创建订单处理器
class CreateOrderStep extends AbstractProcessorStep {
public CreateOrderStep() { super("CreateOrder"); }
@Override
protected ProcessingContext doExecute(ProcessingContext context) {
String productId = context.getUserIntent().getRequiredParameter("product_id");
int quantity = context.getUserIntent().getParameter("quantity")
.map(Integer::parseInt)
.orElse(1); // 默认数量1
System.out.println(String.format(" -> Simulating creating order for product %s, quantity %d", productId, quantity));
// 模拟生成订单ID
String orderId = UUID.randomUUID().toString().substring(0, 8);
context.putData("orderId", orderId);
context.setStepOutput("Order " + orderId + " created successfully for product " + productId + ".");
context.setState(ProcessingState.COMPLETED);
return context;
}
}
// 4. 查询订单状态处理器
class FetchOrderStatusStep extends AbstractProcessorStep {
public FetchOrderStatusStep() { super("FetchOrderStatus"); }
@Override
protected ProcessingContext doExecute(ProcessingContext context) {
String orderId = context.getUserIntent().getParameter("order_id")
.orElse(context.getData("orderId", String.class).orElse(null)); // 尝试从上下文获取订单ID
if (orderId == null) {
context.setErrorMessage("Order ID is required to check status.");
context.setStepOutput("Please provide an order ID to check its status.");
context.setState(ProcessingState.WAITING_FOR_USER_INPUT); // 需要用户提供订单ID
context.putData("requiredInput", "order_id");
return context;
}
System.out.println(String.format(" -> Simulating fetching status for order: %s", orderId));
// 模拟订单状态查询
String status = "DELIVERED"; // 假设已交付
if (orderId.startsWith("PENDING")) { // 模拟一些特殊情况
status = "PENDING";
}
context.putData("orderStatus", status);
context.setStepOutput("Order " + orderId + " status: " + status);
context.setState(ProcessingState.COMPLETED);
return context;
}
}
// 5. 取消订单处理器
class CancelOrderStep extends AbstractProcessorStep {
public CancelOrderStep() { super("CancelOrder"); }
@Override
protected ProcessingContext doExecute(ProcessingContext context) {
String orderId = context.getUserIntent().getRequiredParameter("order_id");
System.out.println(String.format(" -> Simulating canceling order: %s", orderId));
// 模拟取消逻辑,可能会有失败情况
if (orderId.equals("DELIVERED_ORDER_ID")) {
context.setErrorMessage("Order " + orderId + " cannot be cancelled as it's already delivered.");
context.setState(ProcessingState.FAILED);
context.setStepOutput("Cancellation failed: Order already delivered.");
} else {
context.putData("cancellationStatus", "SUCCESS");
context.setStepOutput("Order " + orderId + " cancelled successfully.");
context.setState(ProcessingState.COMPLETED);
}
return context;
}
}
// 6. 最终响应生成器
class ResponseGeneratorStep extends AbstractProcessorStep {
public ResponseGeneratorStep() { super("ResponseGenerator"); }
@Override
protected ProcessingContext doExecute(ProcessingContext context) {
System.out.println(" -> Generating final response for user.");
String finalMessage = context.getStepOutput() != null ? context.getStepOutput().toString() : "Operation completed.";
if (context.hasError()) {
finalMessage = "Operation failed: " + context.getErrorMessage();
} else if (context.getState() == ProcessingState.WAITING_FOR_USER_INPUT) {
finalMessage = "Action required: " + context.getStepOutput();
}
// 也可以根据context中的其他数据构建更复杂的响应
context.putData("finalUserResponse", finalMessage);
System.out.println(String.format(" -> Final user response: '%s'", finalMessage));
context.setState(ProcessingState.COMPLETED); // 确保最终状态为完成
return context;
}
}
// 7. 兜底帮助处理器
class FallbackHelpStep extends AbstractProcessorStep {
public FallbackHelpStep() { super("FallbackHelp"); }
@Override
protected ProcessingContext doExecute(ProcessingContext context) {
System.out.println(" -> User intent not recognized or requires help. Providing general assistance.");
context.setStepOutput("I'm sorry, I couldn't understand your request. Please tell me more or type 'help'.");
context.setState(ProcessingState.COMPLETED);
return context;
}
}
3.2 定义具体的路由器步骤
// 1. 初始意图路由器
class InitialIntentRouter extends AbstractRouterStep {
public InitialIntentRouter() {
super("InitialIntentRouter", "FallbackHelp"); // 默认路由到帮助
// 根据UserIntent的intentType进行路由
addRoute(ctx -> "FETCH_PRODUCT_DETAILS".equals(ctx.getUserIntent().getIntentType()), "FetchProductDetails");
addRoute(ctx -> "CREATE_ORDER".equals(ctx.getUserIntent().getIntentType()), "VerifyUserLogin"); // 下单前先验证登录
addRoute(ctx -> "CHECK_ORDER_STATUS".equals(ctx.getUserIntent().getIntentType()), "FetchOrderStatus");
addRoute(ctx -> "CANCEL_ORDER".equals(ctx.getUserIntent().getIntentType()), "CancelOrder");
// 还可以添加更多复杂的条件,例如:
// addRoute(ctx -> ctx.getUserIntent().getIntentType().startsWith("ORDER") && ctx.getData("userRole", String.class).orElse("").equals("ADMIN"), "AdminOrderProcessing");
}
}
// 2. 订单创建后路由器 (例如,订单创建成功后,是跳转到查询订单状态,还是直接返回)
class OrderCreationPostRouter extends AbstractRouterStep {
public OrderCreationPostRouter() {
super("OrderCreationPostRouter", "ResponseGenerator"); // 默认路由到生成响应
// 如果用户明确要求查看订单状态,则跳转
addRoute(ctx -> ctx.getData("shouldCheckStatusAfterOrder", Boolean.class).orElse(false), "FetchOrderStatus");
}
}
// 3. 登录验证后路由器
class PostLoginRouter extends AbstractRouterStep {
public PostLoginRouter() {
super("PostLoginRouter", "FallbackHelp"); // 默认路由到帮助或错误处理
addRoute(ctx -> ctx.getData("isLoggedIn", Boolean.class).orElse(false), "CreateOrder"); // 登录成功,继续创建订单
addRoute(ctx -> !ctx.getData("isLoggedIn", Boolean.class).orElse(false), "VerifyUserLogin"); // 登录失败,或者等待用户输入,可能需要重新尝试登录
}
}
3.3 组装和执行路由链
现在,我们将所有组件组装起来,并演示如何执行。
import java.util.HashMap;
import java.util.Map;
import java.util.UUID; // For session ID generation
public class DynamicRouterChainDemo {
public static void main(String[] args) {
// 1. 注册所有步骤
DynamicRouterChainExecutor executor = new DynamicRouterChainExecutor("InitialIntentRouter");
// 注册处理器
executor.registerStep(new FetchProductDetailsStep());
executor.registerStep(new VerifyUserLoginStep());
executor.registerStep(new CreateOrderStep());
executor.registerStep(new FetchOrderStatusStep());
executor.registerStep(new CancelOrderStep());
executor.registerStep(new ResponseGeneratorStep());
executor.registerStep(new FallbackHelpStep());
// 注册路由器
executor.registerStep(new InitialIntentRouter());
executor.registerStep(new OrderCreationPostRouter());
executor.registerStep(new PostLoginRouter()); // 假设PostLoginRouter在VerifyUserLogin之后被调用
System.out.println("----------------------------------------");
System.out.println("Starting Dynamic Router Chain Demonstrations");
System.out.println("----------------------------------------");
// --- Demo 1: 查询商品详情 ---
System.out.println("n--- DEMO 1: Fetch Product Details ---");
Map<String, String> params1 = new HashMap<>();
params1.put("product_id", "PROD123");
UserIntent intent1 = new UserIntent("FETCH_PRODUCT_DETAILS", params1);
ProcessingContext context1 = new ProcessingContext(intent1);
ProcessingContext result1 = executor.executeChain(context1);
System.out.println("DEMO 1 Result: " + result1);
System.out.println("Final Response: " + result1.getData("finalUserResponse", String.class).orElse("N/A"));
// --- Demo 2: 创建订单 (需要登录) ---
System.out.println("n--- DEMO 2: Create Order (Requires Login) ---");
Map<String, String> params2 = new HashMap<>();
params2.put("product_id", "PROD456");
params2.put("quantity", "2");
UserIntent intent2 = new UserIntent("CREATE_ORDER", params2);
ProcessingContext context2 = new ProcessingContext(intent2);
context2.putData("isLoggedIn", true); // 模拟用户已登录
// context2.putData("shouldCheckStatusAfterOrder", true); // 模拟下单后想查看状态
ProcessingContext result2 = executor.executeChain(context2);
System.out.println("DEMO 2 Result: " + result2);
System.out.println("Order ID: " + result2.getData("orderId", String.class).orElse("N/A"));
System.out.println("Final Response: " + result2.getData("finalUserResponse", String.class).orElse("N/A"));
// --- Demo 3: 创建订单 (未登录,需要等待用户输入) ---
System.out.println("n--- DEMO 3: Create Order (Not Logged In) ---");
Map<String, String> params3 = new HashMap<>();
params3.put("product_id", "PROD789");
params3.put("quantity", "1");
UserIntent intent3 = new UserIntent("CREATE_ORDER", params3);
ProcessingContext context3 = new ProcessingContext(intent3);
context3.putData("isLoggedIn", false); // 模拟用户未登录
ProcessingContext result3 = executor.executeChain(context3);
System.out.println("DEMO 3 Result: " + result3);
System.out.println("Final Response: " + result3.getData("finalUserResponse", String.class).orElse("N/A"));
if (result3.getState() == ProcessingState.WAITING_FOR_USER_INPUT) {
System.out.println("System is waiting for user to " + result3.getData("requiredAction", String.class).orElse("complete an action") + ".");
// 模拟用户登录后重新提交请求,需要一个机制来恢复context,这里简单创建一个新的
System.out.println("n--- DEMO 3.1: User logs in and resubmits ---");
context3.putData("isLoggedIn", true); // 用户登录了
// 重新设置初始步骤,或者根据会话状态从暂停点继续
// 在实际应用中,这里需要更复杂的恢复逻辑,例如从数据库加载context并从currentStepId继续
// 这里为了演示,我们假设是重新开始,但context的数据是保留的
context3.setNextStepId("CreateOrder"); // 直接跳转到创建订单步骤,跳过登录验证 (因为登录已完成)
context3.setState(ProcessingState.INITIAL); // 重置状态以便Executor可以运行
// 假设我们有一个机制,让RouterRunnable的execute方法能够识别并处理WAITTING_FOR_USER_INPUT状态后的恢复
// 在我们的Executor中,如果context.nextStepId有值,会优先使用它
ProcessingContext result3_1 = executor.executeChain(context3);
System.out.println("DEMO 3.1 Result: " + result3_1);
System.out.println("Order ID: " + result3_1.getData("orderId", String.class).orElse("N/A"));
System.out.println("Final Response: " + result3_1.getData("finalUserResponse", String.class).orElse("N/A"));
}
// --- Demo 4: 查询订单状态 (需要订单ID) ---
System.out.println("n--- DEMO 4: Check Order Status (Missing Order ID) ---");
UserIntent intent4 = new UserIntent("CHECK_ORDER_STATUS", new HashMap<>());
ProcessingContext context4 = new ProcessingContext(intent4);
ProcessingContext result4 = executor.executeChain(context4);
System.out.println("DEMO 4 Result: " + result4);
System.out.println("Final Response: " + result4.getData("finalUserResponse", String.class).orElse("N/A"));
if (result4.getState() == ProcessingState.WAITING_FOR_USER_INPUT) {
System.out.println("System is waiting for user to provide " + result4.getData("requiredInput", String.class).orElse("some input") + ".");
// 模拟用户提供订单ID后重新提交
System.out.println("n--- DEMO 4.1: User provides Order ID and resubmits ---");
Map<String, String> newParams = new HashMap<>(intent4.getParameters());
newParams.put("order_id", "ORDER_ABC_123");
context4.userIntent = new UserIntent(intent4.getIntentType(), newParams); // 更新意图,注意这里直接修改了,实际应该通过工厂方法或更安全的机制
context4.setNextStepId("FetchOrderStatus"); // 直接跳回查询订单状态
context4.setState(ProcessingState.INITIAL); // 重置状态
ProcessingContext result4_1 = executor.executeChain(context4);
System.out.println("DEMO 4.1 Result: " + result4_1);
System.out.println("Order Status: " + result4_1.getData("orderStatus", String.class).orElse("N/A"));
System.out.println("Final Response: " + result4_1.getData("finalUserResponse", String.class).orElse("N/A"));
}
// --- Demo 5: 取消订单 (成功) ---
System.out.println("n--- DEMO 5: Cancel Order (Success) ---");
Map<String, String> params5 = new HashMap<>();
params5.put("order_id", "PENDING_ORDER_ID");
UserIntent intent5 = new UserIntent("CANCEL_ORDER", params5);
ProcessingContext context5 = new ProcessingContext(intent5);
ProcessingContext result5 = executor.executeChain(context5);
System.out.println("DEMO 5 Result: " + result5);
System.out.println("Cancellation Status: " + result5.getData("cancellationStatus", String.class).orElse("N/A"));
System.out.println("Final Response: " + result5.getData("finalUserResponse", String.class).orElse("N/A"));
// --- Demo 6: 取消订单 (失败,已发货) ---
System.out.println("n--- DEMO 6: Cancel Order (Failed - Delivered) ---");
Map<String, String> params6 = new HashMap<>();
params6.put("order_id", "DELIVERED_ORDER_ID");
UserIntent intent6 = new UserIntent("CANCEL_ORDER", params6);
ProcessingContext context6 = new ProcessingContext(intent6);
ProcessingContext result6 = executor.executeChain(context6);
System.out.println("DEMO 6 Result: " + result6);
System.out.println("Final Response: " + result6.getData("finalUserResponse", String.class).orElse("N/A"));
// --- Demo 7: 未知意图 ---
System.out.println("n--- DEMO 7: Unknown Intent ---");
UserIntent intent7 = new UserIntent("UNKNOWN_REQUEST", new HashMap<>());
ProcessingContext context7 = new ProcessingContext(intent7);
ProcessingContext result7 = executor.executeChain(context7);
System.out.println("DEMO 7 Result: " + result7);
System.out.println("Final Response: " + result7.getData("finalUserResponse", String.class).orElse("N/A"));
}
}
3.4 演示运行结果示例 (部分)
--- Processing Step: InitialIntentRouter (Intent: FETCH_PRODUCT_DETAILS, State: INITIAL) ---
[InitialIntentRouter] Executing router step, evaluating routes...
[InitialIntentRouter] Route matched! Condition: ...predicate..., Next Step: FetchProductDetails
--- Processing Step: FetchProductDetails (Intent: FETCH_PRODUCT_DETAILS, State: ROUTING) ---
[FetchProductDetails] Executing processor step...
-> Simulating fetching details for product: PROD123
-> Final user response: 'Successfully fetched product details: Product PROD123 - Price: $19.99, Stock: 100'
--- Chain execution finished. Final Context State: COMPLETED ---
DEMO 1 Result: ProcessingContext{sessionId='...', userIntent=FETCH_PRODUCT_DETAILS, state=COMPLETED, currentStepId='ResponseGenerator', nextStepId='null', hasError=false}
Final Response: Successfully fetched product details: Product PROD123 - Price: $19.99, Stock: 100
--- DEMO 2: Create Order (Requires Login) ---
--- Processing Step: InitialIntentRouter (Intent: CREATE_ORDER, State: INITIAL) ---
[InitialIntentRouter] Executing router step, evaluating routes...
[InitialIntentRouter] Route matched! Condition: ...predicate..., Next Step: VerifyUserLogin
--- Processing Step: VerifyUserLogin (Intent: CREATE_ORDER, State: ROUTING) ---
[VerifyUserLogin] Executing processor step...
-> User is already logged in. Proceeding.
-> Final user response: 'User login verified.'
--- Processing Step: CreateOrder (Intent: CREATE_ORDER, State: COMPLETED) ---
[CreateOrder] Executing processor step...
-> Simulating creating order for product PROD456, quantity 2
-> Final user response: 'Order ... created successfully for product PROD456.'
--- Processing Step: ResponseGenerator (Intent: CREATE_ORDER, State: COMPLETED) ---
[ResponseGenerator] Executing processor step...
-> Generating final response for user.
-> Final user response: 'Order ... created successfully for product PROD456.'
--- Chain execution finished. Final Context State: COMPLETED ---
DEMO 2 Result: ProcessingContext{sessionId='...', userIntent=CREATE_ORDER, state=COMPLETED, currentStepId='ResponseGenerator', nextStepId='null', hasError=false}
Order ID: ...
Final Response: Order ... created successfully for product PROD456.
--- DEMO 3: Create Order (Not Logged In) ---
--- Processing Step: InitialIntentRouter (Intent: CREATE_ORDER, State: INITIAL) ---
[InitialIntentRouter] Executing router step, evaluating routes...
[InitialIntentRouter] Route matched! Condition: ...predicate..., Next Step: VerifyUserLogin
--- Processing Step: VerifyUserLogin (Intent: CREATE_ORDER, State: ROUTING) ---
[VerifyUserLogin] Executing processor step...
-> User not logged in. Requesting login.
-> Final user response: 'Please log in to proceed.'
--- Chain execution finished. Final Context State: WAITING_FOR_USER_INPUT ---
DEMO 3 Result: ProcessingContext{sessionId='...', userIntent=CREATE_ORDER, state=WAITING_FOR_USER_INPUT, currentStepId='VerifyUserLogin', nextStepId='null', hasError=false}
Final Response: Action required: Please log in to proceed.
System is waiting for user to login.
--- DEMO 3.1: User logs in and resubmits ---
--- Processing Step: CreateOrder (Intent: CREATE_ORDER, State: INITIAL) ---
[CreateOrder] Executing processor step...
-> Simulating creating order for product PROD789, quantity 1
-> Final user response: 'Order ... created successfully for product PROD789.'
--- Processing Step: ResponseGenerator (Intent: CREATE_ORDER, State: COMPLETED) ---
[ResponseGenerator] Executing processor step...
-> Generating final response for user.
-> Final user response: 'Order ... created successfully for product PROD789.'
--- Chain execution finished. Final Context State: COMPLETED ---
DEMO 3.1 Result: ProcessingContext{sessionId='...', userIntent=CREATE_ORDER, state=COMPLETED, currentStepId='ResponseGenerator', nextStepId='null', hasError=false}
Order ID: ...
Final Response: Order ... created successfully for product PROD789.
四、高级考量与扩展
4.1 异步处理与长流程
在实际系统中,有些步骤可能耗时较长,或者需要等待外部事件(如人工审核、第三方回调)。
- 非阻塞执行:
execute方法可以返回CompletableFuture<ProcessingContext>,执行器需要适配异步结果。 - 状态持久化: 当遇到
WAITING_FOR_USER_INPUT这类状态时,ProcessingContext需要被持久化到数据库或缓存中,以便在收到用户输入或外部回调后恢复执行。每个sessionId对应一个持久化的上下文。 - 消息队列集成: 长流程步骤可以将任务投递到消息队列,并返回一个
PENDING状态,由另一个服务消费消息并继续处理,完成后通过回调更新ProcessingContext并触发链的恢复。
4.2 错误处理与重试机制
- 统一错误管理:
ProcessingContext中的errorMessage和state=FAILED提供了基本的错误传播机制。 - 错误处理步骤: 可以设计专门的
ErrorHandlingRouter或ErrorProcessorStep,当context.hasError()为真时,将流程路由到特定的错误处理分支,例如记录日志、通知管理员、向用户发送错误提示或触发重试。 - 重试策略: 在
AbstractProcessorStep中可以集成重试逻辑,例如使用Guava-Retry或Spring Retry。路由器也可以根据错误类型,将流程导向一个重试步骤,并记录重试次数。
4.3 动态配置与外部化
- 路由规则外部化:
AbstractRouterStep中的routes可以从外部配置文件(如YAML、JSON)或配置中心(如Apollo、Nacos)加载。这允许在不修改代码的情况下调整业务流程。 - 步骤注册:
DynamicRouterChainExecutor可以支持通过反射、Spring IoC容器或服务发现机制,动态注册RoutableStep实例。 - DSL(领域特定语言): 针对复杂的路由规则,可以设计一套DSL,让业务人员或配置人员能够更直观地定义流程。
4.4 可观测性与审计
- 日志: 在每个
RoutableStep的execute方法中,详细记录进入和离开时的ProcessingContext状态、决策结果、执行耗时等。 - 指标: 使用Micrometer或Prometheus收集每个步骤的执行次数、成功率、失败率、平均耗时等指标。
- 链路追踪: 集成OpenTracing或OpenTelemetry,为每个请求生成唯一的Trace ID,并贯穿所有步骤,便于追踪请求的完整路径和性能瓶颈。
- 审计日志: 记录关键业务操作和决策,例如哪个路由器做了什么决策,哪个处理器修改了什么数据。
4.5 并行处理
某些流程分支可以并行执行,例如同时查询多个外部服务。
ParallelRouterStep: 设计一个特殊的路由器,它能够识别并启动多个并行子链,并在所有子链完成后将结果合并。这需要更复杂的ProcessingContext管理和线程池调度。
4.6 版本控制与回滚
- 对于重要的业务流程,路由链的配置和步骤实现可能需要版本控制,以支持灰度发布和快速回滚。
五、优势与挑战
5.1 动态路由链的优势
- 极强的灵活性和适应性: 能够根据运行时上下文和用户意图动态调整流程,适应多变的需求。
- 高可维护性: 业务逻辑被封装在独立的步骤中,路由逻辑集中管理,便于理解和修改。
- 良好的扩展性: 添加新的业务功能或调整流程,只需实现新的
RoutableStep并更新路由配置,符合开闭原则。 - 提高代码复用率: 独立的处理器步骤可以在不同的路由链中被复用。
- 增强用户体验: 能够提供更智能、更个性化的交互流程,例如多轮对话、主动引导用户。
- 便于测试: 每个
RoutableStep都可以独立进行单元测试,整个链条可以通过模拟ProcessingContext进行集成测试。
5.2 面临的挑战
- 初期设计复杂度: 引入了更多的抽象和组件,初期设计和实现比传统
if-else结构更复杂。 - “意大利面条路由器”风险: 如果路由规则定义不当、过于随意,或者路由器之间耦合过紧,可能导致流程难以追踪和理解,形成“意大利面条式”的路由。
- 调试难度: 动态生成的流程路径可能使得调试变得复杂,需要强大的日志和追踪系统支持。
- 性能考量: 每次路由决策和上下文传递可能带来微小的性能开销,但在大多数业务场景中,其带来的灵活性收益远超这点开销。对于极致性能要求的场景,可能需要精细优化。
- 状态管理: 特别是在异步和分布式环境中,
ProcessingContext的持久化、恢复和并发访问需要精心设计。
六、结语
动态路由链模式,特别是围绕RouterRunnable概念构建的体系,为处理复杂、多变且意图驱动的业务流程提供了一个强大而优雅的解决方案。它将流程的控制权从硬编码的逻辑中解放出来,赋予系统在运行时“思考”和“决策”的能力。通过清晰的模块化、可配置的路由规则以及对上下文的智能利用,我们能够构建出更具韧性、更易于演进的软件系统。
在实际应用中,关键在于平衡灵活性与可管理性,确保路由规则的清晰性、可测试性,并投入足够精力在可观测性上。随着AI和自动化程度的不断提高,这种能够根据意图自适应的动态路由模式,将成为构建下一代智能应用不可或缺的基石。