Tapdata肖贝贝:实时数据引擎系列(一)-新鲜的数据流

2021-08-09 18:22Tapdata 技术合伙人肖贝贝

前言

2006 年诞生的 hadoop 和 她周边的生态, 在过去的这些年里为大数据的火热提供了足够的能量, 十几年过去了, 场景在变化, 技术在演变, 大家对数据的认知已经不再局限于 T+1 与 高吞吐高延迟 为主要特征的上一代框架理念, 在真实的场景里, 实时, 准确, 多变 的数据也发挥着越来越重要的作用

为满足这些新的需求, 各种框架和中间件如雨后春笋般不断涌出

hive 的出现让这头大象有了一个精致但呆滞的面庞, hbase 与 impala 开始尝试将其提速, spark/flink 作为新的流处理框架, 尝试通过实时计算的方式, 将数据更快地输送到业务方面前, presto/dremio 从数据模型入手, 尝试通过虚拟化实时集合来自不同数据源的数据, 变相达到实时的目的, 而各种新型的 OLAP 数据库, 以 clickhouse 为代表, 试图提供近实时的海量数据统计分析方案, 在不同的细分领域, 比如 时序/特征 等领域, 也各自涌现了富有特色的产品出来

111.jpeg


与传统的商业软件发展方式不同, 这个实时数据相关的赛道中, 开源已经逐渐成为不约而同的选择, talk is cheap, show me the code, 大家各凭本事说话

而基础框架就像是心爱的姑娘, 每个人都觉得自己的才是最好的, TAPDATA 在实时数据方案的落地过程中, 也逐渐感觉到了现有的各种技术产品总是在什么地方差点东西, 一个个场景做下来, 一个个客户谈下来, 去实现一个属于自己的流计算框架的想法在脑海中越来越明确。

222.jpeg

在给客户产生直接价值的同时, 把这些经验累积起来, 去做一个可以影响更多人的技术产品, 可能是一件更有意思的事情

为此, 我前几天登录了好久没用的知乎账号, 在这个人均百万的平台下, 开始了这个系列的分享, 去把 TAPDATA 对于实时计算引擎的一些思考整理成文字, 大家看了如果觉得有用, 可以默默收藏, 如果觉得哪里写得不对, 可以评论或者私信我, 如果觉得这个东西方向有问题, 或者说就是一些没有价值的垃圾, 也欢迎提醒我, 我们共同进步

新鲜的, 才是最好的

完成一个实时的数据计算, 第一步是数据来源怎么取得, 基于 JDBC 或者各个数据库驱动的 Query, 可以很方便拿到批量的数据, 但是更实时的数据拿起来, 就不是那么的显而易见和标准化

实时数据的获取, 有一个名词叫 CDC, 全称是 change data capture, 可以想见一个场景如果有一个专门的名词缩写来描述, 一般都不会很简单

CDC 的实现一般有以下几种方式

轮询

最直接的想法是通过 Query, 定期轮询最新的数据, 这么做的好处是几乎全部的数据库都可以直接支持, 开发起来成本也低, 但是问题也很明显, 主要有:

  1. 轮询需要有条件, 这个条件一般是递增字段, 或者时间属性, 对业务上有 入侵

  2. 最小 延时 为轮询间隔

  3. 轮询对数据库造成了额外的查询 压力

  4. 最致命的是, 轮询 无法获取被删除的数据, 也无法得知更新的数据更新了哪些内容, 这些虽然在工程上可以通过各种手段去找一个折衷方案, 但终究会存在各种各样的问题

由于实现容易, 轮询是最早也是目前最广泛被应用于实际场景的方案, 但是也由于缺点很多, 在最近出现的各种计算框架中, 轮询一般作为保底而不是首选方案出现

触发器

不少数据库都有触发器(Trigger) 的设计, 在对数据行列进行读写时, 可以触发一个存储过程, 完成一系列的操作, 基于这个前提, 可以对数据库的写操作编写一个自定义触发器, 完成数据获取, 常见的方案有:

  1. 数据触发保存到单独的一张表, 典型的产品化实现有 SQL Server, 其他的数据库也可以自己实现类似的逻辑, 然后通过轮询这张表获得变更

  2. 数据触发到外部消息队列, 消费者通过消息队列获取数据

  3. 通过 api 直接发送到目标端

333.jpeg

相比轮询, 触发器可以更全面地获取更详细的实时数据, 不过问题也有很多, 主要是的问题有:

  1. 没有标准: 用户需要根据每种数据库的触发器去设计自己的数据获取方案

  2. 通用性不够: 部分数据库没有触发器设计

  3. 影响性能: 触发器在数据写入的时候, 在数据处理逻辑里增加了一段逻辑, 虽然有些触发器的设计是异步的, 不影响延时, 但是因为占用了数据库本身的计算资源, 对吞吐有一些影响

相比轮询, 触发器子方案在延时和数据准确性上有了一些突破, 是一种方案的进步

数据库日志

绝大数据数据库都有各种各样的日志, 其中一种日志用来记录每个操作产生的数据变更, 很多数据库都用这份日志来做多副本同步, 或者用来做数据恢复

而外部服务也可以通过这种方式拿到最新的实时变更, 相比轮询, 通过日志拿到的数据延时一般在亚秒内, 而且对数据库的性能影响非常低, 同时支持的数据库类型相比触发器更多, 只要存在副本, 就存在类似的日志设计

444.jpeg

由于基于数据库日志的方案具备其他两种方案不可比拟的优势, 已经逐渐成为实时计算框架首选的数据获取方案, 但是这种方案由于使用了数据库内部的设计, 开发难度和实现成本是最高的, 这个也限制了方案的使用

消息队列

除此之外, 还有一些来自应用的消息, 或者一些其他的业务自定义数据, 大多数都通过各种消息队列来中转, 典型的有 kafka 和 各种名字的 MQ, 由于更多是业务定制在里面, 这里各家都有各家的场景, 统一来做是比较困难的

数据库日志的难题

在之前提到的各种 CDC 方案中, 数据库日志具有非常明显的结果优势, 但是因为开发困难, 目前应用范围也不是特别广泛, 数据库日志方案的问题主要有以下几种

数据库种类繁多

数据库日志属于数据库内部实现逻辑, 除了特意为兼容去设计之外, 很少有相同或者相似的对外接口, 不管是从 API, 还是日志格式上来说, 基本是各家有各家的做法, 对流计算框架来说, 适配起来要一个个做, 没有捷径可以走, 成本很高

555.jpeg

当前市面上用的比较多的数据库少说有几十种, 如果想覆盖全, 大概有两百种左右的适配工作量, 放眼看去目前并没有哪个开源或者闭源的方案, 在这方面做得比较全面, 除了开源数据库之外, 还有一些商业数据库, 比如 db2, gaussdb, hana, 文档的缺失, 开源方案的缺失, 导致这些方案实现起来很麻烦

不兼容的版本

即使是同一种数据库, 不同的版本之间也往往有不兼容的情况, 极少有数据库可以在一个副本内运行不同的大版本, 比如 oracle 的 8 到 20 之间的版本, mongodb 的 2 到 5 之间的版本, 会存在很多细节和设计的不同

666.jpeg

数据库种类已经很多, 加上版本的不兼容, 要完整处理这些场景, 适配的数量一下子增加到五百种以上, 困难成倍提升

部署架构多种多样

第三种多样性来自于部署架构, 即使是同一个数据库的同一个版本, 也存在各种各样的部署架构, 比如对 Mysql, 有包括 PXC, Myshard, Mycat 在内的各种集群方案, PG 也有 GP, XL, XC, Citus 在内的各种方案, oracle 有 DG, RAC, mongodb 有 副本, 分片

这些多样性与前几种相互组合, 最后的完整的工作量已经达到几乎人力不可为的程度

不标准的格式

如果说多样性只是工作量上的问题, 数据库日志的一些设计, 则从理念上造成了一些困难

由于数据库的日志更多是为了主从同步设计, 主要是保证数据的最终一致, 这个与实时计算的场景需求存在一些差异, 比如我们以 MongoDB 的一个删除日志来做示例

rs0:PRIMARY>usemockswitchedtodbmockrs0:PRIMARY>db.t.insert({a:1,b:1}) WriteResult({"nInserted" : 1 }) rs0:PRIMARY>db.t.remove({}) WriteResult({"nRemoved" : 1 }) rs0:PRIMARY>uselocalswitchedtodblocalrs0:PRIMARY>db.oplog.rs.find({ns:"mock.t"}).pretty() {"op" : "i", "ns" : "mock.t", "ui" : UUID("9bf0197e-0e59-45d6-b5a1-21726c281afd"), "o" : { "_id" : ObjectId("610eba317d24f05b0e9fdb3b"), "a" : 1, "b" : 1 },"ts" : Timestamp(1628355121, 2), "t" : NumberLong(1), "wall" : ISODate("2021-08-07T16:52:01.890Z"), "v" : NumberLong(2) }{"op" : "d", "ns" : "mock.t", "ui" : UUID("9bf0197e-0e59-45d6-b5a1-21726c281afd"), "o" : { "_id" : ObjectId("610eba317d24f05b0e9fdb3b") },"ts" : Timestamp(1628355126, 1), "t" : NumberLong(1), "wall" : ISODate("2021-08-07T16:52:06.191Z"), "v" : NumberLong(2) }

插入一条数据, 将其删除, 查询一下数据库日志, 关注删除那条记录, 里面只记录将主键删除的信息, 并无法得到原始字段的值

实时计算一个比较典型的场景是多表 JOIN, 如果我们以 a 为字段进行 JOIN, 来自数据源为 MongoDB 的实时流由于无法拿到被删除的数据中 a 字段的值是多少, 这个会导致实时的 JOIN 无法获取最新的结果

为了实现完整的流计算的需求, 只保证数据同步一致性的日志是不足够的, 我们往往需要完整的数据库变更数据

一些现存的解决方案

虽然数据库日志有着各种各样的问题, 但是由于其过于明显的优势, 越来越成为实时流框架的当红炸子鸡选型, 那上面的问题, 也逐渐有了解法

针对实现工作量的问题, 现在出现了三种流派 :

一个是专精派, 每个方案只解决一个数据库, 或者只专注解决一个数据库, 比如 oracle 的 ogg, mysql 的 canal, 都专注在自己的领域去做到很高的深度

一个是包容万象派, 典型的有 debezium, 通过插件的形式去兼容各个数据库的标准

最后一个是融合派, 他们自己不做实现, 只是将来自一和二的方案再经过一次抽象, 做成融合的一个解决方案(没错的, 说的就是 github.com/ververica/fl)


而针对数据日志不标准的问题, 在技术上一般是通过一个完整数据的缓存层来实现日志的二次加工, 虽然在功能上实现了较好的补充, 但是由于完整保存了数据, 资源消耗也比较高, 而且目前没有看到统一的产品出现, 更多是停留在一些场景里做方案补充

TAPDATA 的解决方案

在我们的方案里, 是按照 包容万象 + 必要的数据缓存 结合的方式去解决的这个问题

相比与 debezium, 我们在性能上做了大量的优化, 在 解析速度上有数倍提升, 同时, 支持的数据库种类已经扩展到 三十种以上

对数据库日志不标准的问题, 也完成了必要的存储抽象, 一个典型的用法如下:

CacheConfigcacheConfig=TapCache.config("source-cache")..setSize("1g").setTtl("3d");DataSource<Record>source=TapSources.mongodb("mongodb-source").setHost("127.0.0.1").setPort(27017).setUser("root").setPassword("xxx").withCdc().formatCdc(cacheConfig).build()

来构建一个完整的实时数据流, 其中流出的数据, 包含了完整的 全量 + 增量数据, 并使用了内存缓存对增量日志做了规整化

对下游来讲, 这就是新鲜的, 实时的数据流了

留一个小问题

细心的朋友已经已经发现了, 这里的数据包含了全量与增量, 但是我们的数据格式, 并没有像 flink 或者 hazelcast jet 这些通用的做法一样, 分成了 BatchSource, Record, ChangeRecord 这些类别, 是出于什么考虑呢?

关注 Tapdata 微信公众号, 带给你最新的实时计算引擎的思考。

Tapdata官网_430x430_微信服务号渠道码.png

推荐阅读

Tapdata 推出“钛计划”公益项目,着力打通数据孤岛助推社会数字化升级

为响应数据要素市场化配置改革政策方向的指引,Tapdata 推出“钛计划”打通数据孤岛公益行动,面向非盈利机构(如各城市政务服务数据管理局、社会公益组织/项目等)以及为社会培养数据技术人才的相关培训机构,提供 Tapdata 实时数据服务平台的特殊免费授权,助推公共领域数据互通、共享与实时应用......

Tapdata 钛铂数据的产品理念

Tapdata 是全球首个基于数据即服务架构理念、面向 TP 场景的企业实时主数据服务平台,可以帮助企业快速实现主数据的统一管理和发布,并为所有数据库、数仓、大数据平台提供最实时的源数据,让数据随时可用。

Tapdata Cloud 是什么?

Tapdata Cloud 是钛铂数据自研的异构数据库实时同步工具 Tapdata Replicator 的云服务版本,现在免费提供所有开发者和企业使用Tapdata Cloud 目前支持 Oracle、MySQL、PostgreSQL、SQL Server、MongoDB、Elasticsearch 之间的数据迁移和同步,未来将陆续上线 DB2、Sybase ASE、Redis、Kafka 等。

什么是数据即服务(Data as a Service)?

数据即服务(DaaS)是一种数据管理策略,旨在利用数据作为业务资产来提高业务创新的敏捷性。它是自 1990 年代互联网高速发展以来越来越受欢迎的“一切皆服务”(XaaS)趋势下关于数据服务化的那一部分,介于 PaaS 和 SaaS 之间。与 SaaS 类似,DaaS 提供了一种方式来管理企业每天生成的大量数据,并在整个业务范围内提供这些有价值的信息,以便于进行数据驱动的商业决策。同时,我们也...

什么是数据虚拟化(Data Virtualization)?

本文将简单易懂地介绍数据虚拟化技术及数据虚拟化软件架构的实现方法,尽量避免教条主义。如需要了解虚拟化定义,可通过wiki 百科了解。先引用一段百度百科的文字来说明数据虚拟化的定义:数据虚拟化(data virtualization)是用来描述所有数据管理方法的涵盖性术语,这些方法允许应用程序检索并管理数据,且不需要数据相关的技术细节,例如它格式化的方式或物理位置所在。正如百科的定义,采用数据...

Tapdata 数据库实时同步的技术要点

Tapdata 专注于实时数据的处理技术,在数据库迁移和同步方面,Tapdata 的表现非常优秀,实时、多元、异构,尤其在关系数据库到非关系数据库之间的双向同步方面,无论是从操作上,还是效率上,都体现了业界领先的水平。本文重点阐述 Tapdata 在数据库实时同步方面的技术要点。

教育中台与第三方系统对接整合数据案例

最近, 南京秦淮区教育中台系统,成功地和市系统进行了一次圆满对接。通过教育中台提供的统一数据能力和低代码API对接能力,实现了对市系统数据的实时推送和拉取,以及各类业务逻辑上的处理。这次对接为南京市中小学生创客大赛的成功举办提供了及时可靠的数据支撑, 体现了中台系统在快速响应业务方面的优越性。

周生生 | 全渠道商品中心建设

通过Tapdata 构建全渠道商品中心,实现: - 支持中国大陆港澳台的上千家门店的生产环境; - 使用JS脚本来进行流处理计算,业务需求从开发到上线过程快至 1 天以内; - 任务配置与执行监测全程可视化操作,不懂技术也能完成操作,极大降低维护成本; - 一套产品可满足不同需求,根据业务需求产出不同类型的业务模型节省大量人力物力。

关系型数据库到MongoDB实时数据同步解决方案

使用MongoDB作为主机下行或新一代数据库的选择,将业务数据从已有主机或Oracle等关系型数据库复制到MongoDB; 使用Tapdata Replicator的CDC技术,实时监听现有业务库的数据变动并同步至MongoDB; 使用Tapdata 的RDM技术将关系型表合并转型到MongoDB JSON数据结构,并保持和源库的高度数据一致; 在MongoDB上进行新业务的开发。

Tapdata肖贝贝:实时数据引擎系列(一)-新鲜的数据流

前言2006 年诞生的 hadoop 和 她周边的生态, 在过去的这些年里为大数据的火热提供了足够的能量, 十几年过去了, 场景在变化, 技术在演变, 大家对数据的认知已经不再局限于 T+1 与 高吞吐高延迟 为主要特征的上一代框架理念, 在真实的场景里, 实时, 准确, 多变 的数据也发挥着越来越重要的作用为满足这些新的需求, 各种框架和中间件如雨后春笋般不断涌出hive 的出现让这头大象...
联络我们:
Email:team@tapdata.io    电话:0755-26656080
深圳市南山区临海大道香江金融中心 2410-13
官方服务号
Tapdata 微信公众号
扫码关注