これはなに

EC2 on Kafkaをやる事がありそうだったので、その個人的な手順のメモです。
もし誰かの役に立てば幸いです。

対象のサーバにSSH

ssh -i ~/.ssh/鍵.pem ec2-user@<パブリックIP>

kafkaダウンロード

wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/3.0.0/kafka_2.13-3.0.0.tgz
tar -xzvf kafka_2.13-3.0.0.tgz
cd kafka_2.13-3.0.0/

Zookeeper起動

bin/zookeeper-server-start.sh config/zookeeper.properties

Kafka起動

bin/kafka-server-start.sh config/server.properties

Topic作成

bin/kafka-topics.sh --create --bootstrap-server=localhost:9092 --replication-factor 1 --partitions 1 --topic my-topic

Topic一覧表示

 bin/kafka-topics.sh --list --bootstrap-server=localhost:9092

テスト送信

Topic送信

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic

Topic受信

bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --topic my-topic

外から叩く設定

image.png

kafkaサーバ設定

vi config/server.properties

書き換える

#listeners=PLAINTEXT://:9092

listeners=PLAINTEXT://<インスタンスのプライベートIP>:9092

プロデューサー側、コンシューマー側共通

venv

mkdir [project dir]
cd [project dir]
python3 -m venv venv

venv有効化

source venv/bin/activate

kafka-pythonインストール

pip install kafka-python

プロデューサー側設定

from 'kafka-python' import KafkaProducer
producer = kafkapy.KafkaProducer(bootstrap_servers=['<kafkaサーバIP>:9092'])
v = producer.send('my-topic', b'test')
metadata = v.get(timeout=10)
print(v)

コンシューマー側設定

from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer('my-topic', bootstrap_servers=['<kafkaサーバIP>:9092'])
for msg in consumer:
    print(msg)

ZookeeperとKafkaのsystemd作成

/usr/localへの移動とシンボリックリンクの作成

sudo mv kafka_2.13-3.0.0 /usr/local/.
cd /usr/local/
sudo ln -s  kafka_2.13-3.0.0 kafka

zookeeper-server.service作成

#sudo vi /etc/systemd/system/zookeeper-server.service
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
ExecStart=/usr/local/kafka_2.13-3.0.0/bin/zookeeper-server-start.sh /usr/local/kafka_2.13-3.0.0/config/zookeeper.properties
ExecStop=/usr/local/kafka_2.13-3.0.0/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

kafka-server.service作成

#sudo vi /etc/systemd/system/kafka-server.service
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper-server.service

[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk"
Environment="KAFKA_HEAP_OPTS=-Xmx256M -Xms128M"
ExecStart=/usr/local/kafka_2.13-3.0.0/bin/kafka-server-start.sh /usr/local/kafka_2.13-3.0.0/config/server.properties
ExecStop=/usr/local/kafka_2.13-3.0.0/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

それぞれ有効化する

systemctl enable zookeeper-server
systemctl enable kafka-server

デーモン再起動

sudo systemctl daemon-reload
广告
将在 10 秒后关闭
bannerAds