使用ksqlDB将消息进行加工并转换为Tombstone记录的技巧

在Kafka中,消息可以是事件本身,由协调方产生,也可以作为CDC捕获的源表的变更内容的表示方式。
如果用作源表的变更内容,那么INSERT/UPDATE/DELETE在消息中会体现出源表的操作,但是DELETE时的消息内容会稍有特殊。

image.png

消息的内容与CDC产品规格有关,但基本上与上述内容相同。

以下为中文翻译:
要点:

    • メッセージはキー部とバリュー部に分かれます。

 

    • INSERT/UPDATEがあると、更新後のレコードの主キーをキー部に、全カラムをバリュー部にセットしたメッセージがKafkaに連携されます。

 

    DELETEがあると、削除されたレコードの主キーをキー部に、バリュー部はnullにしたメッセージがKafkaに連携されます。

通过DELETE操作删除的消息被称为墓碑记录。
墓碑记录是在Kafka流中的表(KTable)或数据库表中,用于删除具有相同键的记录。

有些发送消息到Kafka的组件,为了实现删除功能,可能需要对消息进行处理,自行创建Tombstone记录。
这是通过在海外网站上进行调查、试错多次后找到的方法,虽然有点专业,但如果能对某人有所帮助就好了。

我正在努力做的事情 (Wǒ zuò de

image.png
    • kafka-avro-console-producerでTopic1にメッセージを書き込みます。メッセージはキーを示す「ID」、更新区分を示す項目「TYPE」、値を示す「VAL」で構成します。

 

    • Topic1に対するストリームStream1を作成し、加工処理を行うストリームから参照できるようにします。

 

    • Stream2を作成しTopic1のメッセージの更新区分「TYPE」が’DEL’のメッセージをTombstoneレコードに変換します。変換後のメッセージはTopic2に書き込まれます。

 

    Topic2に対してJDBC Sink ConnectorをアタッチしてDB内のTable1が更新されるようにします。結果的に、Talbe1のレコードが上記図で示す通りになっていれば成功です。

使用kafka-avro-console-producer向Topic1写入消息。

kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic topic1 \
  --property parse.key=true \
  --property key.schema='{"type":"record",
                          "name":"myrecord",
                          "fields":[{"name":"ID","type":"string"}]}' \
  --property key.separator=" " \
  --property value.schema='{"type":"record",
                            "name":"myrecord",
                            "fields":[{"name":"ID","type":"string"},
                                      {"name":"TYPE","type":"string"},
                                      {"name":"VAL","type":"string"}]}'
{"ID":"1"} {"ID":"1","TYPE":"INS","VAL":"aa"}
{"ID":"2"} {"ID":"2","TYPE":"INS","VAL":"bb"}
{"ID":"3"} {"ID":"3","TYPE":"INS","VAL":"cc"}
{"ID":"2"} {"ID":"2","TYPE":"UPD","VAL":"bbb"}
{"ID":"3"} {"ID":"3","TYPE":"DEL","VAL":"cc"}
ksql> print 'topic1' from beginning;
Key format: AVRO or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/14 11:21:32.621 Z, key: {"ID": "1"}, value: {"ID": "1", "TYPE": "INS", "VAL": "aa"}, partition: 0
rowtime: 2023/08/14 11:21:38.634 Z, key: {"ID": "2"}, value: {"ID": "2", "TYPE": "INS", "VAL": "bb"}, partition: 0
rowtime: 2023/08/14 11:21:44.117 Z, key: {"ID": "3"}, value: {"ID": "3", "TYPE": "INS", "VAL": "cc"}, partition: 0
rowtime: 2023/08/14 11:21:51.773 Z, key: {"ID": "2"}, value: {"ID": "2", "TYPE": "UPD", "VAL": "bbb"}, partition: 0
rowtime: 2023/08/14 11:21:56.328 Z, key: {"ID": "3"}, value: {"ID": "3", "TYPE": "DEL", "VAL": "cc"}, partition: 0

创建Stream1(准备处理消息)

CREATE STREAM STREAM1
  WITH (KAFKA_TOPIC='topic1', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO');

在Stream2中,将Topic1的部分信息加工成墓碑记录。

CREATE STREAM STREAM2
  WITH (WRAP_SINGLE_VALUE=FALSE,
        KAFKA_TOPIC='topic2',
        KEY_FORMAT='AVRO',
        VALUE_FORMAT='AVRO')
  AS SELECT
    ROWKEY,
    CASE WHEN TYPE='DEL' 
      THEN CAST(NULL AS STRUCT<ID STRING, VAL STRING>)
      ELSE STRUCT(ID:=STREAM1.ID, VAL:=STREAM1.VAL)
    END
  FROM
    STREAM1
  EMIT CHANGES;
    • WITH句の「WRAP_SINGLE_VALUE」がポイントです。

TRUE(デフォルト):ksqlDB は列がレコード内の名前付き列としてシリアル化されていることを期待します。
FALSE:ksqlDB は列が匿名値としてシリアル化されていると想定します。名前のついた列ではなくJSON構造になるSTRUCTもしくはnullを抽出対象としてSELECTで指定できます。

SELECT句のCASE式もポイントです。

TYPEが’DEL'(削除)なら、バリュー部をnullにします。通常のRDBMSのSQLのようにCASE式の複数条件結果はどれも同じ型である必要があります。メッセージのバリューに複数の項目がある場合は上記のようにNULLをSTRUCT型でCASTしたものを抽出対象とします。
TYPEが’DEL'(削除)以外なら、メッセージのバリューに複数の項目がある場合は、上記のようにSTRUCTに値をセットした結果を抽出対象とします。

ksql> print 'topic2' from beginning;
Key format: AVRO or KAFKA_BIGINT or KAFKA_DOUBLE or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/14 11:21:32.621 Z, key: {"ID": "1"}, value: {"ID": "1", "VAL": "aa"}, partition: 0
rowtime: 2023/08/14 11:21:38.634 Z, key: {"ID": "2"}, value: {"ID": "2", "VAL": "bb"}, partition: 0
rowtime: 2023/08/14 11:21:44.117 Z, key: {"ID": "3"}, value: {"ID": "3", "VAL": "cc"}, partition: 0
rowtime: 2023/08/14 11:21:51.773 Z, key: {"ID": "2"}, value: {"ID": "2", "VAL": "bbb"}, partition: 0
rowtime: 2023/08/14 11:21:56.328 Z, key: {"ID": "3"}, value: <null>, partition: 0

我确认第五条消息已经记录在了Tombstone记录中。

在Topic2主题中,通过JDBC Sink连接器将更新反映到后续的协作数据库。

curl -X PUT localhost:8083/connectors/table1-connector/config -H "Content-Type: application/json" \--data '{
"name":"table1-connector",
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max":"1",
"topics":"topic2",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"connection.url":"jdbc:postgresql://localhost:5432/testdb1",
"connection.user":"******",
"connection.password":"*********",
"insert.mode":"upsert",
"auto.create":"true",
"delete.enabled":"true",
"table.name.format":"public.table1",
"pk.mode":"record_key",
"pk.fields":"ID",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor": "1"
}' | jq

如果JDBC Sink Connector正常运行,则数据将映射到下一个数据库表中。
以下是我们在此次验证中使用的PostgreSQL数据库的确认结果。

testdb1=# \d table1;
              テーブル "public.table1"
   |    | 照合順序 | Null 値を許容 | デフォルト 
-----+------+----------+---------------+------------
 ID  | text |          | not null      | 
 VAL | text |          |               | 
インデックス:
    "table1_pkey" PRIMARY KEY, btree ("ID")

testdb1=# select * from table1;
 ID | VAL 
----+-----
 1  | aa
 2  | bbb
(2 )
image.png

在ksqlDB中,我可以将消息进行加工并转换为Tombstone记录。
在ksqlDB中,你可以通过类似SQL的实现轻松实现此类功能,而无需使用编程语言来实现应用程序。

我常常参考的网站:ksqlDB 参考资料。