とりあえずkafka試してみたいけれど、apacheのOSSなのでサーバ立てるのもめんどくさそうってことでDockerでさくっと立ててちょっと試してみた。
OSXマシンに立てたいってことであれば、Mac OSXにApache Kafkaをインストールして使えるようにするまでという記事の方にある通り、brewでインストールできてくっそ楽なので、こっちでやれば良し
準備
https://github.com/spotify/docker-kafka
spotifyが提供しているリポジトリを利用した。
https://github.com/wurstmeister/kafka-docker
の方が有名っぽいけれど、composeするのめんどくさいし、動けばいいでしょ程度の安直な考え
動かし方はREADMEにある通り
docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 --name test_kafka spotify/kafka
2181番がzookeeperのポートで、9092がkafkaのポートになる。ホストマシンのIPを与えよう。DNSで解決できるのであればドメインでも良し。
これでkafkaサーバが立つ。
起動したらconsoleを離してくれないけれど、detachしても生きてるので適当にコンソール閉じるなりしてもOK
Ctrl + Cとかするとkafkaも死ぬので、やらないように
簡単なメッセージを送る
kafkaは簡単に言うとproducer, broker, consumerで成り立つ。
brokerがざっくりkafka本体という考えで良い。producerがpublishし、consumerがsubscribeする。
詳しくはApache Kafkaに入門したとかにまとまっているのでここを読めば良し。
quick startとかにあるように、本体付属のshを使うのが楽だ。
container内では /opt/kafka-(version)/bin/ 以下にある。
docker exec -it test_kafka bash
などでcontainerの中でbashを動かす
topicは、扱うメッセージの単位だと考えていい。メッセージを送る時、受け取る時いずれも必ずtopicの指定は必要になる。
ここにはないけれど、試しに存在しないtopicをsubscribeしてみたりしたら、勝手にtopicができていた。とはいえ設定できる項目は多いので、そのような方法で増やすと後々首を閉めそうなので辞めた方が良さそう
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
で、シンプルなtestというtopicが誕生する。
後はクイックスタートに沿って実行してみるととりあえず動くはずだ
Clientライブラリからメッセージをやりとりしてみる
ここまではshを使ってきたが、普通は各システムからメッセージを送るはずなので、そこらへんもどんな感触か試したい。
今回は
https://github.com/dpkp/kafka-python
というkafkaのクライアントライブラリを使った。理由は、最初に適当に書きたいからスクリプト言語が良かったのと、rubyクライアントのconsumerが動かなかったためだ。
目的の言語などがある場合は、クライアント一覧のページから探せば良さそうだ
requirememts.txt を用意して
kafka-python
を入れて、 pip install -r requirements.txt してとりあえず使えるようにする。
topicの作成からclientにまかせてみようとしたら、明示的に作るAPIはなさそうだった
githubのissueを見ると、どうやら自動で作られるしかなさそうだ。
StackOverflow曰く、Zookeeperをいじって作る口がJavaならあるっぽいので、そういうことをしたいなら結局はjavaしかないのかもしれない。
一番簡単なスクリプトを以下に書く
producer
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers="your.kafka.host.ip:9092")
producer.send('test', b'foobaa').get(timeout=1)
consumer
from kafka import KafkaConsumer
consumer = KafkaConsumer('test', bootstrap_servers='your.kafka.host.ip:9092')
for msg in consumer:
print(msg)
consumerを先に動かしてproducerを動かすとメッセージが届くはずだ
kafkaは内部にストレージがあり、一定期間メッセージを保存するので、それらを取り出すやり方もあるはずなので、ためしてみるといいと思う。