尝试使用Apache Spark
Apache Spark 就是…
-
- 分散処理を行うフレームワーク
-
- 複数台でクラスタ構成を取り各ワーカーで処理を手分けして行う
- Hadoop の MapReduce 層と同じような役割を持ち、ファイルシステムベースでデータを扱う Hadoop に対してメモリベースでデータを扱うため高速
在 Kubernetes 环境中启动 Spark。
https://github.com/kubernetes/examples/blob/master/staging/spark/README.md にサンプルがある
创建Docker镜像
-
- サンプルのイメージはバージョンが古いので作り直してみる
- ついでに Dockerfile 内でアメリカのサーバから直接ソースをダウンロードしているので事前にダウンロードしてコピーするように変更
FROM java:openjdk-8-jdk
ENV hadoop_ver 2.7.4
ENV spark_ver 2.2.0
# download from http://ftp.kddilabs.jp/infosystems/apache/hadoop/common/hadoop-${hadoop_ver}/hadoop-${hadoop_ver}-src.tar.gz
COPY hadoop.tgz /tmp/
# download from https://d3kbcqa49mib13.cloudfront.net/spark-${spark_ver}-bin-hadoop2.7.tgz
COPY spark.tgz /tmp/
RUN mkdir -p /opt && \
cd /tmp && \
tar -zxf hadoop.tgz && \
mkdir -p /opt/hadoop/lib/ && \
mv hadoop-${hadoop_ver}-src /opt/hadoop/lib/native && \
echo Hadoop ${hadoop_ver} native libraries installed in /opt/hadoop/lib/native
RUN mkdir -p /opt && \
cd /tmp && \
tar -zxf spark.tgz && \
mv spark-2.2.0-bin-hadoop2.7 /opt/spark && \
echo Spark ${spark_ver} installed in /opt
# Add the GCS connector.
RUN cd /opt/spark/jars && \
curl -O https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar
# if numpy is installed on a driver it needs to be installed on all
# workers, so install it everywhere
RUN apt-get update && \
apt-get install -y python-numpy netcat && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
ADD log4j.properties /opt/spark/conf/log4j.properties
ADD start-common.sh start-worker start-master /
ADD core-site.xml /opt/spark/conf/core-site.xml
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ENV PATH $PATH:/opt/spark/bin
启动主容器
-
- このサンプルではクラスタリングに Standalone mode という Spark 組み込みのクラスタマネージャを使い、1 台のマスターと複数台のワーカーという構成を取る
- マスターのコンテナを立ち上げる Kubernetes の定義ファイルが ReplicationController で書かれていたので後継の Deployment に変更して立ち上げる
定义文件
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: spark-master-controller
spec:
replicas: 1
template:
metadata:
labels:
component: spark-master
spec:
containers:
- name: spark-master
image: bl/spark:v1.0
command: ["/start-master"]
ports:
- containerPort: 7077
- containerPort: 8080
resources:
requests:
cpu: 100m
指令
# マスターコンテナの立ち上げ
kubectl apply -f spark-master-controller.yml
# マスターサービスの立ち上げ. こちらはサンプルそのまま使用
kubectl apply -f spark-master-service.yml
启动工作容器
- ワーカーコンテナも同様に Deployment で定義し直して立ち上げる
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: spark-worker-controller
spec:
replicas: 3
template:
metadata:
labels:
component: spark-worker
spec:
containers:
- name: spark-worker
image: bl/spark:v1.0
command: ["/start-worker"]
ports:
- containerPort: 8081
resources:
requests:
cpu: 100m
kubectl apply -f spark-worker-controller.yml
试着运行Spark
-
- 今回作ったイメージだと /opt/spark/bin にあるスクリプトを使って Spark に処理を実行させることができる
- 任意のコンテナにログインしてスクリプトを実行してみる
# 実行中の Pod を確認
kubectl get pods
# ログイン
kubectl exec -it spark-master-controller-xxxxxxxxxx /bin/sh
# Scala と Java のサンプルスクリプト実行には run-example を使う
/opt/spark/bin/run-example SparkPi
# Python のサンプルスクリプト実行には spark-submit を使う
/opt/spark/bin/spark-submit examples/src/main/python/pi.py
# R のサンプルスクリプト実行には spark-submit を使う
/opt/spark/bin/spark-submit examples/src/main/r/dataframe.R
创作可以在Spark上运行的脚本。
-
- サンプルは /opt/spark/examples にあるのでこれを自分でコンパイルして実行してみる
- 今回は Java で Spark Streaming を使って Kafka からデータを読み込んで Word Count するスクリプトを作ってみる
代码
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.spark;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import scala.Tuple2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.Durations;
import org.apache.log4j.*;
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
* Usage: JavaDirectKafkaWordCount <brokers> <topics>
* <brokers> is a list of one or more Kafka brokers
* <topics> is a list of one or more kafka topics to consume from
*
* Example:
* $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port \
* topic1,topic2
*/
public final class KafkaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +
" <brokers> is a list of one or more Kafka brokers\n" +
" <topics> is a list of one or more kafka topics to consume from\n\n");
System.exit(1);
}
//StreamingExamples.setStreamingLogLevels();
Logger.getRootLogger().setLevel(Level.WARN);
String brokers = args[0];
String topics = args[1];
// Create context with a 2 seconds batch interval
SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", brokers);
kafkaParams.put("group.id", "my-consumer-group");
kafkaParams.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// Create direct kafka stream with brokers and topics
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
// Get the lines, split them into words, count the words and print
JavaDStream<String> lines = messages.map(ConsumerRecord::value);
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();
// Start the computation
jssc.start();
jssc.awaitTermination();
}
}
pom.xml 的中文翻译可以是“项目对象模型”。
-
- maven-assembly-plugin を使って依存しているパッケージも含んだ JAR ファイルを作成する
- spark_core_2.11 などのパッケージは Spark 側で用意するので含める必要はない
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.spark</groupId>
<artifactId>spark-app</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.example.spark.KafkaWordCount</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
执行
- 作成した JAR ファイルをコンテナにコピーしてキックスクリプトで実行する
# ファイルのコピー
kubectl cp spark-app-1.0-SNAPSHOT-jar-with-dependencies.jar spark-master-controller-xxxxxxxxxx:/opt/spark/jars/
# ログイン
kubectl exec -it spark-master-controller-xxxxxxxxxx /bin/sh
# 実行
/opt/spark/bin/spark-submit \
--class com.example.spark.KafkaWordCount \
--master spark://spark-master:7077 \
/opt/spark/jars/spark-app-1.0-SNAPSHOT-jar-with-dependencies.jar \
kafka-0:9092 topic-0