-
Notifications
You must be signed in to change notification settings - Fork 412
1.3_深入Task
DataLink真正执行数据同步任务的是每一个具体的Task,由Task从某一个固定类型的数据源读取数据,并同步到若干个目标端数据源,即为一对多的关系。我们将源端数据源类型规定为Task的类型,系统目前支持的Task类型有:MYSQL, FLEXIBLEQ, HBASE,支持同步到的目标端数据源类型有:Rdbms、ElasticSearch、Hdfs、HBase、FlexibleQ、SDDL。
上图展示了Task进行一次数据同步的流程。
-
【类拥有关系】
> WorkerTaskContainer为Task的执行入口类,负责管理WorkerTaskReader和WorkerCombinedTaskWriter的生命周期;WorkerCombinedTaskWriter负责管理多个WorkerTaskWriter;WorkerTaskReader和WorkerTaskWriter分别管理TaskReader和TaskWriter的生命周期。 -
【Task同步流程】
> 同步流程从WorkerTaskReader发起,由TaskReader从某个类型的数据源拉取数据,或者收到数据源推送的数据,然后将数据put到队列中,进行callback等待;WorkerCombinedTaskWriter负责从队列take数据,再分别存储到与它所管理的各个WorkerTaskWriter所共享的队列中,进行callback等待;然后,每个WorkerTaskWriter从队列中take数据,由各个TaskWriter根据数据的Record类型加载对应的Handler,等所有Handler处理完数据后,不论同步成功或失败都进行callback通知,结束所有队列的等待;最后,TaskReader进行判断,如果成功则执行commit,然后发起下一轮同步,如果失败则执行rollback,然后重试。
【注】在Task的Reader和Writer为一对多的情况下,除了第一个Writer用Reader直接传过来的RecordChunk,其他Writer要copy一份RecordChunk来用,而copyRecordChunk使用反射实现,比较耗费系统资源,所以,如果是数据量较大且性能要求较高的同步场景,不建议使用一对多。 -
【Task生命周期】
> TargetState
Task的目标状态,包括STARTED和PAUSED两种,用于在在Manager端控制Task的运行状态。
> TaskStatus
Task的实际运行状态类,包括UNASSIGNED、PREPARING、RUNNING、PAUSED、FAILED五种状态以及executionId、generation、startTime、workerId等运行属性,会随Task的启动注册到ZK,用于监控Task的当前运行状态信息。
例,id=10的Task在ZK上的Status节点路径为“/datalink/tasks/10/status”,节点内容如下图所示:
> Task生命周期
Task的实际运行状态默认值设置为UNASSIGNED。Task启动时,首先进入Prepare阶段,即在ZK添加Task的status节点,并将运行状态更新为PREPARING。Prepare成功之后,判断Task的目标状态,若目标状态为PAUSED,则将status节点的运行状态更新为PAUSED,同时Task线程阻塞;若目标状态为STARTED,则将status节点的运行状态更新为RUNNING,同时Task正常运行。如果Task在运行过程中出现异常,且Task未被cancelled,则将status节点的运行状态更新为FAILED,同时将异常堆栈发送到ZK,阻塞等待直到触发stopTask。stopTask完成之后,便会remove掉Task的status节点。在上述过程中,由TaskStatusListener负责监听并通知系统对ZK的status节点做相应的状态变更。 -
【Task状态监控报警】
在Task生命周期中,对其运行状态的监控与报警有如下几种情况:
> 在Task启动之前的Prepare阶段,在添加status节点时,若发现ZK上已存在该节点,说明该Task正在其他地方运行,则抛出TaskConflictException异常,并每秒进行一次重试,如果重试时间已经超过了ZK的会话超时时间,则触发报警,并继续重试,直到Task重复运行的情况消失。
> 除了Task运行冲突异常监控报警,TaskStatusMonitor还对Task的实际运行状态进行监控,当出现以下三种情况之一时,触发报警:
(1)Task的目标状态和运行状态不一致(正常情况下,目标状态STARTED对应运行状态RUNNING,目标状态PAUSED对应运行状态PAUSED);
(2)Task的status节点消失,运行状态变为UNASSIGNED;
(3)Task出现异常,运行状态变为FAILED。
并且,如果Task的目标状态为STARTED而运行状态为FAILED时,会重启Task,即结束线程,重新启动。
-
【TaskContext】
> 每个Task都会绑定一个TaskContext,通过context可以实现Task和Runtime之间的交互,可以供TaskReader和TaskWriter随时获取Task的Id、ExecutionId、Service、Global-Session、Global-Attributes与开启Session功能。TaskContext的生命周期和Task的运行生命周期保持一致,当Task启动时TaskContext随之创建,当Task关闭时TaskContext也随之销毁。TaskAttributes和TaskContext的生命周期是一致的,主要用来支持Task整个生命周期内的数据共享。而TaskSession是会话级的数据共享机制,完成一次RecordChunk数据同步的过程定义为一次会话,TaskSession的生命周期即为一次数据同步。
-
【TaskSession】
> 如上图所示,TaskSession主要用来支持一次数据同步过程中的数据共享,即TaskReader和TaskWriter之间可以通过TaskSession共享数据,而TaskReaderSession和TaskWriterSession则分别供TaskReader和TaskWriter用于其内部共享数据。一次会话的起始和结束均为TaskReader,所以,在此触发TaskSession的begin操作(首先进行reset)。 -
【TaskSession应用示例】
> MySQL-Reader插件dump获取到的原始数据:存入TaskReaderSession中的key为MESSAGE_KEY,value为本次同步fetch到的原始数据,并根据需要进行dump。
> Reader端和Writer端各自的数据同步性能统计:存入TaskReaderSession中的key为ReaderStatistic,TaskWriterSession中的key为WriterStatistic,value分别为各自的统计指标,通过在程序中按需存取,计算到本次同步的Record数量、延迟时间、load时间、TPS等,为Task的性能监控提供基础数据。
> 映射拦截器OrderEntRecordInterceptor:该拦截器的作用是拦截非企业订单,只同步企业订单。存入TaskWriterSession中的key为PREFIX+id,value为企业订单id的值,当同步订单子表数据时,只有子表的订单id在session中存在,才进行同步。这样利用TaskSession,并依赖主表子表关系,可以实现企业订单的快速过滤,提高了同步效率。
-
【TaskPositionManager】
PositionManager是Datalink提供的一个公用的数据同步位点管理组件,负责查询、更新消费位点。如果TaskReader有自己的位点管理机制,用自己的机制即可。TaskPositionManager是PositionManager的默认实现,启动和停止跟随Worker主线程,其主要功能如下:
> 查询消费位点主要用于实时监控Task的同步情况(当前消费位点);
> 更新消费位点有定时更新和实时更新两种。正常同步过程中采用定时更新,首先将每个Task的位点变更信息存入内存,然后每隔1秒将内存中Task的最新位点信息持久化到ZK节点(多次变更只刷一次),表示当前已经消费到的位置。在重启Task时可以重置消费位点,因此采用实时更新,使其启动之后能够从正确的位点开始消费。
> 例,id=10的Task在ZK上的Position节点路径为“/datalink/tasks/10/position”,节点内容如下图所示:
-
【位点管理机制应用示例】
> Task的消费位点变更发生在一次同步成功之后。Reader端进行commit时,会判断是否自带commit机制,若有,则Reader插件自己进行commit和刷新位点;若没有,则使用DataLink公用的位点管理机制刷新ZK的位点信息。一般情况下,具体的Reader插件不需要实现commit功能,由DataLink框架自动记录records消费的偏移量;若需要在自己系统内部存储偏移量,则可以选择实现commit功能。
> DataLink的MySQL-Reader插件选择使用canal自带的位点更新机制,同时自定义CanalTaskMetaManager,重写了canal自带的MetaManager,将canal的消费位点纳入统一管理。
> DataLink的HBase-Reader插件和Fq-Reader插件均使用自己的位点管理机制。
-
【TaskConfigManager】
> TaskConfigManager是Task配置信息管理器,负责发现Task配置变更并进行事件通知,并以组为单位进行监控管理,不属于本组的变更不予关注。
> TaskConfigManager随Worker线程启动后,首先进行强制refresh,初始化本组的taskConfigList和version,获取最新配置。然后开始每隔500ms循环刷新Task配置信息,若version发生变化,则通过TaskConfigUpdateListener通知系统做相应的处理:若有Task新增或删除,则触发分组进行ReBalance;若有Task参数配置更新(包括Task参数、Reader参数或者Writer参数的变化),则重启Task;若有Task目标状态更新,则更新Task的目标状态,并对mustRestartWhenPause == true类型的Task进行重启(具体原因请见Task常用参数)。
MYSQL类型数据同步,DataLink的Task和Reader端的数据源是一对一的关系,一个同步任务对应一个Task的配置信息。但是分布式集群的同步,例如HBASE,DataLink的Task模拟是HBASE的从集群,与Reader端的HBASE集群是多对一的关系,若为每个Task进行一次配置,同步将过于繁琐。因此针对集群对集群、需要多个Task的数据同步模型,引入LeaderTask机制,简化Task配置。
-
【LeaderTask机制应用示例】
> 每个要同步的HBASE集群对应Reader端一个Repl-ZNode-Parent,同一ZNode下的所有Task模拟的是一个HBASE从集群,每个Task模拟的是一个RegionServer,其中创建的第一个Task是LeaderTask。
> 所有Task的相关配置,包括同步映射和Task监控等,只需要在LeaderTask上配置即可,其它FollowerTask会复用LeaderTask的配置信息。
-
【Task基本参数】
Task基本参数是DataLink所有类型Task的通用参数。
Task基本参数 参数描述 默认值 备注 id
Task在数据库中的唯一标识
无 groupId
Task所属分组的id
无 每个Task必须且只能属于一个分组 TaskType
Task的类型
无 按照Reader端类型划分为三种:MYSQL, FLEXIBLEQ, HBASE; mustRestartWhenPause mustRestartWhenPause==true时,
若Task的目标状态被置为PAUSED,则必须重启TaskHBASE TaskType中的一个公用方法属性 TargetState
Task的目标状态
无 STARTED,PAUSED; TaskStatus
Task的当前状态信息 无 UNASSIGNED,PREPARING,RUNNING,PAUSED,FAILED;
当前状态与运行信息均在ZK上注册,显示Task的当前运行状况readerMediaSourceId Task所关联的Reader的数据源id 无 taskParameter
Task的参数 无 目前只有taskId taskReaderParameter
Task的Reader插件的基本参数 无 PluginReaderParameter类型, 不同的Reader插件可以定义各自的扩展参数
taskWriterParameter
Task的Writer插件的基本参数 无 List<PluginWriterParameter>类型,
不同的Writer插件可以定义各自的扩展参数
isLeaderTask
Task是否为LeaderTask 无 适用于集群类型的Task leaderTaskId
是FollowerTask时,其LeaderTask的id 无 适用于集群类型的Task version Task配置的版本 无 即整个分组配置的版本 【注】HBASE类型的Task被设为mustRestartWhenPause==true的原因:
> Task被置为PAUSE之后,其ZK节点还存在,因此HBase-Master会继续往Task推送Log,但是Task收到Log之后会阻塞住,HBase-Master等待超时,继续推,再等待超时。从实际测试结果来看,此Task会导致所有Task的同步出现严重延迟。所以必须重启改Task,WorkerTaskReader检测到PAUSE状态,会阻塞在HBase-Reader启动之前,这样Task便不会在ZK注册,避免HBase-Master往Task推送Log,同时降低系统资源的占用。
-
【插件基本参数】
PluginParameter是DataLink所有插件的通用参数,PluginReaderParameter和PluginWriterParameter均继承了PluginParameter。
PluginParameter参数 参数描述 默认值 备注 pluginName
插件的名字 插件自定义 pluginClass
插件的实现类 插件实现类的路径 pluginListenerClass
插件的Listener实现类 插件Listener实现类的路径 supportedSourceTypes
插件支持的数据源类型 不同的插件支持不同的数据源类型 可以为多个,例如Rdbms-Writer支持的类型有
MYSQL、SQLSERVER、POSTGRESQL、SDDLperfStatistic
是否开启性能统计
false
用于监控Reader端和Writer端的数据同步性能 -
【Reader插件基本参数】
PluginReaderParameter是DataLink所有Reader插件的通用参数,具体类型的Reader插件可以自定义参数,但是必须继承PluginReaderParameter。
PluginReaderParameter参数 参数描述 默认值 备注 mediaSourceId
Reader关联的数据源的id
无 冗余存储 dump
是否需要dump fetch到的数据
false
用于确认Reader端获取的Binlog数据,有助于排查问题 ddlSync
是否同步ddl操作
true
主要针对关系型数据库 -
【Writer插件基本参数】
PluginWriterParameter是DataLink所有Writer插件的通用参数,具体类型的Writer插件可以自定义参数,但是必须继承PluginWriterParameter。
PluginWriterParameter参数 参数描述 默认值 备注 poolSize
Writer的线程池大小
5
根据MediaMapping的配置情况设置一个合理的值
dryRun
dryRun==true时不会进行实际写入操作
false
useBatch
是否开启批量写入
true
不同的Writer对批量写入会有不同的定义
batchSize
批量写入时每个批次的大小
50
merging
是否可以对数据进行合并
false
maxRetryTimes
最大重试次数
3
retryMode
重试模式
Always
当Task写入出现异常时,有四种处理方式:
Always,//一直重试
TimesOutDiscard,//超过重试次数后丢弃数据
TimesOutError,//超过重试次数后抛异常,终止任务
NoAndError;//不重试,直接抛异常,终止TasksyncAutoAddColumn
目标端缺少列时,是否自动加列
true
用于源端加字段时,在目标端自动补全相应字段 【注】异常处理方式默认为一直重试,直到恢复成功为止,否则一直抛异常和报警。在一直重试的情况下,多数异常可以自动恢复正常,如mysql到mysql的同步目标表缺字段,或者目标库因自身原因导致连接失败等问题,一些特殊异常需要人工介入处理,比如暂不支持的异构数据源之间的ddl同步。