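How to connect Salesforce change data to Confluent (Kafka) in real time
Introduction
This article shows how to use Salesforce's Change Data Capture (CDC) feature together with Confluent's Salesforce connector to stream data changes made in Salesforce into a Confluent (Kafka) topic in near real time. It also examines the concrete format of the messages that end up in the topic.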
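References
- Salesforce Change Data Capture (CDC)
- Salesforce connector for Confluent Platform (Salesforce Change Data Capture Source Connector for Confluent Platform)
The second site has a "Quick Start" page, and I used it as my starting point, but following it to the letter did not work. For a beginner starting with Salesforce from scratch, its instructions are also too terse to follow. The official documentation and English-language sites did not give me exactly the information I needed either, so I went through several rounds of trial and error.
What follows is a record of the setup that finally worked, in the hope that it helps others.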
Version information
- OS: Red Hat Enterprise Linux release 8.4 (Ootpa)
- Confluent: Confluent Platform 7.2.1 (trial version, single node)
- Connector: SalesforceCdcSourceConnector 2.0.12
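Step 1. Sign up for Salesforce Developer Edition
As far as I can tell, Salesforce offers a 30-day free trial account and a free account intended for developers (Developer Edition). Of these two, only Developer Edition lets you use Salesforce's Change Data Capture (CDC) feature.
Sign up for Developer Edition by following the procedure described on this site.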
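Step 2. Salesforce-side setup
Reset the security token.
Select the gear icon at the top right and choose "Setup". Type "OAuth" in the Quick Find box and select "OAuth and OpenID Connect Settings" from the filtered results. Switch "Allow OAuth Username-Password Flows" from off to on.
Note: This step is not mentioned in Confluent's Quick Start guide, but it is required. Without it, authentication fails even when the credentials are configured correctly in the Confluent connector. For details on this setting, see the following page.
OAuth 2.0 Username-Password Flow Blocked by Default in New Orgs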
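Because an authentication error from the connector is hard to tell apart from a disabled username-password flow, it can help to try the same credentials outside Kafka Connect first. The following is a minimal sketch, not part of the original procedure. It assumes the org authenticates at login.salesforce.com and that the security token is appended to the password, which is the usual arrangement for this flow. The function names are hypothetical.

```python
import json
import urllib.parse
import urllib.request

# Assumption: the org authenticates at the standard login host.
TOKEN_URL = "https://login.salesforce.com/services/oauth2/token"


def build_token_request(consumer_key, consumer_secret, username, password, security_token):
    """Build (without sending) an OAuth 2.0 username-password token request."""
    form = {
        "grant_type": "password",
        "client_id": consumer_key,
        "client_secret": consumer_secret,
        "username": username,
        # Assumption: the security token is appended directly to the password.
        "password": password + security_token,
    }
    body = urllib.parse.urlencode(form).encode("utf-8")
    return urllib.request.Request(TOKEN_URL, data=body, method="POST")


def request_token(req):
    """Send the request and return the parsed JSON response.

    urllib raises urllib.error.HTTPError when Salesforce rejects the request.
    """
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.load(resp)
```

Calling `request_token(build_token_request(...))` with the same five values that go into the connector configuration should return a JSON document if the flow is allowed and the credentials are valid.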
Step 3. Set up the connector in Confluent
(1) Install the Salesforce Change Data Capture Source Connector for Confluent Platform.
Run the following command to install it.
confluent-hub install confluentinc/kafka-connect-salesforce:2.0.12
(2) Restart Kafka Connect.
confluent local services connect stop
confluent local services connect start
(3) Confirm that the Salesforce connector now appears among the available connector plugins.
curl -s localhost:8083/connector-plugins | jq
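The plugin list can be long, so the check can also be scripted. This is a small sketch that assumes the Connect worker listens on localhost:8083 as in the curl command and that each entry of the response carries a "class" field. The helper names are hypothetical.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # same worker address as the curl command
PLUGIN_CLASS = "io.confluent.salesforce.SalesforceCdcSourceConnector"


def has_plugin(plugins, class_name=PLUGIN_CLASS):
    """Return True if the plugin list contains the given connector class."""
    return any(p.get("class") == class_name for p in plugins)


def fetch_plugins(base_url=CONNECT_URL):
    """Fetch the list of installed connector plugins from the Connect REST API."""
    with urllib.request.urlopen(base_url + "/connector-plugins", timeout=10) as resp:
        return json.load(resp)
```

`has_plugin(fetch_plugins())` returns False if the installation or the restart did not take effect.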
(4) Create the connector.
curl -X PUT localhost:8083/connectors/sfcdc-account-source/config -H "Content-Type: application/json" \
--data '{
  "name":"sfcdc-account-source",
  "connector.class":"io.confluent.salesforce.SalesforceCdcSourceConnector",
  "tasks.max":"1",
  "kafka.topic":"sfcdc-account-topic",
  "salesforce.cdc.channel" : "AccountChangeEvent",
  "salesforce.initial.start" : "all",
  "salesforce.consumer.key" : "3MVG...(omitted)...edqI",
  "salesforce.consumer.secret" : "9195...(omitted)...C8B6",
  "salesforce.username" : "xxxxx@xxx.com (email address used to log in to Salesforce)",
  "salesforce.password" : "xxxxxxxxxxxxxx (password used to log in to Salesforce)",
  "salesforce.password.token" : "T2L...(omitted)...Iuj",
  "confluent.topic.bootstrap.servers": "localhost:9092",
  "confluent.topic.replication.factor": "1"
}' | jq
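- name is the connector name.
- connector.class is the class name of the connector plugin (fixed to the value above).
- tasks.max is the number of tasks (threads). This connector supports at most 1 by design.
- kafka.topic is the name of the destination topic.
- salesforce.cdc.channel takes the form <Entity>ChangeEvent, where <Entity> is the entity name inside Salesforce. For data change events on Account, it is AccountChangeEvent.
- salesforce.initial.start controls whether the connector consumes all events that already exist on the Salesforce side before the connector is created (all) or only events published after the connector is created (latest).
- salesforce.consumer.key, salesforce.consumer.secret, salesforce.username, salesforce.password, and salesforce.password.token are the credentials for connecting to Salesforce and to the Salesforce connected app.
- confluent.topic.bootstrap.servers is the broker list.
- confluent.topic.replication.factor is the number of replicas for the topic. The default is "3", so a trial single-node Confluent setup must specify "1".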
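The parameters above can also be assembled programmatically, which is handy when creating connectors for several entities. The sketch below mirrors the curl command. The function names are hypothetical, the channel helper only covers the <Entity>ChangeEvent pattern described above, and the broker and Connect addresses are the single-node defaults used in this article.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"


def build_cdc_config(name, topic, entity, credentials, initial_start="all"):
    """Build the connector configuration for one Salesforce entity."""
    if initial_start not in ("all", "latest"):
        raise ValueError("initial_start must be 'all' or 'latest'")
    config = {
        "name": name,
        "connector.class": "io.confluent.salesforce.SalesforceCdcSourceConnector",
        "tasks.max": "1",  # this connector supports a single task
        "kafka.topic": topic,
        "salesforce.cdc.channel": entity + "ChangeEvent",
        "salesforce.initial.start": initial_start,
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",  # single-node setup
    }
    # credentials holds the five salesforce.* authentication settings.
    config.update(credentials)
    return config


def put_connector(config, base_url=CONNECT_URL):
    """Create or update the connector through the Connect REST API."""
    req = urllib.request.Request(
        f"{base_url}/connectors/{config['name']}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.load(resp)
```

For the Account example, `build_cdc_config("sfcdc-account-source", "sfcdc-account-topic", "Account", credentials)` produces the same settings as the curl payload.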
(5) Check the connector status.
curl -s localhost:8083/connectors/sfcdc-account-source/status | jq
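The status response reports one state for the connector and one per task, and a failed task carries a stack trace. The following sketch reduces such a response to a short summary. It assumes the usual Kafka Connect status layout with "connector" and "tasks" entries, and the function name is hypothetical.

```python
def summarize_status(status):
    """Reduce a Kafka Connect status response to a small health summary."""
    connector_state = status["connector"]["state"]
    tasks = status.get("tasks", [])
    # Collect every task that is not RUNNING, with the first line of its trace.
    problems = [
        (t["id"], t["state"], (t.get("trace") or "").splitlines()[:1])
        for t in tasks
        if t["state"] != "RUNNING"
    ]
    healthy = connector_state == "RUNNING" and bool(tasks) and not problems
    return {"healthy": healthy, "connector": connector_state, "problems": problems}
```

Feeding it the JSON returned by the status endpoint gives a quick yes/no answer, plus the first trace line of any failed task, which is usually where an authentication error shows up.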
Step 4. Add, update, and delete Accounts in Salesforce
Enable Change Data Capture for Account.
Step 5. Check the topic and schema in Confluent (Kafka)
Check the schema subject
curl -sX GET http://localhost:8081/subjects | jq . | grep sfcdc
"sfcdc-account-topic-value"
- Schema subjects are named "<topic name>-key" or "<topic name>-value", but the messages written to the topic by Salesforce CDC and the connector have no "<topic name>-key" subject.
Check the schema
curl -sX GET http://localhost:8081/subjects/sfcdc-account-topic-value/versions/latest | jq .schema -r | jq
{
"type": "record",
"name": "AccountChangeEvent",
"namespace": "io.confluent.salesforce",
"fields": [
{
"name": "Id",
"type": {
"type": "string",
"connect.doc": "Unique identifier for the object."
}
},
{
"name": "ReplayId",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ChangeEventHeader",
"type": {
"type": "record",
"name": "ChangeEventHeader",
"namespace": "",
"fields": [
{
"name": "entityName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "recordIds",
"type": [
"null",
{
"type": "array",
"items": "string"
}
],
"default": null
},
{
"name": "changeType",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "changedFields",
"type": [
"null",
{
"type": "array",
"items": "string"
}
],
"default": null
},
{
"name": "changeOrigin",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "transactionKey",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "sequenceNumber",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "commitTimestamp",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "commitUser",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "commitNumber",
"type": [
"null",
"long"
],
"default": null
}
],
"connect.name": "ChangeEventHeader"
}
},
{
"name": "Name",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "LastName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "FirstName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Salutation",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Type",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ParentId",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "BillingStreet",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "BillingCity",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "BillingState",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "BillingPostalCode",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "BillingCountry",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "BillingLatitude",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "BillingLongitude",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "BillingGeocodeAccuracy",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "BillingAddress",
"type": [
"null",
{
"type": "record",
"name": "Address",
"fields": [
{
"name": "GeocodeAccuracy",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "State",
"type": [
"null",
{
"type": "string",
"connect.doc": ""
}
],
"default": null
},
{
"name": "Street",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "PostalCode",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Country",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Latitude",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "City",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Longitude",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "CountryCode",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "StateCode",
"type": [
"null",
"string"
],
"default": null
}
],
"connect.name": "io.confluent.salesforce.Address"
}
],
"default": null
},
{
"name": "ShippingStreet",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ShippingCity",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ShippingState",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ShippingPostalCode",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ShippingCountry",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ShippingLatitude",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "ShippingLongitude",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "ShippingGeocodeAccuracy",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ShippingAddress",
"type": [
"null",
"Address"
],
"default": null
},
{
"name": "Phone",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Fax",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "AccountNumber",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Website",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Sic",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Industry",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "AnnualRevenue",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "NumberOfEmployees",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "Ownership",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "TickerSymbol",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Description",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Rating",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Site",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "OwnerId",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "CreatedDate",
"type": [
"null",
{
"type": "long",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"logicalType": "timestamp-millis"
}
],
"default": null
},
{
"name": "CreatedById",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "LastModifiedDate",
"type": [
"null",
{
"type": "long",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"logicalType": "timestamp-millis"
}
],
"default": null
},
{
"name": "LastModifiedById",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Jigsaw",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "JigsawCompanyId",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "CleanStatus",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "AccountSource",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "DunsNumber",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Tradestyle",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "NaicsCode",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "NaicsDesc",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "YearStarted",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "SicDesc",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "DandbCompanyId",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "OperatingHoursId",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "CustomerPriority__c",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "SLA__c",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Active__c",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "NumberofLocations__c",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "UpsellOpportunity__c",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "SLASerialNumber__c",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "SLAExpirationDate__c",
"type": [
"null",
{
"type": "int",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Date",
"logicalType": "date"
}
],
"default": null
},
{
"name": "_ObjectType",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "_EventType",
"type": [
"null",
"string"
],
"default": null
}
],
"connect.name": "io.confluent.salesforce.AccountChangeEvent"
}
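In this schema, CreatedDate and LastModifiedDate are longs with the timestamp-millis logical type, and SLAExpirationDate__c is an int with the date logical type, so a consumer that reads the raw values sees plain numbers. The sketch below shows how those two logical types map to Python values. The function names are hypothetical, and it assumes the standard Avro meaning of both types (milliseconds and days since 1970-01-01 UTC).

```python
from datetime import date, datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def decode_timestamp_millis(value):
    """Convert a timestamp-millis long to an aware UTC datetime (None stays None)."""
    if value is None:
        return None
    return EPOCH + timedelta(milliseconds=value)


def decode_date(value):
    """Convert a date int (days since the epoch) to a date (None stays None)."""
    if value is None:
        return None
    return date(1970, 1, 1) + timedelta(days=value)
```

For example, a CreatedDate of 1691131295000 decodes to 2023-08-04 06:41:35 UTC.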
Check the messages.
ksql> print 'sfcdc-account-topic' from beginning;
{
"Id": "0015j000016nbijAAA",
"ReplayId": "9021830",
"ChangeEventHeader": {
"entityName": "Account",
"recordIds": [
"0015j000016nbijAAA"
],
"changeType": "CREATE",
"changedFields": [],
"changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
"transactionKey": "0004bf8f-a136-35e0-232f-9f62eee73abb",
"sequenceNumber": 1,
"commitTimestamp": 1691131295000,
"commitUser": "0055j0000099yTYAAY",
"commitNumber": 677598623316
},
"Name": "TestAccount1",
"LastName": null,
"FirstName": null,
"Salutation": null,
"Type": "Prospect",
"ParentId": null,
"BillingStreet": null,
"BillingCity": null,
"BillingState": null,
"BillingPostalCode": null,
"BillingCountry": null,
"BillingLatitude": null,
"BillingLongitude": null,
"BillingGeocodeAccuracy": null,
"BillingAddress": null,
"ShippingStreet": null,
"ShippingCity": null,
"ShippingState": null,
"ShippingPostalCode": null,
"ShippingCountry": null,
"ShippingLatitude": null,
"ShippingLongitude": null,
"ShippingGeocodeAccuracy": null,
"ShippingAddress": null,
"Phone": null,
"Fax": null,
"AccountNumber": "11111",
"Website": null,
"Sic": null,
"Industry": "Banking",
"AnnualRevenue": null,
"NumberOfEmployees": null,
"Ownership": null,
"TickerSymbol": null,
"Description": null,
"Rating": null,
"Site": null,
"OwnerId": "0055j0000099yTYAAY",
"CreatedDate": 1691131295000,
"CreatedById": "0055j0000099yTYAAY",
"LastModifiedDate": 1691131295000,
"LastModifiedById": "0055j0000099yTYAAY",
"Jigsaw": null,
"JigsawCompanyId": null,
"CleanStatus": "Pending",
"AccountSource": null,
"DunsNumber": null,
"Tradestyle": null,
"NaicsCode": null,
"NaicsDesc": null,
"YearStarted": null,
"SicDesc": null,
"DandbCompanyId": null,
"OperatingHoursId": null,
"CustomerPriority__c": null,
"SLA__c": null,
"Active__c": null,
"NumberofLocations__c": null,
"UpsellOpportunity__c": null,
"SLASerialNumber__c": null,
"SLAExpirationDate__c": null,
"_ObjectType": "AccountChangeEvent",
"_EventType": "JPyUm_b7b4SSjXobT5DOPg"
}
{
"Id": "0015j000016nzGtAAI",
"ReplayId": "9022402",
"ChangeEventHeader": {
"entityName": "Account",
"recordIds": [
"0015j000016nzGtAAI"
],
"changeType": "CREATE",
"changedFields": [],
"changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
"transactionKey": "0004c09f-20f4-8968-5df4-ab3738e95f28",
"sequenceNumber": 1,
"commitTimestamp": 1691132462000,
"commitUser": "0055j0000099yTYAAY",
"commitNumber": 677611261555
},
"Name": "TestAccount2",
"LastName": null,
"FirstName": null,
"Salutation": null,
"Type": null,
"ParentId": null,
"BillingStreet": null,
"BillingCity": null,
"BillingState": null,
"BillingPostalCode": null,
"BillingCountry": null,
"BillingLatitude": null,
"BillingLongitude": null,
"BillingGeocodeAccuracy": null,
"BillingAddress": null,
"ShippingStreet": null,
"ShippingCity": null,
"ShippingState": null,
"ShippingPostalCode": null,
"ShippingCountry": null,
"ShippingLatitude": null,
"ShippingLongitude": null,
"ShippingGeocodeAccuracy": null,
"ShippingAddress": null,
"Phone": "090-1111-1111",
"Fax": null,
"AccountNumber": "22222",
"Website": null,
"Sic": null,
"Industry": "Chemicals",
"AnnualRevenue": null,
"NumberOfEmployees": 10000,
"Ownership": null,
"TickerSymbol": null,
"Description": null,
"Rating": null,
"Site": null,
"OwnerId": "0055j0000099yTYAAY",
"CreatedDate": 1691132462000,
"CreatedById": "0055j0000099yTYAAY",
"LastModifiedDate": 1691132462000,
"LastModifiedById": "0055j0000099yTYAAY",
"Jigsaw": null,
"JigsawCompanyId": null,
"CleanStatus": "Pending",
"AccountSource": null,
"DunsNumber": null,
"Tradestyle": null,
"NaicsCode": null,
"NaicsDesc": null,
"YearStarted": null,
"SicDesc": null,
"DandbCompanyId": null,
"OperatingHoursId": null,
"CustomerPriority__c": null,
"SLA__c": null,
"Active__c": null,
"NumberofLocations__c": null,
"UpsellOpportunity__c": null,
"SLASerialNumber__c": null,
"SLAExpirationDate__c": null,
"_ObjectType": "AccountChangeEvent",
"_EventType": "JPyUm_b7b4SSjXobT5DOPg"
}
{
"Id": "0015j000016nzGtAAI",
"ReplayId": "9022436",
"ChangeEventHeader": {
"entityName": "Account",
"recordIds": [
"0015j000016nzGtAAI"
],
"changeType": "UPDATE",
"changedFields": [
"Phone",
"LastModifiedDate"
],
"changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
"transactionKey": "0004c0b9-17df-c835-9f02-7ed1af25dea9",
"sequenceNumber": 1,
"commitTimestamp": 1691132573000,
"commitUser": "0055j0000099yTYAAY",
"commitNumber": 677612525804
},
"Name": null,
"LastName": null,
"FirstName": null,
"Salutation": null,
"Type": null,
"ParentId": null,
"BillingStreet": null,
"BillingCity": null,
"BillingState": null,
"BillingPostalCode": null,
"BillingCountry": null,
"BillingLatitude": null,
"BillingLongitude": null,
"BillingGeocodeAccuracy": null,
"BillingAddress": null,
"ShippingStreet": null,
"ShippingCity": null,
"ShippingState": null,
"ShippingPostalCode": null,
"ShippingCountry": null,
"ShippingLatitude": null,
"ShippingLongitude": null,
"ShippingGeocodeAccuracy": null,
"ShippingAddress": null,
"Phone": "090-2222-2222",
"Fax": null,
"AccountNumber": null,
"Website": null,
"Sic": null,
"Industry": null,
"AnnualRevenue": null,
"NumberOfEmployees": null,
"Ownership": null,
"TickerSymbol": null,
"Description": null,
"Rating": null,
"Site": null,
"OwnerId": null,
"CreatedDate": null,
"CreatedById": null,
"LastModifiedDate": 1691132573000,
"LastModifiedById": null,
"Jigsaw": null,
"JigsawCompanyId": null,
"CleanStatus": null,
"AccountSource": null,
"DunsNumber": null,
"Tradestyle": null,
"NaicsCode": null,
"NaicsDesc": null,
"YearStarted": null,
"SicDesc": null,
"DandbCompanyId": null,
"OperatingHoursId": null,
"CustomerPriority__c": null,
"SLA__c": null,
"Active__c": null,
"NumberofLocations__c": null,
"UpsellOpportunity__c": null,
"SLASerialNumber__c": null,
"SLAExpirationDate__c": null,
"_ObjectType": "AccountChangeEvent",
"_EventType": "JPyUm_b7b4SSjXobT5DOPg"
}
{
"Id": "0015j000016nzGtAAI",
"ReplayId": "9022465",
"ChangeEventHeader": {
"entityName": "Account",
"recordIds": [
"0015j000016nzGtAAI"
],
"changeType": "DELETE",
"changedFields": [],
"changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
"transactionKey": "0004c0cb-a153-ef60-2081-04ff9923ff23",
"sequenceNumber": 1,
"commitTimestamp": 1691132653000,
"commitUser": "0055j0000099yTYAAY",
"commitNumber": 677613463020
},
"Name": null,
"LastName": null,
"FirstName": null,
"Salutation": null,
"Type": null,
"ParentId": null,
"BillingStreet": null,
"BillingCity": null,
"BillingState": null,
"BillingPostalCode": null,
"BillingCountry": null,
"BillingLatitude": null,
"BillingLongitude": null,
"BillingGeocodeAccuracy": null,
"BillingAddress": null,
"ShippingStreet": null,
"ShippingCity": null,
"ShippingState": null,
"ShippingPostalCode": null,
"ShippingCountry": null,
"ShippingLatitude": null,
"ShippingLongitude": null,
"ShippingGeocodeAccuracy": null,
"ShippingAddress": null,
"Phone": null,
"Fax": null,
"AccountNumber": null,
"Website": null,
"Sic": null,
"Industry": null,
"AnnualRevenue": null,
"NumberOfEmployees": null,
"Ownership": null,
"TickerSymbol": null,
"Description": null,
"Rating": null,
"Site": null,
"OwnerId": null,
"CreatedDate": null,
"CreatedById": null,
"LastModifiedDate": null,
"LastModifiedById": null,
"Jigsaw": null,
"JigsawCompanyId": null,
"CleanStatus": null,
"AccountSource": null,
"DunsNumber": null,
"Tradestyle": null,
"NaicsCode": null,
"NaicsDesc": null,
"YearStarted": null,
"SicDesc": null,
"DandbCompanyId": null,
"OperatingHoursId": null,
"CustomerPriority__c": null,
"SLA__c": null,
"Active__c": null,
"NumberofLocations__c": null,
"UpsellOpportunity__c": null,
"SLASerialNumber__c": null,
"SLAExpirationDate__c": null,
"_ObjectType": "AccountChangeEvent",
"_EventType": "JPyUm_b7b4SSjXobT5DOPg"
}
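In the messages above, the CREATE event carries the populated fields, the UPDATE event carries only the fields listed in changedFields (everything else is null), and the DELETE event carries only the Id and the header. A consumer therefore cannot treat every null in an UPDATE as "set to null". The sketch below picks out only the fields an event says were changed. The function name is hypothetical, and it assumes each name in changedFields matches a top-level field of the message, as it does in the sample.

```python
def changed_values(event):
    """Return {field: new value} for the fields listed in changedFields."""
    header = event["ChangeEventHeader"]
    names = header.get("changedFields") or []
    # A field listed here with a None value was explicitly cleared;
    # a None field that is not listed was simply not part of the change.
    return {name: event.get(name) for name in names}


# Trimmed copy of the UPDATE message shown above.
update_event = {
    "Id": "0015j000016nzGtAAI",
    "ChangeEventHeader": {
        "changeType": "UPDATE",
        "changedFields": ["Phone", "LastModifiedDate"],
    },
    "Name": None,
    "Phone": "090-2222-2222",
    "LastModifiedDate": 1691132573000,
}
```

`changed_values(update_event)` yields only the Phone and LastModifiedDate entries, which is the delta to apply to the record identified by Id.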
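Conclusion (observations)
- It may be possible to have Salesforce produce keyed messages, but out of the box the messages have no key schema. For the messages to be practical to use, I think a key is necessary. You need to either assign a key with the connector's SMTs (Single Message Transforms) or promote a value field to the key with a ksqlDB query.
- When sinking messages into an existing table, you need to select fields and convert types. ksqlDB can handle this.
- When applying data from the topic to a database with JDBCSinkConnector, messages whose changeType is "DELETE" must be converted to tombstone records. I have confirmed a way to do this in ksqlDB, but it takes some technique. (I hope to post the method separately.)
- Check in advance whether the limit on the number of change events allotted to your Salesforce account fits your use case.
- The verification above was done against a public Salesforce account; connecting to Salesforce in a private environment requires separate verification.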
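To make the first and third points concrete, here is the intended transformation written as plain Python. This is only an illustration of the logic, not the SMT or ksqlDB approach mentioned above. It assumes the record Id is the right key and that a tombstone is represented as a key with a null value. The function name is hypothetical.

```python
def to_keyed_record(event):
    """Map a change event to a (key, value) pair for a keyed topic.

    The record Id becomes the key; a DELETE event becomes a tombstone,
    that is, the same key with a None value.
    """
    key = event["Id"]
    if event["ChangeEventHeader"]["changeType"] == "DELETE":
        return key, None
    return key, event


# Trimmed copy of the DELETE message shown earlier.
delete_event = {
    "Id": "0015j000016nzGtAAI",
    "ChangeEventHeader": {"changeType": "DELETE", "changedFields": []},
}
```

With records in this shape, a sink that supports delete handling can remove the row matching the key when it sees the null value, while CREATE and UPDATE events pass through with their payload intact.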