作者:梁福坤
大数据的分布式调度是在进行数据ETL过程中起到了总体的承上启下的角色,整个数据的生产、交付、消费都会贯穿其中,本文从调度、分布式调度的特征展开,再对大数据调度个性化特征的一些阐述,由满足大数据使用的架构和业务场景的需求上娓娓道来,从实践的角度分享如何打造一个高可用、高效率、灵活性的大数据调度平台。
调度
从上个世纪50年代起,调度问题的研究就受到数学、运筹学、工程技术学等领域科学的重视[1],人们主要从数学的角度来研究调度问题,调度问题也同样被定义为”分配一组资源来执行一组任务”,以获得生产任务执行时间或成本的最优[2]。调度在计算机任务的实现可以依赖操作系统的定时任务进行触发(例如Linux系统的Crontab),主要针对单任务机制的触发,调度最基本的需要能够按时或者按照事件进行触发(At-least-once),如果任务不符合预期,还需要在应用端进行重试,最大可能保证任务被按时执行,并且成功执行,同时不能多次执行(Exactly once);但是在业务场景能保证可重复执行、一致性操作情况下对于争取能正常调度执行多次执行也是不可或缺的,比如给商户进行1min前的例行结算,如果结算是按照30min的时间窗口查找未结算的商户,那么就会容忍30min延迟,并且多次被执行也不会给商户多结算,因为在结算付款和重置是否结算标志位可以设计成原子性操作。所以在调度上能够做到按时、正确的执行,在业务方设计为了保证最终一致性也有一些架构上的取舍。
如果应用场景有上下游的协作,或者在任务执行会存在不同的宿主机来完成,或者为了保证任务高可用场景,就需要引入分布式调度的架构。
分布式调度
分布式调度是在单机的基础上发展起来,在综合考虑高可用、高效率、分布式协作的背景下逐步演进的调度方式,从单点调度到分布式协作是一个质变的过程,这个过程涉及到许多在单机并不存在的特征,下面针对重点展开聊下:
图1 分布式调度组件化分解图
2.1 调度器去中心化&高可用
涉及到分布式调度的协作,就需要有调度中心节点,同时要保证高可用的目的就需要调度中心节点是多节点发布,主备的方式去单点依赖。
2.2 宿主选择
分布式调度在任务执行阶段,可以在目标宿主中进行全部执行、N选M(N>=M>=1)的选择,宿主机具备相同类型任务互备的机制,在MPP(Massively Parallel Processor)架构中尤为常见,把大任务分而治之快速完成。也存在场景(比如外卖给商户结算)为了一致性和准确性只能由一台主机进行执行,并且需要成功执行。
被动选择策略:宿主的被动选择机制一般可以随机或者按照顺序选择策略,也可以按照当前宿主机进行的任务执行数量的方式进行常规的调度分配。当然,也可以进行高级的操作,参照宿主机的处理能力(吞吐量和响应时间)、资源使用情况(CPU、Memory、Disk I/O、Net I/O等)进行反馈机制的动态分配。后者需要有集中节点存储当前宿主机的处理能力、资源情况,便于在决策选择中提供参照。
主动选择策略:宿主的主动选择具备更加丰富的选举策略,任务在下达到具体算子时,会比较明确的定义出当前任务需要由多少个宿主参与执行,通过zookeeper的分布式锁来实现锁的抢占机制,抢占成功则执行,否则放弃。这种选举策略让宿主机得到了更多的参与,降低了对调度器的依赖。这种主动选择的方式,避免被动选择因不具备执行条件被选中,在执行的能力在时间上的损耗。
2.3 任务故障转移
调度任务的从任务级别job到transformer、operator,整个链条都存在具体局部失败的情况,调度器需要在原目标宿主机重试和失败后转移到其他备宿主机的功能,最大力度的保证任务被成功执行。
2.4 执行算子抽象
以往单机任务的调度可以比较灵活的执行多样的任务,可以是脚本、Webservice调用、HDFS Client命令行等,但是对于分布式协作需要接收外部命令运行,这就需要算子通过标准的数据通讯协议对外提供调用服务,常规的WebService、RPC(thrift/protocol buffer)等协议在跨语言通讯上具有较为广泛的应用。所以具体执行单元可以是具体任务的抽象,例如提供了Rest API方式,调用的URL和参数都是执行方填入,最大程度上支撑了灵活性;数据库操作算子可以包含数据库验证信息、具体执行的SQL等。执行算子抽象后,满足规范和灵活性,灵活是一个双刃剑,可以最大限度的满足用户需求,但也会导致大数据层面无法很细粒度的去感知数据的表、字段数据的完成情况,对数据生产无法更加精细粒度的产出交付。
2.5 弹性扩展
任务具体执行的宿主机需要在调度层面满足弹性的扩展,扩展最主要的需要是满足高可用和任务随着水平扩展进行分摊压力。在集群目标宿主机选择时,一般目标集合可以指定具体IP-List,也可以是一个BNS(百度机器的NameServer服务)。IP-List方式设置比较简单直观,但是存在每次调整依赖变更调度系统服务,变更之后还需要进行刷新宿主机的情况。而通过BNS服务比较简单,同时和线上服务发布部署进行结合,不存在延迟部署和刷新,推荐通过BNS的方式介入。
2.6 触发机制
常规触发是按照执行间隔或者具体时间的Crontab语法,开始时间,截止时间参数完成,但是在分布式调度任务中,最重要的就是完成协作,所以如果要进阶的话,就是依赖触发的机制。这种就很好的形成了上下游依赖触发,是分布式协作的关键步骤。从最初的任务节点按照常规触发,下游节点形成依赖链条,这里如果在高级进阶的话,就是依赖的某个/某些频次触发,比如每小时的12分钟开始被执行,下游可以选择具体的2:12 ,4:12进行触发,而非每个整点12分都被调用。这三种方式目前在外卖的大数据平台都有不同场景诉求,架构设计在3个需求上都有灵活的交付。
2.7 堵塞机制
对于相同任务的不同时间的运行实例,会存在前面的实例还没有正常结束的情况,这种在高频次调用,第三方依赖故障延迟等情况下会出现,如果继续调用会造成调用链条恶化,所以防止这种情况,堵塞机制会提供三种模式:常规例行(默认模式)、丢弃后续、丢弃前例。后面2种方案都需要提供容错重放机制,这个场景比较类似1.1章节提到的结算案例。
2.8 图形化进展查看
调度可以根据调用链条和不同事件频次的实例,通过树状图形化的方式查看执行的进度情况,例如可以查看job中transformer、算子的运行机器状况、状态和具体的实时执行日志。图形化是根据调用的触发机制分析出来的一个链条,是在烦冗复杂的调用关系中找到清晰脉络的数据直观表达的方式,是调度中常规的展示方式。在进阶中可以查看相应的参数传递,并发算子的执行进度条,预估完成周期等。
2.9 报警
通过邮件或者短信的方式对不符合预期返回标识的进行中止,同时通过邮件或者短信等方式对预先设置的用户或者用户组发出警告。报警触发的机制可以在宿主机单台时候触发,也可以在一定占比的宿主机在一定的时间窗口超过了阈值,触发报警。同时也要支持报警的屏蔽,用在进行运维或者升级部署、运维接管的情况。
上面是很多常规调度拥有的一些特征,这些是在分布式场景下的延伸需求,从单点简单的逻辑到多节点的协作统筹在工程层面无疑增加了额外辅助,这些都是在业务演进中逐步完善起来,而高可用、高效率是在分布式环境下做出的改变。
大数据分布式调度
大数据分布式调度,在上面通用调度的基础上又进行了具体跟数据特征相匹配的改良。主要是从数据的流程层面进行梳理,用来解释数据的上下游、血缘关系的问题,具体又有哪些特征是针对大数据的呢?
3.1 数据扇入扇出
大数据的存储和检索方案很多,因大数据特征之一就是多样性,为了满足多样的业务场景会有不同的引擎或者存储选择,在多样化解决方案的同时,造成了数据之间进行交换变得复杂,引擎之间的数据存取规则都有个性化的支持,比如Hbase的数据到Mysql和ElasticSearch(以下简称ES),涉及到Hbase的读取和后续后面两者的数据存入,这种对于Hbase就是一对二的数据扇出,但是在数据在Hbase中通过Get或者Scan方式获取后,要插入数据需要了解后面2者的存储结构,甚至是索引结构。所以类似这种跨引擎(或者跨版本,不同API)的方式,为了保持通用,需要进行需求的抽象,在外卖平台针对数据的交换定义了一套开放式SQL,这个框架对数据引擎的存和取分别作了抽象,在不同的目标引擎中有具体的实现,所以就有一些约定的规范。
图2 开放式SQL扇入扇出流程图
主键:数据必须存在业务主键或者联合主键,目的是为了保证数据在聚合或者更新的时候有依据。主键在Nosql的引擎中作为RowKey,在关系数据库中作为主键,在ES中作为主键key。对于Kudu来讲也是主键,针对数据的upsert就可以有依据的进行更新或者插入。
数据列:数据列的变更会稍微复杂,如果在关系数据库中会涉及到增加、变更列,但是在Hbase、ES中基本不需要主动扩展列,只需要对数据变更就可以了。
分区字段:对于事实表数据,在大数据量的情况下,为了检索效率和数据存放最优,一般会提供分区和桶的策略,针对Hive、Impala、GreenPlum的引擎会额外增加分区字段,分区可以是一级到多级,一般业务场景下第一分区为日期,根据实际业务需求可以变更更细粒度或者其他业务字段。在一般Mysql、Postgresql、Hbase这种引擎中不需要单独增加分区字段。
数据更新范围:大数据的数据交换,一般为了提高效率会进行多批次的并发处理,这就需要在一批次的数据进行分割,一般情况下会按照单一字段的进行截取,字段的类型以时间戳(create_time、update_time)居多,也可以根据主键的key排序后分批次获取,在源数据引擎允许的情况下,按照多批次的并发query可以做到很好的数据获取,把串行的操作截断成多段的并发;这种在同一个任务多时间批次的情况下也很重要,每个批次会界定本批次设计数据更新的范围。数据更新范围使用前一般会获取本次更新的数据量,可以根据原目标引擎单个批次的最优性能计算出offset。
多步骤过程:多步骤顾名思义就是数据的准备不是一蹴而就的,例如在3个Mysql库、Postgresql、Oracle中获取员工信息,而员工编号是统一的,最终数据在DB2中汇聚在一起,最基础的步骤是三份数据汇入到Oracle中,这就涉及到前面通过key做数据的Merge,这里会涉及到数据的插入和更新,但是如果有key存在并且不同数据源目标数据列清楚的情况下,三份数据早到和晚到场景都没有太大区别。第二步骤则根据汇总完的数据分析出一个过滤场景下的聚合信息,这步骤的场景作为计算数据源,再次进行数据的扇出插入结果。第三步骤可以把第一步的临时结果进行删除。所以在多步骤的场景下数据是分步骤完成了汇聚、聚合和删除。
更新类型:百度外卖大数据实践的开放式SQL场景有Insert(大批明细场景)、Update(数据后续更新)、Insert Once(聚合结果插入)、Insert Temp(临时结果缓存)、Delete(善后处理场景),在这些组合操作类型的场景下,需要在是线上增加一个执行优先级的信息,如果区分优先级会按照从前到后的步骤执行,如果没有设定则可以并发操作。
黑盒暴露操作:黑盒操作是在通过开放式SQL的存取原则情况下,对无法按照约定规范操作的情况下实行的一种妥协方式,目的有两个:一方面要把黑盒对数据依赖过程必须对外暴漏,这样是为了后期梳理数据血缘关系提供素材;另一方面通过黑盒来满足数据处理的灵活性,比如对json负责xpath的选择,集中缓存优化方案;黑盒虽然通过规范暴露了依赖源数据,但是也造成了对外不好解释数据的处理过程,同时这种黑盒一般针对表或者多个字段,精细化程度不够。
开放式SQL是大数据在做数据ETL的一个规范标准,目的在数据的交换和流动是通过配置的范式来完成,并非是通过硬编码或者单纯组件化的方式。编码更多的是要提供丰富的解析函数,更优秀的中间大结果集的Cache和复用。开放式SQL提供了数据从哪里来,到哪里去的哲学问题,同时也可以进行对外阐述对数据做何种操作,这是在为后期数据血缘关系提供最基础的指导,在发展过程中,百度外卖大数据平台也经历了如下的不同阶段。
图3 分布式调度的演进过程
3.2 协作参数一致性
调度策略除了有之前提到的上下游关系外,在大数据场景下还需保证数据处理的统筹协作,更为重要的是精细参数的上传下达。上下游使用系统默认的参数Key定义,也可以自定义Key的参数;系统参数比如说起止时间戳、机器IP、执行任务实例等。对于全局系统默认的Key,由调度系统进行赋值。
参数的作用域有本地化和全局2种方式,本地化可以设定参数的Key:Value,相同Key的全局不会被覆盖,本地的优先级高于全局;而全局的变量是由上游产生并且进行流转;调度本身规定了不同算子在参数接收方面的追加、解析、编码规范,比如在Shell命令和WebService中追加参数有较大区别。
参数除了作用域还有是否被传递的属性,上游的参数可以有针对性的对下游输出,同样,如果算子接收到上游参数可以选择修改值,但是这种传递是不被修改。
3.3 数据质量实时Check
数据生产在交付之前一般会对数据进行校验,由于大数据生产的过程比较冗长,如果在后期输出数据再进行质量校验,往往发现问题比较滞后。所以在数据的阶段性交付过程就可以对数据进行核验,可以比较早的对数据的问题进行干预,保证数据交付的可靠及时性。
Check算子:针对数据的校验特点,设计了专门算子提供质量保证。数据核验的方式一般有2种:跟自身历史比较、跟其他数据源进行比较。前者只需要对目标数据源进行选择相应的SQL或者标准API来获取当前生产窗口的数据,然后才去同比、环比、滑动窗口的均值、左右边界等方式,时间粒度可以灵活到天、小时、分钟。如果跟其他数据源进行比较则需要对源和目标分别进行描述,可以进行严格相等、区间、浮动率等方式比较,应用的场景以数据交换较多。除了数据比较之外,还提供关键性字段类型、精度、宽度的比较,以及对空置率、重复率、区分度的统计报表产出,比较直观的查看数据的稀疏和分布。
整体和抽样:针对于其他数据源进行比较的方式,常规的是通过宏观的字段抽样的Count方式条数比较,也可以通过对数据类型的Sum、Avg的比较,这里需要注意不同引擎的存储精度略有区别,尽量选择整形字段;除此之外也会增加对明细数据抽样的全列的字段比较,这种比较容易发现字段值的缺失,类型变更等问题。
这里需要说明的是,如果没有配置Check算子,则认为数据生产完就可以进行交付;如果数据的树状结构中有Check算子,则认为在下一个Check算子之间的所有数据生产节点都默认数据可以交付。这样默认操作是因为数据的校验不一定要面面俱到,否则也会带来时间上的损耗,一般情况下我们认为只需要在关键性节点进行核验就可以了。校验失败通过告警的方式中止数据ETL过程,后续可以重试或者人工方式介入处理。
3.4 数据血缘关系
人生哲学解释:血缘关系分析是大数据调度与其他调度之间的区分度较大特征之一,主要解决大数据的“人生哲学问题”:我是谁,从哪里来,到哪里去。而这一切的基础是开放式SQL对数据存取的规范,之后依赖对开放式SQL的解析来完成血缘关系分析,主要包含数据的上游依赖关系和下游的被依赖关系,这2个是通常被涉及到的,除此之外还包含第三个特征:计算逻辑或者口径对外的输出,鉴于大数据在进行计算和挖掘之后数据会被推送到不同的业务场景使用,会造成相同口径指标不同的计算结果,当被提及计算逻辑时,研发同学也无所适从,经常需要追根溯源对代码和过程进行回访,进而导致无益消耗的增加。
所以计算逻辑输出也是常规和减少人力梳理成本的重要特点。
开放式SQL可以对外解释,数据从哪里来,到哪里去的逻辑问题,也会涉及到具体SQL或者API层面的计算口径,但是这里需要提到之前的【黑盒暴露】和研发专注开发ETL的丰富function,黑盒是无法解释计算逻辑的,但是function却可以给出入参、出参的说明,让特征三的提供成本最低。
血缘关系分析的手法一方面依赖SQL属主引擎的语法解析,例如Mysql可以使用Alibaba druid、JSqlparser,GreenPlum、Postgresql可以借助JSqlparser,Impala则需要通过impala-frontend进行语法分析,分析的结果在外卖大数据平台需要精确到单个字段依赖上游的哪些库表、字段;越是精细越是精细在进行大数据回溯的时候就越有针对性,同时也越有利于效率的提高。
在进行大数据回溯的时候越有针对性和利于效率的提高。
针对非SQL方式,例如Hbase、ElasticSearch数据源的依赖,也会同样被映射成不同的文档/表,具体的列簇中的列,source中的key。
总之,数据可解释是血缘关系存在的价值,血缘关系同样和开放式SQL都在ETL的演进中具有里程碑的意义。
3.5 基于表的Transformer演进
在大数据调度中,对用户最直观的展示是某个表是否可以被交付,或者更为精确查看表中的字段哪些具备了可以被交付?这样做是为了让下游数据更好的有选择性的、细粒度的依赖触发动作。所以在大数据调度中会区分出三类角色,从粗粒度到细粒度分别是:Job、Transformer、operator。
图4 三者协作示例
下面解释下三者的分工和协作:
任务(Job):Job的主要作用是进行数据相关性的统筹,简单来讲是针对表之间、多种数据源之间进行协作的一个统筹,是一个最大粒度的过程,具体调度的实例化过程都是以Job作为入口,其他2个角色都不具备实例化的能力。这里会区分出同样有数据之间依赖,但是并不一定在一个执行频次上的任务,可以采取配置不同的job依赖关系。
转换(Transformer):一个转换就代表一个表,单独把表拿出来,是因为在大数据的交付过程,表是一个完整的符号,不如库的粒度大,也不像字段太精细无法对外完整表述。
算子(operator):算子是调度的最细粒度,不可分割。算子的分类根据应用会扩展很多,有控制类型算子,例如启停算子、分发算子、Check算子等。也会有针对数据操作进行封装的功能性算子,比如获取hdfs数据推送到mysql,Ftp到对象存储等;针对大数据调度的功能性算子是针对单个字段或者几个字段的产生,这个完全依赖于数据产生的难易程度和组合回溯的相关性,最终由开放式SQL进行配置,例如其中的一行则认为是对一个算子的功能进行的描述,select字段中的数据获取可以是多个,同样对应的insert中也可以是多个;大数据调度在完成开发之后,后期的更多运维精力就在算子的丰富。算子的实现会考虑到前面提到的灵活和通用的选择。
3.6 基于字段精细化回溯
字段级别的回溯,主要依赖2+1的方式完成,前面的2是指血缘关系+可更新目标引擎;通过开放式SQL可以梳理出数据的血缘关系,便于分析出整个链条中可以上下游依赖的点和并发的点。另外的1是指在调度的图形化界面中,可以针对一个具体实例化的Job选择需要回溯的transformer或者某些算子。
同样,根据上图4中的流程,我们走一个具体的实例。图中标识的黑色0/6代表的是开放式SQL中黑盒的部分,这部分对数据来说无法解释的生产过程;三个标识图形2代表的是Check算子,其他圆角方形颜色相同代表有上下游血缘关系依赖,例如7会依赖上游的1。下面我们了解下几个场景的回溯:
回溯1:在这种情况下算子1/2/3/4/6会被进行回溯,而算子0和5则不会被执行到,同样因为1后面有紧邻的check算子2,则1执行完,算子7不会马上被并发执行,因为有一个黑色的算子6。但是在算子2执行成功之后,如果能暴露出算子6的依赖和产出关系,算子7就可以被执行,不需要等待算子3/4/6的执行完成。所以节约了一定的时间。其他场景也是类似
回溯Transformer2,这种场景算子7和算子9会同时触发执行,同样,如果算子9在完成的情况下,下游transformer3中的11不会被执行,因为是非首节点,但是在算子7执行完成之后,算子13和算子10都会被同时调起。
可更新目标引擎是指非SQL On Hadoop的文件解决方案,类似GreenPlum、Hbase、ES都是可以被实时更新。这里不详细展开。
3.7 信号灯
信号灯在大数据分布式调度中作为一个消息中间件,主要作用是生产者(Producer)在数据生产结束、数据质量核验通过等过程对外释放信号,这里面包含具体的库表、字段和本批次的数据范围等信息,消费者(Consumer)可以根据需要监听不同的表主题,来完成后续的操作。通过信号灯的方式,可以很好的对数据下游依赖解耦合,同时信号灯也可以被应用在数据集市中库表、字段的数据完成情况标识,可以让用户进行查看,免去了数据是否可用,是否交付的交互。
总结
大数据分布式调度的应用场景和ETL的定义过程、数据引擎和业务场景的需求有着至关重要的关联,分布式调度的过程是通过场景化驱动逐步完善的过程,百度外卖大数据的调度V2.0是满足了通用的调度之后,发现存在的数据解释和细粒度更新延迟等问题之后,开启了逐步迭代完善过程,后期也期待我们的系统开源的一天。