让建站和SEO变得简单

让不懂建站的用户快速建站,让会建站的提高建站效率!

数仓 | 几种常见的数据同步模式

发布日期:2022-05-15 15:55    点击次数:186


本文转载自微信公众号「大数据时候与数仓 」,作家西贝。转载本文请相干大数据时候与数仓公众号。

写在前边

数据仓库的特质之一是集成,即最初把未经过加工处理的、不同起头的、不同形势的数据同步到ODS层,一般情况下,这些ODS层数据包括日记数据和业务DB数据。对于业务DB数据而言(比如存储在MySQL中),将数据采集并导入到数仓中(过去是Hive或者MaxCompute)是曲常广大的一个要津。

那么,该奈何将业务DB数据高效准确地同步到数仓中呢?一般企业会使用两种决议:直连同步与及时增量同步(数据库日记通晓)。其中直连同步的基本思绪是直连数据库进行SELECT,然后将查询的数据存储到土产货文献动作中间存储,临了把文献Load到数仓中。这种模式十分的约略约略,可是跟着业务的发展,会碰到一些瓶颈,具体见下文分析。

为了不休这些问题,一般会使用及时增量的模式进行数据同步,其基应承趣是CDC (Change Data Capture) + Merge,即及时Binlog采集 + 离线处理Binlog收复业务数据这么一套不休决议。

本文主要包括以下本色,但愿对你有所匡助

常见数据同步模式 流式数据集成 数据同步的模式 直连同步

直连同步是指通过界说好的表率接口API和基于动态聚积库的模式径直聚积业务库,比如ODBC/JDBC等礼貌了长入的模范接口,不同的数据库基于这套模范提供表率的运转,从而赞助澈底调换的函数调用和SQL竣事。比如时常使用的Sqoop即是选拔这种模式进行批量数据同步的。

直连同步的模式设立十分约略,很容易上手操作,相比合适操作型业务系统的数据同步,可是会存在以下问题:

数据同步时候:跟着业务畛域的增长,数据同步破耗的时候会越来越长,无法得志下贱数仓出产的时候条目。 性能瓶颈:直连数据库查询数据,对数据库影响十分大,容易酿成慢查询,若是业务库莫得选拔主备计策,则会影响业务线上的正便劳动,若是选拔了主备计策,诚然不错幸免对业务系统的性能影响,但当数据量较大时,性能还是会很差。

日记通晓

所谓日记通晓,即通晓数据库的变更日记,比如MySQL的Binlog日记,Oracle的存档日记文献。通过读取这些日记信息,采集变化的数据并将其通晓到运筹帷幄存储中即可完成数据的及时同步。这种读操作是在操作系统层面完成的,不需要通过数据库,因此不会给源数据库带来性能上的瓶颈。

数据库日记通晓的同步模式不错竣事及时与准及时的同步,蔓延不错截止在毫秒级别的,其最大的上风即是性能好、后果高,不会对源数据库酿成影响,现在,从业务系统到数据仓库中的及时增量同步,等闲选拔这种模式。天然,这种模式也会存在一些问题,比如批量补数时酿成无数数据更新,日记通晓会处理较慢,酿成数据蔓延。除此除外,这种模式相比复杂,参加也较大,因为需要一个及时的抽取系统去抽取并通晓日记,下文会对此进行详备讲授。

如上图所示架构,在直连同步基础之上加多了流式同步的链路,经过流式运筹帷幄引擎把相应的 Binlog 采集到 Kafka,同期会经过一个 Kafka 2Hive 的模范把它导入到原始数据,再经过一层 Merge,产出下贱需要的 ODS 数据。

上述的数据集成模式上风是曲常昭彰的,把数据传输的时候放到了 T+0 这一天去做,在第二天的时候只需要去做一次 merge 就不错了。十分从简时候和运筹帷幄资源。

流式数据集成竣事 竣事思绪

最初,选拔Flink追究把Kafka上的Binlog数据拉取到HDFS上,生成增量表。

然后,对每张ODS表,最初需要一次性制作快照(Snapshot),把MySQL里的存量数据读取到Hive上,这一过程底层选拔直连MySQL去Select数据的模式,不错使用Sqoop进行一次性全量导入,生成一张全量表。

临了,对每张ODS表,每天基于存量数据和本日增量产生的Binlog做Merge,从而收复出业务数据。

Binlog是流式产生的,通过对Binlog的及时采集,把部分数据处理需求由每天一次的批处理分担到及时流上。不管从性能上如故对MySQL的看望压力上,都会有昭彰地改善。Binlog本人记载了数据变更的类型(Insert/Update/Delete),通过一些语义方面的处理,澈底不详做到精确的数据收复。

对于Binlog通晓部分,不错使用canal器用,采集到Kafka之后,不错使用Flink通晓kafka数据并写入到HDFS上,通晓kafka的数据不错使用Flink的DataStreamAPI,也不错使用FlinkSQL的canal-json数据源神气进行通晓,使用FlinkSQL相对来说是相比约略的。底下是canal-json神气的kafka数据源。

CREATE TABLE region (   id BIGINT,   region_name STRING ) WITH (  'connector' = 'kafka',  'topic' = 'mydw.base_region',  'properties.bootstrap.servers' = 'kms-3:9092',  'properties.group.id' = 'testGroup',  'format' = 'canal-json' ,  'scan.startup.mode' = 'earliest-offset'  ); 

数据通晓完成之后,底下的即是吞并收复完满数据的过程,对于吞并收复数据,一种相比常见的模式即是全外聚积(FULL OUTER JOIN)。具体如下:

生成增量表与全量表的Merge任务,本日的增量数据与昨天的全量数据进行全外聚积,该Merge任务的基本逻辑是:

INSERT OVERWRITE TABLE user_order PARTITION(ds='20211012') SELECT  CASE    WHEN n.id IS NULL THEN o.id                  ELSE n.id          END         ,CASE    WHEN n.id IS NULL THEN o.create_time                   ELSE n.create_time           END         ,CASE    WHEN n.id IS NULL THEN o.modified_time                  ELSE n.modified_time           END         ,CASE    WHEN n.id IS NULL THEN o.user_id                   ELSE n.user_id           END                  ,CASE    WHEN n.id IS NULL THEN o.sku_code                   ELSE n.sku_code           END         ,CASE    WHEN n.id IS NULL THEN o.pay_fee                  ELSE n.pay_fee           END FROM    (             SELECT  *             FROM    user_order_delta             WHERE   ds = '20211012'             AND     id IS NOT NULL             AND     user_id IS NOT NULL         ) n FULL OUTER JOIN (-- 全外聚积进行数据merge                     SELECT  *                     FROM    user_order                     WHERE   ds = '20211011'                     AND     id IS NOT NULL                     AND     user_id IS NOT NULL                                  ) o ON      o.id = n.id AND     o.user_id = n.user_id ; 

经过上述才能,即可将数据收复完满。

回顾

本文最初先容了数据仓库构建ODS层常见的数据同步模式,并对每种模式进行了讲授,给出了相对应的暗意图。接着给出了CDC+Merge的数据同步决议。值得着重的是,Flink1.11引入了CDC的connector,比如MySQL CDC和Postgres CDC,同期对Kafka的Connector赞助canal-json和debezium-json以及changelog-json的format,通过这种模式不错很约略地拿获变化的数据,大大简化了数据处理的经由和数据同步的复杂度。

 






Powered by 快3app @2013-2022 RSS地图 HTML地图

Copyright 站群 © 2013-2021 365建站器 版权所有

栏目分类

热点资讯

相关资讯