Trying out Confluent Platform on AKS together with Azure Synapse Analytics to build a data analytics infrastructure

Overview

I configured topics / connectors / streams on Confluent Platform running on AKS, connected Azure Database for MySQL as the data source through a Kafka connector, and connected Azure Synapse Analytics as the sink of the data analytics platform. The steps are summarized below.

I confirmed that data written to MySQL is processed in real time by a Confluent Platform stream and stored in Synapse Analytics.

(Architecture diagram: data flows from Azure Database for MySQL through Confluent Platform on AKS into Azure Synapse Analytics)

Local environment

    • macOS Monterey 12.3.1
    • python 3.8.12
    • Azure CLI 2.34.1
    • helm v3.6.3
    • mssql-cli v1.0.0

Prerequisites

    1. Following this article, make sure Confluent Platform is running on AKS.
    2. Following this article, make sure Azure Database for MySQL is running in the Single Server configuration.
    3. Following this article, make sure Azure Synapse Analytics is running (a quick CLI check for all three resources is sketched below).
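If you want to double-check that the prerequisite resources are in place, something along these lines should work with the Azure CLI (a sketch; the MySQL and Synapse resource group names are placeholders for this environment):

## Confirm that the AKS cluster, MySQL server, and Synapse workspace are available
$ az aks show -g rg_ituru_aks01 -n aks_ituru_cp01 --query provisioningState -o tsv
$ az mysql server show -g rg_ituru_mysql01 -n iturumysql01 --query userVisibleState -o tsv
$ az synapse workspace show -g rg_ituru_synapse01 -n cpworkspace01 --query provisioningState -o tsv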


Configuration

Additional settings on the MySQL server

In the Azure portal, allow access from the AKS cluster running Confluent Platform to Azure Database for MySQL: in the resource's "Connection security" settings, set "Allow access to Azure services" to "Yes".

See "Create firewall rules for an Azure Database for MySQL server" for reference.

Also confirm that the following "Server parameters" have been changed (they can also be set from the Azure CLI, as sketched below):

    • time_zone : SYSTEM → +9:00
    • binlog_row_image : MINIMAL → FULL
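The portal changes above can also be scripted. A minimal sketch using the Azure CLI for a single server, assuming the server name iturumysql01 from the connector definition and a hypothetical resource group rg_ituru_mysql01:

## Set the server parameters from the Azure CLI (the resource group name is a placeholder)
$ az mysql server configuration set --resource-group rg_ituru_mysql01 --server-name iturumysql01 --name time_zone --value "+9:00"
$ az mysql server configuration set --resource-group rg_ituru_mysql01 --server-name iturumysql01 --name binlog_row_image --value "FULL"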

Checking the Pods

## Get credentials for connecting to the AKS cluster
$ az aks get-credentials --resource-group rg_ituru_aks01 --name aks_ituru_cp01

## Set the namespace as the default
$ kubectl config set-context --current --namespace akscp610

## Check the Pods
$ kubectl get pod -o wide
NAME                                       READY   STATUS    RESTARTS        AGE     IP          NODE                               NOMINATED NODE   READINESS GATES
cp610-cp-control-center-f46bc647d-nb42w    1/1     Running   2 (2m52s ago)   3m33s   10.0.1.36   aks-cpdemo01-26269800-vmss000001   <none>           <none>
cp610-cp-kafka-0                           2/2     Running   1 (113s ago)    3m33s   10.0.1.16   aks-cpdemo01-26269800-vmss000000   <none>           <none>
cp610-cp-kafka-1                           2/2     Running   0               2m39s   10.0.1.47   aks-cpdemo01-26269800-vmss000001   <none>           <none>
cp610-cp-kafka-2                           2/2     Running   0               2m7s    10.0.1.79   aks-cpdemo01-26269800-vmss000002   <none>           <none>
cp610-cp-kafka-connect-7b6cd684b5-6s5gf    2/2     Running   2 (111s ago)    3m33s   10.0.1.11   aks-cpdemo01-26269800-vmss000000   <none>           <none>
cp610-cp-ksql-server-656b866794-kw76z      2/2     Running   0               3m33s   10.0.1.68   aks-cpdemo01-26269800-vmss000002   <none>           <none>
cp610-cp-schema-registry-d8466d9dd-msm5p   2/2     Running   2 (2m43s ago)   3m33s   10.0.1.67   aks-cpdemo01-26269800-vmss000002   <none>           <none>
cp610-cp-zookeeper-0                       2/2     Running   0               3m33s   10.0.1.44   aks-cpdemo01-26269800-vmss000001   <none>           <none>
cp610-cp-zookeeper-1                       2/2     Running   0               2m39s   10.0.1.73   aks-cpdemo01-26269800-vmss000002   <none>           <none>
cp610-cp-zookeeper-2                       2/2     Running   0               115s    10.0.1.12   aks-cpdemo01-26269800-vmss000000   <none>           <none>
kafka-client                               1/1     Running   0               6m15s   10.0.1.69   aks-cpdemo01-26269800-vmss000002   <none>           <none>
ksql-client                                1/1     Running   0               6m2s    10.0.1.14   aks-cpdemo01-26269800-vmss000000   <none>           <none>


## Check the running services
$ kubectl get svc
NAME                          TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)             AGE
cp610-cp-control-center       ClusterIP   10.1.0.73    <none>        9021/TCP            4m16s
cp610-cp-kafka                ClusterIP   10.1.0.63    <none>        9092/TCP,5556/TCP   4m16s
cp610-cp-kafka-connect        ClusterIP   10.1.0.26    <none>        8083/TCP,5556/TCP   4m16s
cp610-cp-kafka-headless       ClusterIP   None         <none>        9092/TCP            4m16s
cp610-cp-ksql-server          ClusterIP   10.1.0.86    <none>        8088/TCP,5556/TCP   4m16s
cp610-cp-schema-registry      ClusterIP   10.1.0.157   <none>        8081/TCP,5556/TCP   4m16s
cp610-cp-zookeeper            ClusterIP   10.1.0.119   <none>        2181/TCP,5556/TCP   4m16s
cp610-cp-zookeeper-headless   ClusterIP   None         <none>        2888/TCP,3888/TCP   4m16s

Creating a Kafka topic

## Connect to the kafka-client pod
$ kubectl exec -it kafka-client -- /bin/bash

## Create topic002
[appuser@kafka-client ~]$ kafka-topics --zookeeper cp610-cp-zookeeper:2181 --create --topic topic002 --partitions 1 --replication-factor 1
Created topic topic002.

## List the topics
[appuser@kafka-client ~]$ kafka-topics --zookeeper cp610-cp-zookeeper:2181 --list
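To double-check the partition and replication settings of the new topic, a describe call in the same kafka-client session should also work (a sketch; output omitted):

## Show partition / replication details of topic002
[appuser@kafka-client ~]$ kafka-topics --zookeeper cp610-cp-zookeeper:2181 --describe --topic topic002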

Connector definition files

Definition file for the MySqlConnector

For reference, see the Debezium documentation - Getting Started - Tutorial.

{
  "name": "MySqlSourceConnector_1",
  "config": {
    "connectionTimeZone": "JST",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "iturumysql01.mysql.database.azure.com",
    "database.port": "3306",
    "database.user": "adminadmin@iturumysql01",
    "database.password": "HogeHogeHoge!",
    "database.server.name": "iturumysql01",
    "database.ssl.mode": "preferred",
    "database.history.kafka.bootstrap.servers": "cp610-cp-kafka:9092",
    "database.history.kafka.topic": "history.cpdemo.inventory",
    "database.include.list": "iotdummydb"
  }
}

Definition file for the SqlDwSinkConnector

For reference, see the Azure Synapse Analytics Sink Connector for Confluent Platform documentation.

{
  "name": "SqlDwSinkConnector_1",
  "config": {
    "connector.class": "io.confluent.connect.azuresqldw.AzureSqlDwSinkConnector",
    "tasks.max": "1",
    "topics": "topic002",
    "azure.sql.dw.url": "jdbc:sqlserver://cpworkspace01.sql.azuresynapse.net:1433;",
    "azure.sql.dw.user": "adminadmin",
    "azure.sql.dw.password": "HogeHogeHoge1!",
    "azure.sql.dw.database.name": "cpdemosqldb",
    "auto.create": "true",
    "auto.evolve": "true",
    "table.name.format": "kafka_${topic}"
  }
}

Setting up the connectors

Forward the Kafka Connect port (8083) to the local environment.

$ kubectl port-forward --address localhost svc/cp610-cp-kafka-connect 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083

※ Press CTRL+C to stop the port forwarding

Open another terminal and create the following Kafka connectors.
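A couple of optional checks against the Kafka Connect REST API can be useful before creating anything; this is a sketch, assuming the port-forward above is still running and that jq is installed locally:

## List the connector plugins installed on the Connect worker
$ curl -s localhost:8083/connector-plugins | jq '.[].class'

## Validate a connector definition before creating it (error_count should come back as 0)
$ jq .config MySqlSourceConnector.json | \
      curl -s -X PUT -H 'Content-Type: application/json' --data @- \
      http://localhost:8083/connector-plugins/io.debezium.connector.mysql.MySqlConnector/config/validate | \
      jq '.error_count'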

Creating the Source – MySqlConnector

    • The following three topics are generated automatically:

iturumysql01 : stores the operation log for CREATE, DROP / Database, Table, etc. plus schema information
iturumysql01.iotdummydb.inventory : stores the data (when the target MySQL database contains data)
history.cpdemo.inventory : used by the connector to keep the database schema history

## This fails with an error if the connection to MySQL cannot be verified
$ curl -i -X POST http://localhost:8083/connectors -H 'Content-Type: application/json' --data @MySqlSourceConnector.json

HTTP/1.1 201 Created
Date: Mon, 23 May 2022 23:43:16 GMT
Location: http://localhost:8083/connectors/MySqlSourceConnector_1
Content-Type: application/json
Content-Length: 600
Server: Jetty(9.4.33.v20201020)

{"name":"MySqlSourceConnector_1","config":{"connectionTimeZone":"JST","connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"iturumysql01.mysql.database.azure.com","database.port":"3306","database.user":"mysqladmin@iturumysql01","database.password":"NetworldPsg2!","database.server.name":"iturumysql01","database.ssl.mode":"preferred","database.history.kafka.bootstrap.servers":"cp610-cp-kafka:9092","database.history.kafka.topic":"history.cpdemo.inventory","database.include.list":"iotdummydb","name":"MySqlSourceConnector_1"},"tasks":[],"type":"source"}

--- By the way ---
## To delete the connector
$ curl -X DELETE http://localhost:8083/connectors/MySqlSourceConnector_1

Creating the Sink – SqlDwSinkConnector

## This fails with an error if the connection to Synapse cannot be verified
$ curl -i -X POST http://localhost:8083/connectors -H 'Content-Type: application/json' --data @SqlDwSinkConnector.json

HTTP/1.1 201 Created
Date: Thu, 26 May 2022 05:44:34 GMT
Location: http://localhost:8083/connectors/SqlDwSinkConnector_1
Content-Type: application/json
Content-Length: 493
Server: Jetty(9.4.33.v20201020)

{"name":"SqlDwSinkConnector_1","config":{"connector.class":"io.confluent.connect.azuresqldw.AzureSqlDwSinkConnector","tasks.max":"1","topics":"topic002","azure.sql.dw.url":"jdbc:sqlserver://cpworkspace01.sql.azuresynapse.net:1433;","azure.sql.dw.user":"synapseadmin","azure.sql.dw.password":"NetworldPsg2!","azure.sql.dw.database.name":"CPDemoDataWarehouse","auto.create":"true","auto.evolve":"true","table.name.format":"kafka_${topic}","name":"SqlDwSinkConnector_1"},"tasks":[],"type":"sink"}

--- By the way ---
## To delete the connector
$ curl -X DELETE http://localhost:8083/connectors/SqlDwSinkConnector_1

Checking the connectors

## List the active connectors on the worker
$ curl localhost:8083/connectors
["MySqlSourceConnector_1","SqlDwSinkConnector_1"]


## Check the status of the MySqlSourceConnector
$ curl localhost:8083/connectors/MySqlSourceConnector_1/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   176  100   176    0     0   3365      0 --:--:-- --:--:-- --:--:--  3384
{
  "name": "MySqlSourceConnector_1",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.0.1.11:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.0.1.11:8083"
    }
  ],
  "type": "source"
}


## Check the status of the SqlDwSinkConnector
$ curl localhost:8083/connectors/SqlDwSinkConnector_1/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   172  100   172    0     0   2487      0 --:--:-- --:--:-- --:--:--  2492
{
  "name": "SqlDwSinkConnector_1",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.0.1.11:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.0.1.11:8083"
    }
  ],
  "type": "sink"
}


## To check everything at once
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
          jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
          column -s : -t| sed 's/\"//g'| sort

sink    |  SqlDwSinkConnector_1    |  RUNNING  |  RUNNING  |  io.confluent.connect.azuresqldw.AzureSqlDwSinkConnector
source  |  MySqlSourceConnector_1  |  RUNNING  |  RUNNING  |  io.debezium.connector.mysql.MySqlConnector

KSQL configuration

Connecting to KSQL

## Connect to the ksql-client pod
$ kubectl exec -it ksql-client -- /bin/bash

## Connect to ksqlDB
[appuser@ksql-client ~]$ ksql http://cp610-cp-ksql-server:8088

OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
                  
                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2020 Confluent Inc.

CLI v6.1.0, Server v6.1.0 located at http://cp610-cp-ksql-server:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!


## List the topics
ksql> list topics;

 Kafka Topic                       | Partitions | Partition Replicas 
---------------------------------------------------------------------
 cp610-cp-kafka-connect-config     | 1          | 3                  
 cp610-cp-kafka-connect-offset     | 25         | 3                  
 cp610-cp-kafka-connect-status     | 5          | 3                  
 history.cpdemo.inventory          | 1          | 1                  
 iturumysql01                      | 1          | 1                  
 iturumysql01.iotdummydb.inventory | 1          | 1                  
 topic002                          | 1          | 1                  
---------------------------------------------------------------------

Create the streams, tying them to the Avro schema.

## Create stream001 for the MySQL Source Connector
ksql> CREATE STREAM stream001 WITH (kafka_topic = 'iturumysql01.iotdummydb.inventory', value_format = 'avro');

 Message        
----------------
 Stream created 
----------------


## Create stream002 for the Synapse Sink Connector and bind it to topic002
ksql> CREATE STREAM stream002 WITH (KAFKA_TOPIC='topic002', VALUE_FORMAT='AVRO') AS 
        SELECT  s001.after->id as id,
                s001.after->section as section, 
                s001.after->iot_state as prefecture,
                s001.after->val_1 as val_1,
                s001.after->val_2 as val_2
        FROM stream001 s001; 

 Message                                
----------------------------------------
 Created query with ID CSAS_STREAM002_1 
----------------------------------------


## Check the streams
ksql> show streams;

 Stream Name | Kafka Topic                       | Key Format | Value Format | Windowed 
----------------------------------------------------------------------------------------
 STREAM001   | iturumysql01.iotdummydb.inventory | KAFKA      | AVRO         | false    
 STREAM002   | topic002                          | KAFKA      | AVRO         | false    
----------------------------------------------------------------------------------------
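To inspect the columns that ksqlDB derived for the new stream from the Avro schema, a DESCRIBE in the same session should work (a sketch; output omitted):

## Show the schema of stream002
ksql> DESCRIBE stream002;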

Everything is now ready.


Verifying the data flow

Checking the stream

## Check the data streaming status
ksql> SELECT * FROM stream002 emit changes;

Press CTRL-C to interrupt

Running the application

From the local terminal, I ran the "mysql_IoTdummy.py" Python program described in this article, generating 5 rows of data every second.

$ python mysql_IoTdummy.py --mode db --wait 1 --count 10

データベース・テーブルへのデータ書き込み

Connection established

['1653059784.916943', 0, '111', 'V', '786-4201', '徳島県', 158.33322633209377, 84.49057325157214, 1653059784.91714]
['1653059784.917148', 1, '111', 'L', '575-1674', '山口県', 135.8599096871533, 75.064630664443, 1653059784.917203]
['1653059784.917208', 2, '111', 'C', '607-0145', '佐賀県', 156.0637293462609, 78.21672571181065, 1653059784.917251]
['1653059784.917256', 3, '111', 'A', '749-7168', '北海道', 123.96884840139384, 80.95952082333669, 1653059784.917296]
['1653059784.917301', 4, '111', 'L', '843-4658', '香川県', 199.07957653525887, 53.37983182659535, 1653059784.917351]
Inserted 5 row(s) of data.
Done.

処理時間:6.554551839828491 [sec]

You can watch the stream in real time on "stream002".

ksql> SELECT * FROM stream002 emit changes;

+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|ID                     |SECTION                |PREFECTURE             |VAL_1                  |VAL_2                  |
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|1653059784.916943      |V                      |徳島県                    |158.33322143554688     |84.49057006835938      |
|1653059784.917148      |L                      |山口県                    |135.8599090576172      |75.06462860107422      |
|1653059784.917208      |C                      |佐賀県                    |156.06373596191406     |78.21672821044922      |
|1653059784.917256      |A                      |北海道                    |123.9688491821289      |80.95951843261719      |
|1653059784.917301      |L                      |香川県                    |199.07957458496094     |53.37983322143555      |

Press CTRL-C to interrupt

Confirm that the data is also being streamed into the "topic002" topic.

ksql> PRINT 'iturumysql01.iotdummydb.inventory' FROM BEGINNING;

ksql> PRINT 'topic002' FROM BEGINNING;

Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO
rowtime: 2022/05/20 15:16:25.700 Z, key: <null>, value: {"ID": "1653059784.916943", "SECTION": "V", "PREFECTURE": "徳島県", "VAL_1": 158.33322143554688, "VAL_2": 84.49057006835938}
rowtime: 2022/05/20 15:16:26.703 Z, key: <null>, value: {"ID": "1653059784.917148", "SECTION": "L", "PREFECTURE": "山口県", "VAL_1": 135.8599090576172, "VAL_2": 75.06462860107422}
rowtime: 2022/05/20 15:16:27.705 Z, key: <null>, value: {"ID": "1653059784.917208", "SECTION": "C", "PREFECTURE": "佐賀県", "VAL_1": 156.06373596191406, "VAL_2": 78.21672821044922}
rowtime: 2022/05/20 15:16:29.209 Z, key: <null>, value: {"ID": "1653059784.917256", "SECTION": "A", "PREFECTURE": "北海道", "VAL_1": 123.9688491821289, "VAL_2": 80.95951843261719}
rowtime: 2022/05/20 15:16:30.211 Z, key: <null>, value: {"ID": "1653059784.917301", "SECTION": "L", "PREFECTURE": "香川県", "VAL_1": 199.07957458496094, "VAL_2": 53.37983322143555}

Press CTRL-C to interrupt

Checking the data in Azure Synapse Analytics

Using the "mssql-cli" tool, run queries against the database in the Synapse SQL pool and confirm that the data has been stored through the Confluent Platform Kafka connectors.

## Connect to the Synapse workspace, passing the SQL pool (database) name, user name, and password as arguments
$ mssql-cli -S cpworkspace01.sql.azuresynapse.net -U synapseadmin -P NetworldPsg2! -d cpdemosqldb

## Run a query against the table and display its contents
cpdemosqldb> select * from kafka_topic002;
+------------------+-------------------+-----------+------------------+--------------+
| VAL_1            | ID                | SECTION   | VAL_2            | PREFECTURE   |
|------------------+-------------------+-----------+------------------+--------------|
| 111.479415893555 | 1653984067.079385 | B         | 66.560661315918  | 岡山県       |
| 189.111267089844 | 1653984229.080214 | S         | 68.9491729736328 | 長野県       |
| 191.615036010742 | 1653984100.803468 | Q         | 53.9585876464844 | 福岡県       |
| 199.33251953125  | 1653984229.080527 | F         | 61.3692970275879 | 福岡県       |
| 151.738159179688 | 1653983738.752873 | B         | 87.595329284668  | 大阪府       |
| 194.590621948242 | 1653983783.798177 | X         | 87.0414962768555 | 栃木県       |
| 114.615188598633 | 1653983738.752919 | D         | 89.7937088012695 | 宮城県       |
| 124.387306213379 | 1653984441.154836 | S         | 74.9007797241211 | 長崎県       |
| 164.049896240234 | 1653983783.798038 | I         | 72.0705490112305 | 岐阜県       |
| 198.455078125    | 1653984100.803527 | T         | 65.5769958496094 | 福井県       |
      :                    :            :                 :             :
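Because the streamed records now sit in an ordinary Synapse table, standard SQL aggregations can be run over them. A sketch in the same mssql-cli session, using the column names shown in the table above:

## Aggregate the streamed rows per prefecture
cpdemosqldb> select PREFECTURE, count(*) as record_count, avg(VAL_1) as avg_val_1 from kafka_topic002 group by PREFECTURE;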

Cleanup

How to remove the Pods / namespace

## Pod : Confluent Platform
$ helm delete cp610             
W0407 17:38:18.658767   39907 warnings.go:70] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
release "cp610" uninstalled

## Pod : kafka-client / ksql-client
$ kubectl delete -f cp-helm-charts/examples/kafka-client.yaml
$ kubectl delete -f cp-helm-charts/examples/ksql-client.yaml

## How to delete the namespace (Pods under the namespace are deleted as well)
$ kubectl delete namespace akscp610
namespace "akscp610" deleted

Stopping and starting the AKS cluster

$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01
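To confirm the cluster state after stopping or starting it, something like the following should work (a sketch using az aks show):

## Check whether the cluster is Running or Stopped
$ az aks show -g rg_ituru_aks01 -n aks_ituru_cp01 --query powerState.code -o tsv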

Summary

With Confluent Platform at the core, a data streaming and data analytics infrastructure can be built easily on Azure. Because Kafka Connect can attach to many different resources, data can be consolidated from multiple sources and linked to an analytics platform with little effort.

Reference information

I gratefully referred to the following articles.

    • MySQLのTable更新情報をKafka上でksql処理する
    • Schema Registry について書いていく その1: Confluent Schema Registry
    • Debezium によるチェンジデータキャプチャー

I also referred to the following documentation.

    • Debezium MySQL Source Connector for Confluent Platform
    • Azure Database for MySQL でのサーバー パラメーター
    • 第2章 MySQL 用 Debezium コネクター
    • Debezium User Guide
    • Kafka Connect と Schema Registry の使用
    • Kafka Connect とコネクターのモニタリング
    • Kafka Connect のログ記録
    • Azure Synapse Analytics Sink Connector for Confluent Platform