扩展自动化执行的节点为既可以自动,也可以受用户输入控制

This commit is contained in:
2025-11-28 11:46:18 +08:00
parent bf5625e8be
commit b84eaeb583
8 changed files with 196 additions and 41 deletions

View File

@@ -33,4 +33,9 @@ public abstract class BaseExecuteConfig {
private boolean asyncCallback = false;
// 用于标记回调节点ID当asyncCallback为true时表示当前执行节点是异步执行执行完成后需要回调的节点ID,一般就是receiveTaskId
private String callbackNodeId;
// 是否需要等待用户手动提交执行,
// 默认是false,表示不需要等待用户手动提交执行
// true: 流程到此停止,创建人工输入任务,等待用户点击执行
private boolean waitUser = false;
}

View File

@@ -2,7 +2,6 @@ package com.sdm.flowable.controller;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sdm.common.common.SdmResponse;
import com.sdm.flowable.delegate.UniversalDelegate;
import com.sdm.flowable.delegate.handler.HpcHandler;
import com.sdm.flowable.dto.ProcessDefinitionDTO;
import com.sdm.flowable.dto.req.AsyncCallbackRequest;
@@ -30,9 +29,6 @@ public class ProcessController {
@Autowired
private IProcessNodeParamService processNodeParamService;
@Autowired
private UniversalDelegate universalDelegate;
@Autowired
private HpcHandler hpcHandler;
@@ -137,8 +133,8 @@ public class ProcessController {
* 根据流程 key 和指定版本获取版本流程定义的节点信息
*/
@GetMapping("/listNodesByProcessDefinitionKey")
public List<Map<String, Object>> listNodesByProcessDefinitionKey(@RequestParam String processDefinitionKey,@RequestParam(required = false)Integer processDefinitionVersion) {
return processService.listNodesByProcessDefinitionKey(processDefinitionKey,processDefinitionVersion);
public List<Map<String, Object>> listNodesByProcessDefinitionKey(@RequestParam String processDefinitionKey, @RequestParam(required = false) Integer processDefinitionVersion) {
return processService.listNodesByProcessDefinitionKey(processDefinitionKey, processDefinitionVersion);
}
/**
@@ -152,8 +148,6 @@ public class ProcessController {
}
/**
* 删除所有流程部署
*/
@@ -239,14 +233,14 @@ public class ProcessController {
}
/**
* 完成人工节点任务
* 流程节点继续执行(完成人工节点/或者等待用户输入后继续手动执行的节点)
*
* @param req
* @return
*/
@PostMapping("/completeManualTasks")
public void completeManualTasks(@RequestBody CompleteTaskReq req) {
processService.completeManualTasks(req);
@PostMapping("/continueServiceTask")
public void continueServiceTask(@RequestBody CompleteTaskReq req) {
processService.continueServiceTask(req);
}
/**
@@ -257,6 +251,6 @@ public class ProcessController {
@PostMapping("/asyncCallback")
public void asyncCallback(@RequestBody AsyncCallbackRequest request) {
// 发送信号唤醒流程实例中等待的节点
universalDelegate.signalByTaskId(request);
processService.asyncCallback(request);
}
}

View File

@@ -11,7 +11,7 @@ import com.sdm.flowable.service.IProcessNodeParamService;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.runtime.Execution;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -27,7 +27,7 @@ public class UniversalDelegate implements JavaDelegate {
private ObjectMapper objectMapper;
@Autowired
private IProcessNodeParamService paramService;
private IProcessNodeParamService processNodeParamService;
@Autowired
private IAsyncTaskRecordService asyncTaskRecordService;
@@ -44,7 +44,7 @@ public class UniversalDelegate implements JavaDelegate {
String nodeName = execution.getCurrentFlowElement().getName();
// 2. 读取输入参数
Map<String, Object> params = paramService.getParam(procInstId, nodeId);
Map<String, Object> params = processNodeParamService.getParam(procInstId, nodeId);
log.info("==== 节点执行日志 ====\n流程实例ID{}\n节点ID{}\n节点名称{}\n输入参数{}\n====================",
procInstId, nodeId, nodeName, params);

View File

@@ -10,4 +10,12 @@ public class CompleteTaskReq {
private String processInstanceId;
private String taskDefinitionKey;
private Map<String, Object> variables = new HashMap<>();
/**
* 任务类型FlowElementTypeEnums
* userTask - 普通用户任务
* serviceTask - ServiceTask前置隐藏等待任务
*/
private String flowelementType;
}

View File

@@ -1,10 +1,15 @@
package com.sdm.flowable.process;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.sdm.flowable.delegate.UniversalDelegate;
import com.sdm.flowable.dto.ProcessDefinitionDTO;
import com.sdm.flowable.dto.req.AsyncCallbackRequest;
import com.sdm.flowable.enums.FlowElementTypeEnums;
import com.sdm.flowable.util.Dto2BpmnConverter;
import com.sdm.flowable.constants.FlowableConfig;
import com.sdm.flowable.dto.req.CompleteTaskReq;
import com.sdm.flowable.util.FlowNodeIdUtils;
import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.model.*;
import org.flowable.bpmn.model.Process;
import org.flowable.engine.HistoryService;
@@ -24,12 +29,12 @@ import org.flowable.validation.ValidationError;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@Service
public class ProcessService {
@Autowired
@@ -47,6 +52,9 @@ public class ProcessService {
@Autowired
private Dto2BpmnConverter dto2BpmnConverter;
@Autowired
private UniversalDelegate universalDelegate;
// 部署流程前端传入Flowable标准JSON
public Deployment deploy(ProcessDefinitionDTO processDTO) throws Exception {
BpmnModel bpmnModel = dto2BpmnConverter.convert(processDTO);
@@ -493,17 +501,36 @@ public class ProcessService {
return result;
}
public void completeManualTasks(@RequestBody CompleteTaskReq req) {
Task task = taskService.createTaskQuery()
.processInstanceId(req.getProcessInstanceId())
.taskDefinitionKey(req.getTaskDefinitionKey())
.singleResult();
public void continueServiceTask(@RequestBody CompleteTaskReq req) {
String taskDefKey;
if (task != null) {
taskService.complete(task.getId(), req.getVariables());
// 根据类型确定真正的 taskDefinitionKey
if (FlowElementTypeEnums.fromString(req.getFlowelementType()).equals(FlowElementTypeEnums.SERVICETASK)) {
// 如果是 ServiceTask 前置等待节点
taskDefKey = FlowNodeIdUtils.generateWaitUserTaskId(req.getTaskDefinitionKey());
} else {
throw new RuntimeException("找不到任务!");
// 普通 UserTask
taskDefKey = req.getTaskDefinitionKey();
}
Task task = taskService.createTaskQuery()
.processInstanceId(req.getProcessInstanceId())
.taskDefinitionKey(taskDefKey)
.singleResult();
if (task == null) {
throw new RuntimeException("找不到任务! taskDefinitionKey=" + taskDefKey);
}
// 完成任务
if (req.getVariables() != null) {
taskService.complete(task.getId(), req.getVariables());
} else {
taskService.complete(task.getId());
}
}
public void asyncCallback(AsyncCallbackRequest request) {
// 发送信号唤醒流程实例中等待的节点
universalDelegate.signalByTaskId(request);
}
}

View File

@@ -46,9 +46,12 @@ public class Dto2BpmnConverter {
.filter(e -> FlowElementTypeEnums.SEQUENCEFLOW.getType().equals(e.getType()))
.collect(Collectors.toList());
// 3. 存储异步任务映射关系原节点ID → wait节点ID
Map<String, String> asyncTaskMap = new HashMap<>(); // 异步任务映射(原节点→wait节点)
// 3. 存储异步任务ReceiveTask 映射关系原节点ID → ReceiveTask节点ID
Map<String, String> asyncTaskMap = new HashMap<>(); // 异步任务映射(原节点→ReceiveTask节点)
// 3.1、存储等待用户输入任务映射关系原节点ID → waitUserTask节点ID
Map<String, String> waitUserTaskMap = new HashMap<>(); // 原节点ID → waitUserTask节点ID
// 4. 存储并行网关映射关系原节点ID → 网关ID
Map<String, String> splitGatewayMap = new HashMap<>(); // 拆分网关(原拆分节点→拆分网关)
Map<String, String> joinGatewayMap = new HashMap<>(); // 汇总网关(原汇总节点→汇总网关)
@@ -57,14 +60,16 @@ public class Dto2BpmnConverter {
for (FlowElementDTO nodeDto : nodeDtos) {
// 处理异步任务,创建等待节点,放在穿件实际节点之前是为了构造asyncTaskMap后面createActualNode的时候才能设置回调等待节点
handleAsyncTasks(process, nodeDto, asyncTaskMap);
// 处理等待用户提交任务
handleWaitUserTask(process, nodeDto, waitUserTaskMap);
// 创建实际节点
createActualNode(process, nodeDto,asyncTaskMap);
createActualNode(process, nodeDto, asyncTaskMap);
// 处理并行网关,创建拆分和汇聚节点
addRequiredGateways(process, nodeDto, flowDtos, joinGatewayMap, splitGatewayMap);
}
// 6. 创建连线
createConnections(process, flowDtos, asyncTaskMap, joinGatewayMap, splitGatewayMap);
createConnections(process, flowDtos, asyncTaskMap,waitUserTaskMap, joinGatewayMap, splitGatewayMap);
return bpmnModel;
}
@@ -75,24 +80,44 @@ public class Dto2BpmnConverter {
private void handleAsyncTasks(Process process, FlowElementDTO nodeDto, Map<String, String> asyncTaskMap) {
// 检查节点是否为服务任务或用户任务且标记为异步回调
if ((FlowElementTypeEnums.SERVICETASK.getType().equals(nodeDto.getType()) ||
FlowElementTypeEnums.USERTASK.getType().equals(nodeDto.getType())) &&
nodeDto.getExtensionElements() != null &&
nodeDto.getExtensionElements().getExecuteConfig() != null &&
nodeDto.getExtensionElements().getExecuteConfig().isAsyncCallback()) {
FlowElementTypeEnums.USERTASK.getType().equals(nodeDto.getType())) &&
nodeDto.getExtensionElements() != null &&
nodeDto.getExtensionElements().getExecuteConfig() != null &&
nodeDto.getExtensionElements().getExecuteConfig().isAsyncCallback()) {
// 创建接收任务节点
String originalNodeId = nodeDto.getId();
String waitNodeId = originalNodeId + "_wait";
String waitNodeId = FlowNodeIdUtils.generateAsyncTaskId(originalNodeId);
ReceiveTask receiveTask = new ReceiveTask();
receiveTask.setId(waitNodeId);
receiveTask.setName(nodeDto.getName() + "等待结果");
process.addFlowElement(receiveTask);
// 记录映射关系
asyncTaskMap.put(originalNodeId, waitNodeId);
}
}
private void handleWaitUserTask(Process process, FlowElementDTO nodeDto, Map<String, String> waitUserTaskMap) {
// 只有当前节点是ServiceTask才需要判断是否等待用户输入需要才创建前置UserTask
if (FlowElementTypeEnums.SERVICETASK.getType().equals(nodeDto.getType()) &&
nodeDto.getExtensionElements() != null &&
nodeDto.getExtensionElements().getExecuteConfig() != null &&
nodeDto.getExtensionElements().getExecuteConfig().isWaitUser()) {
String originalNodeId = nodeDto.getId();
String waitUserId = FlowNodeIdUtils.generateWaitUserTaskId(originalNodeId);
UserTask waitUserTask = new UserTask();
waitUserTask.setId(waitUserId);
waitUserTask.setName(nodeDto.getName() + "等待用户提交");
// 不设置assignee让任何人可以处理
process.addFlowElement(waitUserTask);
// 记录映射
waitUserTaskMap.put(originalNodeId, waitUserId);
}
}
/**
* 添加必要的网关(并行拆分网关和并行汇聚网关)
*/
@@ -107,7 +132,7 @@ public class Dto2BpmnConverter {
// 检查是否需要添加汇聚网关(入度>1
if (incomingCount > 1) {
// 如果入度>1则在节点前插入汇聚网关
String joinGatewayId = "join_gw_" + nodeId;
String joinGatewayId = FlowNodeIdUtils.generateJoinGatewayId(nodeId);
ParallelGateway joinGateway = new ParallelGateway();
joinGateway.setId(joinGatewayId);
joinGateway.setName("并行汇聚-" + nodeDto.getName());
@@ -118,7 +143,7 @@ public class Dto2BpmnConverter {
// 检查是否需要添加拆分网关(出度>1
if (outgoingCount > 1) {
// 如果出度>1则在节点后插入拆分网关
String splitGatewayId = "split_gw_" + nodeId;
String splitGatewayId = FlowNodeIdUtils.generateSplitGatewayId(nodeId);
ParallelGateway splitGateway = new ParallelGateway();
splitGateway.setId(splitGatewayId);
splitGateway.setName("并行拆分-" + nodeDto.getName());
@@ -133,6 +158,7 @@ public class Dto2BpmnConverter {
private void createConnections(Process process,
List<FlowElementDTO> flowDtos,
Map<String, String> asyncTaskMap,
Map<String, String> waitUserTaskMap,
Map<String, String> joinGatewayMap,
Map<String, String> splitGatewayMap) {
@@ -148,9 +174,15 @@ public class Dto2BpmnConverter {
// ====================================================================================
// ③ 第三阶段:处理异步任务(等待节点)
// 原逻辑:原节点 → wait → 原本下游
// 原节点 → wait → 原本下游
// ====================================================================================
handleAsyncTaskConnections(process, asyncTaskMap);
// ====================================================================================
// ④ 第三阶段:处理等待用户提交任务
// 原节点 → waitUserTask → 原节点
// ====================================================================================
handleWaitUserTaskConnections(process, waitUserTaskMap);
}
/**
@@ -273,6 +305,35 @@ public class Dto2BpmnConverter {
}
}
private void handleWaitUserTaskConnections(Process process, Map<String, String> waitUserTaskMap) {
for (String originalNodeId : waitUserTaskMap.keySet()) {
String waitUserId = waitUserTaskMap.get(originalNodeId);
// Step 1: 找出原节点的所有入线,改为指向 waitUserTask
List<SequenceFlow> removeLines = new ArrayList<>();
List<String> originalSources = new ArrayList<>();
for (FlowElement ele : process.getFlowElements()) {
if (ele instanceof SequenceFlow sf) {
if (sf.getTargetRef().equals(originalNodeId)) {
originalSources.add(sf.getSourceRef());
removeLines.add(sf);
}
}
}
removeLines.forEach(f -> process.removeFlowElement(f.getId()));
// Step 2: 添加原来的入线 → waitUserTask
for (String src : originalSources) {
process.addFlowElement(createSequenceFlow(src, waitUserId, null));
}
// Step 3: waitUserTask → 原节点
process.addFlowElement(createSequenceFlow(waitUserId, originalNodeId, null));
}
}
/**
* 创建实际的流程节点
*/

View File

@@ -0,0 +1,60 @@
package com.sdm.flowable.util;
public class FlowNodeIdUtils {
private static final String JOIN_GATEWAY_PREFIX = "join_gw_";
private static final String SPLIT_GATEWAY_PREFIX = "split_gw_";
private static final String ASYNC_TASK_SUFFIX = "_wait";
private static final String WAIT_USER_SUFFIX = "_waitUser";
// ==================== 网关 ====================
public static String generateJoinGatewayId(String nodeId) {
return JOIN_GATEWAY_PREFIX + nodeId;
}
public static String generateSplitGatewayId(String nodeId) {
return SPLIT_GATEWAY_PREFIX + nodeId;
}
public static boolean isJoinGateway(String id) {
return id != null && id.startsWith(JOIN_GATEWAY_PREFIX);
}
public static boolean isSplitGateway(String id) {
return id != null && id.startsWith(SPLIT_GATEWAY_PREFIX);
}
// ==================== 异步接收任务 ====================
public static String generateAsyncTaskId(String nodeId) {
return nodeId + ASYNC_TASK_SUFFIX;
}
public static boolean isAsyncTask(String id) {
return id != null && id.endsWith(ASYNC_TASK_SUFFIX);
}
public static String getOriginalNodeIdFromAsyncTask(String asyncTaskId) {
if (!isAsyncTask(asyncTaskId)) {
throw new IllegalArgumentException("不是异步等待节点: " + asyncTaskId);
}
return asyncTaskId.substring(0, asyncTaskId.length() - ASYNC_TASK_SUFFIX.length());
}
// ==================== 用户等待任务 ====================
public static String generateWaitUserTaskId(String nodeId) {
return nodeId + WAIT_USER_SUFFIX;
}
public static boolean isWaitUserTask(String id) {
return id != null && id.endsWith(WAIT_USER_SUFFIX);
}
public static String getOriginalNodeIdFromWaitUserTask(String waitUserTaskId) {
if (!isWaitUserTask(waitUserTaskId)) {
throw new IllegalArgumentException("不是隐藏等待节点: " + waitUserTaskId);
}
return waitUserTaskId.substring(0, waitUserTaskId.length() - WAIT_USER_SUFFIX.length());
}
}