はじめに
IBM Event StreamsはApache Kafkaをベースにしたイベント・ストリームプラットフォームです。Event Streamsでは様々なサービスとデータ連携するためのConnectorを提供されています。今回は、MQ source connectorを使用して、MQのキューにメッセージが到着したことをイベントとして検知し、Event Streamsのtopicに収集する方法を紹介します。
1. 事前準備
1.1 MQ環境
メッセージの収集元となるMQを設定します。
今回は、IBM Cloudのフリー環境を使用しています。

接続用の資格情報も登録しておきます。

1.2 Event Streams環境
メッセージの収集先となるEvent Streamsを設定します。
今回は、IBM CloudにCloud Pak for Integration 2022.4.1を導入して使用しています。
Event Streamsインスタンスに接続すると、以下のようなコンソールが表示されます。

1.2.1 トピック作成
ホーム画面の「トピックの作成」からデータ収集に使用するトピックを作成します。
今回は「user」というトピックを作成しています。

1.2.2 クラスター接続設定
ホーム画面の「このクラスターに接続」から接続に使用する資格情報を確認・登録します。
Kafkaリスナーおよび資格情報欄で内部に対する「TLS資格情報の生成」を実施します。

以下のような流れで設定していきます。




ここで設定した証明書は、OCPコンソールからも確認ができます。

2. コネクター環境セットアップ
「ツールボックス」を選択すると、Event Streamsに接続するための様々なツールが示されています。
今回は、コネクターを使用していきます。以下の順序で実施していきます。
2.1 Kafka Connect環境のセットアップ
2.2 Kafka Connect環境へのコネクターの追加
2.3 Kafka Connectとコネクターを開始

2.1. Kafka Connect環境のセットアップ
ここで示されている手順に沿って準備していきます。

Kafka Connect ZIPをダウンロードすると以下のような中身になっています。
kafkaconnect:
Dockerfile
kafka-connect.yaml
my-plugins
まずは、kafka-connect.yamlに今回の環境用の認証情報を設定していきます。
環境に合わせて以下の値を設定していきます。証明書の詳細はOCPコンソールから確認ができます。
・bootstrapServers
・image
・tls
・authentication
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
annotations:
eventstreams.ibm.com/use-connector-resources: "true"
spec:
replicas: 1
bootstrapServers: es-demo-kafka-bootstrap.cp4i.svc:9093
image: image-registry.openshift-image-registry.svc:5000/cp4i/my-connect-cluster-image:latest
template:
pod:
imagePullSecrets: []
metadata:
annotations:
eventstreams.production.type: CloudPakForIntegrationNonProduction
productID: 2a79e49111f44ec3acd89608e56138f5
productName: IBM Event Streams for Non Production
productVersion: 11.1.3
productMetric: VIRTUAL_PROCESSOR_CORE
productChargedContainers: my-connect-cluster-connect
cloudpakId: c8b82d189e7545f0892db9ef2731b90d
cloudpakName: IBM Cloud Pak for Integration
cloudpakVersion: 2022.4.1
productCloudpakRatio: "2:1"
config:
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
config.storage.replication.factor: 3
offset.storage.replication.factor: 3
status.storage.replication.factor: 3
tls:
trustedCertificates:
- secretName: es-demo-cluster-ca-cert
certificate: ca.crt
authentication:
type: tls
certificateAndKey:
certificate: user.crt
key: user.key
secretName: kafka-user
2.2 Kafka Connect環境へのコネクターの追加
次にmy-pluginsフォルダーに今回使用したいConnector用のプラグインファイルを配置します。
今回は、mq-source connectorを使用しますので、こちらのGithubで提供される情報をもとにjarファイルを作成し、配置します。
これで準備が終わりましたので、これらのファイルをDockerコンテナーとしてビルドし、OCP環境にPushしていきます。
>docker login cp.icr.io -u cp -p ***
WARNING! Using --password via the CLI is insecure. Use --password-stdin.
Login Succeeded
# dockerイメージビルド
>docker build -t my-connect-cluster-image:latest C:\kafkaconnect
[+] Building 6.3s (8/8) FINISHED
=> [internal] load build definition from Dockerfile 0.0s
=> => transferring dockerfile: 134B 0.0s
=> [internal] load .dockerignore 0.0s
=> => transferring context: 2B 0.0s
=> [internal] load metadata for cp.icr.io/cp/ibm-eventstreams-kafka:11.1.3 5.9s
=> [auth] cp/ibm-eventstreams-kafka:pull token for cp.icr.io 0.0s
=> [internal] load build context 0.2s
=> => transferring context: 13.54MB 0.1s
=> CACHED [1/2] FROM cp.icr.io/cp/ibm-eventstreams-kafka:11.1.3@sha256:c914ec93fd1131a429f08b538e 0.0s
=> [2/2] COPY ./my-plugins/ /opt/kafka/plugins/ 0.1s
=> exporting to image 0.1s
=> => exporting layers 0.1s
=> => writing image sha256:c09376a99a08679869c2a24524bd9fd20a08730e88655868b202cd3b9c38029c 0.0s
=> => naming to docker.io/library/my-connect-cluster-image:latest 0.0s
# OCP環境レジストリーパスでタグ付け
> docker tag my-connect-cluster-image:latest default-route-openshift-image-registry.***.us-east.containers.appdomain.cloud/cp4i/my-connect-cluster-image:latest
# OCP環境にログイン
> docker login -u *** -p sha256~*** default-route-openshift-image-registry.***.us-east.containers.appdomain.cloudWARNING! Using --password via the CLI is insecure. Use --password-stdin.
Login Succeeded
# OCP環境のイメージレジストリーにプッシュ
> docker push default-route-openshift-image-registry.***.us-east.containers.appdomain.cloud/cp4i/my-connect-cluster-image:latest
The push refers to repository [default-route-openshift-image-registry.***.us-east.containers.appdomain.cloud/cp4i/my-connect-cluster-image]
9bb3680d1518: Pushed
afbaf216b515: Pushed
604ea310d3b8: Pushed
c885a267df03: Pushed
d41c9d214c08: Pushed
1d4211b8047a: Pushed
b6b5e54a9d61: Pushed
5b0e52ab72d6: Pushed
b9c8807c07f0: Pushed
abd5331ad63d: Pushed
1d36f4b724f1: Pushed
28647eedfe80: Pushed
28d152ef0c4c: Pushed
a5a3a5497180: Pushed
b642d9aa2f45: Pushed
8b86f8fd3a7d: Pushed
99c6ecde084d: Pushed
28a458282a1e: Pushed
b66994f69e49: Pushed
f536fae8f746: Pushed
cd1cd3ddb935: Pushed
61094c2c6010: Pushed
1ff1dbf9158b: Pushed
f922a018877b: Pushed
52cbfc36b72b: Pushed
latest: digest: sha256:ba6fd8d172c8e379c873d6dc9a811f8d390a8f0047923099ba211d06bd7f632e size: 5555
2.3 Kafka Connectとコネクターを開始
2.3.1 kafka connectビルド
以下の手順でKafkaConnectを開始します。
> oc apply -f kafka-connect.yaml
kafkaconnect.eventstreams.ibm.com/my-connect-cluster created
OCP環境からPodが開始され、エラーが出ていないことを確認します。


2.3.2 Kafka Connectorの開始
GitHubで提供しているConnectorのリソースファイルを接続先の環境に合わせて編集し、開始します。
> cat mq-source.yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
name: mq-source
labels:
eventstreams.ibm.com/cluster: my-connect-cluster
spec:
class: com.ibm.eventstreams.connect.mqsource.MQSourceConnector
tasksMax: 1
config:
mq.queue.manager: "USER.MGMT"
mq.connection.name.list: "user***.appdomain.cloud(30684)"
mq.channel.name: "CLOUD.APP.SVRCONN"
mq.queue: "DEV.QUEUE.1"
mq.user.name: "showroomapp"
mq.password: "***"
topic: "user"
mq.connection.mode: client
mq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
> oc apply -f mq-source.yaml
kafkaconnector.eventstreams.ibm.com/mq-source created
OCPコンソールからkafkaconnectorのリソースが作成されていることを確認できます。
特にエラーが出ていないことを確認します。


また、Kafka ConnectのPodログにMQとの接続が確立さえたメッセージが表示されることも確認できます。

3. 動作確認
以上でMQメッセージをKafkaに連携する準備が整いました。
MQ Consoleから今回接続しているキューに対してメッセージを登録します。

Event StreamsのConsoleから対象のトピックを確認すると、メッセージを受信できていることが確認できます。

以上です。