Tapdata Tech Blog

Flink vs Jet: A Performance Benchmark Comparison (Part 1)

2021-08-09 18:17 肖贝贝

Preface

Flink is a very popular stream-processing framework; Hazelcast Jet is the embedded stream-processing framework that TAPDATA uses for its stream computations.

Hazelcast Jet's own team has published performance benchmarks against Flink. Curious whether the gap is really that large, we will run a series of benchmarks ourselves, as a reference point for readers.

Environment Setup

Flink

Flink jobs run outside the application: before writing any job, a Flink cluster has to be started. The deployment steps are as follows (consolidated into a shell sketch after the list):

  1. Install Java on the machine yourself; confirm it works by checking that java --version produces output

  2. Download the latest Flink release from: https://ftp.jaist.ac.jp/pub/apache/flink/flink-1.13.1/flink-1.13.1-bin-scala_2.11.tgz

  3. Extract the archive and run bash ./bin/start-cluster.sh

  4. Run jps; seeing a StandaloneSessionClusterEntrypoint process means Flink started successfully

  5. Put the flink binary from bin/ into a system executable directory, for example /usr/bin/
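Put together, the deployment is a handful of shell commands. A minimal sketch (same download URL as above; a symlink is used in the last step so the flink script keeps resolving paths relative to its install directory):

# Download and extract Flink 1.13.1 (Scala 2.11 build)
wget https://ftp.jaist.ac.jp/pub/apache/flink/flink-1.13.1/flink-1.13.1-bin-scala_2.11.tgz
tar -xzf flink-1.13.1-bin-scala_2.11.tgz
cd flink-1.13.1

# Start a local standalone cluster
bash ./bin/start-cluster.sh

# Verify: a StandaloneSessionClusterEntrypoint process should appear
jps

# Make the flink CLI available on the PATH
sudo ln -s "$PWD/bin/flink" /usr/local/bin/flink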

Jet

Here we run Jet in embedded mode, so we build Jet from source (consolidated into a shell sketch after the list):

  1. Install a Java environment and Maven yourself

  2. Clone the repository: git clone https://github.com/hazelcast/hazelcast-jet.git

  3. Run mvn package -DskipTests and wait for the build to finish; the artifact lands at hazelcast-jet/hazelcast-jet-all/target/hazelcast-jet-4.6-SNAPSHOT.jar and will be needed when running the job later
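As a consolidated sketch (assuming Java and Maven are already on the PATH):

# Clone the repository and build, skipping tests to save time
git clone https://github.com/hazelcast/hazelcast-jet.git
cd hazelcast-jet
mvn package -DskipTests

# The fat jar used later when running the job
ls hazelcast-jet-all/target/hazelcast-jet-4.6-SNAPSHOT.jar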

Job Preparation

File Preparation

  1. We use the classic word count as the test case. First prepare the input text: take hazelcast-jet/examples/wordcount/books/shakespeare-complete-works.txt as the base file and repeat it one hundred times, producing a text file of roughly 500 MB; place it under the /tmp/books directory (one way to do this is sketched below)
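A simple shell loop builds the enlarged input; a sketch, assuming the current directory contains the hazelcast-jet checkout:

# Concatenate the base text 100 times into an ~500 MB file
mkdir -p /tmp/books
for i in $(seq 1 100); do
    cat hazelcast-jet/examples/wordcount/books/shakespeare-complete-works.txt \
        >> /tmp/books/shakespeare-complete-works.txt
done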

Flink Preparation

For Flink we use the stock wordcount example, lightly modified to set the parallelism to 4 so that it matches Jet. Concretely:

  1. Download the Flink source code from: https://ftp.jaist.ac.jp/pub/apache/flink/flink-1.13.1/flink-1.13.1-src.tgz

  2. Extract it and enter the directory flink-examples/flink-examples-batch

  3. Edit src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java, replacing its contents with the following (the only change from the stock example is the two .setParallelism(4) calls):

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.examples.java.wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/**
 * Implements the "WordCount" program that computes a simple word occurrence histogram over text
 * files.
 *
 * <p>The input is a plain text file with lines separated by newline characters.
 *
 * <p>Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>
 * If no parameters are provided, the program is run with default data from {@link WordCountData}.
 */
public class WordCount {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text = null;
        if (params.has("input")) {
            // union all the inputs from text files
            for (String input : params.getMultiParameterRequired("input")) {
                if (text == null) {
                    text = env.readTextFile(input);
                } else {
                    text = text.union(env.readTextFile(input));
                }
            }
            Preconditions.checkNotNull(text, "Input DataSet should not be null.");
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word, 1)
                text.flatMap(new Tokenizer()).setParallelism(4)
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1)
                        .setParallelism(4);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }

    // *************************************************************************
    //     USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
     * form of "(word, 1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

  4. In the flink-examples/flink-examples-batch directory, build with mvn clean package (sketched below); the test jar is produced at target/WordCount.jar, ready for use
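The build step as a sketch (run from the extracted source tree; skipping tests is optional but speeds things up):

cd flink-1.13.1/flink-examples/flink-examples-batch
mvn clean package -DskipTests

# The WordCount jar(s) land in target/
ls target/*WordCount*.jar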

Jet Preparation

Jet also ships a wordcount example, but that implementation loads the whole input into memory. Since our test text is large, we modify it to read the files as a stream of lines instead of staging them in memory. Concretely:

  1. Enter the directory hazelcast-jet/examples/wordcount

  2. Replace the contents of src/main/java/com/hazelcast/jet/examples/wordcount/WordCount.java with:

/*
 * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.hazelcast.jet.examples.wordcount;

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import static com.hazelcast.function.Functions.wholeItem;
import static com.hazelcast.jet.Traversers.traverseArray;
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static java.util.Comparator.comparingLong;

/**
 * Demonstrates a simple Word Count job in the Pipeline API. Reads the
 * text files under /tmp/books as a stream of lines, lets Jet count the
 * words, and writes the result to an IMap. The example then looks at
 * Jet's output and prints the 100 most frequent words.
 */
public class WordCount {

    // retained from the stock example; not used by this streaming version
    private static final String BOOK_LINES = "bookLines";
    private static final String COUNTS = "counts";

    private JetInstance jet;

    private static Pipeline buildPipeline() {
        Pattern delimiter = Pattern.compile("\\W+");
        Pipeline p = Pipeline.create();
        // stream the files line by line instead of loading them into memory
        p.readFrom(Sources.files("/tmp/books"))
         .flatMap(e -> traverseArray(delimiter.split(e.toLowerCase())))
         .filter(word -> !word.isEmpty())
         .groupingKey(wholeItem())
         .aggregate(counting())
         .writeTo(Sinks.map(COUNTS));
        return p;
    }

    public static void main(String[] args) throws Exception {
        new WordCount().go();
    }

    private void go() {
        try {
            setup();
            System.out.println("\nCounting words... ");
            long start = System.nanoTime();
            Pipeline p = buildPipeline();
            jet.newJob(p).join();
            System.out.println("done in "
                    + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
                    + " milliseconds.");
            Map<String, Long> results = jet.getMap(COUNTS);
            printResults(results);
        } finally {
            Jet.shutdownAll();
        }
    }

    private void setup() {
        jet = Jet.bootstrappedInstance();
        System.out.println("Loading The Complete Works of William Shakespeare");
    }

    // retained from the stock example, which validated the word count
    // of a single copy of the text; not called here
    private static Map<String, Long> checkResults(Map<String, Long> counts) {
        if (counts.get("the") != 27_843) {
            throw new AssertionError("Wrong count of 'the'");
        }
        System.out.println("Count of 'the' is valid");
        return counts;
    }

    private static void printResults(Map<String, Long> counts) {
        final int limit = 100;
        StringBuilder sb = new StringBuilder(String.format(" Top %d entries are:%n", limit));
        sb.append("/-------+---------\\\n");
        sb.append("| Count | Word    |\n");
        sb.append("|-------+---------|\n");
        counts.entrySet().stream()
              .sorted(comparingLong(Map.Entry<String, Long>::getValue).reversed())
              .limit(limit)
              .forEach(e -> sb.append(String.format("|%6d | %-8s|%n", e.getValue(), e.getKey())));
        sb.append("\\-------+---------/\n");
        System.out.println(sb.toString());
    }
}

  3. Build with mvn clean package; the artifact is produced at target/hazelcast-jet-examples-wordcount-4.6-SNAPSHOT.jar

  4. Copy the Jet jar built during environment setup, hazelcast-jet-4.6-SNAPSHOT.jar, into target/ as well (both steps are sketched below)
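Steps 3 and 4 as a sketch (relative paths assume the directory layout of the hazelcast-jet repository):

cd hazelcast-jet/examples/wordcount
mvn clean package

# Place the Jet jar built during environment setup next to the example jar
cp ../../hazelcast-jet-all/target/hazelcast-jet-4.6-SNAPSHOT.jar target/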

Running the Jobs

Running the Flink Job

  1. Enter the directory flink-1.13.1/flink-examples/flink-examples-batch

  2. Run: flink run target/flink-examples-batch_2.11-1.13.1-WordCount.jar --input /tmp/books/shakespeare-complete-works.txt --output f

  3. Wait for the job to finish

Running the Jet Job

  1. Enter the directory hazelcast-jet/examples/wordcount/

  2. Run: java -cp target/hazelcast-jet-4.6-SNAPSHOT.jar:target/hazelcast-jet-examples-wordcount-4.6-SNAPSHOT.jar com.hazelcast.jet.examples.wordcount.WordCount

  3. Wait for the job to finish

Results

Test Environment

The host machine:

  1. MacBook Pro (13-inch, 2017, Two Thunderbolt 3 ports)

  2. Processor: 2.3 GHz dual-core Intel Core i5

  3. Memory: 8 GB 2133 MHz LPDDR3

Resource Usage

Approximate usage observed while the jobs ran (one way to take such samples is sketched after the numbers):

Flink Resource Usage

  1. CPU: roughly 3.8 cores

  2. Memory: roughly 750 MB

Jet Resource Usage

  1. CPU: roughly 3.5 cores

  2. Memory: roughly 360 MB
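One simple way to sample a running job's CPU and resident memory on macOS is a ps loop. This is an illustrative sketch, not necessarily how the numbers above were collected; adjust the pgrep pattern to the JVM you care about (e.g. WordCount for the Jet run, TaskManagerRunner for Flink's worker):

# Sample CPU% and resident set size (in KB) once per second
while true; do
    ps -o %cpu,rss,command -p "$(pgrep -f WordCount | head -1)"
    sleep 1
done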

Runtime Comparison

Flink elapsed time: 56 s

Jet elapsed time: 16 s

In this wordcount case, with essentially identical logic and matching output, Jet computed 3.5 times faster than Flink while using about half the memory.

This is consistent with hazelcast-jet's own published benchmark results. We are now running a performance analysis to pinpoint exactly where the difference comes from.

