使用Apache Spark 2.3进行WordCount

根据这个参考,我尝试在Apache Spark Scala中执行了WordCount。环境如下所示。

・CentOS 7.5 (Linux 操作系统版本为 CentOS 7.5)
・Apache Spark 2.3.1 (使用 Apache Spark 版本为 2.3.1)
・Scala 2.12.6 (使用 Scala 版本为 2.12.6)
・sbt 1.1.6 (使用 sbt 版本为 1.1.6)

安装Spark、Scala和sbt。

执行Apache Spark安装和环境变量设置。

安装Scala和设置环境变量。

# wget https://downloads.lightbend.com/scala/2.12.6/scala-2.12.6.tgz
# tar xvzf scala-2.12.6.tgz
# mv scala-2.12.6 /usr/local/scala

# vi /etc/profile
# export SCALA_HOME=/usr/local/scala
# export PATH=$PATH:$SCALA_HOME/bin

source /etc/profile

安装 Scala sbt,确认版本

# curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
# sudo yum install sbt

# sbt sbtVersion

准备sbt构建环境。

事前に準備するために、以下のディレクトリ(outputを含まない)およびファイルを用意してください。

・目录

WordCount/
├── build.sbt (ビルド方法を定義)
├── input.txt (ワードカウント対象のファイル)
├── output/  (ワードカウントの結果:sbt runにて作成される)
│
├── project/ (sbtの追加設定)
│   └── assembly.sbt (sbtのプラグイン)
└── src/
   └──main/
     └── scala/
     └── jp/
       └── hoge/
        └── news/
          └── WordCountApp.scala

这里参考build.sbt文档,指定scalaVersion为2.11.7,spark-core版本为2.1.0。
※由于Spark 2.3.1是使用Scala 2.11编译的,所以需要降低Scala的版本。

name := "WordCountApp"
version := "1.0.0"
scalaVersion := "2.11.7"
resolvers += "Atilika Open Source repository" at "http://www.atilika.org/nexus/content/repositories/atilika"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.atilika.kuromoji" % "kuromoji" % "0.7.7"
assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
  case "application.conf"                            => MergeStrategy.concat
  case "unwanted.txt"                                => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
mainClass in assembly := Some("WordCountApp")

在以下指定中生成一个包含Scala JAR和lib目录下所有JAR的fat JAR文件。
请注意,从这个链接下载的sbt版本1.x支持sbt-assembly 0.14.6。

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")    

・WordCountApp.scala的中文本地化版本。

package jp.hoge.news

import java.util.regex.{Matcher, Pattern}
import scala.collection.convert.WrapAsScala._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext
import org.atilika.kuromoji.Tokenizer
import org.atilika.kuromoji.Token

object WordCountApp{
def main(args: Array[String]) {
    //スパークの環境設定
    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("WordCount App")
    val sc = new SparkContext(sparkConf)
    //kuromojiのトークナイザ
    val tokenizer = Tokenizer.builder.mode(Tokenizer.Mode.NORMAL).build()
    //テキストファイルから1行ずつ読み込み。名詞を配列に分解する。
    //テキストファイルからRDDオブジェクトを取得する。
        val input = sc.textFile("input.txt").flatMap(line => {
        val tokens : java.util.List[Token] = Tokenizer.builder().build().tokenize(line)
        val output : scala.collection.mutable.ArrayBuffer[String] = new collection.mutable.ArrayBuffer[String]()
        tokens.foreach(token => {
            if(token.getAllFeatures().indexOf("名詞") != -1) {
      output += token.getSurfaceForm()
        }})
        output// return
    })
    //ワードカウントを行う。数える名詞をキーにし、キーを元に加算処理を行う。
    val wordCounts = input.map(x => (x, 1L)).reduceByKey((x, y)=> x + y)
    //降順に単語を列挙して出力する。
    val output = wordCounts.map( x => (x._2, x._1)).sortByKey(false).saveAsTextFile("output")
    }
}

・输入文件.txt .txt)

Apache Sparkはオープンソースのクラスタコンピューティングフレームワークである。カリフォルニア大学バークレー校のAMPLabで開発されたコードが、管理元のApacheソフトウェア財団に寄贈された。Sparkのインタフェースを使うと、暗黙のデータ並列性と耐故障性を備えたクラスタ全体をプログラミングできる。日経BP社が発表した「ITインフラテクノロジーAWARD 2015」において、SparkはDockerに次ぐ準グランプリとされた。
フォールトトレラントシステムで管理され、複数マシンのクラスタに分散されたデータ項目の読み取り専用多重集合であるRDD(resilient distributed dataset)と呼ばれるデータ構造を中心とするアプリケーションプログラミングインターフェイスを備えている。MapReduceは、分散プログラム上で特定の線形データフロー構造を強制するクラスタコンピューティングプログラミングパラダイムの制限に対応して開発された。MapReduceは、ディスクから入力データを読み込み、データ全体に関数をマップし、削減結果をディスクに格納する。SparkのRDDは、 分散共有メモリの (意図的に)制限された形式で提供する分散プログラムのワーキングセットとして機能する。

RDDの可用性は、ループ内で複数回データセットを参照する反復法アルゴリズム、および対話型/探索型データ分析、データ反復のデータベースクエリの両方の実装を容易にする。このようなアプリケーションのレイテンシ(Apache Hadoopスタックでは一般的であったMapReduce実装と比較して)は、桁違いに低下する可能性がある。反復アルゴリズムのクラスの中には、 機械学習のための訓練アルゴリズムがあり、Apache Sparkを開発の初期の刺激となった。クラスタマネージャと分散ストレージシステムが必要であり、クラスタ管理のためにスタンドアロン(ネイティブのSparkクラスタ)、Hadoop YARN、Apache Mesosに対応している。分散ストレージの場合、Hadoop分散ファイルシステム、MapRファイルシステム(MapR-FS)、Apache Cassandra、OpenStack Swift、Amazon S3、Kudu、カスタムソリューションを実装できる。擬似分散ローカルモードも対応しており通常は開発やテスト目的でのみ使用される。分散ストレージは不要でローカルファイルシステムを代わりに使用でき、CPUマルチコアごとに1台のマシン上で実行される。

运行sbt。

在包含bulid.bat文件的项目根目录上执行以下操作

# sbt run

成功运行完毕后,查看output文件夹下的part-00000x,确认WordCount已执行。

# vi output/part-00000
(9,分散)
(8,データ)
(6,Apache)
(6,Spark)
(5,クラスタ)
(5,システム)
(4,開発)
(3,反復)
(3,実装)
(3,MapReduce)
(3,RDD)
(3,対応)
(3,ファイル)
(3,ストレージ)
广告
将在 10 秒后关闭
bannerAds