docker kafkaでコンテイナーでメッセージングを開発する人のためここに記録を残します。

この投稿の環境は以下のようです。

    • Ubuntu 20.04 LTS

 

    Docker version 20.10.12, build e91ed57

docker コンテイナーはzookeeperまで入ってる https://hub.docker.com/r/bitnami/kafka を使用しました。

上のリンクのoverviewの通り直接githubにあるyamlファイルをダウンして実行しました。

$ curl -sSL https://raw.githubusercontent.com/bitnami/bitnami-docker-kafka/master/docker-compose.yml > docker-compose.yml
$ docker-compose up -d

ここで適用されたdocker-compose.ymlは以下のようです。

version: "2"

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: docker.io/bitnami/kafka:3.1
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

docker-compose up -dを実行するとzookeeperとkafka二つのコンテイナーが立ち上がられます。

Pythonでproducerとconsumerを作ってテストをして見ました。

# Consumer.py
from kafka import KafkaConsumer
from json import loads
import time

topic_name = 'topic_test'
consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=['localhost:9092'],
        value_deserializer = lambda x: loads(x.decode('utf-8')),
        )

start = time.time()
print("[begin] Topic: %sで consumerがメッセージを受け取る。" % (topic_name))

for message in consumer:
    print("Partition: %d, Offset: %d, Value: %s" % (message.partition, message.offset, message.value))

print("[end]掛かる時間 : ", time.time() - start)
# Producer.py
from kafka import KafkaProducer
from json import dumps
import time

topic_name = 'topic_test'

producer = KafkaProducer(
        acks=0,
        compression_type = 'gzip',
        bootstrap_servers=['localhost:9092'],
        value_serializer = lambda x: dumps(x).encode('utf-8')
        )

start = time.time()

print("[begin] producerからメッセージ転送スタート")

for i in range(100):
    data = {'str': 'result'+str(i)}
    print("メッセージ転送中..." + data['str'])
    producer.send(topic_name, value=data)


producer.flush()

print("[end] 掛かる時間:", time.time() - start)

でも実行するとproducer.pyはflush()で動作が止まってるし、consumer.pyではkafka接続で止まってしまいます。

原因はlocahostがコンテイナーとubuntuサーバーの解析が異なってるからでした。https://hub.docker.com/r/bitnami/kafkaのoverview をちゃんと見たら Apache Kafka development setup exampleのセックションがありました。以下は開発用のdocker-compose.ymlです。

version: "3"
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

以前で設置したコンテイナーを消してこれで再び実行。

でも、kafkaのコンテイナーが立ち上げられないです。

docker logsで誤りを確認して見ると

[2022-04-22 16:00:52,785] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentBrokerIdException: Configured broker.id 1 doesn't match stored broker.id ...

ブロッカーが問題だったのか。。。一応上のdocker-compose.ymlからKAFKA BROKER ID=1を削除してコンテイナーを立ち上げ直します。

先にpythonコードをもう一度実行すると

$ python3 Producer.py
[begin] producerからメッセージ転送スタート
メッセージ転送中...result0
メッセージ転送中...result1
メッセージ転送中...result2
...
メッセージ転送中...result97
メッセージ転送中...result98
メッセージ転送中...result99
[end] 掛かる時間: 0.03677988052368164
$ python3 Comsumer.py
[begin] Topic: topic_testで consumerがメッセージを受け取る
Partition: 0, Offset: 200, Value: {'str': 'result0'}
Partition: 0, Offset: 201, Value: {'str': 'result1'}
Partition: 0, Offset: 202, Value: {'str': 'result2'}
Partition: 0, Offset: 203, Value: {'str': 'result3'}
Partition: 0, Offset: 204, Value: {'str': 'result4'}
...
Partition: 0, Offset: 296, Value: {'str': 'result96'}
Partition: 0, Offset: 297, Value: {'str': 'result97'}
Partition: 0, Offset: 298, Value: {'str': 'result98'}
Partition: 0, Offset: 299, Value: {'str': 'result99'}

Docker kafkaコンテイナーを通じてproducer, consumerの間にメッセージが正常的にやりとるをするのを確認できました。

参考で最終のdocker-compose.ymlは以下のようです。

version: "2"

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: docker.io/bitnami/kafka:3.1
    hostname: kafka
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local
广告
将在 10 秒后关闭
bannerAds