conductor记忆方法 conductor( 二 )

  • Index: 索引数据库 , 用于存储执行历史;
  • 四 运行模型1 Task状态转移
    • SCHEDULED:待调度 , task放到队列中还没有被poll出来执行时的状态
    • IN_PROGRESS:执行中 , 被poll出来执行但还没有完成时的状态
    • COMPLETED:执行完成
    • FAILED:执行失败
    • CANCELLED:被中止时为此状态 , 一般出现在两种情况:手动中止流程时 , 正在运行中的task会被置为此状态;多个fork分支 , 当某个分支的task失败时 , 其它分支中正在运行的task会被置为此状态;

    conductor记忆方法 conductor

    文章插图
    2 任务队列
    任务的执行(同步的系统任务除外)都会先添加到任务队列中 , 是典型的生产者消费者模式 。
    • 任务队列 , 是一个带有延迟、优先级功能的队列;
    • 每种类型的Task是一个单独的队列 , 此外 , 如果配置了domain、isolationGroup , 还会拆分成多个队列实现执行隔离;
    • decider service是生产者 , 其根据流程配置与当前执行情况 , 解析出可执行的task后 , 添加到队列;
    • 任务执行器(SystemTaskWorker、Worker)是消费者 , 其长轮询对应的队列 , 从队列中获取任务执行;
    队列接口可插拔 , conductor提供了Dynomite 、MySQL、PostgreSQL的实现 。
    3 核心功能实现机制
    conductor调度的核心是decider service , 其根据当前流程运行的状态 , 解析出将要执行的任务列表 , 将任务入队交给worker执行 。
    decide主要流程简化如下 , 详细代码见WorkflowExecutor.java的decide方法:
    conductor记忆方法 conductor

    文章插图
    其中 , 调度任务处理流程简化如下 , 详细代码见WorkflowExecutor.java的scheduleTask方法:
    conductor记忆方法 conductor

    文章插图
    decide的触发时机
    最主要的触发时机:
    1. 新启动执行时 , 会触发decide操作
    2. 系统任务执行完成时 , 会触发decide操作
    3. Workder任务通过ExecutionService更新任务状态时 , 会触发decide操作
    流程控制节点的实现机制
    1)Task & TaskMapper
    对于每一个Task来说 , 都有Task和TaskMapper两部分:
    1. Task:任务的执行逻辑代码 , 它的作用是Task的执行
    2. TaskMapper:任务的映射逻辑代码 , 它通过Task的定义配置、当前实例的执行状态等信息 , 返回实际需要执行的Task列表
    对于一般的任务来说 , TaskMapper返回的是就是Task本身 , 补充一些执行实例的状态信息 。但是对于控制节点来说 , 会有不同的逻辑 。
    2)条件分支(SWITCH)的实现机制
    SWITCH用于根据条件判断 , 执行不同的分支 。
    实际上 , 该节点的Task不做任何操作 , TaskMapper根据分支条件 , 判断出要走的分之后 , 返回对应分支的第一个Task 。
    SwitchTaskMapper.java getMappedTasks方法关键代码:
    // 待调度的Task list , 最终返回结果List<Task> tasksToBeScheduled = new LinkedList<>();// evalResult是分支条件变量的值(case)// decisionCases是一个Map结构 , key为分支的case值 , value为对应分支的任务定义list(分支内的任务定义会有多个)// 根据分支变量的实际值 , 获取对应分支的任务定义listList<WorkflowTask> selectedTasks = taskToSchedule.getDecisionCases().get(evalResult);// default的逻辑:如果获取不到对应的分支或者分支为空 , 则用默认的分支if (selectedTasks == null || selectedTasks.isEmpty()) {selectedTasks = taskToSchedule.getDefaultCase();}if (selectedTasks != null && !selectedTasks.isEmpty()) {// 获取分支的第一个(下标0)task , 返回给decider service去做调度(decider会把任务添加到队列里 , 交给worker去执行)WorkflowTask selectedTask = selectedTasks.get(0);// 调用了deciderService的getTasksToBeScheduled方法 , 此方法里又获取到TaskMapper调用了getMappedTasks 。这里采用了递归调用的方式 , 解析嵌套的TaskList<Task> caseTasks = taskMapperContext.getDeciderService().getTasksToBeScheduled(workflowInstance, selectedTask, retryCount, taskMapperContext.getRetryTaskId());tasksToBeScheduled.addAll(caseTasks);switchTask.getInputData().put("hasChildren", "true");}return tasksToBeScheduled;

    秒懂生活扩展阅读