新增:hpc任务开始结束增加消息通知

This commit is contained in:
yangyang01000846
2026-01-05 09:34:33 +08:00
parent 3587489fcd
commit 5981cdad6c
3 changed files with 56 additions and 8 deletions

View File

@@ -9,8 +9,8 @@ public enum MessageTemplateEnum {
DATA_ALERT("数据通知", "您的数据存储空间已达阈值,请前往仿真数据管理平台查看"), DATA_ALERT("数据通知", "您的数据存储空间已达阈值,请前往仿真数据管理平台查看"),
DATA_ALERT_MANAGER("数据通知", "%s的数据存储空间已达阈值请前往仿真数据管理平台查看"), DATA_ALERT_MANAGER("数据通知", "%s的数据存储空间已达阈值请前往仿真数据管理平台查看"),
APPROVE_ALERT("审批通知", "收到一条%s消息请前往仿真数据管理平台查看"), APPROVE_ALERT("审批通知", "收到一条%s消息请前往仿真数据管理平台查看"),
HPC_START("作业通知", "作业已发起"), HPC_START("作业通知", "HPC任务作业%s已发起%s"),
HPC_END("作业通知", "作业已结束") HPC_END("作业通知", "HPC任务作业%s已结束执行结果%s")
; ;
private final String title; private final String title;
@@ -30,10 +30,13 @@ public enum MessageTemplateEnum {
} }
/** /**
* 获取模板内容(含参数替换 * 支持任意数量参数的占位符替换
* @param args 替换占位符的参数(数量与模板中的%s一致,顺序也要一致)
* @return 替换后的完整内容
*/ */
public String getContent(String taskName) { public String getContent(Object... args) {
return String.format(content, taskName); // 无参数时直接返回原内容;有参数时执行格式化
return args == null || args.length == 0 ? content : String.format(content, args);
} }
} }

View File

@@ -1,13 +1,17 @@
package com.sdm.flowable.delegate.handler; package com.sdm.flowable.delegate.handler;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.sdm.common.common.SdmResponse; import com.sdm.common.common.SdmResponse;
import com.sdm.common.common.ThreadLocalContext; import com.sdm.common.common.ThreadLocalContext;
import com.sdm.common.config.FlowableConfig; import com.sdm.common.config.FlowableConfig;
import com.sdm.common.entity.enums.MessageTemplateEnum;
import com.sdm.common.entity.flowable.executeConfig.HPCExecuteConfig; import com.sdm.common.entity.flowable.executeConfig.HPCExecuteConfig;
import com.sdm.common.entity.req.data.GetFileBaseInfoReq; import com.sdm.common.entity.req.data.GetFileBaseInfoReq;
import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq; import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq;
import com.sdm.common.entity.req.system.SendMsgReq;
import com.sdm.common.entity.resp.data.FileMetadataInfoResp; import com.sdm.common.entity.resp.data.FileMetadataInfoResp;
import com.sdm.common.feign.impl.system.MessageFeignClientImpl;
import com.sdm.common.feign.inter.data.IDataFeignClient; import com.sdm.common.feign.inter.data.IDataFeignClient;
import com.sdm.common.feign.inter.pbs.ITaskFeignClient; import com.sdm.common.feign.inter.pbs.ITaskFeignClient;
import com.sdm.common.log.CoreLogger; import com.sdm.common.log.CoreLogger;
@@ -43,6 +47,9 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
@Autowired @Autowired
private IDataFeignClient dataFeignClient; private IDataFeignClient dataFeignClient;
@Autowired
private MessageFeignClientImpl messageFeignClient;
/* /*
* params:业务参数 * params:业务参数
* config框架属性 * config框架属性
@@ -82,10 +89,15 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
// 1. 调用 HPC 平台提交任务 // 1. 调用 HPC 平台提交任务
SdmResponse<String> submitResp = taskFeignClient.adapterSubmitHpcJob(submitHpcTaskRemoteReq); SdmResponse<String> submitResp = taskFeignClient.adapterSubmitHpcJob(submitHpcTaskRemoteReq);
if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){ if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){
// 推送失败消息
sendMsg(ThreadLocalContext.getTenantId(),ThreadLocalContext.getUserId(),submitHpcTaskRemoteReq.getJobName(),"失败");
log.error("HpcHandler submit failed,jobName:{}",params); log.error("HpcHandler submit failed,jobName:{}",params);
throw new RuntimeException("HpcHandler submit failed,"+submitResp.getMessage()); throw new RuntimeException("HpcHandler submit failed,"+submitResp.getMessage());
} }
String hpcTaskId = submitResp.getData(); String hpcTaskId = submitResp.getData();
// 推送成功消息
sendMsg(ThreadLocalContext.getTenantId(),ThreadLocalContext.getUserId(),submitHpcTaskRemoteReq.getJobName(),"成功");
CoreLogger.info("hpc task submit succ jobId:{}",hpcTaskId); CoreLogger.info("hpc task submit succ jobId:{}",hpcTaskId);
// 2. 存数据库(提交状态 + 外部任务ID // 2. 存数据库(提交状态 + 外部任务ID
asyncTaskRecordService.registerAsyncTask( asyncTaskRecordService.registerAsyncTask(
@@ -100,6 +112,16 @@ public class HpcHandler implements ExecutionHandler<Map<String, Object>,HPCExecu
log.info("HPC 任务 {} 已提交", "hpcTaskId"); log.info("HPC 任务 {} 已提交", "hpcTaskId");
} }
private void sendMsg(Long tenanId,Long userId,String jobName,String result){
SendMsgReq req = new SendMsgReq();
req.setTitle(MessageTemplateEnum.HPC_START.getTitle());
req.setContent(MessageTemplateEnum.HPC_START.getContent(jobName,result));
req.setTenantId(String.valueOf(tenanId));
req.setUserId(String.valueOf(userId));
log.info("hpc start push msg:{}", JSON.toJSONString(req));
messageFeignClient.sendMessage(req);
}
private void initUserInfo(DelegateExecution execution) { private void initUserInfo(DelegateExecution execution) {
// 获取当前流程实例参数 // 获取当前流程实例参数
Long userId = (Long) execution.getVariable("userId"); Long userId = (Long) execution.getVariable("userId");

View File

@@ -1,14 +1,18 @@
package com.sdm.pbs.service.impl; package com.sdm.pbs.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper; import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;
import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo; import com.github.pagehelper.PageInfo;
import com.sdm.common.common.SdmResponse; import com.sdm.common.common.SdmResponse;
import com.sdm.common.common.ThreadLocalContext; import com.sdm.common.common.ThreadLocalContext;
import com.sdm.common.entity.enums.MessageTemplateEnum;
import com.sdm.common.entity.req.flowable.AsyncCallbackRequest; import com.sdm.common.entity.req.flowable.AsyncCallbackRequest;
import com.sdm.common.entity.req.system.SendMsgReq;
import com.sdm.common.entity.resp.PageDataResp; import com.sdm.common.entity.resp.PageDataResp;
import com.sdm.common.entity.resp.pbs.hpc.FileNodeInfo; import com.sdm.common.entity.resp.pbs.hpc.FileNodeInfo;
import com.sdm.common.feign.impl.system.MessageFeignClientImpl;
import com.sdm.common.feign.inter.flowable.IFlowableFeignClient; import com.sdm.common.feign.inter.flowable.IFlowableFeignClient;
import com.sdm.common.log.CoreLogger; import com.sdm.common.log.CoreLogger;
import com.sdm.common.utils.HpcCommandExcuteUtil; import com.sdm.common.utils.HpcCommandExcuteUtil;
@@ -73,6 +77,9 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
@Autowired @Autowired
private ISimulationCommandPlaceholderService simulationCommandPlaceholderService; private ISimulationCommandPlaceholderService simulationCommandPlaceholderService;
@Autowired
private MessageFeignClientImpl messageFeignClient;
@Override @Override
public SdmResponse<HpcResouceInfo> queryHpcResource() { public SdmResponse<HpcResouceInfo> queryHpcResource() {
return pbsService.queryHpcResource(); return pbsService.queryHpcResource();
@@ -322,9 +329,9 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
public SdmResponse<Boolean> jobFileCallback(JobFileCallBackReq req) { public SdmResponse<Boolean> jobFileCallback(JobFileCallBackReq req) {
CoreLogger.info("hpc jobFileCallback params:{}",JSONObject.toJSONString(req)); CoreLogger.info("hpc jobFileCallback params:{}",JSONObject.toJSONString(req));
SimulationJob newDbJob = simulationJobService.lambdaQuery().eq(SimulationJob::getJobId, req.getJobId()).one();
if(!Objects.isNull(req)&&Objects.equals(req.getUploadResult(),"Y")) { if(!Objects.isNull(req)&&Objects.equals(req.getUploadResult(),"Y")) {
// 回传成功 // 回传成功
SimulationJob newDbJob = simulationJobService.lambdaQuery().eq(SimulationJob::getJobId, req.getJobId()).one();
newDbJob.setFileStatus("finished"); newDbJob.setFileStatus("finished");
newDbJob.setUpdateTime(LocalDateTime.now()); newDbJob.setUpdateTime(LocalDateTime.now());
simulationJobService.updateById(newDbJob); simulationJobService.updateById(newDbJob);
@@ -332,11 +339,27 @@ public class PbsServiceDecorator implements IPbsServiceDecorator {
AsyncCallbackRequest asyncCallbackRequest = new AsyncCallbackRequest(); AsyncCallbackRequest asyncCallbackRequest = new AsyncCallbackRequest();
asyncCallbackRequest.setAsyncTaskId(newDbJob.getJobId()); asyncCallbackRequest.setAsyncTaskId(newDbJob.getJobId());
asyncCallbackRequest.setResultJson("finished"); asyncCallbackRequest.setResultJson("finished");
SdmResponse sdmResponse = flowableFeignClient.asyncCallback(asyncCallbackRequest); // SdmResponse sdmResponse = flowableFeignClient.asyncCallback(asyncCallbackRequest);
CoreLogger.info("flowableFeignClient asyncCallback result:{}", JSONObject.toJSONString(sdmResponse)); // 发送成功完成的消息
sendMsg(newDbJob.getTenantId(),newDbJob.getCreatorId(),newDbJob.getJobName(),"成功");
// CoreLogger.info("flowableFeignClient asyncCallback result:{}", JSONObject.toJSONString(sdmResponse));
return SdmResponse.success(true); return SdmResponse.success(true);
}else{
// 发送失败完成的消息
sendMsg(newDbJob.getTenantId(),newDbJob.getCreatorId(),newDbJob.getJobName(),"失败");
} }
return SdmResponse.success(false); return SdmResponse.success(false);
} }
private void sendMsg(Long tenanId,Long userId,String jobName,String result){
SendMsgReq req = new SendMsgReq();
req.setTitle(MessageTemplateEnum.HPC_END.getTitle());
req.setContent(MessageTemplateEnum.HPC_END.getContent(jobName,result));
req.setTenantId(String.valueOf(tenanId));
req.setUserId(String.valueOf(userId));
CoreLogger.info("hpc finish push msg:{}", JSON.toJSONString(req));
messageFeignClient.sendMessage(req);
}
} }