Apache Spark – 关于分区的内容 –
首先
這篇文章試圖通過舉例來總結Apache Spark的分區概念。
有關Apache Spark的概述,請參閱”Apache Spark 進行分散處理入門”。
通过例题来了解分区的概念和操作方法。
以下是用Apache Spark演示分布处理的示例图解。
首先介绍了使用的函数说明,接着是处理的图像,最后按源代码的顺序进行介绍。
介绍了使用不同分区方式的三个例子:
– filter函数的例子
– flatmap函数的例子
– reduceByKey函数的例子
filter函数的示例
在进行要素单位转换时,可使用map()和filter等方法。
源代码在这里
package other;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
* Created by hirokinaganuma on 2016/10/11.
*/
public class Sample02 {
public static void main(String[] args) throws Exception {
String master;
if (args.length > 0) {
master = args[0];
} else {
master = "local";
}
JavaSparkContext sc = new JavaSparkContext(master, "basicavg", System.getenv("SPARK_HOME"), System.getenv("JARS"));
JavaRDD<String> input = sc.textFile("bin/input/*.txt");
JavaRDD<String> result = input.filter(s-> !s.contains("Triangle"));
result.saveAsTextFile("bin/output/output02");
sc.stop();
}
}
Rectangle
Circle
Triangle
Circle
Rectangle
Triangle
Triangle
Rectangle
RDD是一种分布式集合,可以保存大量的数据元素。RDD是设计用于在由多台机器组成的集群上进行分布式处理的,在内部被划分为多个块,称为partition。在Spark中,partition成为分布式处理的单位。通过在每个partition上在多台机器上进行处理,可以处理无法在单个机器上处理的数据。
将被分割的文件作为一个RDD来处理,只需编写input.filter(s -> !s.contains(“Triangle”)),即可实现无需分布式意识的编程。
flatMap函数的示例
此函数是从一个元素生成多个元素的一对多关系转换,而不是像 filter 和 map 函数一样是一对一的关系。
源代码在这里。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import java.util.Arrays;
/**
* Created by hirokinaganuma on 2016/10/11.
*/
public class Sample03 {
public static void main(String[] args) throws Exception {
String master;
if (args.length > 0) {
master = args[0];
} else {
master = "local";
}
JavaSparkContext sc = new JavaSparkContext(master, "basicavg", System.getenv("SPARK_HOME"), System.getenv("JARS"));
JavaRDD<String> input = sc.textFile("bin/input/flatmap/*.txt");
JavaRDD<String> result = input.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String x) {
return Arrays.asList(x.split(" "));
}
}
);
result.saveAsTextFile("bin/output/output03");
sc.stop();
}
}
I am from Fukuoka
Hello Apache Spark
Tokyo Institute of Technology
reduceByKey函数的示例
reduceByKey是一個需要以前的RDD中的元素與其他分片中的元素進行整合處理的操作。這個轉換針對的是一個由鍵值對元素組成的RDD,它將具有相同鍵的元素進行合併處理。Spark按照每個分片獨立進行分散處理,所以具有相同鍵的元素必須完全包含在同一個分片中。因此,在reduce操作之前需要進行重分佈(shuffle)。重分佈操作根據鍵將返還前的RDD元素重新分配到轉換後的RDD分片中。因此,重分佈可以保證具有相同鍵的元素被包含在同一個分片中。
圖中沒有表達出來,但是在進行重分佈之前,在每個分片內部執行了reduce操作,以減少通信成本。
源代码在这里。
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
/**
* Created by hirokinaganuma on 2016/10/11.
*/
public class Sample04 {
public static void main(String[] args) throws Exception {
String master;
if (args.length > 0) {
master = args[0];
} else {
master = "local";
}
JavaSparkContext sc = new JavaSparkContext(master, "basicavg", System.getenv("SPARK_HOME"), System.getenv("JARS"));
JavaRDD<String> input = sc.textFile("bin/input/reducebykey/*.txt");
JavaRDD<String> words = input.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String x) {
return Arrays.asList(x.split(" "));
}
}
);
JavaPairRDD<String, Integer> pairrdd = words.mapToPair(
new PairFunction<String, String, Integer>(){
public Tuple2<String, Integer> call(String x){
return new Tuple2(x, 1);
}
}
);
JavaPairRDD<String, Integer> output = pairrdd.reduceByKey(new Function2<Integer, Integer, Integer>(){
public Integer call(Integer x, Integer y){
return x+y;
}
});
output.saveAsTextFile("bin/output/output04");
}
}
fish
cat
cat
fish
fish
cat
cat
dog
Apache Spark的partition是什么意思?
RDD是一种分布式集合,以大量数据作为元素。RDD是在由多台机器组成的集群上进行分布式处理设计的,内部被分割成了称为partition的块。在Spark中,partition成为了分布式处理的单位。通过在每个partition上多台机器上处理RDD,可以处理无法在单台机器上处理的数据。
士气
在分散计算中,通信成本非常高,最小化网络流量可以大幅提升性能。
为了减少通信量,需要通过控制RDD的分区来优化Spark程序。
例如,Spark可以通过编程来确保键的集合出现在集中的节点上(通过reduceByKey()进行介绍)。特别是在数据集多次重用键操作(如连接)的情况下,分区是很有用的。这是因为Spark的许多操作都会通过网络进行基于键的洗牌。它们都可以受益于分区。
数据的分区
在Java或Scala中,您可以使用partitioner属性来指定RDD的分区方法。在spark.Partitioner对象的值中,您将设置一个指示RDD中每个键的目标分区的内容。
当在分区化的RDD上执行像reduceByKey()这样的操作时,每个键的所有值都在单个机器上本地计算,并且只有最终的reduce()值会从每个工作节点返回主节点(如前面讲过的)。
更详细地说,在shuffle过程中,将具有相同键的元素分配到同一个partition是partitioner的工作。partitioner根据转换后的RDD的partition数和分配对象键的内容来确定分配元素的目标partition。在Spark中,默认情况下使用键的哈希值除以转换后的RDD的partition数所得的余数来确定分配的目标partition。
另外,如果要对两个RDD进行操作,并且事先进行了分区处理,那么至少两个RDD中的一个将不会进行洗牌。
此外,Spark会了解每个操作对分区的内部影响,因此会自动为由数据分区操作生成的RDD设置分区器。
持续存在优化
但是,在对RDD进行分区并将生成的RDD用作转换目标时,应该使用persist()进行持久化。原因是,接下来对RDD的操作将重新评估由分区生成的整个RDD系列,因此对分区之前的RDD进行多次哈希分区是没有意义的。
请参考
-
- http://www.ne.jp/asahi/hishidama/home/tech/scala/spark/partition.html
-
- https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/
- http://qiita.com/Hiroki11x/items/4f5129094da4c91955bc