Tapdata肖贝贝:实时数据引擎系列(二)-批流一体的数据

2021-08-27 15:00Tapdata 技术合伙人肖贝贝

前言


上文我们提到了 通过数据库日志 获取新鲜的数据, 在对数据的认识里, TAPDATA 引擎的设计和一些其他的流框架不太一样, 他的对象抽象里没有批数据和流数据的区分, 数据只有一种, 被命名为 Record, 数据来源只有一种, 命名为 DataSource, 而数据流阶段也只有一种, 被命名为 DataStage


在抽象上数据去除了批与流的区别, 在全部的计算流程里也不会有区别, 基于这个理念设计的框架才是真正批流一体的框架


所以问题来了, 应该设计一个什么样的数据结构, 来表达批流一体的数据呢?


设计一个结构


首先要解决的是批数据与流数据的一致性表达, 我们把已经存在的批数据认为是新写入的流数据, 就完成了概念上的统一


而流数据包含了包含了 写入, 更新, 与 删除, 是批数据的超集


接下来从 0 开始, 一步步来看一下这份数据结构里应该包含哪些内容


先给出一份示例结构, 然后对照看下面的解释会清晰很多


{

  "op": "u", // 一个更新操作

  "ts": 1465491461815, // 操作时间

  "offset": "123456", // 操作的位移

  "before": {          // 更新之前的值

    "_id": 12345,

    "uid": 12345,

    "name": "tapdata",

  },

  "after": {          // 更新之后的值

    "_id": 12345,

    "uid": 12345,

    "name": "tap",

    "nick": "dfs",     

  },

  "patch": {         // 更新操作的内容

    "$set": {

      "name": "tap",

      "nick": "dfs",

    }

  },

  "key": {         // 记录唯一标识条件, 如果没有, 可以为 {}

    "_id": 12345,

  },

  "source": {      // 数据源的属性

    "connector": "mongodb",

    "name": "fulfillment",

    "snapshot": true,

    "db": "system",

    "table": "user",

  }

}


新鲜的值


最显而易见需要包含的内容, 对于写入, 指的是写入的值, 对于更新, 指的是更新之后的值, 对于删除, 用 {} 表达


这里的值的 key 用 after 来表示


陈旧的值


指的是变更之前的数据, 对于数据库的主从同步来说, 出于数据一致性的目的, 陈旧的值并不重要, 但是在进行流计算的时候, 由于需要进行增量实时计算, 变更前的值变得不可或缺


举例说明一下, 考虑我们对于一份数据的某个字段进行一个 求和 操作, 基于流计算的设计, 求和必然是可以增量计算, 而不是每次更新对全部的存量数据做一次计算, 我们只需要每次将字段变化的值进行相加, 就能得到完整的实时的结果, 而这个过程中变化的值, 需要用新鲜的值减去陈旧的值


这里的值我们用 before 表示


操作类型


对应于 写入/更新/删除 的标记


从某种意义上来说, 这个标记并不是必须的, 我们可以从前面两个新旧值 a b 得到, 有 a 无 b 的就是写入, 有 a 有 b 的就是更新, 无 a 有 b 的就是删除, 但是冗余存储一份会让数据在感官上非常清晰


操作类型的字段用 op 来表示


操作内容


用来描述这次操作具体做了什么变更, 更多是用于 更新 操作, 在一些场景, 比如数据的实时同步上, 可以减少一些额外的负担


这个值可以通过 新旧值 的差获取, 单独记录也是为了提升记录本身的可读性


操作内容的字段用 patch 里表示


唯一标记


用来描述操作对应的是哪条记录, 可以用来对数据进行精准识别


大多数情况下, 这里的标记是主键, 在没有主键的情况下, 可以用唯一索引替代, 如果都没有, 标记需要退化为 全部的陈旧的值


唯一标记用 key 来表示


结构


当前数据 schema 的描述


关于结构, 有两种比较通用的做法, 一种是将结构与数据放在一起, 这样做的好处是每个内容都是自解析的, 不需要额外存储结构, 不好处是额外占用了大量的存储空间, 因为相比数据的变更, 结构的变更往往是少量的, 每个数据都带结构存储对资源是一种浪费


另一种设计是将结构, 与结构的变更单独设计一个事件进行通知, 这样的设计节省了资源, 但是在进行数据实时处理的过程中, 框架需要保证每条数据需要与数据本身的结构一一对应, 带来了额外的工作量


在这里 TAPDATA 的选择还是从场景出发, 选择将结构变更单独存放, 成为 DDL 事件, 不在数据流里展示, 结构的结构与数据的结构完全一致, 只是在 kv 的内容上, 变成对字段的描述


用 type, 值为 ddl, 或者是 dml 里表示是数据描述还是结构描述


时间


操作发生的时间, 由于对人来说, 时间是非常直观的属性, 在回退消费和定位数据点等场景下非常方便, 我们用 ts 来表示, 一般的精度在 ms 级别


位移


与时间类似, 记录操作发生的序号, 时间的好处在于人可读, 不好处在于不精确, 一般时间的精度在 ms 级别, 而每 ms 可以发生很多事件, 为了精确定位一个事件, 我们需要一个唯一位移, 这里用 offset 表示, 一个确定的数据源, 和一个确定的位移, 可以表达一个确定的数据流


来源


用来描述这个数据所属的数据源的信息, 类型, 名字, 库/表, 是发生在全量阶段, 还是增量阶段(稍候我会解释为什么需要一个这样的区分), 我们作为数据源的对象, 有时候需要通过一个操作获取比较多空间的数据, 在这里增加一个属性区分, 也有利于后续的数据处理


这里用 source 字段表示, 大概的属性有:


"source": {   

    "connector": "mongodb",

    "name": "fulfillment",

    "ts": 1558965508000,

    "snapshot": false,

    "db": "inventory",

    "table": "customers",

}


实现上的问题


实时数据的结构设计可以做得比较完善, 但是实现起来会有各种各样的问题, 之前讲过一些, 这里从更细节的角度做一些总结


值的缺失


将结构按操作分组, 对完整的流计算框架的需求来说, 写入操作应该包含 after 值, 更新操作应该包含 before/after 值, 删除操作应该包含 before 值


但是由于数据库的日志设计是为同步准备的, 只需要保证现有日志应用之后, 目标的数据可以达到一致的状态就可以, 不一定会包含全部的字段, 而经过流计算之后, 完整的数据不一定会被保留, 这个会造成引擎本身无法获取完整的数据流


举 MongoDB 的例子, 知名的 CDC 框架 debezium 是如此解释的:


In MongoDB’s oplog,updateevents do not contain thebeforeorafterstates of the changed document. Consequently, it is not possible for a Debezium connector to provide this information. However, a Debezium connector provides a document’s starting state increateandreadevents. Downstream consumers of the stream can reconstruct document state by keeping the latest state for each document and comparing the state in a new event with the saved state. Debezium connector’s are not able to keep this state.


由于数据库本身的日志里不包含这些关键信息, 对于 日志 的消费方来说, 想要补全是很困难的, debezium 的原文是 it is not possible for a Debezium connector to provide this information


抛开 CDC 框架的束缚, 从流计算框架的角度来看, 只要框架在同步的时候, 能把之前的值保存下来, 在发生更新的时候把数据吐出去, 就能得到完整的前后值了


不一致的数据类型


数据获取时候, 需要在平台进行各种处理, 而不同的数据源子数据类型上有各自的标准, 在进行涉及多个源数据交互的时候, 会遇到无法识别的问题


比如来自 Oracle 的 9 位精度时间, 和 来自 MongoDB 的 3 位精度时间都在表达时间, 但是两者同步做 JOIN 的时候, 直接比对会出现永远无法匹配的情况


因此对于数据, 实现一个完整一致的数据类型, 对于后续的流处理是非常关键的


不一致的结构类型


不同的数据库结构差异可能非常大, 举几个例子:


命名空间层级: 部分数据库只有单层空间, 比如 ES 的索引, 部分数据库可能存在三层空间, 比如 Oracle 的库, 表, schema

表定义: 部分数据库是强结构表, 比如大部分的 SQL 数据库, 部分数据库是动态弱结构表, 比如 ES 的动态 mapping, 部分数据库无结构, 比如 MongoDB, 部分数据库是 KV, 比如 Redis

索引结构差异大: 有些数据库只支持 B 树索引, 有些支持 地理位置, 全文, 或者图索引

...


对于数据结构的不同, 实现完全一样的抽象是非常困难的, 但是实现一个边界清晰的支持范围是可行的


TAPDATA 的解决方案


针对批流一体数据格式, TAPDATA 在实现数据流出的时候, 已经针对不同的数据源完成了统一规整, 对于 MYSQL 类似的数据库, 由于 ROW LOG 包含了完整的字段, 可以直接转换解析, 对于其他的不包含完整数据的数据库, 进行了 内存+外存缓存 构建完整数据流的方案, 简单配置, 规整全自动


针对数据类型的问题, TAPDATA 的框架抽象了多种平台标准的数据类型, 数据源在 读/写 数据时均对此完成了适配, 并且保留了通用数据类型的扩展接口, 解决了异构数据类型的相互通信问题


在结构变更上, 同样实现了结构变更的统一转换, 比如针对于 MYSQL 的删除字段, 在 MongoDB 里会转换为对全表的 UNSET 字段操作, 解决了异构数据源之间的 DDL 操作转换问题


在完成这些标准化之后, 来自数十个数据库的数据就变成了统一规整的流, 四四方方排好队, 等待引擎下一步的解析与计算


留一个小问题


流计算引擎的实时计算, 一般是计算的哪些内容呢?


关注 Tapdata 微信公众号, 带给你最新的实时计算引擎的思考。本文作者为tapdata 技术合伙人 肖贝贝,更多技术博客:https://tapdata.net/blog.html

Tapdata官网_430x430_微信服务号渠道码.png


推荐阅读

Tapdata 推出“钛计划”公益项目,着力打通数据孤岛助推社会数字化升级

为响应数据要素市场化配置改革政策方向的指引,Tapdata 推出“钛计划”打通数据孤岛公益行动,面向非盈利机构(如各城市政务服务数据管理局、社会公益组织/项目等)以及为社会培养数据技术人才的相关培训机构,提供 Tapdata 实时数据服务平台的特殊免费授权,助推公共领域数据互通、共享与实时应用......

Tapdata 钛铂数据的产品理念

Tapdata 是全球首个基于数据即服务架构理念、面向 TP 场景的企业实时主数据服务平台,可以帮助企业快速实现主数据的统一管理和发布,并为所有数据库、数仓、大数据平台提供最实时的源数据,让数据随时可用。

Tapdata Cloud 是什么?

Tapdata Cloud 是钛铂数据自研的异构数据库实时同步工具 Tapdata Replicator 的云服务版本,现在免费提供所有开发者和企业使用Tapdata Cloud 目前支持 Oracle、MySQL、PostgreSQL、SQL Server、MongoDB、Elasticsearch 之间的数据迁移和同步,未来将陆续上线 DB2、Sybase ASE、Redis、Kafka 等。

什么是数据即服务(Data as a Service)?

数据即服务(DaaS)是一种数据管理策略,旨在利用数据作为业务资产来提高业务创新的敏捷性。它是自 1990 年代互联网高速发展以来越来越受欢迎的“一切皆服务”(XaaS)趋势下关于数据服务化的那一部分,介于 PaaS 和 SaaS 之间。与 SaaS 类似,DaaS 提供了一种方式来管理企业每天生成的大量数据,并在整个业务范围内提供这些有价值的信息,以便于进行数据驱动的商业决策。同时,我们也...

什么是数据虚拟化(Data Virtualization)?

本文将简单易懂地介绍数据虚拟化技术及数据虚拟化软件架构的实现方法,尽量避免教条主义。如需要了解虚拟化定义,可通过wiki 百科了解。先引用一段百度百科的文字来说明数据虚拟化的定义:数据虚拟化(data virtualization)是用来描述所有数据管理方法的涵盖性术语,这些方法允许应用程序检索并管理数据,且不需要数据相关的技术细节,例如它格式化的方式或物理位置所在。正如百科的定义,采用数据...

Tapdata 数据库实时同步的技术要点

Tapdata 专注于实时数据的处理技术,在数据库迁移和同步方面,Tapdata 的表现非常优秀,实时、多元、异构,尤其在关系数据库到非关系数据库之间的双向同步方面,无论是从操作上,还是效率上,都体现了业界领先的水平。本文重点阐述 Tapdata 在数据库实时同步方面的技术要点。

教育中台与第三方系统对接整合数据案例

最近, 南京秦淮区教育中台系统,成功地和市系统进行了一次圆满对接。通过教育中台提供的统一数据能力和低代码API对接能力,实现了对市系统数据的实时推送和拉取,以及各类业务逻辑上的处理。这次对接为南京市中小学生创客大赛的成功举办提供了及时可靠的数据支撑, 体现了中台系统在快速响应业务方面的优越性。

周生生 | 全渠道商品中心建设

通过Tapdata 构建全渠道商品中心,实现: - 支持中国大陆港澳台的上千家门店的生产环境; - 使用JS脚本来进行流处理计算,业务需求从开发到上线过程快至 1 天以内; - 任务配置与执行监测全程可视化操作,不懂技术也能完成操作,极大降低维护成本; - 一套产品可满足不同需求,根据业务需求产出不同类型的业务模型节省大量人力物力。

关系型数据库到MongoDB实时数据同步解决方案

使用MongoDB作为主机下行或新一代数据库的选择,将业务数据从已有主机或Oracle等关系型数据库复制到MongoDB; 使用Tapdata Replicator的CDC技术,实时监听现有业务库的数据变动并同步至MongoDB; 使用Tapdata 的RDM技术将关系型表合并转型到MongoDB JSON数据结构,并保持和源库的高度数据一致; 在MongoDB上进行新业务的开发。

Tapdata肖贝贝:实时数据引擎系列(一)-新鲜的数据流

前言2006 年诞生的 hadoop 和 她周边的生态, 在过去的这些年里为大数据的火热提供了足够的能量, 十几年过去了, 场景在变化, 技术在演变, 大家对数据的认知已经不再局限于 T+1 与 高吞吐高延迟 为主要特征的上一代框架理念, 在真实的场景里, 实时, 准确, 多变 的数据也发挥着越来越重要的作用为满足这些新的需求, 各种框架和中间件如雨后春笋般不断涌出hive 的出现让这头大象...
联络我们:
Email:team@tapdata.io    电话:0755-26656080
深圳市南山区临海大道香江金融中心 2410-13
官方服务号
Tapdata 微信公众号
扫码关注