Configuring Confluent Platform with Confluent for Kubernetes on AKS — Creating a Source Connector

Overview

Confluent for Kubernetes (CFK) is a cloud-native control plane for deploying and managing Confluent in private cloud environments (here, Azure Kubernetes Service (AKS)). It provides a declarative API that offers a standard, simple interface for customizing, deploying, and managing Confluent Platform.

With CFK, the workflow for creating a source connector on a Confluent Platform deployment running on AKS is roughly as follows:

    1. Prepare the Kubernetes environment (already done as a prerequisite)

    2. Deploy Confluent for Kubernetes (already done as a prerequisite)

    3. Deploy Confluent Platform, including the required connector plugins (already done as a prerequisite)

    4. Apply additional configuration to Confluent Platform

    5. Perform an additional deployment to Confluent Platform (create the Source Connector)

Local Environment

    • macOS Monterey 12.3.1

    • python 3.8.12

    • Azure CLI 2.34.1

    • helm v3.6.3

    • kubectl v1.21.3

Prerequisites

    1. An AKS cluster environment has already been built by following this earlier article.

    2. Confluent Platform has already been deployed on AKS, and a Sink Connector created, by following this earlier article.

    3. A CosmosDB database for the source has already been defined and created on Azure by editing and running the steps in this earlier article.

Sink CosmosDB settings:

    Setting                 Value
    Endpoint                https://iturucosmosdb01.documents.azure.com:443/
    CosmosDB account name   iturucosmosdb01
    Database name           CPDemoDB002
    Container name          container002
    Partition               section

Source CosmosDB settings:

    Setting                 Value
    Endpoint                https://iturucosmosdb01.documents.azure.com:443/
    CosmosDB account name   iturucosmosdb01
    Database name           CPDemoDB001
    Container name          container001
    Partition               section
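As a quick sanity check that the source database and container actually exist before wiring up the connector, a query like the following can be used (the resource group name rg_ituru_cosmosdb01 is a hypothetical placeholder; substitute your own):

$ az cosmosdb sql container show \
    --resource-group rg_ituru_cosmosdb01 \
    --account-name iturucosmosdb01 \
    --database-name CPDemoDB001 \
    --name container001 \
    --query "{name:name, partitionKey:resource.partitionKey.paths}" -o json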

Additional Configuration of Confluent Platform

Obtaining the CosmosDB Access Key

From the working directory where the CosmosDB was built with Terraform, run the following command to retrieve the CosmosDB access key.

$ terraform output cosmosdb_account_key
"NYeu0hLfOMJscPUDUs7ql7U9BJ14Gd1DiwmDwbUVrVC3tOUsluwMNIrm3uCa5nMINqPISAkirjd12qt1efDqjg=="

Changing the Confluent Platform Configuration

Because I could not get the configuration change for connecting with a Basic authentication key to work (possibly due to my own lack of skill), I will proceed with an unauthenticated configuration.

Create the custom resource for the Source Connector (kafka_source_connector.yaml). Note that connect.cosmos.containers.topicmap maps a Kafka topic to a CosmosDB container in the form <topic>#<container>.

apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: cosmosdb-source-connector001
  namespace: akscfk231
spec:
  class: "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector"
  taskMax: 3
  connectClusterRef:
    name: connect
  configs:
    topics: "topic001"
    key.converter: "io.confluent.connect.avro.AvroConverter"
    key.converter.schema.registry.url: "http://schemaregistry.akscfk231.svc.cluster.local:8081"
    value.converter: "io.confluent.connect.avro.AvroConverter"
    value.converter.schema.registry.url: "http://schemaregistry.akscfk231.svc.cluster.local:8081"
    connect.cosmos.connection.endpoint: "https://iturucosmosdb01.documents.azure.com:443/"
    connect.cosmos.master.key: "NYeu0hLfOMJscPUDUs7ql7U9BJ14Gd1DiwmDwbUVrVC3tOUsluwMNIrm3uCa5nMINqPISAkirjd12qt1efDqjg=="
    connect.cosmos.databasename: "CPDemoDB001"
    connect.cosmos.containers.topicmap: "topic001#container001"
  restartPolicy:
    type: OnFailure
    maxRetry: 10
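Optionally, before the deployment step below, the manifest can be validated against the cluster's Connector CRD with a server-side dry run (a standard kubectl option, not part of the original steps):

$ kubectl apply --dry-run=server -f kafka_source_connector.yaml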

Deploying to Confluent Platform

Creating the Connector on Confluent Platform

Create the CosmosDB source connector from the custom resource.

$ kubectl apply -f kafka_source_connector.yaml
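It can take a short while for the connector tasks to come up; progress can be watched with kubectl's standard watch flag (the namespace is the one used throughout this series):

$ kubectl get connector -n akscfk231 -w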

Verifying the Deployed Confluent Platform Resources

$ kubectl get connect  
NAME      REPLICAS   READY   STATUS    AGE
connect   1          1       RUNNING   22m

$ kubectl get connector
NAME                           STATUS    CONNECTORSTATUS   TASKS-READY   AGE
cosmosdb-sink-connector002     CREATED   RUNNING           3/3           8m24s
cosmosdb-source-connector001   CREATED   RUNNING           3/3           33s


## Check the connector details
$ kubectl describe connector cosmosdb-source-connector001
Name:         cosmosdb-source-connector001
Namespace:    akscfk231
Labels:       <none>
Annotations:  <none>
API Version:  platform.confluent.io/v1beta1
Kind:         Connector
Metadata:
  Creation Timestamp:  2022-07-07T06:12:10Z
  Finalizers:
    connector.finalizers.platform.confluent.io
  Generation:  1
  Managed Fields:
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:class:
        f:configs:
          .:
          f:connect.cosmos.connection.endpoint:
          f:connect.cosmos.containers.topicmap:
          f:connect.cosmos.databasename:
          f:connect.cosmos.master.key:
          f:key.converter:
          f:key.converter.schema.registry.url:
          f:topics:
          f:value.converter:
          f:value.converter.schema.registry.url:
        f:connectClusterRef:
          .:
          f:name:
        f:restartPolicy:
          .:
          f:maxRetry:
          f:type:
        f:taskMax:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2022-07-07T06:12:10Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:finalizers:
          .:
          v:"connector.finalizers.platform.confluent.io":
        f:ownerReferences:
          .:
          k:{"uid":"e188f675-166f-4254-8041-25e1b3d540c9"}:
    Manager:      manager
    Operation:    Update
    Time:         2022-07-07T06:12:10Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:status:
        f:connectorState:
        f:tasksReady:
        f:workerID:
    Manager:      manager
    Operation:    Update
    Subresource:  status
    Time:         2022-07-07T06:12:40Z
  Owner References:
    API Version:           platform.confluent.io/v1beta1
    Block Owner Deletion:  true
    Controller:            true
    Kind:                  Connect
    Name:                  connect
    UID:                   e188f675-166f-4254-8041-25e1b3d540c9
  Resource Version:        11227
  UID:                     6ef6bd16-d713-4e39-9b60-a7cdeac08502
Spec:
  Class:  com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector
  Configs:
    connect.cosmos.connection.endpoint:   https://iturucosmosdb01.documents.azure.com:443/
    connect.cosmos.containers.topicmap:   topic001#container001
    connect.cosmos.databasename:          CPDemoDB001
    connect.cosmos.master.key:            3dL10prVb08owex6MPFwhpccCIc8HpQUofwdvTw6GSQSm772AEZYmPnFd1gosedA45XsrdxnT7IEikBxahJKOA==
    key.converter:                        io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url:    http://schemaregistry.akscfk231.svc.cluster.local:8081
    Topics:                               topic001
    value.converter:                      io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url:  http://schemaregistry.akscfk231.svc.cluster.local:8081
  Connect Cluster Ref:
    Name:  connect
  Restart Policy:
    Max Retry:  10
    Type:       OnFailure
  Task Max:     3
Status:
  Connect Rest Endpoint:  http://connect.akscfk231.svc.cluster.local:8083
  Connector State:        RUNNING
  Kafka Cluster ID:       brE6ta3QQ36Annph22sjfw
  Restart Policy:
    Max Retry:  10
    Type:       OnFailure
  State:        CREATED
  Tasks Ready:  3/3
  Worker ID:    connect-0.connect.akscfk231.svc.cluster.local:8083
Events:         <none>

This confirms that the connector has been created.

Verifying the Connector

For verification, forward the Connect service port to localhost.

$ kubectl port-forward --address localhost svc/connect 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083

※ You can stop the port-forward with CTRL+C

Open another terminal and check the status of the connectors.

$ curl localhost:8083/connectors                                 
["cosmosdb-source-connector001","cosmosdb-sink-connector002"]


$ curl localhost:8083/connectors/cosmosdb-source-connector001/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   438  100   438    0     0   2327      0 --:--:-- --:--:-- --:--:--  2329
{
  "name": "cosmosdb-source-connector001",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
    },
    {
      "id": 2,
      "state": "RUNNING",
      "worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
    }
  ],
  "type": "source"
}
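If any task showed FAILED here, the connector or an individual task could be restarted through the standard Kafka Connect REST endpoints, for example:

$ curl -X POST localhost:8083/connectors/cosmosdb-source-connector001/restart
$ curl -X POST localhost:8083/connectors/cosmosdb-source-connector001/tasks/0/restart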


$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
          jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
          column -s : -t| sed 's/\"//g'| sort
sink    |  cosmosdb-sink-connector002    |  RUNNING  |  RUNNING  |  RUNNING  |  RUNNING  |  com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector
source  |  cosmosdb-source-connector001  |  RUNNING  |  RUNNING  |  RUNNING  |  RUNNING  |  com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector
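The effective configuration of a single connector can also be pulled from the same REST interface, which is handy for comparing what is actually running against the custom resource:

$ curl -s localhost:8083/connectors/cosmosdb-source-connector001/config | jq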

Verifying in Confluent Control Center

$ kubectl confluent dashboard controlcenter
http://localhost:9021

Cleanup

How to remove the Pods / Secrets / namespace

## Pod : confluent-operator
$ helm delete confluent-operator             

## Pod : confluent-platform
$ kubectl delete -f confluent_platform_ccc.yaml
$ kubectl delete -f kafka_sink_connector.yaml
$ kubectl delete -f kafka_source_connector.yaml

## Delete the namespace (all Pods under the namespace are deleted as well)
$ kubectl delete namespace akscfk231

Stopping and Starting the AKS Cluster

$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01
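The cluster's current power state can be confirmed afterwards with a standard az aks query:

$ az aks show -g rg_ituru_aks01 -n aks_ituru_cp01 --query powerState -o json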

After restarting, check the events for any error messages.

$ kubectl get events
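To focus on this deployment's namespace and list the newest events last, standard kubectl flags can be added:

$ kubectl get events -n akscfk231 --sort-by=.metadata.creationTimestamp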

Summary

I have confirmed that the CosmosDB Source Connector required by Confluent Platform can be configured. However, once data actually starts flowing in, the data conversion approach and similar settings may need further tuning.

I referred to the following information:

Confluent for Kubernetes
Manage Connectors
confluent-kubernetes-examples/connector/datagen-source-connector/
Connector
Connect
Kafka Connect for Azure Cosmos DB – Sink Connector
Kafka Connect for Azure Cosmos DB – Source Connector
Basic authentication
