Apach KafkaはLinkedInで開発された分散メッセージングシステムです。ArduinoからMQTTブローカーにpublishしたセンシングデータをKafkaでconsumeする予定です。Riemann、Spark Streaming、Stormなどのリアルタイムストリーミング処理が目的です。まずはDockerでKafkaクラスタを構築します。

Docker Composeのインストール

KafkaのDockerイメージはwurstmeister/kafka-dockerを使います。Kafkaクラスタの構成管理にZookeeperが必要になります。このイメージはDocker Composeを使っているのでちょうど良い勉強になります。事前にインストールしておきます。

$ curl -L https://github.com/docker/compose/releases/download/1.2.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose; chmod +x /usr/local/bin/docker-compose

プロジェクトの作成

適当なディレクトリを作成して、リポジトリからgit cloneします。

$ cd ~/docker_apps
$ git clone https://github.com/wurstmeister/kafka-docker.git
$ cd kafka-docker

クラスタを起動する

Kafka Dockerの手順を読みながらクラスタの構築と簡単なテストまで行います。

docker-compose.yml

リポジトリにはクラスタ用と1台構成用のdocker-compose.ymlが用意されています。今回はブローカーを2台起動したいのでクラスタ用のdocker-compose.ymlを使います。YAMLのKAFKA_ADVERTISED_HOST_NAMEの値をDockerホストのIPアドレスに書き換えて実行します。

zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181"
kafka:
  build: .
  ports:
    - "9092"
  links:
    - zookeeper:zk
  environment:
    KAFKA_ADVERTISED_HOST_NAME: 10.3.0.165
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock

最初にdocker-compose upするとkafkaのDockerイメージのビルドが始まります。必要なコンテナがすべて起動するまでしばらく待ちます。

$ docker-compose up

クラスタがすべて起動したら別シェルからKafkaブローカーを2台に変更します。

$ docker-compose scale kafka=2
Creating kafkadocker_kafka_2...
Starting kafkadocker_kafka_2...

psでコンテナの起動状況を確認します。

$ docker-compose ps
         Name                     Command                    State                     Ports
-----------------------------------------------------------------------------------------------------
kafkadocker_kafka_2       /bin/sh -c start-         Up                        0.0.0.0:32789->9092/tcp
                          kafka.sh
kafkadocker_kafka_3       /bin/sh -c start-         Up                        0.0.0.0:32790->9092/tcp
                          kafka.sh
kafkadocker_zookeeper_1   /bin/sh -c                Up                        0.0.0.0:32788->2181/tcp
                          /usr/sbin/sshd  ...                                 , 22/tcp, 2888/tcp,
                                                                              3888/tcp

Kafka Shell

Kafka Shellを起動する書式は以下です。

$ start-kafka-shell.sh <DOCKER_HOST_IP> <ZK_HOST:ZK_PORT>

ZK_HOST:PORTの値はdocker-compose psで確認した値を使います。

$ cd ~/docker_apps/kafka-docker/
$ ./start-kafka-shell.sh 10.3.0.165 10.3.0.165:32788

start-kafka-shell.shでは使い捨てのDockerコンテナを起動してbashを実行します。このコンテナを使いKafkaのproducerとconsumerのプロセスを起動します。

Kafkaコンテナの環境変数を確認しておきます。

$ echo $KAFKA_HOME
/opt/kafka_2.10-0.8.2.0
$ echo $ZK
10.3.0.165:32788

topicの作成

Kafka Shellを使いtopicを作成します。Kafkaのtopicはメッセージのカテゴリになります。このtopicはパーティションは4つ、メッセージのレプリカは2個の設定です。

$ $KAFKA_HOME/bin/kafka-topics.sh --create --topic topic \
--partitions 4 --zookeeper $ZK --replication-factor 2
Created topic "topic".

作成したtopicを確認します。

$ $KAFKA_HOME/bin/kafka-topics.sh --describe --topic topic --zookeeper $ZK
Topic:topic     PartitionCount:4        ReplicationFactor:2     Configs:
        Topic: topic    Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: topic    Partition: 1    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: topic    Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: topic    Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1

producerの送信

Kafka Shellの一つからproducerを実行します。コンソールは文字列の入力待ちになります。

$ $KAFKA_HOME/bin/kafka-console-producer.sh --topic=topic \
--broker-list=`broker-list.sh`

consumerの受信

Kafka Shellを起動してconsumerを実行します。

$ ./start-kafka-shell.sh 10.3.0.165 10.3.0.165:32788
$ $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=topic --zookeeper=$ZK

入力待ちのproducerのコンソールに文字列をタイプすると、consumerのコンソールに表示されます。

广告
将在 10 秒后关闭
bannerAds