docker kafkaでコンテイナーでメッセージングを開発する人のためここに記録を残します。
この投稿の環境は以下のようです。
-
- Ubuntu 20.04 LTS
- Docker version 20.10.12, build e91ed57
docker コンテイナーはzookeeperまで入ってる https://hub.docker.com/r/bitnami/kafka を使用しました。
上のリンクのoverviewの通り直接githubにあるyamlファイルをダウンして実行しました。
$ curl -sSL https://raw.githubusercontent.com/bitnami/bitnami-docker-kafka/master/docker-compose.yml > docker-compose.yml
$ docker-compose up -d
ここで適用されたdocker-compose.ymlは以下のようです。
version: "2"
services:
zookeeper:
image: docker.io/bitnami/zookeeper:3.8
ports:
- "2181:2181"
volumes:
- "zookeeper_data:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: docker.io/bitnami/kafka:3.1
ports:
- "9092:9092"
volumes:
- "kafka_data:/bitnami"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local
docker-compose up -dを実行するとzookeeperとkafka二つのコンテイナーが立ち上がられます。
Pythonでproducerとconsumerを作ってテストをして見ました。
# Consumer.py
from kafka import KafkaConsumer
from json import loads
import time
topic_name = 'topic_test'
consumer = KafkaConsumer(
topic_name,
bootstrap_servers=['localhost:9092'],
value_deserializer = lambda x: loads(x.decode('utf-8')),
)
start = time.time()
print("[begin] Topic: %sで consumerがメッセージを受け取る。" % (topic_name))
for message in consumer:
print("Partition: %d, Offset: %d, Value: %s" % (message.partition, message.offset, message.value))
print("[end]掛かる時間 : ", time.time() - start)
# Producer.py
from kafka import KafkaProducer
from json import dumps
import time
topic_name = 'topic_test'
producer = KafkaProducer(
acks=0,
compression_type = 'gzip',
bootstrap_servers=['localhost:9092'],
value_serializer = lambda x: dumps(x).encode('utf-8')
)
start = time.time()
print("[begin] producerからメッセージ転送スタート")
for i in range(100):
data = {'str': 'result'+str(i)}
print("メッセージ転送中..." + data['str'])
producer.send(topic_name, value=data)
producer.flush()
print("[end] 掛かる時間:", time.time() - start)
でも実行するとproducer.pyはflush()で動作が止まってるし、consumer.pyではkafka接続で止まってしまいます。
原因はlocahostがコンテイナーとubuntuサーバーの解析が異なってるからでした。https://hub.docker.com/r/bitnami/kafkaのoverview をちゃんと見たら Apache Kafka development setup exampleのセックションがありました。以下は開発用のdocker-compose.ymlです。
version: "3"
services:
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:latest'
ports:
- '9092:9092'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
以前で設置したコンテイナーを消してこれで再び実行。
でも、kafkaのコンテイナーが立ち上げられないです。
docker logsで誤りを確認して見ると
[2022-04-22 16:00:52,785] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentBrokerIdException: Configured broker.id 1 doesn't match stored broker.id ...
ブロッカーが問題だったのか。。。一応上のdocker-compose.ymlからKAFKA BROKER ID=1を削除してコンテイナーを立ち上げ直します。
先にpythonコードをもう一度実行すると
$ python3 Producer.py
[begin] producerからメッセージ転送スタート
メッセージ転送中...result0
メッセージ転送中...result1
メッセージ転送中...result2
...
メッセージ転送中...result97
メッセージ転送中...result98
メッセージ転送中...result99
[end] 掛かる時間: 0.03677988052368164
$ python3 Comsumer.py
[begin] Topic: topic_testで consumerがメッセージを受け取る。
Partition: 0, Offset: 200, Value: {'str': 'result0'}
Partition: 0, Offset: 201, Value: {'str': 'result1'}
Partition: 0, Offset: 202, Value: {'str': 'result2'}
Partition: 0, Offset: 203, Value: {'str': 'result3'}
Partition: 0, Offset: 204, Value: {'str': 'result4'}
...
Partition: 0, Offset: 296, Value: {'str': 'result96'}
Partition: 0, Offset: 297, Value: {'str': 'result97'}
Partition: 0, Offset: 298, Value: {'str': 'result98'}
Partition: 0, Offset: 299, Value: {'str': 'result99'}
Docker kafkaコンテイナーを通じてproducer, consumerの間にメッセージが正常的にやりとるをするのを確認できました。
参考で最終のdocker-compose.ymlは以下のようです。
version: "2"
services:
zookeeper:
image: docker.io/bitnami/zookeeper:3.8
ports:
- "2181:2181"
volumes:
- "zookeeper_data:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: docker.io/bitnami/kafka:3.1
hostname: kafka
ports:
- "9092:9092"
volumes:
- "kafka_data:/bitnami"
environment:
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local