Configuring Confluent Platform with Confluent for Kubernetes on AKS: Creating a Source Connector
Overview
Confluent for Kubernetes (CFK) is a cloud-native control plane for deploying and managing Confluent in private cloud environments (here, Azure Kubernetes Service (AKS)). It provides a declarative API offering a standard, simple interface for customizing, deploying, and managing Confluent Platform.
With Confluent Platform deployed on AKS via CFK, the workflow for creating a Source Connector is roughly as follows:
- Prepare the Kubernetes environment (done as part of the prerequisites)
- Deploy Confluent for Kubernetes (done as part of the prerequisites)
- Deploy Confluent Platform, including the required Connector plugins (done as part of the prerequisites)
- Apply additional Confluent Platform configuration
- Perform the additional Confluent Platform deployment (create the Source Connector)
Local environment
- macOS Monterey 12.3.1
- python 3.8.12
- Azure CLI 2.34.1
- helm v3.6.3
- kubectl v1.21.3
Prerequisites
- An AKS cluster environment has been built by following this article.
- Confluent Platform has been deployed on AKS and a Sink Connector created by following this article.
- A Cosmos DB database and related resources to serve as the Source have been defined and provisioned on Azure by editing and following this article.
Additional Confluent Platform configuration
Obtaining the Cosmos DB access key
Cosmos DB was built with Terraform, so run the following command in the working directory to retrieve its access key.
$ terraform output cosmosdb_account_key
"NYeu0hLfOMJscPUDUs7ql7U9BJ14Gd1DiwmDwbUVrVC3tOUsluwMNIrm3uCa5nMINqPISAkirjd12qt1efDqjg=="
Changing the Confluent Platform configuration
Because the configuration change for connecting with a Basic-authentication key does not currently work for me (possibly due to my own lack of skill), I will proceed with a configuration that uses no authentication.
Create the custom resource for the Source Connector.
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: cosmosdb-source-connector001
  namespace: akscfk231
spec:
  class: "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector"
  taskMax: 3
  connectClusterRef:
    name: connect
  configs:
    topics: "topic001"
    key.converter: "io.confluent.connect.avro.AvroConverter"
    key.converter.schema.registry.url: "http://schemaregistry.akscfk231.svc.cluster.local:8081"
    value.converter: "io.confluent.connect.avro.AvroConverter"
    value.converter.schema.registry.url: "http://schemaregistry.akscfk231.svc.cluster.local:8081"
    connect.cosmos.connection.endpoint: "https://iturucosmosdb01.documents.azure.com:443/"
    connect.cosmos.master.key: "NYeu0hLfOMJscPUDUs7ql7U9BJ14Gd1DiwmDwbUVrVC3tOUsluwMNIrm3uCa5nMINqPISAkirjd12qt1efDqjg=="
    connect.cosmos.databasename: "CPDemoDB001"
    connect.cosmos.containers.topicmap: "topic001#container001"
  restartPolicy:
    type: OnFailure
    maxRetry: 10
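Before applying the manifest, the `connect.cosmos.containers.topicmap` value can be sanity-checked, since each entry must be a `topic#container` pair. A minimal sketch (the helper function is my own, not part of CFK or the Cosmos DB connector):

```python
def parse_topic_map(topic_map: str) -> dict:
    """Parse a connect.cosmos.containers.topicmap value such as
    'topicA#containerA,topicB#containerB' into {topic: container},
    rejecting malformed entries."""
    mapping = {}
    for entry in topic_map.split(","):
        entry = entry.strip()
        topic, sep, container = entry.partition("#")
        if not sep or not topic or not container:
            raise ValueError(f"malformed topic map entry: {entry!r}")
        mapping[topic] = container
    return mapping

print(parse_topic_map("topic001#container001"))  # → {'topic001': 'container001'}
```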
Additional deployment to Confluent Platform
Creating the Connector on Confluent Platform
Create the CosmosDB Source Connector from the custom resource.
$ kubectl apply -f kafka_source_connector.yaml
Checking the deployed Confluent Platform resources
$ kubectl get connect
NAME      REPLICAS   READY   STATUS    AGE
connect   1          1       RUNNING   22m
$ kubectl get connector
NAME                           STATUS    CONNECTORSTATUS   TASKS-READY   AGE
cosmosdb-sink-connector002     CREATED   RUNNING           3/3           8m24s
cosmosdb-source-connector001   CREATED   RUNNING           3/3           33s
## Check the connector details
$ kubectl describe connector cosmosdb-source-connector001
Name:         cosmosdb-source-connector001
Namespace:    akscfk231
Labels:       <none>
Annotations:  <none>
API Version:  platform.confluent.io/v1beta1
Kind:         Connector
Metadata:
  Creation Timestamp:  2022-07-07T06:12:10Z
  Finalizers:
    connector.finalizers.platform.confluent.io
  Generation:  1
  Managed Fields:
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:class:
        f:configs:
          .:
          f:connect.cosmos.connection.endpoint:
          f:connect.cosmos.containers.topicmap:
          f:connect.cosmos.databasename:
          f:connect.cosmos.master.key:
          f:key.converter:
          f:key.converter.schema.registry.url:
          f:topics:
          f:value.converter:
          f:value.converter.schema.registry.url:
        f:connectClusterRef:
          .:
          f:name:
        f:restartPolicy:
          .:
          f:maxRetry:
          f:type:
        f:taskMax:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2022-07-07T06:12:10Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:finalizers:
          .:
          v:"connector.finalizers.platform.confluent.io":
        f:ownerReferences:
          .:
          k:{"uid":"e188f675-166f-4254-8041-25e1b3d540c9"}:
    Manager:      manager
    Operation:    Update
    Time:         2022-07-07T06:12:10Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:status:
        f:connectorState:
        f:tasksReady:
        f:workerID:
    Manager:      manager
    Operation:    Update
    Subresource:  status
    Time:         2022-07-07T06:12:40Z
  Owner References:
    API Version:           platform.confluent.io/v1beta1
    Block Owner Deletion:  true
    Controller:            true
    Kind:                  Connect
    Name:                  connect
    UID:                   e188f675-166f-4254-8041-25e1b3d540c9
  Resource Version:        11227
  UID:                     6ef6bd16-d713-4e39-9b60-a7cdeac08502
Spec:
  Class:  com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector
  Configs:
    connect.cosmos.connection.endpoint:   https://iturucosmosdb01.documents.azure.com:443/
    connect.cosmos.containers.topicmap:   topic001#container001
    connect.cosmos.databasename:          CPDemoDB001
    connect.cosmos.master.key:            3dL10prVb08owex6MPFwhpccCIc8HpQUofwdvTw6GSQSm772AEZYmPnFd1gosedA45XsrdxnT7IEikBxahJKOA==
    key.converter:                        io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url:    http://schemaregistry.akscfk231.svc.cluster.local:8081
    Topics:                               topic001
    value.converter:                      io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url:  http://schemaregistry.akscfk231.svc.cluster.local:8081
  Connect Cluster Ref:
    Name:  connect
  Restart Policy:
    Max Retry:  10
    Type:       OnFailure
  Task Max:  3
Status:
  Connect Rest Endpoint:  http://connect.akscfk231.svc.cluster.local:8083
  Connector State:        RUNNING
  Kafka Cluster ID:       brE6ta3QQ36Annph22sjfw
  Restart Policy:
    Max Retry:  10
    Type:       OnFailure
  State:        CREATED
  Tasks Ready:  3/3
  Worker ID:    connect-0.connect.akscfk231.svc.cluster.local:8083
Events:         <none>
Verify that the connector was built.
Checking the connector
For verification, forward the Connect service port to localhost.
$ kubectl port-forward --address localhost svc/connect 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083
※ Press CTRL+C to stop
Open another terminal and check the connector status.
$ curl localhost:8083/connectors
["cosmosdb-source-connector001","cosmosdb-sink-connector002"]
$ curl localhost:8083/connectors/cosmosdb-source-connector001/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   438  100   438    0     0   2327      0 --:--:-- --:--:-- --:--:--  2329
{
"name": "cosmosdb-source-connector001",
"connector": {
"state": "RUNNING",
"worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
},
{
"id": 2,
"state": "RUNNING",
"worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
}
],
"type": "source"
}
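The same check can be automated: fetch the status JSON through the active port-forward and confirm that the connector and every task report RUNNING. A sketch using only the standard library (the helper names are mine):

```python
import json
import urllib.request


def connector_healthy(status: dict) -> bool:
    """True only if the connector itself and every task report RUNNING."""
    if status["connector"]["state"] != "RUNNING":
        return False
    return all(t["state"] == "RUNNING" for t in status["tasks"])


def fetch_status(name: str, base: str = "http://localhost:8083") -> dict:
    """Fetch /connectors/<name>/status from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{base}/connectors/{name}/status") as resp:
        return json.load(resp)


# With the port-forward running:
# print(connector_healthy(fetch_status("cosmosdb-source-connector001")))
```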
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
sink | cosmosdb-sink-connector002 | RUNNING | RUNNING | RUNNING | RUNNING | com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector
source | cosmosdb-source-connector001 | RUNNING | RUNNING | RUNNING | RUNNING | com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector
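The jq pipeline above is dense; for reference, an equivalent summary in Python over the same `?expand=info&expand=status` payload (a sketch using only fields visible in the output above):

```python
from typing import Dict, List


def summarize(payload: Dict[str, dict]) -> List[str]:
    """Build one 'type | name | connector state | task states... | class' row
    per connector, sorted, mirroring the jq/column/sort pipeline."""
    rows = []
    for name, entry in payload.items():
        task_states = [t["state"] for t in entry["status"]["tasks"]]
        rows.append(" | ".join([
            entry["info"]["type"],
            name,
            entry["status"]["connector"]["state"],
            *task_states,
            entry["info"]["config"]["connector.class"],
        ]))
    return sorted(rows)
```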
Checking in Confluent Control Center
$ kubectl confluent dashboard controlcenter
http://localhost:9021
Cleanup
How to remove the Pods / secrets / namespace
## Pod : confluent-operator
$ helm delete confluent-operator
## Pod : confluent-platform
$ kubectl delete -f confluent_platform_ccc.yaml
$ kubectl delete -f kafka_sink_connector.yaml
$ kubectl delete -f kafka_source_connector.yaml
## How to delete the namespace (all Pods under the namespace are deleted)
$ kubectl delete namespace akscfk231
Stopping and starting the AKS cluster
$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01
Check the events for any error messages.
$ kubectl get events
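When scanning for errors, the events can also be fetched as JSON and filtered for Warning-type entries. A minimal sketch (the field names follow the Kubernetes Event API; the helper is my own):

```python
def warning_events(events: dict) -> list:
    """Extract 'reason: message' lines for every Warning-type event
    from `kubectl get events -o json` output."""
    return [
        f"{item['reason']}: {item['message']}"
        for item in events.get("items", [])
        if item.get("type") == "Warning"
    ]


# Example with a hand-made payload:
sample = {"items": [
    {"type": "Normal",  "reason": "Scheduled", "message": "pod assigned"},
    {"type": "Warning", "reason": "BackOff",   "message": "restarting failed container"},
]}
print(warning_events(sample))  # → ['BackOff: restarting failed container']
```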
Summary
I confirmed that the Connector required by Confluent Platform, the Cosmos DB Source Connector, can be configured. However, once data actually starts flowing, the data conversion approach and similar settings may need further adjustment.
I referred to the following information.
Confluent for Kubernetes
Manage Connectors
confluent-kubernetes-examples/connector/datagen-source-connector/
Connector
Connect
Kafka Connect for Azure Cosmos DB - Sink Connector
Kafka Connect for Azure Cosmos DB - Source Connector
Basic authentication