使用Apache Spark学习分布式处理

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/   

2016年7月末,Apache Spark 2.0.0发布了,我第一次尝试了一些功能,做了一些笔记。
因为这只是笔记,请多包涵?
另外,本文中附带的示例代码主要是以Java为主,但我觉得Scala和Python更加简洁。
以后会随时进行编辑。

Apache Spark 是什么

sparkhadoopgraphic

根据图片来源于https://spark.apache.org,据说有时候比Hadoop的MapReduce快100倍,非常强大,是Spark。

Spark是一个开源框架,可以快速并行处理大型数据。(Java杂志中写着Spark是一个快速且可扩展的通用分布式处理引擎)

我认为重要的是以下几点(通过阅读各种网站和文件)。
有关以下内容的详细信息将在后文中提及。

1. 弹性分布式数据集(RDD)的工作原理
2. 支持Scala、Java、R、Python等编程语言(提供了相应的API)
3. 丰富的库集合
4. 多种部署方案(独立模式、YARN、Mesos、内嵌模式、云环境)
5. 广泛的处理模型(批处理、交互式处理、流式处理)

顺便提一下,根据日经BP公司发布的“IT基础设施技术奖2015”评选结果,Spark获得了亚军,紧随Docker获得準大奖。

Spark与Hadoop一样都是分布式处理框架(确切地说,负责的部分有些不同)。Spark最早由加州大学伯克利分校开发,并在2014年捐赠给了Apache Software Foundation。与Hadoop使用Java语言不同,Spark是使用Scala语言开发的。

Spark的熟练程度

得意

    • Hadoopで加工したのちのドリルダウン分析

 

    • TB級までのデータを扱うシステム

 

    • サンプリングが有効でないロングテールのデータ分析

 

    数秒~数分程度のHadoopよりも短いレスポンスが必要な処理

不太高兴

    • クラスタ全体のメモリに乗りきらない巨大なデータ処理(TB以上)

 

    • 大きなデータセットを少しずつ更新する処理

 

    秒以下の時に短いレスポンスが必要な処理

与Hadoop的不同之处

作为相同的分布式处理框架,Hadoop是著名的。虽然这两者不能直接进行比较,但根据以下图片(引自http://www.aerospike.com/blog/what-the-spark-introduction/),我们可以看出Spark与Hadoop的对应关系应该是在MapReduce这一部分。

hadoopgraphic2

Hadoop自身是非常方便的,但也有一些缺点。Spark似乎正试图解决各种缺点。

以下是Hadoop被称为缺点的三个主要问题。

    • 個々のメモリを活用できる設計でなかった(タスクごとにデータをディスク読み込み&書き込みしなきゃいけないのがボトルネック)

 

    • 同じ処理を複数行う際に、その都度データアクセスが発生

 

    同じデータでもなんども使うものは使う際に無駄にアクセスがいっぱい発生

这个问题的解决方案是通过Spark的分布式共享内存机制RDD(Resilient Distributed Datasets)来实现的。简单来说,与Hadoop每次都需要访问存储相比,RDD可以在内存中运行,有很多好处。

在Hadoop的情况下
スクリーンショット 2016-09-09 14.57.37.png
(*1)HDFS:MapReduceで処理するデータを扱う、分散ストレージとして複数のマシンを一つのストレージとして扱うことができる。
HDFSは管理するファイルのI/Oを高速化するため大きなファイルを一定の大きさ(初期設定では64MB)のブロックに分割して、
複数の記憶装置に分散して保存、I/Oを記憶装置の台数だけ並列に実行できるようにしている。

只需要一个选项用中文转述以下内容:
在Spark中
スクリーンショット 2016-09-09 15.11.31.png

RDD(弹性分布式数据集)的机制。

RDD是一种可容纳重复使用数据的分布式数据集,它能够在内存中存储数据,并且继承了Hadoop的MapReduce的特性,包括容错性、数据本地性和可扩展性。

I. RDD的特性

    • イミュータブルで分割されたオブジェクトのコレクション(フォールトトレラント性を実現するためRDDでは「得たいデータが失われていたら前のデータから再生成する」というアプローチを取っているため、それぞれイミュータブルである必要がある)

 

    • 読み取り専用

 

    • 並列処理(map,filter,groupBy,join)をストレージ上のデータに適応した結果を生成

 

    • 再利用するためにメモリ上にキャッシュされる

 

    遅延評価される(アクション系のメソッド(*2)が呼ばれるまで」実際の処理は行われない)
(*2)アクション系のメソッド(引用)
> RDDに保持したデータを操作するメソッドは大きく分けて2つに分類されます。
「Transformations」と「Actions」です。
「Transformations」はRDDを操作し、結果を新しいRDDとして返します。
「Actions」はRDDのデータを操作し、結果をRDD以外の形式で返すか保存を行います。
大雑把に言えば、RDDを返すのが「Transformations」、そうではないものが「Actions」であると言えると思います。

获得RDD的生成需要

有两种主要的方法来生成。
第一种方法是使用称为SparkContext的驱动程序程序所拥有的对象来访问Spark。

JavaRDD<String> lines = sc.textFile("README.md");

以下是加载文本文件等作为字符串RDD的数据集的方法。
另外一种方法是通过驱动程序将对象集合(如列表或集合)进行分布式处理。

例如,可以将现有程序中的集合按以下方式传递给SparkContext的parallelize()方法。

JavaRDD<String> lines = sc.parallelize(Arrays.asList("student","i am a student"))

然而,这种方法需要一台计算机的内存中包含整个数据集,因此不太实际。

Ⅲ. RDD的循环利用(持久化/缓存)

当RDD的操作被执行时,默认情况下会重新计算。如果某个RDD需要多次在操作中重新计算,可以通过Spark对其进行持久化(lines是指RDD)。

lines.persist()

你可以使用Spark将数据放置在不同的地方,当对标记为持久化的RDD进行计算时,Spark会将该RDD的内容保存在内存中(并分散在集群内的多台机器上),以便在以后的操作中重复使用。你还可以将RDD永久保存到磁盘上。当你让Spark将RDD永久保存时,计算节点会负责保存它们自己的分区。如果保存数据的节点遇到故障,Spark会根据需要重新计算丢失的数据分区。此外,为了避免节点故障导致速度下降,你还可以将数据复制到多个节点上。默认的Java persist()方法会将数据序列化并保存在JVM内存中。至于时机方面,需要在RDD的第一个操作之前调用persist()方法,因为persist()和cache()方法声明了RDD的持久化,并且延迟评估。关于指定持久化级别的信息可以在org.apache.spark.storage.StorageLevel中找到。当试图缓存无法全部放入内存的数据时,可以使用LRU(Least Recently Used)策略将旧的分区转移到磁盘中。转移到磁盘还是重新计算被转移的数据取决于持久化级别。此外,你可以使用unpersist()方法从缓存中移除RDD。

(*3)LRUポリシー:Least Recently Usedポリシー、
文字どおり最も最近使ったのを残していき、
最近使わなかったやつは外していく考え方

IV. RDD操作

RDD支持两种操作:转换操作和行动操作。转换操作是创建新RDD的操作(例如map、filter),而行动操作则不是(例如count)。
大部分Spark的转换和行动操作依赖于函数传递,用于Spark处理数据的计算过程中。
此外,RDD的方法会在闭包中执行。换句话说,Spark将RDD方法的执行视为闭包。执行RDD方法时,该方法、作为参数传递给该方法的函数以及在函数内引用的变量会作为闭包一起发送给执行器,并由每个执行器执行。
然而,由于闭包的执行环境,无法更改闭包内引用的外部变量的值。这种情况下,需要使用累加器(Accumulator)。(据称,累加器仅在行动操作内使用时,才能保证将Spark任务的更新应用于每个累加器一次)

(*4)クロージャ: 演算を実行する際に必要となる変数とメソッド群をカプセル化したもの。

转变

对于RDD进行转换处理后,将返回一个新的RDD。转换后的RDD的操作将被延迟执行,直到在动作中使用时才开始执行。例如,如果要对日志文件进行过滤,只保留错误信息,函数传递的方式如下所示。

//A. 無名のインタークラスとしてインラインで定義し関数渡しする
JavaRDD<String> srcLog = sc.textFile("Log.txt");
JavaRDD<String> errorLog = srcLog.filter(
      new Function<String,Boolean>(){
            public Boolean call(String str){return str.contains("Error");}
   }
);

//B. 名前付きクラスを使って関数渡し
JavaRDD<String> srcLog = sc.textFile("Log.txt");
class ContainsError implements Function<String,Boolean>{
      public Boolean call(String str){return str.contains("Error");}
}
JavaRDD<String> errorLog = srcLog.filter(new ContainsError());

//C. パラメータありの名前付きクラスを使って関数渡し
JavaRDD<String> srcLog = sc.textFile("Log.txt");
class Contains implements Function<String,Boolean>{
      private String argQuery;
      public Contains(String arg){this.argQuery = arg;}
      public Boolean call(String str){return str.contains(argQuery);}
}
JavaRDD<String> errorLog = srcLog.filter(new Contains("Error"));

//D. Java8のラムダ式を使って関数渡し(オススメ)
JavaRDD<String> srcLog = sc.textFile("Log.txt");
JavaRDD<String> errorLog = srcLog.filter(s->s.contains("Error"));

在这里的操作并不是改变`srcLog`这个RDD本身,而是需要注意`filter()`函数返回的是一个指向全新的RDD `errorLog` 的指针。

[補充]主要的Java函数接口在org.apache.spark.api.java.function中。

関数実装する必要のあるメソッド使用方法Function<T1,R>R call(T1)入力を一つ取り、出力を一つ返す。map()やfilter()などと一緒に使うFunction2<T1,T2,R>R call(T1,T2)入力を二つ取り、出力を一つ返す。aggregate()やfold()などと一緒に使うFlatMapFunction<T,R>Iterable call(T)入力を一つ取り、出力を返さないこともあれば、複数返すこともある。。flatMap()といった操作と一緒に使う
要素单位的转换

在进行要素单位转换时,我们可以使用map()和filter等方法。

関数説明map()引数に関数を一つ取り、その関数をRDD内の各要素に適応し、その結果を新しい値とするRDDを返すfilter()引数に関数を一つ取り、そのフィルタ関数が真になる要素だけを含むRDDを返す

以下是一个例子,用于获取RDD内每个值的绝对值。

JavaRDD<Integer> input = sc.parallelize(Arrays.asList(-1,2,-3,4,-5));
JavaRDD<Integer> output = input.map(i->Math.abs(i));
System.out.println(output.collect());
多个输出

如果想要从每个输入生成多个输出,可以使用flatMap()函数来实现。

関数説明flatMap()引数に関数を一つ取り、その関数をRDD内の各要素に適応して呼ばれるが、この関数はその結果を返すIteratorを返します。それらのIterator全てから返された要素を値とするRDDを最優的に返します

以下是一个在RDD中将字符串按空格分割的例子。

JavaRDD<String> input = sc.parallelize(Arrays.asList("Hello I am a dog"));
JavaRDD<String> output = input.flatMap(s -> Arrays.asList(s.split(" ")));
System.out.println(output.collect());
汇集操作

在数学中,还有许多涉及集合的操作函数可供使用。

関数説明A[1,2,3],B[3,4,5],C[1,1,2,3]だった場合union()それぞれのRDDの和集合の重複を許したすべての要素からなるRDDを生成A.union(B) >>> [1,2,3,3,4,5]intersection()重複を許さずそれぞれのRDDの積集合の要素からなるRDDを生成A.intersection(B) >>> [3]subtract()片方の要素からもう一方の要素を除いた要素からなるRDDを生成A.substract(B) >>> [1,2]distinct()一つのRDD内部での重複要素を一つにまとめた要素からなるRDDを生成C.distinct() >>> [1,2,3]

B. 动作

操作是指将最终值返回给驱动程序(具有main函数)或将数据写入外部存储的行为。在对RDD执行操作时,必须实际生成输出,因此会评估RDD所需的转换的执行。例如,提供了以下函数。

関数説明count()要素の個数を返すtake(int num)複数の要素を引数の数だけ集めてくるcollect()RDD全体を取り出す(小さいサイズ場合のみ)saveAsTestFile()分散ストレージシステムに書き出すsaveAsSequenceFile()分散ストレージシステムに書き出す

另外,每次调用操作时RDD都需要重新计算。由于这可能是低效的,因此在某些情况下,用户可能需要对中间结果进行持久化。

计算总数

当需要计算RDD中某些值的总和时,reduce()函数在进行聚合操作时非常有效。

JavaRDD<Integer> input = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
Integer Result = input.reduce((a, b) -> (a + b));
System.out.println(Result);

reduce()是一个接收函数作为参数的函数,它对RDD中的任意两个相同类型的元素进行操作,并返回该类型的元素。
在这个例子中,我们使用Java 8的lambda函数来对两个元素求和并返回一个元素。在上面的例子中,结果应该是55。除了reduce之外,还有一些类似的函数,比如fold()和aggregate(),它们的返回类型可以与操作对象的RDD不同。

関数説明reduce(func)結合処理を並列に行うfold(zero)(func)reduce()と見た目は同じ、中身は若干違うっぽく、単位元的な値(その操作を適応しても変わらない)を受け取るaggregate(func)reduce()と同じだが、返す型が扱うRDDの要素と違う場合に使う

此外,据说 rdd.reduceByKey(func) 会生成与 rdd.groupByKey().mapValues(value -> value.reduce(func)) 相同的 RDD,但由于省略了生成每个键对应的值列表的步骤,因此具有较高的效率。

将数据返回给驱动程序。

在中国人口中,RDD的操作包括collect()和take()等函数,用于将数据的一部分或全部作为常规的集合或值返回给驱动程序。由于已经在代码中介绍了这些操作,因此不会详细展开。

関数説明take(n)RDDからn個の要素を返すtop(n)RDDの先頭からn個までの要素を返すtakeOrdered(n)(ordering)指定された順序でRDDからn個要素を返すtakeSample(withReplacement,n,seed)RDDからランダムにn個要素を返すcollect()RDDすべての要素を返すcount()RDDの要素数を返すcountByValue()RDDの各要素の出現回数を返す
其他

当在RDD上执行操作时,有时我们并不需要将结果返回给驱动程序,而是希望执行诸如写入外部存储等操作。这种情况下,可以使用foreach()函数。

使用foreach()函数,可以对RDD中的每个元素执行操作,而无需将结果返回给驱动程序。

関数説明foreach(func)RDD中の各要素に指定した関数を適用

Ⅴ. RDD转换的延迟评估

正如前面所提到的,Spark在执行操作之前不会立即开始转换处理。Spark会内部记录表示该操作已被请求的元数据,而不会立即执行该操作。然后Spark会进行延迟评估,将操作分组,并减少对数据的操作次数(在内部进行一定程度的优化)。因此,似乎不需要考虑如何将操作组合以尽量减少Hadoop Mapreduce的次数。

Ⅵ. RDD之间的依赖关系

RDDの変換において、変換の元(親)のRDDと変換後(子)との間には依存関係が定義される。
依存関係はスケジューリング(RDDの変換チェインを処理可能なタスクに分割する)に関係する。
狭い依存関係(親のパーティションが単一の子のパーティションの生成に関わっている依存関係)と、広い依存関係(親のパーティションが複数の子のパーティション生成に関わっている依存関係)がある

第七点. RDD的抗故障性

RDDはデータそのものではなく、「RDDを構築するためにデータに対して行った変換」を記録することで効率的な耐障害性を実現している。
もしRDDの中の一部分がロストした場合、RDDはそのデータが他のRDDからどのように変換されて生じたデータかを保持しているため、コストの大きなレプリケーション(*5)なしに迅速に再計算/復旧させることができる。

(*5)レプリケーション
レプリケーション、ソフトウェアやハードウェアの冗長な、リソース間で一貫性を保ちながら情報を共有する処理を意味する。
信頼性やfault tolerance性やアクセス容易性を強化する。

Ⅷ. PairRDD

第八章:PairRDD

在Java中没有内置的Tuple(*6)类型,因此用户需要使用Spark的JavaAPI中定义的Tuple2类来生成元组。这个链接提供了一个关于如何生成元组的详细指南。

(*6)Tupple:データベースやファイルでいう1行・1レコードのデータのようなもの。
つまり複数のデータを1つの塊として扱えるもの。
タプルを使って、メソッド・関数から複数の値を一度に返すことが出来るので、非常に便利らしい。

Spark处理的流程[补充说明]

一. 将系谱分割到不同的阶段

DAGSchedulerが、系譜をスケジュールに分割する
ステージは系譜中で狭い依存関係が連続して発生する範囲(依存関係の種類は変換の種類できまる)
系譜中で広い依存関係が発生する変換関数を処理する場合はエグゼキュータ間でシャッフルと呼ばれる多数対多数の通信が発生する。
この分割は、パーティションごとに一つのエグゼキュータがまとめて計算できる変換の範囲を決めるため

关于数据分区的事项

在分散式程式中,通訊非常昂貴,將網絡流量降到最低限度可大幅改善效能。
Spark程式需要控制RDD的分區,以減少通訊量。

例如,Spark可以通过编程保证键集合出现在归为一组的节点上。尤其是在数据集被重复使用多次进行键操作的情况下,分区操作非常有用,例如合并操作等。因为Spark的许多操作都会在网络上引发基于键的洗牌操作,它们都可以受益于分区操作。

在Java或Scala中,可以使用partitioner属性来指定RDD的分区方式。在这里,将`spark.Partitioner`对象的值设置为一个能告知RDD中每个键的目的分区的对象。

当在分区化的RDD上执行像reduceByKey()这样针对单个RDD进行操作时,每个键的所有值都会在一个单独的机器上本地计算,并且只有经过本地reduce()操作后的最终值会从每个工作节点返回给主节点。

此外,在对两个RDD进行操作时,如果预先进行了分区,至少会有其中一个RDD不会被洗牌。

令人惊讶的是,Spark知道每个操作对分区化的影响,因此会自动为通过进行数据分区操作生成的RDD设置partitioner。

补充说明,如果要对RDD进行分区,并将生成的RDD作为转换的目标,则应使用persist()进行持久化。原因是,之后对RDD的操作将需要重新评估使用分区生成的整个RDD体系,因此在分区之前对RDD进行多次哈希分区将变得毫无意义。

判断是否执行阶段

Sparkではスケジューラで制御されてる複数のジョブで同じRDDを共有することができる
共有しているRDDがすでに計算済みで、ディスクやメモリに実態を持つ場合、そのRDDを生成するための前段のステージ実行を省略できる

Ⅲ. 创建任务

DAGSchedulerが実行対象の個々のステージについてタスクを定義する
各ステージにおいて、ステージ内の最後のRDDのパーティション数から当該ステージのタスク数が決まる
ステージに含まれるRDDの変換チェインから、タスクあたりの処理範囲が決まる

Ⅳ. 决定执行任务的位置。

    • プリファードロケーションがRDDにセットされている場合はそれをヒントにエグゼキュータを選ぶ

 

    • ない場合は親をたどって最初に見つかったものを使う

 

    • プリファードロケーションはRDDの種類ごとに定義される

 

    • データソースをもとに異性されるRDDはプリファードロケーションを持つので

 

    いつかはプリファードロケーションが見つかる

Ⅴ. 安排任务的执行顺序

    • ステージを構築するタスク群は「タスクセット」としてTaskSchedulerに渡される

TaskSchedulerはタスクセット単位で実行順序のスケジューリングを行う。

TaskSchedulerはタスクセットの実行順序を決定するまでに、タスクセットを登録しておく「プール」を一つ以上保持。スケジューリング方法によってプールの数や使い方が異なる
標準で二つのスケジューリング方がある

名称説明FIFO単一のプールを用いて、キューのように扱う。プールに登録されたタスクセットを順にスケジューリングするFAIR複数のプールを用いて、各プールから公平にアスクセットを取り出しスケジューリングする

关键词

キーワード説明クライアントジョブのキックを担当マスタジョブのキックを担当ワーカ計算資源の提供・管理を担当ドライバユーザがRDDの変換を記述したプログラムエグゼキューターワーカー上で動作し、実際の計算を担当ジョブSparkの処理内でアクション系のメソッドを実行するとジョブが作成される。SparkではRDDと呼ばれる抽象データセットの変換を繰り返して目的の結果を得る。この一連の処理をジョブというステージジョブはデータのシャッフルが必要な場合に複数のステージに分割されるタスクジョブをエグゼキューターが実行可能な粒度に分割した処理単位。ステージは複数のタスクを持ち、ワーカーに対して送られる並列実行可能な処理をタスクというスケジューラジョブをタスクに分割したり、タスクのエグゼキュータへの割り当てを担当

我在以下的网站上找到了非常详细的资料,参考了它们:
http://www.ne.jp/asahi/hishidama/home/tech/scala/spark/RDD.html
http://dev.classmethod.jp/etc/apache-spark_rdd_investigation/

資料的輸入/輸出[補充].

加载和保存文件

在bin / data文件夹中,可以将类似以下的JSON文件批量加载为一个RDD。(这里没有涉及JSON的处理,但是可以使用Java的JSON库,例如https://github.com/FasterXML/jackson,即使是使用该库,也需要首先将其作为文本加载然后再进行处理。)

SaveFileSample.java - SparkTest - [~_Desktop_SORACOM_SoraPractice_IntelliJPractice].png

本次作为例子,已经准备了通过Twitter的RESTApi获取的6个账户最近发布的200条推文数据的json文件。

我从那些推文的JSON文件中提取了只包含推文文本行的部分(input.filter(s -> s.contains(“\”text\”:”))),并只保留了其中包含“笑”的文本。最后,我将结果写入名为result/output01的目录中。
在这里,saveAsTextFile()方法的参数路径被视为一个目录,Spark将在其中输出多个文件。这样做可以使Spark从多个节点输出结果。

スクリーンショット 2016-09-29 15.54.50.png

结果如上所示,针对每个json文件也生成了相应的结果文件。
关于part-00000,无法打开该文件,可能是因为没有要输出的内容(不包含“笑”)。

最后我会附上这段源代码。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * Created by hirokinaganuma on 2016/09/29.
 */
public class TwitterSample {
    public static void main(String[] args) throws Exception {
        String inputFile = "bin/data/*.json";//まとめて読み込むことも可能
        String outputPath = "result/output01";
        String master;
        if (args.length > 0) {
            master = args[0];
        } else {
            master = "local";
        }
        JavaSparkContext sc = new JavaSparkContext(master, "basicavg", System.getenv("SPARK_HOME"), System.getenv("JARS"));
        JavaRDD<String> input = sc.textFile(inputFile);
        JavaRDD<String> output = input.filter(s -> s.contains("\"text\":")).filter(s -> s.contains("笑"));
        output.saveAsTextFile(outputPath);
        sc.stop();
    }
}

文件压缩
Spark在输入格式(如textFile())等一些输入格式中,会自动处理几种类型的压缩,以节约存储空间和降低通信成本。但是这些压缩只能用于写入文件系统,无法从Spark端用于数据库等其他用途。
另外,根据压缩格式的不同,可能会出现不能从多个工作节点读取的情况(有一些限制,比如必须从头开始读取),这将成为瓶颈。能从多个工作节点读取的格式称为“可分割”格式。


2. 支持Scala、Java、R、Python等编程语言(具备API)

这是字面上的意思。
通过支持各种不同的语言,并且容易集成第三方库也变得更加便捷。

3. 丰富多样的库

スクリーンショット 2016-09-09 16.01.06.png

Spark SQL: 構造化データや表形式データを扱う

Spark Streaming: ほぼリアルタイムでストリーム・データを処理する

MLlib: 機械学習を行う(コンポーネントとしてspark.mllibとspark.mlがあるが前者が非推奨になった模様)

GraphX: グラフ処理を行う

4. 多种引入场景(独立、YARN、Mesos、内嵌、云端)

スクリーンショット 2016-09-09 16.01.41.png

Apache Mesos也得到支持,但最常用于企业的是Hadoop YARN(Hadoop的核心组件),我们当然也支持它。
此外,Spark提供了集成的小规模独立群集系统,称为Spark Standalone,非常适合部署小规模群集的测试等用途。

在分散模式下,Spark采用一种主/从架构,其中包括一个中央协调器(驱动程序)和多个分布式工作者(执行器)。驱动程序以独立的Java进程运行,每个执行器也以独立的Java进程运行。这个驱动程序和执行器组合在一起被称为Spark应用程序。

image

如上图所示,Spark应用程序使用称为集群管理器的外部服务在多台计算机上启动。在此过程中,应用程序通过spark-submit命令进行提交。该命令会启动驱动程序,并调用用户指定的main()方法。然后驱动程序连接到集群管理器,并请求资源以启动执行器。随后,集群管理器以驱动程序的代理身份启动执行器。驱动程序执行用户应用程序,并根据程序中的RDD转换和操作将任务以任务的形式发送给执行器进行处理。任务在执行器进程中执行并保存计算结果。最后,当驱动程序的main()方法退出或者调用了SparkContext.stop()时,驱动程序停止执行器并释放从集群管理器获取的资源。这就是一系列Spark应用程序的流程。

司机

执行包含主函数的程序的进程。将生成SparkContext和RDD,并负责执行转换和操作。
此外,在执行用户程序任务的转换和执行任务调度方面,也承担责任。

关于前者,就Spark处理流程而言[补充]/ Ⅰ. 如前所述,这是将用户的程序转换为任务作为执行单位。Spark程序包括生成和转换RDD以及执行动作的结构,并生成由此操作构建的有向循环图。将此图转换为最优执行的阶段集。如前所述,阶段集包含多个任务。

关于后者,正如在IV. 调度任务执行顺序中所示,Spark的驱动程序负责调整执行者群组中每个任务的调度。执行者群组在启动时将自己注册到驱动程序,因此驱动程序可以始终了解其应用程序的执行者的情况。

执行者 zhě)

Spark的执行器是负责执行Spark作业的每个任务的工作进程。执行器在Spark应用程序启动时启动一次,并在应用程序运行期间通常持续运行。此外,即使执行器群组发生故障,Spark应用程序也可以继续处理。

主要执行者的角色包括执行构建应用程序任务组并将结果返回给驱动程序,以及通过被称为块管理器的服务在每个执行者内部提供用户程序缓存的RDD的内存存储。

5. 宽泛的处理模型(批处理、交互式、流式处理)

尤其是交互式shell非常方便。
不需要每次进行构建,可以快速验证逻辑的试验性编写,加快效果确认循环。外观和使用方法都类似于Ruby的irb。

试着实际运行一下

环境建立

我安装了Oracle Java SE。

(Note: The translation provided is in Simplified Chinese)

当切换Java版本或其他设置时,以下网站非常有参考价值:http://qiita.com/yohjizzz/items/194973bf2f34608ae85a

II. 下载Apache Spark。

请点击以下链接下载 Spark 的安装包:https://spark.apache.org/downloads.html。本次安装的版本是2.0.0。

    • ダウンロード手順1 : 「Chose a Spark release」で「2.0.0 (Jul 26 2016)」を選択。

 

    • ダウンロード手順2 : 「Chose a package type」で「Pre-build for Hadoop 2.7 and later」を選択。

 

    ダウンロード手順3 : 「spark-2.0.0-bin-hadoop2.7.tgz」をクリックしてダウンロードする。
Downloads | Apache Spark.png

在下载完成后,您可以在任意目录中进行解压,并启动spark-2.0.0-bin-hadoop2.7->bin->spark-shell。

_Users_hirokinaganuma_Desktop_SORACOM_Spark_spark-2.0.0-bin-hadoop2.7_bin.png

成功之后应该会出现以下的东西

スクリーンショット 2016-09-09 15.47.13.png
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
[init] error: error while loading <root>, Error accessing /Volumes/Transcend/Hack/SoraPractice/spark-2.0.0-bin-hadoop2.7/jars/._activation-1.1.1.jar

Failed to initialize compiler: object java.lang.Object in compiler mirror not found.
** Note that as of 2.8 scala does not assume use of the java classpath.
** For the old behavior pass -usejavacp to scala, or if using a Settings
** object programmatically, settings.usejavacp.value = true.

Failed to initialize compiler: object java.lang.Object in compiler mirror not found.
** Note that as of 2.8 scala does not assume use of the java classpath.
** For the old behavior pass -usejavacp to scala, or if using a Settings
** object programmatically, settings.usejavacp.value = true.
Exception in thread "main" java.lang.NullPointerException

如果出现类似的情况,只需将PATH设置正确即可解决问题!

$ cd
$ vim .bash_profile

我已经在 .bash_profile 文件中以以下的方式进行了记录。

export SPARK_HOME="spark-2.0.0-bin-hadoop2.7までのpath"
export PATH="$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin"

最后

$ source .bash_profile

所以,只需重新加载.bash_profile文件即可完成。我相信这样就可以启动spark-shell了。

目前为止,环境搭建已经完成。

你好世界!我也尝试了类似的东西。
在显示为scala>的地方,继续输入sc….并尝试按照以下方式输入。

scala> sc.parallelize(1 to 10,1).map(_*2).reduce(_+_)

然后应该是这样的。在这种情况下,我们将1到10的数字分别乘以2并相加。

スクリーンショット 2016-09-09 15.50.38.png

2. 网页用户界面

接下来让我们在浏览器中尝试访问 localhost:8080。

那么你应该能够看到如下屏幕(部分信息已隐藏)。

Spark Master at spark___Hiroki-no-MacBook-Pro-2.local_7077.png

我們可以通過這個方便的Web用戶界面來查看Spark處理的狀況,並了解集群和主機的詳細信息。

其他

キーワード説明ジョブSparkの処理内でアクション系のメソッドを実行するとジョブが作成される。SparkではRDDと呼ばれる抽象データセットの変換を繰り返して目的の結果を得る。この一連の処理をジョブというステージジョブはデータのシャッフルが必要な場合に複数のステージに分割されるタスクステージは複数のタスクを持ち、ワーカーに対して送られる並列実行可能な処理をタスクという

小贴士

1. 启动集群时

$ sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /Users/hirokinaganuma/Desktop/Hoge/SoraPractice/spark-2.0.0-bin-hadoop2.7/logs/spark-hirokinaganuma-org.apache.spark.deploy.master.Master-1-Hiroki-no-MacBook-Pro-2.local.out
failed to launch org.apache.spark.deploy.master.Master:
  /Users/hirokinaganuma/Desktop/Hoge/SoraPractice/spark-2.0.0-bin-hadoop2.7/bin/spark-class: line 71: /Library/Java/JavaVirtualMachines/jdk1.8.0_101.jdk/bin/java: No such file or directory
full log in /Users/hirokinaganuma/Desktop/Hoge/SoraPractice/spark-2.0.0-bin-hadoop2.7/logs/spark-hirokinaganuma-org.apache.spark.deploy.master.Master-1-Hiroki-no-MacBook-Pro-2.local.out
localhost: ssh: connect to host localhost port 22: Connection refused

如果出现类似的情况,我想指定使用jdk1.8.0。

export JAVA_HOME=`/usr/libexec/java_home -v 1.8`

如果那样做的话,问题就解决了。

随后

$ sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /Users/hirokinaganuma/Desktop/Hoge/SoraPractice/spark-2.0.0-bin-hadoop2.7/logs/spark-hirokinaganuma-org.apache.spark.deploy.master.Master-1-Hiroki-no-MacBook-Pro-2.local.out
localhost: ssh: connect to host localhost port 22: Connection refused

好像变成了(没有解决)。

在亚马逊网络服务的EC2实例中运行

以下是三个网站链接:

[aws][amazon][ec2] Amazon ec2にSSHで接続する


http://qiita.com/ledmonster/items/8138d7cf0990ea1017da
http://qiita.com/mikoto/items/4273cd35dce254727363

Spark提供了用于在Amazon EC2上启动集群的脚本(EC2脚本)。但是,目前最新版本(2.0.1)好像没有这个脚本了,难道已经不推荐使用了吗?!

这个脚本可以启动节点群并在它们上安装Standalone集群管理器,一旦集群启动,就可以作为Standalone模式使用。EC2脚本可以管理多个命名的集群,这些集群可以通过EC2的安全组来识别。EC2脚本会为每个集群创建一个名为clustername-master和clustername-slaves的安全组。

看起来它会帮助完成实例的创建,但最好查看一下这篇文章。
http://qiita.com/mychaelstyle/items/b752087a0bee6e41c182

独立集群管理器

Standalone集群管理器提供了基本的的调度策略(例如,可设置内存和核心数的限制),可以设置每个应用程序资源的上限,从而可以并行执行多个应用程序。

首先,从这里开始,书中也写道,我们应该在这里进行第一次部署以熟悉操作。如果想要与其他应用程序一起运行或使用更丰富的调度功能,使用Mesos或YARN是不错的选择。

YARN具有队列的概念,而Mesos则支持应用程序在运行时更动态的共享。

为了高可用性,如果我们在生产环境中运行Standalone集群的配置,我们会针对工作节点的故障进行容错处理。为了提高集群主节点的可用性,可以使用Apache ZooKeeper(分布式协调系统)准备多个备用主节点,这样当出现故障时,Spark可以支持切换到新的主节点。

此时,集群管理器对于驱动程序支持两种部署模式。

    • クライアントモード

 

    • クライアントモードでは、ドライバはspark-submitを実行したマシン上で、spark-submitのコマンドの一部として動作する。(ドライバプログラムに直接入力を行えたり、出力を直接見たりできるから)

 

    • クラスタモード

 

    クラスタモードではドライバはStandaloneクラスタ内でいずれかのワーカーノード上のプロセスとして起動され、リクエストのエグゼキュータに対して接続します。切り替えるのにはspark-submit –deploy-mode clusterに渡さなきゃいけません。

Apache Mesos is a Chinese-created software system.

以下是概述
Apache Mesos是一個通用的集群管理器,可以在集群上運行分析工作負載、長期運行的網絡應用程序、鍵值存儲等服務。
在多主模式下運行時,可以使用ZooKeeper來選擇主節點。此外,Mesos上的Spark支持的應用程序運行模式僅為客戶端部署模式。這意味著“驅動程序在提交應用程序的機器上運行”。如果想在Mesos集群上運行驅動程序,則需要使用Aurora或Chronos等框架。
此外,與其他集群管理器不同,Mesos在同一集群中的執行器之間提供兩種資源共享模式。

fine-grainedモード
エグゼキュータはタスクを実行しながらMesosに要求するCPU数を増減させるので、複数のエグゼキュータはタスクを実行しているマシンはエグゼキュータ間のCPUリソースを動的に共有できます。ただし、タスクをスケジューリングするレイテンシが大きくなってしまう。

coarse-grainedモード
Sparkはかくエグゼキュータが使用するCPU数をあらかじめ決めておき、仮にエグゼキュータがタスクを実行していなくても、アプリケーションが終了するまではCPUを開放しません。

请参阅

undefined
广告
将在 10 秒后关闭
bannerAds