Tapdata 技术博客
Tapdata 技术博客

Flink vs Jet: 性能压测对比(一)

2021-08-09 18:17肖贝贝

前言

flink 是一个非常火热的流计算框架, hazelcast-jet 是 TAPDATA 在进行流计算时采用的一个嵌入式的流计算框架

看到 hazelcast-jet 官方自己做了一些与 flink 的性能压测对比, 这边比较好奇他们的性能差异是否真有那么大, 会尝试做一系列性能测试, 来给大家一个参考

环境准备

Flink

Flink 的任务运行属于外部任务, 在进行任务编写之前, 需要启动 flink 集群, 部署过程如下:

  1. 在运行环境自行安装 JAVA, 并以 java --version 有输出为标记

  2. 下载 flink 最新版, 下载地址在: https://ftp.jaist.ac.jp/pub/apache/flink/flink-1.13.1/flink-1.13.1-bin-scala_2.11.tgz

  3. 解压, 并执行 bash ./bin/start-cluster.sh

  4. 执行 jps, 能看到一个 StandaloneSessionClusterEntrypoint 的进程, 表明 flink 启动成功

  5. 将 bin 下的 flink 二进制, 放到系统可执行目录下, 比如 /usr/bin/ 下面

Jet

Jet 我们这里使用集成模式运行, 对 Jet 进行编译:

  1. 需要自行安装 java 环境, 与 maven 包

  2. Clone 代码库, git clone https://github.com/hazelcast/hazelcast-jet.git

  3. 运行 mvn build -DskipTests, 等待编译完成, 成品在 hazelcast-jet/hazelcast-jet-all/target/hazelcast-jet-4.6-SNAPSHOT.jar 路径下, 后面运行任务的时候需要用

运行任务准备

文件准备

  1. 文件准备: 我们以经典 wordcount 做测试用例, 首先准备文本文件, 我们以 hazelcast-jet/examples/wordcount/books/shakespeare-complete-works.txt 文件做基础文件, 将其重复一百次, 构成大概 500M 的文本文件, 将 500M 的文本文件放到 /tmp/books 目录下

Flink 准备

flink 使用默认为 wordcount 测试用例, 略微修改将并发修改为 4, 与 jet 保持一致, 具体操作是:

  1. 下载 flink 源代码, 地址在: https://ftp.jaist.ac.jp/pub/apache/flink/flink-1.13.1/flink-1.13.1-src.tg

  2. 解压, 进入目录: flink-examples/flink-examples-batch

  3. 修改文件 src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java, 将内容替换为:

/*  * Licensed to the Apache Software Foundation (ASF) under one  * or more contributor license agreements.   See the NOTICE file  * distributed with this work for additional information  * regarding copyright ownership.   The ASF licenses this file  * to you under the Apache License, Version 2.0 (the  * "License"); you may not use this file except in compliance  * with the License.   You may obtain a copy of the License at  *  *     http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing, software  * distributed under the License is distributed on an "AS IS" BASIS,  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  * See the License for the specific language governing permissions and  * limitations under the License.  */packageorg.apache.flink.examples.java.wordcount;importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.ExecutionEnvironment;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.api.java.utils.MultipleParameterTool;importorg.apache.flink.examples.java.wordcount.util.WordCountData;importorg.apache.flink.util.Collector;importorg.apache.flink.util.Preconditions;/**  * Implements the "WordCount" program that computes a simple word occurrence histogram over text  * files.  *  * <p>The input is a plain text file with lines separated by newline characters.  *  * <p>Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>  * If no parameters are provided, the program is run with default data from {@link WordCountData}.  *  * <p>This example shows how to:  *  * <ul>  *   <li>write a simple Flink program.  *   <li>use Tuple data types.  *   <li>write and use user-defined functions.  * </ul>  */publicclassWordCount{// ************************************************************************* //     PROGRAM // ************************************************************************* publicstaticvoidmain(String[]args)throwsException{finalMultipleParameterToolparams=MultipleParameterTool.fromArgs(args);// set up the execution environment finalExecutionEnvironmentenv=ExecutionEnvironment.getExecutionEnvironment();// make parameters available in the web interface env.getConfig().setGlobalJobParameters(params);// get input data DataSet<String>text=null;if(params.has("input")){// union all the inputs from text files for(Stringinput:params.getMultiParameterRequired("input")){if(text==null){text=env.readTextFile(input);}else{text=text.union(env.readTextFile(input));}}Preconditions.checkNotNull(text,"Input DataSet should not be null.");}else{// get default test text data System.out.println("Executing WordCount example with default input data set.");System.out.println("Use --input to specify file input.");text=WordCountData.getDefaultTextLineDataSet(env);}DataSet<Tuple2<String,Integer>>counts=// split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(newTokenizer()).setParallelism(4)// group by the tuple field "0" and sum up tuple field "1" .groupBy(0).sum(1).setParallelism(4);// emit result if(params.has("output")){counts.writeAsCsv(params.get("output"),"\n"," ");// execute program env.execute("WordCount Example");}else{System.out.println("Printing result to stdout. Use --output to specify output path.");counts.print();}}// ************************************************************************* //     USER FUNCTIONS // ************************************************************************* /**      * Implements the string tokenizer that splits sentences into words as a user-defined      * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the      * form of "(word,1)" ({@code Tuple2<String, Integer>}).      */publicstaticfinalclassTokenizerimplementsFlatMapFunction<String,Tuple2<String,Integer>>{@OverridepublicvoidflatMap(Stringvalue,Collector<Tuple2<String,Integer>>out){// normalize and split the line String[]tokens=value.toLowerCase().split("\\W+");// emit the pairs for(Stringtoken:tokens){if(token.length()>0){out.collect(newTuple2<>(token,1));}}}}}

4. 在 flink-examples/flink-examples-batch 目录, 执行编译: mvn clean package, 编译测试包到 target/WordCount.jar, 准备使用

Jet 准备

jet 默认也提供了 wordcount 测试用例, 不过这个用例实现将全部文件加载到了内存, 我们由于测试文本较大, 这里修改一下实现, 使用流读取来代替内存存储, 具体操作是:

  1. 进入目录: hazelcast-jet/examples/wordcount

  2. src/main/java/com/hazelcast/jet/examples/wordcount/WordCount.java 内容修改为:

/*  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.  *  * Licensed under the Apache License, Version 2.0 (the "License");  * you may not use this file except in compliance with the License.  * You may obtain a copy of the License at  *  * http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing, software  * distributed under the License is distributed on an "AS IS" BASIS,  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  * See the License for the specific language governing permissions and  * limitations under the License.  */packagecom.hazelcast.jet.examples.wordcount;importcom.hazelcast.jet.Jet;importcom.hazelcast.jet.JetInstance;importcom.hazelcast.jet.pipeline.Pipeline;importcom.hazelcast.jet.pipeline.Sinks;importcom.hazelcast.jet.pipeline.Sources;importjava.util.Map;importjava.util.concurrent.TimeUnit;importjava.util.regex.Pattern;import staticcom.hazelcast.function.Functions.wholeItem;import staticcom.hazelcast.jet.Traversers.traverseArray;import staticcom.hazelcast.jet.aggregate.AggregateOperations.counting;import staticjava.util.Comparator.comparingLong;/**  * Demonstrates a simple Word Count job in the Pipeline API. Inserts the  * text of The Complete Works of William Shakespeare into a Hazelcast  * IMap, then lets Jet count the words in it and write its findings to  * another IMap. The example looks at Jet's output and prints the 100 most  * frequent words.  */publicclassWordCount{privatestaticfinalStringBOOK_LINES="bookLines";privatestaticfinalStringCOUNTS="counts";privateJetInstancejet;privatestaticPipelinebuildPipeline(){Patterndelimiter=Pattern.compile("\\W+");Pipelinep=Pipeline.create();p.readFrom(Sources.files("/tmp/books")).flatMap(e->traverseArray(delimiter.split(e.toLowerCase()))).filter(word->!word.isEmpty()).groupingKey(wholeItem()).aggregate(counting()).writeTo(Sinks.map(COUNTS));returnp;}publicstaticvoidmain(String[]args)throwsException{newWordCount().go();}/**      * This code illustrates a few more things about Jet, new in 0.5. See comments.      */privatevoidgo(){try{setup();System.out.println("\nCounting words... ");longstart=System.nanoTime();Pipelinep=buildPipeline();jet.newJob(p).join();System.out.println("done in "+TimeUnit.NANOSECONDS.toMillis(System.nanoTime()-start)+" milliseconds.");Map<String,Long>results=jet.getMap(COUNTS);printResults(results);}finally{Jet.shutdownAll();}}privatevoidsetup(){jet=Jet.bootstrappedInstance();System.out.println("Loading The Complete Works of William Shakespeare");}privatestaticMap<String,Long>checkResults(Map<String,Long>counts){if(counts.get("the")!=27_843){thrownewAssertionError("Wrong count of 'the'");}System.out.println("Count of 'the' is valid");returncounts;}privatestaticvoidprintResults(Map<String,Long>counts){finalintlimit=100;StringBuildersb=newStringBuilder(String.format(" Top %d entries are:%n",limit));sb.append("/-------+---------\\\n");sb.append("| Count | Word    |\n");sb.append("|-------+---------|\n");counts.entrySet().stream().sorted(comparingLong(Map.Entry<String,Long>::getValue).reversed()).limit(limit).forEach(e->sb.append(String.format("|%6d | %-8s|%n",e.getValue(),e.getKey())));sb.append("\\-------+---------/\n");System.out.println(sb.toString());}}

3. 编译成品, mvn clean package, 到 target/hazelcast-jet-examples-wordcount-4.6-SNAPSHOT.jar

4. 将环境准备中, 编译的 jet 包: hazelcast-jet-4.6-SNAPSHOT.jar 也拷贝到 target 下

任务运行

Flink 任务运行

  1. 进入目录: flink-1.13.1/flink-examples/flink-examples-batch

  2. 执行操作: flink run target/flink-examples-batch_2.11-1.13.1-WordCount.jar --input /tmp/books/shakespeare-complete-works.txt --output f

  3. 等待执行完成

Jet 任务运行

  1. 进入目录: hazelcast-jet/examples/wordcount/

  2. 执行操作: java -cp target/hazelcast-jet-4.6-SNAPSHOT.jar:target/hazelcast-jet-examples-wordcount-4.6-SNAPSHOT.jar com.hazelcast.jet.examples.wordcount.WordCount

  3. 等待执行完成

结果分析

执行环境

执行宿主机环境为:

  1. MacBook Pro (13-inch, 2017, Two Thunderbolt 3 ports)

  2. 处理器: 2.3 GHz 双核Intel Core i5

  3. 内存: 8 GB 2133 MHz LPDDR3

资源占用

Flink 资源占用

  1. CPU: 大概占用 3.8 个核

  2. 内存: 大概占用 750M

Jet 资源占用

  1. CPU: 大概占用 3.5 个核

  2. 内存: 大概占用 360M

运行速度对比

Flink 用时: 56s

Jet 用时: 16s

在 wordcount case 下, 几乎完全一致的逻辑, 结果对比一致的情况下, Jet 在内存占用一半的前提下, 得到了计算速度为 Flink 3.5 倍的结果

这个结果与 hazelcat-jet 自己的官方测试结果一致, 正在尝试做一波性能分析, 找到性能差异点


推荐阅读

DTCC 干货分享:Real Time DaaS - 面向TP+AP业务的数据平台架构

2021年10月20日,Tapdata 创始人唐建法(TJ)受邀出席 DTCC 2021(中国数据库技术大会),并在企业数据中台设计与实践专场上,发表主旨演讲“Real Time DaaS :打造面向 TP+AP 业务的数据平台架构”,从 AP 业务场景 vs. TP 业务场景、常见数据平台优劣势、如何打造面向 TP+AP 业务的数据平台等角度,全面分享了 Tapdata 在全链路实时数据融...

Tapdata 在数字化防疫场景的最佳实践

在“动态清零”总方针的指导下,国内疫情防控工作渐趋规范化、常态化,各类防疫应用和手段层出不穷,防疫战也是数据战。Tapdata 基于数据虚拟化和主数据管理能力的防疫专项解决方案,助力张家港市卫健委高效落地疫情防控数字化,实现精准防疫。

Tapdata 在线研讨会:DaaS vs 大数据平台,是竞争还是共处?

我们为什么需要一个Real Time DaaS?它和大数据平台技术上有什么区别?如果企业还没有构建数据平台,我是应该考虑DaaS还是Big Data?如果已经有了大数据平台,我是否还需要DaaS?如果你想了解更多,请参加本次的在线研讨会。

下一个十年,你还在用 Big Data 搭建数据中台吗?

数据中台的存在是有合理性的,企业需要中台帮助他们来有效管理企业的数据资产,为业务所用。但在经历过大数据时代的热度之后,你在为企业构建数据中台的时候可以考虑另外一种比较务实的 DaaS 架构。DaaS 更加专注于数据层面:打通企业内部的孤岛数据,在中台构建共享模型,以API方式快速发布数据服务...

解锁5大应用场景,最新实时同步实现方案分享

数字化时代的到来,企业业务敏捷度的提升,对传统的数据处理和可用性带来更高的要求,实时数据同步技术的发展,给基于数据的业务创新带来了更多的可能。 Tapdata 产品合伙人徐亮带来实时数据同步的5大典型场景以及4种主流的技术模式分享,并一起了解作为新生代实时数据同步的 Tapdata Cloud 如何更轻松灵活的满足各种实时数据场景。

Tapdata 钛铂数据的产品理念

Tapdata 是全球首个基于数据即服务架构理念、面向 TP 场景的企业实时主数据服务平台,可以帮助企业快速实现主数据的统一管理和发布,并为所有数据库、数仓、大数据平台提供最实时的源数据,让数据随时可用。

Tapdata 数据库实时同步的技术要点

Tapdata 专注于实时数据的处理技术,在数据库迁移和同步方面,Tapdata 的表现非常优秀,实时、多元、异构,尤其在关系数据库到非关系数据库之间的双向同步方面,无论是从操作上,还是效率上,都体现了业界领先的水平。本文重点阐述 Tapdata 在数据库实时同步方面的技术要点。

教育中台与第三方系统对接整合数据案例

最近, 南京秦淮区教育中台系统,成功地和市系统进行了一次圆满对接。通过教育中台提供的统一数据能力和低代码API对接能力,实现了对市系统数据的实时推送和拉取,以及各类业务逻辑上的处理。这次对接为南京市中小学生创客大赛的成功举办提供了及时可靠的数据支撑, 体现了中台系统在快速响应业务方面的优越性。

周生生 | 全渠道商品中心建设

通过Tapdata 构建全渠道商品中心,实现: - 支持中国大陆港澳台的上千家门店的生产环境; - 使用JS脚本来进行流处理计算,业务需求从开发到上线过程快至 1 天以内; - 任务配置与执行监测全程可视化操作,不懂技术也能完成操作,极大降低维护成本; - 一套产品可满足不同需求,根据业务需求产出不同类型的业务模型节省大量人力物力。

关系型数据库到MongoDB实时数据同步解决方案

使用MongoDB作为主机下行或新一代数据库的选择,将业务数据从已有主机或Oracle等关系型数据库复制到MongoDB; 使用Tapdata Replicator的CDC技术,实时监听现有业务库的数据变动并同步至MongoDB; 使用Tapdata 的RDM技术将关系型表合并转型到MongoDB JSON数据结构,并保持和源库的高度数据一致; 在MongoDB上进行新业务的开发。

Tapdata肖贝贝:实时数据引擎系列(一)-新鲜的数据流

前言2006 年诞生的 hadoop 和 她周边的生态, 在过去的这些年里为大数据的火热提供了足够的能量, 十几年过去了, 场景在变化, 技术在演变, 大家对数据的认知已经不再局限于 T+1 与 高吞吐高延迟 为主要特征的上一代框架理念, 在真实的场景里, 实时, 准确, 多变 的数据也发挥着越来越重要的作用为满足这些新的需求, 各种框架和中间件如雨后春笋般不断涌出hive 的出现让这头大象...

Tapdata 肖贝贝:实时数据引擎系列(六)-从 PostgreSQL 实时数据集成看增量数据缓存层的必要性

对于 PostgreSQL 的实时数据采集, 业界经常遇到了包括:对源库性能/存储影响较大, 采集性能受限, 时间回退重新同步不支持, 数据类型较复杂等等问题。Tapdata 在解决 PostgreSQL 增量复制问题过程中,获得了一些不错的经验和思考,本文将分享 Tapdata 自研的 TAP-CDC-CACHE,和其他几种市面常见的解决方案的优势和特性。

搭建企业级实时数据融合平台难吗?Tapdata + ES + MongoDB 就能搞定

如何打造一套企业级的实时数据融合平台?Tapdata 已经找到了最佳实践,下文将以 Tapdata 的零售行业客户为例,与您分享:基于 ES 和 MongoDB 来快速构建一套企业级的实时数据融合平台。