はじめに

Apache Kafkaは、オープンソースの分散イベントストリーミングプラットフォームです。Airbnb、LINE、PayPal、Netflix、LinkedIn、Twitter、Uberなど、多くの大企業で使用されており、非常に実績の多いSWです。

Kafkaは多くのコンポーネントで構築されるSWであり、それらを手動で運用することは難度が高いと言われていますが、そのKafkaをKubernetes上で稼働させることで、Kubernetesのオートヒーリングやオートスケーリングといった運用をサポートする機能の恩恵を受けることができます。

当記事では、Kafkaの導入・運用を自動化する Strimzi オペレータを使用して、構築済みのIBM Cloud Kubernetes Service(以下IKS)クラスターに対して、Kafkaを導入する手順を紹介します。

前提事項

当記事では以下のバージョンを前提とします。

    • IKS : 1.22.2_1522 (VPC Gen2)

 

    • Kafka : 3.0.0 (Scala 2.13)

 

    Strimzi : 0.26

StrimziによるKafka導入手順

基本的には Strimzi のQuick Start guideの通りに実行すれば問題ありません。Quick Start guide は Minikube への導入を前提としていますが、IKSに対しても同じ手順で導入することができます。ただ、Quick Start guideでは、外部に対してのKafka Brokerの公開をNodeportで行っていますが、VPC Gen2におけるIKSではNodeportでのインターネットへの公開はできなくなっている(パブリックIPアドレスが無い)ため、LoadBalancerを使用して公開することにします。

StrimziのリリースページからStrimzi Operatorのマニフェストファイルのzipをダウンロードし、任意のディレクトリに展開します。

Strimzi Operatorは、kafka ネームスペースにデプロイします。そのために、ダウンロードしたマニフェストファイルを展開したディレクトリでsedコマンドを実行し、ネームスペースの指定を「kafka」に変更します。

# Linuxの場合
sed -i 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml

# Macの場合
sed -i '' 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml

Strimzi Operatorをデプロイする kafka ネームスペース、および、Kafka Clusterをデプロイする my-kafka-project ネームスペースを作成します。

kubectl create namespace kafka
kubectl create namespace my-kafka-project

Strimzi のマニフェストファイルを展開したディレクトリ以下の install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml で、STRIMZI_NAMESPACE 環境変数の値を「my-kafka-project」に変更します。

env:
 - name: STRIMZI_NAMESPACE
   value: my-kafka-project

Strimzi のマニフェストファイルを展開したディレクトリで以下のコマンドを実行し、マニフェストファイルを適用し、CRDを作成します。

kubectl create -f install/cluster-operator/ -n kafka

以下のコマンドを実行し、Strimzi Operator に my-kafka-project ネームスペースに対する権限を付与します。

kubectl create -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n my-kafka-project

kubectl create -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n my-kafka-project

以下のコマンドを実行してマニフェストファイルを適用し、my-clusterという名前のKafkaクラスターを作成します。外部に対する公開の設定(spec.kafka.listenrs[2] の external の箇所)で、type: loadbalancer を指定することで、LoadBalancerサービスが作成され、インターネットからのアクセスが可能となります。また、ここではTLS暗号化を無効(tls: false)としていますが、tls: trueとすることで有効にすることができます。

cat << EOF | kubectl create -n my-kafka-project -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: loadbalancer  # ここでloadbalancerを指定することで、LoadBalancerサービスが作成される
        tls: false
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF

以下のコマンドを実行し、KafkaクラスターがReady状態になるのを待ちます。600秒以内にReadyにならなければタイムアウトになりますが、その場合は再度実行し、Readyになるまで待ちます。

kubectl wait kafka/my-cluster --for=condition=Ready --timeout=600s -n my-kafka-project

# 時間内にReadyになれば、以下の結果が表示されます。
# kafka.kafka.strimzi.io/my-cluster condition met

以下のコマンドを実行し、my-topicという名前のTopicを作成します。

cat << EOF | kubectl create -n my-kafka-project -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 3
  replicas: 1
EOF

次に、アクセスするための情報(ホスト名)を取得するため、以下のコマンドを実行します。これにより、xxxxxxxxxx.lb.appdomain.cloudのようなホスト名が取得できるはずです。

kubectl get service my-cluster-kafka-external-bootstrap -o=jsonpath='{.status.loadBalancer.ingress[0].hostname}{"\n"}' -n my-kafka-project

Kafkaの標準クライアントでアクセスする

Kafkaのダウンロードページから、Kafkaのバイナリファイルを取得します。「Binary downloads」と示されたファイル(tgzファイル)をダウンロードします。

ダウンロードしたtgzファイルを任意のディレクトリに展開します。

展開したディレクトリに移動し、以下のコマンドを実行します。これはKafkaクラスターのmy-topicTopicに対して、Producerとしてのアクセスを開始・実行しています。

bin/kafka-console-producer.sh --broker-list <上記で取得したホスト名>:9094 --topic my-topic

>のようなプロンプトが表示されるので、任意の文字を入力してEnterキーを押すことで、KafkaのTopicにメッセージが投げられます。

次に、Consumerとしてのアクセスを開始・実行します。別のターミナルを立ち上げ、以下のコマンドを実行してください。

bin/kafka-console-consumer.sh --bootstrap-server <上記で取得したホスト名>:9094 --topic my-topic --from-beginning

Consumer側では、Producer側で投げたメッセージが表示されます。さらにProducer側のターミナルで新しいメッセージを投げると、Consumer側のターミナルでリアルタイムで受信し、表示されます。

Strimzi Kafka CLIでアクセスする

Kafkaへのターミナルからのアクセスは、上記の通り、Kafkaのbinディレクトリ以下のシェルスクリプトを実行することで可能となっていますが、これをもっと扱いやすくするCLIとして、Strimzi Kafka CLIがあります(このCLIは、Strimziで構築されたKafkaクラスターへのアクセスでのみ利用できます)。

Strimzi Kafka CLIは、Homebrew、または、pipを使用してインストールできます。

Homebrewの場合は以下のコマンドを実行します。

brew tap systemcraftsman/strimzi-kafka-cli
brew install strimzi-kafka-cli

pipの場合は以下を実行します。

sudo pip install strimzi-kafka-cli

Strimzi Kafka CLIでは、kfk コマンドが提供されます。my-clusterのmy-topicTopicにアクセスするには以下の通り実行します。

Producer側は以下を実行します。

kfk console-producer --topic my-topic -n my-kafka-project -c my-cluster

Consumer側は以下を実行します。

kfk console-consumer --topic my-topic -n my-kafka-project -c my-cluster --from-beginning

以上です。

参考資料

当記事では以下のページを参考にしています。

    • Strimzi Quick Start guide (0.26.0)

 

    • Accessing Kafka: Part 4 – Load Balancers

 

    • Strimzi Kafka CLI

 

    TLS Authentication on Strimzi by using Strimzi Kafka CLI
广告
将在 10 秒后关闭
bannerAds