package com.mes.core; import com.mes.step.IWorkflowStep; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; import java.util.concurrent.*; /** * 流程引擎 - 管理工作流步骤的执行 * 支持配置化流程组装、异步步骤处理、超时控制 */ public class WorkflowEngine { private static final Logger log = LoggerFactory.getLogger(WorkflowEngine.class); // 工位上下文 private final StationContext context; // 步骤列表(按执行顺序) private final List steps; // 步骤快速查找表 (stepId -> step) private final Map stepMap; // 消息类型到步骤的映射 (messageType -> step) private final Map messageStepMap; // 流程状态 private boolean running; private boolean paused; // 超时控制 private ScheduledExecutorService timeoutExecutor; private ScheduledFuture currentTimeoutTask; // 回调接口 private WorkflowListener listener; /** * 流程事件监听器 */ public interface WorkflowListener { void onStepStarted(WorkflowEngine engine, IWorkflowStep step); void onStepCompleted(WorkflowEngine engine, IWorkflowStep step, boolean success); void onWorkflowCompleted(WorkflowEngine engine); void onWorkflowError(WorkflowEngine engine, String error); } public WorkflowEngine(StationContext context) { this.context = context; this.steps = new ArrayList<>(); this.stepMap = new LinkedHashMap<>(); this.messageStepMap = new HashMap<>(); this.running = false; this.paused = false; this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "WorkflowTimeout-" + context.getStationCode()); t.setDaemon(true); return t; }); } /** * 从配置构建流程 */ public void buildFromConfig(List stepConfigs, StepFactory stepFactory) { steps.clear(); stepMap.clear(); messageStepMap.clear(); for (StationConfig.StepConfig config : stepConfigs) { IWorkflowStep step = stepFactory.createStep(config.getId()); if (step != null) { // 配置步骤 step.configure(config.getConfig()); if (config.getCraft() != null) { step.setCraft(config.getCraft()); } // 添加到列表 steps.add(step); stepMap.put(step.getStepId(), step); // 建立消息类型映射 String msgType = step.getMessageType(); if (msgType != null && !msgType.isEmpty()) { messageStepMap.put(msgType, step); } log.debug("[{}] 加载步骤: {} ({})", context.getStationCode(), step.getStepName(), step.getStepId()); } else { log.warn("[{}] 未知步骤类型: {}", context.getStationCode(), config.getId()); } } log.info("[{}] 流程构建完成, 共 {} 个步骤", context.getStationCode(), steps.size()); } /** * 添加步骤 */ public void addStep(IWorkflowStep step) { steps.add(step); stepMap.put(step.getStepId(), step); String msgType = step.getMessageType(); if (msgType != null && !msgType.isEmpty()) { messageStepMap.put(msgType, step); } } /** * 启动流程 */ public void start() { if (running) { log.warn("[{}] 流程已在运行中", context.getStationCode()); return; } log.info("[{}] 启动流程", context.getStationCode()); running = true; paused = false; context.setCurrentStepIndex(0); context.setState(StationContext.WorkflowState.IDLE); // 执行第一步(如果不需要用户交互) executeCurrentStepIfReady(); } /** * 停止流程 */ public void stop() { log.info("[{}] 停止流程", context.getStationCode()); running = false; paused = false; cancelTimeout(); } /** * 暂停流程 */ public void pause() { log.info("[{}] 暂停流程", context.getStationCode()); paused = true; } /** * 恢复流程 */ public void resume() { log.info("[{}] 恢复流程", context.getStationCode()); paused = false; executeCurrentStepIfReady(); } /** * 重置流程 */ public void reset() { log.info("[{}] 重置流程", context.getStationCode()); cancelTimeout(); context.reset(); context.setCurrentStepIndex(0); // 重置步骤索引到第一步 running = true; paused = false; // 重置所有步骤状态并重建消息映射(防止多阶段步骤的messageType残留) messageStepMap.clear(); for (IWorkflowStep step : steps) { step.reset(); String msgType = step.getMessageType(); if (msgType != null && !msgType.isEmpty()) { messageStepMap.put(msgType, step); } } } /** * 获取当前步骤 */ public IWorkflowStep getCurrentStep() { int index = context.getCurrentStepIndex(); if (index >= 0 && index < steps.size()) { return steps.get(index); } return null; } /** * 根据ID获取步骤 */ public IWorkflowStep getStep(String stepId) { return stepMap.get(stepId); } /** * 触发当前步骤执行(用户交互触发) * 例如:用户扫码后调用此方法 */ public void triggerCurrentStep() { if (!running || paused) { log.warn("[{}] 流程未运行或已暂停", context.getStationCode()); return; } IWorkflowStep currentStep = getCurrentStep(); if (currentStep == null) { log.warn("[{}] 当前没有待执行的步骤", context.getStationCode()); return; } executeStep(currentStep); } /** * 执行指定步骤 */ private void executeStep(IWorkflowStep step) { log.info("[{}] 执行步骤: {} ({})", context.getStationCode(), step.getStepName(), step.getStepId()); // 检查前置条件 if (!step.canExecute(context)) { log.warn("[{}] 步骤 {} 前置条件不满足", context.getStationCode(), step.getStepId()); // 如果已经有错误消息,不要用"前置条件不满足"覆盖它 if (context.getStatusLevel() != -1) { step.onFailure(context, "前置条件不满足"); } else { // 已有错误消息,只记录日志不覆盖状态消息 log.error("[{}] 步骤 {} 执行失败: 前置条件不满足 (保留原错误消息: {})", context.getStationCode(), step.getStepId(), context.getStatusMessage()); } return; } // 通知监听器 if (listener != null) { listener.onStepStarted(this, step); } try { // 记录执行前的messageType String msgTypeBefore = step.getMessageType(); // 执行步骤 boolean result = step.execute(context); if (result) { // 异步步骤:设置超时,等待服务端响应或轮询完成 if (step.isAsync()) { // 检查execute中是否切换了messageType(如MQDW->MBDW) String msgTypeAfter = step.getMessageType(); if (msgTypeAfter != null && !msgTypeAfter.equals(msgTypeBefore)) { log.info("[{}] 步骤 {} execute后切换消息类型: {} -> {}", context.getStationCode(), step.getStepId(), msgTypeBefore, msgTypeAfter); if (msgTypeBefore != null) { messageStepMap.remove(msgTypeBefore); } messageStepMap.put(msgTypeAfter, step); } // 注册异步完成回调(用于轮询类步骤主动通知完成) step.setAsyncCompleteCallback(success -> { cancelTimeout(); onStepComplete(step, success); }); startTimeout(step); } else { // 同步步骤:直接完成,进入下一步 onStepComplete(step, true); } } else { // 执行失败 onStepComplete(step, false); } } catch (Exception e) { log.error("[{}] 步骤 {} 执行异常: {}", context.getStationCode(), step.getStepId(), e.getMessage(), e); step.onFailure(context, "执行异常: " + e.getMessage()); onStepComplete(step, false); } } /** * 处理服务端消息 * @param messageType 消息类型(AQDW, MKSW等) * @param result 结果码 * @param message 完整消息 */ public void onServerMessage(String messageType, String result, String message) { if (!running) { log.warn("[{}] 流程未运行,忽略消息: {}", context.getStationCode(), messageType); return; } // 取消超时 cancelTimeout(); // 查找对应的步骤 IWorkflowStep step = messageStepMap.get(messageType); if (step == null) { log.warn("[{}] 未找到消息类型 {} 对应的步骤", context.getStationCode(), messageType); return; } // 检查是否是当前步骤 IWorkflowStep currentStep = getCurrentStep(); if (currentStep == null || !currentStep.getStepId().equals(step.getStepId())) { log.warn("[{}] 收到非当前步骤的响应: 期望={}, 实际={}", context.getStationCode(), currentStep != null ? currentStep.getStepId() : "null", step.getStepId()); // 仍然处理响应,但不自动进入下一步 step.onServerResponse(context, result, message); return; } // 记录响应前的messageType String oldMsgType = step.getMessageType(); // 处理响应 step.onServerResponse(context, result, message); // 检查步骤是否切换了messageType(多阶段步骤,如先MBDW再MQDW) String newMsgType = step.getMessageType(); if (newMsgType != null && !newMsgType.equals(oldMsgType)) { log.info("[{}] 步骤 {} 切换消息类型: {} -> {}", context.getStationCode(), step.getStepId(), oldMsgType, newMsgType); // 更新messageStepMap映射 messageStepMap.remove(oldMsgType); messageStepMap.put(newMsgType, step); // 重新启动超时,继续等待下一阶段的响应 startTimeout(step); return; } // 判断是否成功 boolean success = "OK".equalsIgnoreCase(result) || "UD".equalsIgnoreCase(result); onStepComplete(step, success); } /** * 步骤完成处理 */ private void onStepComplete(IWorkflowStep step, boolean success) { log.info("[{}] 步骤 {} 完成, 成功={}", context.getStationCode(), step.getStepId(), success); // 通知监听器 if (listener != null) { listener.onStepCompleted(this, step, success); } if (success) { // 进入下一步 moveToNextStep(); } else { // 失败处理:根据步骤类型决定是否停止流程 // 默认行为:停留在当前步骤,等待重试 context.setState(StationContext.WorkflowState.ERROR); } } /** * 进入下一步 */ private void moveToNextStep() { context.nextStep(); int nextIndex = context.getCurrentStepIndex(); if (nextIndex >= steps.size()) { // 流程完成,重置到初始状态准备下一轮 onWorkflowComplete(); // 重置步骤索引到0,等待用户扫码开始新的流程 context.setCurrentStepIndex(0); log.info("[{}] 流程重置,等待下一件工件", context.getStationCode()); } else { // 执行下一步(如果不需要用户交互) executeCurrentStepIfReady(); } } /** * 如果当前步骤不需要用户交互,则自动执行 */ private void executeCurrentStepIfReady() { IWorkflowStep currentStep = getCurrentStep(); if (currentStep == null) { return; } // 检查是否可以跳过 if (currentStep.canSkip(context)) { log.info("[{}] 跳过步骤: {}", context.getStationCode(), currentStep.getStepId()); currentStep.onSkip(context); moveToNextStep(); return; } // 如果不需要用户交互,自动执行 if (!currentStep.requiresUserInteraction()) { executeStep(currentStep); } else { // 需要用户交互的步骤:检查是否已经满足条件可以直接执行 // 例如:scan_product 步骤需要工件码,但用户可能已经扫过码了 if (currentStep.canExecute(context)) { log.debug("[{}] 步骤 {} 需要用户交互,但条件已满足,尝试执行", context.getStationCode(), currentStep.getStepName()); executeStep(currentStep); } else { // 等待用户操作 log.debug("[{}] 等待用户操作: {}", context.getStationCode(), currentStep.getStepName()); } } } /** * 流程完成 */ private void onWorkflowComplete() { log.info("[{}] 流程完成", context.getStationCode()); context.setState(StationContext.WorkflowState.COMPLETED); if (listener != null) { listener.onWorkflowCompleted(this); } } /** * 启动超时计时器 */ private void startTimeout(IWorkflowStep step) { int timeout = step.getTimeoutMs(); if (timeout <= 0) { return; } cancelTimeout(); currentTimeoutTask = timeoutExecutor.schedule(() -> { log.warn("[{}] 步骤 {} 超时", context.getStationCode(), step.getStepId()); step.onTimeout(context); onStepComplete(step, false); }, timeout, TimeUnit.MILLISECONDS); } /** * 取消超时计时器 */ private void cancelTimeout() { if (currentTimeoutTask != null && !currentTimeoutTask.isDone()) { currentTimeoutTask.cancel(false); currentTimeoutTask = null; } } /** * 跳过当前步骤(通过模拟执行成功) * 用于流程监控面板的"跳过"功能 * @return true=跳过成功,false=无法跳过 */ public boolean skipCurrentStep() { if (!running) { log.warn("[{}] 流程未运行,无法跳过步骤", context.getStationCode()); return false; } IWorkflowStep currentStep = getCurrentStep(); if (currentStep == null) { log.warn("[{}] 当前没有待执行的步骤", context.getStationCode()); return false; } log.info("[{}] 跳过步骤: {} ({})", context.getStationCode(), currentStep.getStepName(), currentStep.getStepId()); // 取消超时 cancelTimeout(); // 调用 simulateSuccess 模拟执行成功 currentStep.simulateSuccess(context); // 通知监听器 if (listener != null) { listener.onStepCompleted(this, currentStep, true); } // 进入下一步 moveToNextStep(); return true; } /** * 获取当前步骤索引 */ public int getCurrentStepIndex() { return context.getCurrentStepIndex(); } /** * 销毁引擎 */ public void destroy() { stop(); if (timeoutExecutor != null && !timeoutExecutor.isShutdown()) { timeoutExecutor.shutdownNow(); } } // ========== Getters & Setters ========== public StationContext getContext() { return context; } public List getSteps() { return Collections.unmodifiableList(steps); } public boolean isRunning() { return running; } public boolean isPaused() { return paused; } public void setListener(WorkflowListener listener) { this.listener = listener; } /** * 步骤工厂接口 */ public interface StepFactory { IWorkflowStep createStep(String stepId); } }