fix:流程执行完成,更新算列状态
This commit is contained in:
@@ -0,0 +1,48 @@
|
||||
package com.sdm.flowable.config;
|
||||
|
||||
import com.sdm.flowable.listener.GlobalStatusEventListener;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.common.engine.api.delegate.event.FlowableEventListener;
|
||||
import org.flowable.spring.SpringProcessEngineConfiguration;
|
||||
import org.flowable.spring.boot.EngineConfigurationConfigurer;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Flowable流程引擎配置类
|
||||
* 用于注册全局事件监听器等配置
|
||||
*
|
||||
* @author SDM
|
||||
* @date 2026-01-23
|
||||
*/
|
||||
@Slf4j
|
||||
@Configuration
|
||||
public class FlowableEngineConfig {
|
||||
|
||||
/**
|
||||
* 注册全局事件监听器
|
||||
* 通过EngineConfigurationConfigurer可以在流程引擎初始化时添加监听器
|
||||
*/
|
||||
@Bean
|
||||
public EngineConfigurationConfigurer<SpringProcessEngineConfiguration> globalEventListenerConfigurer(
|
||||
GlobalStatusEventListener globalStatusEventListener) {
|
||||
return engineConfiguration -> {
|
||||
// 获取现有的事件监听器列表
|
||||
List<FlowableEventListener> eventListeners = engineConfiguration.getEventListeners();
|
||||
if (eventListeners == null) {
|
||||
eventListeners = new ArrayList<>();
|
||||
}
|
||||
|
||||
// 添加全局状态同步监听器
|
||||
eventListeners.add(globalStatusEventListener);
|
||||
engineConfiguration.setEventListeners(eventListeners);
|
||||
|
||||
log.info("✅ 已注册Flowable全局事件监听器: GlobalStatusEventListener");
|
||||
log.info(" 监听事件类型: PROCESS_COMPLETED, PROCESS_CANCELLED, ENTITY_SUSPENDED, ENTITY_ACTIVATED, JOB_MOVED_TO_DEADLETTER");
|
||||
log.info(" 状态映射: COMPLETED, CANCELLED, SUSPENDED, RUNNING, ERROR");
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -7,35 +7,43 @@ public enum ProcessInstanceStateEnum {
|
||||
/**
|
||||
* 运行中
|
||||
*/
|
||||
RUNNING("running"),
|
||||
RUNNING(1,"running"),
|
||||
|
||||
/**
|
||||
* 已完成
|
||||
*/
|
||||
COMPLETED(2,"completed"),
|
||||
|
||||
/**
|
||||
* 错误
|
||||
*/
|
||||
ERROR(3,"error"),
|
||||
|
||||
/**
|
||||
* 挂起
|
||||
*/
|
||||
SUSPENDED("suspended"),
|
||||
|
||||
/**
|
||||
* 错误
|
||||
*/
|
||||
ERROR("error"),
|
||||
|
||||
/**
|
||||
* 已完成
|
||||
*/
|
||||
COMPLETED("completed"),
|
||||
SUSPENDED(4,"suspended"),
|
||||
|
||||
|
||||
/**
|
||||
* 已取消
|
||||
*/
|
||||
CANCELLED("cancelled");
|
||||
CANCELLED(5,"cancelled");
|
||||
|
||||
private final String code;
|
||||
private final Integer code;
|
||||
|
||||
ProcessInstanceStateEnum(String code) {
|
||||
private final String value;
|
||||
|
||||
ProcessInstanceStateEnum(Integer code, String value) {
|
||||
this.code = code;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public String getCode() {
|
||||
public String getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public Integer getCode() {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,187 @@
|
||||
package com.sdm.flowable.listener;
|
||||
|
||||
import com.sdm.common.feign.inter.project.ISimulationRunFeignClient;
|
||||
import com.sdm.flowable.enums.ProcessInstanceStateEnum;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent;
|
||||
import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
|
||||
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
|
||||
import org.flowable.common.engine.api.delegate.event.FlowableEventListener;
|
||||
import org.flowable.engine.impl.persistence.entity.ExecutionEntity;
|
||||
import org.flowable.job.api.Job;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* 全局算例状态同步监听器
|
||||
* 作用:替代定时任务,实时推送状态
|
||||
* <p>
|
||||
* 监听Flowable流程引擎的全生命周期事件,实现算例状态的实时同步:
|
||||
* - PROCESS_COMPLETED: 流程正常完成
|
||||
* - PROCESS_CANCELLED: 流程被取消/终止
|
||||
* - ENTITY_SUSPENDED: 实体挂起(需过滤流程实例)
|
||||
* - ENTITY_ACTIVATED: 实体激活(需过滤流程实例)
|
||||
* - JOB_MOVED_TO_DEADLETTER: 作业移入死信队列(ERROR状态)
|
||||
* </p>
|
||||
*
|
||||
* @author SDM
|
||||
* @date 2026-01-23
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class GlobalStatusEventListener implements FlowableEventListener {
|
||||
|
||||
@Autowired
|
||||
private ISimulationRunFeignClient simulationRunFeignClient;
|
||||
|
||||
@Override
|
||||
public Set<FlowableEngineEventType> getTypes() {
|
||||
return new HashSet<>(Arrays.asList(
|
||||
FlowableEngineEventType.PROCESS_COMPLETED, // 流程完成
|
||||
FlowableEngineEventType.PROCESS_CANCELLED, // 流程取消(精确匹配,不需要判断deleteReason)
|
||||
FlowableEngineEventType.ENTITY_SUSPENDED, // 实体挂起(需过滤流程实例)
|
||||
FlowableEngineEventType.ENTITY_ACTIVATED, // 实体激活(需过滤流程实例)
|
||||
FlowableEngineEventType.JOB_MOVED_TO_DEADLETTER // 作业进入死信(ERROR状态)
|
||||
));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(FlowableEvent event) {
|
||||
try {
|
||||
FlowableEngineEventType eventType = (FlowableEngineEventType) event.getType();
|
||||
|
||||
// 1. 流程正常完成
|
||||
if (eventType == FlowableEngineEventType.PROCESS_COMPLETED) {
|
||||
handleProcessCompleted((FlowableEngineEntityEvent) event);
|
||||
}
|
||||
// 2. 流程被取消(Flowable 7.x有独立的CANCELLED事件)
|
||||
else if (eventType == FlowableEngineEventType.PROCESS_CANCELLED) {
|
||||
handleProcessCancelled((FlowableEngineEntityEvent) event);
|
||||
}
|
||||
// 3. 实体挂起/激活(需要过滤,只处理流程实例级别)
|
||||
else if (eventType == FlowableEngineEventType.ENTITY_SUSPENDED ||
|
||||
eventType == FlowableEngineEventType.ENTITY_ACTIVATED) {
|
||||
handleSuspendOrActivate((FlowableEngineEntityEvent) event, eventType);
|
||||
}
|
||||
// 4. 作业进入死信队列(ERROR状态的金标准)
|
||||
else if (eventType == FlowableEngineEventType.JOB_MOVED_TO_DEADLETTER) {
|
||||
handleDeadLetter((FlowableEngineEntityEvent) event);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("处理Flowable事件异常: eventType={}", event.getType(), e);
|
||||
}
|
||||
}
|
||||
|
||||
// --- 内部逻辑方法 ---
|
||||
|
||||
/**
|
||||
* 处理流程正常完成事件
|
||||
* PROCESS_COMPLETED只表示流程走到EndEvent,不包括取消场景
|
||||
*/
|
||||
private void handleProcessCompleted(FlowableEngineEntityEvent event) {
|
||||
String processInstanceId = event.getProcessInstanceId();
|
||||
log.info("流程正常完成: processInstanceId={}", processInstanceId);
|
||||
doUpdate(processInstanceId, ProcessInstanceStateEnum.COMPLETED.getCode());
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理流程取消事件
|
||||
* Flowable 7.x提供了独立的PROCESS_CANCELLED事件,更精确
|
||||
*/
|
||||
private void handleProcessCancelled(FlowableEngineEntityEvent event) {
|
||||
String processInstanceId = event.getProcessInstanceId();
|
||||
ExecutionEntity execution = (ExecutionEntity) event.getEntity();
|
||||
log.info("流程被取消: processInstanceId={}, deleteReason={}",
|
||||
processInstanceId, execution.getDeleteReason());
|
||||
doUpdate(processInstanceId, ProcessInstanceStateEnum.CANCELLED.getCode());
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理实体挂起/激活事件
|
||||
* 关键:必须过滤,只处理流程实例级别的挂起/激活,忽略子流程或其他实体
|
||||
*/
|
||||
private void handleSuspendOrActivate(FlowableEngineEntityEvent event, FlowableEngineEventType eventType) {
|
||||
Object entity = event.getEntity();
|
||||
|
||||
// 只处理ExecutionEntity(流程执行实体)
|
||||
if (entity instanceof ExecutionEntity) {
|
||||
ExecutionEntity execution = (ExecutionEntity) entity;
|
||||
|
||||
// 关键判断:isProcessInstanceType()确保是流程实例本身,而非子分支
|
||||
if (execution.isProcessInstanceType()) {
|
||||
String processInstanceId = execution.getProcessInstanceId();
|
||||
Integer status = (eventType == FlowableEngineEventType.ENTITY_SUSPENDED) ? ProcessInstanceStateEnum.SUSPENDED.getCode() : ProcessInstanceStateEnum.RUNNING.getCode();
|
||||
|
||||
log.info("流程实例{}状态变更: processInstanceId={}",
|
||||
status.equals("SUSPENDED") ? "挂起" : "激活", processInstanceId);
|
||||
doUpdate(processInstanceId, status);
|
||||
} else {
|
||||
log.debug("忽略非流程实例级别的挂起/激活事件: executionId={}", execution.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理死信事件(ERROR状态的标准方式)
|
||||
* JOB_MOVED_TO_DEADLETTER是最精确的ERROR信号:
|
||||
* - 比JOB_EXECUTION_FAILURE准确(FAILURE只是重试中的失败)
|
||||
* - 表示引擎已放弃重试,必须人工干预
|
||||
* <p>
|
||||
* 前提条件:ServiceTask必须配置async="true"和R0/PT0S重试策略
|
||||
*/
|
||||
private void handleDeadLetter(FlowableEngineEntityEvent event) {
|
||||
Object entity = event.getEntity();
|
||||
|
||||
if (entity instanceof Job) {
|
||||
Job job = (Job) entity;
|
||||
String processInstanceId = job.getProcessInstanceId();
|
||||
String exceptionMessage = job.getExceptionMessage();
|
||||
|
||||
log.error("❌ 作业进入死信队列,流程ERROR: processInstanceId={}, jobId={}, exception={}",
|
||||
processInstanceId, job.getId(), exceptionMessage);
|
||||
|
||||
doUpdate(processInstanceId, ProcessInstanceStateEnum.ERROR.getCode());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 真正的更新数据库逻辑
|
||||
* 通过Feign调用project服务更新算例状态
|
||||
*/
|
||||
private void doUpdate(String processInstanceId, Integer statusCode) {
|
||||
if (processInstanceId == null) {
|
||||
log.warn("流程实例ID为空,跳过状态更新");
|
||||
return;
|
||||
}
|
||||
|
||||
log.info(">>> 更新算例状态 [{}] -> {}", processInstanceId, statusCode);
|
||||
try {
|
||||
simulationRunFeignClient.updateStatusByProcessInstanceId(processInstanceId, statusCode);
|
||||
} catch (Exception e) {
|
||||
log.error("更新算例状态失败: processInstanceId={}, status={}", processInstanceId, statusCode, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFailOnException() {
|
||||
// 返回false:即使监听器抛异常,也不影响流程继续执行
|
||||
// 这是容错设计,保证业务流程的稳定性
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFireOnTransactionLifecycleEvent() {
|
||||
// 返回false:在事务提交前触发,确保状态及时更新
|
||||
// 如果返回true,监听器会在事务提交后触发,可能导致状态更新滞后
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getOnTransaction() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -472,11 +472,11 @@ public class ProcessService implements Iprocess{
|
||||
if (isRunning) {
|
||||
// --- 运行中 ---
|
||||
if (hasError) {
|
||||
status = ProcessInstanceStateEnum.ERROR.getCode(); // 有死信作业,视为异常
|
||||
status = ProcessInstanceStateEnum.ERROR.getValue(); // 有死信作业,视为异常
|
||||
} else if (isSuspended) {
|
||||
status = ProcessInstanceStateEnum.SUSPENDED.getCode(); // 被挂起
|
||||
status = ProcessInstanceStateEnum.SUSPENDED.getValue(); // 被挂起
|
||||
} else {
|
||||
status = ProcessInstanceStateEnum.RUNNING.getCode(); // 正常运行
|
||||
status = ProcessInstanceStateEnum.RUNNING.getValue(); // 正常运行
|
||||
}
|
||||
} else {
|
||||
// --- 已结束 (运行时查不到,历史表里有) ---
|
||||
@@ -484,11 +484,11 @@ public class ProcessService implements Iprocess{
|
||||
|
||||
if (deleteReason == null) {
|
||||
// 1. 正常走完结束节点,deleteReason 为空
|
||||
status = ProcessInstanceStateEnum.COMPLETED.getCode();
|
||||
status = ProcessInstanceStateEnum.COMPLETED.getValue();
|
||||
} else {
|
||||
// 2. 有删除原因,说明是被取消或强制终止的
|
||||
// 你可以根据 reason 的内容做更细的区分,或者统称为 cancelled
|
||||
status = ProcessInstanceStateEnum.CANCELLED.getCode();
|
||||
status = ProcessInstanceStateEnum.CANCELLED.getValue();
|
||||
}
|
||||
}
|
||||
info.setStatus(status);
|
||||
|
||||
Reference in New Issue
Block a user