修改:HPC优化

This commit is contained in:
yangyang01000846
2025-12-07 15:06:02 +08:00
parent 6713511078
commit 55f3683f02
16 changed files with 482 additions and 45 deletions

View File

@@ -5,14 +5,13 @@ import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data; import lombok.Data;
import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartFile;
import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@Data @Data
public class SubmitHpcTaskRemoteReq { public class SubmitHpcTaskRemoteReq implements Serializable {
private static final long serialVersionUID = 10086L;
@Schema(description = "配置时的mm时间戳")
public String timesmap;
@Schema(description = "计算任务名称") @Schema(description = "计算任务名称")
public String jobName; public String jobName;

View File

@@ -4,7 +4,6 @@ import com.sdm.common.common.SdmResponse;
import com.sdm.common.config.LongTimeRespFeignConfig; import com.sdm.common.config.LongTimeRespFeignConfig;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq; import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq;
import org.springframework.cloud.openfeign.FeignClient; import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
@@ -15,7 +14,7 @@ import org.springframework.web.bind.annotation.PostMapping;
public interface ITaskFeignClient { public interface ITaskFeignClient {
// "作业提交" // "作业提交"
@PostMapping(value = "/pbs/adapterSubmitHpcJob", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) @PostMapping(value = "/pbs/adapterSubmitHpcJob")
SdmResponse<String> adapterSubmitHpcJob( SubmitHpcTaskRemoteReq req); SdmResponse<String> adapterSubmitHpcJob( SubmitHpcTaskRemoteReq req);
} }

View File

@@ -329,7 +329,8 @@ public class FilesUtil {
String inputFilesRegularStr, String inputFilesRegularStr,
AtomicReference<String> masterFilePath, AtomicReference<String> masterFilePath,
List<String> inputFilePaths) { List<String> inputFilePaths) {
Objects.requireNonNull(jobWorkDir, "jobWorkDir 不能为空"); log.info("求解文件目录={}", jobWorkDir);
Objects.requireNonNull(jobWorkDir, "本地求解文件夹不能为空");
boolean hasMasterRule = boolean hasMasterRule =
masterFileRegularStr != null && !masterFileRegularStr.isBlank(); masterFileRegularStr != null && !masterFileRegularStr.isBlank();
boolean hasInputRule = boolean hasInputRule =

View File

@@ -172,10 +172,12 @@ public class HpcCommandExcuteUtil {
return builder.body(body); return builder.body(body);
} }
public SdmResponse<Boolean> callHpcUploadToTarget(String jobId, String workDir) { public SdmResponse<Boolean> callHpcUploadToTarget(String jobId, String workDir,String callBackMinioDir,String callBackNasDir) {
com.alibaba.fastjson2.JSONObject paramJson = new com.alibaba.fastjson2.JSONObject(); com.alibaba.fastjson2.JSONObject paramJson = new com.alibaba.fastjson2.JSONObject();
paramJson.put("jobId", jobId); paramJson.put("jobId", jobId);
paramJson.put("jobWorkDir", workDir); paramJson.put("jobWorkDir", workDir);
paramJson.put("callBackMinioDir", callBackMinioDir);
paramJson.put("callBackNasDir", callBackNasDir);
Boolean call = false; Boolean call = false;
String resultString = ""; String resultString = "";
try { try {

View File

@@ -124,4 +124,5 @@ security:
- /data/approveDataFile - /data/approveDataFile
- /data/downloadFile - /data/downloadFile
- /data/flowableUpFileToLocal - /data/flowableUpFileToLocal
- /data/flowableUpFileToLocalMerge - /data/flowableUpFileToLocalMerge
- /data/getFileBaseInfo

View File

@@ -15,15 +15,17 @@ import com.sdm.flowable.dto.req.RetryRequest;
import com.sdm.flowable.process.ProcessService; import com.sdm.flowable.process.ProcessService;
import com.sdm.flowable.service.IProcessNodeParamService; import com.sdm.flowable.service.IProcessNodeParamService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.flowable.bpmn.model.FlowElement;
import org.flowable.bpmn.model.FlowableListener;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.ReadOnlyDelegateExecution;
import org.flowable.engine.runtime.ProcessInstance; import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.validation.ValidationError; import org.flowable.validation.ValidationError;
import org.flowable.variable.api.persistence.entity.VariableInstance;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import java.util.HashMap; import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@Slf4j @Slf4j
@RestController @RestController
@@ -40,22 +42,6 @@ public class ProcessController implements IFlowableFeignClient {
private final ObjectMapper objectMapper = new ObjectMapper(); private final ObjectMapper objectMapper = new ObjectMapper();
// 验证流程模型
@PostMapping("/testHpc")
public String testHpc(@RequestBody Map<String, Object> params) {
String beforeNodeId = params.get("beforeNodeId").toString();
HPCExecuteConfig config=new HPCExecuteConfig();
config.setBeforeNodeId(beforeNodeId);
if(!Objects.isNull(params.get("masterFileRegularStr"))){
config.setMasterFileRegularStr(params.get("masterFileRegularStr").toString());
}
if(!Objects.isNull(params.get("inputFilesRegularStr"))){
config.setInputFilesRegularStr(params.get("inputFilesRegularStr").toString());
}
hpcHandler.execute(null,params,config);
return "ok";
}
// 验证流程模型 // 验证流程模型
@PostMapping("/validate") @PostMapping("/validate")
public Map<String, Object> validate(@RequestBody ProcessDefinitionDTO processDTO) { public Map<String, Object> validate(@RequestBody ProcessDefinitionDTO processDTO) {
@@ -196,4 +182,439 @@ public class ProcessController implements IFlowableFeignClient {
return SdmResponse.failed("重试失败: " + e.getMessage()); return SdmResponse.failed("重试失败: " + e.getMessage());
} }
} }
// mock验证HPC流程使用
@PostMapping("/testHpc")
public String testHpc(@RequestBody Map<String, Object> params) {
String beforeNodeId = params.get("beforeNodeId").toString();
HPCExecuteConfig config=new HPCExecuteConfig();
config.setBeforeNodeId(beforeNodeId);
if(!Objects.isNull(params.get("masterFileRegularStr"))){
config.setMasterFileRegularStr(params.get("masterFileRegularStr").toString());
}
if(!Objects.isNull(params.get("inputFilesRegularStr"))){
config.setInputFilesRegularStr(params.get("inputFilesRegularStr").toString());
}
String currentNodeId = params.get("currentNodeId").toString();
DelegateExecution execution = new DelegateExecution() {
@Override
public String getId() {
return "";
}
@Override
public String getProcessInstanceId() {
return "";
}
@Override
public String getRootProcessInstanceId() {
return "";
}
@Override
public String getEventName() {
return "";
}
@Override
public void setEventName(String eventName) {
}
@Override
public String getProcessInstanceBusinessKey() {
return "";
}
@Override
public String getProcessInstanceBusinessStatus() {
return "";
}
@Override
public String getProcessDefinitionId() {
return "";
}
@Override
public String getPropagatedStageInstanceId() {
return "";
}
@Override
public String getParentId() {
return "";
}
@Override
public String getSuperExecutionId() {
return "";
}
@Override
public String getCurrentActivityId() {
return currentNodeId;
}
@Override
public String getTenantId() {
return "";
}
@Override
public FlowElement getCurrentFlowElement() {
return null;
}
@Override
public void setCurrentFlowElement(FlowElement flowElement) {
}
@Override
public FlowableListener getCurrentFlowableListener() {
return null;
}
@Override
public void setCurrentFlowableListener(FlowableListener currentListener) {
}
@Override
public ReadOnlyDelegateExecution snapshotReadOnly() {
return null;
}
@Override
public DelegateExecution getParent() {
return null;
}
@Override
public List<? extends DelegateExecution> getExecutions() {
return List.of();
}
@Override
public void setActive(boolean isActive) {
}
@Override
public boolean isActive() {
return false;
}
@Override
public boolean isEnded() {
return false;
}
@Override
public void setConcurrent(boolean isConcurrent) {
}
@Override
public boolean isConcurrent() {
return false;
}
@Override
public boolean isProcessInstanceType() {
return false;
}
@Override
public void inactivate() {
}
@Override
public boolean isScope() {
return false;
}
@Override
public void setScope(boolean isScope) {
}
@Override
public boolean isMultiInstanceRoot() {
return false;
}
@Override
public void setMultiInstanceRoot(boolean isMultiInstanceRoot) {
}
@Override
public Map<String, Object> getVariables() {
return Map.of();
}
@Override
public Map<String, VariableInstance> getVariableInstances() {
return Map.of();
}
@Override
public Map<String, Object> getVariables(Collection<String> variableNames) {
return Map.of();
}
@Override
public Map<String, VariableInstance> getVariableInstances(Collection<String> variableNames) {
return Map.of();
}
@Override
public Map<String, Object> getVariables(Collection<String> variableNames, boolean fetchAllVariables) {
return Map.of();
}
@Override
public Map<String, VariableInstance> getVariableInstances(Collection<String> variableNames, boolean fetchAllVariables) {
return Map.of();
}
@Override
public Map<String, Object> getVariablesLocal() {
return Map.of();
}
@Override
public Map<String, VariableInstance> getVariableInstancesLocal() {
return Map.of();
}
@Override
public Map<String, Object> getVariablesLocal(Collection<String> variableNames) {
return Map.of();
}
@Override
public Map<String, VariableInstance> getVariableInstancesLocal(Collection<String> variableNames) {
return Map.of();
}
@Override
public Map<String, Object> getVariablesLocal(Collection<String> variableNames, boolean fetchAllVariables) {
return Map.of();
}
@Override
public Map<String, VariableInstance> getVariableInstancesLocal(Collection<String> variableNames, boolean fetchAllVariables) {
return Map.of();
}
@Override
public Object getVariable(String variableName) {
return null;
}
@Override
public VariableInstance getVariableInstance(String variableName) {
return null;
}
@Override
public Object getVariable(String variableName, boolean fetchAllVariables) {
return null;
}
@Override
public VariableInstance getVariableInstance(String variableName, boolean fetchAllVariables) {
return null;
}
@Override
public Object getVariableLocal(String variableName) {
return null;
}
@Override
public VariableInstance getVariableInstanceLocal(String variableName) {
return null;
}
@Override
public Object getVariableLocal(String variableName, boolean fetchAllVariables) {
return null;
}
@Override
public VariableInstance getVariableInstanceLocal(String variableName, boolean fetchAllVariables) {
return null;
}
@Override
public <T> T getVariable(String variableName, Class<T> variableClass) {
return null;
}
@Override
public <T> T getVariableLocal(String variableName, Class<T> variableClass) {
return null;
}
@Override
public Set<String> getVariableNames() {
return Set.of();
}
@Override
public Set<String> getVariableNamesLocal() {
return Set.of();
}
@Override
public void setVariable(String variableName, Object value) {
}
@Override
public void setVariable(String variableName, Object value, boolean fetchAllVariables) {
}
@Override
public Object setVariableLocal(String variableName, Object value) {
return null;
}
@Override
public Object setVariableLocal(String variableName, Object value, boolean fetchAllVariables) {
return null;
}
@Override
public void setVariables(Map<String, ?> variables) {
}
@Override
public void setVariablesLocal(Map<String, ?> variables) {
}
@Override
public boolean hasVariables() {
return false;
}
@Override
public boolean hasVariablesLocal() {
return false;
}
@Override
public boolean hasVariable(String variableName) {
return false;
}
@Override
public boolean hasVariableLocal(String variableName) {
return false;
}
@Override
public void removeVariable(String variableName) {
}
@Override
public void removeVariableLocal(String variableName) {
}
@Override
public void removeVariables(Collection<String> variableNames) {
}
@Override
public void removeVariablesLocal(Collection<String> variableNames) {
}
@Override
public void removeVariables() {
}
@Override
public void removeVariablesLocal() {
}
@Override
public void setTransientVariable(String variableName, Object variableValue) {
}
@Override
public void setTransientVariableLocal(String variableName, Object variableValue) {
}
@Override
public void setTransientVariables(Map<String, Object> transientVariables) {
}
@Override
public Object getTransientVariable(String variableName) {
return null;
}
@Override
public Map<String, Object> getTransientVariables() {
return Map.of();
}
@Override
public void setTransientVariablesLocal(Map<String, Object> transientVariables) {
}
@Override
public Object getTransientVariableLocal(String variableName) {
return null;
}
@Override
public Map<String, Object> getTransientVariablesLocal() {
return Map.of();
}
@Override
public void removeTransientVariableLocal(String variableName) {
}
@Override
public void removeTransientVariable(String variableName) {
}
@Override
public void removeTransientVariables() {
}
@Override
public void removeTransientVariablesLocal() {
}
};
hpcHandler.execute(execution,params,config);
return "ok";
}
} }

View File

@@ -152,9 +152,7 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
if (params == null) { if (params == null) {
return req; return req;
} }
// ObjectMapper objectMapper = new ObjectMapper(); // 需确保ObjectMapper已配置或注入
// 基础字段映射 // 基础字段映射
req.setTimesmap(params.get("timesmap").toString());
req.setJobName(params.get("jobName").toString()); req.setJobName(params.get("jobName").toString());
// 处理int类型字段包含空值和非数字的异常处理 // 处理int类型字段包含空值和非数字的异常处理
try { try {

View File

@@ -36,4 +36,5 @@ mybatis-plus:
security: security:
whitelist: whitelist:
paths: paths:
- /pbs/jobFileCallback - /process/testHpc
- /process/asyncCallback

View File

@@ -1,3 +1,3 @@
spring: spring:
profiles: profiles:
active: local active: dev

View File

@@ -1,5 +1,6 @@
package com.sdm.pbs.controller; package com.sdm.pbs.controller;
import com.alibaba.fastjson2.JSONObject;
import com.sdm.common.common.SdmResponse; import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq; import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq;
import com.sdm.common.feign.inter.pbs.ITaskFeignClient; import com.sdm.common.feign.inter.pbs.ITaskFeignClient;
@@ -11,7 +12,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
@@ -30,7 +30,7 @@ public class TaskAdapter implements ITaskFeignClient {
@Autowired @Autowired
private TaskController taskController; private TaskController taskController;
@PostMapping(value = "/adapterSubmitHpcJob", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) @PostMapping(value = "/adapterSubmitHpcJob")
@Operation(summary = "作业提交") @Operation(summary = "作业提交")
public SdmResponse<String> adapterSubmitHpcJob(SubmitHpcTaskRemoteReq req) { public SdmResponse<String> adapterSubmitHpcJob(SubmitHpcTaskRemoteReq req) {
// spdm 回传路径 // spdm 回传路径
@@ -42,6 +42,7 @@ public class TaskAdapter implements ITaskFeignClient {
} }
private void getSimulationFile(SubmitHpcTaskRemoteReq req){ private void getSimulationFile(SubmitHpcTaskRemoteReq req){
log.info("提交请求参数:{}", JSONObject.toJSONString(req));
String simulationFileLocalPath = req.getSimulationFileLocalPath(); String simulationFileLocalPath = req.getSimulationFileLocalPath();
String masterFileRegularStr = req.getMasterFileRegularStr(); String masterFileRegularStr = req.getMasterFileRegularStr();
String inputFilesRegularStr = req.getInputFilesRegularStr(); String inputFilesRegularStr = req.getInputFilesRegularStr();

View File

@@ -238,8 +238,8 @@ public class TaskController {
// 通知文件回传 mock 测试使用 // 通知文件回传 mock 测试使用
@PostMapping("/callHpcUploadToTarget") @PostMapping("/callHpcUploadToTarget")
public SdmResponse<Boolean> hpcDownloadFiles(@RequestBody Map paramMap) { public SdmResponse<Boolean> hpcDownloadFiles(@RequestBody Map paramMap) {
return hpcInstructionService.callHpcUploadToTarget(paramMap.get("jobId").toString(), paramMap.get("jobWorkDir").toString()); return hpcInstructionService.callHpcUploadToTarget(paramMap.get("jobId").toString(), paramMap.get("jobWorkDir").toString(),
paramMap.get("callBackMinioDir").toString(),paramMap.get("callBackNasDir").toString());
} }
} }

View File

@@ -45,8 +45,9 @@ public class FinishedStatusHandler implements JobStatusHandler {
newDbJob.setTotalElapsedTime(DateUtils.calculateTimeConsume( newDbJob.setTotalElapsedTime(DateUtils.calculateTimeConsume(
statusInfo.getStartTime(), statusInfo.getEndTime(), TimeUnit.SECONDS)); statusInfo.getStartTime(), statusInfo.getEndTime(), TimeUnit.SECONDS));
newDbJob.setUpdateTime(LocalDateTime.now()); newDbJob.setUpdateTime(LocalDateTime.now());
// 通知工具回传文件 // 通知工具回传文件 minio 或者 nas
SdmResponse<Boolean> callResponse = hpcInstructionService.callHpcUploadToTarget(newDbJob.getJobId(), newDbJob.getStdoutHpcFilePath()); SdmResponse<Boolean> callResponse = hpcInstructionService.callHpcUploadToTarget(newDbJob.getJobId(), newDbJob.getStdoutHpcFilePath(),
newDbJob.getStdoutSpdmMinoFilePath(),newDbJob.getStdoutSpdmNasFilePath());
if (!callResponse.isSuccess()||!callResponse.getData()) { if (!callResponse.isSuccess()||!callResponse.getData()) {
CoreLogger.error("callHpcUploadToTarget failed,jobId:{},workDir:{}",newDbJob.getJobId(),newDbJob.getStdoutHpcFilePath()); CoreLogger.error("callHpcUploadToTarget failed,jobId:{},workDir:{}",newDbJob.getJobId(),newDbJob.getStdoutHpcFilePath());
return; return;

View File

@@ -55,6 +55,6 @@ public interface HpcInstructionService {
ResponseEntity<StreamingResponseBody> hpcDownloadFile(String fileName,Long fileSize); ResponseEntity<StreamingResponseBody> hpcDownloadFile(String fileName,Long fileSize);
// 通知hpc回传文件 // 通知hpc回传文件
SdmResponse<Boolean> callHpcUploadToTarget(String jobId,String workDir); SdmResponse<Boolean> callHpcUploadToTarget(String jobId,String workDir,String callBackMinioDir,String callBackNasDir);
} }

View File

@@ -481,8 +481,8 @@ public class HpcInstructionServiceImpl implements HpcInstructionService {
} }
@Override @Override
public SdmResponse<Boolean> callHpcUploadToTarget(String jobId,String workDir) { public SdmResponse<Boolean> callHpcUploadToTarget(String jobId,String workDir,String callBackMinioDir,String callBackNasDir) {
return hpcCommandExcuteUtil.callHpcUploadToTarget(jobId,workDir); return hpcCommandExcuteUtil.callHpcUploadToTarget(jobId,workDir,callBackMinioDir,callBackNasDir);
} }
/** /**

View File

@@ -83,8 +83,19 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
// 任务输出的文件夹 // 任务输出的文件夹
String hpcOutPutDir = extractDirectory(masterFilePath); String hpcOutPutDir = extractDirectory(masterFilePath);
req.setWorkDir(hpcOutPutDir); req.setWorkDir(hpcOutPutDir);
// 前置处理 替换求解文件 // 前置处理 替换求解文件 todo 从数据库查询
String formatCommand = String.format(req.getCommand(), masterFilePath); String command="";
if(StringUtils.isNotBlank(req.getCommand())) {
command=req.getCommand();
}else {
SimulationSoftConfig simulationSoftConfig = simulationSoftConfigService.lambdaQuery().
eq(SimulationSoftConfig::getSoftName,req.getSoftware()).one();
command = simulationSoftConfig.getCommand();
}
if(StringUtils.isBlank(command)) {
return SdmResponse.failed("command命令不能为空软件名称:"+req.getSoftware());
}
String formatCommand = String.format(command, masterFilePath);
req.setCommand(formatCommand); req.setCommand(formatCommand);
req.setMasterFilePath(masterFilePath); req.setMasterFilePath(masterFilePath);
SdmResponse<String> response = pbsService.submitHpcJob(req); SdmResponse<String> response = pbsService.submitHpcJob(req);

View File

@@ -136,4 +136,6 @@ pbs:
security: security:
whitelist: whitelist:
paths: paths:
- /pbs/jobFileCallback - /pbs/jobFileCallback
- /pbs/netTest
- /pbs/adapterSubmitHpcJob