From 5981cdad6ce2d470d1d871523f7f7f37d823f3d5 Mon Sep 17 00:00:00 2001 From: yangyang01000846 <15195822163@163.com> Date: Mon, 5 Jan 2026 09:34:33 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=EF=BC=9Ahpc=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E5=BC=80=E5=A7=8B=E7=BB=93=E6=9D=9F=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=80=9A=E7=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../entity/enums/MessageTemplateEnum.java | 13 +++++---- .../flowable/delegate/handler/HpcHandler.java | 22 ++++++++++++++ .../pbs/service/impl/PbsServiceDecorator.java | 29 +++++++++++++++++-- 3 files changed, 56 insertions(+), 8 deletions(-) diff --git a/common/src/main/java/com/sdm/common/entity/enums/MessageTemplateEnum.java b/common/src/main/java/com/sdm/common/entity/enums/MessageTemplateEnum.java index 16ab35ba..b43e17a9 100644 --- a/common/src/main/java/com/sdm/common/entity/enums/MessageTemplateEnum.java +++ b/common/src/main/java/com/sdm/common/entity/enums/MessageTemplateEnum.java @@ -9,8 +9,8 @@ public enum MessageTemplateEnum { DATA_ALERT("数据通知", "您的数据存储空间已达阈值,请前往仿真数据管理平台查看"), DATA_ALERT_MANAGER("数据通知", "%s的数据存储空间已达阈值,请前往仿真数据管理平台查看"), APPROVE_ALERT("审批通知", "收到一条%s消息,请前往仿真数据管理平台查看"), - HPC_START("作业通知", "作业已发起"), - HPC_END("作业通知", "作业已结束") + HPC_START("作业通知", "HPC任务作业:%s,已发起%s"), + HPC_END("作业通知", "HPC任务作业:%s,已结束,执行结果:%s") ; private final String title; @@ -30,10 +30,13 @@ public enum MessageTemplateEnum { } /** - * 获取模板内容(含参数替换) + * 支持任意数量参数的占位符替换 + * @param args 替换占位符的参数(数量与模板中的%s一致,顺序也要一致) + * @return 替换后的完整内容 */ - public String getContent(String taskName) { - return String.format(content, taskName); + public String getContent(Object... args) { + // 无参数时直接返回原内容;有参数时执行格式化 + return args == null || args.length == 0 ? content : String.format(content, args); } } diff --git a/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java b/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java index 63b9c1f3..e2424f22 100644 --- a/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java +++ b/flowable/src/main/java/com/sdm/flowable/delegate/handler/HpcHandler.java @@ -1,13 +1,17 @@ package com.sdm.flowable.delegate.handler; +import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.sdm.common.common.SdmResponse; import com.sdm.common.common.ThreadLocalContext; import com.sdm.common.config.FlowableConfig; +import com.sdm.common.entity.enums.MessageTemplateEnum; import com.sdm.common.entity.flowable.executeConfig.HPCExecuteConfig; import com.sdm.common.entity.req.data.GetFileBaseInfoReq; import com.sdm.common.entity.req.pbs.SubmitHpcTaskRemoteReq; +import com.sdm.common.entity.req.system.SendMsgReq; import com.sdm.common.entity.resp.data.FileMetadataInfoResp; +import com.sdm.common.feign.impl.system.MessageFeignClientImpl; import com.sdm.common.feign.inter.data.IDataFeignClient; import com.sdm.common.feign.inter.pbs.ITaskFeignClient; import com.sdm.common.log.CoreLogger; @@ -43,6 +47,9 @@ public class HpcHandler implements ExecutionHandler,HPCExecu @Autowired private IDataFeignClient dataFeignClient; + @Autowired + private MessageFeignClientImpl messageFeignClient; + /* * params:业务参数 * config:框架属性 @@ -82,10 +89,15 @@ public class HpcHandler implements ExecutionHandler,HPCExecu // 1. 调用 HPC 平台提交任务 SdmResponse submitResp = taskFeignClient.adapterSubmitHpcJob(submitHpcTaskRemoteReq); if(!submitResp.isSuccess()|| StringUtils.isBlank(submitResp.getData())){ + // 推送失败消息 + sendMsg(ThreadLocalContext.getTenantId(),ThreadLocalContext.getUserId(),submitHpcTaskRemoteReq.getJobName(),"失败"); log.error("HpcHandler submit failed,jobName:{}",params); throw new RuntimeException("HpcHandler submit failed,"+submitResp.getMessage()); } + String hpcTaskId = submitResp.getData(); + // 推送成功消息 + sendMsg(ThreadLocalContext.getTenantId(),ThreadLocalContext.getUserId(),submitHpcTaskRemoteReq.getJobName(),"成功"); CoreLogger.info("hpc task submit succ jobId:{}",hpcTaskId); // 2. 存数据库(提交状态 + 外部任务ID) asyncTaskRecordService.registerAsyncTask( @@ -100,6 +112,16 @@ public class HpcHandler implements ExecutionHandler,HPCExecu log.info("HPC 任务 {} 已提交", "hpcTaskId"); } + private void sendMsg(Long tenanId,Long userId,String jobName,String result){ + SendMsgReq req = new SendMsgReq(); + req.setTitle(MessageTemplateEnum.HPC_START.getTitle()); + req.setContent(MessageTemplateEnum.HPC_START.getContent(jobName,result)); + req.setTenantId(String.valueOf(tenanId)); + req.setUserId(String.valueOf(userId)); + log.info("hpc start push msg:{}", JSON.toJSONString(req)); + messageFeignClient.sendMessage(req); + } + private void initUserInfo(DelegateExecution execution) { // 获取当前流程实例参数 Long userId = (Long) execution.getVariable("userId"); diff --git a/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java b/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java index a5d90865..5ae97c8f 100644 --- a/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java +++ b/pbs/src/main/java/com/sdm/pbs/service/impl/PbsServiceDecorator.java @@ -1,14 +1,18 @@ package com.sdm.pbs.service.impl; +import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import com.sdm.common.common.SdmResponse; import com.sdm.common.common.ThreadLocalContext; +import com.sdm.common.entity.enums.MessageTemplateEnum; import com.sdm.common.entity.req.flowable.AsyncCallbackRequest; +import com.sdm.common.entity.req.system.SendMsgReq; import com.sdm.common.entity.resp.PageDataResp; import com.sdm.common.entity.resp.pbs.hpc.FileNodeInfo; +import com.sdm.common.feign.impl.system.MessageFeignClientImpl; import com.sdm.common.feign.inter.flowable.IFlowableFeignClient; import com.sdm.common.log.CoreLogger; import com.sdm.common.utils.HpcCommandExcuteUtil; @@ -73,6 +77,9 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { @Autowired private ISimulationCommandPlaceholderService simulationCommandPlaceholderService; + @Autowired + private MessageFeignClientImpl messageFeignClient; + @Override public SdmResponse queryHpcResource() { return pbsService.queryHpcResource(); @@ -322,9 +329,9 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { public SdmResponse jobFileCallback(JobFileCallBackReq req) { CoreLogger.info("hpc jobFileCallback params:{}",JSONObject.toJSONString(req)); + SimulationJob newDbJob = simulationJobService.lambdaQuery().eq(SimulationJob::getJobId, req.getJobId()).one(); if(!Objects.isNull(req)&&Objects.equals(req.getUploadResult(),"Y")) { // 回传成功 - SimulationJob newDbJob = simulationJobService.lambdaQuery().eq(SimulationJob::getJobId, req.getJobId()).one(); newDbJob.setFileStatus("finished"); newDbJob.setUpdateTime(LocalDateTime.now()); simulationJobService.updateById(newDbJob); @@ -332,11 +339,27 @@ public class PbsServiceDecorator implements IPbsServiceDecorator { AsyncCallbackRequest asyncCallbackRequest = new AsyncCallbackRequest(); asyncCallbackRequest.setAsyncTaskId(newDbJob.getJobId()); asyncCallbackRequest.setResultJson("finished"); - SdmResponse sdmResponse = flowableFeignClient.asyncCallback(asyncCallbackRequest); - CoreLogger.info("flowableFeignClient asyncCallback result:{}", JSONObject.toJSONString(sdmResponse)); +// SdmResponse sdmResponse = flowableFeignClient.asyncCallback(asyncCallbackRequest); + // 发送成功完成的消息 + sendMsg(newDbJob.getTenantId(),newDbJob.getCreatorId(),newDbJob.getJobName(),"成功"); +// CoreLogger.info("flowableFeignClient asyncCallback result:{}", JSONObject.toJSONString(sdmResponse)); return SdmResponse.success(true); + }else{ + // 发送失败完成的消息 + sendMsg(newDbJob.getTenantId(),newDbJob.getCreatorId(),newDbJob.getJobName(),"失败"); } return SdmResponse.success(false); } + private void sendMsg(Long tenanId,Long userId,String jobName,String result){ + SendMsgReq req = new SendMsgReq(); + req.setTitle(MessageTemplateEnum.HPC_END.getTitle()); + req.setContent(MessageTemplateEnum.HPC_END.getContent(jobName,result)); + req.setTenantId(String.valueOf(tenanId)); + req.setUserId(String.valueOf(userId)); + CoreLogger.info("hpc finish push msg:{}", JSON.toJSONString(req)); + messageFeignClient.sendMessage(req); + } + + }