Apache Spark 示例:Java 中的单词计数程序

Apache Spark 是一个开源的分布式计算系统,主要用于大规模数据处理和分析。

Apache Spark是一个开源的数据处理框架,在分布式环境中可以对大数据进行分析操作。它是在2009年由UC Berkeley的Matei Zaharia在UC Berkeley的AMPLab进行的学术项目。Apache Spark是在一个名为Mesos的集群管理工具的基础上创建的。后来对其进行了修改和升级,使其能够在基于集群的环境中进行分布式处理。

Apache Spark 示例项目设置

我们将使用Maven创建一个样例项目进行演示。要创建这个项目,请在您将用作工作空间的目录中执行以下命令。

mvn archetype:generate -DgroupId=com.Olivia.sparkdemo -DartifactId=JD-Spark-WordCount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

如果您第一次运行Maven,执行生成命令可能需要几秒钟,因为Maven需要下载所有所需的插件和构件以完成生成任务。一旦创建了项目,可以随意在您喜欢的集成开发环境中打开它。下一步是向项目添加适当的Maven依赖。这里是包含适当依赖项的pom.xml文件。

<dependencies>

    <!-- Import Spark -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>1.4.0</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>

</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.0.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <addClasspath>true</addClasspath>
                        <classpathPrefix>lib/</classpathPrefix>
                        <mainClass>com.geekcap.javaworld.sparkexample.WordCount</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <executions>
                <execution>
                    <id>copy</id>
                    <phase>install</phase>
                    <goals>
                        <goal>copy-dependencies</goal>
                    </goals>
                    <configuration>
                        <outputDirectory>${project.build.directory}/lib</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

由于这是一个基于Maven的项目,实际上不需要在您的机器上安装和设置Apache Spark。当我们运行这个项目时,将启动一个运行时的Apache Spark实例,一旦程序执行完毕,它将被关闭。最后,为了了解在我们添加这个依赖时向项目中添加的所有JAR文件,我们可以运行一个简单的Maven命令,它允许我们查看项目的完整依赖树。以下是我们可以使用的命令:

mvn dependency:tree

当我们运行这个命令时,它将会展示给我们以下的依赖树。

shubham:JD-Spark-WordCount shubham$ mvn dependency:tree

[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.Olivia:java-word-count:jar:1.0-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-jar-plugin is missing. @ line 41, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] -------------------< com.Olivia:java-word-count >-------------------
[INFO] Building java-word-count 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ java-word-count ---
[INFO] com.Olivia:java-word-count:jar:1.0-SNAPSHOT
[INFO] +- org.apache.spark:spark-core_2.11:jar:1.4.0:compile
[INFO] |  +- com.twitter:chill_2.11:jar:0.5.0:compile
[INFO] |  |  \- com.esotericsoftware.kryo:kryo:jar:2.21:compile
[INFO] |  |     +- com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile
[INFO] |  |     +- com.esotericsoftware.minlog:minlog:jar:1.2:compile
[INFO] |  |     \- org.objenesis:objenesis:jar:1.2:compile
[INFO] |  +- com.twitter:chill-java:jar:0.5.0:compile
[INFO] |  +- org.apache.hadoop:hadoop-client:jar:2.2.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-common:jar:2.2.0:compile
[INFO] |  |  |  +- commons-cli:commons-cli:jar:1.2:compile
[INFO] |  |  |  +- org.apache.commons:commons-math:jar:2.1:compile
[INFO] |  |  |  +- xmlenc:xmlenc:jar:0.52:compile
[INFO] |  |  |  +- commons-io:commons-io:jar:2.1:compile
[INFO] |  |  |  +- commons-logging:commons-logging:jar:1.1.1:compile
[INFO] |  |  |  +- commons-lang:commons-lang:jar:2.5:compile
[INFO] |  |  |  +- commons-configuration:commons-configuration:jar:1.6:compile
[INFO] |  |  |  |  +- commons-collections:commons-collections:jar:3.2.1:compile
[INFO] |  |  |  |  +- commons-digester:commons-digester:jar:1.8:compile
[INFO] |  |  |  |  |  \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO] |  |  |  |  \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
[INFO] |  |  |  +- org.codehaus.jackson:jackson-core-asl:jar:1.8.8:compile
[INFO] |  |  |  +- org.apache.avro:avro:jar:1.7.4:compile
[INFO] |  |  |  +- com.google.protobuf:protobuf-java:jar:2.5.0:compile
[INFO] |  |  |  +- org.apache.hadoop:hadoop-auth:jar:2.2.0:compile
[INFO] |  |  |  \- org.apache.commons:commons-compress:jar:1.4.1:compile
[INFO] |  |  |     \- org.tukaani:xz:jar:1.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-hdfs:jar:2.2.0:compile
[INFO] |  |  |  \- org.mortbay.jetty:jetty-util:jar:6.1.26:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-mapreduce-client-app:jar:2.2.0:compile
[INFO] |  |  |  +- org.apache.hadoop:hadoop-mapreduce-client-common:jar:2.2.0:compile
[INFO] |  |  |  |  +- org.apache.hadoop:hadoop-yarn-client:jar:2.2.0:compile
[INFO] |  |  |  |  |  +- com.google.inject:guice:jar:3.0:compile
[INFO] |  |  |  |  |  |  +- javax.inject:javax.inject:jar:1:compile
[INFO] |  |  |  |  |  |  \- aopalliance:aopalliance:jar:1.0:compile
[INFO] |  |  |  |  |  +- com.sun.jersey.jersey-test-framework:jersey-test-framework-grizzly2:jar:1.9:compile
[INFO] |  |  |  |  |  |  +- com.sun.jersey.jersey-test-framework:jersey-test-framework-core:jar:1.9:compile
[INFO] |  |  |  |  |  |  |  +- javax.servlet:javax.servlet-api:jar:3.0.1:compile
[INFO] |  |  |  |  |  |  |  \- com.sun.jersey:jersey-client:jar:1.9:compile
[INFO] |  |  |  |  |  |  \- com.sun.jersey:jersey-grizzly2:jar:1.9:compile
[INFO] |  |  |  |  |  |     +- org.glassfish.grizzly:grizzly-https:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     |  \- org.glassfish.grizzly:grizzly-framework:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     |     \- org.glassfish.gmbal:gmbal-api-only:jar:3.0.0-b023:compile
[INFO] |  |  |  |  |  |     |        \- org.glassfish.external:management-api:jar:3.0.0-b012:compile
[INFO] |  |  |  |  |  |     +- org.glassfish.grizzly:grizzly-http-server:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     |  \- org.glassfish.grizzly:grizzly-rcm:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     +- org.glassfish.grizzly:grizzly-http-servlet:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     \- org.glassfish:javax.servlet:jar:3.1:compile
[INFO] |  |  |  |  |  +- com.sun.jersey:jersey-json:jar:1.9:compile
[INFO] |  |  |  |  |  |  +- org.codehaus.jettison:jettison:jar:1.1:compile
[INFO] |  |  |  |  |  |  |  \- stax:stax-api:jar:1.0.1:compile
[INFO] |  |  |  |  |  |  +- com.sun.xml.bind:jaxb-impl:jar:2.2.3-1:compile
[INFO] |  |  |  |  |  |  |  \- javax.xml.bind:jaxb-api:jar:2.2.2:compile
[INFO] |  |  |  |  |  |  |     \- javax.activation:activation:jar:1.1:compile
[INFO] |  |  |  |  |  |  +- org.codehaus.jackson:jackson-jaxrs:jar:1.8.3:compile
[INFO] |  |  |  |  |  |  \- org.codehaus.jackson:jackson-xc:jar:1.8.3:compile
[INFO] |  |  |  |  |  \- com.sun.jersey.contribs:jersey-guice:jar:1.9:compile
[INFO] |  |  |  |  \- org.apache.hadoop:hadoop-yarn-server-common:jar:2.2.0:compile
[INFO] |  |  |  \- org.apache.hadoop:hadoop-mapreduce-client-shuffle:jar:2.2.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-yarn-api:jar:2.2.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.2.0:compile
[INFO] |  |  |  \- org.apache.hadoop:hadoop-yarn-common:jar:2.2.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-mapreduce-client-jobclient:jar:2.2.0:compile
[INFO] |  |  \- org.apache.hadoop:hadoop-annotations:jar:2.2.0:compile
[INFO] |  +- org.apache.spark:spark-launcher_2.11:jar:1.4.0:compile
[INFO] |  +- org.apache.spark:spark-network-common_2.11:jar:1.4.0:compile
[INFO] |  +- org.apache.spark:spark-network-shuffle_2.11:jar:1.4.0:compile
[INFO] |  +- org.apache.spark:spark-unsafe_2.11:jar:1.4.0:compile
[INFO] |  +- net.java.dev.jets3t:jets3t:jar:0.7.1:compile
[INFO] |  |  +- commons-codec:commons-codec:jar:1.3:compile
[INFO] |  |  \- commons-httpclient:commons-httpclient:jar:3.1:compile
[INFO] |  +- org.apache.curator:curator-recipes:jar:2.4.0:compile
[INFO] |  |  +- org.apache.curator:curator-framework:jar:2.4.0:compile
[INFO] |  |  |  \- org.apache.curator:curator-client:jar:2.4.0:compile
[INFO] |  |  +- org.apache.zookeeper:zookeeper:jar:3.4.5:compile
[INFO] |  |  |  \- jline:jline:jar:0.9.94:compile
[INFO] |  |  \- com.google.guava:guava:jar:14.0.1:compile
[INFO] |  +- org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016:compile
[INFO] |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
[INFO] |  +- org.apache.commons:commons-math3:jar:3.4.1:compile
[INFO] |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] |  +- org.slf4j:slf4j-api:jar:1.7.10:compile
[INFO] |  +- org.slf4j:jul-to-slf4j:jar:1.7.10:compile
[INFO] |  +- org.slf4j:jcl-over-slf4j:jar:1.7.10:compile
[INFO] |  +- log4j:log4j:jar:1.2.17:compile
[INFO] |  +- org.slf4j:slf4j-log4j12:jar:1.7.10:compile
[INFO] |  +- com.ning:compress-lzf:jar:1.0.3:compile
[INFO] |  +- org.xerial.snappy:snappy-java:jar:1.1.1.7:compile
[INFO] |  +- net.jpountz.lz4:lz4:jar:1.2.0:compile
[INFO] |  +- org.roaringbitmap:RoaringBitmap:jar:0.4.5:compile
[INFO] |  +- commons-net:commons-net:jar:2.2:compile
[INFO] |  +- org.spark-project.akka:akka-remote_2.11:jar:2.3.4-spark:compile
[INFO] |  |  +- org.spark-project.akka:akka-actor_2.11:jar:2.3.4-spark:compile
[INFO] |  |  |  \- com.typesafe:config:jar:1.2.1:compile
[INFO] |  |  +- io.netty:netty:jar:3.8.0.Final:compile
[INFO] |  |  +- org.spark-project.protobuf:protobuf-java:jar:2.5.0-spark:compile
[INFO] |  |  \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile
[INFO] |  +- org.spark-project.akka:akka-slf4j_2.11:jar:2.3.4-spark:compile
[INFO] |  +- org.scala-lang:scala-library:jar:2.11.6:compile
[INFO] |  +- org.json4s:json4s-jackson_2.11:jar:3.2.10:compile
[INFO] |  |  \- org.json4s:json4s-core_2.11:jar:3.2.10:compile
[INFO] |  |     +- org.json4s:json4s-ast_2.11:jar:3.2.10:compile
[INFO] |  |     \- org.scala-lang:scalap:jar:2.11.0:compile
[INFO] |  |        \- org.scala-lang:scala-compiler:jar:2.11.0:compile
[INFO] |  |           +- org.scala-lang.modules:scala-xml_2.11:jar:1.0.1:compile
[INFO] |  |           \- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.1:compile
[INFO] |  +- com.sun.jersey:jersey-server:jar:1.9:compile
[INFO] |  |  \- asm:asm:jar:3.1:compile
[INFO] |  +- com.sun.jersey:jersey-core:jar:1.9:compile
[INFO] |  +- org.apache.mesos:mesos:jar:shaded-protobuf:0.21.1:compile
[INFO] |  +- io.netty:netty-all:jar:4.0.23.Final:compile
[INFO] |  +- com.clearspring.analytics:stream:jar:2.7.0:compile
[INFO] |  +- io.dropwizard.metrics:metrics-core:jar:3.1.0:compile
[INFO] |  +- io.dropwizard.metrics:metrics-jvm:jar:3.1.0:compile
[INFO] |  +- io.dropwizard.metrics:metrics-json:jar:3.1.0:compile
[INFO] |  +- io.dropwizard.metrics:metrics-graphite:jar:3.1.0:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.4.4:compile
[INFO] |  |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.4.0:compile
[INFO] |  |  \- com.fasterxml.jackson.core:jackson-core:jar:2.4.4:compile
[INFO] |  +- com.fasterxml.jackson.module:jackson-module-scala_2.11:jar:2.4.4:compile
[INFO] |  |  +- org.scala-lang:scala-reflect:jar:2.11.2:compile
[INFO] |  |  \- com.thoughtworks.paranamer:paranamer:jar:2.6:compile
[INFO] |  +- org.apache.ivy:ivy:jar:2.4.0:compile
[INFO] |  +- oro:oro:jar:2.0.8:compile
[INFO] |  +- org.tachyonproject:tachyon-client:jar:0.6.4:compile
[INFO] |  |  \- org.tachyonproject:tachyon:jar:0.6.4:compile
[INFO] |  +- net.razorvine:pyrolite:jar:4.4:compile
[INFO] |  +- net.sf.py4j:py4j:jar:0.8.2.1:compile
[INFO] |  \- org.spark-project.spark:unused:jar:1.0.0:compile
[INFO] \- junit:junit:jar:4.11:test
[INFO]    \- org.hamcrest:hamcrest-core:jar:1.3:test
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.987 s
[INFO] Finished at: 2018-04-07T15:50:34+05:30
[INFO] ------------------------------------------------------------------------

只需添加两个依赖项,Spark将收集项目中所需的所有依赖项,包括Scala依赖项以及Apache Spark本身是用Scala编写的。

创建一个输入文件

由于我们要创建一个字数统计程序,我们将在项目的根目录中创建一个名为input.txt的示例输入文件。在其中放入任何内容,我们使用以下文本:

Hello, my name is Shubham and I am author at JournalDev . JournalDev is a great website to ready
great lessons about Java, Big Data, Python and many more Programming languages.

Big Data lessons are difficult to find but at JournalDev , you can find some excellent
pieces of lessons written on Big Data.

请随意使用本文件中的任何文本。

项目结构

在我们继续并开始为项目编写代码之前,让我们在这里展示一下项目结构。一旦我们将所有代码添加到项目中,我们就会拥有以下的项目结构:【附注:附件20349的图片为项目结构,居中对齐,宽度为399】。

创建一个字数统计工具

现在,我们准备开始编写我们的程序。当你开始使用大数据程序时,导入可以造成很多困惑。为了避免这种情况,在我们的项目中,这是我们将使用的所有导入:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

接下来,这是我们将要使用的课程结构。

package com.Olivia.sparkdemo;

...imports...

public class WordCounter {

    private static void wordCount(String fileName) {
        ...
    }

    public static void main(String[] args) {
        ...
    }
}

所有逻辑都将放在wordCount方法中。我们首先要为SparkConf类定义一个对象。这个类的对象用于为程序设置各种Spark参数,作为键值对。我们只提供简单的参数。

SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");

主节点指定本地,这意味着该程序应连接到在本地主机上运行的Spark线程。应用程序名称只是为Spark提供应用程序的元数据的一种方式。现在,我们可以使用这个配置对象构建一个Spark上下文对象。

JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

Spark将其获得的每个资源都视为RDD(弹性分布式数据集),这有助于将数据组织成可更高效分析的查找数据结构。我们现在将输入文件转换为JavaRDD对象本身。

JavaRDD<String> inputFile = sparkContext.textFile(fileName);

我们将现在使用Java 8的API来处理JavaRDD文件,并将文件中包含的单词分割成独立的单词。

JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));

同样,我们使用Java 8的mapToPair(…)方法来计算字数,并提供一个字数和数字的对,可以作为输出展示。

JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);

现在,我们可以将输出文件保存为文本文件。

countData.saveAsTextFile("CountData");

最后,我们可以使用main()方法提供程序的入口点。

public static void main(String[] args) {
    if (args.length == 0) {
        System.out.println("No files provided.");
        System.exit(0);
    }
    wordCount(args[0]);
}

整个文件的外观如下:

package com.Olivia.sparkdemo;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCounter {

    private static void wordCount(String fileName) {

        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        JavaRDD<String> inputFile = sparkContext.textFile(fileName);

        JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));

        JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);

        countData.saveAsTextFile("CountData");
    }

    public static void main(String[] args) {

        if (args.length == 0) {
            System.out.println("No files provided.");
            System.exit(0);
        }

        wordCount(args[0]);
    }
}

我们现在将使用Maven自身来运行这个程序。

运行应用程序

要运行这个应用程序,请进入程序的根目录并执行以下命令:

mvn exec:java -Dexec.mainClass=com.Olivia.sparkdemo.WordCounter -Dexec.args="input.txt"

在这个命令中,我们向Maven提供了Main类的完全限定名和输入文件的名称。一旦这个命令执行完成,我们可以在我们的项目中看到一个新的目录被创建: 项目输出目录 当我们打开这个目录,并打开其中的名为“part-00000.txt”的文件时,它的内容如下: 单词统计输出

结论

在这节课中,我们学习了如何在基于Maven的项目中使用Apache Spark来创建一个简单但有效的单词计数程序。阅读更多关于大数据的文章,以深入了解可用的大数据工具和处理框架。

广告
将在 10 秒后关闭
bannerAds