Trying Confluent for Kubernetes: Configuring Confluent Platform on AKS – Data Streaming Edition

Overview

Confluent for Kubernetes (CFK) is a cloud-native control plane for deploying and managing Confluent in private cloud environments (in this case, Azure Kubernetes Service (AKS)). It provides a declarative API: a standard, simple interface for customizing, deploying, and managing Confluent Platform.

(Architecture diagram of the data streaming flow)
The data streaming flow is as follows:

1. A Python program generates dummy data and writes it to Cosmos DB.
2. The data written to Cosmos DB is streamed into topic001 on Confluent Platform (CP) via the Cosmos DB Source Connector.
3. Only the columns required for query processing are forwarded from topic001 into stream001.
4. From stream001, stream002 extracts only the rows whose section column is 'C', 'E', 'F', or 'W'.
5. The streaming data extracted by stream002 is forwarded to topic002.
6. A Python program that reads topic002 runs in a Pod on AKS, where we confirm the extracted data.


Local environment

    • macOS Monterey 12.3.1
    • Python 3.8.12
    • Azure CLI 2.34.1
    • helm v3.6.3
    • kubectl v1.21.3

Programs used

Producer-side program

The following Python program is used for step 1 of the data streaming flow above.
It is executed on the local Mac.

import time
from datetime import date, datetime
import random
import json
import argparse
import string
from faker.factory import Factory
from azure.cosmos import CosmosClient

endpoint = 'https://iturucosmosdb01.documents.azure.com:443/'
key = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
databaseName = 'CPDemoDB001'
containerName = 'container001'


# Use Faker (Japanese locale) to generate dummy data
Faker = Factory.create
fake = Faker("ja_JP")

# Dummy sections (defined as the uppercase alphabet)
section = string.ascii_uppercase


# Build the dummy IoT data as JSON
def iot_json_data(count, proc):
    iot_items = json.dumps({
        'items': [{
            'id': str(datetime.now().timestamp()),  # id (timestamp string)
            'ctid': i,                              # creation sequence number
            'proc': proc,                           # name of the generating process
            'section': random.choice(section),      # IoT device section
            'iot_num': fake.zipcode(),              # IoT device number (dummy zip code)
            'iot_state': fake.prefecture(),         # IoT installation location (prefecture)
            'val_1': random.uniform(100, 200),      # IoT value 1
            'val_2': random.uniform(50, 90),        # IoT value 2
            'created_at': generate_time()           # data generation time
            } 
            for i in range(count)
        ]
    }, ensure_ascii=False).encode('utf-8')
    return iot_items


# Generation time of the dummy data measured by the IoT device
def generate_time():
    dt_time = datetime.now()
    gtime = json_trans_date(dt_time)
    return gtime


# Conversion helper for date / datetime objects
def json_trans_date(obj):
    # Convert date types to ISO-format strings
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    # Anything else is not supported
    raise TypeError("Type %s not serializable" % type(obj))


# Connect to Cosmos DB
def ConnectionToCosmosDB():
    try:
      client = CosmosClient(endpoint, key)
      db = client.get_database_client(databaseName)
      container = db.get_container_client(containerName)
      # print("\nConnection established\n")
      return 1, container
    except Exception as err:
      return 0, err


# Main: terminal output
def tm_main(count, proc, wait):
    print('ターミナル 出力\n')

    # Generate dummy data
    iotjsondata = iot_json_data(count, proc)
    json_dict = json.loads(iotjsondata)

    # Print to the terminal
    for item in json_dict['items']:
        print(item)
        time.sleep(wait)


# Main: Show Database (check the connection)
def show_db_main():
    print('CosmosDB への接続確認')
    code, conn = ConnectionToCosmosDB()

    if code == 0 :
      print(conn)
    else :
      print(conn)


# Main: Write Database
def write_db_main(count, proc, wait):
    print('データベースへのデータ書き込み')

    # Connect to the DB
    code, conn = ConnectionToCosmosDB()

    if code == 0 :
      print(conn)
      return

    # Generate dummy data
    iotjsondata = iot_json_data(count, proc)
    json_dict = json.loads(iotjsondata)

    ## A progress bar (tqdm) could show progress here; see the commented-out loop below
    # Insert the data
    try:
      # for item in tqdm(json_dict['items']):
      for item in json_dict['items']:
        print(list(item.values()))
        conn.create_item(item)
        time.sleep(wait)
      print("\nInserted",count,"row(s) of data.")
      print("Done.")
    except Exception as err:
      print(err)
      return


# Main: Read Database
def read_db_main():
    print('データベースからのデータ読み込み')

    # Connect to the DB
    code, conn = ConnectionToCosmosDB()

    if code == 0 :
      print(conn)
      return

    # Read table data
    items = conn.read_all_items()

    # print all rows
    try:
      [print(f'{item}') for item in items]
      print("Done.")
    except Exception as err:
      print(err)


# Main: Delete Database (remove all data)
def delete_db_main():
    print('データベースのデータ全削除')

    # Connect to the DB
    code, conn = ConnectionToCosmosDB()

    if code == 0 :
      print(conn)
      return

    try:
      for item in conn.query_items(query='SELECT * FROM container001',enable_cross_partition_query=True):
          print(item['section'])
          conn.delete_item(item, partition_key=item['section'])
      print("Done.")
    except Exception as err:
      print(err)


# Main
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='ダミーデータの自動生成からのDBへの書込み')
    parser.add_argument('--count', type=int, default=5, help='データ作成件数(デフォルト:5件)')
    parser.add_argument('--proc', type=str, default='111', help='データ作成プロセス名(デフォルト:111)')
    parser.add_argument('--mode', type=str, default='tm', help='tm(Terminal(デフォルト))/ db(db操作)')
    parser.add_argument('--wait', type=float, default=1, help='データ生成間隔(デフォルト:1.0秒)')
    parser.add_argument('--db', type=str, default='show', help='show(デフォルト) / write / read / delete')
    args = parser.parse_args()

    start = time.time()

    if (args.mode == 'tm'): 
      tm_main(args.count, args.proc, args.wait)
    elif (args.mode == 'db'):
      if (args.db == 'show'):
        show_db_main()
      elif (args.db == 'write'):
        write_db_main(args.count, args.proc, args.wait)
      elif (args.db == 'read'):
        read_db_main()
      elif (args.db == 'delete'):
        delete_db_main()
    else :
      print("パラメータ設定を確認ください --help")

    making_time = time.time() - start

    print("")
    print("処理時間:{0}".format(making_time) + " [sec]")
    print("")

Consumer-side program

The following Python program is used for step 6 of the data streaming flow above.
It is executed inside a dedicated Pod started in the AKS environment.

from kafka import KafkaConsumer

# Print the received data to the terminal
def topic_to_tm(consumer):
    print('ターミナル 出力')

    # Read data from kafka
    try :
        for message in consumer:
            print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, 
                                                    message.key, message.value.decode('utf-8')))
    except KeyboardInterrupt :
        print('\r\n Output to Terminal - interrupted!')
        return

# Receive data from the Kafka topic
def get_kafka_topic():
    # Initialize consumer variable and set property for JSON decode
    consumer = KafkaConsumer ('topic002', bootstrap_servers = ['kafka.akscfk231.svc.cluster.local:9092'])
    print(consumer)
    return consumer


if __name__ == '__main__':
    consumer = get_kafka_topic()
    topic_to_tm(consumer)
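
Incidentally, the consumer above relies on kafka-python's defaults (no consumer group, auto_offset_reset='latest'), so it only shows messages that arrive after it starts. If you want to re-read topic002 from the beginning and decode the JSON values directly, the same KafkaConsumer constructor accepts a few extra keyword arguments. The following is only a minimal sketch, assuming the same in-cluster bootstrap address; the group id 'topic002-checker' is an arbitrary example, not taken from this article.

import json
from kafka import KafkaConsumer

# Re-read topic002 from the earliest offset and decode the JSON payload.
consumer = KafkaConsumer(
    'topic002',
    bootstrap_servers=['kafka.akscfk231.svc.cluster.local:9092'],
    group_id='topic002-checker',                                   # arbitrary example group id
    auto_offset_reset='earliest',                                  # start from the beginning of the topic
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),    # decode JSON values to dicts
)

for message in consumer:
    # message.value is now a dict such as {"ID": ..., "SECTION": "C", ...}
    print(message.offset, message.value['SECTION'], message.value['VAL_1'])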

Creating the consumer-side Pod

We create a Pod that runs the consumer Python program (hereafter, the Consumer Pod).

Registering the Python program in a ConfigMap

Create a ConfigMap from the Python program file.

## Register the ConfigMap
$ kubectl create configmap python-consumer002-configmap --from-file=topic002_consumer.py
configmap/python-consumer002-configmap created


## Check the ConfigMap
$ kubectl get configmap python-consumer002-configmap                            
NAME                           DATA   AGE
python-consumer002-configmap   1      32s


## Show the ConfigMap details
$ kubectl describe configmap python-consumer002-configmap
Name:         python-consumer002-configmap
Namespace:    akscfk231
Labels:       <none>
Annotations:  <none>

Data
====
topic002_consumer.py:
----
from kafka import KafkaConsumer

# Print the received data to the terminal
def topic_to_tm(consumer):
    print('ターミナル 出力')

    # Read data from kafka
    try :
        for message in consumer:
            print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, 
                                                    message.key, message.value.decode('utf-8')))
    except KeyboardInterrupt :
        print('\r\n Output to Terminal - interrupted!')
        return

# Receive data from the Kafka topic
def get_kafka_topic():
    # Initialize consumer variable and set property for JSON decode
    consumer = KafkaConsumer ('topic002', bootstrap_servers = ['kafka.akscfk231.svc.cluster.local:9092'])
    print(consumer)
    return consumer


if __name__ == '__main__':
    consumer = get_kafka_topic()
    topic_to_tm(consumer)

Events:  <none>

Starting the Consumer Pod

The Pod manifest is as follows.

apiVersion: v1
kind: Pod
metadata:
  name: python-client2
  namespace: akscfk231
spec:
  containers:
  - name: python-client2
    image: python:3.7.13-slim
    command:
      - bash
      - -c
      - "exec tail -f /dev/null"
    volumeMounts:
      - mountPath: /pyturu
        name: python-consumer
  volumes:
  - name: python-consumer
    configMap:
      name: python-consumer002-configmap

Start the Pod and perform the necessary setup. (The container simply runs "tail -f /dev/null" so that it stays up and we can exec into it.)

## Start the Pod
$ kubectl apply -f python-client2.yaml                      
pod/python-client2 created


## Connect to the Pod
$ kubectl exec -it python-client2 -- /bin/bash
root@python-client2:/# 


## Check the Python program
root@python-client2:/# cd /pyturu
root@python-client2:/pyturu# ls -l
total 0
lrwxrwxrwx 1 root root 27 Aug 18 08:53 topic002_consumer.py -> ..data/topic002_consumer.py


## Install the required Python library
root@python-client2:/# pip install kafka-python
Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 246.5/246.5 KB 3.6 MB/s eta 0:00:00
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2

Running the programs and checking the results

First we start the consumer-side program so that it is ready to receive data. Then the producer side generates data, which is processed in real time according to the configured extraction condition, and we confirm the extracted data arriving on the consumer side.

Consumer side

While still connected to the Consumer Pod, run the program as follows; it now waits to receive data.

root@python-client2:/pyturu# python topic002_consumer.py 
<kafka.consumer.group.KafkaConsumer object at 0x7fe75aec91d0>
ターミナル 出力

Producer side

Run the program as follows to generate 30 records at 0.5-second intervals.

$ python cosmosdb_IoTdummy.py --mode db --db write --wait 0.5 --count 30
データベースへのデータ書き込み
['1660834305.732015', 0, '111', 'O', '859-1514', '熊本県', 158.00226177654937, 50.97531387546896, '2022-08-18T23:51:45.732165']
['1660834305.732184', 1, '111', 'O', '075-8277', '茨城県', 170.42993612928407, 77.83170884785234, '2022-08-18T23:51:45.732221']
['1660834305.732227', 2, '111', 'E', '554-6753', '北海道', 148.92876929443668, 82.22632577434096, '2022-08-18T23:51:45.732255']
['1660834305.73226', 3, '111', 'B', '487-6201', '滋賀県', 112.52823513843735, 71.56402107831435, '2022-08-18T23:51:45.732286']
['1660834305.73229', 4, '111', 'N', '170-6136', '青森県', 119.9081080641646, 73.91280181768438, '2022-08-18T23:51:45.732314']
['1660834305.732319', 5, '111', 'H', '394-5687', '福井県', 185.08908349053524, 54.60720831400024, '2022-08-18T23:51:45.732343']
['1660834305.732347', 6, '111', 'O', '752-3195', '徳島県', 163.65979062146778, 68.80266444058988, '2022-08-18T23:51:45.732370']
['1660834305.732374', 7, '111', 'V', '313-9684', '鳥取県', 186.76982229974786, 71.43296034005864, '2022-08-18T23:51:45.732397']
['1660834305.732401', 8, '111', 'Q', '128-7149', '沖縄県', 110.07222152971069, 74.39862438101605, '2022-08-18T23:51:45.732425']
['1660834305.732429', 9, '111', 'R', '361-6022', '兵庫県', 166.9902488217956, 89.09971572382713, '2022-08-18T23:51:45.732452']
['1660834305.732456', 10, '111', 'Z', '098-6251', '三重県', 158.87197665118143, 81.01997557755357, '2022-08-18T23:51:45.732480']
['1660834305.732488', 11, '111', 'O', '743-4912', '愛知県', 168.45018805372104, 58.38400703727193, '2022-08-18T23:51:45.732513']
['1660834305.732517', 12, '111', 'N', '244-6470', '宮崎県', 140.53376931066012, 81.83117092131732, '2022-08-18T23:51:45.732540']
['1660834305.732544', 13, '111', 'U', '093-0428', '秋田県', 137.67206357275325, 82.49823923324556, '2022-08-18T23:51:45.732571']
['1660834305.732576', 14, '111', 'R', '918-1763', '鳥取県', 133.23228040733093, 79.4502829818707, '2022-08-18T23:51:45.732600']
['1660834305.732604', 15, '111', 'J', '845-5792', '栃木県', 190.33081017479677, 56.93608388242184, '2022-08-18T23:51:45.732628']
['1660834305.732632', 16, '111', 'H', '282-8691', '三重県', 135.48589527133828, 68.90330426502534, '2022-08-18T23:51:45.732655']
['1660834305.732659', 17, '111', 'R', '971-8072', '東京都', 128.7487058948935, 75.92087004159501, '2022-08-18T23:51:45.732682']
['1660834305.732686', 18, '111', 'C', '734-2744', '山梨県', 172.25052850462095, 53.00702358904964, '2022-08-18T23:51:45.732708']
['1660834305.732712', 19, '111', 'I', '006-3248', '鹿児島県', 131.41670420062965, 84.72459508340276, '2022-08-18T23:51:45.732734']
['1660834305.732738', 20, '111', 'W', '090-5446', '香川県', 190.06449809221584, 81.7200152031348, '2022-08-18T23:51:45.732761']
['1660834305.732765', 21, '111', 'G', '648-4463', '群馬県', 182.8725661666906, 76.47743809460934, '2022-08-18T23:51:45.732788']
['1660834305.732792', 22, '111', 'T', '911-9217', '兵庫県', 158.22348413535988, 64.60388246056716, '2022-08-18T23:51:45.732814']
['1660834305.732818', 23, '111', 'C', '891-9247', '北海道', 176.11284094147544, 75.70547358717785, '2022-08-18T23:51:45.732840']
['1660834305.732848', 24, '111', 'B', '060-1112', '北海道', 192.3259788508905, 79.94946793864301, '2022-08-18T23:51:45.732872']
['1660834305.732876', 25, '111', 'Z', '953-7789', '山梨県', 124.87882930311295, 72.23162082053496, '2022-08-18T23:51:45.732899']
['1660834305.732903', 26, '111', 'W', '364-3961', '福井県', 132.41009809155906, 65.1689165300587, '2022-08-18T23:51:45.732925']
['1660834305.732929', 27, '111', 'P', '785-6103', '福島県', 104.55147953912095, 66.21606634357327, '2022-08-18T23:51:45.732952']
['1660834305.732956', 28, '111', 'Y', '916-4663', '宮城県', 194.5279802822507, 64.50228129345219, '2022-08-18T23:51:45.732978']
['1660834305.732982', 29, '111', 'I', '904-0606', '新潟県', 179.6807544670632, 69.48480016692815, '2022-08-18T23:51:45.733005']

Inserted 30 row(s) of data.
Done.

処理時間:16.733893156051636 [sec]

Checking the data on the consumer side

Of the 30 records produced above, the 5 records whose fourth field (section) is one of 'C', 'E', 'F', or 'W' were extracted and delivered to the consumer side.

root@python-client2:/pyturu# python topic002_consumer.py
<kafka.consumer.group.KafkaConsumer object at 0x7f6d50f9e210>
ターミナル 出力
topic002:0:0: key=None value={"ID":"1660834305.732227","CTID":2,"SECTION":"E","IOT_STATE":"北海道","VAL_1":148.92876929443668,"VAL_2":82.22632577434096,"CREATED_AT":"2022-08-18T23:51:45.732255"}
topic002:0:1: key=None value={"ID":"1660834305.732686","CTID":18,"SECTION":"C","IOT_STATE":"山梨県","VAL_1":172.25052850462095,"VAL_2":53.00702358904964,"CREATED_AT":"2022-08-18T23:51:45.732708"}
topic002:0:2: key=None value={"ID":"1660834305.732738","CTID":20,"SECTION":"W","IOT_STATE":"香川県","VAL_1":190.06449809221584,"VAL_2":81.7200152031348,"CREATED_AT":"2022-08-18T23:51:45.732761"}
topic002:0:3: key=None value={"ID":"1660834305.732818","CTID":23,"SECTION":"C","IOT_STATE":"北海道","VAL_1":176.11284094147544,"VAL_2":75.70547358717785,"CREATED_AT":"2022-08-18T23:51:45.732840"}
topic002:0:4: key=None value={"ID":"1660834305.732903","CTID":26,"SECTION":"W","IOT_STATE":"福井県","VAL_1":132.41009809155906,"VAL_2":65.1689165300587,"CREATED_AT":"2022-08-18T23:51:45.732925"}

Additional checks

Checking the stream information

Let's look at the state of the two streams. Note that, in addition to the data above, another 30 records were generated from the producer side.

## Connect to the ksql-client2 Pod
$ kubectl exec -it ksql-client2 -- /bin/bash
[appuser@ksql-client2 ~]$ 


## Status of stream001
[appuser@ksql-client2 ~]$ ksql --execute "DESCRIBE stream001 EXTENDED;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

Name                 : STREAM001
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : AVRO
Kafka topic          : topic001 (partitions: 1, replication: 3)
Statement            : CREATE STREAM STREAM001 (ID STRING, CTID BIGINT, SECTION STRING, IOT_STATE STRING, VAL_1 DOUBLE, VAL_2 DOUBLE, CREATED_AT STRING) WITH (KAFKA_TOPIC='topic001', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');

 Field      | Type            
------------------------------
 ID         | VARCHAR(STRING) 
 CTID       | BIGINT          
 SECTION    | VARCHAR(STRING) 
 IOT_STATE  | VARCHAR(STRING) 
 VAL_1      | DOUBLE          
 VAL_2      | DOUBLE          
 CREATED_AT | VARCHAR(STRING) 
------------------------------

Sources that have a DROP constraint on this source
--------------------------------------------------
STREAM002

Queries that read from this STREAM
-----------------------------------
CSAS_STREAM002_1 (RUNNING) : CREATE STREAM STREAM002 WITH (KAFKA_TOPIC='topic002', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON_SR') AS SELECT * FROM STREAM001 STREAM001 WHERE ((((STREAM001.SECTION = 'C') OR (STREAM001.SECTION = 'E')) OR (STREAM001.SECTION = 'F')) OR (STREAM001.SECTION = 'W')) EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Runtime statistics by host
-------------------------
 Host                                             | Metric                    | Value      | Last Message             
----------------------------------------------------------------------------------------------------------------------
 ksqldb-0.ksqldb.akscfk231.svc.cluster.local:8088 | consumer-messages-per-sec |          0 | 2022-08-18T15:15:46.583Z 
 ksqldb-0.ksqldb.akscfk231.svc.cluster.local:8088 | consumer-total-bytes      |      15781 | 2022-08-18T15:15:46.583Z 
 ksqldb-0.ksqldb.akscfk231.svc.cluster.local:8088 | consumer-total-messages   |         60 | 2022-08-18T15:15:46.583Z 
----------------------------------------------------------------------------------------------------------------------
(Statistics of the local KSQL server interaction with the Kafka topic topic001)


## Status of stream002
[appuser@ksql-client2 ~]$ ksql --execute "DESCRIBE stream002 EXTENDED;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

Name                 : STREAM002
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON_SR
Kafka topic          : topic002 (partitions: 1, replication: 3)
Statement            : CREATE STREAM STREAM002 WITH (KAFKA_TOPIC='topic002', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON_SR') AS SELECT *
FROM STREAM001 STREAM001
WHERE ((((STREAM001.SECTION = 'C') OR (STREAM001.SECTION = 'E')) OR (STREAM001.SECTION = 'F')) OR (STREAM001.SECTION = 'W'))
EMIT CHANGES;

 Field      | Type            
------------------------------
 ID         | VARCHAR(STRING) 
 CTID       | BIGINT          
 SECTION    | VARCHAR(STRING) 
 IOT_STATE  | VARCHAR(STRING) 
 VAL_1      | DOUBLE          
 VAL_2      | DOUBLE          
 CREATED_AT | VARCHAR(STRING) 
------------------------------

Queries that write from this STREAM
-----------------------------------
CSAS_STREAM002_1 (RUNNING) : CREATE STREAM STREAM002 WITH (KAFKA_TOPIC='topic002', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON_SR') AS SELECT * FROM STREAM001 STREAM001 WHERE ((((STREAM001.SECTION = 'C') OR (STREAM001.SECTION = 'E')) OR (STREAM001.SECTION = 'F')) OR (STREAM001.SECTION = 'W')) EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Runtime statistics by host
-------------------------
 Host                                             | Metric           | Value      | Last Message             
-------------------------------------------------------------------------------------------------------------
 ksqldb-0.ksqldb.akscfk231.svc.cluster.local:8088 | messages-per-sec |          0 | 2022-08-18T15:15:46.577Z 
 ksqldb-0.ksqldb.akscfk231.svc.cluster.local:8088 | total-messages   |         10 | 2022-08-18T15:15:46.577Z 
-------------------------------------------------------------------------------------------------------------
(Statistics of the local KSQL server interaction with the Kafka topic topic002)

Consumer Groups summary:

Consumer Group       : _confluent-ksql-akscfk231.ksqldb_query_CSAS_STREAM002_1

Kafka topic          : topic001
Max lag              : 0

 Partition | Start Offset | End Offset | Offset | Lag 
------------------------------------------------------
 0         | 0            | 65         | 65     | 0   
------------------------------------------------------

Checking the schema information

The schema information automatically generated for the two topics, as downloaded from Confluent Control Center, is shown below.

The automatically generated schema for topic001. The trailing fields whose names start with an underscore (_rid, _self, _etag, _attachments, _ts, _lsn) are Cosmos DB system properties that come through with each document.

{
  "connect.name": "inferred_name__1785624689",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "ctid",
      "type": "long"
    },
    {
      "name": "proc",
      "type": "string"
    },
    {
      "name": "section",
      "type": "string"
    },
    {
      "name": "iot_num",
      "type": "string"
    },
    {
      "name": "iot_state",
      "type": "string"
    },
    {
      "name": "val_1",
      "type": "double"
    },
    {
      "name": "val_2",
      "type": "double"
    },
    {
      "name": "created_at",
      "type": "string"
    },
    {
      "name": "_rid",
      "type": "string"
    },
    {
      "name": "_self",
      "type": "string"
    },
    {
      "name": "_etag",
      "type": "string"
    },
    {
      "name": "_attachments",
      "type": "string"
    },
    {
      "name": "_ts",
      "type": "long"
    },
    {
      "name": "_lsn",
      "type": "long"
    }
  ],
  "name": "inferred_name__1785624689",
  "type": "record"
}
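
Since topic001 carries Avro-encoded values (stream001's value format is AVRO, as shown in the DESCRIBE output earlier), reading it with the plain kafka-python consumer used above would only return raw bytes. The sketch below shows one way it could be deserialized with the confluent-kafka package and Schema Registry. This is an assumption-laden example: the Schema Registry service name schemaregistry.akscfk231.svc.cluster.local:8081 and the consumer group id are guesses, and confluent-kafka[avro] is assumed to be installed in the Pod.

# pip install "confluent-kafka[avro]"   (assumed to be available in the Pod)
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

# Assumed in-cluster Schema Registry endpoint -- adjust to your environment.
sr_client = SchemaRegistryClient({'url': 'http://schemaregistry.akscfk231.svc.cluster.local:8081'})

consumer = DeserializingConsumer({
    'bootstrap.servers': 'kafka.akscfk231.svc.cluster.local:9092',
    'group.id': 'topic001-avro-reader',        # arbitrary example group id
    'auto.offset.reset': 'earliest',
    'value.deserializer': AvroDeserializer(sr_client),
})
consumer.subscribe(['topic001'])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    record = msg.value()  # a dict matching the Avro schema shown above
    print(record['section'], record['iot_num'], record['val_1'])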

The automatically generated schema for topic002 (a JSON Schema, since stream002 is written with VALUE_FORMAT='JSON_SR').

{
  "properties": {
    "CREATED_AT": {
      "connect.index": 6,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "type": "string"
        }
      ]
    },
    "CTID": {
      "connect.index": 1,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "connect.type": "int64",
          "type": "integer"
        }
      ]
    },
    "ID": {
      "connect.index": 0,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "type": "string"
        }
      ]
    },
    "IOT_STATE": {
      "connect.index": 3,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "type": "string"
        }
      ]
    },
    "SECTION": {
      "connect.index": 2,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "type": "string"
        }
      ]
    },
    "VAL_1": {
      "connect.index": 4,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "connect.type": "float64",
          "type": "number"
        }
      ]
    },
    "VAL_2": {
      "connect.index": 5,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "connect.type": "float64",
          "type": "number"
        }
      ]
    }
  },
  "type": "object"
}
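
For reference, these definitions do not have to be downloaded from Control Center; the Schema Registry REST API serves them as well. A minimal sketch, assuming the registry is reachable at schemaregistry.akscfk231.svc.cluster.local:8081 (the service name is an assumption) and that the subjects follow the default TopicNameStrategy (topic001-value, topic002-value):

import json
import requests

# Assumed in-cluster Schema Registry endpoint -- adjust to your environment.
SR_URL = 'http://schemaregistry.akscfk231.svc.cluster.local:8081'

for subject in ('topic001-value', 'topic002-value'):
    # GET /subjects/{subject}/versions/latest returns the id, version and schema string.
    resp = requests.get(f'{SR_URL}/subjects/{subject}/versions/latest')
    resp.raise_for_status()
    body = resp.json()
    print(subject, '-> schema id:', body['id'], 'type:', body.get('schemaType', 'AVRO'))
    print(json.dumps(json.loads(body['schema']), indent=2))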

Conclusion

Using Confluent for Kubernetes (CFK), I deployed Confluent Platform on Azure Kubernetes Service (AKS) and verified end-to-end data streaming over the following nine-part series.

    • Part 1: #1 CP on AKS with CFK – Preparation
    • Part 2: #2 CP on AKS with CFK – Control Center (CCC) authentication
    • Part 3: #3 CP on AKS with CFK – Connector plugins
    • Part 4: #4 CP on AKS with CFK – Topic creation
    • Part 5: #5 CP on AKS with CFK – Sink Connector creation
    • Part 6: #6 CP on AKS with CFK – Source Connector creation
    • Part 7: #7 CP on AKS with CFK – Simple test
    • Part 8: #8 CP on AKS with CFK – KSQL stream creation
    • Part 9: #9 CP on AKS with CFK – Data streaming

With CFK, Confluent Platform can be managed in a DevOps style (infrastructure as code). If I find the time, I would like to try that as well.

References

I referred to the following resources. Many thanks to their authors.

Configuring value converters and key converters
Using Kafka Connect with Schema Registry
Getting started with Avro and Schema Registry
Articles on Schema Registry, part 1: Confluent Schema Registry
Confluent Platform notes (3): A quick test of Schema Registry
