3)并行(FORK)的实现机制
FORK用于开启多个并行分支 。
实际上 , 该节点的Task不做任何操作 , TaskMapper返回所有并行分支的第一个Task 。
ForkJoinTaskMapper.java getMappedTasks关键代码:
// 待调度的Task list , 最终返回结果List<Task> tasksToBeScheduled = new LinkedList<>();// 配置中的所有fork分支List<List<WorkflowTask>> forkTasks = taskToSchedule.getForkTasks();for (List<WorkflowTask> wfts : forkTasks) {// 每个分支取第一个TaskWorkflowTask wft = wfts.get(0);// 调用了deciderService的getTasksToBeScheduled方法 , 此方法里又获取到TaskMapper调用了getMappedTasks 。这里采用了递归调用的方式 , 解析嵌套的TaskList<Task> tasks2 = taskMapperContext.getDeciderService().getTasksToBeScheduled(workflowInstance, wft, retryCount);tasksToBeScheduled.addAll(tasks2);}return tasksToBeScheduled;
总的来说 , 分支(SWITCH)、并行(FORK)节点本身没有执行逻辑 , 其通过TaskMapper返回到实际要执行的Task , 然后交给Decider Service处理 。
重试的实现机制
重试和其延迟时间设置 , 都是借助任务队列的功能实现的 。
重试:将任务重新添加到任务队列
重试的延迟时间:添加到任务队列时设置延迟时间 , 延迟时间过后 , 任务才能在队列中被poll出来执行
五 完整性保障机制由于调度过程中可能会出现因机器重启、网络异常、JVM崩溃等偶发情况 , 这些会导致的decide过程意外终止 , 流程执行不完整 , 展现出如流程一直运行中(实际已经没有在调度) , 或者其它状态错误等异常现象 。
1 WorkflowReconciler
针对这种情况 , conductor有一个WorkflowReconciler , 会定期尝试decide所有正在运行中的流程 , 修复流程执行的一致性 。此外 , 它还有一个作用是校验流程超时时间 。
2 decideQueue
那么WorkflowReconciler是如何获取到当前运行中的流程呢 , 答案是decideQueue 。
decideQueue和任务队列相同 , 也是一个具有延迟功能的队列 , 其存放的是正在执行中的流程的实例id 。在任务开始执行时(包括新启动执行、重试执行、恢复执行、重跑执行等) , 会将实例id push到decideQueue中;在执行结束(成功、失败)时 , 会从decideQueue中删除实例id 。
3 ExecutionLockService
WorkflowReconciler会定期尝试decide所有正在运行中的流程用于超时判断、维护流程一致性 。但是流程本身正常执行也会触发decide , 如果同一个执行同时触发两个decide , 可能会导致状态混乱 , 执行卡住等问题 。
conductor采用了锁来解决这个问题 , 其提供了单机LocalOnlyLock(基于信号量实现)、redis分布式锁(基于redission实现)、zookeeper分布式锁三种实现 。
decide方法中最开始会尝试获取锁 , 如果获取失败则直接返回 。通过锁来保障不会对同一个流程实例并发执行decide 。
if (!executionLockService.acquireLock(workflowId)) {return false;}
由于锁是可配置的 , 可能会导致一个误区:单台机器的话不用配置锁 。其实单机也是需要配置锁的 , 因为WorkflowReconciler和流程正常执行会产生冲突 , 可能会导致偶发的流程状态混乱问题 。
参考:
Github: https://github.com/Netflix/conductor
官方文档:https://netflix.github.io/conductor/
WorkflowReconciler:https://github.com/Netflix/conductor/blob/main/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowReconciler.java
WorkflowSystemTask:https://github.com/Netflix/conductor/blob/main/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java?spm=ata.21736010.0.0.2b501a3cYnrSfT&file=WorkflowSystemTask.java
作者 | 夜阳
原文链接:https://developer.aliyun.com/article/818136?utm_content=g_1000311143
【conductor记忆方法 conductor】本文为阿里云原创内容 , 未经允许不得转载 。
秒懂生活扩展阅读
- 怎么关iebrowser
- 新鲜野菜怎样长期储存 新鲜野菜如何长期储存的方法
- 如何将qq录音转发给另外一个人
- 淘宝直通车关键词质量分怎么提升?提升方法介绍
- 民国大洋怎么看真假
- 维生素c的作用及养颜方法
- 怎么找出租房
- 冷熟饺子怎么吃
- 绿萝的叶子向四面塌下怎么办
- 用草钓草鱼的挂钩方法是什么