使用ksqlDB将消息进行加工并转换为Tombstone记录的技巧
在Kafka中,消息可以是事件本身,由协调方产生,也可以作为CDC捕获的源表的变更内容的表示方式。
如果用作源表的变更内容,那么INSERT/UPDATE/DELETE在消息中会体现出源表的操作,但是DELETE时的消息内容会稍有特殊。
消息的内容与CDC产品规格有关,但基本上与上述内容相同。
以下为中文翻译:
要点:
-
- メッセージはキー部とバリュー部に分かれます。
-
- INSERT/UPDATEがあると、更新後のレコードの主キーをキー部に、全カラムをバリュー部にセットしたメッセージがKafkaに連携されます。
- DELETEがあると、削除されたレコードの主キーをキー部に、バリュー部はnullにしたメッセージがKafkaに連携されます。
通过DELETE操作删除的消息被称为墓碑记录。
墓碑记录是在Kafka流中的表(KTable)或数据库表中,用于删除具有相同键的记录。
有些发送消息到Kafka的组件,为了实现删除功能,可能需要对消息进行处理,自行创建Tombstone记录。
这是通过在海外网站上进行调查、试错多次后找到的方法,虽然有点专业,但如果能对某人有所帮助就好了。
我正在努力做的事情 (Wǒ zuò de
-
- kafka-avro-console-producerでTopic1にメッセージを書き込みます。メッセージはキーを示す「ID」、更新区分を示す項目「TYPE」、値を示す「VAL」で構成します。
-
- Topic1に対するストリームStream1を作成し、加工処理を行うストリームから参照できるようにします。
-
- Stream2を作成しTopic1のメッセージの更新区分「TYPE」が’DEL’のメッセージをTombstoneレコードに変換します。変換後のメッセージはTopic2に書き込まれます。
- Topic2に対してJDBC Sink ConnectorをアタッチしてDB内のTable1が更新されるようにします。結果的に、Talbe1のレコードが上記図で示す通りになっていれば成功です。
使用kafka-avro-console-producer向Topic1写入消息。
kafka-avro-console-producer \
--broker-list localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic topic1 \
--property parse.key=true \
--property key.schema='{"type":"record",
"name":"myrecord",
"fields":[{"name":"ID","type":"string"}]}' \
--property key.separator=" " \
--property value.schema='{"type":"record",
"name":"myrecord",
"fields":[{"name":"ID","type":"string"},
{"name":"TYPE","type":"string"},
{"name":"VAL","type":"string"}]}'
{"ID":"1"} {"ID":"1","TYPE":"INS","VAL":"aa"}
{"ID":"2"} {"ID":"2","TYPE":"INS","VAL":"bb"}
{"ID":"3"} {"ID":"3","TYPE":"INS","VAL":"cc"}
{"ID":"2"} {"ID":"2","TYPE":"UPD","VAL":"bbb"}
{"ID":"3"} {"ID":"3","TYPE":"DEL","VAL":"cc"}
ksql> print 'topic1' from beginning;
Key format: AVRO or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/14 11:21:32.621 Z, key: {"ID": "1"}, value: {"ID": "1", "TYPE": "INS", "VAL": "aa"}, partition: 0
rowtime: 2023/08/14 11:21:38.634 Z, key: {"ID": "2"}, value: {"ID": "2", "TYPE": "INS", "VAL": "bb"}, partition: 0
rowtime: 2023/08/14 11:21:44.117 Z, key: {"ID": "3"}, value: {"ID": "3", "TYPE": "INS", "VAL": "cc"}, partition: 0
rowtime: 2023/08/14 11:21:51.773 Z, key: {"ID": "2"}, value: {"ID": "2", "TYPE": "UPD", "VAL": "bbb"}, partition: 0
rowtime: 2023/08/14 11:21:56.328 Z, key: {"ID": "3"}, value: {"ID": "3", "TYPE": "DEL", "VAL": "cc"}, partition: 0
创建Stream1(准备处理消息)
CREATE STREAM STREAM1
WITH (KAFKA_TOPIC='topic1', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO');
在Stream2中,将Topic1的部分信息加工成墓碑记录。
CREATE STREAM STREAM2
WITH (WRAP_SINGLE_VALUE=FALSE,
KAFKA_TOPIC='topic2',
KEY_FORMAT='AVRO',
VALUE_FORMAT='AVRO')
AS SELECT
ROWKEY,
CASE WHEN TYPE='DEL'
THEN CAST(NULL AS STRUCT<ID STRING, VAL STRING>)
ELSE STRUCT(ID:=STREAM1.ID, VAL:=STREAM1.VAL)
END
FROM
STREAM1
EMIT CHANGES;
-
- WITH句の「WRAP_SINGLE_VALUE」がポイントです。
TRUE(デフォルト):ksqlDB は列がレコード内の名前付き列としてシリアル化されていることを期待します。
FALSE:ksqlDB は列が匿名値としてシリアル化されていると想定します。名前のついた列ではなくJSON構造になるSTRUCTもしくはnullを抽出対象としてSELECTで指定できます。
SELECT句のCASE式もポイントです。
TYPEが’DEL'(削除)なら、バリュー部をnullにします。通常のRDBMSのSQLのようにCASE式の複数条件結果はどれも同じ型である必要があります。メッセージのバリューに複数の項目がある場合は上記のようにNULLをSTRUCT型でCASTしたものを抽出対象とします。
TYPEが’DEL'(削除)以外なら、メッセージのバリューに複数の項目がある場合は、上記のようにSTRUCTに値をセットした結果を抽出対象とします。
ksql> print 'topic2' from beginning;
Key format: AVRO or KAFKA_BIGINT or KAFKA_DOUBLE or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/14 11:21:32.621 Z, key: {"ID": "1"}, value: {"ID": "1", "VAL": "aa"}, partition: 0
rowtime: 2023/08/14 11:21:38.634 Z, key: {"ID": "2"}, value: {"ID": "2", "VAL": "bb"}, partition: 0
rowtime: 2023/08/14 11:21:44.117 Z, key: {"ID": "3"}, value: {"ID": "3", "VAL": "cc"}, partition: 0
rowtime: 2023/08/14 11:21:51.773 Z, key: {"ID": "2"}, value: {"ID": "2", "VAL": "bbb"}, partition: 0
rowtime: 2023/08/14 11:21:56.328 Z, key: {"ID": "3"}, value: <null>, partition: 0
我确认第五条消息已经记录在了Tombstone记录中。
在Topic2主题中,通过JDBC Sink连接器将更新反映到后续的协作数据库。
curl -X PUT localhost:8083/connectors/table1-connector/config -H "Content-Type: application/json" \--data '{
"name":"table1-connector",
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max":"1",
"topics":"topic2",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"connection.url":"jdbc:postgresql://localhost:5432/testdb1",
"connection.user":"******",
"connection.password":"*********",
"insert.mode":"upsert",
"auto.create":"true",
"delete.enabled":"true",
"table.name.format":"public.table1",
"pk.mode":"record_key",
"pk.fields":"ID",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor": "1"
}' | jq
如果JDBC Sink Connector正常运行,则数据将映射到下一个数据库表中。
以下是我们在此次验证中使用的PostgreSQL数据库的确认结果。
testdb1=# \d table1;
テーブル "public.table1"
列 | 型 | 照合順序 | Null 値を許容 | デフォルト
-----+------+----------+---------------+------------
ID | text | | not null |
VAL | text | | |
インデックス:
"table1_pkey" PRIMARY KEY, btree ("ID")
testdb1=# select * from table1;
ID | VAL
----+-----
1 | aa
2 | bbb
(2 行)
在ksqlDB中,我可以将消息进行加工并转换为Tombstone记录。
在ksqlDB中,你可以通过类似SQL的实现轻松实现此类功能,而无需使用编程语言来实现应用程序。
我常常参考的网站:ksqlDB 参考资料。