はじめに
Apache Kafkaは、オープンソースの分散イベントストリーミングプラットフォームです。Airbnb、LINE、PayPal、Netflix、LinkedIn、Twitter、Uberなど、多くの大企業で使用されており、非常に実績の多いSWです。
Kafkaは多くのコンポーネントで構築されるSWであり、それらを手動で運用することは難度が高いと言われていますが、そのKafkaをKubernetes上で稼働させることで、Kubernetesのオートヒーリングやオートスケーリングといった運用をサポートする機能の恩恵を受けることができます。
当記事では、Kafkaの導入・運用を自動化する Strimzi オペレータを使用して、構築済みのIBM Cloud Kubernetes Service(以下IKS)クラスターに対して、Kafkaを導入する手順を紹介します。
前提事項
当記事では以下のバージョンを前提とします。
-
- IKS : 1.22.2_1522 (VPC Gen2)
-
- Kafka : 3.0.0 (Scala 2.13)
- Strimzi : 0.26
StrimziによるKafka導入手順
基本的には Strimzi のQuick Start guideの通りに実行すれば問題ありません。Quick Start guide は Minikube への導入を前提としていますが、IKSに対しても同じ手順で導入することができます。ただ、Quick Start guideでは、外部に対してのKafka Brokerの公開をNodeportで行っていますが、VPC Gen2におけるIKSではNodeportでのインターネットへの公開はできなくなっている(パブリックIPアドレスが無い)ため、LoadBalancerを使用して公開することにします。
StrimziのリリースページからStrimzi Operatorのマニフェストファイルのzipをダウンロードし、任意のディレクトリに展開します。
Strimzi Operatorは、kafka ネームスペースにデプロイします。そのために、ダウンロードしたマニフェストファイルを展開したディレクトリでsedコマンドを実行し、ネームスペースの指定を「kafka」に変更します。
# Linuxの場合
sed -i 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml
# Macの場合
sed -i '' 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml
Strimzi Operatorをデプロイする kafka ネームスペース、および、Kafka Clusterをデプロイする my-kafka-project ネームスペースを作成します。
kubectl create namespace kafka
kubectl create namespace my-kafka-project
Strimzi のマニフェストファイルを展開したディレクトリ以下の install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml で、STRIMZI_NAMESPACE 環境変数の値を「my-kafka-project」に変更します。
env:
- name: STRIMZI_NAMESPACE
value: my-kafka-project
Strimzi のマニフェストファイルを展開したディレクトリで以下のコマンドを実行し、マニフェストファイルを適用し、CRDを作成します。
kubectl create -f install/cluster-operator/ -n kafka
以下のコマンドを実行し、Strimzi Operator に my-kafka-project ネームスペースに対する権限を付与します。
kubectl create -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n my-kafka-project
kubectl create -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n my-kafka-project
以下のコマンドを実行してマニフェストファイルを適用し、my-clusterという名前のKafkaクラスターを作成します。外部に対する公開の設定(spec.kafka.listenrs[2] の external の箇所)で、type: loadbalancer を指定することで、LoadBalancerサービスが作成され、インターネットからのアクセスが可能となります。また、ここではTLS暗号化を無効(tls: false)としていますが、tls: trueとすることで有効にすることができます。
cat << EOF | kubectl create -n my-kafka-project -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
replicas: 1
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
authentication:
type: tls
- name: external
port: 9094
type: loadbalancer # ここでloadbalancerを指定することで、LoadBalancerサービスが作成される
tls: false
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 100Gi
deleteClaim: false
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
zookeeper:
replicas: 1
storage:
type: persistent-claim
size: 100Gi
deleteClaim: false
entityOperator:
topicOperator: {}
userOperator: {}
EOF
以下のコマンドを実行し、KafkaクラスターがReady状態になるのを待ちます。600秒以内にReadyにならなければタイムアウトになりますが、その場合は再度実行し、Readyになるまで待ちます。
kubectl wait kafka/my-cluster --for=condition=Ready --timeout=600s -n my-kafka-project
# 時間内にReadyになれば、以下の結果が表示されます。
# kafka.kafka.strimzi.io/my-cluster condition met
以下のコマンドを実行し、my-topicという名前のTopicを作成します。
cat << EOF | kubectl create -n my-kafka-project -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: my-topic
labels:
strimzi.io/cluster: "my-cluster"
spec:
partitions: 3
replicas: 1
EOF
次に、アクセスするための情報(ホスト名)を取得するため、以下のコマンドを実行します。これにより、xxxxxxxxxx.lb.appdomain.cloudのようなホスト名が取得できるはずです。
kubectl get service my-cluster-kafka-external-bootstrap -o=jsonpath='{.status.loadBalancer.ingress[0].hostname}{"\n"}' -n my-kafka-project
Kafkaの標準クライアントでアクセスする
Kafkaのダウンロードページから、Kafkaのバイナリファイルを取得します。「Binary downloads」と示されたファイル(tgzファイル)をダウンロードします。
ダウンロードしたtgzファイルを任意のディレクトリに展開します。
展開したディレクトリに移動し、以下のコマンドを実行します。これはKafkaクラスターのmy-topicTopicに対して、Producerとしてのアクセスを開始・実行しています。
bin/kafka-console-producer.sh --broker-list <上記で取得したホスト名>:9094 --topic my-topic
>のようなプロンプトが表示されるので、任意の文字を入力してEnterキーを押すことで、KafkaのTopicにメッセージが投げられます。
次に、Consumerとしてのアクセスを開始・実行します。別のターミナルを立ち上げ、以下のコマンドを実行してください。
bin/kafka-console-consumer.sh --bootstrap-server <上記で取得したホスト名>:9094 --topic my-topic --from-beginning
Consumer側では、Producer側で投げたメッセージが表示されます。さらにProducer側のターミナルで新しいメッセージを投げると、Consumer側のターミナルでリアルタイムで受信し、表示されます。
Strimzi Kafka CLIでアクセスする
Kafkaへのターミナルからのアクセスは、上記の通り、Kafkaのbinディレクトリ以下のシェルスクリプトを実行することで可能となっていますが、これをもっと扱いやすくするCLIとして、Strimzi Kafka CLIがあります(このCLIは、Strimziで構築されたKafkaクラスターへのアクセスでのみ利用できます)。
Strimzi Kafka CLIは、Homebrew、または、pipを使用してインストールできます。
Homebrewの場合は以下のコマンドを実行します。
brew tap systemcraftsman/strimzi-kafka-cli
brew install strimzi-kafka-cli
pipの場合は以下を実行します。
sudo pip install strimzi-kafka-cli
Strimzi Kafka CLIでは、kfk コマンドが提供されます。my-clusterのmy-topicTopicにアクセスするには以下の通り実行します。
Producer側は以下を実行します。
kfk console-producer --topic my-topic -n my-kafka-project -c my-cluster
Consumer側は以下を実行します。
kfk console-consumer --topic my-topic -n my-kafka-project -c my-cluster --from-beginning
以上です。
参考資料
当記事では以下のページを参考にしています。
-
- Strimzi Quick Start guide (0.26.0)
-
- Accessing Kafka: Part 4 – Load Balancers
-
- Strimzi Kafka CLI
- TLS Authentication on Strimzi by using Strimzi Kafka CLI