作者|张宇轩,何做好分章逸
,布式曾丹 无论是任务互联网应用或者企业级应用
,都充斥着大量的调度任务 。我们常常需要一些任务调度系统帮助我们解决问题 。探索随着微服务化架构的何做好分逐步演进
,单体架构逐渐演变为分布式、布式微服务架构。任务在此的调度背景下,很多原先的探索单点式任务调度平台已经不能满足业务系统的需求
。于是建站模板何做好分出现了一些基于分布式的任务调度平台。 Scheduler 是布式飞书内的分布式任务调度平台。分布式任务调度能力主要包括: 名词解释: Processor: 编程处理器, 拥有一定的编程规范, 用户自定义实现 。 Executor: 一个 SDK,运行 Processor 的进行容器,与 Scheduler 通信的载体
。 Job:用户创建的任务,其中包含任务的高防服务器调度规则、调度模型、执行器名称等信息。 Instance:运行态的Job ,每当Job触发后会生成一个Instance,记录本次执行的调度信息。 Task :最小执行单元,不同调度模型的任务产生的Task数量不同 。 通过架构图可以发现 ,Scheduler主要有以下三个部分: 因此,我们可以用一句话解释清楚 Scheduler 所做的事情,即:在「指定时间」「通知执行器」以「指定方式」执行任务 这句话中包含了三个关键点,也分别代表着 Scheduler 的香港云服务器三个核心模块: 在一个Job的调度周期中,各个模块各司其职,整个流程如下: 拥有这三个核心模块后,Scheduler 已具备了成熟的任务调度功能。另外
,为了增加 Scheduler 的稳定性 ,有额外两个模块为其保驾护航: 本篇文章不对 Scheduler 所支持的定时任务能力作赘述
,而是从三个方面(易用性
、多功能性、稳定性)介绍 Scheduler 对于分布式任务调度的思考和探索: 以字节跳动内部为例,当前团队想要实现一个定时任务有多种方式 :接入字节云的 cronjob 平台、自己实现一套定时任务框架或者接入第三方定时任务框架。 对于第一种接入 cronjob 平台,每一种定时任务都需要注册各自的 psm 和运行时环境(镜像),当任务需要访问依赖资源如 redis/db 等时
,需要各自添加授权。任务代码逻辑有变化时也需要各自升级,导致开发、管理起来较为复杂。 对于第二种自己实现一套定时任务框架,不仅整体开发时间较长
,且需要大量时间进行测试回归来保证框架的稳定性。如果项目内使用到的定时任务较多 ,那么自身研发一套框架用途也较广泛;若项目中使用到的定时任务较少,则 ROI 较低,很多时候也只是为了造轮子而造轮子。 因此,大多数项目面对增加定时任务的需求时 ,都会寻求直接接入第三方成熟的定时任务框架。对于他们来说
,是否易于接入、与现有代码联系是否紧密
、调试是否方便是很重要的选取指标
。 基于这种背景,Scheduler 在设计时就站在了接入方的角度,思考了如何让接入方能够在最短时间内以最低成本接入 Scheduler,实现自己的定时任务。 站在接入方角度,对定时任务框架进行选型时最关注的几个点无非是定时任务执行准确性、最高支持 qps 、定时设置多样性、接入成本这几个
。对于前两个指标,Scheduler 目前接入业务方 50+,日均调度任务 20w+ 次
,与公司内其他第三方定时任务框架相比也较有竞争性,同时对于后两个关注点,Scheduler 也有自己的风格 。 一般的定时任务框架只支持 crontab 表达式 ,例如 0 1 * * * ,代表每天凌晨一点执行一次 。cronTab 功能强大,但是若配置复杂的定时策略
,有一定学习成本,且可读性不高 。因此,鉴于这种情况,Scheduler 在 crontab 之上设计了更易读更强大的定时策略,做到所见即所得 。 因此 ,该设置所代表的定时间隔为:每两月的3 、5、23号触发一次 触发时间(北京时间) 2022-03-23 18:00:00 2022-05-03 18:00:00 2022-05-05 18:00:00 2022-05-23 18:00:00 可能有同学注意到 ,Scheduler 对于重复级别的支持十分丰富
,不仅可以按照普通的年 、月、日等级别进行设置,还可以按照工作日进行重复调度(例如每两个工作日执行一次) ,这归因在 Scheduler 孵化于字节跳动内部企业服务系统,为诸如人事系统、权限系统等 ToB 服务提供定时任务能力
。往往 ToB 客户的需求复杂多变 ,因此,需要提前具备更多能力,才能更好地服务好 ToB 客户。 Scheduler 在调研接入方需求时,得到了有些客户对于定时提醒这类任务的需求是尽量不要在「非工作日」打扰
。于是,Scheduler 决定增加工作日调度选项来适配客户潜在需求 ,也侧面说明了 Scheduler 为了让接入方更快更小成本接入做出的努力。 相信「开箱即用」对于人们在采买诸如家电
、数码产品时,是十分重要的一个考核指标。而对于对外提供的服务 or 框架 ,亦是如此。Scheduler 的目标就是让接入方能够在短时间熟悉 Scheduler 、编写测试代码以及上线定时任务 。 如果想实现一个定时任务,接入方只需要三步
:引入 Scheduler sdk,绑定相应 processor,在 process 接口中实现具体业务逻辑。同时
,由于定时任务的实现位于原代码中,启动配置无需更改,本地测试也较为便捷 。同时,在字节跳动环境下,无需新增 psm
、授权配置等,尽可能做到了「开箱即用」
。 复制import ( "context" "code.byted.org/apaas/scheduler_sdk/executor" // 引入 sdk) func main() { executorSvc, err := executor.NewExecutor(executor.NewDefaultExecutorConfig(), &HelloWorld{ }) // 绑定 processor if err != nil { panic(err) } if err = executorSvc.Run(); err != nil { panic(err) }} type HelloWorld struct { ProcessorApiName string} func (h *HelloWorld) GetApiName() string { return h.ProcessorApiName} 没有程序员想主动写出 bug
,但问题总是会突然出现。如何在出现问题时快速运维、快速止损 ,是所有工程师都追求的目标。Scheduler 在这方面做了几种尝试: 用户可以在创建 job 时,可以选择配置报警机器人,并把 Scheduler 机器人拉入对应报警群组 。当检测到对应 job 出现问题时
,Scheduler 机器人会把相应报警推送到对应群组,做到实时响应。 目前 task 的相关状态如下,当一个 task 长期没有到终态时
,根据状态码即可知 task 目前处于什么状态
,从而推断是哪一步骤出了问题。 状态码 状态 100 等待触发 101 Ready 就绪态
,等待推送 201 推送到 Executor,还未实际执行(任务太多排队) 202 执行中 203 执行超时 ,逻辑复杂导致 301 执行成功 302 执行失败 401 Ready 超时 ,没有 Executor 拉取 402 推送到 Executor 后长期未执行 403 执行超时,Executor 宕机导致 并且一些 Scheduler 常见的报错也做了封装 ,帮助快速定位问题,例如 错误码 错误原因 k_sc_ec_000004 找不到任务{ { .jobApiName}} k_sc_ec_100004 找不到任务实例{ { .instanceID}} k_sc_ec_300001 Processor Name 未注册{ { .content}} k_sc_ec_300006 processor({ { .content}}) 找不到对应 executor k_sc_ec_400002 找不到Executor { { .content}} k_sc_ec_400005 无权限操作 一个成熟的项目中避免不了大型批量任务
,比如通过 Excel
、csv 或其他数据源批量创建或更新数据
,批量任务一般数据量很大
,如果按照单实例串行执行 ,那么不能充分利用计算机资源且一次运行会消费大量时间,用户体验不友好。 以 Kunlun 举例, 旧阶段的批量任务依赖于消息队列、Redis 实现
,总体分为三大部分: 使用消息队列、redis的定时任务可以提速和优化用户体验 ,但有以下不足: 基于这种背景,Scheduler 丰富了原本的任务调度能力 ,补充了分片能力,以满足复杂繁琐的任务分片处理的需求
。 若打算做出一套贴合业务需求的分片任务框架 ,需要先了解现阶段的分片任务的实现步骤。 现阶段的分片任务大致可以抽象成3个步骤: Scheduler 要做的事情则是替换其中分片 、消息队列、Redis 的功能,做出以下抽象: 执行器需要实现ShardingProcessor接口以供调度器进行调度 。调度过程如下: Scheduler 支持分片任务重点在于丰富调度模型,提升调度器调度能力,完善执行器执行能力来达到支持分片任务的目的 调度侧需要根据任务进度依次生成 PreTask
、ProcessTask 、NotifyTask
、PostTask 来调度执行器 ShardingProcessor 中的 PreProcess - Process - Notify - PostProcess 四个方法。单机调度 PreProcess ,PostProcess,Notify ,并行调度 PostProcess ,总体调度呈现总-分-总的形式
。调度过程如下
: 数据拆分即任务分片
,指的是将单一任务按照特定的逻辑切分为多个独立的子任务,将其分派到不同的节点执行,以提高任务的执行效率
。 而 Scheduler 要处理的任务内部可能存在依赖关系(比如 kunlun 业务中 metadata 批量创建的需求 ,由于存在 lookup 和 reference 字段等
,记录创建之间存在拓扑关系),所以在执行时需要优先级的概念
,而不能被简单拆分为独立的子任务
。 为了支持带优先级的任务分片,Scheduler 接收的分片任务的数据特点如下
: 二维数组可以是下面这样 : 了解了待分片任务的结构
,我们来讨论如何对任务进行分片。比如 ,分片的数量由什么决定 ,单个分片上的信息是如何分配的,不同分片又是不同分派到不同的处理器上的... 分片数的确定 分片数的确定基于以下参数的值
:数据量、任务创建时用户指定的单片最大数量、单片最小数量 ,以及实际可用的执行器数量。 分片算法 分片特征值(sharding key)的选择要遵循的原则应该是基于最常用的访问方式。 由于 Scheduler 分片时并不关心业务数据的结构 ,所以选用数据数组的下标来作为分片特征值
。 由于分片数量确定后 ,不涉及到由于分片的增加或减少对数据进行 Rehash 的情况,所以无需考虑虚拟节点
、一致性哈希等方式进行分片。 这里选用哈希分片的分片算法 ,原因是既可以均匀分布数据,实现起来也很简单。 分片的存储和派发 分片完成后
,需要给每个分片创建一个 Task ,并把分片的数据存储下来
。 关于 Task 的派发,根据上面关于分片数的讨论 ,可以得到分片数和 Executor 数的关系 : 为了让各个 Task / 各个分片 能够均匀派发给各个 Executor,也为了避免某个executor挂掉时
,其他Executor 不能均匀分摊挂掉的节点原先承担的分片,需要采用合理的分片策略 。 在分片时,我们保证了各分片的数据是尽量均匀分布的,所以从分片到 Executor 的分派方式可以尽可能地简单,采用平均分配的策略即可 。对于挂掉的节点所承担的分片,也采用同样的策略派发到存活的 Executors 上即可
。 例如: 平均分配*
:对于不能均分的情况,为了避免靠前的 Executor 总是承担更多的压力,可以根据待分配分片数量的奇偶来决定是升序分派还是降序分派。 Scheduler 支持通知 Executor 任务执行的整体进度 执行侧需要实现并注册 SDK 提供的ShardingProcessor接口,来处理由调度侧发来的多种类型的Task。 PreProcess 预处理方法
,可以进行但不限于以下的操作: 如果不需要预处理,可直接在方法内 return ,分片时数据使用启动时的 Data 复制func (s *ShardingProcessor) PreProcess(ctx context.Context, tc taskContext) error{ oldData := tc.GetData() // 用户业务, 数据处理 newData := Transform(oldData) // 返回带拓扑排序的数据 tc.SetResult(newData) return nil ShardingProcess 分片处理函数 ,主要是进行数据更新、创建操作
。ShardingProcess 的入参是切分后的数组([]interface{ }) 。Executor 需要对参数进行两部分额外处理: 复制func (s *ShardingProcessor) ShardingProcess(ctx context.Context, tc taskContext) error{ taskData := tc.GetData() PostProcess 接受所有分片处理结果,进行后续处理 ,如生成错误文件。 Notify 提供给子任务上报的能力
,Scheduler 会根据所有子任务上报结果计算进度
,通知 Executor,通知粒度为数据条数。如果接入方不主动上报子任务进度 ,Scheduler 会根据子任务完成度进行通知,通知粒度为分片粒度 。 在 Scheduler 设计初期时,更多的是把注意力放在了如何能够快速
、准确、低延迟的触发任务,为此还多次优化了触发器 、分派器
、派遣器三大模块的轮询逻辑,但是忽略了任务量过大时下游能否抗住流量的问题。 如果 Scheduler 在调度时无法准确感知下游压力,那么很容易将下游打挂,如 :在定时任务首次上线时,因为 kunlun 的装包机制导致数千个应用下配置了同样的定时任务 ,虽然一个包内的数十个定时任务触发时间分散,但是应用包之间的同一个任务触发时间相同
,导致下游需要在同一时刻处理数千个任务
,再加上任务的处理流程还会通过消息中间件进行扩散,导致数据库在任务执行阶段一直处理低IDLE 阶段
。 目前大部分后端服务,通过分析任务的流量走向,可以大致确认每一条任务在执行过程中不论扩散还是非扩散流量都会走向DB
,流量图大致如图
。 任务的流量最终打到了 DB,所以流量控制的目标就更加清晰 :对 DB 的流量控制。 需要对 DB 进行流量控制,那么就要设定合理的指标,理论上,只要指标采纳的足够合理 ,就能严格、准确的控制流量,指标则需要具备以下条件
: 优先级:实时性 > 权威性 。当一个指标的实时性不够高
,那么它的权威性就不再有价值。 只需要实时监听着 DB 的指标,来判断任务是立刻执行,还是延迟执行就能有效的保护 DB。 指标选择 反映 DB 压力较为直接的指标是 cpu idle,但考虑到服务部署往往多实例以及 cpu idle 采集难度大的情况,以近似指标来代替 。另一方面
,通过历史数据分析 ,DB 流量与 cpu idle 有一定的关联,因此以 DB 流量作为 DB 压力指标。 参考限流的实现方案,采用单独的 Redis 存储流量数据 ,以 1s 为时间窗口作为 Redis key,每个时间窗口的流量作为 Redis value,每次发生 DB 操作时更新流量数据 。系统中存在多个 DB ,每个 DB 单独统计,在 Redis key 中加入db信息 。Redis key 设置10s过期时间 ,查询时根据过去3个窗口的加权平均(80%/15%/5%)作为当前流量,以处理窗口交界处的突发流量。 目前 DB 流量已有 metrics 监控数据,但由于 metrics 会在本地聚合 30s 数据后上报,至少会有 30s的延迟。而造成 DB 压力大的定时任务多为短期集中触发,使用 metrics 数据会有感知不及时的问题,因此需要额外收集数据 。参考 DB metrics 数据采集的方式
,通过 Gorm 的 callback 机制插入具体的采集逻辑,减少对业务代码的侵入
。 复制func SetMonitorCallBack(db *gorm.DB) { db.Callback().Create().Before("gorm:before_create").Register("metric:before_create", beforeCreateCallback) db.Callback().Delete().Before("gorm:before_delete").Register("metric:before_delete", beforeDeleteCallback) ...} func beforeCreateCallback(scope *gorm.Scope) { beforeCallback(scope, "create")} 在采集逻辑上,需要考虑以下几个问题 : 复制type MetricType int8 const ( QueryCount MetricType = 1) type DBMetric struct { DBName string DBMethod string Type MetricType Timestamp int64 Value interface{ }} // callback中将metric数据写入channelfunc beforeCallback(scope *gorm.Scope, method string) { dbName := getStringValueFromCtx(scope.DB().Ctx, CtxVariableDBName) curMs := time.Now().UnixNano()/int64(time.Millisecond) metric := DBMetric{ dbName, method, QueryCount, curMs, 1} select { case ch <- metric: default: // channel is full, ignore this metric }} Scheduler 调度速率与 DB 负载之间的关系较为复杂
,本期采用简单的阈值反馈机制,设置 DB 流量阈值,当流量超出阈值时,停止 Scheduler 当前周期调度
。根据历史数据,设置阈值为5K。 当流量未超出阈值时 ,不能预估任务对 DB 流量的影响
,采用简单策略对任务数进行限制:任务数 = max((DB流量阈值 - DB当前流量)* 100 / DB 流量阈值, 0) 目前 Kunlun 的 DB 资源根据租户进行分配,不同租户的数据和流量会落在不同的 DB 上
。Scheduler会记录 Job 所处租户 ,所以在调度时 ,需要根据租户查找真实的 DB 资源,通过 DB 指标的健康状况来决定是否派遣任务 :初识 Scheduler
找准定位:分布式任务调度平台
摸清脉络
:Scheduler的结构和核心模块


换位思考-快速接入
背景:效率至上 ,时间是金
分析
:站在用户角度想问题
轻松的使用方式
并肩作战-分片任务
背景:任务越多
,挑战越大


调度侧能力
分批调度的能力
进度通知的能力

执行侧分批执行的能力
分片任务流程

削峰填谷-流量控制
背景:提供能力,而非施加压力
分析 :流量追踪,剥茧抽丝

指标收集
指标范围

Scheduler 调度反馈
流量阈值限制调度控制流程
