WorkflowEngine.java 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
  1. package com.mes.core;
  2. import com.mes.step.IWorkflowStep;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import java.util.*;
  6. import java.util.concurrent.*;
  7. /**
  8. * 流程引擎 - 管理工作流步骤的执行
  9. * 支持配置化流程组装、异步步骤处理、超时控制
  10. */
  11. public class WorkflowEngine {
  12. private static final Logger log = LoggerFactory.getLogger(WorkflowEngine.class);
  13. // 工位上下文
  14. private final StationContext context;
  15. // 步骤列表(按执行顺序)
  16. private final List<IWorkflowStep> steps;
  17. // 步骤快速查找表 (stepId -> step)
  18. private final Map<String, IWorkflowStep> stepMap;
  19. // 消息类型到步骤的映射 (messageType -> step)
  20. private final Map<String, IWorkflowStep> messageStepMap;
  21. // 流程状态
  22. private boolean running;
  23. private boolean paused;
  24. // 超时控制
  25. private ScheduledExecutorService timeoutExecutor;
  26. private ScheduledFuture<?> currentTimeoutTask;
  27. // 回调接口
  28. private WorkflowListener listener;
  29. /**
  30. * 流程事件监听器
  31. */
  32. public interface WorkflowListener {
  33. void onStepStarted(WorkflowEngine engine, IWorkflowStep step);
  34. void onStepCompleted(WorkflowEngine engine, IWorkflowStep step, boolean success);
  35. void onWorkflowCompleted(WorkflowEngine engine);
  36. void onWorkflowError(WorkflowEngine engine, String error);
  37. }
  38. public WorkflowEngine(StationContext context) {
  39. this.context = context;
  40. this.steps = new ArrayList<>();
  41. this.stepMap = new LinkedHashMap<>();
  42. this.messageStepMap = new HashMap<>();
  43. this.running = false;
  44. this.paused = false;
  45. this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
  46. Thread t = new Thread(r, "WorkflowTimeout-" + context.getStationCode());
  47. t.setDaemon(true);
  48. return t;
  49. });
  50. }
  51. /**
  52. * 从配置构建流程
  53. */
  54. public void buildFromConfig(List<StationConfig.StepConfig> stepConfigs, StepFactory stepFactory) {
  55. steps.clear();
  56. stepMap.clear();
  57. messageStepMap.clear();
  58. for (StationConfig.StepConfig config : stepConfigs) {
  59. IWorkflowStep step = stepFactory.createStep(config.getId());
  60. if (step != null) {
  61. // 配置步骤
  62. step.configure(config.getConfig());
  63. if (config.getCraft() != null) {
  64. step.setCraft(config.getCraft());
  65. }
  66. // 添加到列表
  67. steps.add(step);
  68. stepMap.put(step.getStepId(), step);
  69. // 建立消息类型映射
  70. String msgType = step.getMessageType();
  71. if (msgType != null && !msgType.isEmpty()) {
  72. messageStepMap.put(msgType, step);
  73. }
  74. log.debug("[{}] 加载步骤: {} ({})", context.getStationCode(), step.getStepName(), step.getStepId());
  75. } else {
  76. log.warn("[{}] 未知步骤类型: {}", context.getStationCode(), config.getId());
  77. }
  78. }
  79. log.info("[{}] 流程构建完成, 共 {} 个步骤", context.getStationCode(), steps.size());
  80. }
  81. /**
  82. * 添加步骤
  83. */
  84. public void addStep(IWorkflowStep step) {
  85. steps.add(step);
  86. stepMap.put(step.getStepId(), step);
  87. String msgType = step.getMessageType();
  88. if (msgType != null && !msgType.isEmpty()) {
  89. messageStepMap.put(msgType, step);
  90. }
  91. }
  92. /**
  93. * 启动流程
  94. */
  95. public void start() {
  96. if (running) {
  97. log.warn("[{}] 流程已在运行中", context.getStationCode());
  98. return;
  99. }
  100. log.info("[{}] 启动流程", context.getStationCode());
  101. running = true;
  102. paused = false;
  103. context.setCurrentStepIndex(0);
  104. context.setState(StationContext.WorkflowState.IDLE);
  105. // 执行第一步(如果不需要用户交互)
  106. executeCurrentStepIfReady();
  107. }
  108. /**
  109. * 停止流程
  110. */
  111. public void stop() {
  112. log.info("[{}] 停止流程", context.getStationCode());
  113. running = false;
  114. paused = false;
  115. cancelTimeout();
  116. }
  117. /**
  118. * 暂停流程
  119. */
  120. public void pause() {
  121. log.info("[{}] 暂停流程", context.getStationCode());
  122. paused = true;
  123. }
  124. /**
  125. * 恢复流程
  126. */
  127. public void resume() {
  128. log.info("[{}] 恢复流程", context.getStationCode());
  129. paused = false;
  130. executeCurrentStepIfReady();
  131. }
  132. /**
  133. * 重置流程
  134. */
  135. public void reset() {
  136. log.info("[{}] 重置流程", context.getStationCode());
  137. cancelTimeout();
  138. context.reset();
  139. context.setCurrentStepIndex(0); // 重置步骤索引到第一步
  140. running = true;
  141. paused = false;
  142. // 重置所有步骤状态并重建消息映射(防止多阶段步骤的messageType残留)
  143. messageStepMap.clear();
  144. for (IWorkflowStep step : steps) {
  145. step.reset();
  146. String msgType = step.getMessageType();
  147. if (msgType != null && !msgType.isEmpty()) {
  148. messageStepMap.put(msgType, step);
  149. }
  150. }
  151. }
  152. /**
  153. * 获取当前步骤
  154. */
  155. public IWorkflowStep getCurrentStep() {
  156. int index = context.getCurrentStepIndex();
  157. if (index >= 0 && index < steps.size()) {
  158. return steps.get(index);
  159. }
  160. return null;
  161. }
  162. /**
  163. * 根据ID获取步骤
  164. */
  165. public IWorkflowStep getStep(String stepId) {
  166. return stepMap.get(stepId);
  167. }
  168. /**
  169. * 触发当前步骤执行(用户交互触发)
  170. * 例如:用户扫码后调用此方法
  171. */
  172. public void triggerCurrentStep() {
  173. if (!running || paused) {
  174. log.warn("[{}] 流程未运行或已暂停", context.getStationCode());
  175. return;
  176. }
  177. IWorkflowStep currentStep = getCurrentStep();
  178. if (currentStep == null) {
  179. log.warn("[{}] 当前没有待执行的步骤", context.getStationCode());
  180. return;
  181. }
  182. executeStep(currentStep);
  183. }
  184. /**
  185. * 执行指定步骤
  186. */
  187. private void executeStep(IWorkflowStep step) {
  188. log.info("[{}] 执行步骤: {} ({})", context.getStationCode(), step.getStepName(), step.getStepId());
  189. // 检查前置条件
  190. if (!step.canExecute(context)) {
  191. log.warn("[{}] 步骤 {} 前置条件不满足", context.getStationCode(), step.getStepId());
  192. // 如果已经有错误消息,不要用"前置条件不满足"覆盖它
  193. if (context.getStatusLevel() != -1) {
  194. step.onFailure(context, "前置条件不满足");
  195. } else {
  196. // 已有错误消息,只记录日志不覆盖状态消息
  197. log.error("[{}] 步骤 {} 执行失败: 前置条件不满足 (保留原错误消息: {})",
  198. context.getStationCode(), step.getStepId(), context.getStatusMessage());
  199. }
  200. return;
  201. }
  202. // 通知监听器
  203. if (listener != null) {
  204. listener.onStepStarted(this, step);
  205. }
  206. try {
  207. // 记录执行前的messageType
  208. String msgTypeBefore = step.getMessageType();
  209. // 执行步骤
  210. boolean result = step.execute(context);
  211. if (result) {
  212. // 异步步骤:设置超时,等待服务端响应或轮询完成
  213. if (step.isAsync()) {
  214. // 检查execute中是否切换了messageType(如MQDW->MBDW)
  215. String msgTypeAfter = step.getMessageType();
  216. if (msgTypeAfter != null && !msgTypeAfter.equals(msgTypeBefore)) {
  217. log.info("[{}] 步骤 {} execute后切换消息类型: {} -> {}",
  218. context.getStationCode(), step.getStepId(), msgTypeBefore, msgTypeAfter);
  219. if (msgTypeBefore != null) {
  220. messageStepMap.remove(msgTypeBefore);
  221. }
  222. messageStepMap.put(msgTypeAfter, step);
  223. }
  224. // 注册异步完成回调(用于轮询类步骤主动通知完成)
  225. step.setAsyncCompleteCallback(success -> {
  226. cancelTimeout();
  227. onStepComplete(step, success);
  228. });
  229. startTimeout(step);
  230. } else {
  231. // 同步步骤:直接完成,进入下一步
  232. onStepComplete(step, true);
  233. }
  234. } else {
  235. // 执行失败
  236. onStepComplete(step, false);
  237. }
  238. } catch (Exception e) {
  239. log.error("[{}] 步骤 {} 执行异常: {}", context.getStationCode(), step.getStepId(), e.getMessage(), e);
  240. step.onFailure(context, "执行异常: " + e.getMessage());
  241. onStepComplete(step, false);
  242. }
  243. }
  244. /**
  245. * 处理服务端消息
  246. * @param messageType 消息类型(AQDW, MKSW等)
  247. * @param result 结果码
  248. * @param message 完整消息
  249. */
  250. public void onServerMessage(String messageType, String result, String message) {
  251. if (!running) {
  252. log.warn("[{}] 流程未运行,忽略消息: {}", context.getStationCode(), messageType);
  253. return;
  254. }
  255. // 取消超时
  256. cancelTimeout();
  257. // 查找对应的步骤
  258. IWorkflowStep step = messageStepMap.get(messageType);
  259. if (step == null) {
  260. log.warn("[{}] 未找到消息类型 {} 对应的步骤", context.getStationCode(), messageType);
  261. return;
  262. }
  263. // 检查是否是当前步骤
  264. IWorkflowStep currentStep = getCurrentStep();
  265. if (currentStep == null || !currentStep.getStepId().equals(step.getStepId())) {
  266. log.warn("[{}] 收到非当前步骤的响应: 期望={}, 实际={}",
  267. context.getStationCode(),
  268. currentStep != null ? currentStep.getStepId() : "null",
  269. step.getStepId());
  270. // 仍然处理响应,但不自动进入下一步
  271. step.onServerResponse(context, result, message);
  272. return;
  273. }
  274. // 记录响应前的messageType
  275. String oldMsgType = step.getMessageType();
  276. // 处理响应
  277. step.onServerResponse(context, result, message);
  278. // 检查步骤是否切换了messageType(多阶段步骤,如先MBDW再MQDW)
  279. String newMsgType = step.getMessageType();
  280. if (newMsgType != null && !newMsgType.equals(oldMsgType)) {
  281. log.info("[{}] 步骤 {} 切换消息类型: {} -> {}", context.getStationCode(), step.getStepId(), oldMsgType, newMsgType);
  282. // 更新messageStepMap映射
  283. messageStepMap.remove(oldMsgType);
  284. messageStepMap.put(newMsgType, step);
  285. // 重新启动超时,继续等待下一阶段的响应
  286. startTimeout(step);
  287. return;
  288. }
  289. // 判断是否成功
  290. boolean success = "OK".equalsIgnoreCase(result) || "UD".equalsIgnoreCase(result);
  291. onStepComplete(step, success);
  292. }
  293. /**
  294. * 步骤完成处理
  295. */
  296. private void onStepComplete(IWorkflowStep step, boolean success) {
  297. log.info("[{}] 步骤 {} 完成, 成功={}", context.getStationCode(), step.getStepId(), success);
  298. // 通知监听器
  299. if (listener != null) {
  300. listener.onStepCompleted(this, step, success);
  301. }
  302. if (success) {
  303. // 进入下一步
  304. moveToNextStep();
  305. } else {
  306. // 失败处理:根据步骤类型决定是否停止流程
  307. // 默认行为:停留在当前步骤,等待重试
  308. context.setState(StationContext.WorkflowState.ERROR);
  309. }
  310. }
  311. /**
  312. * 进入下一步
  313. */
  314. private void moveToNextStep() {
  315. context.nextStep();
  316. int nextIndex = context.getCurrentStepIndex();
  317. if (nextIndex >= steps.size()) {
  318. // 流程完成,重置到初始状态准备下一轮
  319. onWorkflowComplete();
  320. // 重置步骤索引到0,等待用户扫码开始新的流程
  321. context.setCurrentStepIndex(0);
  322. log.info("[{}] 流程重置,等待下一件工件", context.getStationCode());
  323. } else {
  324. // 执行下一步(如果不需要用户交互)
  325. executeCurrentStepIfReady();
  326. }
  327. }
  328. /**
  329. * 如果当前步骤不需要用户交互,则自动执行
  330. */
  331. private void executeCurrentStepIfReady() {
  332. IWorkflowStep currentStep = getCurrentStep();
  333. if (currentStep == null) {
  334. return;
  335. }
  336. // 检查是否可以跳过
  337. if (currentStep.canSkip(context)) {
  338. log.info("[{}] 跳过步骤: {}", context.getStationCode(), currentStep.getStepId());
  339. currentStep.onSkip(context);
  340. moveToNextStep();
  341. return;
  342. }
  343. // 如果不需要用户交互,自动执行
  344. if (!currentStep.requiresUserInteraction()) {
  345. executeStep(currentStep);
  346. } else {
  347. // 需要用户交互的步骤:检查是否已经满足条件可以直接执行
  348. // 例如:scan_product 步骤需要工件码,但用户可能已经扫过码了
  349. if (currentStep.canExecute(context)) {
  350. log.debug("[{}] 步骤 {} 需要用户交互,但条件已满足,尝试执行",
  351. context.getStationCode(), currentStep.getStepName());
  352. executeStep(currentStep);
  353. } else {
  354. // 等待用户操作
  355. log.debug("[{}] 等待用户操作: {}", context.getStationCode(), currentStep.getStepName());
  356. }
  357. }
  358. }
  359. /**
  360. * 流程完成
  361. */
  362. private void onWorkflowComplete() {
  363. log.info("[{}] 流程完成", context.getStationCode());
  364. context.setState(StationContext.WorkflowState.COMPLETED);
  365. if (listener != null) {
  366. listener.onWorkflowCompleted(this);
  367. }
  368. }
  369. /**
  370. * 启动超时计时器
  371. */
  372. private void startTimeout(IWorkflowStep step) {
  373. int timeout = step.getTimeoutMs();
  374. if (timeout <= 0) {
  375. return;
  376. }
  377. cancelTimeout();
  378. currentTimeoutTask = timeoutExecutor.schedule(() -> {
  379. log.warn("[{}] 步骤 {} 超时", context.getStationCode(), step.getStepId());
  380. step.onTimeout(context);
  381. onStepComplete(step, false);
  382. }, timeout, TimeUnit.MILLISECONDS);
  383. }
  384. /**
  385. * 取消超时计时器
  386. */
  387. private void cancelTimeout() {
  388. if (currentTimeoutTask != null && !currentTimeoutTask.isDone()) {
  389. currentTimeoutTask.cancel(false);
  390. currentTimeoutTask = null;
  391. }
  392. }
  393. /**
  394. * 跳过当前步骤(通过模拟执行成功)
  395. * 用于流程监控面板的"跳过"功能
  396. * @return true=跳过成功,false=无法跳过
  397. */
  398. public boolean skipCurrentStep() {
  399. if (!running) {
  400. log.warn("[{}] 流程未运行,无法跳过步骤", context.getStationCode());
  401. return false;
  402. }
  403. IWorkflowStep currentStep = getCurrentStep();
  404. if (currentStep == null) {
  405. log.warn("[{}] 当前没有待执行的步骤", context.getStationCode());
  406. return false;
  407. }
  408. log.info("[{}] 跳过步骤: {} ({})", context.getStationCode(), currentStep.getStepName(), currentStep.getStepId());
  409. // 取消超时
  410. cancelTimeout();
  411. // 调用 simulateSuccess 模拟执行成功
  412. currentStep.simulateSuccess(context);
  413. // 通知监听器
  414. if (listener != null) {
  415. listener.onStepCompleted(this, currentStep, true);
  416. }
  417. // 进入下一步
  418. moveToNextStep();
  419. return true;
  420. }
  421. /**
  422. * 获取当前步骤索引
  423. */
  424. public int getCurrentStepIndex() {
  425. return context.getCurrentStepIndex();
  426. }
  427. /**
  428. * 销毁引擎
  429. */
  430. public void destroy() {
  431. stop();
  432. if (timeoutExecutor != null && !timeoutExecutor.isShutdown()) {
  433. timeoutExecutor.shutdownNow();
  434. }
  435. }
  436. // ========== Getters & Setters ==========
  437. public StationContext getContext() {
  438. return context;
  439. }
  440. public List<IWorkflowStep> getSteps() {
  441. return Collections.unmodifiableList(steps);
  442. }
  443. public boolean isRunning() {
  444. return running;
  445. }
  446. public boolean isPaused() {
  447. return paused;
  448. }
  449. public void setListener(WorkflowListener listener) {
  450. this.listener = listener;
  451. }
  452. /**
  453. * 步骤工厂接口
  454. */
  455. public interface StepFactory {
  456. IWorkflowStep createStep(String stepId);
  457. }
  458. }