Flink vs Jet: 性能压测对比(一)

2021-08-09 18:17肖贝贝

前言

flink 是一个非常火热的流计算框架, hazelcast-jet 是 TAPDATA 在进行流计算时采用的一个嵌入式的流计算框架

看到 hazelcast-jet 官方自己做了一些与 flink 的性能压测对比, 这边比较好奇他们的性能差异是否真有那么大, 会尝试做一系列性能测试, 来给大家一个参考

环境准备

Flink

Flink 的任务运行属于外部任务, 在进行任务编写之前, 需要启动 flink 集群, 部署过程如下:

  1. 在运行环境自行安装 JAVA, 并以 java --version 有输出为标记

  2. 下载 flink 最新版, 下载地址在: https://ftp.jaist.ac.jp/pub/apache/flink/flink-1.13.1/flink-1.13.1-bin-scala_2.11.tgz

  3. 解压, 并执行 bash ./bin/start-cluster.sh

  4. 执行 jps, 能看到一个 StandaloneSessionClusterEntrypoint 的进程, 表明 flink 启动成功

  5. 将 bin 下的 flink 二进制, 放到系统可执行目录下, 比如 /usr/bin/ 下面

Jet

Jet 我们这里使用集成模式运行, 对 Jet 进行编译:

  1. 需要自行安装 java 环境, 与 maven 包

  2. Clone 代码库, git clone https://github.com/hazelcast/hazelcast-jet.git

  3. 运行 mvn build -DskipTests, 等待编译完成, 成品在 hazelcast-jet/hazelcast-jet-all/target/hazelcast-jet-4.6-SNAPSHOT.jar 路径下, 后面运行任务的时候需要用

运行任务准备

文件准备

  1. 文件准备: 我们以经典 wordcount 做测试用例, 首先准备文本文件, 我们以 hazelcast-jet/examples/wordcount/books/shakespeare-complete-works.txt 文件做基础文件, 将其重复一百次, 构成大概 500M 的文本文件, 将 500M 的文本文件放到 /tmp/books 目录下

Flink 准备

flink 使用默认为 wordcount 测试用例, 略微修改将并发修改为 4, 与 jet 保持一致, 具体操作是:

  1. 下载 flink 源代码, 地址在: https://ftp.jaist.ac.jp/pub/apache/flink/flink-1.13.1/flink-1.13.1-src.tg

  2. 解压, 进入目录: flink-examples/flink-examples-batch

  3. 修改文件 src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java, 将内容替换为:

/*  * Licensed to the Apache Software Foundation (ASF) under one  * or more contributor license agreements.   See the NOTICE file  * distributed with this work for additional information  * regarding copyright ownership.   The ASF licenses this file  * to you under the Apache License, Version 2.0 (the  * "License"); you may not use this file except in compliance  * with the License.   You may obtain a copy of the License at  *  *     http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing, software  * distributed under the License is distributed on an "AS IS" BASIS,  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  * See the License for the specific language governing permissions and  * limitations under the License.  */packageorg.apache.flink.examples.java.wordcount;importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.ExecutionEnvironment;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.api.java.utils.MultipleParameterTool;importorg.apache.flink.examples.java.wordcount.util.WordCountData;importorg.apache.flink.util.Collector;importorg.apache.flink.util.Preconditions;/**  * Implements the "WordCount" program that computes a simple word occurrence histogram over text  * files.  *  * <p>The input is a plain text file with lines separated by newline characters.  *  * <p>Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>  * If no parameters are provided, the program is run with default data from {@link WordCountData}.  *  * <p>This example shows how to:  *  * <ul>  *   <li>write a simple Flink program.  *   <li>use Tuple data types.  *   <li>write and use user-defined functions.  * </ul>  */publicclassWordCount{// ************************************************************************* //     PROGRAM // ************************************************************************* publicstaticvoidmain(String[]args)throwsException{finalMultipleParameterToolparams=MultipleParameterTool.fromArgs(args);// set up the execution environment finalExecutionEnvironmentenv=ExecutionEnvironment.getExecutionEnvironment();// make parameters available in the web interface env.getConfig().setGlobalJobParameters(params);// get input data DataSet<String>text=null;if(params.has("input")){// union all the inputs from text files for(Stringinput:params.getMultiParameterRequired("input")){if(text==null){text=env.readTextFile(input);}else{text=text.union(env.readTextFile(input));}}Preconditions.checkNotNull(text,"Input DataSet should not be null.");}else{// get default test text data System.out.println("Executing WordCount example with default input data set.");System.out.println("Use --input to specify file input.");text=WordCountData.getDefaultTextLineDataSet(env);}DataSet<Tuple2<String,Integer>>counts=// split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(newTokenizer()).setParallelism(4)// group by the tuple field "0" and sum up tuple field "1" .groupBy(0).sum(1).setParallelism(4);// emit result if(params.has("output")){counts.writeAsCsv(params.get("output"),"\n"," ");// execute program env.execute("WordCount Example");}else{System.out.println("Printing result to stdout. Use --output to specify output path.");counts.print();}}// ************************************************************************* //     USER FUNCTIONS // ************************************************************************* /**      * Implements the string tokenizer that splits sentences into words as a user-defined      * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the      * form of "(word,1)" ({@code Tuple2<String, Integer>}).      */publicstaticfinalclassTokenizerimplementsFlatMapFunction<String,Tuple2<String,Integer>>{@OverridepublicvoidflatMap(Stringvalue,Collector<Tuple2<String,Integer>>out){// normalize and split the line String[]tokens=value.toLowerCase().split("\\W+");// emit the pairs for(Stringtoken:tokens){if(token.length()>0){out.collect(newTuple2<>(token,1));}}}}}

4. 在 flink-examples/flink-examples-batch 目录, 执行编译: mvn clean package, 编译测试包到 target/WordCount.jar, 准备使用

Jet 准备

jet 默认也提供了 wordcount 测试用例, 不过这个用例实现将全部文件加载到了内存, 我们由于测试文本较大, 这里修改一下实现, 使用流读取来代替内存存储, 具体操作是:

  1. 进入目录: hazelcast-jet/examples/wordcount

  2. src/main/java/com/hazelcast/jet/examples/wordcount/WordCount.java 内容修改为:

/*  * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.  *  * Licensed under the Apache License, Version 2.0 (the "License");  * you may not use this file except in compliance with the License.  * You may obtain a copy of the License at  *  * http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing, software  * distributed under the License is distributed on an "AS IS" BASIS,  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  * See the License for the specific language governing permissions and  * limitations under the License.  */packagecom.hazelcast.jet.examples.wordcount;importcom.hazelcast.jet.Jet;importcom.hazelcast.jet.JetInstance;importcom.hazelcast.jet.pipeline.Pipeline;importcom.hazelcast.jet.pipeline.Sinks;importcom.hazelcast.jet.pipeline.Sources;importjava.util.Map;importjava.util.concurrent.TimeUnit;importjava.util.regex.Pattern;import staticcom.hazelcast.function.Functions.wholeItem;import staticcom.hazelcast.jet.Traversers.traverseArray;import staticcom.hazelcast.jet.aggregate.AggregateOperations.counting;import staticjava.util.Comparator.comparingLong;/**  * Demonstrates a simple Word Count job in the Pipeline API. Inserts the  * text of The Complete Works of William Shakespeare into a Hazelcast  * IMap, then lets Jet count the words in it and write its findings to  * another IMap. The example looks at Jet's output and prints the 100 most  * frequent words.  */publicclassWordCount{privatestaticfinalStringBOOK_LINES="bookLines";privatestaticfinalStringCOUNTS="counts";privateJetInstancejet;privatestaticPipelinebuildPipeline(){Patterndelimiter=Pattern.compile("\\W+");Pipelinep=Pipeline.create();p.readFrom(Sources.files("/tmp/books")).flatMap(e->traverseArray(delimiter.split(e.toLowerCase()))).filter(word->!word.isEmpty()).groupingKey(wholeItem()).aggregate(counting()).writeTo(Sinks.map(COUNTS));returnp;}publicstaticvoidmain(String[]args)throwsException{newWordCount().go();}/**      * This code illustrates a few more things about Jet, new in 0.5. See comments.      */privatevoidgo(){try{setup();System.out.println("\nCounting words... ");longstart=System.nanoTime();Pipelinep=buildPipeline();jet.newJob(p).join();System.out.println("done in "+TimeUnit.NANOSECONDS.toMillis(System.nanoTime()-start)+" milliseconds.");Map<String,Long>results=jet.getMap(COUNTS);printResults(results);}finally{Jet.shutdownAll();}}privatevoidsetup(){jet=Jet.bootstrappedInstance();System.out.println("Loading The Complete Works of William Shakespeare");}privatestaticMap<String,Long>checkResults(Map<String,Long>counts){if(counts.get("the")!=27_843){thrownewAssertionError("Wrong count of 'the'");}System.out.println("Count of 'the' is valid");returncounts;}privatestaticvoidprintResults(Map<String,Long>counts){finalintlimit=100;StringBuildersb=newStringBuilder(String.format(" Top %d entries are:%n",limit));sb.append("/-------+---------\\\n");sb.append("| Count | Word    |\n");sb.append("|-------+---------|\n");counts.entrySet().stream().sorted(comparingLong(Map.Entry<String,Long>::getValue).reversed()).limit(limit).forEach(e->sb.append(String.format("|%6d | %-8s|%n",e.getValue(),e.getKey())));sb.append("\\-------+---------/\n");System.out.println(sb.toString());}}

3. 编译成品, mvn clean package, 到 target/hazelcast-jet-examples-wordcount-4.6-SNAPSHOT.jar

4. 将环境准备中, 编译的 jet 包: hazelcast-jet-4.6-SNAPSHOT.jar 也拷贝到 target 下

任务运行

Flink 任务运行

  1. 进入目录: flink-1.13.1/flink-examples/flink-examples-batch

  2. 执行操作: flink run target/flink-examples-batch_2.11-1.13.1-WordCount.jar --input /tmp/books/shakespeare-complete-works.txt --output f

  3. 等待执行完成

Jet 任务运行

  1. 进入目录: hazelcast-jet/examples/wordcount/

  2. 执行操作: java -cp target/hazelcast-jet-4.6-SNAPSHOT.jar:target/hazelcast-jet-examples-wordcount-4.6-SNAPSHOT.jar com.hazelcast.jet.examples.wordcount.WordCount

  3. 等待执行完成

结果分析

执行环境

执行宿主机环境为:

  1. MacBook Pro (13-inch, 2017, Two Thunderbolt 3 ports)

  2. 处理器: 2.3 GHz 双核Intel Core i5

  3. 内存: 8 GB 2133 MHz LPDDR3

资源占用

Flink 资源占用

  1. CPU: 大概占用 3.8 个核

  2. 内存: 大概占用 750M

Jet 资源占用

  1. CPU: 大概占用 3.5 个核

  2. 内存: 大概占用 360M

运行速度对比

Flink 用时: 56s

Jet 用时: 16s

在 wordcount case 下, 几乎完全一致的逻辑, 结果对比一致的情况下, Jet 在内存占用一半的前提下, 得到了计算速度为 Flink 3.5 倍的结果

这个结果与 hazelcat-jet 自己的官方测试结果一致, 正在尝试做一波性能分析, 找到性能差异点


推荐阅读

Tapdata 推出“钛计划”公益项目,着力打通数据孤岛助推社会数字化升级

为响应数据要素市场化配置改革政策方向的指引,Tapdata 推出“钛计划”打通数据孤岛公益行动,面向非盈利机构(如各城市政务服务数据管理局、社会公益组织/项目等)以及为社会培养数据技术人才的相关培训机构,提供 Tapdata 实时数据服务平台的特殊免费授权,助推公共领域数据互通、共享与实时应用......

Tapdata 钛铂数据的产品理念

Tapdata 是全球首个基于数据即服务架构理念、面向 TP 场景的企业实时主数据服务平台,可以帮助企业快速实现主数据的统一管理和发布,并为所有数据库、数仓、大数据平台提供最实时的源数据,让数据随时可用。

Tapdata Cloud 是什么?

Tapdata Cloud 是钛铂数据自研的异构数据库实时同步工具 Tapdata Replicator 的云服务版本,现在免费提供所有开发者和企业使用Tapdata Cloud 目前支持 Oracle、MySQL、PostgreSQL、SQL Server、MongoDB、Elasticsearch 之间的数据迁移和同步,未来将陆续上线 DB2、Sybase ASE、Redis、Kafka 等。

什么是数据即服务(Data as a Service)?

数据即服务(DaaS)是一种数据管理策略,旨在利用数据作为业务资产来提高业务创新的敏捷性。它是自 1990 年代互联网高速发展以来越来越受欢迎的“一切皆服务”(XaaS)趋势下关于数据服务化的那一部分,介于 PaaS 和 SaaS 之间。与 SaaS 类似,DaaS 提供了一种方式来管理企业每天生成的大量数据,并在整个业务范围内提供这些有价值的信息,以便于进行数据驱动的商业决策。同时,我们也...

什么是数据虚拟化(Data Virtualization)?

本文将简单易懂地介绍数据虚拟化技术及数据虚拟化软件架构的实现方法,尽量避免教条主义。如需要了解虚拟化定义,可通过wiki 百科了解。先引用一段百度百科的文字来说明数据虚拟化的定义:数据虚拟化(data virtualization)是用来描述所有数据管理方法的涵盖性术语,这些方法允许应用程序检索并管理数据,且不需要数据相关的技术细节,例如它格式化的方式或物理位置所在。正如百科的定义,采用数据...

Tapdata 数据库实时同步的技术要点

Tapdata 专注于实时数据的处理技术,在数据库迁移和同步方面,Tapdata 的表现非常优秀,实时、多元、异构,尤其在关系数据库到非关系数据库之间的双向同步方面,无论是从操作上,还是效率上,都体现了业界领先的水平。本文重点阐述 Tapdata 在数据库实时同步方面的技术要点。

教育中台与第三方系统对接整合数据案例

最近, 南京秦淮区教育中台系统,成功地和市系统进行了一次圆满对接。通过教育中台提供的统一数据能力和低代码API对接能力,实现了对市系统数据的实时推送和拉取,以及各类业务逻辑上的处理。这次对接为南京市中小学生创客大赛的成功举办提供了及时可靠的数据支撑, 体现了中台系统在快速响应业务方面的优越性。

周生生 | 全渠道商品中心建设

通过Tapdata 构建全渠道商品中心,实现: - 支持中国大陆港澳台的上千家门店的生产环境; - 使用JS脚本来进行流处理计算,业务需求从开发到上线过程快至 1 天以内; - 任务配置与执行监测全程可视化操作,不懂技术也能完成操作,极大降低维护成本; - 一套产品可满足不同需求,根据业务需求产出不同类型的业务模型节省大量人力物力。

关系型数据库到MongoDB实时数据同步解决方案

使用MongoDB作为主机下行或新一代数据库的选择,将业务数据从已有主机或Oracle等关系型数据库复制到MongoDB; 使用Tapdata Replicator的CDC技术,实时监听现有业务库的数据变动并同步至MongoDB; 使用Tapdata 的RDM技术将关系型表合并转型到MongoDB JSON数据结构,并保持和源库的高度数据一致; 在MongoDB上进行新业务的开发。

Tapdata肖贝贝:实时数据引擎系列(一)-新鲜的数据流

前言2006 年诞生的 hadoop 和 她周边的生态, 在过去的这些年里为大数据的火热提供了足够的能量, 十几年过去了, 场景在变化, 技术在演变, 大家对数据的认知已经不再局限于 T+1 与 高吞吐高延迟 为主要特征的上一代框架理念, 在真实的场景里, 实时, 准确, 多变 的数据也发挥着越来越重要的作用为满足这些新的需求, 各种框架和中间件如雨后春笋般不断涌出hive 的出现让这头大象...
联络我们:
Email:team@tapdata.io    电话:0755-26656080
深圳市南山区临海大道香江金融中心 2410-13
官方服务号
Tapdata 微信公众号
扫码关注