将Salesforce的变更数据与Confluent(Kafka)实时连接的方法

首先

这篇文章介绍了如何利用Salesforce的Change Data Capture (CDC)功能和Confluent的Salesforce用连接器,将Salesforce上的数据更改即时同步到Confluent(Kafka)的主题。文章内容包括了确认Salesforce向主题中写入的消息的具体形式。

参考网站

    1. Salesforce的数据变更捕获(Change Data Capture,CDC)功能

 

    Confluent平台上用于Salesforce的连接器(Confluent平台上的Salesforce数据变更捕获源连接器)

在第二个网站上有一个”快速入门”的页面,我正在参考它尝试使用,但只按照页面上所写的操作并不能顺利进行。而且,对于初学者来说,从零开始尝试Salesforce的说明太过简洁且复杂。即使查看官方网页和海外网站,也无法准确获取所需的信息,因此我尝试了几次并进行了一些试错。
接下来,我整理了成功后的相关信息,并将其记录下来,希望能作为一种有成效的方法对其他人有所帮助。

版本信息

    • OS:Red Hat Enterprise Linux release 8.4 (Ootpa)

 

    • Confluent:Confluent Platform 7.2.1 (試用版・シングルノード)

 

    Connector:SalesforceCdcSourceConnector 2.0.12

步骤1. 注册Salesforce开发人员版

根据我的了解,Salesforce似乎提供了一个30天的免费试用账户和一个专门供开发者使用的免费账户(开发者版)。只有开发者版才能使用Salesforce的Change Data Capture(CDC)功能。

image.png
image.png

开发者版本的注册将按照此网站上提供的方法进行。

image.png
image.png
image.png

第二步:Salesforce端的設置。

image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png
image.png

重新设置安全令牌。

image.png
image.png
image.png
image.png

选择右上角的齿轮图标,然后选择“设置”。在“快速搜索”搜索框中输入“OAuth”,然后选择筛选结果中的“OAuth和OpenID Connect设置”。将“允许OAuth用户名密码流”从关闭状态改为开启状态。

image.png

※ 这个步骤在Confluent的快速入门指南中没有提到,但是它是必需的。如果不执行此步骤,即使在Confluent的连接器中正确设置了身份验证信息,也会出现身份验证错误。关于此设置,请参考以下网站。

新组织中默认阻止 OAuth 2.0 用户名密码流。

第三步骤:在Confluent中设置Connector。

安装用于 Confluent 平台的 Salesforce 变更数据捕获源连接器。

image.png

执行命令以进行安装。

confluent-hub install confluentinc/kafka-connect-salesforce:2.0.12

(2) 重新启动Kafka Connect。

confluent local services connect stop
confluent local services connect start

确认Salesforce已经添加到可用的连接器插件中。

curl -s localhost:8083/connector-plugins | jq
image.png

(4) 我们将创建一个连接器。

curl -X PUT localhost:8083/connectors/sfcdc-account-source/config -H "Content-Type: application/json" \
--data '{
    "name":"sfcdc-account-source",
    "connector.class":"io.confluent.salesforce.SalesforceCdcSourceConnector",
    "tasks.max":"1",
    "kafka.topic":"sfcdc-account-topic",
    "salesforce.cdc.channel" : "AccountChangeEvent",
    "salesforce.initial.start" : "all",
    "salesforce.consumer.key" : "3MVG・・(省略)・・edqI",
    "salesforce.consumer.secret" : "9195・・(省略)・・C8B6",
    "salesforce.username" : "xxxxx@xxx.com(Salesforceログイン用メールアドレス)",
    "salesforce.password" : "xxxxxxxxxxxxxx(Salesforceログイン用パスワード)",
    "salesforce.password.token" : "T2L・・(省略)・・Iuj",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
}' | jq
    • nameはコネクター名。

 

    • connector.classはコネクタープラグインのクラス名(上記固定)

 

    • tasksはスレッド数。このコネクターは上限が1という仕様。

 

    • kafka.topicはトピック名。

 

    • salesforce.cdc.channelは⚪︎⚪︎⚪︎ChangeEventとなる。⚪︎⚪︎⚪︎の部分はSalesforce内でのエンティティ名が入る。取引先(Account)のデータ変更イベントであればAccountChangeEventとなる。

 

    • salesforce.initial.startはコネクターの作成前にトピック内にある全てのメッセージをコンシューム対象にするか(all)、コネクター作成後からトピックに書き込まれたメッセージをコンシューム対象にするか(latest)。

 

    • salesforce.consumer.key、salesforce.consumer.secret、salesforce.username、salesforce.password、salesforce.password.tokenはSalesforceおよびSalesforceの接続アプリケーションに接続するための認証情報。

 

    • confluent.topic.bootstrap.serversはブローカーリスト。

 

    confluent.topic.replication.factorはトピックのレプリカ数。デフォルト値が「3」のため試用版のシングルノード構成のConfluentでは「1」を指定する必要がある。

获取连接器的状态信息

curl -s localhost:8083/connectors/sfcdc-account-source/status | jq
image.png

第四步,使用Salesforce添加、修改或删除账户。

启用对账户的更改数据捕获。

image.png
image.png
image.png
image.png
image.png
image.png

第五步:确认Confluent(Kafka)的主题和模式。

确认架构主题

curl -sX GET http://localhost:8081/subjects | jq . | grep sfcdc
  "sfcdc-account-topic-value"
    スキーマサブジェクトは「トピック名-key」または「トピック名-value」になるが、Salesforce CDC+コネクターによってトピックに書き込まれるメッセージのスキーマには「トピック名-key」のサブジェクトはない。

确认模式

curl -sX GET http://localhost:8081/subjects/sfcdc-account-topic-value/versions/latest | jq .schema -r | jq
{
  "type": "record",
  "name": "AccountChangeEvent",
  "namespace": "io.confluent.salesforce",
  "fields": [
    {
      "name": "Id",
      "type": {
        "type": "string",
        "connect.doc": "Unique identifier for the object."
      }
    },
    {
      "name": "ReplayId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "ChangeEventHeader",
      "type": {
        "type": "record",
        "name": "ChangeEventHeader",
        "namespace": "",
        "fields": [
          {
            "name": "entityName",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "recordIds",
            "type": [
              "null",
              {
                "type": "array",
                "items": "string"
              }
            ],
            "default": null
          },
          {
            "name": "changeType",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "changedFields",
            "type": [
              "null",
              {
                "type": "array",
                "items": "string"
              }
            ],
            "default": null
          },
          {
            "name": "changeOrigin",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "transactionKey",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "sequenceNumber",
            "type": [
              "null",
              "long"
            ],
            "default": null
          },
          {
            "name": "commitTimestamp",
            "type": [
              "null",
              "long"
            ],
            "default": null
          },
          {
            "name": "commitUser",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "commitNumber",
            "type": [
              "null",
              "long"
            ],
            "default": null
          }
        ],
        "connect.name": "ChangeEventHeader"
      }
    },
    {
      "name": "Name",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "LastName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "FirstName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Salutation",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Type",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "ParentId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "BillingStreet",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "BillingCity",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "BillingState",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "BillingPostalCode",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "BillingCountry",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "BillingLatitude",
      "type": [
        "null",
        "double"
      ],
      "default": null
    },
    {
      "name": "BillingLongitude",
      "type": [
        "null",
        "double"
      ],
      "default": null
    },
    {
      "name": "BillingGeocodeAccuracy",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "BillingAddress",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Address",
          "fields": [
            {
              "name": "GeocodeAccuracy",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "State",
              "type": [
                "null",
                {
                  "type": "string",
                  "connect.doc": ""
                }
              ],
              "default": null
            },
            {
              "name": "Street",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "PostalCode",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "Country",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "Latitude",
              "type": [
                "null",
                "double"
              ],
              "default": null
            },
            {
              "name": "City",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "Longitude",
              "type": [
                "null",
                "double"
              ],
              "default": null
            },
            {
              "name": "CountryCode",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "StateCode",
              "type": [
                "null",
                "string"
              ],
              "default": null
            }
          ],
          "connect.name": "io.confluent.salesforce.Address"
        }
      ],
      "default": null
    },
    {
      "name": "ShippingStreet",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "ShippingCity",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "ShippingState",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "ShippingPostalCode",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "ShippingCountry",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "ShippingLatitude",
      "type": [
        "null",
        "double"
      ],
      "default": null
    },
    {
      "name": "ShippingLongitude",
      "type": [
        "null",
        "double"
      ],
      "default": null
    },
    {
      "name": "ShippingGeocodeAccuracy",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "ShippingAddress",
      "type": [
        "null",
        "Address"
      ],
      "default": null
    },
    {
      "name": "Phone",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Fax",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "AccountNumber",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Website",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Sic",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Industry",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "AnnualRevenue",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "NumberOfEmployees",
      "type": [
        "null",
        "int"
      ],
      "default": null
    },
    {
      "name": "Ownership",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "TickerSymbol",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Description",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Rating",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Site",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "OwnerId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "CreatedDate",
      "type": [
        "null",
        {
          "type": "long",
          "connect.version": 1,
          "connect.name": "org.apache.kafka.connect.data.Timestamp",
          "logicalType": "timestamp-millis"
        }
      ],
      "default": null
    },
    {
      "name": "CreatedById",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "LastModifiedDate",
      "type": [
        "null",
        {
          "type": "long",
          "connect.version": 1,
          "connect.name": "org.apache.kafka.connect.data.Timestamp",
          "logicalType": "timestamp-millis"
        }
      ],
      "default": null
    },
    {
      "name": "LastModifiedById",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Jigsaw",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "JigsawCompanyId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "CleanStatus",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "AccountSource",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "DunsNumber",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Tradestyle",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "NaicsCode",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "NaicsDesc",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "YearStarted",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "SicDesc",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "DandbCompanyId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "OperatingHoursId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "CustomerPriority__c",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "SLA__c",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "Active__c",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "NumberofLocations__c",
      "type": [
        "null",
        "double"
      ],
      "default": null
    },
    {
      "name": "UpsellOpportunity__c",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "SLASerialNumber__c",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "SLAExpirationDate__c",
      "type": [
        "null",
        {
          "type": "int",
          "connect.version": 1,
          "connect.name": "org.apache.kafka.connect.data.Date",
          "logicalType": "date"
        }
      ],
      "default": null
    },
    {
      "name": "_ObjectType",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "_EventType",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ],
  "connect.name": "io.confluent.salesforce.AccountChangeEvent"
}

请确认消息。

ksql> print 'sfcdc-account-topic' from beginning;
image.png
{
  "Id": "0015j000016nbijAAA",
  "ReplayId": "9021830",
  "ChangeEventHeader": {
    "entityName": "Account",
    "recordIds": [
      "0015j000016nbijAAA"
    ],
    "changeType": "CREATE",
    "changedFields": [],
    "changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
    "transactionKey": "0004bf8f-a136-35e0-232f-9f62eee73abb",
    "sequenceNumber": 1,
    "commitTimestamp": 1691131295000,
    "commitUser": "0055j0000099yTYAAY",
    "commitNumber": 677598623316
  },
  "Name": "TestAccount1",
  "LastName": null,
  "FirstName": null,
  "Salutation": null,
  "Type": "Prospect",
  "ParentId": null,
  "BillingStreet": null,
  "BillingCity": null,
  "BillingState": null,
  "BillingPostalCode": null,
  "BillingCountry": null,
  "BillingLatitude": null,
  "BillingLongitude": null,
  "BillingGeocodeAccuracy": null,
  "BillingAddress": null,
  "ShippingStreet": null,
  "ShippingCity": null,
  "ShippingState": null,
  "ShippingPostalCode": null,
  "ShippingCountry": null,
  "ShippingLatitude": null,
  "ShippingLongitude": null,
  "ShippingGeocodeAccuracy": null,
  "ShippingAddress": null,
  "Phone": null,
  "Fax": null,
  "AccountNumber": "11111",
  "Website": null,
  "Sic": null,
  "Industry": "Banking",
  "AnnualRevenue": null,
  "NumberOfEmployees": null,
  "Ownership": null,
  "TickerSymbol": null,
  "Description": null,
  "Rating": null,
  "Site": null,
  "OwnerId": "0055j0000099yTYAAY",
  "CreatedDate": 1691131295000,
  "CreatedById": "0055j0000099yTYAAY",
  "LastModifiedDate": 1691131295000,
  "LastModifiedById": "0055j0000099yTYAAY",
  "Jigsaw": null,
  "JigsawCompanyId": null,
  "CleanStatus": "Pending",
  "AccountSource": null,
  "DunsNumber": null,
  "Tradestyle": null,
  "NaicsCode": null,
  "NaicsDesc": null,
  "YearStarted": null,
  "SicDesc": null,
  "DandbCompanyId": null,
  "OperatingHoursId": null,
  "CustomerPriority__c": null,
  "SLA__c": null,
  "Active__c": null,
  "NumberofLocations__c": null,
  "UpsellOpportunity__c": null,
  "SLASerialNumber__c": null,
  "SLAExpirationDate__c": null,
  "_ObjectType": "AccountChangeEvent",
  "_EventType": "JPyUm_b7b4SSjXobT5DOPg"
}
{
  "Id": "0015j000016nzGtAAI",
  "ReplayId": "9022402",
  "ChangeEventHeader": {
    "entityName": "Account",
    "recordIds": [
      "0015j000016nzGtAAI"
    ],
    "changeType": "CREATE",
    "changedFields": [],
    "changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
    "transactionKey": "0004c09f-20f4-8968-5df4-ab3738e95f28",
    "sequenceNumber": 1,
    "commitTimestamp": 1691132462000,
    "commitUser": "0055j0000099yTYAAY",
    "commitNumber": 677611261555
  },
  "Name": "TestAccount2",
  "LastName": null,
  "FirstName": null,
  "Salutation": null,
  "Type": null,
  "ParentId": null,
  "BillingStreet": null,
  "BillingCity": null,
  "BillingState": null,
  "BillingPostalCode": null,
  "BillingCountry": null,
  "BillingLatitude": null,
  "BillingLongitude": null,
  "BillingGeocodeAccuracy": null,
  "BillingAddress": null,
  "ShippingStreet": null,
  "ShippingCity": null,
  "ShippingState": null,
  "ShippingPostalCode": null,
  "ShippingCountry": null,
  "ShippingLatitude": null,
  "ShippingLongitude": null,
  "ShippingGeocodeAccuracy": null,
  "ShippingAddress": null,
  "Phone": "090-1111-1111",
  "Fax": null,
  "AccountNumber": "22222",
  "Website": null,
  "Sic": null,
  "Industry": "Chemicals",
  "AnnualRevenue": null,
  "NumberOfEmployees": 10000,
  "Ownership": null,
  "TickerSymbol": null,
  "Description": null,
  "Rating": null,
  "Site": null,
  "OwnerId": "0055j0000099yTYAAY",
  "CreatedDate": 1691132462000,
  "CreatedById": "0055j0000099yTYAAY",
  "LastModifiedDate": 1691132462000,
  "LastModifiedById": "0055j0000099yTYAAY",
  "Jigsaw": null,
  "JigsawCompanyId": null,
  "CleanStatus": "Pending",
  "AccountSource": null,
  "DunsNumber": null,
  "Tradestyle": null,
  "NaicsCode": null,
  "NaicsDesc": null,
  "YearStarted": null,
  "SicDesc": null,
  "DandbCompanyId": null,
  "OperatingHoursId": null,
  "CustomerPriority__c": null,
  "SLA__c": null,
  "Active__c": null,
  "NumberofLocations__c": null,
  "UpsellOpportunity__c": null,
  "SLASerialNumber__c": null,
  "SLAExpirationDate__c": null,
  "_ObjectType": "AccountChangeEvent",
  "_EventType": "JPyUm_b7b4SSjXobT5DOPg"
}
{
  "Id": "0015j000016nzGtAAI",
  "ReplayId": "9022436",
  "ChangeEventHeader": {
    "entityName": "Account",
    "recordIds": [
      "0015j000016nzGtAAI"
    ],
    "changeType": "UPDATE",
    "changedFields": [
      "Phone",
      "LastModifiedDate"
    ],
    "changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
    "transactionKey": "0004c0b9-17df-c835-9f02-7ed1af25dea9",
    "sequenceNumber": 1,
    "commitTimestamp": 1691132573000,
    "commitUser": "0055j0000099yTYAAY",
    "commitNumber": 677612525804
  },
  "Name": null,
  "LastName": null,
  "FirstName": null,
  "Salutation": null,
  "Type": null,
  "ParentId": null,
  "BillingStreet": null,
  "BillingCity": null,
  "BillingState": null,
  "BillingPostalCode": null,
  "BillingCountry": null,
  "BillingLatitude": null,
  "BillingLongitude": null,
  "BillingGeocodeAccuracy": null,
  "BillingAddress": null,
  "ShippingStreet": null,
  "ShippingCity": null,
  "ShippingState": null,
  "ShippingPostalCode": null,
  "ShippingCountry": null,
  "ShippingLatitude": null,
  "ShippingLongitude": null,
  "ShippingGeocodeAccuracy": null,
  "ShippingAddress": null,
  "Phone": "090-2222-2222",
  "Fax": null,
  "AccountNumber": null,
  "Website": null,
  "Sic": null,
  "Industry": null,
  "AnnualRevenue": null,
  "NumberOfEmployees": null,
  "Ownership": null,
  "TickerSymbol": null,
  "Description": null,
  "Rating": null,
  "Site": null,
  "OwnerId": null,
  "CreatedDate": null,
  "CreatedById": null,
  "LastModifiedDate": 1691132573000,
  "LastModifiedById": null,
  "Jigsaw": null,
  "JigsawCompanyId": null,
  "CleanStatus": null,
  "AccountSource": null,
  "DunsNumber": null,
  "Tradestyle": null,
  "NaicsCode": null,
  "NaicsDesc": null,
  "YearStarted": null,
  "SicDesc": null,
  "DandbCompanyId": null,
  "OperatingHoursId": null,
  "CustomerPriority__c": null,
  "SLA__c": null,
  "Active__c": null,
  "NumberofLocations__c": null,
  "UpsellOpportunity__c": null,
  "SLASerialNumber__c": null,
  "SLAExpirationDate__c": null,
  "_ObjectType": "AccountChangeEvent",
  "_EventType": "JPyUm_b7b4SSjXobT5DOPg"
}
{
  "Id": "0015j000016nzGtAAI",
  "ReplayId": "9022465",
  "ChangeEventHeader": {
    "entityName": "Account",
    "recordIds": [
      "0015j000016nzGtAAI"
    ],
    "changeType": "DELETE",
    "changedFields": [],
    "changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/",
    "transactionKey": "0004c0cb-a153-ef60-2081-04ff9923ff23",
    "sequenceNumber": 1,
    "commitTimestamp": 1691132653000,
    "commitUser": "0055j0000099yTYAAY",
    "commitNumber": 677613463020
  },
  "Name": null,
  "LastName": null,
  "FirstName": null,
  "Salutation": null,
  "Type": null,
  "ParentId": null,
  "BillingStreet": null,
  "BillingCity": null,
  "BillingState": null,
  "BillingPostalCode": null,
  "BillingCountry": null,
  "BillingLatitude": null,
  "BillingLongitude": null,
  "BillingGeocodeAccuracy": null,
  "BillingAddress": null,
  "ShippingStreet": null,
  "ShippingCity": null,
  "ShippingState": null,
  "ShippingPostalCode": null,
  "ShippingCountry": null,
  "ShippingLatitude": null,
  "ShippingLongitude": null,
  "ShippingGeocodeAccuracy": null,
  "ShippingAddress": null,
  "Phone": null,
  "Fax": null,
  "AccountNumber": null,
  "Website": null,
  "Sic": null,
  "Industry": null,
  "AnnualRevenue": null,
  "NumberOfEmployees": null,
  "Ownership": null,
  "TickerSymbol": null,
  "Description": null,
  "Rating": null,
  "Site": null,
  "OwnerId": null,
  "CreatedDate": null,
  "CreatedById": null,
  "LastModifiedDate": null,
  "LastModifiedById": null,
  "Jigsaw": null,
  "JigsawCompanyId": null,
  "CleanStatus": null,
  "AccountSource": null,
  "DunsNumber": null,
  "Tradestyle": null,
  "NaicsCode": null,
  "NaicsDesc": null,
  "YearStarted": null,
  "SicDesc": null,
  "DandbCompanyId": null,
  "OperatingHoursId": null,
  "CustomerPriority__c": null,
  "SLA__c": null,
  "Active__c": null,
  "NumberofLocations__c": null,
  "UpsellOpportunity__c": null,
  "SLASerialNumber__c": null,
  "SLAExpirationDate__c": null,
  "_ObjectType": "AccountChangeEvent",
  "_EventType": "JPyUm_b7b4SSjXobT5DOPg"
}

最后(分析总结)

    • Salesforceからキーありでメッセージングできるかもしれませんが、自然体ではメッセージのスキーマにはキーがないようです。メッセージの使い勝手として、キーは必要だと思います。コネクターのSMT(Single Message Transforms)によってキーを付与するか、KsqlDBのクエリーによるバリュー項目のキー昇格を行う必要があります。

 

    • 既存のテーブルにメッセージをシンクする場合、項目の取捨選択や型変換が必要になります。これはksqlDBで対応することができます。

 

    • トピックからJDBCSinkConnectorでDBにデータ反映する場合は、changeTypeが”DELETE”のメッセージをTombstoneレコードに変換する必要があります。ksqlDBでこれを実施する方法を確認済みですがテクニックが要ります。(別途方法を投稿できたらと思います)

 

    • Salesforceのアカウントに割り当てられたチェンジイベント発行数の上限がユースケースに見合ったものかどうかあらかじめ確認が必要です。

 

    上記の検証はパブリックなSalesforceのアカウントでのトライでしたが、プライベートな環境のSalesforceに対する接続には別途検証が必要になります。