diff --git a/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java b/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java index 1ed4214d..67122f29 100644 --- a/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java +++ b/common/src/main/java/com/sdm/common/entity/req/pbs/SubmitHpcTaskRemoteReq.java @@ -5,14 +5,13 @@ import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; import org.springframework.web.multipart.MultipartFile; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; @Data -public class SubmitHpcTaskRemoteReq { - - @Schema(description = "配置时的mm时间戳") - public String timesmap; +public class SubmitHpcTaskRemoteReq implements Serializable { + private static final long serialVersionUID = 10086L; @Schema(description = "计算任务名称") public String jobName; diff --git a/common/src/main/java/com/sdm/common/feign/inter/pbs/ITaskFeignClient.java b/common/src/main/java/com/sdm/common/feign/inter/pbs/ITaskFeignClient.java index c5048433..00e93c2f 100644 --- a/common/src/main/java/com/sdm/common/feign/inter/pbs/ITaskFeignClient.java +++ b/common/src/main/java/com/sdm/common/feign/inter/pbs/ITaskFeignClient.java @@ -4,7 +4,6 @@ import com.sdm.common.common.SdmResponse; import com.sdm.common.config.LongTimeRespFeignConfig; import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq; import org.springframework.cloud.openfeign.FeignClient; -import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.PostMapping; @@ -15,7 +14,7 @@ import org.springframework.web.bind.annotation.PostMapping; public interface ITaskFeignClient { // "作业提交" - @PostMapping(value = "/pbs/adapterSubmitHpcJob", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) + @PostMapping(value = "/pbs/adapterSubmitHpcJob") SdmResponse adapterSubmitHpcJob( SubmitHpcTaskRemoteReq req); } diff --git a/common/src/main/java/com/sdm/common/utils/FilesUtil.java b/common/src/main/java/com/sdm/common/utils/FilesUtil.java index 56cf15f3..c1ddb463 100644 --- a/common/src/main/java/com/sdm/common/utils/FilesUtil.java +++ b/common/src/main/java/com/sdm/common/utils/FilesUtil.java @@ -329,7 +329,8 @@ public class FilesUtil { String inputFilesRegularStr, AtomicReference masterFilePath, List inputFilePaths) { - Objects.requireNonNull(jobWorkDir, "jobWorkDir 不能为空"); + log.info("求解文件目录={}", jobWorkDir); + Objects.requireNonNull(jobWorkDir, "本地求解文件夹不能为空"); boolean hasMasterRule = masterFileRegularStr != null && !masterFileRegularStr.isBlank(); boolean hasInputRule = diff --git a/common/src/main/java/com/sdm/common/utils/HpcCommandExcuteUtil.java b/common/src/main/java/com/sdm/common/utils/HpcCommandExcuteUtil.java index f1738a30..b9961de5 100644 --- a/common/src/main/java/com/sdm/common/utils/HpcCommandExcuteUtil.java +++ b/common/src/main/java/com/sdm/common/utils/HpcCommandExcuteUtil.java @@ -172,10 +172,12 @@ public class HpcCommandExcuteUtil { return builder.body(body); } - public SdmResponse callHpcUploadToTarget(String jobId, String workDir) { + public SdmResponse callHpcUploadToTarget(String jobId, String workDir,String callBackMinioDir,String callBackNasDir) { com.alibaba.fastjson2.JSONObject paramJson = new com.alibaba.fastjson2.JSONObject(); paramJson.put("jobId", jobId); paramJson.put("jobWorkDir", workDir); + paramJson.put("callBackMinioDir", callBackMinioDir); + paramJson.put("callBackNasDir", callBackNasDir); Boolean call = false; String resultString = ""; try { diff --git a/data/src/main/resources/application-dev.yml b/data/src/main/resources/application-dev.yml index 40c9e645..7884761b 100644 --- a/data/src/main/resources/application-dev.yml +++ b/data/src/main/resources/application-dev.yml @@ -124,4 +124,5 @@ security: - /data/approveDataFile - /data/downloadFile - /data/flowableUpFileToLocal - - /data/flowableUpFileToLocalMerge \ No newline at end of file + - /data/flowableUpFileToLocalMerge + - /data/getFileBaseInfo \ No newline at end of file diff --git a/flowable/src/main/java/com/sdm/flowable/controller/ProcessController.java b/flowable/src/main/java/com/sdm/flowable/controller/ProcessController.java index 9a1ab86a..41e98e69 100644 --- a/flowable/src/main/java/com/sdm/flowable/controller/ProcessController.java +++ b/flowable/src/main/java/com/sdm/flowable/controller/ProcessController.java @@ -15,15 +15,17 @@ import com.sdm.flowable.dto.req.RetryRequest; import com.sdm.flowable.process.ProcessService; import com.sdm.flowable.service.IProcessNodeParamService; import lombok.extern.slf4j.Slf4j; +import org.flowable.bpmn.model.FlowElement; +import org.flowable.bpmn.model.FlowableListener; +import org.flowable.engine.delegate.DelegateExecution; +import org.flowable.engine.delegate.ReadOnlyDelegateExecution; import org.flowable.engine.runtime.ProcessInstance; import org.flowable.validation.ValidationError; +import org.flowable.variable.api.persistence.entity.VariableInstance; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; @Slf4j @RestController @@ -40,22 +42,6 @@ public class ProcessController implements IFlowableFeignClient { private final ObjectMapper objectMapper = new ObjectMapper(); - // 验证流程模型 - @PostMapping("/testHpc") - public String testHpc(@RequestBody Map params) { - String beforeNodeId = params.get("beforeNodeId").toString(); - HPCExecuteConfig config=new HPCExecuteConfig(); - config.setBeforeNodeId(beforeNodeId); - if(!Objects.isNull(params.get("masterFileRegularStr"))){ - config.setMasterFileRegularStr(params.get("masterFileRegularStr").toString()); - } - if(!Objects.isNull(params.get("inputFilesRegularStr"))){ - config.setInputFilesRegularStr(params.get("inputFilesRegularStr").toString()); - } - hpcHandler.execute(null,params,config); - return "ok"; - } - // 验证流程模型 @PostMapping("/validate") public Map validate(@RequestBody ProcessDefinitionDTO processDTO) { @@ -196,4 +182,439 @@ public class ProcessController implements IFlowableFeignClient { return SdmResponse.failed("重试失败: " + e.getMessage()); } } + + // mock验证HPC流程使用 + @PostMapping("/testHpc") + public String testHpc(@RequestBody Map params) { + String beforeNodeId = params.get("beforeNodeId").toString(); + HPCExecuteConfig config=new HPCExecuteConfig(); + config.setBeforeNodeId(beforeNodeId); + if(!Objects.isNull(params.get("masterFileRegularStr"))){ + config.setMasterFileRegularStr(params.get("masterFileRegularStr").toString()); + } + if(!Objects.isNull(params.get("inputFilesRegularStr"))){ + config.setInputFilesRegularStr(params.get("inputFilesRegularStr").toString()); + } + String currentNodeId = params.get("currentNodeId").toString(); + DelegateExecution execution = new DelegateExecution() { + @Override + public String getId() { + return ""; + } + + @Override + public String getProcessInstanceId() { + return ""; + } + + @Override + public String getRootProcessInstanceId() { + return ""; + } + + @Override + public String getEventName() { + return ""; + } + + @Override + public void setEventName(String eventName) { + + } + + @Override + public String getProcessInstanceBusinessKey() { + return ""; + } + + @Override + public String getProcessInstanceBusinessStatus() { + return ""; + } + + @Override + public String getProcessDefinitionId() { + return ""; + } + + @Override + public String getPropagatedStageInstanceId() { + return ""; + } + + @Override + public String getParentId() { + return ""; + } + + @Override + public String getSuperExecutionId() { + return ""; + } + + @Override + public String getCurrentActivityId() { + return currentNodeId; + } + + @Override + public String getTenantId() { + return ""; + } + + @Override + public FlowElement getCurrentFlowElement() { + return null; + } + + @Override + public void setCurrentFlowElement(FlowElement flowElement) { + + } + + @Override + public FlowableListener getCurrentFlowableListener() { + return null; + } + + @Override + public void setCurrentFlowableListener(FlowableListener currentListener) { + + } + + @Override + public ReadOnlyDelegateExecution snapshotReadOnly() { + return null; + } + + @Override + public DelegateExecution getParent() { + return null; + } + + @Override + public List getExecutions() { + return List.of(); + } + + @Override + public void setActive(boolean isActive) { + + } + + @Override + public boolean isActive() { + return false; + } + + @Override + public boolean isEnded() { + return false; + } + + @Override + public void setConcurrent(boolean isConcurrent) { + + } + + @Override + public boolean isConcurrent() { + return false; + } + + @Override + public boolean isProcessInstanceType() { + return false; + } + + @Override + public void inactivate() { + + } + + @Override + public boolean isScope() { + return false; + } + + @Override + public void setScope(boolean isScope) { + + } + + @Override + public boolean isMultiInstanceRoot() { + return false; + } + + @Override + public void setMultiInstanceRoot(boolean isMultiInstanceRoot) { + + } + + @Override + public Map getVariables() { + return Map.of(); + } + + @Override + public Map getVariableInstances() { + return Map.of(); + } + + @Override + public Map getVariables(Collection variableNames) { + return Map.of(); + } + + @Override + public Map getVariableInstances(Collection variableNames) { + return Map.of(); + } + + @Override + public Map getVariables(Collection variableNames, boolean fetchAllVariables) { + return Map.of(); + } + + @Override + public Map getVariableInstances(Collection variableNames, boolean fetchAllVariables) { + return Map.of(); + } + + @Override + public Map getVariablesLocal() { + return Map.of(); + } + + @Override + public Map getVariableInstancesLocal() { + return Map.of(); + } + + @Override + public Map getVariablesLocal(Collection variableNames) { + return Map.of(); + } + + @Override + public Map getVariableInstancesLocal(Collection variableNames) { + return Map.of(); + } + + @Override + public Map getVariablesLocal(Collection variableNames, boolean fetchAllVariables) { + return Map.of(); + } + + @Override + public Map getVariableInstancesLocal(Collection variableNames, boolean fetchAllVariables) { + return Map.of(); + } + + @Override + public Object getVariable(String variableName) { + return null; + } + + @Override + public VariableInstance getVariableInstance(String variableName) { + return null; + } + + @Override + public Object getVariable(String variableName, boolean fetchAllVariables) { + return null; + } + + @Override + public VariableInstance getVariableInstance(String variableName, boolean fetchAllVariables) { + return null; + } + + @Override + public Object getVariableLocal(String variableName) { + return null; + } + + @Override + public VariableInstance getVariableInstanceLocal(String variableName) { + return null; + } + + @Override + public Object getVariableLocal(String variableName, boolean fetchAllVariables) { + return null; + } + + @Override + public VariableInstance getVariableInstanceLocal(String variableName, boolean fetchAllVariables) { + return null; + } + + @Override + public T getVariable(String variableName, Class variableClass) { + return null; + } + + @Override + public T getVariableLocal(String variableName, Class variableClass) { + return null; + } + + @Override + public Set getVariableNames() { + return Set.of(); + } + + @Override + public Set getVariableNamesLocal() { + return Set.of(); + } + + @Override + public void setVariable(String variableName, Object value) { + + } + + @Override + public void setVariable(String variableName, Object value, boolean fetchAllVariables) { + + } + + @Override + public Object setVariableLocal(String variableName, Object value) { + return null; + } + + @Override + public Object setVariableLocal(String variableName, Object value, boolean fetchAllVariables) { + return null; + } + + @Override + public void setVariables(Map variables) { + + } + + @Override + public void setVariablesLocal(Map variables) { + + } + + @Override + public boolean hasVariables() { + return false; + } + + @Override + public boolean hasVariablesLocal() { + return false; + } + + @Override + public boolean hasVariable(String variableName) { + return false; + } + + @Override + public boolean hasVariableLocal(String variableName) { + return false; + } + + @Override + public void removeVariable(String variableName) { + + } + + @Override + public void removeVariableLocal(String variableName) { + + } + + @Override + public void removeVariables(Collection variableNames) { + + } + + @Override + public void removeVariablesLocal(Collection variableNames) { + + } + + @Override + public void removeVariables() { + + } + + @Override + public void removeVariablesLocal() { + + } + + @Override + public void setTransientVariable(String variableName, Object variableValue) { + + } + + @Override + public void setTransientVariableLocal(String variableName, Object variableValue) { + + } + + @Override + public void setTransientVariables(Map transientVariables) { + + } + + @Override + public Object getTransientVariable(String variableName) { + return null; + } + + @Override + public Map getTransientVariables() { + return Map.of(); + } + + @Override + public void setTransientVariablesLocal(Map transientVariables) { + + } + + @Override + public Object getTransientVariableLocal(String variableName) { + return null; + } + + @Override + public Map getTransientVariablesLocal() { + return Map.of(); + } + + @Override + public void removeTransientVariableLocal(String variableName) { + + } + + @Override + public void removeTransientVariable(String variableName) { + + } + + @Override + public void removeTransientVariables() { + + } + + @Override + public void removeTransientVariablesLocal() { + + } + }; + hpcHandler.execute(execution,params,config); + return "ok"; + } + + } \ No newline at end of file diff --git a/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java b/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java index 25e6a3a6..228a9fb7 100644 --- a/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java +++ b/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java @@ -152,9 +152,7 @@ public class HpcHandler implements ExecutionHandler,HPCExecu if (params == null) { return req; } -// ObjectMapper objectMapper = new ObjectMapper(); // 需确保ObjectMapper已配置或注入 // 基础字段映射 - req.setTimesmap(params.get("timesmap").toString()); req.setJobName(params.get("jobName").toString()); // 处理int类型字段,包含空值和非数字的异常处理 try { diff --git a/flowable/src/main/resources/application-dev.yml b/flowable/src/main/resources/application-dev.yml index e9c5a86b..14c3f6a3 100644 --- a/flowable/src/main/resources/application-dev.yml +++ b/flowable/src/main/resources/application-dev.yml @@ -36,4 +36,5 @@ mybatis-plus: security: whitelist: paths: - - /pbs/jobFileCallback + - /process/testHpc + - /process/asyncCallback diff --git a/flowable/src/main/resources/application.yml b/flowable/src/main/resources/application.yml index ef46c2ad..caf4dfcd 100644 --- a/flowable/src/main/resources/application.yml +++ b/flowable/src/main/resources/application.yml @@ -1,3 +1,3 @@ spring: profiles: - active: local \ No newline at end of file + active: dev \ No newline at end of file diff --git a/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java b/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java index 0501d1d8..cc12628f 100644 --- a/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java +++ b/pbs/src/main/java/com/sdm/pbs/controller/TaskAdapter.java @@ -1,5 +1,6 @@ package com.sdm.pbs.controller; +import com.alibaba.fastjson2.JSONObject; import com.sdm.common.common.SdmResponse; import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq; import com.sdm.common.feign.inter.pbs.ITaskFeignClient; @@ -11,7 +12,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -30,7 +30,7 @@ public class TaskAdapter implements ITaskFeignClient { @Autowired private TaskController taskController; - @PostMapping(value = "/adapterSubmitHpcJob", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) + @PostMapping(value = "/adapterSubmitHpcJob") @Operation(summary = "作业提交") public SdmResponse adapterSubmitHpcJob(SubmitHpcTaskRemoteReq req) { // spdm 回传路径 @@ -42,6 +42,7 @@ public class TaskAdapter implements ITaskFeignClient { } private void getSimulationFile(SubmitHpcTaskRemoteReq req){ + log.info("提交请求参数:{}", JSONObject.toJSONString(req)); String simulationFileLocalPath = req.getSimulationFileLocalPath(); String masterFileRegularStr = req.getMasterFileRegularStr(); String inputFilesRegularStr = req.getInputFilesRegularStr(); diff --git a/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java b/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java index f6bd0a8b..edb7067e 100644 --- a/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java +++ b/pbs/src/main/java/com/sdm/pbs/controller/TaskController.java @@ -238,8 +238,8 @@ public class TaskController { // 通知文件回传 mock 测试使用 @PostMapping("/callHpcUploadToTarget") public SdmResponse hpcDownloadFiles(@RequestBody Map paramMap) { - return hpcInstructionService.callHpcUploadToTarget(paramMap.get("jobId").toString(), paramMap.get("jobWorkDir").toString()); + return hpcInstructionService.callHpcUploadToTarget(paramMap.get("jobId").toString(), paramMap.get("jobWorkDir").toString(), + paramMap.get("callBackMinioDir").toString(),paramMap.get("callBackNasDir").toString()); } - } diff --git a/pbs/src/main/java/com/sdm/pbs/schedule/hpc/hander/FinishedStatusHandler.java b/pbs/src/main/java/com/sdm/pbs/schedule/hpc/hander/FinishedStatusHandler.java index 91e537ec..a7ad7f18 100644 --- a/pbs/src/main/java/com/sdm/pbs/schedule/hpc/hander/FinishedStatusHandler.java +++ b/pbs/src/main/java/com/sdm/pbs/schedule/hpc/hander/FinishedStatusHandler.java @@ -45,8 +45,9 @@ public class FinishedStatusHandler implements JobStatusHandler { newDbJob.setTotalElapsedTime(DateUtils.calculateTimeConsume( statusInfo.getStartTime(), statusInfo.getEndTime(), TimeUnit.SECONDS)); newDbJob.setUpdateTime(LocalDateTime.now()); - // 通知工具回传文件 - SdmResponse callResponse = hpcInstructionService.callHpcUploadToTarget(newDbJob.getJobId(), newDbJob.getStdoutHpcFilePath()); + // 通知工具回传文件 minio 或者 nas + SdmResponse callResponse = hpcInstructionService.callHpcUploadToTarget(newDbJob.getJobId(), newDbJob.getStdoutHpcFilePath(), + newDbJob.getStdoutSpdmMinoFilePath(),newDbJob.getStdoutSpdmNasFilePath()); if (!callResponse.isSuccess()||!callResponse.getData()) { CoreLogger.error("callHpcUploadToTarget failed,jobId:{},workDir:{}",newDbJob.getJobId(),newDbJob.getStdoutHpcFilePath()); return; diff --git a/pbs/src/main/java/com/sdm/pbs/service/HpcInstructionService.java b/pbs/src/main/java/com/sdm/pbs/service/HpcInstructionService.java index 39ca01e8..b2ff85b6 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/HpcInstructionService.java +++ b/pbs/src/main/java/com/sdm/pbs/service/HpcInstructionService.java @@ -55,6 +55,6 @@ public interface HpcInstructionService { ResponseEntity hpcDownloadFile(String fileName,Long fileSize); // 通知hpc回传文件 - SdmResponse callHpcUploadToTarget(String jobId,String workDir); + SdmResponse callHpcUploadToTarget(String jobId,String workDir,String callBackMinioDir,String callBackNasDir); } diff --git a/pbs/src/main/java/com/sdm/pbs/service/impl/HpcInstructionServiceImpl.java b/pbs/src/main/java/com/sdm/pbs/service/impl/HpcInstructionServiceImpl.java index 9d772ebc..a794ac09 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/impl/HpcInstructionServiceImpl.java +++ b/pbs/src/main/java/com/sdm/pbs/service/impl/HpcInstructionServiceImpl.java @@ -481,8 +481,8 @@ public class HpcInstructionServiceImpl implements HpcInstructionService { } @Override - public SdmResponse callHpcUploadToTarget(String jobId,String workDir) { - return hpcCommandExcuteUtil.callHpcUploadToTarget(jobId,workDir); + public SdmResponse callHpcUploadToTarget(String jobId,String workDir,String callBackMinioDir,String callBackNasDir) { + return hpcCommandExcuteUtil.callHpcUploadToTarget(jobId,workDir,callBackMinioDir,callBackNasDir); } /** diff --git a/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java b/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java index a07a99bc..fae96ca5 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java +++ b/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java @@ -83,8 +83,19 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { // 任务输出的文件夹 String hpcOutPutDir = extractDirectory(masterFilePath); req.setWorkDir(hpcOutPutDir); - // 前置处理 替换求解文件 - String formatCommand = String.format(req.getCommand(), masterFilePath); + // 前置处理 替换求解文件 todo 从数据库查询 + String command=""; + if(StringUtils.isNotBlank(req.getCommand())) { + command=req.getCommand(); + }else { + SimulationSoftConfig simulationSoftConfig = simulationSoftConfigService.lambdaQuery(). + eq(SimulationSoftConfig::getSoftName,req.getSoftware()).one(); + command = simulationSoftConfig.getCommand(); + } + if(StringUtils.isBlank(command)) { + return SdmResponse.failed("command命令不能为空,软件名称:"+req.getSoftware()); + } + String formatCommand = String.format(command, masterFilePath); req.setCommand(formatCommand); req.setMasterFilePath(masterFilePath); SdmResponse response = pbsService.submitHpcJob(req); diff --git a/pbs/src/main/resources/application-dev.yml b/pbs/src/main/resources/application-dev.yml index 8456fdfb..fe4a820e 100644 --- a/pbs/src/main/resources/application-dev.yml +++ b/pbs/src/main/resources/application-dev.yml @@ -136,4 +136,6 @@ pbs: security: whitelist: paths: - - /pbs/jobFileCallback \ No newline at end of file + - /pbs/jobFileCallback + - /pbs/netTest + - /pbs/adapterSubmitHpcJob \ No newline at end of file