新增:pbs任务提交接入工作流引擎实现

This commit is contained in:
yangyang01000846
2025-11-27 15:36:42 +08:00
parent 9b51f1c79f
commit b75cfaa338
13 changed files with 153 additions and 24 deletions

View File

@@ -0,0 +1,15 @@
package com.sdm.common.config;
import feign.Request;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class LongTimeRespFeignConfig {
@Bean
public Request.Options pbsFeignOptions() {
// 根据你的HPC任务实际需要调整超时时间
// 5秒连接5分钟读取
return new Request.Options(5000, 300000);
}
}

View File

@@ -1,4 +1,4 @@
package com.sdm.pbs.model.req;
package com.sdm.common.entity.req.pbs;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

View File

@@ -0,0 +1,31 @@
package com.sdm.common.feign.impl.pbs;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskReq;
import com.sdm.common.feign.inter.pbs.ITaskFeignClient;
import com.sdm.common.log.CoreLogger;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class TaskClientFeignClientImpl implements ITaskFeignClient {
@Autowired
private ITaskFeignClient taskFeignClient;
@Override
public SdmResponse<String> submitHpcJob(SubmitHpcTaskReq req) {
SdmResponse<String> response;
try {
response = taskFeignClient.submitHpcJob(req);
return response;
} catch (Exception e) {
CoreLogger.error("SubmitHpcJob Exception:{}", e.getMessage());
return SdmResponse.failed("Hpc任务提交失败");
}
}
}

View File

@@ -0,0 +1,21 @@
package com.sdm.common.feign.inter.pbs;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.config.LongTimeRespFeignConfig;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskReq;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@FeignClient(
name = "pbs",
configuration = LongTimeRespFeignConfig.class
)
public interface ITaskFeignClient {
// "作业提交"
@PostMapping("/pbs/submitHpcJob")
SdmResponse<String> submitHpcJob(@RequestBody SubmitHpcTaskReq req);
}

View File

@@ -1,15 +1,15 @@
package com.sdm.flowable.controller;
import com.alibaba.fastjson2.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sdm.common.common.SdmResponse;
import com.sdm.flowable.delegate.UniversalDelegate;
import com.sdm.flowable.delegate.handler.HpcHandler;
import com.sdm.flowable.dto.ProcessDefinitionDTO;
import com.sdm.flowable.dto.req.AsyncCallbackRequest;
import com.sdm.flowable.dto.req.CompleteTaskReq;
import com.sdm.flowable.dto.resp.ProcessInstanceResp;
import com.sdm.flowable.process.ProcessService;
import com.sdm.flowable.service.IProcessNodeParamService;
import com.sdm.flowable.dto.req.CompleteTaskReq;
import com.sdm.flowable.dto.req.AsyncCallbackRequest;
import com.sdm.flowable.delegate.UniversalDelegate;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.repository.Deployment;
import org.flowable.engine.runtime.ProcessInstance;
@@ -33,8 +33,18 @@ public class ProcessController {
@Autowired
private UniversalDelegate universalDelegate;
@Autowired
private HpcHandler hpcHandler;
private final ObjectMapper objectMapper = new ObjectMapper();
// 验证流程模型
@GetMapping("/testHpc")
public String testHpc() {
hpcHandler.mockinit();
return "ok" ;
}
// 验证流程模型
@PostMapping("/validate")
public Map<String, Object> validate(@RequestBody ProcessDefinitionDTO processDTO) {

View File

@@ -1,12 +1,19 @@
package com.sdm.flowable.delegate.handler;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskReq;
import com.sdm.common.feign.inter.pbs.ITaskFeignClient;
import com.sdm.flowable.config.executeConfig.BaseExecuteConfig;
import com.sdm.flowable.service.IAsyncTaskRecordService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.flowable.engine.delegate.DelegateExecution;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -14,23 +21,70 @@ import java.util.Map;
@Slf4j
@Component("HPC")
public class HpcHandler implements ExecutionHandler {
@Autowired
private IAsyncTaskRecordService asyncTaskRecordService;
@Autowired
private ITaskFeignClient taskFeignClient;
@Override
public void execute(DelegateExecution execution, Map<String, Object> params, BaseExecuteConfig config) {
// 实现HPC处理逻辑...
// INIT(初始化)/RUNNING(执行中)/SUCCESS(执行成功)/FAIL(执行失败)
String status = "INIT";
// 1. 调用 HPC 平台提交任务
// String hpcTaskId = submitHpcTask(params);
String hpcTaskId = "";
SubmitHpcTaskReq mockReq = mockSubmitHpcTaskReq();
SdmResponse<String> submitResp = taskFeignClient.submitHpcJob(mockReq);
if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){
log.error("HpcHandler submit failed,jobName:{}",mockReq.getJobName());
status = "FAIL";
}
String hpcTaskId = submitResp.getData();
// 2. 存数据库(提交状态 + 外部任务ID
asyncTaskRecordService.registerAsyncTask(
execution,
config.getCallbackNodeId(), // ReceiveTask ID
"HPC", // handlerType
new HashMap<>()
new HashMap<>(),
status,
hpcTaskId
);
log.info("HPC 任务 {} 已提交", hpcTaskId);
}
public void mockinit(){
SubmitHpcTaskReq mockReq = mockSubmitHpcTaskReq();
SdmResponse<String> submitResp = taskFeignClient.submitHpcJob(mockReq);
if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){
log.error("HpcHandler submit failed,jobName:{}",mockReq.getJobName());
System.out.println("失败");
}
String hpcTaskId = submitResp.getData();
System.out.println("成功:"+hpcTaskId);
}
private SubmitHpcTaskReq mockSubmitHpcTaskReq() {
SubmitHpcTaskReq req = new SubmitHpcTaskReq();
// 生成任务名称:年月日-时分秒,如 20251127-145120
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd-HHmmss");
String timestamp = sdf.format(new Date());
req.jobName = "电池_"+timestamp;
req.coreNum = 8; // 默认8核
req.software = "reta.exe";
req.jobType = "仿真计算";
req.independence = 1; // 独立任务
req.inputFiles = Arrays.asList("input1.dat", "input2.dat", "input3.dat");
req.masterFile = "master.dat";
req.taskId = "TASKID_" + timestamp;
req.taskName = "测试任务_" + timestamp;
req.runId = "RUNID_" + timestamp;
req.runName = "测试算力_" + timestamp;
req.command = "\\\\CARSAFE\\share\\solver\\RLithium\\reta.exe -i \\\\CARSAFE\\share\\testproject\\testjob\\testtask\\model\\aa.xml";
req.projectname = "新能源汽车锂电池安全性能优化";
return req;
}
}

View File

@@ -1,8 +1,8 @@
package com.sdm.flowable.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.sdm.flowable.dto.req.AsyncCallbackRequest;
import com.sdm.flowable.entity.AsyncTaskRecord;
import com.baomidou.mybatisplus.extension.service.IService;
import org.flowable.engine.delegate.DelegateExecution;
import java.util.Map;
@@ -19,7 +19,7 @@ public interface IAsyncTaskRecordService extends IService<AsyncTaskRecord> {
/**
* 注册异步任务
*/
String registerAsyncTask(DelegateExecution execution, String receiveTaskId, String handlerType, Map<String, Object> bizParams);
String registerAsyncTask(DelegateExecution execution, String receiveTaskId, String handlerType, Map<String, Object> bizParams,String status,String asyncTaskId);
/**
* 异步回调恢复流程

View File

@@ -1,11 +1,11 @@
package com.sdm.flowable.service.impl;
import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.sdm.flowable.dao.AsyncTaskRecordMapper;
import com.sdm.flowable.dto.req.AsyncCallbackRequest;
import com.sdm.flowable.entity.AsyncTaskRecord;
import com.sdm.flowable.dao.AsyncTaskRecordMapper;
import com.sdm.flowable.service.IAsyncTaskRecordService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.flowable.engine.RuntimeService;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.runtime.Execution;
@@ -13,7 +13,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.UUID;
/**
* <p>
@@ -31,9 +30,7 @@ public class AsyncTaskRecordServiceImpl extends ServiceImpl<AsyncTaskRecordMappe
/**
* 注册异步任务
*/
public String registerAsyncTask(DelegateExecution execution, String receiveTaskId, String handlerType, Map<String, Object> bizParams) {
String asyncTaskId = UUID.randomUUID().toString();
public String registerAsyncTask(DelegateExecution execution, String receiveTaskId, String handlerType, Map<String, Object> bizParams,String status,String asyncTaskId) {
AsyncTaskRecord record = new AsyncTaskRecord();
record.setAsyncTaskId(asyncTaskId);
record.setProcessInstanceId(execution.getProcessInstanceId());
@@ -41,7 +38,7 @@ public class AsyncTaskRecordServiceImpl extends ServiceImpl<AsyncTaskRecordMappe
record.setReceiveTaskId(receiveTaskId);
record.setHandlerType(handlerType);
record.setRequestJson(JSON.toJSONString(bizParams));
record.setStatus("INIT");
record.setStatus(status);
this.save( record);
return asyncTaskId;
}

View File

@@ -1,15 +1,16 @@
package com.sdm.pbs.controller;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskReq;
import com.sdm.common.entity.req.pbs.hpc.*;
import com.sdm.common.entity.resp.pbs.hpc.*;
import com.sdm.common.entity.resp.pbs.hpc.listjobs.ListJobResp;
import com.sdm.common.entity.resp.pbs.hpc.listtasks.ListTasksResp;
import com.sdm.common.entity.resp.pbs.hpc.nodecore.NodeListCoreResp;
import com.sdm.common.entity.resp.pbs.hpc.nodelist.NodeListResp;
import com.sdm.common.feign.inter.pbs.ITaskFeignClient;
import com.sdm.common.utils.HpcCommandExcuteUtil;
import com.sdm.pbs.model.bo.HpcJobStatusInfo;
import com.sdm.pbs.model.req.SubmitHpcTaskReq;
import com.sdm.pbs.service.HpcInstructionService;
import com.sdm.pbs.service.IPbsService;
import io.swagger.v3.oas.annotations.Operation;
@@ -27,7 +28,7 @@ import java.util.Map;
@RestController
@RequestMapping("/pbs")
@Tag(name = "HPC调度", description = "与hpc交互的接口")
public class TaskController {
public class TaskController implements ITaskFeignClient {
@Autowired
private HpcInstructionService hpcInstructionService;

View File

@@ -1,10 +1,10 @@
package com.sdm.pbs.service;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskReq;
import com.sdm.pbs.model.bo.FileBaseInfo;
import com.sdm.pbs.model.bo.HpcJobStatusInfo;
import com.sdm.pbs.model.bo.HpcResouceInfo;
import com.sdm.pbs.model.req.SubmitHpcTaskReq;
import java.util.List;

View File

@@ -2,6 +2,7 @@ package com.sdm.pbs.service.impl;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.constants.HpcConstants;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskReq;
import com.sdm.common.entity.req.pbs.hpc.*;
import com.sdm.common.entity.resp.pbs.hpc.SubmitHpcJobResp;
import com.sdm.common.entity.resp.pbs.hpc.listjobs.ListJobResp;
@@ -10,7 +11,6 @@ import com.sdm.common.entity.resp.pbs.hpc.listtasks.ListTasksResp;
import com.sdm.pbs.model.bo.FileBaseInfo;
import com.sdm.pbs.model.bo.HpcJobStatusInfo;
import com.sdm.pbs.model.bo.HpcResouceInfo;
import com.sdm.pbs.model.req.SubmitHpcTaskReq;
import com.sdm.pbs.service.HpcInstructionService;
import com.sdm.pbs.service.IPbsService;
import lombok.extern.slf4j.Slf4j;

View File

@@ -1,10 +1,10 @@
package com.sdm.pbs.service.impl;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskReq;
import com.sdm.pbs.model.bo.FileBaseInfo;
import com.sdm.pbs.model.bo.HpcJobStatusInfo;
import com.sdm.pbs.model.bo.HpcResouceInfo;
import com.sdm.pbs.model.req.SubmitHpcTaskReq;
import com.sdm.pbs.service.IPbsService;
import com.sdm.pbs.service.IPbsServiceDecorator;
import org.springframework.beans.factory.annotation.Autowired;

View File

@@ -24,7 +24,7 @@ spring:
nacos:
discovery:
server-addr: 192.168.65.161:8848
group: DEV_GROUP
group: YANG_GROUP
enabled: true
# namespace: 3
# username: nacos