Apache Spark – 关于分区的内容 –

首先

這篇文章試圖通過舉例來總結Apache Spark的分區概念。
有關Apache Spark的概述,請參閱”Apache Spark 進行分散處理入門”。

通过例题来了解分区的概念和操作方法。

以下是用Apache Spark演示分布处理的示例图解。
首先介绍了使用的函数说明,接着是处理的图像,最后按源代码的顺序进行介绍。
介绍了使用不同分区方式的三个例子:
– filter函数的例子
– flatmap函数的例子
– reduceByKey函数的例子

filter函数的示例

在进行要素单位转换时,可使用map()和filter等方法。

関数説明map()引数に関数を一つ取り、その関数をRDD内の各要素に適応し、その結果を新しい値とするRDDを返すfilter()引数に関数を一つ取り、そのフィルタ関数が真になる要素だけを含むRDDを返す
スクリーンショット 2016-10-11 18.15.57.png
スクリーンショット 2016-10-11 18.16.04.png
スクリーンショット 2016-10-11 18.16.07.png

源代码在这里

package other;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * Created by hirokinaganuma on 2016/10/11.
 */
public class Sample02 {
    public static void main(String[] args) throws Exception {
        String master;
        if (args.length > 0) {
            master = args[0];
        } else {
            master = "local";
        }
        JavaSparkContext sc = new JavaSparkContext(master, "basicavg", System.getenv("SPARK_HOME"), System.getenv("JARS"));
        JavaRDD<String> input = sc.textFile("bin/input/*.txt");
        JavaRDD<String> result = input.filter(s-> !s.contains("Triangle"));
        result.saveAsTextFile("bin/output/output02");
        sc.stop();
    }
}
Rectangle
Circle
Triangle
Circle
Rectangle
Triangle
Triangle
Rectangle

RDD是一种分布式集合,可以保存大量的数据元素。RDD是设计用于在由多台机器组成的集群上进行分布式处理的,在内部被划分为多个块,称为partition。在Spark中,partition成为分布式处理的单位。通过在每个partition上在多台机器上进行处理,可以处理无法在单个机器上处理的数据。

将被分割的文件作为一个RDD来处理,只需编写input.filter(s -> !s.contains(“Triangle”)),即可实现无需分布式意识的编程。

flatMap函数的示例

此函数是从一个元素生成多个元素的一对多关系转换,而不是像 filter 和 map 函数一样是一对一的关系。

関数説明flatMap()引数に関数を一つ取り、その関数をRDD内の各要素に適応して呼ばれるが、この関数はその結果を返すIteratorを返します。それらのIterator全てから返された要素を値とするRDDを最優的に返します
スクリーンショット 2016-10-11 20.09.53.png
スクリーンショット 2016-10-11 20.09.55.png
スクリーンショット 2016-10-11 20.09.57.png

源代码在这里。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.Arrays;

/**
 * Created by hirokinaganuma on 2016/10/11.
 */
public class Sample03 {
    public static void main(String[] args) throws Exception {
        String master;
        if (args.length > 0) {
            master = args[0];
        } else {
            master = "local";
        }
        JavaSparkContext sc = new JavaSparkContext(master, "basicavg", System.getenv("SPARK_HOME"), System.getenv("JARS"));
        JavaRDD<String> input = sc.textFile("bin/input/flatmap/*.txt");
        JavaRDD<String> result = input.flatMap(
                new FlatMapFunction<String, String>() {
                    public Iterable<String> call(String x) {
                        return Arrays.asList(x.split(" "));
                    }
                }
        );
        result.saveAsTextFile("bin/output/output03");
        sc.stop();
    }
}
I am from Fukuoka
Hello Apache Spark
Tokyo Institute of Technology

reduceByKey函数的示例

reduceByKey是一個需要以前的RDD中的元素與其他分片中的元素進行整合處理的操作。這個轉換針對的是一個由鍵值對元素組成的RDD,它將具有相同鍵的元素進行合併處理。Spark按照每個分片獨立進行分散處理,所以具有相同鍵的元素必須完全包含在同一個分片中。因此,在reduce操作之前需要進行重分佈(shuffle)。重分佈操作根據鍵將返還前的RDD元素重新分配到轉換後的RDD分片中。因此,重分佈可以保證具有相同鍵的元素被包含在同一個分片中。

圖中沒有表達出來,但是在進行重分佈之前,在每個分片內部執行了reduce操作,以減少通信成本。

関数説明reduceByKey()引数に関数を一つ取り、その関数は同じキーの値を結合する際のreduce処理である
スクリーンショット 2016-10-11 21.32.57.png
スクリーンショット 2016-10-11 21.33.05.png
スクリーンショット 2016-10-11 21.33.07.png

源代码在这里。

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Created by hirokinaganuma on 2016/10/11.
 */
public class Sample04 {
    public static void main(String[] args) throws Exception {
        String master;
        if (args.length > 0) {
            master = args[0];
        } else {
            master = "local";
        }
        JavaSparkContext sc = new JavaSparkContext(master, "basicavg", System.getenv("SPARK_HOME"), System.getenv("JARS"));
        JavaRDD<String> input = sc.textFile("bin/input/reducebykey/*.txt");
        JavaRDD<String> words = input.flatMap(
                new FlatMapFunction<String, String>() {
                    public Iterable<String> call(String x) {
                        return Arrays.asList(x.split(" "));
                    }
                }
        );

        JavaPairRDD<String, Integer> pairrdd = words.mapToPair(
                new PairFunction<String, String, Integer>(){
                    public Tuple2<String, Integer> call(String x){
                        return new Tuple2(x, 1);
                    }
                }
        );

        JavaPairRDD<String, Integer> output = pairrdd.reduceByKey(new Function2<Integer, Integer, Integer>(){
            public Integer call(Integer x, Integer y){
                return x+y;
            }
        });

        output.saveAsTextFile("bin/output/output04");
    }
}
fish
cat
cat
fish
fish
cat
cat
dog

Apache Spark的partition是什么意思?

RDD是一种分布式集合,以大量数据作为元素。RDD是在由多台机器组成的集群上进行分布式处理设计的,内部被分割成了称为partition的块。在Spark中,partition成为了分布式处理的单位。通过在每个partition上多台机器上处理RDD,可以处理无法在单台机器上处理的数据。

士气

在分散计算中,通信成本非常高,最小化网络流量可以大幅提升性能。
为了减少通信量,需要通过控制RDD的分区来优化Spark程序。

例如,Spark可以通过编程来确保键的集合出现在集中的节点上(通过reduceByKey()进行介绍)。特别是在数据集多次重用键操作(如连接)的情况下,分区是很有用的。这是因为Spark的许多操作都会通过网络进行基于键的洗牌。它们都可以受益于分区。

数据的分区

在Java或Scala中,您可以使用partitioner属性来指定RDD的分区方法。在spark.Partitioner对象的值中,您将设置一个指示RDD中每个键的目标分区的内容。

当在分区化的RDD上执行像reduceByKey()这样的操作时,每个键的所有值都在单个机器上本地计算,并且只有最终的reduce()值会从每个工作节点返回主节点(如前面讲过的)。

更详细地说,在shuffle过程中,将具有相同键的元素分配到同一个partition是partitioner的工作。partitioner根据转换后的RDD的partition数和分配对象键的内容来确定分配元素的目标partition。在Spark中,默认情况下使用键的哈希值除以转换后的RDD的partition数所得的余数来确定分配的目标partition。

另外,如果要对两个RDD进行操作,并且事先进行了分区处理,那么至少两个RDD中的一个将不会进行洗牌。
此外,Spark会了解每个操作对分区的内部影响,因此会自动为由数据分区操作生成的RDD设置分区器。

持续存在优化

但是,在对RDD进行分区并将生成的RDD用作转换目标时,应该使用persist()进行持久化。原因是,接下来对RDD的操作将重新评估由分区生成的整个RDD系列,因此对分区之前的RDD进行多次哈希分区是没有意义的。

请参考

    • http://www.ne.jp/asahi/hishidama/home/tech/scala/spark/partition.html

 

    • https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/

 

    http://qiita.com/Hiroki11x/items/4f5129094da4c91955bc
广告
将在 10 秒后关闭
bannerAds