TapFlow 是 TapData Live Data Platform 最新推出的一个面向编程的 API 框架。TapFlow 可以让开发者和数据工程师用一个简单易用而又强大的编程语言来进行数据管道和数据模型的开发工作。
这次的发布包括一个 Python 的 SDK。TapFlow 需要连接一个 TapData Cluster (可以是企业版,云版或社区版)才能运行。
为何需要编程式的方式?
TapData 目前提供的是一个以可视化拖拉拽方式来构建数据管道,数据开发的UI界面。 UI界面在易使用和易运维上有很大的优势,但是在不少地方也有一些局限性。TapFlow 希望能够对这些局限性提供一个有效的补充。
1. 满足开发者的深度定制需求,复杂处理逻辑需要 JS 和 Python 代码
在一些复杂脱敏逻辑,或者定制化的数据字段值标准化的时候需要使用不少代码。这个时候如果有频繁的改动,UI 界面的代码操作就不会很方便
2. 更好地支持 CI/CD 和自动化 ,
团队需要定期部署和更新多个环境中的数据集成任务(如开发、测试、生产)。使用编程式 API,可以通过脚本自动完成任务的生成和迁移,轻松与 Git 等版本管理工具集成,而无需人工在 GUI 中操作。
3. 降低复杂场景下的操作成本
经常有用户需要同步 100+ 个数据库表,且每张表有不同的字段映射规则。在 GUI 中手动设置这些规则不仅繁琐,而且易出错,而通过编程式 API 可以实现规则的自动化生成。
4. 面向开发者与技术团队的友好性,更容易集成到工作流
提供面向开发者的工具,更贴近他们的工作方式,更容易和其他业务模块进行集成
开放代码能力,产品更容易扩展,如增加企业内部的可复用组件等。
我们假设我们有一个 CRM 应用运行在 MySQL 数据库上,下面是这个 MySQL 库的 schema:
由于查询性能的考量和一些特定的宽表需求,我们需要将订单数据复制到 MongoDB 里面来发布一个订单查询 API 。我们会用 TapFlow 把数据从 MySQL 里面复制到 MongoDB, 过程中对数据做一些加工处理和合并的操作。
# pip3 install tapflow
TapFlow 的Python SDK 支持两种模式: 以程序方式执行,或在交互模式下运行。接下来我们以交互模式下来展现如何使用 TapFlow API。
# tap
--- Please configure TapData cluster ---
Tap Flow requires TapData Live Data Platform(LDP) cluster to run.
If you would like to use with TapData Enterprise or TapData Community, type L to continue.
If you would like to use TapData Cloud, or you are new to TapData, type C or press ENTER to continue.
Please type L or C (L/[C]):
在你输入相应的 TapData 连接信息或者密钥后,就可以开始在交互模式下使用。
更多安装信息可以参见 Quick Start 文档: https://docs.tapdata.net/tapflow/quick-start
tap> mysql_conn = DataSource('mysql', 'MySQL_ECommerce',
{
'database': 'ECommerceData',
'port': 3306,
'host': 'demo.tapdata.io',
'username': 'demouser',
'password': 'demopass'
})
.type('source')
.save()
tap> mongodb_conn = DataSource("mongodb", "MongoDB_ECommerce",
{
"uri": "mongodb://your_username:your_passwd@192.168.1.18:27017/ecommerce?authSource=admin"
})
.type("target")
.save()
创建一个简单的订单表的复制任务(Data Flow)
目的:将 MySQL 的订单数据同步到 MongoDB 中做统一订单查询
tap> myflow = Flow("simple_data_replication_flow") \
.read_from("MySQL_ECommerce.ecom_orders") \
.write_to("MongoDB_ECommerce.orders_replica_collection") \
.save()
tap> myFlow.start()
目标表不需要一些字段
tap> myflow = Flow("order_fields_exclude_flow") \
.read_from("MySQL_ECommerce.ecom_orders") \
.exclude("order_delivered_carrier_date", "order_delivered_customer_date") \
.write_to("MongoDB_ECommerce.orders_renamed_fields_collection") \
.save()
tap> myFlow.start()
字段改名
tap> myflow = Flow("order_fields_rename_flow") \
.read_from("MySQL_ECommerce.ecom_orders") \
.rename_fields({'order_purchase_timestamp': 'purchase_timestamp'}) \
.write_to("MongoDB_ECommerce.orders_exclude_fields_collection") \
.save()
tap> myFlow.start()
# 标准化 order_status 的字段值
tap> def pyfunc(record):
if not record['order_status'] :
record['order_status'] = 'invalid'
if record['order_status'] == 'SendError' :
record['order_status'] = 'undeliverable'
return record # 返回处理后的记录
# 创建数据流任务,应用 Python 函数,并将结果写入目标数据库
tap> myflow = Flow("orders_complex_data_processing_flow") \
.read_from("MySQL_ECommerce.ecom_orders") \
.py(pyfunc)
.write_to("MongoDB_ECommerce.orders_processed_collection") \
.save()
tap> myFlow.start()
tap> myflow = Flow("orders_lookup_flow") \
.read_from("MySQL_ECommerce.ecom_orders") \
.lookup("MySQL_ECommerce.ecom_customers", relation=[["customer_id", "customer_id"]])
.write_to("MongoDB_ECommerce.wide_orders_collection") \
.save()
tap> myFlow.start()
tap> use MongoDB_ECommerce
datasource switch to: MongoDB_ECommerce
tap> peek wide_orders_collection
table wide_orders_collection has 12641 records
{
'order_status': 'unavailable',
'customer_state': 'SP',
'customer_unique_id': 'a77550dd00887c5bb24100ccbd08cbe9',
'order_estimated_delivery_date': '2017-11-03 00:00:00',
'_id': '676154705338b293ccd85c80',
'customer_id': '3a92efdb6e6163dc1734d44f2f5f6d04',
'order_id': '0010dedd556712d7bb69a19cb7bbd37a',
'purchase_timestamp': '2017-10-21 19:32:06',
'order_approved_at': '2017-10-24 03:25:32',
'customer_city': 'sao paulo',
'customer_zip_code_prefix': '04851'
}
{
'order_status': 'shipped',
'customer_state': 'BA',
'customer_unique_id': '205d5aa158338f2b733a07326aae8c87',
'order_estimated_delivery_date': '2018-04-16 00:00:00',
'_id': '676154705338b293ccd85c82',
'customer_id': '7fa80efb1ef15ca4104627910c29791c',
'order_id': '002f19a65a2ddd70a090297872e6d64e',
'purchase_timestamp': '2018-03-21 13:05:30',
'order_approved_at': '2018-03-21 13:15:27',
'customer_city': 'camacari',
'customer_zip_code_prefix': '42804'
}
{
'order_status': 'canceled',
'customer_state': 'MG',
'customer_unique_id': 'ec979208947bbba310f2ad8e50963725',
'order_estimated_delivery_date': '2018-08-29 00:00:00',
'_id': '676154705338b293ccd85c84',
'customer_id': '0dad07848c618cc5a4679a1bfe1db8d2',
'order_id': '00310b0c75bb13015ec4d82d341865a4',
'purchase_timestamp': '2018-08-15 14:29:08',
'order_approved_at': '2018-08-15 15:04:25',
'customer_city': 'belo horizonte',
'customer_zip_code_prefix': '31160'
}
{
'order_status': 'unavailable',
'customer_state': 'BA',
'customer_unique_id': 'cd88b962adbc4b174353217f99dc6174',
'order_estimated_delivery_date': '2018-01-08 00:00:00',
'_id': '676154705338b293ccd85c85',
'customer_id': '3d2f26eab3f79dd1fe9977f615e70c2f',
'order_id': '00a500bc03bc4ec968e574c2553bed4b',
'purchase_timestamp': '2017-11-23 10:53:01',
'order_approved_at': '2017-11-25 10:54:38',
'customer_city': 'salvador',
'customer_zip_code_prefix': '41180'
}
{
'order_status': 'shipped',
'customer_state': 'BA',
'customer_unique_id': '60ec651482858c327c177cf9360cc0a2',
'order_estimated_delivery_date': '2018-09-18 00:00:00',
'_id': '676154705338b293ccd85c87',
'customer_id': '7a399396442d5601cbedfbd0a3cf1da4',
'order_id': '00a99c50fdff7e36262caba33821875a',
'purchase_timestamp': '2018-08-17 16:25:04',
'order_approved_at': '2018-08-17 16:35:18',
'customer_city': 'teixeira de freitas',
'customer_zip_code_prefix': '45990'
}
更多的交互式命令参考可以看这个文档:https://docs.tapdata.net/tapflow/tapcli-reference
# cat order_mview.py
from tapflow.lib import *
from tapflow.cli.cli import init
mysql_conn = DataSource('mysql', 'MySQL_ECommerce',
{
'database': 'ECommerceData',
'port': 3306,
'host': 'demo.tapdata.io',
'username': 'demouser',
'password': 'demopass'
})
.type('source')
.save()
mongodb_conn = DataSource("mongodb", "MongoDB_ECommerce",
{
"uri": "mongodb://your_username:your_passwd@192.168.1.18:27017/ecommerce?authSource=admin"
})
.type("target")
.save()
def pyfunc(record):
if not record['order_status'] :
record['order_status'] = 'invalid'
if record['order_status'] == 'SendError' :
record['order_status'] = 'undeliverable'
return record
myflow = Flow("mysql_order_flow") \
.read_from("MySQL_ECommerce.ecom_orders") \
.exclude("order_delivered_carrier_date", "order_delivered_customer_date") \
.rename_fields({'order_purchase_timestamp': 'purchase_timestamp'}) \
.py(pyfunc)
.lookup("MySQL_ECommerce.ecom_customers", relation=[["customer_id", "customer_id"]])
.write_to("MongoDB_ECommerce.wide_orders_collection") \
.save()
myflow.start() # Start the flow
# python order_mview.py
Lookup 目前只能用 MongoDB 作为目标
我们将持续改进 TapFlow 的能力,以下是一些中长期的路线图功能:
Lookup 支持除了 MongoDB 以外的更多目标库
支持项目工程 Project
支持 inner join
提供 Java SDK / RESTful API
如果你还不是很熟悉,TapData 是为企业内部的实时数据集成和实时数据服务专门打造的一个实时数据平台。它具有以下特点:
专为实时数据管道而设计的框架: 基于 CDC 技术,数据采集及处理延迟可在亚秒级
高性能:单节点每秒可处理数十万条记录
内置丰富的 CDC(Change Data Capture)连接器:快速对接 Oracle, DB2, Sybase, MySQL, PostgreSQL, MSSQL 等
丰富的实时数据处理功能:过滤,改名,增删字段
多表关联: 构建持续更新的物化视图,数据汇聚
UDF: Javascript 或 Python 自定义function,处理复杂逻辑
至少一次和精确一次的一致性保障
完善的数据校验: 全量/增量校验,哈希校验,二次校验等
国产数据库支持:Dameng, Kingbase, GaussDB, OceanBase, GBase, VastBase
Kafka 支持:作为生产者把数据库事件直接推送给Kafka,或从Kafka 队列消费事件
支持私有化和全托管:可以开源版线下部署,或直接使用TapData Cloud的全托管服务
TapData 的常见应用场景包括:
替代 Oracle Golden Gate 进行数据库间实时同步
传统数据库复制工具往往成本高昂且复杂,TapFlow 提供了一种轻量级且易用的替代方案,帮助您在不同的数据库之间高效同步数据,支持从 MySQL 到 PostgreSQL,甚至是 NoSQL 数据库。
替代 Kafka 构建实时数据管道
对于那些需要实时传输数据的场景,TapFlow 是一个强有力的替代方案。它无需部署复杂的 Kafka 集群,而是直接通过轻量化的方式提供同等甚至更高效的数据管道构建能力。
创建持续刷新的物化视图,用来做查询加速,读写分离等
当业务需要实时查询最新的数据结果时,物化视图是一种高效的方式。TapFlow 可以持续刷新物化视图,保证数据的实时性,从而支持实时分析与决策。
数据实时入仓入湖
现代数据分析的趋势是实时化,TapFlow 可以将数据实时写入数据仓库或数据湖(如Apache Doris, Clickhouse, 或者云数仓如 Ali Cloud ADB, SelectDB, BigQuery)
通用流式 ETL 数据处理
TapFlow 同样支持复杂的 ETL(抽取、转换、加载)任务,借助 Python 的灵活性和内置的处理能力,开发者可以轻松处理复杂的数据转换需求。