はじめに

この記事では、IBM提供のKafkaソリューションであるEvent Streamsを使用してAWS S3との連携を実現する方法を紹介します。Event Streamsでは様々なKafka Connectorを使用することができ、こちらにリストがまとまっています。今回はAmazon S3のSink Connectorを使用していきます。

基本的な流れは、以前の記事 – Event StreamsとMQ連携と同様になります。

1. 事前準備

1.1 Amazon S3環境

メッセージの連携先となるAmazon S3環境を設定します。
AWSの無料利用枠で検証しています。

image.png

事前にバケットを作成しておきます。

image.png

また、Kafkaが連携で使用するAWSのセキュリティ認証情報を設定します。
今回は検証目的のため、ルートユーザーのアクセスキーを使用します。

image.png

1.2 Event Streams環境

メッセージの連携元となるEvent Streams環境を設定します。
今回は、IBM CloudにCloud Pak for Integration 2023.2.1を導入して、Event Streams v11.2.0インスタンスをデプロイしています。
Event Streamsのコンソールに接続します。

image.png

1.2.1 トピック作成

ホーム画面の「トピックの作成」からデータ収集に使用するトピックを作成します。
今回は「test-topic」というトピックを作成しています。

image.png

1.2.2 クラスター接続設定

ホーム画面の「このクラスターに接続」から接続に使用する資格情報を確認・登録します。
Kafkaリスナーおよび資格情報欄で外部に対する「SCRAM資格情報の設定」を実施します。

image.png

今回は「kafka-user」という名前で設定し、以降はガイドに沿って権限を設定していきます。

image.png

2. コネクター環境セットアップ

「ツールボックス」を選択すると、Event Streamsに接続するための様々なツールが示されています。
今回は、コネクターを使用していきます。以下の順序で実施していきます。

image.png

2.1 Kafka Connect環境のセットアップ
2.2 Kafka Connect環境へのコネクターの追加
2.3 Kafka Connectとコネクターを開始

2.1 Kafka Connect環境のセットアップ

ここで示されている手順に沿って準備していきます。

image.png

Kafka Connect ZIPをダウンロードすると以下のような中身になっています。

kafkaconnect:
Dockerfile
kafka-connect.yaml
my-plugins

まずは、kafka-connect.yamlに今回の環境用の認証情報を設定していきます。
環境に合わせて以下の値を設定していきます。証明書の詳細はOCPコンソールから確認ができます。
・bootstrapServers
・image
・tls
・authentication

apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    eventstreams.ibm.com/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: es-demo-kafka-bootstrap-cp4i.***.cloud:443
  image: image-registry.openshift-image-registry.svc:5000/cp4i/my-connect-cluster-image
  template:
    pod:
      imagePullSecrets: []
      metadata:
        annotations:
          eventstreams.production.type: CloudPakForIntegrationNonProduction
          productID: 2a79e49111f44ec3acd89608e56138f5
          productName: IBM Event Streams for Non Production
          productVersion: 11.2.0
          productMetric: VIRTUAL_PROCESSOR_CORE
          productChargedContainers: my-connect-cluster-connect
          cloudpakId: c8b82d189e7545f0892db9ef2731b90d
          cloudpakName: IBM Cloud Pak for Integration
          productCloudpakRatio: "2:1"
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
  tls:
    trustedCertificates:
      - secretName: es-demo-cluster-ca-cert
        certificate: ca.crt
  authentication:
    type: scram-sha-512
    username: kafka-user
    passwordSecret:
      secretName: kafka-user
      password: password

2.2 Kafka Connect環境へのコネクター追加

次にmy-pluginsフォルダーに今回使用したいConnector用のプラグインファイルを配置します。
今回は、s3 sink connectorを使用しますので、こちらのGithubで提供される情報をもとにjarファイルを取得し、配置します。リリースv2.13.0での一通りのjarファイルを含めて以降のような配置となります。jarファイルが複数ある場合は、my-pluginsフォルダー下にディレクトリーを作成して配置する必要があります。

./kafkaconnect
|--Dockerfile
|--kafka-connect.yaml
|--my-plugins
|  |--s3
|  |  |--audience-annotations-0.11.0.jar
|  |  |--avro-1.11.0.jar
|  |  |--aws-java-sdk-core-1.12.479.jar
|  |  |--aws-java-sdk-kms-1.12.461.jar
|  |  |--aws-java-sdk-s3-1.12.461.jar
|  |  |--aws-java-sdk-sts-1.12.479.jar
|  |  |--checker-qual-3.12.0.jar
|  |  |--common-utils-7.2.2.jar
|  |  |--commons-beanutils-1.9.4.jar
|  |  |--commons-codec-1.15.jar
|  |  |--commons-collections-3.2.2.jar
|  |  |--commons-compress-1.21.jar
|  |  |--commons-configuration2-2.8.0.jar
|  |  |--commons-for-apache-kafka-connect-0.9.0.jar
|  |  |--commons-lang3-3.12.0.jar
|  |  |--commons-logging-1.2.jar
|  |  |--commons-pool-1.6.jar
|  |  |--commons-text-1.10.0.jar
|  |  |--dnsjava-2.1.7.jar
|  |  |--error_prone_annotations-2.11.0.jar
|  |  |--failureaccess-1.0.1.jar
|  |  |--guava-31.1-jre.jar
|  |  |--hadoop-annotations-3.3.5.jar
|  |  |--hadoop-common-3.3.5.jar
|  |  |--hadoop-shaded-guava-1.1.1.jar
|  |  |--httpclient-4.5.13.jar
|  |  |--httpcore-4.4.13.jar
|  |  |--ion-java-1.0.2.jar
|  |  |--j2objc-annotations-1.3.jar
|  |  |--jackson-annotations-2.13.2.jar
|  |  |--jackson-core-2.13.2.jar
|  |  |--jackson-databind-2.13.2.2.jar
|  |  |--jackson-dataformat-cbor-2.13.2.jar
|  |  |--jakarta.activation-api-1.2.1.jar
|  |  |--javax.annotation-api-1.3.2.jar
|  |  |--jaxb-api-2.2.2.jar
|  |  |--jaxb-impl-2.2.3-1.jar
|  |  |--jersey-json-1.20.jar
|  |  |--jettison-1.1.jar
|  |  |--jmespath-java-1.12.479.jar
|  |  |--joda-time-2.8.1.jar
|  |  |--jsr305-3.0.2.jar
|  |  |--kafka-avro-serializer-7.2.2.jar
|  |  |--kafka-connect-avro-converter-7.2.2.jar
|  |  |--kafka-connect-avro-data-7.2.2.jar
|  |  |--kafka-schema-converter-7.2.2.jar
|  |  |--kafka-schema-registry-client-7.2.2.jar
|  |  |--kafka-schema-serializer-7.2.2.jar
|  |  |--listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
|  |  |--logredactor-1.0.10.jar
|  |  |--logredactor-metrics-1.0.10.jar
|  |  |--minimal-json-0.9.5.jar
|  |  |--parquet-avro-1.11.1.jar
|  |  |--parquet-column-1.11.1.jar
|  |  |--parquet-common-1.11.1.jar
|  |  |--parquet-encoding-1.11.1.jar
|  |  |--parquet-format-structures-1.11.1.jar
|  |  |--parquet-hadoop-1.11.1.jar
|  |  |--parquet-jackson-1.11.1.jar
|  |  |--re2j-1.6.jar
|  |  |--reload4j-1.2.22.jar
|  |  |--s3-connector-for-apache-kafka-2.13.0.jar
|  |  |--slf4j-api-1.7.36.jar
|  |  |--slf4j-reload4j-1.7.36.jar
|  |  |--snappy-java-1.1.9.1.jar
|  |  |--stax-api-1.0-2.jar
|  |  |--stax2-api-4.2.1.jar
|  |  |--swagger-annotations-2.1.10.jar
|  |  |--woodstox-core-5.4.0.jar
|  |  |--zstd-jni-1.5.5-2.jar

これで準備が終わりましたので、これらのファイルをDockerコンテナーとしてビルドし、OCP環境にPushしていきます。

>docker login cp.icr.io -u cp -p ***
WARNING! Using --password via the CLI is insecure. Use --password-stdin.
Login Succeeded

# dockerイメージビルド
>docker build -t my-connect-cluster-image:latest kafkaconnect/
[+] Building 6.3s (8/8) FINISHED
 => [internal] load build definition from Dockerfile                                                0.0s
 => => transferring dockerfile: 134B                                                                0.0s
 => [internal] load .dockerignore                                                                   0.0s
 => => transferring context: 2B                                                                     0.0s
 => [internal] load metadata for cp.icr.io/cp/ibm-eventstreams-kafka:11.2.0                         5.9s
 => [auth] cp/ibm-eventstreams-kafka:pull token for cp.icr.io                                       0.0s
 => [internal] load build context                                                                   0.2s
 => => transferring context: 13.54MB                                                                0.1s
 => CACHED [1/2] FROM cp.icr.io/cp/ibm-eventstreams-kafka:11.2.0@sha256:***  0.0s
 => [2/2] COPY ./my-plugins/ /opt/kafka/plugins/                                                    0.1s
 => exporting to image                                                                              0.1s
 => => exporting layers                                                                             0.1s
 => => writing image sha256:c09376a99a08679869c2a24524bd9fd20a08730e88655868b202cd3b9c38029c        0.0s
 => => naming to docker.io/library/my-connect-cluster-image:latest                                  0.0s

# OCP環境レジストリーパスでタグ付け
> docker tag my-connect-cluster-image:latest default-route-openshift-image-registry.***.cloud/cp4i/my-connect-cluster-image:latest

# OCP環境にログイン
> docker login -u *** -p sha256~*** default-route-openshift-image-registry.***.cloudWARNING! Using --password via the CLI is insecure. Use --password-stdin.
Login Succeeded

# OCP環境のイメージレジストリーにプッシュ
> docker push default-route-openshift-image-registry.***.cloud/cp4i/my-connect-cluster-image:latest
The push refers to repository [default-route-openshift-image-registry.***.us-east.containers.appdomain.cloud/cp4i/my-connect-cluster-image]
9bb3680d1518: Pushed
afbaf216b515: Pushed
604ea310d3b8: Pushed
c885a267df03: Pushed
d41c9d214c08: Pushed
1d4211b8047a: Pushed
b6b5e54a9d61: Pushed
5b0e52ab72d6: Pushed
b9c8807c07f0: Pushed
abd5331ad63d: Pushed
1d36f4b724f1: Pushed
28647eedfe80: Pushed
28d152ef0c4c: Pushed
a5a3a5497180: Pushed
b642d9aa2f45: Pushed
8b86f8fd3a7d: Pushed
99c6ecde084d: Pushed
28a458282a1e: Pushed
b66994f69e49: Pushed
f536fae8f746: Pushed
cd1cd3ddb935: Pushed
61094c2c6010: Pushed
1ff1dbf9158b: Pushed
f922a018877b: Pushed
52cbfc36b72b: Pushed
latest: digest: sha256:***7f632e size: 5555

2.3 Kafka Connectとコネクターを開始

2.3.1 kafka connect起動

以下の手順でKafkaConnectを開始します。

> oc apply -f kafka-connect.yaml
kafkaconnect.eventstreams.ibm.com/my-connect-cluster created

OCP環境からPodが開始され、エラーが出ていないことを確認します。

image.png
image.png

2.3.2 Kafka Connectorの開始

次にKafkaConnectorのリソースファイルを作成します。GitHubで提供しているS3 Connectorの情報や事前準備で確認した内容をもとに、編集しデプロイします。

> cat s3-sink.yaml
apiVersion: eventstreams.ibm.com/v1beta2 
kind: KafkaConnector 
metadata: 
  name: s3-sink-connector 
  labels: 
    eventstreams.ibm.com/cluster: my-connect-cluster 
spec: 
  class: io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector
  tasksMax: 1 
  config: 
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    format.output.type: jsonl
    topics: test-topic
    aws.access.key.id: ***
    aws.secret.access.key: ***
    aws.s3.region: us-east-1
    file.name.template: dir1/dir2/{{topic}}-{{partition:padding=true}}-{{start_offset:padding=true}}.gz
    aws.s3.bucket.name: showroom-s3


> oc apply -f s3-sink.yaml
kafkaconnector.eventstreams.ibm.com/s3-sink-connector created

OCPコンソールからkafkaconnectorのリソースが作成されていることを確認できます。
特にエラーが出ていないことを確認します。

image.png

3. 動作確認

以上でEvent Streams-Amazon S3の連携が実現できました。
Event Streamsのtest-topicにメッセージが到着すると、即時、Amazon S3に連携されることを確認していきます。

Event Streamsへのメッセージ登録には、ツールボックスのスターター・アプリケーションを使用します。

image.png

ローカル端末上で起動することができ、以下がテストツールの表示画面になります。左側がProducer用のテスト機能です。
ここでは、「{“test”:”test message”}」というメッセージを生成しました。

image.png

このツールにより、Event Streamsのtest-topicにメッセージが到着していることが確認できます。

image.png

Amazon S3にも、Kafka Connectorの設定に基づいた形式でファイルが作成されていることが確認できます。

image.png

以上です。

广告
将在 10 秒后关闭
bannerAds