本章では、Spark単独環境上の構築方法を基に記述しております。
standaloneやyarnなどのクラスター上で動かすことがあれば、私のそのたの投稿にクラスターの構築後方がありますのでご参照ください。
SparkStreamingで、単独環境もクラスタ環境もどちでもKafkaのデータ読取するのは構いませんです。

下記は、Sparkのクラスタ環境構築に関連する私の投稿です。興味をお持ちの方にはご参照ください。

    • SaprkStandaloneクラスタ環境構築

 

    SaprkYarnクラスタ環境構築

1.Sparkバージョンの問題

下図によって、「spark-streaming-kafka-0-10」の方がPythonを対応されておきませんので、「spark-streaming-kafka-0-8」の方を用いられないといけませんでした。
ただし、「spark-streaming-kafka-0-8」の方は、Spark2.3.0以降にサポートされてなくなったので、今度、Spark2.2.3のほうを用い直してみます。

※Spark2.2.3の配置は、私のそのたの投稿をご参照ください。
Spark2.2.3のダウンロード先

2.Mavenの用意

Mavenに必要なJarを用意します。
コピー対象:

/SparkStreaming/spark-streaming-kafka-0-8_2.11

コピー先:

~/.m2/repository/org/apache/

spark-streaming-kafka-0-8_2.11-2.1.0.jarのダウンロード先

3.必須なJar用意

必須になるJarを用意します。
コピー対象:

/SparkStreaming/spark-core_2.11-1.5.2.jar
/SparkStreaming/spark-streaming-kafka-assembly_2.11-1.6.3.jar

コピー先:

~/usr/work/

spark-core_2.11-1.5.2.jarのダウンロード先
spark-streaming-kafka-assembly_2.11-1.6.3.jarのダウンロード先

4.Kafka_wordcount.py用意

SparkStreamingでKafkaのトピックからセンサデータを取得してWordCountを行うファイルを用意します。
コピー対象:

/SparkStreaming/kafka_wordcount.py

コピー先:

~/usr/work/

5.WordCount実行

以下を実行します。

# no cluster
spark-submit --jars spark-streaming-kafka-assembly_2.11-1.6.3.jar,spark-core_2.11-1.5.2.jar --packages org.apache:spark-streaming-kafka-0-8_2.11:2.1.0 ./kafka_wordcount.py 192.168.0.99:2181 topic-mqtt-kafka

以下の表示が確認しできれば起動済になります。

‘——————————————-
Time: 2019-02-19 00:11:50
‘——————————————-

※受信確認できない場合は、以下を繰り返して実行してみてください。

7.MQTT→Kafka→Spark連携環境起動手順

Kafkaクラスタ起動

#Zookeeper起動
cd /usr/local/lib/zookeeper
bin/zkServer.sh start
#Kafkaの起動
cd /usr/local/lib/kafka
bin/kafka-server-start.sh -daemon config/server.properties

Topic作成

cd /usr/local/lib/kafka
# topic-mqtt-kafka確認
bin/kafka-topics.sh --describe --zookeeper  192.168.0.99:2181 --topic topic-mqtt-kafka
# 存在しない場合は作成
bin/kafka-topics.sh --create --topic topic-mqtt-kafka --replication-factor 1 --partitions 1 --zookeeper 192.168.0.99:2181

mosquittoより送信(MQTT送信に当たり)

cd /usr/bin/
mosquitto -c /etc/mosquitto/mosquitto.conf -v
mosquitto_pub -h 192.168.0.99 -p 1883 -t topic-mqtt -m "Hello Spark"

SparkStandaloneクラスタ起動

#Master起動
start-master.sh
#Slave起動
start-slaves.sh

SparkStreamingでデータ取得

cd /usr/work
#クラスタなし、かつKafka連携しない
spark-submit --master spark://192.168.0.100:7077 --deploy-mode client --jars ./spark-streaming-mqtt-assembly_2.11-1.6.3.jar --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.2.1 ./mqtt_wordcount.py tcp://192.168.0.99:1883 topic-mqtt
#クラスタなし、かつKafka連携
spark-submit --jars spark-streaming-kafka-assembly_2.11-1.6.3.jar,spark-core_2.11-1.5.2.jar --packages org.apache:spark-streaming-kafka-0-8_2.11:2.1.0 ./kafka_wordcount.py 192.168.0.99:2181 topic-mqtt-kafka
#クラスタあり、かつKafka連携しない
spark-submit --master spark://192.168.0.100:7077 --deploy-mode client --jars ./spark-streaming-mqtt-assembly_2.11-1.6.3.jar --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.2.1 ./mqtt_wordcount.py tcp://192.168.0.99:1883 topic-mqtt
#クラスタあり、かつKafka連携
spark-submit --master spark://192.168.0.100:7077 --deploy-mode client --jars ./spark-streaming-kafka-assembly_2.11-1.6.3.jar,./spark-core_2.11-1.5.2.jar --packages org.apache:spark-streaming-kafka-0-8_2.11:2.1.0 ./kafka_wordcount.py 192.168.0.99:2181 topic-mqtt-kafka

以下の表示が確認できればKafkaトピックからデータ取得するのは完璧になります。

‘——————————————-
Time: 2019-02-19 00:11:50
‘——————————————-
(u’Hello Spark’, 1)

广告
将在 10 秒后关闭
bannerAds