Files
2026-02-10 14:59:16 +08:00
..
2025-11-27 12:36:41 +08:00
2025-12-03 15:30:32 +08:00
2025-12-01 10:47:48 +08:00
2026-02-10 14:59:16 +08:00
2025-11-27 12:36:41 +08:00
2025-11-27 12:36:41 +08:00
2025-12-08 20:52:59 +08:00
2025-11-27 12:36:41 +08:00
2025-11-27 12:36:41 +08:00
2026-01-06 17:25:26 +08:00
2025-12-08 20:52:59 +08:00

这份 README 文档在原有的基础上进行了深度迭代,完整融合了“HPC 异步任务”与“本地应用 UserTask”两套编排逻辑,并详细阐述了最新的 ID 管理与状态聚合策略。


SDM Flowable Workflow Module

1. 模块简介

本模块 (com.sdm.flowable) 是系统的工业仿真流程编排引擎核心。基于 Flowable 7.0.1 构建,摒弃了传统的静态 BPMN 文件模式,采用 动态模型生成技术

它不仅支持长耗时 HPC 异步任务的“提交-等待-校验”闭环,还创新性地支持了本地应用 UserTask 的“前置异步编排”模式,实现了复杂业务场景下的状态强一致性与容错能力。


2. 核心架构与模型转换 (Dto2BpmnConverter)

核心类 Dto2BpmnConverter 负责将前端定义的业务流程DTO转换为可执行的 BPMN 模型。根据节点类型不同,采用两种不同的裂变策略:

2.1 策略 AHPC 任务后置分裂 (Sentinel Pattern)

针对 ServiceTask,若配置为异步回调(如 HPC 提交),逻辑节点物理分裂为 3 个连续节点

  1. Original Node (ServiceTask): 执行提交逻辑(如提交 HPC
  2. Wait Node (ReceiveTask): 命名为 _wait,挂起流程等待回调。
  3. Check Node (ServiceTask): 命名为 _check,哨兵校验。

链路: Original \rightarrow _wait \rightarrow _check

2.2 策略 B本地应用 UserTask 前置编排 (Pre-Orchestration)

针对 UserTask,若配置为 localApp 类型,需实现“先生成 ID 拉起应用,回调成功后,再让人工确认”的逻辑。系统在该节点前面插入 3 个辅助节点:

  1. Register Node (ServiceTask): 命名为 _register。绑定 LocalAppRegisterDelegate,负责生成全局唯一的 asyncTaskId 并注册到业务表,同时存入流程 Local 变量。
  2. Wait Node (ReceiveTask): 命名为 _wait。挂起流程,等待本地应用运行结束的回调。
  3. Check Node (ServiceTask): 命名为 _check。哨兵校验本地应用的回调结果(成功/失败)。
  4. Original Node (UserTask): 原始节点。只有当应用执行成功后,流程才会流转至此,等待用户点击“下一步”。

链路: _register \rightarrow _wait \rightarrow _check \rightarrow Original

2.3 人工介入控制 (Manual/Auto Switch)

针对普通 HPC 节点,handleWaitUserTask 方法默认在 ServiceTask 前插入 _waitUser (UserTask)。

  • Manual 模式: 流程停在 _waitUser,等待用户确认。
  • Auto 模式: 利用 SkipExpression 自动跳过 _waitUser

3. 异步任务、变量管理与状态聚合

3.1 变量管理与 ID 获取

在本地应用场景中,asyncTaskId 的流转至关重要:

  1. 生成与存储: _register 节点生成 ID通过 asyncTaskRecordService.registerAsyncTask 落库,同时 execution.setVariableLocal 存入流程。
  2. 前端获取: 前端轮询状态时,后端 直接查询业务表 (async_task_record) 获取最新的 asyncTaskId
    • 查询条件: processInstanceId + receiveTaskId + status=RUNNING + OrderByTimeDesc
    • 优势: 确保在“回退重试”场景下,永远获取到最新的业务 ID避免流程变量残留旧值的问题。

3.2 任务完成与强一致性 (completeAsyncTask)

复用同一个回调接口。为了防止“流程挂起回调却强推”,执行 三层校验

  1. 实例校验: 流程是否存在。
  2. 挂起校验: 若流程 SUSPENDED,仅更新数据库为 SUCCESS不触发流转(回调缓冲)。激活时通过 triggerPendingTasks 自动补偿。
  3. 节点校验: 确保流程停在 _wait
  4. 触发: 使用 runtimeService.trigger 唤醒流程。

3.3 状态聚合逻辑 (determineUnifiedState)

针对分裂后的多物理节点,后端计算出唯一的业务聚合状态返给前端:

场景 物理节点位置 聚合状态 前端展示/交互
HPC 运行中 Original / _wait / _check ACTIVE 显示“正在计算”
本地应用运行中 _register / _wait / _check ACTIVE 显示“拉起应用/运行中”
任务失败 _check (死信) ERROR 显示红色错误,允许重试
本地应用成功 Original (UserTask) WAITING_FOR_USER 显示绿色,按钮变为“下一步”
流程挂起 任意节点 SUSPENDED 锁死操作

4. 核心时序交互图

4.1 场景一HPC 异步任务(含挂起与补偿)

展示 HPC 任务从提交到回调的全过程,包含中途挂起导致的回调缓冲与激活后的自动补偿。

sequenceDiagram
    autonumber
    actor User as 用户/前端
    participant Ctrl as ProcessController
    participant Service as ProcessService
    participant Engine as Flowable Engine
    participant Delegate as UniversalDelegate
    participant DB as DB (AsyncRecord)
    participant Sentinel as AsyncResultCheckDelegate
    participant External as 外部系统(HPC)

    %% ==========================================
    %% 阶段一:异步提交
    %% ==========================================
    rect rgb(240, 255, 240)
        note right of User: == 阶段一HPC 异步提交 ==
        note right of Engine: 流转至 Original Node (ServiceTask)
        Engine->>Delegate: execute()
        Delegate->>External: 提交HPC任务
        Delegate->>DB: 插入记录 (Status=RUNNING)
        note right of Engine: 流转至 _wait Node (ReceiveTask)
        Engine->>Engine: 流程挂起 (Wait State)
    end

    %% ==========================================
    %% 阶段二:挂起与回调缓冲
    %% ==========================================
    rect rgb(255, 240, 240)
        note right of User: == 阶段二:挂起与回调缓冲 ==
        User->>Ctrl: suspendProcessInstance()
        Ctrl->>Engine: runtimeService.suspend()
        note right of Engine: 流程变为 SUSPENDED
        
        External->>Ctrl: asyncCallback (任务完成)
        Ctrl->>Service: asyncCallback()
        Service->>DB: 查询状态
        
        alt 流程已挂起
            DB->>DB: 仅更新 Status=SUCCESS<br/>(不触发 trigger)
            note right of DB: 回调被缓冲,流程停在 _wait
        end
    end

    %% ==========================================
    %% 阶段三:激活与补偿
    %% ==========================================
    rect rgb(240, 240, 255)
        note right of User: == 阶段三:激活与补偿 ==
        User->>Ctrl: activateProcessInstance()
        Ctrl->>Engine: runtimeService.activate()
        
        Ctrl->>Service: triggerPendingTasks() <br/>(激活后的自动动作)
        Service->>DB: 查询缓冲的成功记录
        Service->>Engine: runtimeService.trigger() <br/>(补偿触发)
    end

    %% ==========================================
    %% 阶段四:哨兵校验
    %% ==========================================
    rect rgb(255, 255, 240)
        note right of User: == 阶段四:哨兵校验 ==
        note right of Engine: 流转至 _check Node
        Engine->>Sentinel: execute()
        
        alt STATUS == FAIL
            Sentinel-->>Engine: 抛出异常 -> DeadLetterJob
            note right of Engine: 节点变红 (ERROR)
        end
    end

4.2 场景二:本地应用 UserTask交互闭环

展示本地应用从注册 ID、拉起应用、回调校验到最终人工确认的完整闭环。

sequenceDiagram
    autonumber
    actor User as 用户
    participant FE as 前端/插件
    participant Ctrl as ProcessController
    participant Engine as Flowable Engine
    participant Reg as LocalAppRegisterDelegate
    participant DB as DB (AsyncRecord)
    participant Sentinel as AsyncResultCheckDelegate

    %% ==========================================
    %% 步骤 1: 自动注册与挂起
    %% ==========================================
    rect rgb(230, 245, 255)
        note right of User: == 1. 自动注册与挂起 ==
        Engine->>Reg: 进入 _register 节点
        Reg->>DB: 生成 asyncTaskId, 存库 (RUNNING)
        Reg->>Engine: setVariableLocal(asyncTaskId)
        Engine->>Engine: 流转至 _wait, 流程挂起
    end

    %% ==========================================
    %% 步骤 2: 前端获取 ID 并拉起
    %% ==========================================
    rect rgb(255, 250, 230)
        note right of User: == 2. 拉起应用 ==
        FE->>Ctrl: getProcessDetail()
        Ctrl->>DB: 查询 _wait 节点最新的 RUNNING 记录
        Ctrl-->>FE: 返回 asyncTaskId, 状态 ACTIVE
        
        User->>FE: 点击"拉起应用"
        FE->>User: 唤起本地 EXE (传入 asyncTaskId)
    end

    %% ==========================================
    %% 步骤 3: 异步回调与校验
    %% ==========================================
    rect rgb(240, 255, 240)
        note right of User: == 3. 回调与校验 ==
        User->>FE: 应用运行结束
        FE->>Ctrl: /asyncCallback (asyncTaskId, Code=0)
        Ctrl->>DB: 更新 SUCCESS
        Ctrl->>Engine: runtimeService.trigger()
        
        Engine->>Sentinel: 进入 _check 校验
        Sentinel->>Engine: 校验通过
    end

    %% ==========================================
    %% 步骤 4: 人工确认
    %% ==========================================
    rect rgb(240, 240, 240)
        note right of User: == 4. 人工确认 ==
        Engine->>Engine: 流转至 Original UserTask
        
        FE->>Ctrl: getProcessDetail()
        Ctrl-->>FE: 状态 WAITING_FOR_USER
        
        User->>FE: 点击"下一步"
        FE->>Ctrl: /continueServiceTask
        Ctrl->>Engine: taskService.complete()
    end

5. 附加功能交互流程

5.1 原地重试 (Retry)

当哨兵校验失败(_check 变红)时,用户修复问题后可点击重试。主要用于 HPC 任务数据校验 场景。

  • 操作: /retryFailedNode
  • 原理: 将 DeadLetterJob 移回 ExecutableJob,重新触发校验逻辑。

5.2 回退跳转 (Rewind/Jump)

适用于 本地应用失败参数填写错误 场景。

  • 操作: /retryToNode(targetNodeId)
  • 原理:
    1. 用户点击“重新执行”。
    2. 后端将流程指针强行跳转回 _register(针对本地应用)或任意前置节点。
    3. _register 重新执行,生成全新的 asyncTaskId
    4. 流程重新挂起,前端获取新 ID允许用户再次拉起应用。

6. ProcessController 接口能力清单

方法 描述 逻辑细节
deploy 流程部署 DTO -> BPMN 转换并部署
saveParamsByDefinitionId 保存参数 保存用户输入的节点参数,作为运行模板
startByProcessDefinitionId 启动流程 根据定义ID启动实例
suspendProcessInstance 挂起实例 校验实例状态,挂起后阻止任务执行和回调触发
activateProcessInstance 激活实例 激活流程,并立即调用 triggerPendingTasks 补偿触发积压的回调
cancelProcessInstance 取消实例 终止运行中的流程
getProcessAndNodeDetailByInstanceId 状态查询 获取聚合状态;若为本地应用且 Active会查询业务表返回 asyncTaskId
previewNodeInputFiles 文件预览 扫描 MinIO 或 本地磁盘,返回文件列表
continueServiceTask 继续执行 完成 _waitUserOriginal UserTask,推动流程
asyncCallback 异步回调 通用回调接口,支持 HPC 和 本地应用
retryFailedNode 原地重试 恢复死信作业
retryToNode 回退重试 携带新参数将流程跳转至任意指定节点支持生成新业务ID

好的,我将上述四个核心 Q&A 进行提炼和简化,保持专业性并统一格式,作为文档的 第 7 章节 补充在最后。


7. 核心机制与开发 Q&A

Q1: 链路 _waitUser -> Original -> _wait -> _check 中,任意节点都可以挂起吗?

A: 是的,可以在任意节点挂起,但底层表现不同。

所有节点在 Flowable 数据库中都有对应的持久化状态Task、Job 或 Execution挂起操作Suspension是针对流程实例级的会冻结所有当前活跃的节点。

节点 类型 驱动方 挂起后的表现
_waitUser UserTask 人工 用户点击“完成”时报错(拒绝操作)。
Original ServiceTask (Async) 引擎 Job 停留在数据库AsyncExecutor 会跳过执行,直到流程激活。
_wait ReceiveTask 外部 外部回调会被业务代码拦截并缓冲到 DB不触发流程流转
_check ServiceTask (Async) 引擎 Original,校验逻辑暂停执行。

Q2: 为什么只有 _wait (ReceiveTask) 需要代码补偿 (triggerPendingTasks)

A: 因为只有它是“事件驱动”且请求不可重放的。

  • _wait (ReceiveTask): 依赖外部系统的一次性回调。如果挂起期间回调到达Flowable 引擎会拒绝触发。为了不丢失这次回调,业务层必须先存库。流程激活后,引擎不知道发生过回调,必须由代码主动查询数据库并补发 trigger
  • Original / _check: 依赖引擎的轮询扫描。流程激活后AsyncExecutor 下次扫描时会自动发现并执行这些 Job无需人工干预。
  • _waitUser: 依赖人工重试。流程挂起导致操作失败后,用户只需在界面上再次点击按钮即可。

Q3: runtimeService.trigger 是专门用来触发 ReceiveTask 的吗?

A: 在本系统的上下文中,是的。

请严格区分以下 API 的用途:

  1. runtimeService.trigger(executionId): 专属用于 ReceiveTask (_wait 节点)。告诉引擎结束等待,继续流转。
  2. taskService.complete(taskId): 专属用于 UserTask (_waitUser 节点)。完成人工任务。
  3. managementService.moveDeadLetterJob...: 专属用于 死信 Job (_check 失败后)。恢复异常任务。

注意: 绝不能混用 triggercomplete


Q4: Flowable 变量有哪几种?如何正确设置和获取?

A: 主要分为“全局变量”和“本地变量”,本系统强烈建议使用本地变量以保证隔离性。

1. 变量类型对比

类型 作用域 (Scope) 特点 适用场景
Global Process Instance 全流程可见,同名覆盖 业务主键 (runId)、发起人信息
Local Execution / Task 仅当前节点可见,数据隔离 异步任务 ID、并行分支数据、临时状态

2. 代码操作指南

设置变量 (Write):

// [推荐] 使用 Local 防止并发分支数据污染
// 在 JavaDelegate 中
execution.setVariableLocal("asyncTaskId", id);
// 在 Service 中
runtimeService.setVariableLocal(executionId, "status", "SUCCESS");

获取变量 (Read):

// [推荐] 使用 getVariable (自动冒泡查找)
// 优先找 Local找不到找 Global。这样兼容性最好。
String val = (String) execution.getVariable("asyncTaskId");

最佳实践: 在 _register 节点存入 asyncTaskId 时使用 setVariableLocal。前端查询展示时,建议直接查询业务表 (async_task_record) 而非查询流程变量,以确保在回退/重试场景下获取到最新的 ID。