- SCHEDULED:待调度 , task放到队列中还没有被poll出来执行时的状态
- IN_PROGRESS:执行中 , 被poll出来执行但还没有完成时的状态
- COMPLETED:执行完成
- FAILED:执行失败
- CANCELLED:被中止时为此状态 , 一般出现在两种情况:手动中止流程时 , 正在运行中的task会被置为此状态;多个fork分支 , 当某个分支的task失败时 , 其它分支中正在运行的task会被置为此状态;
文章插图
2 任务队列
任务的执行(同步的系统任务除外)都会先添加到任务队列中 , 是典型的生产者消费者模式 。
- 任务队列 , 是一个带有延迟、优先级功能的队列;
- 每种类型的Task是一个单独的队列 , 此外 , 如果配置了domain、isolationGroup , 还会拆分成多个队列实现执行隔离;
- decider service是生产者 , 其根据流程配置与当前执行情况 , 解析出可执行的task后 , 添加到队列;
- 任务执行器(SystemTaskWorker、Worker)是消费者 , 其长轮询对应的队列 , 从队列中获取任务执行;
3 核心功能实现机制
conductor调度的核心是decider service , 其根据当前流程运行的状态 , 解析出将要执行的任务列表 , 将任务入队交给worker执行 。
decide主要流程简化如下 , 详细代码见WorkflowExecutor.java的decide方法:
文章插图
其中 , 调度任务处理流程简化如下 , 详细代码见WorkflowExecutor.java的scheduleTask方法:
文章插图
decide的触发时机
最主要的触发时机:
- 新启动执行时 , 会触发decide操作
- 系统任务执行完成时 , 会触发decide操作
- Workder任务通过ExecutionService更新任务状态时 , 会触发decide操作
1)Task & TaskMapper
对于每一个Task来说 , 都有Task和TaskMapper两部分:
- Task:任务的执行逻辑代码 , 它的作用是Task的执行
- TaskMapper:任务的映射逻辑代码 , 它通过Task的定义配置、当前实例的执行状态等信息 , 返回实际需要执行的Task列表
2)条件分支(SWITCH)的实现机制
SWITCH用于根据条件判断 , 执行不同的分支 。
实际上 , 该节点的Task不做任何操作 , TaskMapper根据分支条件 , 判断出要走的分之后 , 返回对应分支的第一个Task 。
SwitchTaskMapper.java getMappedTasks方法关键代码:
// 待调度的Task list , 最终返回结果List<Task> tasksToBeScheduled = new LinkedList<>();// evalResult是分支条件变量的值(case)// decisionCases是一个Map结构 , key为分支的case值 , value为对应分支的任务定义list(分支内的任务定义会有多个)// 根据分支变量的实际值 , 获取对应分支的任务定义listList<WorkflowTask> selectedTasks = taskToSchedule.getDecisionCases().get(evalResult);// default的逻辑:如果获取不到对应的分支或者分支为空 , 则用默认的分支if (selectedTasks == null || selectedTasks.isEmpty()) {selectedTasks = taskToSchedule.getDefaultCase();}if (selectedTasks != null && !selectedTasks.isEmpty()) {// 获取分支的第一个(下标0)task , 返回给decider service去做调度(decider会把任务添加到队列里 , 交给worker去执行)WorkflowTask selectedTask = selectedTasks.get(0);// 调用了deciderService的getTasksToBeScheduled方法 , 此方法里又获取到TaskMapper调用了getMappedTasks 。这里采用了递归调用的方式 , 解析嵌套的TaskList<Task> caseTasks = taskMapperContext.getDeciderService().getTasksToBeScheduled(workflowInstance, selectedTask, retryCount, taskMapperContext.getRetryTaskId());tasksToBeScheduled.addAll(caseTasks);switchTask.getInputData().put("hasChildren", "true");}return tasksToBeScheduled;
秒懂生活扩展阅读
- 怎么关iebrowser
- 新鲜野菜怎样长期储存 新鲜野菜如何长期储存的方法
- 如何将qq录音转发给另外一个人
- 淘宝直通车关键词质量分怎么提升?提升方法介绍
- 民国大洋怎么看真假
- 维生素c的作用及养颜方法
- 怎么找出租房
- 冷熟饺子怎么吃
- 绿萝的叶子向四面塌下怎么办
- 用草钓草鱼的挂钩方法是什么