Apache Kafka
前提:Zookeeperがポート番号2181番で起動している状態ではじめること。
-
- Apache Kafkaの導入からトピックの生成、メッセージ送受信方法を公開します。
- Pub/Subに興味があ流けど、Web上の情報だけではうまくいかなかった方、このマニュアルで、環境をそろえた上で、やってみてください。(動作確認済みです。)
zkServer.sh start
1 Scalaをインストールする。
※Scalaのバージョンは、Apache Kafka公式サイトの、推奨バージョンをインストールすること。
※今回は、kafka_2.10-0.8.2の推奨バージョンである、Scala 2.10をインストールする。
su
yum -y install wget
yum -y install tar
yum -y install java-1.6.0-openjdk-devel
mkdir /usr/local/download/
cd /usr/local/download/
wget http://www.scala-lang.org/files/archive/scala-2.10.4.tgz
tar xzvf scala-2.10.4.tgz
mv /usr/local/download/scala-2.10.4 /usr/local/scala-2.10.4
cd /usr/local
ln -s scala-2.10.4/ scala
echo "export PATH=\$PATH:/usr/local/scala/bin" >> /etc/profile;
export PATH=$PATH:/usr/local/scala/bin
2 ダウンロードおよび解凍
cd /usr/local/download
wget http://ftp.meisei-u.ac.jp/mirror/apache/dist/kafka/0.8.2-beta/kafka_2.10-0.8.2-beta.tgz
tar xzvf kafka_2.10-0.8.2-beta.tgz
mv /usr/local/download/kafka_2.10-0.8.2-beta /usr/local/kafka_2.10-0.8.2-beta
cd /usr/local/
ln -s kafka_2.10-0.8.2-beta/ kafka
echo "export PATH=\$PATH:/usr/local/kafka/bin" >> /etc/profile;
export PATH=$PATH:/usr/local/kafka/bin
3 実行
kafka-server-start.sh /usr/local/kafka/config/server.properties &
4 トピックの生成
Let’s create a topic named “test” with a single partition and only one replica:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test2
kafka-topics.sh --list --zookeeper localhost:2181
#>test2
5 メッセージ送受信テスト
5.1 Kafkaのメッセージ送受信をテストします。ターミナル(端末)を2つ起動し、2つともvm1にsshで接続し、root権限にします。
5.1.1 vm1のconsole1で次のコマンドを入力します。→入力待ち状態へ移行します。(プロデューサーとなる。)
kafka-console-producer.sh --broker-list localhost:9092 --topic test2
#>
5.1.2 vm1のconsole2で次のコマンドを入力します。→出力待ち状態へ移行します。(コンシューマーとなる。こちら側のコンソールは表示専用)
kafka-console-consumer.sh --zookeeper localhost:2181 --topic test2 --from-beginning
#>
5.1.3 vm1のconsole1で何かメッセージを入力して、Enter(改行)します。→console2の画面にも、結果が出力されます。(通信成功)
Hello
#>Hello
6 マルチブローカークラスターのための設定
6.1 server.propertiesファイルの編集
-
- broker.idの設定(Zookeeperと同じように、各vmにユニークなidを振る必要があります。)
-
- 待ち受けポート番号の設定(VM1のみ9092番、VM2〜VM5は9093番〜9096番に設定します。)
-
- logディレクトリの設定(/tmp/kafka-logs)
- /tmp/kafka-logsディレクトリの作成
vm1の設定
sed -i -e 's/^broker\.id/#broker\.id/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#broker\.id/a broker\.id=1' /usr/local/kafka/config/server.properties;
sed -i -e 's/^port/#port/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#port/a port=9092' /usr/local/kafka/config/server.properties;
sed -i -e 's/^log\.dir/#log\.dir/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#log\.dir/a log\.dir=\/tmp\/kafka-logs' /usr/local/kafka/config/server.properties;
mkdir /tmp/kafka-logs
vm2の設定
sed -i -e 's/^broker\.id/#broker\.id/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#broker\.id/a broker\.id=2' /usr/local/kafka/config/server.properties;
sed -i -e 's/^port/#port/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#port/a port=9093' /usr/local/kafka/config/server.properties;
sed -i -e 's/^log\.dir/#log\.dir/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#log\.dir/a log\.dir=\/tmp\/kafka-logs' /usr/local/kafka/config/server.properties;
mkdir /tmp/kafka-logs
vm3の設定
sed -i -e 's/^broker\.id/#broker\.id/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#broker\.id/a broker\.id=3' /usr/local/kafka/config/server.properties;
sed -i -e 's/^port/#port/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#port/a port=9094' /usr/local/kafka/config/server.properties;
sed -i -e 's/^log\.dir/#log\.dir/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#log\.dir/a log\.dir=\/tmp\/kafka-logs' /usr/local/kafka/config/server.properties;
mkdir /tmp/kafka-logs
vm4の設定
sed -i -e 's/^broker\.id/#broker\.id/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#broker\.id/a broker\.id=4' /usr/local/kafka/config/server.properties;
sed -i -e 's/^port/#port/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#port/a port=9095' /usr/local/kafka/config/server.properties;
sed -i -e 's/^log\.dir/#log\.dir/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#log\.dir/a log\.dir=\/tmp\/kafka-logs' /usr/local/kafka/config/server.properties;
mkdir /tmp/kafka-logs
vm5の設定
sed -i -e 's/^broker\.id/#broker\.id/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#broker\.id/a broker\.id=5' /usr/local/kafka/config/server.properties;
sed -i -e 's/^port/#port/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#port/a port=9096' /usr/local/kafka/config/server.properties;
sed -i -e 's/^log\.dir/#log\.dir/g' /usr/local/kafka/config/server.properties;
sed -i -e '/^#log\.dir/a log\.dir=\/tmp\/kafka-logs' /usr/local/kafka/config/server.properties;
mkdir /tmp/kafka-logs
7 マルチブローカーでのkafkaの起動
kafka-server-stop.sh
kafka-server-start.sh /usr/local/kafka/config/server.properties &
8 マルチブローカー上でのトピックの作成
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-replicated-topic
9 マルチブローカー上でのトピックの表示
kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
#>Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
#>Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
※各項目の説明
“leader” is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
“replicas” is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
“isr” is the set of “in-sync” replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
10 プロデューサーとコンシューマの起動
※起動は数秒置きながら行うとエラーが発生しない。
vm1上でプロデューサーを起動します。
kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
vm2〜vm5でコンシューマを起動します。
kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
起動がうまくいかない場合
- server.propertiesのhost.nameがコメントアウトされている場合は外す。
host.name=localhost
- logの読み込みに失敗している場合
- ログファイルを削除します。
rm -rf /var/log/kafka
mkdir /var/log/kafka
- 手動で起動します。
cd /usr/local/kafka/
pidproxy /var/run/kafka.pid /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.propertie