1machineでkafka環境を作って戯れる

1.環境設定

OS&JDK

CentOS7 をインストールしたのち

    • firewalld をオフ

 

    • selinux をオフ

 

    JDK8 をインストール
yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel
    /etc/profile or ~/.bashrc にJAVA_HOME設定
export JAVA_HOME=/usr/lib/jvm/java

ZooKeeper

CDHのRPMを利用する /etc/yum.repos.d/ に以下を配置

https://archive.cloudera.com/cdh5/redhat/7/x86_64/cdh/cloudera-cdh5.repo

でyum install して、サービス起動と、自動起動設定もしておく

# yum -y install zookeeper-server
# service zookeeper-server start
# systemctl enable zookeeper-server.service

ポート2181が空いてれば起動成功かな zookeeper-client を使って確認してもよい

Kafka

Apache サイトのバイナリtarball をダウンロード、展開

開発用なのでヒープサイズはちっちゃくてよい(デフォルトは1GB)

export KAFKA_HEAP_OPTS="-Xmx256M"

それに合わせて config/server.propertiesの各種値も1ケタ小さくしておこう

socket.request.max.bytes=10485760
log.retention.bytes=107374182
log.segment.bytes=107374182

そしてスタート

$ bin/kafka-server-start.sh config/server.properties &

疎通確認もしておく. topic 作って確認

$ bin/kafka-topics.sh --create --zookeeper localhost:2181\
  --replication-factor 1 --partitions 1 --topic test
$ bin/kafka-topics.sh --list --zookeeper localhost:2181

メッセージを入れて取り出す 二つターミナルを用意して以下のようにする

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

うまくメッセージパッシングできていることを確認

2.Kafka Streamsとの戯れ

デモの実行

v0.10で導入された kafka streams を試してみる。サンプルのデモを実行するため、二つのトピック streams-file-input と streams-wordcount-output を作成しておく

$ bin/kafka-topics.sh --create --zookeeper localhost:2181\
  --replication-factor 1 --partitions 1 --topic streams-file-input
$ bin/kafka-topics.sh --create --zookeeper localhost:2181\
  --replication-factor 1 --partitions 1 --topic streams-wordcount-output

input となるstreamの方には適当に、Apache Licenceの文章でも入れておく

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input
**** Apache Licence の文章******
Ctrl+D

そして、クラスパスを通して、Streamアプリを実行

$  ./bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

そして結果を確認

$ ./bin/kafka-console-consumer.sh \
--zookeeper localhost:2181 --topic streams-wordcount-output \
--from-beginning --formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Kafka Streaming の概要

というか Kafka Streaming とはなんぞや?ドキュメント http://docs.confluent.io/2.1.0-alpha1/streams/index.html に概要が書かれているが、要は

    • Kafka のクライアントライブラリとして実装された

 

    • ストリーミングを実現するAPI群で

 

    • トピックAのメッセージを処理してトピックBに詰め込むことを可能にする

 

    • High Level なDSLと Low Level なインターフェイスを持ち

 

    スケーラブルでエラーセーフにすることもできる

とのこと

3.Kafka Streamsのアプリ開発

環境整備

男は黙ってvi. 以下のようなactivateファイルを用意して開発時には source activate とする

PS1=(kafka)$PS1
KAFKA_HOME=/home/______/kafka/kafka
for i in $KAFKA_HOME/libs/*.jar
do
CLASSPATH=$i:$CLASSPATH
done
export CLASSPATH

何回も実行して出力先のトピックが汚れるのもいやなので、毎回作り直す。以下のようなスクリプトをinit.shというように名前をつけて都度リフレッシュ。

./kafka/bin/kafka-topics.sh --delete --topic b --zookeeper localhost:2181
./kafka/bin/kafka-topics.sh --create --topic b --partitions 1 --replication-factor 1 --zookeeper localhost:2181

なお、トピックの削除には、サーバ側の設定が必要。以下のように設定して再起動。

delete.topic.enable=true

テストコードのインターフェース

以下の「はまった点」にもあるように、同じ名前のアプリケーション名を使いまわしていると、前回の状態をKafka側が覚えている、すなわち同じコンシューマグループのクライアントとして認識されるため、試験をするときはアプリケーション名を都度変えて実行する

はまった点

さっきはうまくいっていたのに今回はうまくいかない
:同じアプリケーション名だと、前回の状態を引き継ぐので、意図通りにConsumeできていない可能性がある

广告
将在 10 秒后关闭
bannerAds