Ubuntu 20.04でApache Kafkaをインストールする方法
著者は、寄付プログラム「書いて寄付する」の一環として、フリーかつオープンソースの基金に寄付をするために選択しました。
はじめに
Apache Kafkaは、大量のリアルタイムデータを処理するために設計された人気のある分散メッセージブローカーです。Kafkaクラスターは非常にスケーラブルで耐障害性があります。また、ActiveMQやRabbitMQなどの他のメッセージブローカーと比較して、はるかに高いスループットを持っています。一般的にはパブリッシュ/サブスクライブのメッセージングシステムとして使用されますが、多くの組織がログ集約のためにも利用しています。なぜなら、Kafkaはパブリッシュされたメッセージの永続的なストレージを提供しているからです。
パブリッシュ/サブスクライブのメッセージングシステムは、1つ以上のプロデューサがメッセージを公開することができ、コンシューマの数やメッセージの処理方法を考える必要がありません。購読されたクライアントは、自動的にアップデートや新しいメッセージの作成について通知されます。このシステムは、定期的に新しいメッセージが利用可能かどうかを確認するためにクライアントが定期的にポーリングするシステムよりも効率的かつスケーラブルです。
このチュートリアルでは、Ubuntu 20.04にApache Kafka 2.8.2をインストールおよび設定します。
前提条件
続けるために、以下が必要です:
- An Ubuntu 20.04 server with at least 4 GB of RAM and a non-root user with sudo privileges. You can set this up by following our Initial Server Setup guide if you do not have a non-root user set up. Installations with less than 4GB of RAM may cause the Kafka service to fail.
- OpenJDK 11 installed on your server. To install this version, follow our tutorial on How To Install Java with APT on Ubuntu 20.04. Kafka is written in Java, so it requires a JVM.
ステップ1 – Kafkaのためのユーザーを作成
Kafkaはネットワークを介してリクエストを処理できるため、まず最初に専用のユーザーをサービス用に作成します。これにより、Kafkaサーバーが侵害された場合にUbuntuマシンへの被害を最小限に抑えることができます。このステップでは、専用のkafkaユーザーを作成します。
あなたの非管理者sudoユーザーとしてサーバーにログインし、次にkafkaというユーザーを作成してください。
- sudo adduser kafka
パスワードを設定し、kafkaユーザーを作成するために指示に従ってください。
次に、adduserコマンドでsudoグループにkafkaユーザーを追加します。Kafkaの依存関係をインストールするためには、これらの特権が必要です。
- sudo adduser kafka sudo
あなたのカフカユーザーは準備ができました。suを使用して、カフカアカウントにログインしてください。
- su -l kafka
Kafka専用のユーザーを作成したので、Kafkaのバイナリをダウンロードして展開する準備が整いました。
ステップ2 — Kafkaバイナリのダウンロードと展開
このステップでは、Kafkaのバイナリをダウンロードして、kafkaユーザーのホームディレクトリに専用のフォルダに展開します。
最初に、/home/kafkaというディレクトリに「Downloads」というディレクトリを作成して、ダウンロードしたファイルを保管してください。
- mkdir ~/Downloads
カフカのバイナリをダウンロードするために、curlを使用してください。
- curl “https://downloads.apache.org/kafka/2.8.2/kafka_2.13-2.8.2.tgz” -o ~/Downloads/kafka.tgz
「kafka」という名前のディレクトリを作成し、このディレクトリに移動してください。これはKafkaのインストールのベースディレクトリとして使用します。
- mkdir ~/kafka && cd ~/kafka
ダウンロードしたアーカイブをtarコマンドを使用して展開してください。
- tar -xvzf ~/Downloads/kafka.tgz –strip 1
アーカイブの内容を~/kafka/に抽出するように、–strip 1フラグを指定してください。他のディレクトリ(たとえば~/kafka/kafka_2.13-2.8.2/内のディレクトリ)には抽出しないようにします。
バイナリをダウンロードし、正常に抽出したので、Kafkaサーバーの設定を開始できます。
ステップ3 — Kafkaサーバーの設定
Kafkaのトピックは、メッセージを公開できるカテゴリーやグループ、フィードの名前です。ただし、Kafkaのデフォルトの動作ではトピックを削除することはできません。この動作を変更するには、設定ファイルを編集する必要があります。このステップでは、その編集作業を行います。
カフカの設定オプションは、server.propertiesに指定されています。nanoまたはお気に入りのエディタでこのファイルを開きます。
- nano ~/kafka/config/server.properties
最初に、Kafkaのトピックを削除する設定を追加します。次の行をファイルの最後に追加してください。
~/kafka/config/server.properties→ Kafka のサーバープロパティの設定は、~/kafka/config/server.properties ファイルで行われます。
delete.topic.enable = true
次に、log.dirsプロパティを変更してKafkaのログが保存されるディレクトリを変更します。log.dirsプロパティを探し、既存のルートを強調されたルートに置き換えてください。
「~/kafka/config/server.propertiesファイル」
log.dirs=/home/kafka/logs
ファイルを保存して閉じる。
Kafkaを設定したら、Kafkaサーバーを起動し、起動時に自動的に有効化するためのsystemdユニットファイルを作成できます。
ステップ4 — systemdユニットファイルの作成とKafkaサーバーの起動
このセクションでは、Kafkaサービスのためのsystemdユニットファイルを作成します。これらのファイルによって、Kafkaの開始、停止、再起動などの一般的なサービスアクションを、他のLinuxサービスと一貫性のある方法で実行することができます。
Kafkaは、クラスターの状態と設定を管理するためにZookeeperを使用しています。Zookeeperは多くの分散システムで使用されており、公式のZookeeperドキュメントでツールについて詳しく読むことができます。これらのユニットファイルでZookeeperをサービスとして使用します。
Zookeeperのためにユニットファイルを作成してください。
- sudo nano /etc/systemd/system/zookeeper.service
以下のユニットの定義をファイルに入力してください。
/etc/systemd/system/zookeeper.serviceを日本語で自然な言い方で言い換えると、次のようになります。
/zookeeper サービスの systemd ユニットファイル
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
Zookeeperの[Unit]セクションでは、ネットワーキングとファイルシステムが準備完了している必要があることが指定されています。
[サービス] セクションでは、systemdがzookeeper-server-start.shとzookeeper-server-stop.shのシェルファイルを使用してサービスを起動および停止することが指定されています。また、異常終了した場合にはZookeeperが再起動されるようにも指定されています。
このコンテンツを追加した後、ファイルを保存して閉じてください。
次に、kafkaのsystemdサービスファイルを作成してください。
- sudo nano /etc/systemd/system/kafka.service
次の単位の定義をファイルに入力してください。 (Tsugi no tan’i no teigi o fairu ni nyūryoku shite kudasai.)
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
[オプション1]
[Unit] セクションでは、このユニットファイルが zookeeper.service に依存しており、kafka サービスの起動時に自動的に zookeeper も起動されるようになります。
「Service」セクションでは、systemdがkafka-server-start.shとkafka-server-stop.shシェルファイルを使用してサービスを起動および停止するように指定されています。また、Kafkaが異常終了した場合には再起動するようにも指定されています。
ファイルを保存して閉じる。
ユニットが定義されたら、以下のコマンドでKafkaを起動してください。
- sudo systemctl start kafka
カフカユニットのジャーナルログを確認して、サーバーが正常に起動していることを確認してください。
- sudo systemctl status kafka
このような出力があなたに届きます。
● kafka.service Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset> Active: active (running) since Wed 2023-02-01 23:44:12 UTC; 4s ago Main PID: 17770 (sh) Tasks: 69 (limit: 4677) Memory: 321.9M CGroup: /system.slice/kafka.service ├─17770 /bin/sh -c /home/kafka/kafka/bin/kafka-server-start.sh /ho> └─17793 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMill>
現在、デフォルトポートとしてKafkaサーバーがポート9092でリスニングしています。
カフカサービスを開始しました。しかし、サーバーを再起動すると、カフカは自動的に再起動されません。サーバーブート時にカフカサービスを有効にするには、次のコマンドを実行してください。
- sudo systemctl enable zookeeper
「シンボリックリンクが作成された」という応答が届きます。
Created symlink /etc/systemd/system/multi-user.target.wants/zookeeper.service → /etc/systemd/system/zookeeper.service.
それから、このコマンドを実行してください。
- sudo systemctl enable kafka
シンボリックリンクが作成されたことをお知らせいたします。
Created symlink /etc/systemd/system/multi-user.target.wants/kafka.service → /etc/systemd/system/kafka.service.
このステップでは、KafkaとZookeeperのサービスを起動しました。次のステップでは、Kafkaのインストールを確認します。
ステップ5 – Kafkaのインストールをテストする。
このステップでは、Kafkaのインストールをテストします。Kafkaサーバーが期待通りに動作していることを確認するために、Hello Worldメッセージを発行して受信します。
Kafkaでメッセージを公開するには、次のことが必要です:
- A producer, who enables the publication of records and data to topics.
- A consumer, who reads messages and data from topics.
始める前に、「TutorialTopic」というトピックを作成してください。
- ~/kafka/bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic TutorialTopic
kafka-console-producer.sh スクリプトを使用して、コマンドラインからプロデューサーを作成することができます。引数として、Kafkaサーバーのホスト名、ポート、トピックが必要です。
トピックが作成されたという返信を受け取ります。
Created topic TutorialTopic.
「こんにちは、世界」という文字列をTutorialTopicトピックに公開してください。
- echo “Hello, World” | ~/kafka/bin/kafka-console-producer.sh –broker-list localhost:9092 –topic TutorialTopic > /dev/null
次に、kafka-console-consumer.shスクリプトを使用してKafkaコンシューマを作成します。引数として、ZooKeeperサーバのホスト名およびポート、さらにトピック名を指定します。次のコマンドはTutorialTopicからメッセージを消費します。コンシューマが開始される前に公開されたメッセージを消費するため、–from-beginningフラグの使用に注意してください。
- ~/kafka/bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic TutorialTopic –from-beginning
設定の問題がなければ、ターミナルで「こんにちは、世界」という応答を受け取ることができます。
Hello, World
スクリプトは続行され、公開するさらなるメッセージを待ち続けます。これをテストするために、新しいターミナルウィンドウを開き、サーバーにログインしてください。kafkaユーザーとしてログインすることを忘れないでください。
- su -l kafka
この新しいターミナルで、プロデューサーを開始して、2つ目のメッセージを発行してください。
- echo “Hello World from Sammy at Silicon Cloud!” | ~/kafka/bin/kafka-console-producer.sh –broker-list localhost:9092 –topic TutorialTopic > /dev/null
このメッセージは、消費者の出力結果があなたの元のターミナルに表示されます。
Hello, World Hello World from Sammy at Silicon Cloud!
テストが完了したら、オリジナルのターミナルで消費者スクリプトを停止するためにCTRL+Cを押してください。
今、Ubuntu 20.04にKafkaサーバーをインストールし、設定しました。次のステップでは、Kafkaサーバーのセキュリティを強化するために、いくつかの迅速な作業を行います。
ステップ6- Kafka サーバーの強化
インストールが完了したら、kafkaユーザーの管理者特権を削除し、Kafkaサーバーのセキュリティを強化できます。
それを行う前に、root以外のsudoユーザーとしてログアウトして、ログインし直してください。もしこのチュートリアルを開始した時と同じシェルセッションのままでいる場合は、exitと入力してください。
sudoグループからkafkaユーザーを削除してください。
- sudo deluser kafka sudo
Kafka サーバーのセキュリティをさらに向上させるために、passwd コマンドを使用して kafka ユーザーのパスワードをロックします。この操作により、誰もがこのアカウントを使用して直接サーバーにログインできなくなります。
- sudo passwd kafka -l
-lフラグは、ユーザーのパスワード(passwd)を変更するコマンドをロックします。
この時点では、kafkaとしてログインするためには、ルートまたはsudoユーザーのみが次のコマンドを使用することができます。
- sudo su – kafka
将来的に、パスワードを変更する機能を解除する場合は、パスワードを-uオプションとともに使用してください。
- sudo passwd kafka -u
現在、Kafkaユーザーの管理者権限を正常に制限しました。Kafkaの使用を開始する準備ができました。次のステップでKafkaTをシステムに追加することもできます。
ステップ7 – KafkaTのインストール(オプション)
KafkaTは、Kafkaクラスタの詳細を表示したり、コマンドラインから特定の管理タスクを実行する能力を向上させるために開発されました。これはRubyのgemですので、使用するためにはRubyが必要です。また、KafkaTに依存する他のgemをビルドするためには、build-essentialパッケージも必要です。
aptを使用してRubyとbuild-essentialパッケージをインストールしてください。
- sudo apt install ruby ruby-dev build-essential
「gemコマンドを使用して、KafkaTをインストールすることができます。」
- sudo CFLAGS=-Wno-error=format-overflow gem install kafkat
kafkatのインストールプロセス中にZookeeperの警告やエラーを抑制するには、Wno-error=format-overflowコンパイルフラグが必要です。
インストールが完了したら、完了の通知を受け取ります。
… Done installing documentation for json, colored, retryable, highline, trollop, zookeeper, zk, kafkat after 3 seconds 8 gems installed
KafkaTは、.kafkatcfgを設定ファイルとして使用して、Kafkaサーバーのインストール先およびログディレクトリを特定します。また、KafkaTがZooKeeperインスタンスへの接続を行うためのエントリも含まれるべきです。
.kafkatcfg という新しいファイルを作成してください。
- nano ~/.kafkatcfg
KafkaサーバーとZookeeperインスタンスに必要な情報を指定するために、以下の行を追加してください。
~/.kafkatcfg
{
"kafka_path": "~/kafka",
"log_path": "/home/kafka/logs",
"zk_path": "localhost:2181"
}
ファイルを保存して閉じてください。KafkaTを使用する準備が整いました。
全てのKafkaパーティションの詳細を表示するには、次のコマンドを実行してください。
- kafkat partitions
以下の出力が得られます。
[DEPRECATION] The trollop gem has been renamed to optimist and will no longer be supported. Please switch to optimist as soon as possible. /var/lib/gems/2.7.0/gems/json-1.8.6/lib/json/common.rb:155: warning: Using the last argument as keyword parameters is deprecated … Topic Partition Leader Replicas ISRs TutorialTopic 0 0 [0] [0] __consumer_offsets 0 0 [0] [0] … …
出力には、TutorialTopic と __consumer_offsets が含まれています。__consumer_offsets は、Kafka がクライアント関連の情報を保存するために使用する内部トピックです。__consumer_offsets から始まる行は安全に無視してください。
KafkaTについて詳しく学びたい場合は、そのGitHubリポジトリを参照してください。
結論
現在、Ubuntuサーバー上で安全にApache Kafkaが実行されています。Kafkaクライアントを使って、お好きなプログラミング言語にKafkaを統合することができます。
カフカについてもっと学ぶためには、ドキュメンテーションも参照することができます。