I Deployed Confluent Platform on AKS with Confluent for Kubernetes – Creating a Sink Connector
Overview
Confluent for Kubernetes (CFK) is a cloud-native control plane for deploying and managing Confluent in private cloud environments (in this case, Azure Kubernetes Service (AKS)). It provides a standard, simple, declarative API for customizing, deploying, and managing Confluent Platform.
The workflow for creating a Sink Connector on a Confluent Platform deployed to AKS with CFK is outlined below:
- Prepare the Kubernetes environment (done in the prerequisites)
- Deploy Confluent for Kubernetes (done in the prerequisites)
- Deploy Confluent Platform (including the required connector plugins) (done in the prerequisites)
- Configure additional Confluent Platform settings
- Perform the additional Confluent Platform deployment (create the Sink Connector)
Local environment
- macOS Monterey 12.3.1
- python 3.8.12
- Azure CLI 2.34.1
- helm v3.6.3
- kubectl v1.21.3
Prerequisites
- Follow this article to set up the AKS cluster environment.
- Follow this article to deploy Confluent Platform on AKS and create the topic.
- Follow this article to make sure CosmosDB is set up on Azure.
Sink CosmosDB settings:
- Endpoint: https://iturucosmosdb01.documents.azure.com:443/
- CosmosDB account name: iturucosmosdb01
- Database name: CPDemoDB002
- Container name: container002
- Partition: section
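These values can be cross-checked against the actual Azure resource with the Azure CLI; a minimal sketch (the <resource-group> placeholder is an assumption, substitute your own):
## Check the account endpoint
$ az cosmosdb show --resource-group <resource-group> --name iturucosmosdb01 --query documentEndpoint
## List the SQL databases and containers in the account
$ az cosmosdb sql database list --resource-group <resource-group> --account-name iturucosmosdb01 --query "[].id"
$ az cosmosdb sql container list --resource-group <resource-group> --account-name iturucosmosdb01 --database-name CPDemoDB002 --query "[].id"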
Additional Confluent Platform configuration
Getting the CosmosDB access key
In the working directory where CosmosDB was built with Terraform, run the following to get the CosmosDB access key.
$ terraform output cosmosdb_account_key
"NYeu0hLfOMJscPUDUs7ql7U9BJ14Gd1DiwmDwbUVrVC3tOUsluwMNIrm3uCa5nMINqPISAkirjd12qt1efDqjg=="
Changing the Confluent Platform configuration
Since I could not get the configuration change for connecting with a basic-authentication secret to work (possibly a shortcoming on my part), I will proceed with an unauthenticated configuration.
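For reference, the cleanup section below deletes a secret named connector-credential; creating such a basic-auth secret would look roughly like the following sketch (the file and key names are assumptions, which is likely where my attempt went wrong):
## Sketch only: basic-auth credential secret (the file name basic.txt is an assumption)
$ kubectl create secret generic connector-credential \
    --from-file=basic.txt=./basic.txt --namespace akscfk231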
Custom resource definition for creating the Sink Connector
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: cosmosdb-sink-connector002
  namespace: akscfk231
spec:
  class: "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector"
  taskMax: 3
  connectClusterRef:
    name: connect
  configs:
    topics: "topic002"
    key.converter: "io.confluent.connect.avro.AvroConverter"
    key.converter.schema.registry.url: "http://schemaregistry.akscfk231.svc.cluster.local:8081"
    value.converter: "io.confluent.connect.avro.AvroConverter"
    value.converter.schema.registry.url: "http://schemaregistry.akscfk231.svc.cluster.local:8081"
    connect.cosmos.connection.endpoint: "https://iturucosmosdb01.documents.azure.com:443/"
    connect.cosmos.master.key: "NYeu0hLfOMJscPUDUs7ql7U9BJ14Gd1DiwmDwbUVrVC3tOUsluwMNIrm3uCa5nMINqPISAkirjd12qt1efDqjg=="
    connect.cosmos.databasename: "CPDemoDB002"
    connect.cosmos.containers.topicmap: "topic002#container002"
  restartPolicy:
    type: OnFailure
    maxRetry: 10
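The connect.cosmos.containers.topicmap value maps the topic to the target container in the form <topic>#<container>. Before applying, the manifest can be validated against the cluster without creating anything (assuming it is saved as kafka_sink_connector.yaml, as used below):
## Server-side dry run; validates the Connector resource without creating it
$ kubectl apply -f kafka_sink_connector.yaml --dry-run=server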
Additional Confluent Platform deployment
Creating the connector on Confluent Platform
Create the CosmosDB Sink Connector from the custom resource.
$ kubectl apply -f kafka_sink_connector.yaml
Check the deployed Confluent Platform resources.
$ kubectl get connect
NAME REPLICAS READY STATUS AGE
connect 1 1 RUNNING 15s
$ kubectl get connector
NAME STATUS CONNECTORSTATUS TASKS-READY AGE
cosmosdb-sink-connector002 CREATED RUNNING 3/3 30s
## Check the connector details
$ kubectl describe connector cosmosdb-sink-connector002
Name:         cosmosdb-sink-connector002
Namespace:    akscfk231
Labels:       <none>
Annotations:  <none>
API Version:  platform.confluent.io/v1beta1
Kind:         Connector
Metadata:
  Creation Timestamp:  2022-07-07T06:04:19Z
  Finalizers:
    connector.finalizers.platform.confluent.io
  Generation:  1
  Managed Fields:
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:class:
        f:configs:
          .:
          f:connect.cosmos.connection.endpoint:
          f:connect.cosmos.containers.topicmap:
          f:connect.cosmos.databasename:
          f:connect.cosmos.master.key:
          f:key.converter:
          f:key.converter.schema.registry.url:
          f:topics:
          f:value.converter:
          f:value.converter.schema.registry.url:
        f:connectClusterRef:
          .:
          f:name:
        f:restartPolicy:
          .:
          f:maxRetry:
          f:type:
        f:taskMax:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2022-07-07T06:04:19Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:finalizers:
          .:
          v:"connector.finalizers.platform.confluent.io":
        f:ownerReferences:
          .:
          k:{"uid":"e188f675-166f-4254-8041-25e1b3d540c9"}:
    Manager:      manager
    Operation:    Update
    Time:         2022-07-07T06:04:20Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:status:
        f:connectorState:
        f:tasksReady:
        f:workerID:
    Manager:         manager
    Operation:       Update
    Subresource:     status
    Time:            2022-07-07T06:04:35Z
  Owner References:
    API Version:           platform.confluent.io/v1beta1
    Block Owner Deletion:  true
    Controller:            true
    Kind:                  Connect
    Name:                  connect
    UID:                   e188f675-166f-4254-8041-25e1b3d540c9
  Resource Version:        8863
  UID:                     0fd26042-d4df-4119-8762-82aaa63620e5
Spec:
  Class:  com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector
  Configs:
    connect.cosmos.connection.endpoint:   https://iturucosmosdb01.documents.azure.com:443/
    connect.cosmos.containers.topicmap:   topic002#container002
    connect.cosmos.databasename:          CPDemoDB002
    connect.cosmos.master.key:            3dL10prVb08owex6MPFwhpccCIc8HpQUofwdvTw6GSQSm772AEZYmPnFd1gosedA45XsrdxnT7IEikBxahJKOA==
    key.converter:                        io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url:    http://schemaregistry.akscfk231.svc.cluster.local:8081
    Topics:                               topic002
    value.converter:                      io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url:  http://schemaregistry.akscfk231.svc.cluster.local:8081
  Connect Cluster Ref:
    Name:  connect
  Restart Policy:
    Max Retry:  10
    Type:       OnFailure
  Task Max:  3
Status:
  Connect Rest Endpoint:  http://connect.akscfk231.svc.cluster.local:8083
  Connector State:        RUNNING
  Kafka Cluster ID:       brE6ta3QQ36Annph22sjfw
  Restart Policy:
    Max Retry:  10
    Type:       OnFailure
  State:        CREATED
  Tasks Ready:  3/3
  Worker ID:    connect-0.connect.akscfk231.svc.cluster.local:8083
Events:  <none>
This confirms the connector's configuration.
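Rather than reading through the whole describe output, the key status fields can also be extracted directly; a minimal sketch using jsonpath (the field names connectorState and tasksReady come from the status shown above):
## Pull out just the connector state and task readiness
$ kubectl get connector cosmosdb-sink-connector002 -o jsonpath='{.status.connectorState} {.status.tasksReady}{"\n"}'
RUNNING 3/3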
Connector verification
For verification, forward the connect service port to localhost.
$ kubectl port-forward --address localhost svc/connect 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083
※ You can stop this with CTRL+C
Open another terminal and check the status of the Connector.
$ curl localhost:8083/connectors
["cosmosdb-sink-connector002"]
$ curl localhost:8083/connectors/cosmosdb-sink-connector002/status | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 434 100 434 0 0 2170 0 --:--:-- --:--:-- --:--:-- 2180
{
"name": "cosmosdb-sink-connector002",
"connector": {
"state": "RUNNING",
"worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
},
{
"id": 2,
"state": "RUNNING",
"worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
}
],
"type": "sink"
}
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
sink | cosmosdb-sink-connector002 | RUNNING | RUNNING | RUNNING | RUNNING | com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector
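Should any task show FAILED here, the standard Kafka Connect REST API can restart it through the same port-forward; a sketch (task 0 is just an example):
## Restart the whole connector, or a single task
$ curl -X POST localhost:8083/connectors/cosmosdb-sink-connector002/restart
$ curl -X POST localhost:8083/connectors/cosmosdb-sink-connector002/tasks/0/restart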
Verification in Confluent Control Center
$ kubectl confluent dashboard controlcenter
http://localhost:9021
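The kubectl confluent dashboard subcommand comes from the CFK kubectl plugin; without the plugin, a plain port-forward should give the same access (the service name controlcenter is assumed from the default CFK deployment):
## Alternative without the CFK kubectl plugin
$ kubectl port-forward svc/controlcenter 9021:9021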
Cleanup
How to remove the Pods, secret, and namespace
## Pod : confluent-operator
$ helm delete confluent-operator
## Pod : confluent-platform
$ kubectl delete -f confluent_platform_ccc.yaml
$ kubectl delete -f kafka_sink_connector.yaml
## Secret information
$ kubectl delete secret connector-credential
## Deleting the namespace (all Pods under the namespace are deleted)
$ kubectl delete namespace akscfk231
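A quick sanity check that everything is gone:
## Confirm the Helm release and the namespace are removed
$ helm list
$ kubectl get namespace akscfk231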
Stopping and starting the AKS cluster
$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01
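The current power state can be confirmed before and after; a sketch using the same resource names:
## Returns "Running" or "Stopped"
$ az aks show -g rg_ituru_aks01 -n aks_ituru_cp01 --query powerState.code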
Check the events for any error messages.
$ kubectl get events
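To narrow this down to problems only, the events can be filtered and sorted; a minimal sketch:
## Show only warnings, oldest first
$ kubectl get events --field-selector type=Warning --sort-by=.metadata.creationTimestamp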
Summary
I confirmed that the CosmosDB Sink Connector required for this Confluent Platform can be configured. However, once data actually flows in, things such as the data transformation method will still need tuning.
References
I referred to the following information.
Confluent for Kubernetes
Manage Connectors
confluent-kubernetes-examples/connector/datagen-source-connector/
Connector
Connect
Kafka Connect for Azure Cosmos DB – Sink connector
Kafka Connect for Azure Cosmos DB – Source connector