开始学习使用Spark Streaming开发,版本为Spark 1.5
介绍
这篇帖子是以”Advent Calendar 2015 .. NextGen Distributed Computing system”为起因所写的!这是Advent Calendar的第一天文章。
关于方针
我打算逐步向那些第一次接触Spark和SparkStreaming的人解释这个主题,以便他们能够顺利理解。开发是基于Scala的。我并不是说如果你不熟练地使用Scala就无法编写Spark处理程序,但基本知识还是必要的。为了掌握Scala基础知识,我建议您参考以下链接:
https://gist.github.com/scova0731/2c405ea55488d804b366
SparkStreaming的介绍
SparkStreaming是什么?
使用Spark核心扩展模块,能处理可扩展、高吞吐量、容错性强的实时流数据。
数据源可以连接到多个地方,例如Kafka、Flume、Twitter、ZeroMQ、Kinesis、TCP sockets等。
功能包括高级处理的map、reduce、join、window等。
最终结果可以推送到文件系统、数据库、实时仪表盘,可用于机器学习和图处理。
在内部,SparkStreaming将输入流分割成小的批处理单元,并在Spark引擎中对批处理进行处理。
为了执行此操作,实现了一个抽象概念称为DStream。
DStream是Spark处理的基本单位,它是RDD的序列。
Spark是什么?
一种可以广泛应用于各种用途的快速分布式处理系统。可以使用Scala、Java、Python和R进行编写。
作为执行高级处理的工具,可以使用SparkSQL对结构化数据进行处理,使用MLlib进行机器学习,
使用GraphX进行图处理,使用Spark Streaming进行流处理。
在手边运行简单的示例样本。
為了將其應用到實際開發中,我們可以重新實現以下所介紹的程式碼並執行,詳細請參閱以下連結:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example
准备开发环境
开发时使用IntelliJ、scala_2.11和spark_1.5。
-
- IntelliJを以下からDLする。(無料のCommunity Editionで問題なし)
-
- https://www.jetbrains.com/idea/download/
-
- sbt pluginインストール
-
- IntelliJ起動後 sbt pluginをインストール
-
- IntelliJでのSpark設定は以下の記事がわかりやすい!
- http://qiita.com/imaifactory/items/823caa33639196f5459a
写代码
- 本体:sbtプロジェクトのsrc > main > scalaに作る。
package org.apache.spark.examples.streaming
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
/**
* `$ nc -lk 9999`で入力データを設定する。
*/
object NetworkWordCount extends Logging {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(10))
// localhost:9999で入力を受つける。
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
-
- build:sbtプロジェクトのbuild.sbtを上書き
-
- ※名前衝突を避けるためMerge Strategyを記述する。
- https://github.com/sbt/sbt-assembly#merge-strategy より
name := "ss1"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "1.5.2",
"org.apache.spark" % "spark-streaming_2.11" % "1.5.2"
)
// Merge Strategy
assemblyMergeStrategy in assembly := {
case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "unwanted.txt" => MergeStrategy.discard
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
- plugins:sbtプロジェクトのproject > plugins.sbtを上書き
logLevel := Level.Warn
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
-
- github:以下に今回のコード〜参考までに
- https://github.com/kaz3284/4qiita/tree/master/sparkstreaming/ss1
建设
cd /Users/kaz3284/github/4qiita/sparkstreaming/ss1 #sbtプロジェクトのホーム
sbt assembly
执行
準備
pre-build版をDLして実行環境を作る。※手もとでspark-submitを動かせるようにするため。
下記リンク先で「1.5.2」「Pre-built for Hadoop 2.6 and later」を選択してDL、解凍して${SPARK_HOME}とする。
http://spark.apache.org/downloads.html
sparkstreaming起動
Macで実行する場合の例
export SPARK_HOME=/Users/kaz3284/develop/spark/spark-1.5.2-bin-hadoop2.6
export SS_SRC=/Users/kaz3284/github/4qiita/sparkstreaming/ss1
${SPARK_HOME}/bin/spark-submit ${SS_SRC}/target/scala-2.11/ss1-assembly-1.0.jar
- ncでメッセージ送る
nc -lk 9999
hello world hello world hello world
hello world hello world
hello world
- 下記のような標準出力が出れば手もとで動かせるSparkStreamingプログラム完成!!
- 補足:開発時に余計なメッセージを消す場合は以下を実施する。
cp ${SPARK_HOME}/conf/log4j.properties.template ${SPARK_HOME}/conf/log4j.properties
vi ${SPARK_HOME}/conf/log4j.properties
- 以下のように標準出力の部分をINFO=> WARNへ
# Set everything to be logged to the console
log4j.rootCategory=WARN, console
最终
这次我们创建了一个可以在本地进行调试的SparkStreaming开发环境!
接下来我们将扩展我们这次创建的开发环境〜
下一次我们将使用更实用的DataFrame和SQL来进行聚合操作的实现。
只需一个选择来以中文本地化重述以下内容:
作为SparkStreaming开发的参考
-
- Spark Streaming Programming Guide
-
- http://spark.apache.org/docs/latest/streaming-programming-guide.html
spark git
https://github.com/apache/spark
spark docs
https://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.package