本章では、Spark単独環境上の構築方法を基に記述しております。
standaloneやyarnなどのクラスター上で動かすことがあれば、私のそのたの投稿にクラスターの構築後方がありますのでご参照ください。
SparkStreamingで、単独環境もクラスタ環境もどちでもKafkaのデータ読取するのは構いませんです。
下記は、Sparkのクラスタ環境構築に関連する私の投稿です。興味をお持ちの方にはご参照ください。
-
- SaprkStandaloneクラスタ環境構築
- SaprkYarnクラスタ環境構築
1.Sparkバージョンの問題
下図によって、「spark-streaming-kafka-0-10」の方がPythonを対応されておきませんので、「spark-streaming-kafka-0-8」の方を用いられないといけませんでした。
ただし、「spark-streaming-kafka-0-8」の方は、Spark2.3.0以降にサポートされてなくなったので、今度、Spark2.2.3のほうを用い直してみます。
※Spark2.2.3の配置は、私のそのたの投稿をご参照ください。
Spark2.2.3のダウンロード先
2.Mavenの用意
Mavenに必要なJarを用意します。
コピー対象:
/SparkStreaming/spark-streaming-kafka-0-8_2.11
コピー先:
~/.m2/repository/org/apache/
spark-streaming-kafka-0-8_2.11-2.1.0.jarのダウンロード先
3.必須なJar用意
必須になるJarを用意します。
コピー対象:
/SparkStreaming/spark-core_2.11-1.5.2.jar
/SparkStreaming/spark-streaming-kafka-assembly_2.11-1.6.3.jar
コピー先:
~/usr/work/
spark-core_2.11-1.5.2.jarのダウンロード先
spark-streaming-kafka-assembly_2.11-1.6.3.jarのダウンロード先
4.Kafka_wordcount.py用意
SparkStreamingでKafkaのトピックからセンサデータを取得してWordCountを行うファイルを用意します。
コピー対象:
/SparkStreaming/kafka_wordcount.py
コピー先:
~/usr/work/
5.WordCount実行
以下を実行します。
# no cluster
spark-submit --jars spark-streaming-kafka-assembly_2.11-1.6.3.jar,spark-core_2.11-1.5.2.jar --packages org.apache:spark-streaming-kafka-0-8_2.11:2.1.0 ./kafka_wordcount.py 192.168.0.99:2181 topic-mqtt-kafka
以下の表示が確認しできれば起動済になります。
‘——————————————-
Time: 2019-02-19 00:11:50
‘——————————————-
※受信確認できない場合は、以下を繰り返して実行してみてください。
7.MQTT→Kafka→Spark連携環境起動手順
Kafkaクラスタ起動
#Zookeeper起動
cd /usr/local/lib/zookeeper
bin/zkServer.sh start
#Kafkaの起動
cd /usr/local/lib/kafka
bin/kafka-server-start.sh -daemon config/server.properties
Topic作成
cd /usr/local/lib/kafka
# topic-mqtt-kafka確認
bin/kafka-topics.sh --describe --zookeeper 192.168.0.99:2181 --topic topic-mqtt-kafka
# 存在しない場合は作成
bin/kafka-topics.sh --create --topic topic-mqtt-kafka --replication-factor 1 --partitions 1 --zookeeper 192.168.0.99:2181
mosquittoより送信(MQTT送信に当たり)
cd /usr/bin/
mosquitto -c /etc/mosquitto/mosquitto.conf -v
mosquitto_pub -h 192.168.0.99 -p 1883 -t topic-mqtt -m "Hello Spark"
SparkStandaloneクラスタ起動
#Master起動
start-master.sh
#Slave起動
start-slaves.sh
SparkStreamingでデータ取得
cd /usr/work
#クラスタなし、かつKafka連携しない
spark-submit --master spark://192.168.0.100:7077 --deploy-mode client --jars ./spark-streaming-mqtt-assembly_2.11-1.6.3.jar --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.2.1 ./mqtt_wordcount.py tcp://192.168.0.99:1883 topic-mqtt
#クラスタなし、かつKafka連携
spark-submit --jars spark-streaming-kafka-assembly_2.11-1.6.3.jar,spark-core_2.11-1.5.2.jar --packages org.apache:spark-streaming-kafka-0-8_2.11:2.1.0 ./kafka_wordcount.py 192.168.0.99:2181 topic-mqtt-kafka
#クラスタあり、かつKafka連携しない
spark-submit --master spark://192.168.0.100:7077 --deploy-mode client --jars ./spark-streaming-mqtt-assembly_2.11-1.6.3.jar --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.2.1 ./mqtt_wordcount.py tcp://192.168.0.99:1883 topic-mqtt
#クラスタあり、かつKafka連携
spark-submit --master spark://192.168.0.100:7077 --deploy-mode client --jars ./spark-streaming-kafka-assembly_2.11-1.6.3.jar,./spark-core_2.11-1.5.2.jar --packages org.apache:spark-streaming-kafka-0-8_2.11:2.1.0 ./kafka_wordcount.py 192.168.0.99:2181 topic-mqtt-kafka
以下の表示が確認できればKafkaトピックからデータ取得するのは完璧になります。
‘——————————————-
Time: 2019-02-19 00:11:50
‘——————————————-
(u’Hello Spark’, 1)