开始学习使用Spark Streaming开发,版本为Spark 1.5

介绍

这篇帖子是以”Advent Calendar 2015 .. NextGen Distributed Computing system”为起因所写的!这是Advent Calendar的第一天文章。

关于方针

我打算逐步向那些第一次接触Spark和SparkStreaming的人解释这个主题,以便他们能够顺利理解。开发是基于Scala的。我并不是说如果你不熟练地使用Scala就无法编写Spark处理程序,但基本知识还是必要的。为了掌握Scala基础知识,我建议您参考以下链接:
https://gist.github.com/scova0731/2c405ea55488d804b366

SparkStreaming的介绍

SparkStreaming是什么?

使用Spark核心扩展模块,能处理可扩展、高吞吐量、容错性强的实时流数据。
数据源可以连接到多个地方,例如Kafka、Flume、Twitter、ZeroMQ、Kinesis、TCP sockets等。
功能包括高级处理的map、reduce、join、window等。
最终结果可以推送到文件系统、数据库、实时仪表盘,可用于机器学习和图处理。

streaming-arch.png

在内部,SparkStreaming将输入流分割成小的批处理单元,并在Spark引擎中对批处理进行处理。
为了执行此操作,实现了一个抽象概念称为DStream。
DStream是Spark处理的基本单位,它是RDD的序列。

streaming-dstream.png

Spark是什么?

一种可以广泛应用于各种用途的快速分布式处理系统。可以使用Scala、Java、Python和R进行编写。
作为执行高级处理的工具,可以使用SparkSQL对结构化数据进行处理,使用MLlib进行机器学习,
使用GraphX进行图处理,使用Spark Streaming进行流处理。

spark.png

在手边运行简单的示例样本。

為了將其應用到實際開發中,我們可以重新實現以下所介紹的程式碼並執行,詳細請參閱以下連結:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example

准备开发环境

开发时使用IntelliJ、scala_2.11和spark_1.5。

    • IntelliJを以下からDLする。(無料のCommunity Editionで問題なし)

 

    • https://www.jetbrains.com/idea/download/

 

    • sbt pluginインストール

 

    • IntelliJ起動後 sbt pluginをインストール

 

    • IntelliJでのSpark設定は以下の記事がわかりやすい!

 

    http://qiita.com/imaifactory/items/823caa33639196f5459a

写代码

    本体:sbtプロジェクトのsrc > main > scalaに作る。
package org.apache.spark.examples.streaming

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel

/**
 *  `$ nc -lk 9999`で入力データを設定する。
 */
object NetworkWordCount extends Logging {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // localhost:9999で入力を受つける。
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
    • build:sbtプロジェクトのbuild.sbtを上書き

 

    • ※名前衝突を避けるためMerge Strategyを記述する。

 

    https://github.com/sbt/sbt-assembly#merge-strategy より
name := "ss1"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % "1.5.2",
  "org.apache.spark" % "spark-streaming_2.11" % "1.5.2"
)

// Merge Strategy
assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
  case "application.conf"                            => MergeStrategy.concat
  case "unwanted.txt"                                => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
    plugins:sbtプロジェクトのproject > plugins.sbtを上書き
logLevel := Level.Warn

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
    • github:以下に今回のコード〜参考までに

 

    https://github.com/kaz3284/4qiita/tree/master/sparkstreaming/ss1

建设

cd /Users/kaz3284/github/4qiita/sparkstreaming/ss1 #sbtプロジェクトのホーム
sbt assembly

执行

準備
pre-build版をDLして実行環境を作る。※手もとでspark-submitを動かせるようにするため。

下記リンク先で「1.5.2」「Pre-built for Hadoop 2.6 and later」を選択してDL、解凍して${SPARK_HOME}とする。
http://spark.apache.org/downloads.html

sparkstreaming起動
Macで実行する場合の例

export SPARK_HOME=/Users/kaz3284/develop/spark/spark-1.5.2-bin-hadoop2.6
export SS_SRC=/Users/kaz3284/github/4qiita/sparkstreaming/ss1

${SPARK_HOME}/bin/spark-submit ${SS_SRC}/target/scala-2.11/ss1-assembly-1.0.jar
    ncでメッセージ送る
nc -lk 9999

hello world hello world hello world
hello world hello world
hello world
    下記のような標準出力が出れば手もとで動かせるSparkStreamingプログラム完成!!
result_sc.png
    補足:開発時に余計なメッセージを消す場合は以下を実施する。
cp ${SPARK_HOME}/conf/log4j.properties.template ${SPARK_HOME}/conf/log4j.properties
vi ${SPARK_HOME}/conf/log4j.properties
    以下のように標準出力の部分をINFO=> WARNへ
# Set everything to be logged to the console
log4j.rootCategory=WARN, console

最终

这次我们创建了一个可以在本地进行调试的SparkStreaming开发环境!
接下来我们将扩展我们这次创建的开发环境〜
下一次我们将使用更实用的DataFrame和SQL来进行聚合操作的实现。

只需一个选择来以中文本地化重述以下内容:

作为SparkStreaming开发的参考

    • Spark Streaming Programming Guide

 

    • http://spark.apache.org/docs/latest/streaming-programming-guide.html

spark git
https://github.com/apache/spark

spark docs
https://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.package

广告
将在 10 秒后关闭
bannerAds