Flink is one of the hottest stream-processing frameworks around, while Hazelcast Jet is the embeddable stream-processing framework that Tapdata uses for its stream computations.
The Hazelcast Jet team has published its own performance benchmarks against Flink. Curious whether the gap is really as large as they claim, I ran a series of performance tests myself; the results below are offered as a reference.
Flink jobs run outside the client process, so before writing any job you need a running Flink cluster. Deployment goes as follows:
Install Java on the target machine and verify the installation by checking that java --version (java -version on JDK 8) prints output.
Download the latest Flink release from: https://ftp.jaist.ac.jp/pub/apache/flink/flink-1.13.1/flink-1.13.1-bin-scala_2.11.tgz
Extract the archive and run bash ./bin/start-cluster.sh
Run jps; if you see a StandaloneSessionClusterEntrypoint process, Flink started successfully.
Put the flink binary from bin/ into a directory on your PATH, e.g. /usr/bin/.
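Besides jps, you can also confirm the cluster is reachable over its REST endpoint. Below is a minimal sketch, assuming the default REST port 8081 and JDK 11+ (for java.net.http); this is an optional check, not part of the original steps:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FlinkClusterCheck {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // GET /overview returns cluster-wide stats (task managers, slots, jobs)
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/overview"))
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}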
We run Jet in embedded mode here, so first build Jet itself:
1. Install Java and Maven.
2. Clone the repository: git clone https://github.com/hazelcast/hazelcast-jet.git
3. Run mvn package -DskipTests and wait for the build to finish. The artifact lands at hazelcast-jet/hazelcast-jet-all/target/hazelcast-jet-4.6-SNAPSHOT.jar and will be needed later when running the job.
File preparation
We use the classic wordcount as the test case. As the base text, take hazelcast-jet/examples/wordcount/books/shakespeare-complete-works.txt, concatenate it with itself 100 times to produce a text file of roughly 500 MB, and place the result under /tmp/books. A sketch of this step follows below.
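A minimal sketch of the concatenation step (the paths match the directories used in this walkthrough; an equivalent shell loop works just as well):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class PrepareBooks {
    public static void main(String[] args) throws IOException {
        // single copy of the base text from the hazelcast-jet checkout
        Path source = Paths.get("hazelcast-jet/examples/wordcount/books/shakespeare-complete-works.txt");
        Path target = Paths.get("/tmp/books/shakespeare-complete-works.txt");
        Files.createDirectories(target.getParent());
        Files.deleteIfExists(target);
        byte[] copy = Files.readAllBytes(source);
        // append the ~5 MB base text 100 times -> roughly 500 MB
        for (int i = 0; i < 100; i++) {
            Files.write(target, copy, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        }
    }
}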
Flink preparation
Flink ships a default wordcount example; we modify it slightly to set the parallelism to 4, matching the Jet run. Concretely:
1. Download the Flink source from: https://ftp.jaist.ac.jp/pub/apache/flink/flink-1.13.1/flink-1.13.1-src.tgz
2. Extract it and enter the directory flink-examples/flink-examples-batch
3. Replace the contents of src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java with:
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.examples.java.wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/**
 * Implements the "WordCount" program that computes a simple word occurrence histogram over text
 * files.
 *
 * <p>The input is a plain text file with lines separated by newline characters.
 *
 * <p>Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>
 * If no parameters are provided, the program is run with default data from {@link WordCountData}.
 *
 * <p>This example shows how to:
 *
 * <ul>
 *   <li>write a simple Flink program.
 *   <li>use Tuple data types.
 *   <li>write and use user-defined functions.
 * </ul>
 */
public class WordCount {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text = null;
        if (params.has("input")) {
            // union all the inputs from text files
            for (String input : params.getMultiParameterRequired("input")) {
                if (text == null) {
                    text = env.readTextFile(input);
                } else {
                    text = text.union(env.readTextFile(input));
                }
            }
            Preconditions.checkNotNull(text, "Input DataSet should not be null.");
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer()).setParallelism(4)
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1)
                        .setParallelism(4);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
     * form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
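The only change relative to the stock batch example is the two setParallelism(4) calls on the tokenizing and aggregation stages, which pin the job's parallelism to 4 so it matches the Jet pipeline.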
4. In flink-examples/flink-examples-batch, build it with mvn clean package; the test jar is produced at target/WordCount.jar, ready for use.
Jet preparation
Jet also ships a default wordcount example, but that implementation loads the entire file into memory. Since our test text is large, we change it to stream the file from disk instead. Concretely:
1. Enter the directory hazelcast-jet/examples/wordcount
2. Replace the contents of src/main/java/com/hazelcast/jet/examples/wordcount/WordCount.java with:
/*
 * Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.hazelcast.jet.examples.wordcount;

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import static com.hazelcast.function.Functions.wholeItem;
import static com.hazelcast.jet.Traversers.traverseArray;
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static java.util.Comparator.comparingLong;

/**
 * Demonstrates a simple Word Count job in the Pipeline API. Streams the
 * text files under /tmp/books, lets Jet count the words in them and write
 * its findings to an IMap. The example looks at Jet's output and prints
 * the 100 most frequent words.
 */
public class WordCount {

    private static final String BOOK_LINES = "bookLines";
    private static final String COUNTS = "counts";

    private JetInstance jet;

    private static Pipeline buildPipeline() {
        Pattern delimiter = Pattern.compile("\\W+");
        Pipeline p = Pipeline.create();
        p.readFrom(Sources.files("/tmp/books"))
         .flatMap(e -> traverseArray(delimiter.split(e.toLowerCase())))
         .filter(word -> !word.isEmpty())
         .groupingKey(wholeItem())
         .aggregate(counting())
         .writeTo(Sinks.map(COUNTS));
        return p;
    }

    public static void main(String[] args) throws Exception {
        new WordCount().go();
    }

    /**
     * This code illustrates a few more things about Jet, new in 0.5. See comments.
     */
    private void go() {
        try {
            setup();
            System.out.println("\nCounting words... ");
            long start = System.nanoTime();
            Pipeline p = buildPipeline();
            jet.newJob(p).join();
            System.out.println("done in "
                    + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) + " milliseconds.");
            Map<String, Long> results = jet.getMap(COUNTS);
            printResults(results);
        } finally {
            Jet.shutdownAll();
        }
    }

    private void setup() {
        jet = Jet.bootstrappedInstance();
        System.out.println("Loading The Complete Works of William Shakespeare");
    }

    private static Map<String, Long> checkResults(Map<String, Long> counts) {
        if (counts.get("the") != 27_843) {
            throw new AssertionError("Wrong count of 'the'");
        }
        System.out.println("Count of 'the' is valid");
        return counts;
    }

    private static void printResults(Map<String, Long> counts) {
        final int limit = 100;
        StringBuilder sb = new StringBuilder(String.format(" Top %d entries are:%n", limit));
        sb.append("/-------+---------\\\n");
        sb.append("| Count | Word    |\n");
        sb.append("|-------+---------|\n");
        counts.entrySet().stream()
              .sorted(comparingLong(Map.Entry<String, Long>::getValue).reversed())
              .limit(limit)
              .forEach(e -> sb.append(String.format("|%6d | %-8s|%n", e.getValue(), e.getKey())));
        sb.append("\\-------+---------/\n");
        System.out.println(sb.toString());
    }
}
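Compared with the stock example, buildPipeline() reads lines lazily from disk via Sources.files("/tmp/books") instead of first inserting the whole text into an IMap, so the 500 MB input never has to sit in memory at once; only the word counts are materialized, in the counts map.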
3. Build it with mvn clean package; the artifact is produced at target/hazelcast-jet-examples-wordcount-4.6-SNAPSHOT.jar
4. Copy the Jet jar built during environment preparation, hazelcast-jet-4.6-SNAPSHOT.jar, into target/ as well.
Test execution
Run the Flink job:
1. Enter the directory flink-1.13.1/flink-examples/flink-examples-batch
2. Run: flink run target/flink-examples-batch_2.11-1.13.1-WordCount.jar --input /tmp/books/shakespeare-complete-works.txt --output f
3. Wait for the job to finish.
Run the Jet job:
1. Enter the directory hazelcast-jet/examples/wordcount/
2. Run: java -cp target/hazelcast-jet-4.6-SNAPSHOT.jar:target/hazelcast-jet-examples-wordcount-4.6-SNAPSHOT.jar com.hazelcast.jet.examples.wordcount.WordCount
3. Wait for the job to finish.
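Note that the Jet example measures and prints its own wall-clock time ("done in ... milliseconds" in the code above), while the Flink CLI reports the job runtime when a batch job completes, so both numbers can be read directly from the console output.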
The host machine:
MacBook Pro (13-inch, 2017, Two Thunderbolt 3 ports)
Processor: 2.3 GHz dual-core Intel Core i5
Memory: 8 GB 2133 MHz LPDDR3
Flink resource usage
CPU: roughly 3.8 cores
Memory: roughly 750 MB
Jet resource usage
CPU: roughly 3.5 cores
Memory: roughly 360 MB
Flink runtime: 56 s
Jet runtime: 16 s
In this wordcount case, with essentially identical logic and matching output, Jet used about half the memory of Flink (360 MB vs 750 MB) while running 3.5 times faster (16 s vs 56 s, i.e. 56 / 16 = 3.5).
This is consistent with Hazelcast Jet's own published benchmark results. I am now working on a profiling pass to pin down where the performance difference comes from.