Configuring Confluent Platform on AKS with Confluent for Kubernetes – Creating a Sink Connector

Overview

Confluent for Kubernetes (CFK) is a cloud-native control plane for deploying and managing Confluent in private-cloud environments (in this case, Azure Kubernetes Service (AKS)). It offers a declarative API that provides a standard, simple interface for customizing, deploying, and managing Confluent Platform.

The workflow for creating a Sink Connector on a Confluent Platform deployed on AKS with CFK is outlined below:

    1. Prepare the Kubernetes environment (done in the prerequisites)
    2. Deploy Confluent for Kubernetes (done in the prerequisites)
    3. Deploy Confluent Platform, including the required connector plugins (done in the prerequisites)
    4. Configure additional Confluent Platform settings
    5. Apply the additional Confluent Platform deployment (create the Sink Connector)

Local environment

    • macOS Monterey 12.3.1
    • python 3.8.12
    • Azure CLI 2.34.1
    • helm v3.6.3
    • kubectl v1.21.3
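Each tool's version can be confirmed locally with its own version command, e.g.:

$ az version
$ helm version --short
$ kubectl version --client --short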

Prerequisites

    1. Follow this article to set up the AKS cluster environment.
    2. Follow this article to deploy Confluent Platform on AKS and create the topic.
    3. Follow this article to create the Cosmos DB account on Azure.
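Before continuing, it can help to confirm that the prerequisite deployment is healthy. A minimal check, assuming the akscfk231 namespace used by the manifests in this article:

$ kubectl get pods -n akscfk231
$ kubectl get kafkatopic -n akscfk231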

Sink Cosmos DB settings:

    • Endpoint: https://iturucosmosdb01.documents.azure.com:443/
    • Cosmos DB account name: iturucosmosdb01
    • Database name: CPDemoDB002
    • Container name: container002
    • Partition: section
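The endpoint value can be cross-checked against the live account with the Azure CLI (the resource group name below is a hypothetical placeholder):

$ az cosmosdb show -n iturucosmosdb01 -g <resource-group> --query documentEndpoint
"https://iturucosmosdb01.documents.azure.com:443/"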

Additional Confluent Platform configuration

Obtain the Cosmos DB access key.

In the working directory where the Cosmos DB account was built with Terraform, run the following to retrieve the Cosmos DB access key.

$ terraform output cosmosdb_account_key
"NYeu0hLfOMJscPUDUs7ql7U9BJ14Gd1DiwmDwbUVrVC3tOUsluwMNIrm3uCa5nMINqPISAkirjd12qt1efDqjg=="

Changing the Confluent Platform configuration

Because the configuration change that connects using basic-authentication credentials did not work for me (possibly my own shortcoming), the setup below uses no authentication.

The custom resource used to create the Sink Connector (kafka_sink_connector.yaml):

apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: cosmosdb-sink-connector002
  namespace: akscfk231
spec:
  class: "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector"
  taskMax: 3
  connectClusterRef:
    name: connect
  configs:
    topics: "topic002"
    key.converter: "io.confluent.connect.avro.AvroConverter"
    key.converter.schema.registry.url: "http://schemaregistry.akscfk231.svc.cluster.local:8081"
    value.converter: "io.confluent.connect.avro.AvroConverter"
    value.converter.schema.registry.url: "http://schemaregistry.akscfk231.svc.cluster.local:8081"
    connect.cosmos.connection.endpoint: "https://iturucosmosdb01.documents.azure.com:443/"
    connect.cosmos.master.key: "NYeu0hLfOMJscPUDUs7ql7U9BJ14Gd1DiwmDwbUVrVC3tOUsluwMNIrm3uCa5nMINqPISAkirjd12qt1efDqjg=="
    connect.cosmos.databasename: "CPDemoDB002"
    connect.cosmos.containers.topicmap: "topic002#container002"
  restartPolicy:
    type: OnFailure
    maxRetry: 10
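The connect.cosmos.containers.topicmap value maps a Kafka topic to a Cosmos DB container in topic#container form; per the connector's documentation, multiple mappings can be listed comma-separated, e.g. (topic003/container003 are hypothetical):

connect.cosmos.containers.topicmap: "topic002#container002,topic003#container003"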

Deploying the additional Confluent Platform resources

Creating the connector on Confluent Platform

Create the Cosmos DB Sink Connector from the custom resource.

$ kubectl apply -f kafka_sink_connector.yaml

Check the deployed Confluent Platform resources.

$ kubectl get connect  
NAME      REPLICAS   READY   STATUS    AGE
connect   1          1       RUNNING   15s

$ kubectl get connector
NAME                         STATUS    CONNECTORSTATUS   TASKS-READY   AGE
cosmosdb-sink-connector002   CREATED   RUNNING           3/3           30s


## Check the connector details
$ kubectl describe connector cosmosdb-sink-connector002
Name:         cosmosdb-sink-connector002
Namespace:    akscfk231
Labels:       <none>
Annotations:  <none>
API Version:  platform.confluent.io/v1beta1
Kind:         Connector
Metadata:
  Creation Timestamp:  2022-07-07T06:04:19Z
  Finalizers:
    connector.finalizers.platform.confluent.io
  Generation:  1
  Managed Fields:
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:class:
        f:configs:
          .:
          f:connect.cosmos.connection.endpoint:
          f:connect.cosmos.containers.topicmap:
          f:connect.cosmos.databasename:
          f:connect.cosmos.master.key:
          f:key.converter:
          f:key.converter.schema.registry.url:
          f:topics:
          f:value.converter:
          f:value.converter.schema.registry.url:
        f:connectClusterRef:
          .:
          f:name:
        f:restartPolicy:
          .:
          f:maxRetry:
          f:type:
        f:taskMax:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2022-07-07T06:04:19Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:finalizers:
          .:
          v:"connector.finalizers.platform.confluent.io":
        f:ownerReferences:
          .:
          k:{"uid":"e188f675-166f-4254-8041-25e1b3d540c9"}:
    Manager:      manager
    Operation:    Update
    Time:         2022-07-07T06:04:20Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:status:
        f:connectorState:
        f:tasksReady:
        f:workerID:
    Manager:      manager
    Operation:    Update
    Subresource:  status
    Time:         2022-07-07T06:04:35Z
  Owner References:
    API Version:           platform.confluent.io/v1beta1
    Block Owner Deletion:  true
    Controller:            true
    Kind:                  Connect
    Name:                  connect
    UID:                   e188f675-166f-4254-8041-25e1b3d540c9
  Resource Version:        8863
  UID:                     0fd26042-d4df-4119-8762-82aaa63620e5
Spec:
  Class:  com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector
  Configs:
    connect.cosmos.connection.endpoint:   https://iturucosmosdb01.documents.azure.com:443/
    connect.cosmos.containers.topicmap:   topic002#container002
    connect.cosmos.databasename:          CPDemoDB002
    connect.cosmos.master.key:            3dL10prVb08owex6MPFwhpccCIc8HpQUofwdvTw6GSQSm772AEZYmPnFd1gosedA45XsrdxnT7IEikBxahJKOA==
    key.converter:                        io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url:    http://schemaregistry.akscfk231.svc.cluster.local:8081
    Topics:                               topic002
    value.converter:                      io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url:  http://schemaregistry.akscfk231.svc.cluster.local:8081
  Connect Cluster Ref:
    Name:  connect
  Restart Policy:
    Max Retry:  10
    Type:       OnFailure
  Task Max:     3
Status:
  Connect Rest Endpoint:  http://connect.akscfk231.svc.cluster.local:8083
  Connector State:        RUNNING
  Kafka Cluster ID:       brE6ta3QQ36Annph22sjfw
  Restart Policy:
    Max Retry:  10
    Type:       OnFailure
  State:        CREATED
  Tasks Ready:  3/3
  Worker ID:    connect-0.connect.akscfk231.svc.cluster.local:8083
Events:         <none>

The connector's configuration can also be confirmed this way.

Checking the connector

For verification, forward the connect service port to localhost.

$ kubectl port-forward --address localhost svc/connect 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083

* Press CTRL+C to stop the port-forward.
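Alternatively, the port-forward can run in the background so the same terminal can be reused (stop it later by killing the job):

$ kubectl port-forward --address localhost svc/connect 8083:8083 > /dev/null 2>&1 &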

Open another terminal and check the connector's status.

$ curl localhost:8083/connectors                                 
["cosmosdb-sink-connector002"]


$ curl localhost:8083/connectors/cosmosdb-sink-connector002/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   434  100   434    0     0   2170      0 --:--:-- --:--:-- --:--:--  2180
{
  "name": "cosmosdb-sink-connector002",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
    },
    {
      "id": 2,
      "state": "RUNNING",
      "worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
    }
  ],
  "type": "sink"
}


$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
          jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
          column -s : -t| sed 's/\"//g'| sort
sink  |  cosmosdb-sink-connector002  |  RUNNING  |  RUNNING  |  RUNNING  |  RUNNING  |  com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector
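Should the connector or one of its tasks ever report FAILED, the standard Kafka Connect REST API can restart them through the same port-forward:

## Restart the connector instance
$ curl -X POST localhost:8083/connectors/cosmosdb-sink-connector002/restart

## Restart an individual task (task 0 shown)
$ curl -X POST localhost:8083/connectors/cosmosdb-sink-connector002/tasks/0/restart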

Verifying in Confluent Control Center

$ kubectl confluent dashboard controlcenter
http://localhost:9021
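If the confluent kubectl plugin is not available, a plain port-forward reaches the same UI (the controlcenter service name follows the CFK defaults):

$ kubectl port-forward svc/controlcenter 9021:9021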

Cleanup

Removing the Pods / secret / namespace

## Pod : confluent-operator
$ helm delete confluent-operator             

## Pod : confluent-platform
$ kubectl delete -f confluent_platform_ccc.yaml
$ kubectl delete -f kafka_sink_connector.yaml

## Secret information
$ kubectl delete secret connector-credential

## Delete the namespace (all Pods in the namespace are deleted as well)
$ kubectl delete namespace akscfk231
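Namespace deletion can take a while; you can poll until it disappears:

$ kubectl get namespace akscfk231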

Stopping and starting the AKS cluster

$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01
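The cluster's current power state can be confirmed from its properties (powerState is part of the az aks show output):

$ az aks show -g rg_ituru_aks01 -n aks_ituru_cp01 --query powerState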

Check the events for any error messages.

$ kubectl get events

Summary

I confirmed that the CosmosDB Sink Connector can be configured on this Confluent Platform deployment. However, once data actually starts flowing in, the data conversion settings and similar details will still need adjusting.

Reference information

I referred to the following resources.

Confluent for Kubernetes
Manage Connectors
confluent-kubernetes-examples/connector/datagen-source-connector/
Connectors
Connect
Kafka Connect for Azure Cosmos DB – Sink connector
Kafka Connect for Azure Cosmos DB – Source connector
