From b75cfaa338afdeaeb615f9d089a7d16a6c2e0c06 Mon Sep 17 00:00:00 2001 From: yangyang01000846 <15195822163@163.com> Date: Thu, 27 Nov 2025 15:36:42 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9Apbs=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=8F=90=E4=BA=A4=E6=8E=A5=E5=85=A5=E5=B7=A5=E4=BD=9C?= =?UTF-8?q?=E6=B5=81=E5=BC=95=E6=93=8E=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/LongTimeRespFeignConfig.java | 15 +++++ .../entity/req/pbs}/SubmitHpcTaskReq.java | 2 +- .../impl/pbs/TaskClientFeignClientImpl.java | 31 ++++++++++ .../feign/inter/pbs/ITaskFeignClient.java | 21 +++++++ .../controller/ProcessController.java | 18 ++++-- .../flowable/delegate/handler/HpcHandler.java | 62 +++++++++++++++++-- .../service/IAsyncTaskRecordService.java | 4 +- .../impl/AsyncTaskRecordServiceImpl.java | 11 ++-- .../sdm/pbs/controller/TaskController.java | 5 +- .../java/com/sdm/pbs/service/IPbsService.java | 2 +- .../pbs/service/impl/IPbsHpcServiceImpl.java | 2 +- .../pbs/service/impl/PbsServiceDecorator.java | 2 +- pbs/src/main/resources/application-dev.yml | 2 +- 13 files changed, 153 insertions(+), 24 deletions(-) create mode 100644 common/src/main/java/com/sdm/common/config/LongTimeRespFeignConfig.java rename {pbs/src/main/java/com/sdm/pbs/model/req => common/src/main/java/com/sdm/common/entity/req/pbs}/SubmitHpcTaskReq.java (97%) create mode 100644 common/src/main/java/com/sdm/common/feign/impl/pbs/TaskClientFeignClientImpl.java create mode 100644 common/src/main/java/com/sdm/common/feign/inter/pbs/ITaskFeignClient.java diff --git a/common/src/main/java/com/sdm/common/config/LongTimeRespFeignConfig.java b/common/src/main/java/com/sdm/common/config/LongTimeRespFeignConfig.java new file mode 100644 index 00000000..03195547 --- /dev/null +++ b/common/src/main/java/com/sdm/common/config/LongTimeRespFeignConfig.java @@ -0,0 +1,15 @@ +package com.sdm.common.config; + +import feign.Request; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class LongTimeRespFeignConfig { + @Bean + public Request.Options pbsFeignOptions() { + // 根据你的HPC任务实际需要调整超时时间 + // 5秒连接,5分钟读取 + return new Request.Options(5000, 300000); + } +} diff --git a/pbs/src/main/java/com/sdm/pbs/model/req/SubmitHpcTaskReq.java b/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskReq.java similarity index 97% rename from pbs/src/main/java/com/sdm/pbs/model/req/SubmitHpcTaskReq.java rename to common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskReq.java index 7853d14d..6cb09e6f 100644 --- a/pbs/src/main/java/com/sdm/pbs/model/req/SubmitHpcTaskReq.java +++ b/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskReq.java @@ -1,4 +1,4 @@ -package com.sdm.pbs.model.req; +package com.sdm.common.entity.req.pbs; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; diff --git a/common/src/main/java/com/sdm/common/feign/impl/pbs/TaskClientFeignClientImpl.java b/common/src/main/java/com/sdm/common/feign/impl/pbs/TaskClientFeignClientImpl.java new file mode 100644 index 00000000..64ece921 --- /dev/null +++ b/common/src/main/java/com/sdm/common/feign/impl/pbs/TaskClientFeignClientImpl.java @@ -0,0 +1,31 @@ +package com.sdm.common.feign.impl.pbs; + +import com.sdm.common.common.SdmResponse; +import com.sdm.common.entity.req.pbs.SubmitHpcTaskReq; +import com.sdm.common.feign.inter.pbs.ITaskFeignClient; +import com.sdm.common.log.CoreLogger; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class TaskClientFeignClientImpl implements ITaskFeignClient { + + @Autowired + private ITaskFeignClient taskFeignClient; + + @Override + public SdmResponse submitHpcJob(SubmitHpcTaskReq req) { + SdmResponse response; + try { + response = taskFeignClient.submitHpcJob(req); + return response; + } catch (Exception e) { + CoreLogger.error("SubmitHpcJob Exception:{}", e.getMessage()); + return SdmResponse.failed("Hpc任务提交失败"); + } + + } + +} diff --git a/common/src/main/java/com/sdm/common/feign/inter/pbs/ITaskFeignClient.java b/common/src/main/java/com/sdm/common/feign/inter/pbs/ITaskFeignClient.java new file mode 100644 index 00000000..0a4b84d5 --- /dev/null +++ b/common/src/main/java/com/sdm/common/feign/inter/pbs/ITaskFeignClient.java @@ -0,0 +1,21 @@ +package com.sdm.common.feign.inter.pbs; + +import com.sdm.common.common.SdmResponse; +import com.sdm.common.config.LongTimeRespFeignConfig; +import com.sdm.common.entity.req.pbs.SubmitHpcTaskReq; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; + + +@FeignClient( + name = "pbs", + configuration = LongTimeRespFeignConfig.class +) +public interface ITaskFeignClient { + + // "作业提交" + @PostMapping("/pbs/submitHpcJob") + SdmResponse submitHpcJob(@RequestBody SubmitHpcTaskReq req); + +} diff --git a/flowable/src/main/java/com/sdm/flowable/controller/ProcessController.java b/flowable/src/main/java/com/sdm/flowable/controller/ProcessController.java index 13fbda64..0b494933 100644 --- a/flowable/src/main/java/com/sdm/flowable/controller/ProcessController.java +++ b/flowable/src/main/java/com/sdm/flowable/controller/ProcessController.java @@ -1,15 +1,15 @@ package com.sdm.flowable.controller; -import com.alibaba.fastjson2.JSONObject; import com.fasterxml.jackson.databind.ObjectMapper; import com.sdm.common.common.SdmResponse; +import com.sdm.flowable.delegate.UniversalDelegate; +import com.sdm.flowable.delegate.handler.HpcHandler; import com.sdm.flowable.dto.ProcessDefinitionDTO; +import com.sdm.flowable.dto.req.AsyncCallbackRequest; +import com.sdm.flowable.dto.req.CompleteTaskReq; import com.sdm.flowable.dto.resp.ProcessInstanceResp; import com.sdm.flowable.process.ProcessService; import com.sdm.flowable.service.IProcessNodeParamService; -import com.sdm.flowable.dto.req.CompleteTaskReq; -import com.sdm.flowable.dto.req.AsyncCallbackRequest; -import com.sdm.flowable.delegate.UniversalDelegate; import org.flowable.engine.history.HistoricProcessInstance; import org.flowable.engine.repository.Deployment; import org.flowable.engine.runtime.ProcessInstance; @@ -33,8 +33,18 @@ public class ProcessController { @Autowired private UniversalDelegate universalDelegate; + @Autowired + private HpcHandler hpcHandler; + private final ObjectMapper objectMapper = new ObjectMapper(); + // 验证流程模型 + @GetMapping("/testHpc") + public String testHpc() { + hpcHandler.mockinit(); + return "ok" ; + } + // 验证流程模型 @PostMapping("/validate") public Map validate(@RequestBody ProcessDefinitionDTO processDTO) { diff --git a/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java b/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java index 99a643ed..819779fd 100644 --- a/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java +++ b/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java @@ -1,12 +1,19 @@ package com.sdm.flowable.delegate.handler; +import com.sdm.common.common.SdmResponse; +import com.sdm.common.entity.req.pbs.SubmitHpcTaskReq; +import com.sdm.common.feign.inter.pbs.ITaskFeignClient; import com.sdm.flowable.config.executeConfig.BaseExecuteConfig; import com.sdm.flowable.service.IAsyncTaskRecordService; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.flowable.engine.delegate.DelegateExecution; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -14,23 +21,70 @@ import java.util.Map; @Slf4j @Component("HPC") public class HpcHandler implements ExecutionHandler { + @Autowired private IAsyncTaskRecordService asyncTaskRecordService; + @Autowired + private ITaskFeignClient taskFeignClient; + @Override public void execute(DelegateExecution execution, Map params, BaseExecuteConfig config) { // 实现HPC处理逻辑... + // INIT(初始化)/RUNNING(执行中)/SUCCESS(执行成功)/FAIL(执行失败) + String status = "INIT"; // 1. 调用 HPC 平台提交任务 - // String hpcTaskId = submitHpcTask(params); - String hpcTaskId = ""; + SubmitHpcTaskReq mockReq = mockSubmitHpcTaskReq(); + SdmResponse submitResp = taskFeignClient.submitHpcJob(mockReq); + if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){ + log.error("HpcHandler submit failed,jobName:{}",mockReq.getJobName()); + status = "FAIL"; + } + String hpcTaskId = submitResp.getData(); + // 2. 存数据库(提交状态 + 外部任务ID) asyncTaskRecordService.registerAsyncTask( execution, config.getCallbackNodeId(), // ReceiveTask ID "HPC", // handlerType - new HashMap<>() + new HashMap<>(), + status, + hpcTaskId ); - log.info("HPC 任务 {} 已提交", hpcTaskId); } + + + public void mockinit(){ + SubmitHpcTaskReq mockReq = mockSubmitHpcTaskReq(); + SdmResponse submitResp = taskFeignClient.submitHpcJob(mockReq); + if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){ + log.error("HpcHandler submit failed,jobName:{}",mockReq.getJobName()); + System.out.println("失败"); + } + String hpcTaskId = submitResp.getData(); + System.out.println("成功:"+hpcTaskId); + } + + private SubmitHpcTaskReq mockSubmitHpcTaskReq() { + SubmitHpcTaskReq req = new SubmitHpcTaskReq(); + // 生成任务名称:年月日-时分秒,如 20251127-145120 + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd-HHmmss"); + String timestamp = sdf.format(new Date()); + req.jobName = "电池_"+timestamp; + req.coreNum = 8; // 默认8核 + req.software = "reta.exe"; + req.jobType = "仿真计算"; + req.independence = 1; // 独立任务 + req.inputFiles = Arrays.asList("input1.dat", "input2.dat", "input3.dat"); + req.masterFile = "master.dat"; + req.taskId = "TASKID_" + timestamp; + req.taskName = "测试任务_" + timestamp; + req.runId = "RUNID_" + timestamp; + req.runName = "测试算力_" + timestamp; + req.command = "\\\\CARSAFE\\share\\solver\\RLithium\\reta.exe -i \\\\CARSAFE\\share\\testproject\\testjob\\testtask\\model\\aa.xml"; + req.projectname = "新能源汽车锂电池安全性能优化"; + return req; + } + } \ No newline at end of file diff --git a/flowable/src/main/java/com/sdm/flowable/service/IAsyncTaskRecordService.java b/flowable/src/main/java/com/sdm/flowable/service/IAsyncTaskRecordService.java index 5f61b23d..4a7c7c0b 100644 --- a/flowable/src/main/java/com/sdm/flowable/service/IAsyncTaskRecordService.java +++ b/flowable/src/main/java/com/sdm/flowable/service/IAsyncTaskRecordService.java @@ -1,8 +1,8 @@ package com.sdm.flowable.service; +import com.baomidou.mybatisplus.extension.service.IService; import com.sdm.flowable.dto.req.AsyncCallbackRequest; import com.sdm.flowable.entity.AsyncTaskRecord; -import com.baomidou.mybatisplus.extension.service.IService; import org.flowable.engine.delegate.DelegateExecution; import java.util.Map; @@ -19,7 +19,7 @@ public interface IAsyncTaskRecordService extends IService { /** * 注册异步任务 */ - String registerAsyncTask(DelegateExecution execution, String receiveTaskId, String handlerType, Map bizParams); + String registerAsyncTask(DelegateExecution execution, String receiveTaskId, String handlerType, Map bizParams,String status,String asyncTaskId); /** * 异步回调恢复流程 diff --git a/flowable/src/main/java/com/sdm/flowable/service/impl/AsyncTaskRecordServiceImpl.java b/flowable/src/main/java/com/sdm/flowable/service/impl/AsyncTaskRecordServiceImpl.java index 23dd2d00..a8252e16 100644 --- a/flowable/src/main/java/com/sdm/flowable/service/impl/AsyncTaskRecordServiceImpl.java +++ b/flowable/src/main/java/com/sdm/flowable/service/impl/AsyncTaskRecordServiceImpl.java @@ -1,11 +1,11 @@ package com.sdm.flowable.service.impl; import com.alibaba.fastjson2.JSON; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.sdm.flowable.dao.AsyncTaskRecordMapper; import com.sdm.flowable.dto.req.AsyncCallbackRequest; import com.sdm.flowable.entity.AsyncTaskRecord; -import com.sdm.flowable.dao.AsyncTaskRecordMapper; import com.sdm.flowable.service.IAsyncTaskRecordService; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.flowable.engine.RuntimeService; import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.runtime.Execution; @@ -13,7 +13,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.Map; -import java.util.UUID; /** *

@@ -31,9 +30,7 @@ public class AsyncTaskRecordServiceImpl extends ServiceImpl bizParams) { - String asyncTaskId = UUID.randomUUID().toString(); - + public String registerAsyncTask(DelegateExecution execution, String receiveTaskId, String handlerType, Map bizParams,String status,String asyncTaskId) { AsyncTaskRecord record = new AsyncTaskRecord(); record.setAsyncTaskId(asyncTaskId); record.setProcessInstanceId(execution.getProcessInstanceId()); @@ -41,7 +38,7 @@ public class AsyncTaskRecordServiceImpl extends ServiceImpl