I tried integrating Confluent Platform on AKS with Azure Synapse Analytics to build a data analytics infrastructure
Overview
I configured topics, connectors, and streams on Confluent Platform running on AKS, connected Azure Database for MySQL as the data source through a Kafka connector, and connected Azure Synapse Analytics as the sink on the data analytics side. The steps are summarized below.
I verified that data written to MySQL is processed in real time by a Confluent Platform stream and stored in Synapse Analytics.
Local environment
- macOS Monterey 12.3.1
- python 3.8.12
- Azure CLI 2.34.1
- helm v3.6.3
- mssql-cli v1.0.0
Prerequisites
- Following this article, make sure Confluent Platform is running on AKS.
- Following this article, make sure Azure Database for MySQL is running in a single-server configuration.
- Following this article, make sure Azure Synapse Analytics is running.
Configuration
Additional settings on the MySQL server
In the Azure portal, open the "Connection security" settings of the Azure Database for MySQL resource and set "Allow access to Azure services" to "Yes" so that the Confluent Platform running on AKS can reach the database.
For details, see Creating firewall rules for an Azure Database for MySQL server.
Also change the following "Server parameters" values:
- time_zone : SYSTEM → +9:00
- binlog_row_image : MINIMAL → FULL
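If you prefer the CLI over the portal, the same changes can be made with az. A sketch, assuming the server name used in this article; the resource group name rg_ituru_mysql01 is hypothetical, so substitute your own.
## Allow access from Azure services (equivalent to the portal toggle; the special 0.0.0.0 rule)
$ az mysql server firewall-rule create --resource-group rg_ituru_mysql01 --server-name iturumysql01 \
    --name AllowAllAzureIPs --start-ip-address 0.0.0.0 --end-ip-address 0.0.0.0
## Change the two server parameters
$ az mysql server configuration set --resource-group rg_ituru_mysql01 --server-name iturumysql01 \
    --name time_zone --value "+9:00"
$ az mysql server configuration set --resource-group rg_ituru_mysql01 --server-name iturumysql01 \
    --name binlog_row_image --value FULL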
Checking the Pods
## Get credentials to connect to the AKS cluster
$ az aks get-credentials --resource-group rg_ituru_aks01 --name aks_ituru_cp01
## Set the namespace as the default
$ kubectl config set-context --current --namespace akscp610
## Check the Pods
$ kubectl get pod -o wide
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
cp610-cp-control-center-f46bc647d-nb42w 1/1 Running 2 (2m52s ago) 3m33s 10.0.1.36 aks-cpdemo01-26269800-vmss000001 <none> <none>
cp610-cp-kafka-0 2/2 Running 1 (113s ago) 3m33s 10.0.1.16 aks-cpdemo01-26269800-vmss000000 <none> <none>
cp610-cp-kafka-1 2/2 Running 0 2m39s 10.0.1.47 aks-cpdemo01-26269800-vmss000001 <none> <none>
cp610-cp-kafka-2 2/2 Running 0 2m7s 10.0.1.79 aks-cpdemo01-26269800-vmss000002 <none> <none>
cp610-cp-kafka-connect-7b6cd684b5-6s5gf 2/2 Running 2 (111s ago) 3m33s 10.0.1.11 aks-cpdemo01-26269800-vmss000000 <none> <none>
cp610-cp-ksql-server-656b866794-kw76z 2/2 Running 0 3m33s 10.0.1.68 aks-cpdemo01-26269800-vmss000002 <none> <none>
cp610-cp-schema-registry-d8466d9dd-msm5p 2/2 Running 2 (2m43s ago) 3m33s 10.0.1.67 aks-cpdemo01-26269800-vmss000002 <none> <none>
cp610-cp-zookeeper-0 2/2 Running 0 3m33s 10.0.1.44 aks-cpdemo01-26269800-vmss000001 <none> <none>
cp610-cp-zookeeper-1 2/2 Running 0 2m39s 10.0.1.73 aks-cpdemo01-26269800-vmss000002 <none> <none>
cp610-cp-zookeeper-2 2/2 Running 0 115s 10.0.1.12 aks-cpdemo01-26269800-vmss000000 <none> <none>
kafka-client 1/1 Running 0 6m15s 10.0.1.69 aks-cpdemo01-26269800-vmss000002 <none> <none>
ksql-client 1/1 Running 0 6m2s 10.0.1.14 aks-cpdemo01-26269800-vmss000000 <none> <none>
## Check the running services
$ kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
cp610-cp-control-center ClusterIP 10.1.0.73 <none> 9021/TCP 4m16s
cp610-cp-kafka ClusterIP 10.1.0.63 <none> 9092/TCP,5556/TCP 4m16s
cp610-cp-kafka-connect ClusterIP 10.1.0.26 <none> 8083/TCP,5556/TCP 4m16s
cp610-cp-kafka-headless ClusterIP None <none> 9092/TCP 4m16s
cp610-cp-ksql-server ClusterIP 10.1.0.86 <none> 8088/TCP,5556/TCP 4m16s
cp610-cp-schema-registry ClusterIP 10.1.0.157 <none> 8081/TCP,5556/TCP 4m16s
cp610-cp-zookeeper ClusterIP 10.1.0.119 <none> 2181/TCP,5556/TCP 4m16s
cp610-cp-zookeeper-headless ClusterIP None <none> 2888/TCP,3888/TCP 4m16s
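All services are of type ClusterIP, so nothing is exposed outside the cluster. If you want to watch topics and connectors in a browser, the Control Center port can be forwarded the same way the Kafka Connect port is later in this article (a small sketch; any free local port will do):
## Forward Confluent Control Center to the local machine, then open http://localhost:9021
$ kubectl port-forward --address localhost svc/cp610-cp-control-center 9021:9021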
Creating a Kafka topic
## Connect to the kafka-client
$ kubectl exec -it kafka-client -- /bin/bash
## Create topic002
[appuser@kafka-client ~]$ kafka-topics --zookeeper cp610-cp-zookeeper:2181 --create --topic topic002 --partitions 1 --replication-factor 1
Created topic topic002.
## List the topics
[appuser@kafka-client ~]$ kafka-topics --zookeeper cp610-cp-zookeeper:2181 --list
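To double-check the partition and replica placement of the new topic, --describe works from the same kafka-client session (a sketch using the same ZooKeeper address):
## Check the topic details
[appuser@kafka-client ~]$ kafka-topics --zookeeper cp610-cp-zookeeper:2181 --describe --topic topic002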
Connector definition files
Definition file for the MySqlConnector
See the Debezium documentation – Getting started – Tutorial.
{
"name": "MySqlSourceConnector_1",
"config": {
"connectionTimeZone": "JST",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "iturumysql01.mysql.database.azure.com",
"database.port": "3306",
"database.user": "adminadmin@iturumysql01",
"database.password": "HogeHogeHoge!",
"database.server.name": "iturumysql01",
"database.ssl.mode": "preferred",
"database.history.kafka.bootstrap.servers": "cp610-cp-kafka:9092",
"database.history.kafka.topic": "history.cpdemo.inventory",
"database.include.list": "iotdummydb"
}
}
Definition file for the SqlDwSinkConnector
See the Azure Synapse Analytics Sink Connector for Confluent Platform.
{
"name": "SqlDwSinkConnector_1",
"config": {
"connector.class": "io.confluent.connect.azuresqldw.AzureSqlDwSinkConnector",
"tasks.max": "1",
"topics": "topic002",
"azure.sql.dw.url": "jdbc:sqlserver://cpworkspace01.sql.azuresynapse.net:1433;",
"azure.sql.dw.user": "adminadmin",
"azure.sql.dw.password": "HogeHogeHoge1!",
"azure.sql.dw.database.name": "cpdemosqldb",
"auto.create": "true",
"auto.evolve": "true",
"table.name.format": "kafka_${topic}"
}
}
Setting up the connectors
Forward the Kafka Connect port (8083) to the local environment.
$ kubectl port-forward --address localhost svc/cp610-cp-kafka-connect 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083
※ Press CTRL+C to stop
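With the port-forward running, it can help to confirm from another terminal that the worker actually has both connector plugins installed, and to dry-run a config through the Connect REST API's validate endpoint before registering anything (a sketch; it assumes jq is installed locally):
## List the connector plugins installed on the worker
$ curl -s http://localhost:8083/connector-plugins | jq '.[].class'
## Validate the source config without creating the connector; error_count should be 0
$ jq '.config' MySqlSourceConnector.json | \
    curl -s -X PUT http://localhost:8083/connector-plugins/io.debezium.connector.mysql.MySqlConnector/config/validate \
    -H 'Content-Type: application/json' --data @- | jq '.error_count'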
Open another terminal and create the following Kafka connectors.
Creating the Source – MySqlConnector
- The following three topics are generated automatically when the connector is created:
iturumysql01 : stores the operation log for CREATE, DROP, and other database/table operations, plus the schema information
iturumysql01.iotdummydb.inventory : stores the captured data (when the target MySQL database contains data)
history.cpdemo.inventory : used by the connector to store the database schema history
## This fails with an error if the connection to MySQL cannot be verified
$ curl -i -X POST http://localhost:8083/connectors -H 'Content-Type: application/json' --data @MySqlSourceConnector.json
HTTP/1.1 201 Created
Date: Mon, 23 May 2022 23:43:16 GMT
Location: http://localhost:8083/connectors/MySqlSourceConnector_1
Content-Type: application/json
Content-Length: 600
Server: Jetty(9.4.33.v20201020)
{"name":"MySqlSourceConnector_1","config":{"connectionTimeZone":"JST","connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"iturumysql01.mysql.database.azure.com","database.port":"3306","database.user":"mysqladmin@iturumysql01","database.password":"NetworldPsg2!","database.server.name":"iturumysql01","database.ssl.mode":"preferred","database.history.kafka.bootstrap.servers":"cp610-cp-kafka:9092","database.history.kafka.topic":"history.cpdemo.inventory","database.include.list":"iotdummydb","name":"MySqlSourceConnector_1"},"tasks":[],"type":"source"}
--- Incidentally ---
## To delete
$ curl -X DELETE http://localhost:8083/connectors/MySqlSourceConnector_1
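To spot-check the change events Debezium is now producing, a few records can be consumed from the kafka-client pod. A sketch; it assumes the Connect worker uses Avro converters and that the client image bundles kafka-avro-console-consumer (if not, run the same command from the schema-registry pod):
## Consume a few change events from the source topic
[appuser@kafka-client ~]$ kafka-avro-console-consumer \
    --bootstrap-server cp610-cp-kafka:9092 \
    --topic iturumysql01.iotdummydb.inventory \
    --from-beginning --max-messages 5 \
    --property schema.registry.url=http://cp610-cp-schema-registry:8081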
Creating the Sink – SqlDwSinkConnector
## This fails with an error if the connection to Synapse cannot be verified
$ curl -i -X POST http://localhost:8083/connectors -H 'Content-Type: application/json' --data @SqlDwSinkConnector.json
HTTP/1.1 201 Created
Date: Thu, 26 May 2022 05:44:34 GMT
Location: http://localhost:8083/connectors/SqlDwSinkConnector_1
Content-Type: application/json
Content-Length: 493
Server: Jetty(9.4.33.v20201020)
{"name":"SqlDwSinkConnector_1","config":{"connector.class":"io.confluent.connect.azuresqldw.AzureSqlDwSinkConnector","tasks.max":"1","topics":"topic002","azure.sql.dw.url":"jdbc:sqlserver://cpworkspace01.sql.azuresynapse.net:1433;","azure.sql.dw.user":"synapseadmin","azure.sql.dw.password":"NetworldPsg2!","azure.sql.dw.database.name":"CPDemoDataWarehouse","auto.create":"true","auto.evolve":"true","table.name.format":"kafka_${topic}","name":"SqlDwSinkConnector_1"},"tasks":[],"type":"sink"}
--- Incidentally ---
## To delete
$ curl -X DELETE http://localhost:8083/connectors/SqlDwSinkConnector_1
Checking the connectors
## List the worker's active connectors
$ curl localhost:8083/connectors
["MySqlSourceConnector_1","SqlDwSinkConnector_1"]
## Check the status of the MySqlSourceConnector
$ curl localhost:8083/connectors/MySqlSourceConnector_1/status | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 176 100 176 0 0 3365 0 --:--:-- --:--:-- --:--:-- 3384
{
"name": "MySqlSourceConnector_1",
"connector": {
"state": "RUNNING",
"worker_id": "10.0.1.11:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "10.0.1.11:8083"
}
],
"type": "source"
}
## Check the status of the SqlDwSinkConnector
$ curl localhost:8083/connectors/SqlDwSinkConnector_1/status | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 172 100 172 0 0 2487 0 --:--:-- --:--:-- --:--:-- 2492
{
"name": "SqlDwSinkConnector_1",
"connector": {
"state": "RUNNING",
"worker_id": "10.0.1.11:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "10.0.1.11:8083"
}
],
"type": "sink"
}
## To check everything at once
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
sink | SqlDwSinkConnector_1 | RUNNING | RUNNING | io.confluent.connect.azuresqldw.AzureSqlDwSinkConnector
source | MySqlSourceConnector_1 | RUNNING | RUNNING | io.debezium.connector.mysql.MySqlConnector
KSQL configuration
Connecting to KSQL
## Connect to the ksql-client
$ kubectl exec -it ksql-client -- /bin/bash
## Connect to ksqlDB
[appuser@ksql-client ~]$ ksql http://cp610-cp-ksql-server:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= Event Streaming Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2020 Confluent Inc.
CLI v6.1.0, Server v6.1.0 located at http://cp610-cp-ksql-server:8088
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
## List the topics
ksql> list topics;
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------------
cp610-cp-kafka-connect-config | 1 | 3
cp610-cp-kafka-connect-offset | 25 | 3
cp610-cp-kafka-connect-status | 5 | 3
history.cpdemo.inventory | 1 | 1
iturumysql01 | 1 | 1
iturumysql01.iotdummydb.inventory | 1 | 1
topic002 | 1 | 1
---------------------------------------------------------------------
Creating streams tied to the topics using the Avro schema
## Create stream001 for the MySQL source connector
ksql> CREATE STREAM stream001 WITH (kafka_topic = 'iturumysql01.iotdummydb.inventory', value_format = 'avro');
Message
----------------
Stream created
----------------
## Create stream002 for the Synapse sink connector and tie it to topic002
ksql> CREATE STREAM stream002 WITH (KAFKA_TOPIC='topic002', VALUE_FORMAT='AVRO') AS
SELECT s001.after->id as id,
s001.after->section as section,
s001.after->iot_state as prefecture,
s001.after->val_1 as val_1,
s001.after->val_2 as val_2
FROM stream001 s001;
Message
----------------------------------------
Created query with ID CSAS_STREAM002_1
----------------------------------------
## Check the streams
ksql> show streams;
Stream Name | Kafka Topic | Key Format | Value Format | Windowed
----------------------------------------------------------------------------------------
STREAM001 | iturumysql01.iotdummydb.inventory | KAFKA | AVRO | false
STREAM002 | topic002 | KAFKA | AVRO | false
----------------------------------------------------------------------------------------
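The same statements can also be issued without the CLI through the ksqlDB REST API, which is handy for scripting. A sketch, assuming port 8088 is forwarded to the local machine the same way 8083 was:
## Forward the ksqlDB server port (in its own terminal)
$ kubectl port-forward --address localhost svc/cp610-cp-ksql-server 8088:8088
## Describe stream002 via the REST API
$ curl -s -X POST http://localhost:8088/ksql \
    -H 'Content-Type: application/vnd.ksql.v1+json' \
    -d '{"ksql":"DESCRIBE stream002;","streamsProperties":{}}' | jq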
Everything is now in place.
Checking the data flow
Checking the stream
## Watch the data streaming in real time
ksql> SELECT * FROM stream002 emit changes;
Press CTRL-C to interrupt
Running the application
From a local terminal I ran the Python program "mysql_IoTdummy.py" described in this article; it generates 5 records at 1-second intervals.
$ python mysql_IoTdummy.py --mode db --wait 1 --count 10
データベース・テーブルへのデータ書き込み
Connection established
['1653059784.916943', 0, '111', 'V', '786-4201', '徳島県', 158.33322633209377, 84.49057325157214, 1653059784.91714]
['1653059784.917148', 1, '111', 'L', '575-1674', '山口県', 135.8599096871533, 75.064630664443, 1653059784.917203]
['1653059784.917208', 2, '111', 'C', '607-0145', '佐賀県', 156.0637293462609, 78.21672571181065, 1653059784.917251]
['1653059784.917256', 3, '111', 'A', '749-7168', '北海道', 123.96884840139384, 80.95952082333669, 1653059784.917296]
['1653059784.917301', 4, '111', 'L', '843-4658', '香川県', 199.07957653525887, 53.37983182659535, 1653059784.917351]
Inserted 5 row(s) of data.
Done.
処理時間:6.554551839828491 [sec]
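To confirm the rows really landed in the source table before looking further down the pipeline, a quick count against MySQL works. A sketch; it assumes a local mysql client and that the program writes to the inventory table implied by the Debezium topic name:
## Count the rows written to the source table
$ mysql -h iturumysql01.mysql.database.azure.com -u adminadmin@iturumysql01 -p \
    --ssl-mode=PREFERRED -e "SELECT COUNT(*) FROM iotdummydb.inventory;"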
You can watch the streaming in real time on "stream002".
ksql> SELECT * FROM stream002 emit changes;
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|ID |SECTION |PREFECTURE |VAL_1 |VAL_2 |
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|1653059784.916943 |V |徳島県 |158.33322143554688 |84.49057006835938 |
|1653059784.917148 |L |山口県 |135.8599090576172 |75.06462860107422 |
|1653059784.917208 |C |佐賀県 |156.06373596191406 |78.21672821044922 |
|1653059784.917256 |A |北海道 |123.9688491821289 |80.95951843261719 |
|1653059784.917301 |L |香川県 |199.07957458496094 |53.37983322143555 |
Press CTRL-C to interrupt
Check that the data is streaming through the "topic002" topic as well.
ksql> PRINT 'iturumysql01.iotdummydb.inventory' FROM BEGINNING;
ksql> PRINT 'topic002' FROM BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO
rowtime: 2022/05/20 15:16:25.700 Z, key: <null>, value: {"ID": "1653059784.916943", "SECTION": "V", "PREFECTURE": "徳島県", "VAL_1": 158.33322143554688, "VAL_2": 84.49057006835938}
rowtime: 2022/05/20 15:16:26.703 Z, key: <null>, value: {"ID": "1653059784.917148", "SECTION": "L", "PREFECTURE": "山口県", "VAL_1": 135.8599090576172, "VAL_2": 75.06462860107422}
rowtime: 2022/05/20 15:16:27.705 Z, key: <null>, value: {"ID": "1653059784.917208", "SECTION": "C", "PREFECTURE": "佐賀県", "VAL_1": 156.06373596191406, "VAL_2": 78.21672821044922}
rowtime: 2022/05/20 15:16:29.209 Z, key: <null>, value: {"ID": "1653059784.917256", "SECTION": "A", "PREFECTURE": "北海道", "VAL_1": 123.9688491821289, "VAL_2": 80.95951843261719}
rowtime: 2022/05/20 15:16:30.211 Z, key: <null>, value: {"ID": "1653059784.917301", "SECTION": "L", "PREFECTURE": "香川県", "VAL_1": 199.07957458496094, "VAL_2": 53.37983322143555}
Press CTRL-C to interrupt
Checking the data in Azure Synapse Analytics
Using the mssql-cli tool, run queries against the database in the Synapse SQL pool to confirm that the data was stored through the Confluent Platform Kafka connectors.
## Connect to the Synapse workspace, passing the SQL pool (database) name, user name, and password as arguments
$ mssql-cli -S cpworkspace01.sql.azuresynapse.net -U adminadmin -P HogeHogeHoge1! -d cpdemosqldb
## Query the table and display its contents
cpdemosqldb> select * from kafka_topic002;
+------------------+-------------------+-----------+------------------+--------------+
| VAL_1 | ID | SECTION | VAL_2 | PREFECTURE |
|------------------+-------------------+-----------+------------------+--------------|
| 111.479415893555 | 1653984067.079385 | B | 66.560661315918 | 岡山県 |
| 189.111267089844 | 1653984229.080214 | S | 68.9491729736328 | 長野県 |
| 191.615036010742 | 1653984100.803468 | Q | 53.9585876464844 | 福岡県 |
| 199.33251953125 | 1653984229.080527 | F | 61.3692970275879 | 福岡県 |
| 151.738159179688 | 1653983738.752873 | B | 87.595329284668 | 大阪府 |
| 194.590621948242 | 1653983783.798177 | X | 87.0414962768555 | 栃木県 |
| 114.615188598633 | 1653983738.752919 | D | 89.7937088012695 | 宮城県 |
| 124.387306213379 | 1653984441.154836 | S | 74.9007797241211 | 長崎県 |
| 164.049896240234 | 1653983783.798038 | I | 72.0705490112305 | 岐阜県 |
| 198.455078125 | 1653984100.803527 | T | 65.5769958496094 | 福井県 |
: : : : :
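A row count is also a quick way to check whether the sink is keeping up with the source (a sketch; as far as I know, mssql-cli's -Q option runs a single query and exits):
## Count rows without opening an interactive session
$ mssql-cli -S cpworkspace01.sql.azuresynapse.net -U adminadmin -P HogeHogeHoge1! \
    -d cpdemosqldb -Q "SELECT COUNT(*) FROM kafka_topic002;"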
Cleanup
Removing the Pods / namespace
## Pod : Confluent Platform
$ helm delete cp610
W0407 17:38:18.658767 39907 warnings.go:70] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
release "cp610" uninstalled
## Pod : kafka-client / ksql-client
$ kubectl delete -f cp-helm-charts/examples/kafka-client.yaml
$ kubectl delete -f cp-helm-charts/examples/ksql-client.yaml
## Deleting the namespace (Pods under the namespace are deleted as well)
$ kubectl delete namespace akscp610
namespace "akscp610" deleted
Stopping and starting the AKS cluster
$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01
Summary
With Confluent Platform at its core, a data streaming and data analytics infrastructure can be built on Azure with little effort. Because Kafka Connect can connect to many kinds of resources, data can be consolidated from multiple sources and fed straight into an analytics platform.
I referred to the following information, for which I am grateful.
- MySQLのTable更新情報をKafka上でksql処理する
- Schema Registry について書いていく その1: Confluent Schema Registry
- Debezium によるチェンジデータキャプチャー
I also referred to the following information:
- Debezium MySQL Source Connector for Confluent Platform
- Azure Database for MySQL でのサーバー パラメーター
- 第2章 MySQL 用 Debezium コネクター
- Debezium User Guide
- Kafka Connect と Schema Registry の使用
- Kafka Connect とコネクターのモニタリング
- Kafka Connect のログ記録
- Azure Synapse Analytics Sink Connector for Confluent Platform