1machineでkafka環境を作って戯れる
1.環境設定
OS&JDK
CentOS7 をインストールしたのち
-
- firewalld をオフ
-
- selinux をオフ
- JDK8 をインストール
yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel
- /etc/profile or ~/.bashrc にJAVA_HOME設定
export JAVA_HOME=/usr/lib/jvm/java
ZooKeeper
CDHのRPMを利用する /etc/yum.repos.d/ に以下を配置
https://archive.cloudera.com/cdh5/redhat/7/x86_64/cdh/cloudera-cdh5.repo
でyum install して、サービス起動と、自動起動設定もしておく
# yum -y install zookeeper-server
# service zookeeper-server start
# systemctl enable zookeeper-server.service
ポート2181が空いてれば起動成功かな zookeeper-client を使って確認してもよい
Kafka
Apache サイトのバイナリtarball をダウンロード、展開
開発用なのでヒープサイズはちっちゃくてよい(デフォルトは1GB)
export KAFKA_HEAP_OPTS="-Xmx256M"
それに合わせて config/server.propertiesの各種値も1ケタ小さくしておこう
socket.request.max.bytes=10485760
log.retention.bytes=107374182
log.segment.bytes=107374182
そしてスタート
$ bin/kafka-server-start.sh config/server.properties &
疎通確認もしておく. topic 作って確認
$ bin/kafka-topics.sh --create --zookeeper localhost:2181\
--replication-factor 1 --partitions 1 --topic test
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
メッセージを入れて取り出す 二つターミナルを用意して以下のようにする
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
うまくメッセージパッシングできていることを確認
2.Kafka Streamsとの戯れ
デモの実行
v0.10で導入された kafka streams を試してみる。サンプルのデモを実行するため、二つのトピック streams-file-input と streams-wordcount-output を作成しておく
$ bin/kafka-topics.sh --create --zookeeper localhost:2181\
--replication-factor 1 --partitions 1 --topic streams-file-input
$ bin/kafka-topics.sh --create --zookeeper localhost:2181\
--replication-factor 1 --partitions 1 --topic streams-wordcount-output
input となるstreamの方には適当に、Apache Licenceの文章でも入れておく
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input
**** Apache Licence の文章******
Ctrl+D
そして、クラスパスを通して、Streamアプリを実行
$ ./bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
そして結果を確認
$ ./bin/kafka-console-consumer.sh \
--zookeeper localhost:2181 --topic streams-wordcount-output \
--from-beginning --formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Kafka Streaming の概要
というか Kafka Streaming とはなんぞや?ドキュメント http://docs.confluent.io/2.1.0-alpha1/streams/index.html に概要が書かれているが、要は
-
- Kafka のクライアントライブラリとして実装された
-
- ストリーミングを実現するAPI群で
-
- トピックAのメッセージを処理してトピックBに詰め込むことを可能にする
-
- High Level なDSLと Low Level なインターフェイスを持ち
- スケーラブルでエラーセーフにすることもできる
とのこと
3.Kafka Streamsのアプリ開発
環境整備
男は黙ってvi. 以下のようなactivateファイルを用意して開発時には source activate とする
PS1=(kafka)$PS1
KAFKA_HOME=/home/______/kafka/kafka
for i in $KAFKA_HOME/libs/*.jar
do
CLASSPATH=$i:$CLASSPATH
done
export CLASSPATH
何回も実行して出力先のトピックが汚れるのもいやなので、毎回作り直す。以下のようなスクリプトをinit.shというように名前をつけて都度リフレッシュ。
./kafka/bin/kafka-topics.sh --delete --topic b --zookeeper localhost:2181
./kafka/bin/kafka-topics.sh --create --topic b --partitions 1 --replication-factor 1 --zookeeper localhost:2181
なお、トピックの削除には、サーバ側の設定が必要。以下のように設定して再起動。
delete.topic.enable=true
テストコードのインターフェース
以下の「はまった点」にもあるように、同じ名前のアプリケーション名を使いまわしていると、前回の状態をKafka側が覚えている、すなわち同じコンシューマグループのクライアントとして認識されるため、試験をするときはアプリケーション名を都度変えて実行する
はまった点
さっきはうまくいっていたのに今回はうまくいかない
:同じアプリケーション名だと、前回の状態を引き継ぐので、意図通りにConsumeできていない可能性がある