| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537 |
- package com.mes.core;
- import com.mes.step.IWorkflowStep;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.util.*;
- import java.util.concurrent.*;
- /**
- * 流程引擎 - 管理工作流步骤的执行
- * 支持配置化流程组装、异步步骤处理、超时控制
- */
- public class WorkflowEngine {
- private static final Logger log = LoggerFactory.getLogger(WorkflowEngine.class);
- // 工位上下文
- private final StationContext context;
- // 步骤列表(按执行顺序)
- private final List<IWorkflowStep> steps;
- // 步骤快速查找表 (stepId -> step)
- private final Map<String, IWorkflowStep> stepMap;
- // 消息类型到步骤的映射 (messageType -> step)
- private final Map<String, IWorkflowStep> messageStepMap;
- // 流程状态
- private boolean running;
- private boolean paused;
- // 超时控制
- private ScheduledExecutorService timeoutExecutor;
- private ScheduledFuture<?> currentTimeoutTask;
- // 回调接口
- private WorkflowListener listener;
- /**
- * 流程事件监听器
- */
- public interface WorkflowListener {
- void onStepStarted(WorkflowEngine engine, IWorkflowStep step);
- void onStepCompleted(WorkflowEngine engine, IWorkflowStep step, boolean success);
- void onWorkflowCompleted(WorkflowEngine engine);
- void onWorkflowError(WorkflowEngine engine, String error);
- }
- public WorkflowEngine(StationContext context) {
- this.context = context;
- this.steps = new ArrayList<>();
- this.stepMap = new LinkedHashMap<>();
- this.messageStepMap = new HashMap<>();
- this.running = false;
- this.paused = false;
- this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
- Thread t = new Thread(r, "WorkflowTimeout-" + context.getStationCode());
- t.setDaemon(true);
- return t;
- });
- }
- /**
- * 从配置构建流程
- */
- public void buildFromConfig(List<StationConfig.StepConfig> stepConfigs, StepFactory stepFactory) {
- steps.clear();
- stepMap.clear();
- messageStepMap.clear();
- for (StationConfig.StepConfig config : stepConfigs) {
- IWorkflowStep step = stepFactory.createStep(config.getId());
- if (step != null) {
- // 配置步骤
- step.configure(config.getConfig());
- if (config.getCraft() != null) {
- step.setCraft(config.getCraft());
- }
- // 添加到列表
- steps.add(step);
- stepMap.put(step.getStepId(), step);
- // 建立消息类型映射
- String msgType = step.getMessageType();
- if (msgType != null && !msgType.isEmpty()) {
- messageStepMap.put(msgType, step);
- }
- log.debug("[{}] 加载步骤: {} ({})", context.getStationCode(), step.getStepName(), step.getStepId());
- } else {
- log.warn("[{}] 未知步骤类型: {}", context.getStationCode(), config.getId());
- }
- }
- log.info("[{}] 流程构建完成, 共 {} 个步骤", context.getStationCode(), steps.size());
- }
- /**
- * 添加步骤
- */
- public void addStep(IWorkflowStep step) {
- steps.add(step);
- stepMap.put(step.getStepId(), step);
- String msgType = step.getMessageType();
- if (msgType != null && !msgType.isEmpty()) {
- messageStepMap.put(msgType, step);
- }
- }
- /**
- * 启动流程
- */
- public void start() {
- if (running) {
- log.warn("[{}] 流程已在运行中", context.getStationCode());
- return;
- }
- log.info("[{}] 启动流程", context.getStationCode());
- running = true;
- paused = false;
- context.setCurrentStepIndex(0);
- context.setState(StationContext.WorkflowState.IDLE);
- // 执行第一步(如果不需要用户交互)
- executeCurrentStepIfReady();
- }
- /**
- * 停止流程
- */
- public void stop() {
- log.info("[{}] 停止流程", context.getStationCode());
- running = false;
- paused = false;
- cancelTimeout();
- }
- /**
- * 暂停流程
- */
- public void pause() {
- log.info("[{}] 暂停流程", context.getStationCode());
- paused = true;
- }
- /**
- * 恢复流程
- */
- public void resume() {
- log.info("[{}] 恢复流程", context.getStationCode());
- paused = false;
- executeCurrentStepIfReady();
- }
- /**
- * 重置流程
- */
- public void reset() {
- log.info("[{}] 重置流程", context.getStationCode());
- cancelTimeout();
- context.reset();
- context.setCurrentStepIndex(0); // 重置步骤索引到第一步
- running = true;
- paused = false;
- // 重置所有步骤状态并重建消息映射(防止多阶段步骤的messageType残留)
- messageStepMap.clear();
- for (IWorkflowStep step : steps) {
- step.reset();
- String msgType = step.getMessageType();
- if (msgType != null && !msgType.isEmpty()) {
- messageStepMap.put(msgType, step);
- }
- }
- }
- /**
- * 获取当前步骤
- */
- public IWorkflowStep getCurrentStep() {
- int index = context.getCurrentStepIndex();
- if (index >= 0 && index < steps.size()) {
- return steps.get(index);
- }
- return null;
- }
- /**
- * 根据ID获取步骤
- */
- public IWorkflowStep getStep(String stepId) {
- return stepMap.get(stepId);
- }
- /**
- * 触发当前步骤执行(用户交互触发)
- * 例如:用户扫码后调用此方法
- */
- public void triggerCurrentStep() {
- if (!running || paused) {
- log.warn("[{}] 流程未运行或已暂停", context.getStationCode());
- return;
- }
- IWorkflowStep currentStep = getCurrentStep();
- if (currentStep == null) {
- log.warn("[{}] 当前没有待执行的步骤", context.getStationCode());
- return;
- }
- executeStep(currentStep);
- }
- /**
- * 执行指定步骤
- */
- private void executeStep(IWorkflowStep step) {
- log.info("[{}] 执行步骤: {} ({})", context.getStationCode(), step.getStepName(), step.getStepId());
- // 检查前置条件
- if (!step.canExecute(context)) {
- log.warn("[{}] 步骤 {} 前置条件不满足", context.getStationCode(), step.getStepId());
- // 如果已经有错误消息,不要用"前置条件不满足"覆盖它
- if (context.getStatusLevel() != -1) {
- step.onFailure(context, "前置条件不满足");
- } else {
- // 已有错误消息,只记录日志不覆盖状态消息
- log.error("[{}] 步骤 {} 执行失败: 前置条件不满足 (保留原错误消息: {})",
- context.getStationCode(), step.getStepId(), context.getStatusMessage());
- }
- return;
- }
- // 通知监听器
- if (listener != null) {
- listener.onStepStarted(this, step);
- }
- try {
- // 记录执行前的messageType
- String msgTypeBefore = step.getMessageType();
- // 执行步骤
- boolean result = step.execute(context);
- if (result) {
- // 异步步骤:设置超时,等待服务端响应或轮询完成
- if (step.isAsync()) {
- // 检查execute中是否切换了messageType(如MQDW->MBDW)
- String msgTypeAfter = step.getMessageType();
- if (msgTypeAfter != null && !msgTypeAfter.equals(msgTypeBefore)) {
- log.info("[{}] 步骤 {} execute后切换消息类型: {} -> {}",
- context.getStationCode(), step.getStepId(), msgTypeBefore, msgTypeAfter);
- if (msgTypeBefore != null) {
- messageStepMap.remove(msgTypeBefore);
- }
- messageStepMap.put(msgTypeAfter, step);
- }
- // 注册异步完成回调(用于轮询类步骤主动通知完成)
- step.setAsyncCompleteCallback(success -> {
- cancelTimeout();
- onStepComplete(step, success);
- });
- startTimeout(step);
- } else {
- // 同步步骤:直接完成,进入下一步
- onStepComplete(step, true);
- }
- } else {
- // 执行失败
- onStepComplete(step, false);
- }
- } catch (Exception e) {
- log.error("[{}] 步骤 {} 执行异常: {}", context.getStationCode(), step.getStepId(), e.getMessage(), e);
- step.onFailure(context, "执行异常: " + e.getMessage());
- onStepComplete(step, false);
- }
- }
- /**
- * 处理服务端消息
- * @param messageType 消息类型(AQDW, MKSW等)
- * @param result 结果码
- * @param message 完整消息
- */
- public void onServerMessage(String messageType, String result, String message) {
- if (!running) {
- log.warn("[{}] 流程未运行,忽略消息: {}", context.getStationCode(), messageType);
- return;
- }
- // 取消超时
- cancelTimeout();
- // 查找对应的步骤
- IWorkflowStep step = messageStepMap.get(messageType);
- if (step == null) {
- log.warn("[{}] 未找到消息类型 {} 对应的步骤", context.getStationCode(), messageType);
- return;
- }
- // 检查是否是当前步骤
- IWorkflowStep currentStep = getCurrentStep();
- if (currentStep == null || !currentStep.getStepId().equals(step.getStepId())) {
- log.warn("[{}] 收到非当前步骤的响应: 期望={}, 实际={}",
- context.getStationCode(),
- currentStep != null ? currentStep.getStepId() : "null",
- step.getStepId());
- // 仍然处理响应,但不自动进入下一步
- step.onServerResponse(context, result, message);
- return;
- }
- // 记录响应前的messageType
- String oldMsgType = step.getMessageType();
- // 处理响应
- step.onServerResponse(context, result, message);
- // 检查步骤是否切换了messageType(多阶段步骤,如先MBDW再MQDW)
- String newMsgType = step.getMessageType();
- if (newMsgType != null && !newMsgType.equals(oldMsgType)) {
- log.info("[{}] 步骤 {} 切换消息类型: {} -> {}", context.getStationCode(), step.getStepId(), oldMsgType, newMsgType);
- // 更新messageStepMap映射
- messageStepMap.remove(oldMsgType);
- messageStepMap.put(newMsgType, step);
- // 重新启动超时,继续等待下一阶段的响应
- startTimeout(step);
- return;
- }
- // 判断是否成功
- boolean success = "OK".equalsIgnoreCase(result) || "UD".equalsIgnoreCase(result);
- onStepComplete(step, success);
- }
- /**
- * 步骤完成处理
- */
- private void onStepComplete(IWorkflowStep step, boolean success) {
- log.info("[{}] 步骤 {} 完成, 成功={}", context.getStationCode(), step.getStepId(), success);
- // 通知监听器
- if (listener != null) {
- listener.onStepCompleted(this, step, success);
- }
- if (success) {
- // 进入下一步
- moveToNextStep();
- } else {
- // 失败处理:根据步骤类型决定是否停止流程
- // 默认行为:停留在当前步骤,等待重试
- context.setState(StationContext.WorkflowState.ERROR);
- }
- }
- /**
- * 进入下一步
- */
- private void moveToNextStep() {
- context.nextStep();
- int nextIndex = context.getCurrentStepIndex();
- if (nextIndex >= steps.size()) {
- // 流程完成,重置到初始状态准备下一轮
- onWorkflowComplete();
- // 重置步骤索引到0,等待用户扫码开始新的流程
- context.setCurrentStepIndex(0);
- log.info("[{}] 流程重置,等待下一件工件", context.getStationCode());
- } else {
- // 执行下一步(如果不需要用户交互)
- executeCurrentStepIfReady();
- }
- }
- /**
- * 如果当前步骤不需要用户交互,则自动执行
- */
- private void executeCurrentStepIfReady() {
- IWorkflowStep currentStep = getCurrentStep();
- if (currentStep == null) {
- return;
- }
- // 检查是否可以跳过
- if (currentStep.canSkip(context)) {
- log.info("[{}] 跳过步骤: {}", context.getStationCode(), currentStep.getStepId());
- currentStep.onSkip(context);
- moveToNextStep();
- return;
- }
- // 如果不需要用户交互,自动执行
- if (!currentStep.requiresUserInteraction()) {
- executeStep(currentStep);
- } else {
- // 需要用户交互的步骤:检查是否已经满足条件可以直接执行
- // 例如:scan_product 步骤需要工件码,但用户可能已经扫过码了
- if (currentStep.canExecute(context)) {
- log.debug("[{}] 步骤 {} 需要用户交互,但条件已满足,尝试执行",
- context.getStationCode(), currentStep.getStepName());
- executeStep(currentStep);
- } else {
- // 等待用户操作
- log.debug("[{}] 等待用户操作: {}", context.getStationCode(), currentStep.getStepName());
- }
- }
- }
- /**
- * 流程完成
- */
- private void onWorkflowComplete() {
- log.info("[{}] 流程完成", context.getStationCode());
- context.setState(StationContext.WorkflowState.COMPLETED);
- if (listener != null) {
- listener.onWorkflowCompleted(this);
- }
- }
- /**
- * 启动超时计时器
- */
- private void startTimeout(IWorkflowStep step) {
- int timeout = step.getTimeoutMs();
- if (timeout <= 0) {
- return;
- }
- cancelTimeout();
- currentTimeoutTask = timeoutExecutor.schedule(() -> {
- log.warn("[{}] 步骤 {} 超时", context.getStationCode(), step.getStepId());
- step.onTimeout(context);
- onStepComplete(step, false);
- }, timeout, TimeUnit.MILLISECONDS);
- }
- /**
- * 取消超时计时器
- */
- private void cancelTimeout() {
- if (currentTimeoutTask != null && !currentTimeoutTask.isDone()) {
- currentTimeoutTask.cancel(false);
- currentTimeoutTask = null;
- }
- }
- /**
- * 跳过当前步骤(通过模拟执行成功)
- * 用于流程监控面板的"跳过"功能
- * @return true=跳过成功,false=无法跳过
- */
- public boolean skipCurrentStep() {
- if (!running) {
- log.warn("[{}] 流程未运行,无法跳过步骤", context.getStationCode());
- return false;
- }
- IWorkflowStep currentStep = getCurrentStep();
- if (currentStep == null) {
- log.warn("[{}] 当前没有待执行的步骤", context.getStationCode());
- return false;
- }
- log.info("[{}] 跳过步骤: {} ({})", context.getStationCode(), currentStep.getStepName(), currentStep.getStepId());
- // 取消超时
- cancelTimeout();
- // 调用 simulateSuccess 模拟执行成功
- currentStep.simulateSuccess(context);
- // 通知监听器
- if (listener != null) {
- listener.onStepCompleted(this, currentStep, true);
- }
- // 进入下一步
- moveToNextStep();
- return true;
- }
- /**
- * 获取当前步骤索引
- */
- public int getCurrentStepIndex() {
- return context.getCurrentStepIndex();
- }
- /**
- * 销毁引擎
- */
- public void destroy() {
- stop();
- if (timeoutExecutor != null && !timeoutExecutor.isShutdown()) {
- timeoutExecutor.shutdownNow();
- }
- }
- // ========== Getters & Setters ==========
- public StationContext getContext() {
- return context;
- }
- public List<IWorkflowStep> getSteps() {
- return Collections.unmodifiableList(steps);
- }
- public boolean isRunning() {
- return running;
- }
- public boolean isPaused() {
- return paused;
- }
- public void setListener(WorkflowListener listener) {
- this.listener = listener;
- }
- /**
- * 步骤工厂接口
- */
- public interface StepFactory {
- IWorkflowStep createStep(String stepId);
- }
- }
|