#mongo-db #task #worker #schedule #running

mscheduler

Use mongodb to schedule task running

4 releases

0.2.0 Dec 5, 2023
0.1.2 Jul 24, 2023
0.1.1 Jul 24, 2023
0.1.0 Jul 23, 2023

#530 in Concurrency

Download history 34/week @ 2024-02-12 6/week @ 2024-02-26 67/week @ 2024-04-01

67 downloads per month

MIT/Apache

57KB
968 lines

MIT licensed Build Status

功能点

  1. 任务的发布和执行
  2. 多个worker同时运行任务
  3. 任务失败后重试
  4. worker ping超时后其他worker可以抢占任务

核心模型

Task

  1. TaskState
    1. 整体任务状态参数
    2. Vec<TaskWorkerState>
  2. TaskOption
  3. <T>参数

TaskConsumer

  1. Arc<Func> 因为需要spawn到其他线程内执行

任务中worker state的状态转换

  1. INIT 刚发送
  2. RUNNING 被worker占领并执行中
  3. SUCCESS worker执行成功
  4. FAIL worker执行失败

功能设计

任务发送

不变

任务消费

如何发现任务

  1. 启动时主动查询一批任务
  2. 通过change stream 订阅一批发生变化的任务
  3. 任务列表空时主动查询一批任务

查询什么任务

  1. INIT=》需要抢占
  2. RUNNING=》等待超时后抢占
  3. 其他worker执行失败,但是符合抢占条件=》走复杂判断逻辑

按什么优先级查询任务

目前没有特殊要求, 按任意key顺序查询

如何判断任务是否可抢占

  1. 任务参数限制不可执行
    1. specific_worker_ids限制不可执行
  2. 其他worker状态限制
    1. ping_expire_time没到, 不可抢占
    2. 其他worker info占据了并发执行数量
      1. max_unexpected_retries超出限制, 不执行. 约束: max_unexpected_retries<failed worker info cnt
      2. concurrent_worker_cnt限制并行执行数量. 约束: concurrent_worker_cnt<running worker cnt && concurrent_worker_cnt<success worker info cnt

抢占的流程

  1. 启动前使用change stream监听. 启动后查询一批任务, 查询条件: 任务参数允许worker执行 && 任务没有完全成功或失败(根据info cnt数量确定).
  2. 查询后读取worker info状态, 用于判断抢占时间点
  3. 遍历这些任务, 计算任务可抢占的时间点. 设置定时任务在这些时间点进行抢占. PriorityQueue实现, 有最大长度限制.
  4. 如果可抢占任务为空, 那么重新查询一批任务直到没有数据返回. 之后完全可以通过change stream监听

执行的流程

  1. 通过tokio::spawn隔离报错
  2. 按option里的设置持续ping
  3. 任务失败/成功后更新任务状态

任务失败的报错 没有特殊需求, 保持原样

模块划分

  1. stream监听. 始终存在
  2. 批量读取任务. 读取完成后可退出, 直到stream监听报错为止(存在时间gap, 无法判断是否存在新更新的任务). 所以这个由stream监听线程启动
  3. 任务抢占. 监听和批量读取后通过该线程维护PriorityQueue, 并执行任务抢占
  4. 任务执行. 任务抢占后spawn任务并执行. 执行过程中维护执行handler的状态. 任务完成后负责更新任务状态

测试用例

  1. 一个worker, 持续运行,
Feature Bull Agenda
Backend redis mongo
Priorities
Concurrency
Delayed jobs
Global events
Rate Limiter
Pause/Resume
Sandboxed worker
Repeatable jobs
Atomic ops ~
Persistence
UI
REST API
Central (Scalable) Queue
Supports long running jobs
Optimized for Jobs / Messages Jobs
Gracefuly stop
Remote stop
Multiple worker

流程

  1. 发布任务
    1. 幂等

worker状态

  1. 执行中 worker占用任务成功后立即进入执行中状态, 并设置下次刷新时间 如果距离上次刷新时间超过worker_timeout_ms, 那么认为是超时状态
  2. 失败 worker主动设置执行状态为fail. 如果任务失败时worker不在列表中, 状态设置会失败
  3. 成功 worker主动设置执行状态为success. 如果任务成功时worker不在列表中, 状态设置会失败
  4. 超时 根据worker_timeout_ms推导得到的状态

任务状态

任务状态由当前worker列表决定

  1. 未启动 列表为空
  2. 执行中 列表中存在执行中worker
  3. 成功 列表中只有success的worker
  4. 失败 列表中只有fail的worker

提供功能

  1. 发布任务(幂等) 查找key对应的没有运行中的任务 没有找到=>setOnInsert 找到了=>set nothing, 按配置决定是否清除失败/成功的worker
  2. 占用任务(多worker) 查找条件: 判断是否被指定worker_id, 任务状态不是成功, 任务没有被当前worker处理过(不能在worker列表中), 可以接受更多的worker 排序条件: priority最高, 优先被指定worker_id的 没有找到=>不做操作 找到了=>增加自己的worker对象, 并且过滤worker列表(清除超时的运行中worker)
  3. 维持任务 查找条件: key相同, worker id相同, 找到了=> 更新对应的超时时间 没找到=> 结束当前任务
  4. 任务执行成功 尝试更新任务状态为成功
  5. 任务执行失败 主动返回任务失败, 尝试更新任务状态为失败
  6. 任务执行异常 按option中设置重试, 并更新重试次数, 如果重试次数超过限制那么更新为任务失败. 重试过程只在本地发生, 所有重试次数失败后更新为失败状态

配置更新后的影响

  1. specific_worker_ids变动 如果更新后不允许当前worker执行, 那么结束任务
  2. ping_interval_ms变动 下次ping时生效
  3. 其他参数变动 不影响正在执行中的任务, 变动需要在重新占用任务时体现

具体实现

发布任务

直接发送就行

消费任务

核心问题: 下一次什么时候去占用任务

  1. 启动时计算下一次时间next_try_time
  2. 使用change_stream实时更新next_try_time

TODO

-[ ] clean_success 暂不实现, maintenance -[ ] clean_failed 暂不实现, maintenance -[ ] detect compatibility of collection data -[x] auto worker id -[ ] 错误处理

Dependencies

~26–61MB
~1M SLoC