尝试使用Apache Spark

Apache Spark 就是…

    • 分散処理を行うフレームワーク

 

    • 複数台でクラスタ構成を取り各ワーカーで処理を手分けして行う

 

    Hadoop の MapReduce 層と同じような役割を持ち、ファイルシステムベースでデータを扱う Hadoop に対してメモリベースでデータを扱うため高速

在 Kubernetes 环境中启动 Spark。

https://github.com/kubernetes/examples/blob/master/staging/spark/README.md にサンプルがある

创建Docker镜像

    • サンプルのイメージはバージョンが古いので作り直してみる

 

    ついでに Dockerfile 内でアメリカのサーバから直接ソースをダウンロードしているので事前にダウンロードしてコピーするように変更
FROM java:openjdk-8-jdk

ENV hadoop_ver 2.7.4
ENV spark_ver 2.2.0

# download from http://ftp.kddilabs.jp/infosystems/apache/hadoop/common/hadoop-${hadoop_ver}/hadoop-${hadoop_ver}-src.tar.gz
COPY hadoop.tgz /tmp/
# download from https://d3kbcqa49mib13.cloudfront.net/spark-${spark_ver}-bin-hadoop2.7.tgz
COPY spark.tgz /tmp/

RUN mkdir -p /opt && \
    cd /tmp && \
    tar -zxf hadoop.tgz && \
    mkdir -p /opt/hadoop/lib/ && \
    mv hadoop-${hadoop_ver}-src /opt/hadoop/lib/native && \
    echo Hadoop ${hadoop_ver} native libraries installed in /opt/hadoop/lib/native

RUN mkdir -p /opt && \
    cd /tmp && \
    tar -zxf spark.tgz && \
    mv spark-2.2.0-bin-hadoop2.7 /opt/spark && \
    echo Spark ${spark_ver} installed in /opt

# Add the GCS connector.
RUN cd /opt/spark/jars && \
    curl -O https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar

# if numpy is installed on a driver it needs to be installed on all
# workers, so install it everywhere
RUN apt-get update && \
    apt-get install -y python-numpy netcat && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

ADD log4j.properties /opt/spark/conf/log4j.properties
ADD start-common.sh start-worker start-master /
ADD core-site.xml /opt/spark/conf/core-site.xml
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ENV PATH $PATH:/opt/spark/bin

启动主容器

    • このサンプルではクラスタリングに Standalone mode という Spark 組み込みのクラスタマネージャを使い、1 台のマスターと複数台のワーカーという構成を取る

 

    マスターのコンテナを立ち上げる Kubernetes の定義ファイルが ReplicationController で書かれていたので後継の Deployment に変更して立ち上げる

定义文件

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: spark-master-controller
spec:
  replicas: 1
  template:
    metadata:
      labels:
        component: spark-master
    spec:
      containers:
        - name: spark-master
          image: bl/spark:v1.0
          command: ["/start-master"]
          ports:
            - containerPort: 7077
            - containerPort: 8080
          resources:
            requests:
              cpu: 100m

指令

# マスターコンテナの立ち上げ
kubectl apply -f spark-master-controller.yml

# マスターサービスの立ち上げ. こちらはサンプルそのまま使用
kubectl apply -f spark-master-service.yml

启动工作容器

    ワーカーコンテナも同様に Deployment で定義し直して立ち上げる
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: spark-worker-controller
spec:
  replicas: 3
  template:
    metadata:
      labels:
        component: spark-worker
    spec:
      containers:
        - name: spark-worker
          image: bl/spark:v1.0
          command: ["/start-worker"]
          ports:
            - containerPort: 8081
          resources:
            requests:
              cpu: 100m
kubectl apply -f spark-worker-controller.yml

试着运行Spark

    • 今回作ったイメージだと /opt/spark/bin にあるスクリプトを使って Spark に処理を実行させることができる

 

    任意のコンテナにログインしてスクリプトを実行してみる
# 実行中の Pod を確認
kubectl get pods

# ログイン
kubectl exec -it spark-master-controller-xxxxxxxxxx /bin/sh

# Scala と Java のサンプルスクリプト実行には run-example を使う
/opt/spark/bin/run-example SparkPi

# Python のサンプルスクリプト実行には spark-submit を使う
/opt/spark/bin/spark-submit examples/src/main/python/pi.py

# R のサンプルスクリプト実行には spark-submit を使う
/opt/spark/bin/spark-submit examples/src/main/r/dataframe.R

创作可以在Spark上运行的脚本。

    • サンプルは /opt/spark/examples にあるのでこれを自分でコンパイルして実行してみる

 

    今回は Java で Spark Streaming を使って Kafka からデータを読み込んで Word Count するスクリプトを作ってみる

代码

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.spark;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import scala.Tuple2;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.Durations;

import org.apache.log4j.*;

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: JavaDirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *    $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port \
 *      topic1,topic2
 */

public final class KafkaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +
                    "  <brokers> is a list of one or more Kafka brokers\n" +
                    "  <topics> is a list of one or more kafka topics to consume from\n\n");
            System.exit(1);
        }

        //StreamingExamples.setStreamingLogLevels();
        Logger.getRootLogger().setLevel(Level.WARN);

        String brokers = args[0];
        String topics = args[1];

        // Create context with a 2 seconds batch interval
        SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "my-consumer-group");
        kafkaParams.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        // Create direct kafka stream with brokers and topics
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

        // Get the lines, split them into words, count the words and print
        JavaDStream<String> lines = messages.map(ConsumerRecord::value);
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
                .reduceByKey((i1, i2) -> i1 + i2);
        wordCounts.print();

        // Start the computation
        jssc.start();
        jssc.awaitTermination();
    }
}

pom.xml 的中文翻译可以是“项目对象模型”。

    • maven-assembly-plugin を使って依存しているパッケージも含んだ JAR ファイルを作成する

 

    spark_core_2.11 などのパッケージは Spark 側で用意するので含める必要はない
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.spark</groupId>
    <artifactId>spark-app</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.example.spark.KafkaWordCount</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- bind to the packaging phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

执行

    作成した JAR ファイルをコンテナにコピーしてキックスクリプトで実行する
# ファイルのコピー
kubectl cp spark-app-1.0-SNAPSHOT-jar-with-dependencies.jar spark-master-controller-xxxxxxxxxx:/opt/spark/jars/

# ログイン
kubectl exec -it spark-master-controller-xxxxxxxxxx /bin/sh

# 実行
/opt/spark/bin/spark-submit \
  --class com.example.spark.KafkaWordCount \
  --master spark://spark-master:7077 \
  /opt/spark/jars/spark-app-1.0-SNAPSHOT-jar-with-dependencies.jar \
  kafka-0:9092 topic-0
广告
将在 10 秒后关闭
bannerAds