Apache Spark Example: Word Count Program in Java
Apache Spark is an open-source distributed computing system used primarily for large-scale data processing and analytics. It began in 2009 as an academic project by Matei Zaharia at UC Berkeley's AMPLab, and was originally built on top of the Mesos cluster management tool. It was later modified and upgraded so that it can perform distributed processing in cluster-based environments.
Apache Spark Example Project Setup
We will use Maven to create a sample project for the demonstration. To create the project, execute the following command in the directory that you will use as your workspace:
mvn archetype:generate -DgroupId=com.Olivia.sparkdemo -DartifactId=JD-Spark-WordCount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
If you are running Maven for the first time, the generate command may take a few seconds to complete, because Maven has to download all the required plugins and artifacts to perform the generation task. Once the project has been created, feel free to open it in your favorite IDE. The next step is to add the appropriate Maven dependencies to the project. Here is the pom.xml file with the appropriate dependencies:
<dependencies>
  <!-- Import Spark -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.4.0</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>2.0.2</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <configuration>
        <archive>
          <manifest>
            <addClasspath>true</addClasspath>
            <classpathPrefix>lib/</classpathPrefix>
            <mainClass>com.Olivia.sparkdemo.WordCounter</mainClass>
          </manifest>
        </archive>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-dependency-plugin</artifactId>
      <executions>
        <execution>
          <id>copy</id>
          <phase>install</phase>
          <goals>
            <goal>copy-dependencies</goal>
          </goals>
          <configuration>
            <outputDirectory>${project.build.directory}/lib</outputDirectory>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
Since this is a Maven-based project, there is no need to install and set up Apache Spark on your machine. When we run the project, a runtime instance of Apache Spark is started, and it shuts down once the program finishes executing. Finally, to see all the JAR files that were added to the project through this dependency, we can run a simple Maven command that shows the project's complete dependency tree. Here is the command we can use:
mvn dependency:tree
When we run this command, it shows us the following dependency tree:
shubham:JD-Spark-WordCount shubham$ mvn dependency:tree
[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.Olivia:java-word-count:jar:1.0-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-jar-plugin is missing. @ line 41, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] -------------------< com.Olivia:java-word-count >-------------------
[INFO] Building java-word-count 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ java-word-count ---
[INFO] com.Olivia:java-word-count:jar:1.0-SNAPSHOT
[INFO] +- org.apache.spark:spark-core_2.11:jar:1.4.0:compile
[INFO] | +- com.twitter:chill_2.11:jar:0.5.0:compile
[INFO] | | \- com.esotericsoftware.kryo:kryo:jar:2.21:compile
[INFO] | | +- com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile
[INFO] | | +- com.esotericsoftware.minlog:minlog:jar:1.2:compile
[INFO] | | \- org.objenesis:objenesis:jar:1.2:compile
[INFO] | +- com.twitter:chill-java:jar:0.5.0:compile
[INFO] | +- org.apache.hadoop:hadoop-client:jar:2.2.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-common:jar:2.2.0:compile
[INFO] | | | +- commons-cli:commons-cli:jar:1.2:compile
[INFO] | | | +- org.apache.commons:commons-math:jar:2.1:compile
[INFO] | | | +- xmlenc:xmlenc:jar:0.52:compile
[INFO] | | | +- commons-io:commons-io:jar:2.1:compile
[INFO] | | | +- commons-logging:commons-logging:jar:1.1.1:compile
[INFO] | | | +- commons-lang:commons-lang:jar:2.5:compile
[INFO] | | | +- commons-configuration:commons-configuration:jar:1.6:compile
[INFO] | | | | +- commons-collections:commons-collections:jar:3.2.1:compile
[INFO] | | | | +- commons-digester:commons-digester:jar:1.8:compile
[INFO] | | | | | \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO] | | | | \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
[INFO] | | | +- org.codehaus.jackson:jackson-core-asl:jar:1.8.8:compile
[INFO] | | | +- org.apache.avro:avro:jar:1.7.4:compile
[INFO] | | | +- com.google.protobuf:protobuf-java:jar:2.5.0:compile
[INFO] | | | +- org.apache.hadoop:hadoop-auth:jar:2.2.0:compile
[INFO] | | | \- org.apache.commons:commons-compress:jar:1.4.1:compile
[INFO] | | | \- org.tukaani:xz:jar:1.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-hdfs:jar:2.2.0:compile
[INFO] | | | \- org.mortbay.jetty:jetty-util:jar:6.1.26:compile
[INFO] | | +- org.apache.hadoop:hadoop-mapreduce-client-app:jar:2.2.0:compile
[INFO] | | | +- org.apache.hadoop:hadoop-mapreduce-client-common:jar:2.2.0:compile
[INFO] | | | | +- org.apache.hadoop:hadoop-yarn-client:jar:2.2.0:compile
[INFO] | | | | | +- com.google.inject:guice:jar:3.0:compile
[INFO] | | | | | | +- javax.inject:javax.inject:jar:1:compile
[INFO] | | | | | | \- aopalliance:aopalliance:jar:1.0:compile
[INFO] | | | | | +- com.sun.jersey.jersey-test-framework:jersey-test-framework-grizzly2:jar:1.9:compile
[INFO] | | | | | | +- com.sun.jersey.jersey-test-framework:jersey-test-framework-core:jar:1.9:compile
[INFO] | | | | | | | +- javax.servlet:javax.servlet-api:jar:3.0.1:compile
[INFO] | | | | | | | \- com.sun.jersey:jersey-client:jar:1.9:compile
[INFO] | | | | | | \- com.sun.jersey:jersey-grizzly2:jar:1.9:compile
[INFO] | | | | | | +- org.glassfish.grizzly:grizzly-https:jar:2.1.2:compile
[INFO] | | | | | | | \- org.glassfish.grizzly:grizzly-framework:jar:2.1.2:compile
[INFO] | | | | | | | \- org.glassfish.gmbal:gmbal-api-only:jar:3.0.0-b023:compile
[INFO] | | | | | | | \- org.glassfish.external:management-api:jar:3.0.0-b012:compile
[INFO] | | | | | | +- org.glassfish.grizzly:grizzly-http-server:jar:2.1.2:compile
[INFO] | | | | | | | \- org.glassfish.grizzly:grizzly-rcm:jar:2.1.2:compile
[INFO] | | | | | | +- org.glassfish.grizzly:grizzly-http-servlet:jar:2.1.2:compile
[INFO] | | | | | | \- org.glassfish:javax.servlet:jar:3.1:compile
[INFO] | | | | | +- com.sun.jersey:jersey-json:jar:1.9:compile
[INFO] | | | | | | +- org.codehaus.jettison:jettison:jar:1.1:compile
[INFO] | | | | | | | \- stax:stax-api:jar:1.0.1:compile
[INFO] | | | | | | +- com.sun.xml.bind:jaxb-impl:jar:2.2.3-1:compile
[INFO] | | | | | | | \- javax.xml.bind:jaxb-api:jar:2.2.2:compile
[INFO] | | | | | | | \- javax.activation:activation:jar:1.1:compile
[INFO] | | | | | | +- org.codehaus.jackson:jackson-jaxrs:jar:1.8.3:compile
[INFO] | | | | | | \- org.codehaus.jackson:jackson-xc:jar:1.8.3:compile
[INFO] | | | | | \- com.sun.jersey.contribs:jersey-guice:jar:1.9:compile
[INFO] | | | | \- org.apache.hadoop:hadoop-yarn-server-common:jar:2.2.0:compile
[INFO] | | | \- org.apache.hadoop:hadoop-mapreduce-client-shuffle:jar:2.2.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-yarn-api:jar:2.2.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.2.0:compile
[INFO] | | | \- org.apache.hadoop:hadoop-yarn-common:jar:2.2.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-mapreduce-client-jobclient:jar:2.2.0:compile
[INFO] | | \- org.apache.hadoop:hadoop-annotations:jar:2.2.0:compile
[INFO] | +- org.apache.spark:spark-launcher_2.11:jar:1.4.0:compile
[INFO] | +- org.apache.spark:spark-network-common_2.11:jar:1.4.0:compile
[INFO] | +- org.apache.spark:spark-network-shuffle_2.11:jar:1.4.0:compile
[INFO] | +- org.apache.spark:spark-unsafe_2.11:jar:1.4.0:compile
[INFO] | +- net.java.dev.jets3t:jets3t:jar:0.7.1:compile
[INFO] | | +- commons-codec:commons-codec:jar:1.3:compile
[INFO] | | \- commons-httpclient:commons-httpclient:jar:3.1:compile
[INFO] | +- org.apache.curator:curator-recipes:jar:2.4.0:compile
[INFO] | | +- org.apache.curator:curator-framework:jar:2.4.0:compile
[INFO] | | | \- org.apache.curator:curator-client:jar:2.4.0:compile
[INFO] | | +- org.apache.zookeeper:zookeeper:jar:3.4.5:compile
[INFO] | | | \- jline:jline:jar:0.9.94:compile
[INFO] | | \- com.google.guava:guava:jar:14.0.1:compile
[INFO] | +- org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016:compile
[INFO] | +- org.apache.commons:commons-lang3:jar:3.3.2:compile
[INFO] | +- org.apache.commons:commons-math3:jar:3.4.1:compile
[INFO] | +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] | +- org.slf4j:slf4j-api:jar:1.7.10:compile
[INFO] | +- org.slf4j:jul-to-slf4j:jar:1.7.10:compile
[INFO] | +- org.slf4j:jcl-over-slf4j:jar:1.7.10:compile
[INFO] | +- log4j:log4j:jar:1.2.17:compile
[INFO] | +- org.slf4j:slf4j-log4j12:jar:1.7.10:compile
[INFO] | +- com.ning:compress-lzf:jar:1.0.3:compile
[INFO] | +- org.xerial.snappy:snappy-java:jar:1.1.1.7:compile
[INFO] | +- net.jpountz.lz4:lz4:jar:1.2.0:compile
[INFO] | +- org.roaringbitmap:RoaringBitmap:jar:0.4.5:compile
[INFO] | +- commons-net:commons-net:jar:2.2:compile
[INFO] | +- org.spark-project.akka:akka-remote_2.11:jar:2.3.4-spark:compile
[INFO] | | +- org.spark-project.akka:akka-actor_2.11:jar:2.3.4-spark:compile
[INFO] | | | \- com.typesafe:config:jar:1.2.1:compile
[INFO] | | +- io.netty:netty:jar:3.8.0.Final:compile
[INFO] | | +- org.spark-project.protobuf:protobuf-java:jar:2.5.0-spark:compile
[INFO] | | \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile
[INFO] | +- org.spark-project.akka:akka-slf4j_2.11:jar:2.3.4-spark:compile
[INFO] | +- org.scala-lang:scala-library:jar:2.11.6:compile
[INFO] | +- org.json4s:json4s-jackson_2.11:jar:3.2.10:compile
[INFO] | | \- org.json4s:json4s-core_2.11:jar:3.2.10:compile
[INFO] | | +- org.json4s:json4s-ast_2.11:jar:3.2.10:compile
[INFO] | | \- org.scala-lang:scalap:jar:2.11.0:compile
[INFO] | | \- org.scala-lang:scala-compiler:jar:2.11.0:compile
[INFO] | | +- org.scala-lang.modules:scala-xml_2.11:jar:1.0.1:compile
[INFO] | | \- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.1:compile
[INFO] | +- com.sun.jersey:jersey-server:jar:1.9:compile
[INFO] | | \- asm:asm:jar:3.1:compile
[INFO] | +- com.sun.jersey:jersey-core:jar:1.9:compile
[INFO] | +- org.apache.mesos:mesos:jar:shaded-protobuf:0.21.1:compile
[INFO] | +- io.netty:netty-all:jar:4.0.23.Final:compile
[INFO] | +- com.clearspring.analytics:stream:jar:2.7.0:compile
[INFO] | +- io.dropwizard.metrics:metrics-core:jar:3.1.0:compile
[INFO] | +- io.dropwizard.metrics:metrics-jvm:jar:3.1.0:compile
[INFO] | +- io.dropwizard.metrics:metrics-json:jar:3.1.0:compile
[INFO] | +- io.dropwizard.metrics:metrics-graphite:jar:3.1.0:compile
[INFO] | +- com.fasterxml.jackson.core:jackson-databind:jar:2.4.4:compile
[INFO] | | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.4.0:compile
[INFO] | | \- com.fasterxml.jackson.core:jackson-core:jar:2.4.4:compile
[INFO] | +- com.fasterxml.jackson.module:jackson-module-scala_2.11:jar:2.4.4:compile
[INFO] | | +- org.scala-lang:scala-reflect:jar:2.11.2:compile
[INFO] | | \- com.thoughtworks.paranamer:paranamer:jar:2.6:compile
[INFO] | +- org.apache.ivy:ivy:jar:2.4.0:compile
[INFO] | +- oro:oro:jar:2.0.8:compile
[INFO] | +- org.tachyonproject:tachyon-client:jar:0.6.4:compile
[INFO] | | \- org.tachyonproject:tachyon:jar:0.6.4:compile
[INFO] | +- net.razorvine:pyrolite:jar:4.4:compile
[INFO] | +- net.sf.py4j:py4j:jar:0.8.2.1:compile
[INFO] | \- org.spark-project.spark:unused:jar:1.0.0:compile
[INFO] \- junit:junit:jar:4.11:test
[INFO] \- org.hamcrest:hamcrest-core:jar:1.3:test
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.987 s
[INFO] Finished at: 2018-04-07T15:50:34+05:30
[INFO] ------------------------------------------------------------------------
Notice that even though we added only two direct dependencies, Maven collected every dependency the project needs, including the Scala libraries, since Apache Spark itself is written in Scala.
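If the full tree is too noisy, the dependency plugin can also filter it. For example, a command like the following should list only the Scala artifacts that were pulled in transitively (the includes pattern here is illustrative):

mvn dependency:tree -Dincludes=org.scala-lang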
Creating an Input File
Since we are going to create a word count program, we will create a sample input file named input.txt in the root directory of our project. Put any content inside it; we use the following text:
Hello, my name is Shubham and I am author at JournalDev. JournalDev is a great website to read
great lessons about Java, Big Data, Python and many more Programming languages.
Big Data lessons are difficult to find but at JournalDev, you can find some excellent
pieces of lessons written on Big Data.
Feel free to use any text in this file.
Project Structure
Before we move on and start working on the code for the project, let's take a look at the project structure. Once we have added all of the code to the project, it will look like this:
[Figure: project structure]
Creating the Word Counter
Now we are ready to start writing our program. When you begin working with Big Data programs, the imports can cause a lot of confusion. To avoid that, here are all the imports we will use in this project:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
Next, here is the structure of the class we will use:
package com.Olivia.sparkdemo;

...imports...

public class WordCounter {

    private static void wordCount(String fileName) {
        ...
    }

    public static void main(String[] args) {
        ...
    }
}
All of the logic will live in the wordCount method. We start by defining an object of the SparkConf class. Objects of this class are used to set various Spark parameters for the program as key-value pairs. We provide only simple parameters:
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");
Setting the master to "local" means the program should connect to a Spark instance running locally on this machine, so no external cluster is needed. The application name is simply a way of supplying Spark with metadata about our application. We can now construct a Spark context object with this configuration object:
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
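As a side note, the master URL is what decides where the computation actually runs. A minimal sketch of common alternatives (the cluster host and port below are hypothetical):

// Run locally, using as many worker threads as there are logical cores.
SparkConf multiCore = new SparkConf().setMaster("local[*]").setAppName("JD Word Counter");
// Run locally with exactly two worker threads.
SparkConf twoThreads = new SparkConf().setMaster("local[2]").setAppName("JD Word Counter");
// Connect to a standalone Spark cluster (hypothetical host and port).
SparkConf cluster = new SparkConf().setMaster("spark://master-host:7077").setAppName("JD Word Counter");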
Spark treats every dataset it is given as an RDD (Resilient Distributed Dataset), which helps it organize the data into structures that can be analyzed more efficiently. We will now convert our input file into a JavaRDD object:
JavaRDD<String> inputFile = sparkContext.textFile(fileName);
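textFile(...) is just one way to obtain an RDD. For quick experiments, a small sketch like the following builds one from an in-memory collection instead (the sample strings are arbitrary):

// Build an RDD from an in-memory list instead of reading a file.
JavaRDD<String> sampleLines = sparkContext.parallelize(
        Arrays.asList("Hello Spark", "Hello Java"));
System.out.println(sampleLines.count()); // prints 2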
We will now use Java 8 lambdas to process the JavaRDD and split the content of the file into individual words:
JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));
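One caveat: splitting on a single space means that "JournalDev" and "JournalDev." count as different words. If you want cleaner counts, a variant that lowercases the text and strips punctuation before splitting might look like this (the regular expressions are illustrative, not part of the original program):

// Normalize each line before splitting: lowercase, replace punctuation
// with spaces, split on runs of whitespace, and drop empty tokens.
JavaRDD<String> normalizedWords = inputFile
        .flatMap(line -> Arrays.asList(line.toLowerCase()
                .replaceAll("[^a-z0-9\\s]", " ")
                .split("\\s+")))
        .filter(word -> !word.isEmpty());

The rest of this lesson keeps the simpler wordsFromFile RDD.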
Similarly, we use the Java 8 mapToPair(...) method to map each word to a (word, 1) pair, and then reduceByKey(...) to add up the counts, producing word/count pairs that can be presented as output:
JavaPairRDD<String, Integer> countData = wordsFromFile.mapToPair(t -> new Tuple2<>(t, 1)).reduceByKey((x, y) -> x + y);
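If you would rather see the most frequent words first, a common trick (a sketch, not part of the original program) is to swap each pair so the count becomes the key, sort on it, and swap back:

// Swap (word, count) into (count, word), sort descending, swap back.
JavaPairRDD<String, Integer> sortedByCount = countData
        .mapToPair(pair -> new Tuple2<>(pair._2(), pair._1()))
        .sortByKey(false)
        .mapToPair(pair -> new Tuple2<>(pair._2(), pair._1()));

The program below keeps using the unsorted countData.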
Now we can save the output as a text file:
countData.saveAsTextFile("CountData");
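saveAsTextFile(...) writes one part file per partition of the RDD. For a quick look at the results without leaving the JVM, you could instead collect the pairs back to the driver and print them, which is fine for a tiny input like ours (a sketch):

// Bring all (word, count) pairs back to the driver and print them.
// Only do this for small result sets; everything lands in driver memory.
for (Tuple2<String, Integer> pair : countData.collect()) {
    System.out.println(pair._1() + " : " + pair._2());
}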
Finally, we provide the entry point for our program with the main() method:
public static void main(String[] args) {
    if (args.length == 0) {
        System.out.println("No files provided.");
        System.exit(0);
    }
    wordCount(args[0]);
}
Here is how the whole file looks:
package com.Olivia.sparkdemo;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCounter {

    private static void wordCount(String fileName) {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
        JavaRDD<String> inputFile = sparkContext.textFile(fileName);
        JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));
        JavaPairRDD<String, Integer> countData = wordsFromFile.mapToPair(t -> new Tuple2<>(t, 1)).reduceByKey((x, y) -> x + y);
        countData.saveAsTextFile("CountData");
    }

    public static void main(String[] args) {
        if (args.length == 0) {
            System.out.println("No files provided.");
            System.exit(0);
        }
        wordCount(args[0]);
    }
}
We will now run this program using Maven itself.
Running the Application
To run the application, go into the root directory of the project and execute the following command:
mvn exec:java -Dexec.mainClass=com.Olivia.sparkdemo.WordCounter -Dexec.args="input.txt"
In this command, we provide Maven with the fully qualified name of the main class and the name of the input file. Once the command has finished executing, we can see that a new output directory, CountData, has been created in our project:
[Figure: project output directory]
When we open that directory and open the file named part-00000 inside it, we see the word counts:
[Figure: word count output]
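As an alternative to mvn exec:java, if you do have a Spark distribution installed, you can also package the project and run it through spark-submit. A sketch (the exact jar name depends on your artifactId and version):

mvn clean package
spark-submit --class com.Olivia.sparkdemo.WordCounter --master local target/JD-Spark-WordCount-1.0-SNAPSHOT.jar input.txt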
Conclusion
In this lesson, we learned how to use Apache Spark in a Maven-based project to create a simple but effective word count program. Read more Big Data articles to gain deeper insight into the Big Data tools and processing frameworks available.