This commit is contained in:
2026-01-15 20:03:03 +08:00
7 changed files with 115 additions and 42 deletions

View File

@@ -43,7 +43,7 @@ public class HpcJobStatusScheduleExcutor implements Runnable{
.isNotNull(SimulationJob::getJobId)
.notIn(SimulationJob::getJobStatus, "Canceled", "Failed")
// 非上传中的,非回传结束的。JobStatus 结束 通知 uploading 只会有一次,回传失败后,人工改表修复
.notIn(SimulationJob::getFileStatus, "uploading","finished")
.notIn(SimulationJob::getFileStatus, "uploading","finished","failed")
.list();
if(CollectionUtils.isEmpty(list)){
log.info("HpcJobStatus query db data null");

View File

@@ -1,7 +1,13 @@
package com.sdm.pbs.schedule.hpc.hander;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.sdm.common.common.SdmResponse;
import com.sdm.common.entity.enums.MessageTemplateEnum;
import com.sdm.common.entity.req.flowable.AsyncCallbackRequest;
import com.sdm.common.entity.req.system.SendMsgReq;
import com.sdm.common.feign.impl.system.MessageFeignClientImpl;
import com.sdm.common.feign.inter.flowable.IFlowableFeignClient;
import com.sdm.common.log.CoreLogger;
import com.sdm.common.utils.DateUtils;
import com.sdm.common.utils.String2NumberUtil;
@@ -9,15 +15,18 @@ import com.sdm.pbs.model.bo.HpcJobStatusInfo;
import com.sdm.pbs.model.entity.SimulationJob;
import com.sdm.pbs.service.HpcInstructionService;
import com.sdm.pbs.service.ISimulationJobService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class FinishedStatusHandler implements JobStatusHandler {
// 租户桶的前缀 spdm-租户id
@@ -29,10 +38,16 @@ public class FinishedStatusHandler implements JobStatusHandler {
@Autowired
private HpcInstructionService hpcInstructionService;
@Autowired
private MessageFeignClientImpl messageFeignClient;
@Autowired
private IFlowableFeignClient flowableFeignClient;
@Override
public void handle(SimulationJob simJob, HpcJobStatusInfo statusInfo) {
try {
// 过程结束修改
log.info("任务id:{},Hpc任务详情:{}",simJob.getId(),JSON.toJSONString(statusInfo));
if(Objects.equals(statusInfo.getJobStatus(),"Canceled")){
SimulationJob newDbJob = simulationJobService.lambdaQuery().eq(SimulationJob::getId, simJob.getId()).one();
newDbJob.setJobStatus(statusInfo.getJobStatus());
newDbJob.setStartTime(statusInfo.getStartTime());
@@ -43,22 +58,49 @@ public class FinishedStatusHandler implements JobStatusHandler {
newDbJob.setTotalElapsedTime(DateUtils.calculateTimeConsume(
statusInfo.getStartTime(), statusInfo.getEndTime(), TimeUnit.SECONDS));
newDbJob.setUpdateTime(LocalDateTime.now());
String minioBucket = minioBucketPrefix + newDbJob.getTenantId();
Long userId=newDbJob.getCreatorId();
Long tenantId = newDbJob.getTenantId();
// 通知工具回传文件 minio 或者 nas
SdmResponse<Boolean> callResponse = hpcInstructionService.callHpcUploadToTarget(newDbJob.getJobId(), newDbJob.getStdoutHpcFilePath(),minioBucket,
newDbJob.getStdoutSpdmMinoFilePath(),newDbJob.getStdoutSpdmNasFilePath(),newDbJob.getDirId(),userId,tenantId);
if (!callResponse.isSuccess()||!callResponse.getData()) {
CoreLogger.error("callHpcUploadToTarget failed,jobId:{},workDir:{}",newDbJob.getJobId(),newDbJob.getStdoutHpcFilePath());
return;
}
// 通知成功修改状态
newDbJob.setFileStatus("uploading");
newDbJob.setFileStatus("failed");
simulationJobService.updateById(newDbJob);
} catch (Exception e) {
CoreLogger.error("HpcJobStatus finshed handle error:{},newDbJob:{},statusInfo:{}",e.getMessage(),
JSONObject.toJSONString(simJob),JSONObject.toJSONString(statusInfo));
// 回调通知工作流节点
AsyncCallbackRequest asyncCallbackRequest = new AsyncCallbackRequest();
asyncCallbackRequest.setAsyncTaskId(newDbJob.getJobId());
asyncCallbackRequest.setStatus("FAIL");
asyncCallbackRequest.setResultJson("Hpc任务已取消");
SdmResponse sdmResponse = flowableFeignClient.asyncCallback(asyncCallbackRequest);
log.info("Hpc任务取消,通知工作流返回:{}",JSON.toJSONString(sdmResponse));
// 发送成功完成的消息
sendMsg(newDbJob.getTenantId(),newDbJob.getCreatorId(),newDbJob.getJobName(),"Hpc任务已取消");
}
if(Objects.equals(statusInfo.getJobStatus(),"Finished")){
try {
// 过程结束修改
SimulationJob newDbJob = simulationJobService.lambdaQuery().eq(SimulationJob::getId, simJob.getId()).one();
newDbJob.setJobStatus(statusInfo.getJobStatus());
newDbJob.setStartTime(statusInfo.getStartTime());
newDbJob.setEndTime(statusInfo.getEndTime());
newDbJob.setNodeName(statusInfo.getAllocatedNodes());
newDbJob.setTotalKernelTime(String2NumberUtil.stringToLong(statusInfo.getTotalKernelTime()));
newDbJob.setTotalUserTime(String2NumberUtil.stringToLong(statusInfo.getTotalUserTime()));
newDbJob.setTotalElapsedTime(DateUtils.calculateTimeConsume(
statusInfo.getStartTime(), statusInfo.getEndTime(), TimeUnit.SECONDS));
newDbJob.setUpdateTime(LocalDateTime.now());
String minioBucket = minioBucketPrefix + newDbJob.getTenantId();
Long userId=newDbJob.getCreatorId();
Long tenantId = newDbJob.getTenantId();
// 通知工具回传文件 minio 或者 nas
SdmResponse<Boolean> callResponse = hpcInstructionService.callHpcUploadToTarget(newDbJob.getJobId(), newDbJob.getStdoutHpcFilePath(),minioBucket,
newDbJob.getStdoutSpdmMinoFilePath(),newDbJob.getStdoutSpdmNasFilePath(),newDbJob.getDirId(),userId,tenantId);
if (!callResponse.isSuccess()||!callResponse.getData()) {
CoreLogger.error("callHpcUploadToTarget failed,jobId:{},workDir:{}",newDbJob.getJobId(),newDbJob.getStdoutHpcFilePath());
return;
}
// 通知成功修改状态
newDbJob.setFileStatus("uploading");
simulationJobService.updateById(newDbJob);
} catch (Exception e) {
CoreLogger.error("HpcJobStatus finshed handle error:{},newDbJob:{},statusInfo:{}",e.getMessage(),
JSONObject.toJSONString(simJob),JSONObject.toJSONString(statusInfo));
}
}
}
@@ -66,7 +108,17 @@ public class FinishedStatusHandler implements JobStatusHandler {
@Override
public List<String> getSupportedStatus() {
// todo 抽取成枚举类
return Collections.singletonList("Finished");
return Arrays.asList("Finished","Canceled");
}
private void sendMsg(Long tenanId,Long userId,String jobName,String result){
SendMsgReq req = new SendMsgReq();
req.setTitle(MessageTemplateEnum.HPC_END.getTitle());
req.setContent(MessageTemplateEnum.HPC_END.getContent(jobName,result));
req.setTenantId(String.valueOf(tenanId));
req.setUserId(String.valueOf(userId));
CoreLogger.info("hpc finish push msg:{}", JSON.toJSONString(req));
messageFeignClient.sendMessage(req);
}
}

View File

@@ -42,7 +42,7 @@ public class ProcessStatusHandler implements JobStatusHandler{
@Override
public List<String> getSupportedStatus() {
// todo 枚举类
return Arrays.asList("Configuring", "Queued", "Running", "Canceled", "Failed");
return Arrays.asList("Configuring", "Queued", "Running", "Failed");
}
}

View File

@@ -158,7 +158,7 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
throw new RuntimeException("Hpc执行失败返回jobId空");
}
// 4. 保存任务信息到数据库
saveSimulationJobToDb(req, jobId, hpcOutPutDir, commandResult.getCommand());
saveSimulationJobToDb(req, jobId, hpcOutPutDir);
return SdmResponse.success(jobId);
}
@@ -254,9 +254,9 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
* @param req 任务请求参数
* @param jobId 任务ID
* @param hpcOutPutDir 输出目录
* @param command 执行命令
* 执行命令
*/
private void saveSimulationJobToDb(SubmitHpcTaskReq req, String jobId, String hpcOutPutDir, String command) {
private void saveSimulationJobToDb(SubmitHpcTaskReq req, String jobId, String hpcOutPutDir) {
if (StringUtils.isNotEmpty(jobId)) {
// 数据入库
SimulationJob simulationJob = new SimulationJob();
@@ -289,7 +289,7 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
simulationJob.setStdoutSpdmNasFilePath(req.getStdoutSpdmNasFilePath());
// 执行信息 定时任务回传的时候修改
// simulationJob.setNodeName("");
simulationJob.setExecutCommand(command);
simulationJob.setExecutCommand(req.getCommand());
// 执行信息 定时任务回传的时候修改
// simulationJob.setStartTime("2025-11-30 10:00:00");
// simulationJob.setEndTime("2025-11-30 12:30:00");
@@ -614,25 +614,42 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
public SdmResponse<Boolean> jobFileCallback(JobFileCallBackReq req) {
CoreLogger.info("hpc jobFileCallback params:{}",JSONObject.toJSONString(req));
SimulationJob newDbJob = simulationJobService.lambdaQuery().eq(SimulationJob::getJobId, req.getJobId()).one();
String fileStatus="";
String status="";
String resultReson="";
String endMsg="";
if(!Objects.isNull(req)&&Objects.equals(req.getUploadResult(),"Y")) {
// 回传成功
newDbJob.setFileStatus("finished");
newDbJob.setUpdateTime(LocalDateTime.now());
simulationJobService.updateById(newDbJob);
// 回调通知工作流节点
AsyncCallbackRequest asyncCallbackRequest = new AsyncCallbackRequest();
asyncCallbackRequest.setAsyncTaskId(newDbJob.getJobId());
asyncCallbackRequest.setResultJson("finished");
// SdmResponse sdmResponse = flowableFeignClient.asyncCallback(asyncCallbackRequest);
// 发送成功完成的消息
sendMsg(newDbJob.getTenantId(),newDbJob.getCreatorId(),newDbJob.getJobName(),"成功");
// CoreLogger.info("flowableFeignClient asyncCallback result:{}", JSONObject.toJSONString(sdmResponse));
fileStatus="finished";
status="SUCCESS";
endMsg="成功";
dealCallBack(newDbJob,fileStatus,status,endMsg,resultReson);
return SdmResponse.success(true);
}else{
// 发送失败完成的消息
sendMsg(newDbJob.getTenantId(),newDbJob.getCreatorId(),newDbJob.getJobName(),"失败");
}
return SdmResponse.success(false);
if(!Objects.isNull(req)&&Objects.equals(req.getUploadResult(),"N")) {
fileStatus="failed";
status="FAIL";
resultReson="文件回传失败";
endMsg="失败";
dealCallBack(newDbJob,fileStatus,status,endMsg,resultReson);
return SdmResponse.success(false);
}
return SdmResponse.success(true);
}
private void dealCallBack(SimulationJob newDbJob,String fileStatus,String status,String endMsg,String resultReson){
// 回传成功
newDbJob.setFileStatus(fileStatus);
newDbJob.setUpdateTime(LocalDateTime.now());
simulationJobService.updateById(newDbJob);
// 回调通知工作流节点
AsyncCallbackRequest asyncCallbackRequest = new AsyncCallbackRequest();
asyncCallbackRequest.setAsyncTaskId(newDbJob.getJobId());
asyncCallbackRequest.setStatus(status);
asyncCallbackRequest.setResultJson(resultReson);
SdmResponse sdmResponse = flowableFeignClient.asyncCallback(asyncCallbackRequest);
// 发送成功完成的消息
sendMsg(newDbJob.getTenantId(),newDbJob.getCreatorId(),newDbJob.getJobName(),endMsg);
log.info("Hpc任务{}完成,通知工作流返回:{}",endMsg,JSON.toJSONString(sdmResponse));
}
private void sendMsg(Long tenanId,Long userId,String jobName,String result){

View File

@@ -1037,9 +1037,12 @@ public class SimulationRunServiceImpl extends ServiceImpl<SimulationRunMapper, S
public SdmResponse deliverableApprove(DeliverableApproveReq req) {
SimulationTask simulationTask = simulationTaskService.lambdaQuery().eq(SimulationTask::getUuid, req.getTaskId()).one();
if (simulationTask != null) {
if (Arrays.asList(ApprovalStatusEnum.ING.getCode(), ApprovalStatusEnum.PASSED.getCode()).contains(simulationTask.getApprovalStatus())) {
if (ApprovalStatusEnum.ING.getCode().equals(simulationTask.getApprovalStatus())) {
return SdmResponse.failed("任务评审中,请勿重复发起");
}
if (ApprovalStatusEnum.PASSED.getCode().equals(simulationTask.getApprovalStatus())) {
return SdmResponse.failed("任务评审通过,请勿重复发起");
}
ApprovalDeliverableContentsModel contentsModel = new ApprovalDeliverableContentsModel();
contentsModel.setTaskId(req.getTaskId());
contentsModel.setDifficult(req.getDifficult());

View File

@@ -615,6 +615,7 @@
<if test="name != null and name != ''">
and runName like CONCAT('%',#{name},'%')
</if>
order by createTime desc
</select>
<select id="getNodeListByTag" resultType="com.sdm.project.model.po.ProjectNodePo">

View File

@@ -422,7 +422,7 @@
</select>
<select id="getAnalysisTask" resultType="com.sdm.project.model.vo.SpdmAnalysisTaskVo">
select * from simulation_task where uuid = #{taskNodeId}
select * from simulation_task where uuid = #{taskNodeId} order by create_time desc
</select>