在ksqlDB中给Kafka消息添加键的方法
Kafka的消息由头部、键和值组成。
当需要在主题内唯一标识消息、启用按键最新消息、根据主题消息更新或删除数据库记录时,键就变得重要起来。
如果源服务或连接器不处理消息的键,则如果不进行任何操作,将向Kafka发送没有键的消息。
下面是使用ksqlDB流为消息添加键的示例,将Db2作为数据库,Kafka使用Confluent。
你正在尝试做什么?

创建一个针对Topic1中没有键的消息的Stream1。
以下是Topic1的内容:
可以确认没有钥匙。
ksql> print 'topic1' from beginning;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/14 06:38:11.074 Z, key: <null>, value: {"col1": "a1", "col2": "a", "col3": "test1"}, partition: 0
rowtime: 2023/08/14 06:38:16.039 Z, key: <null>, value: {"col1": "a2", "col2": "aa", "col3": "test2"}, partition: 0
rowtime: 2023/08/14 06:38:27.083 Z, key: <null>, value: {"col1": "a3", "col2": "aaa", "col3": "test3"}, partition: 0
为了在ksqlDB的流中处理消息,需要创建一个对应原始主题的流,以便从处理流中引用它进行加工处理。
Stream1对于原始主题的定义内容如下所示:
CREATE STREAM STREAM1
WITH (KAFKA_TOPIC='topic1', VALUE_FORMAT='AVRO');
ksql> SELECT * FROM STREAM1;
+-------------------------------+-------------------------------+-------------------------------+
|COL1 |COL2 |COL3 |
+-------------------------------+-------------------------------+-------------------------------+
|a1 |a |test1 |
|a2 |aa |test2 |
|a3 |aaa |test3 |
创建一个名为Stream2的流,通过获取Stream1的消息并在消息中添加键值。
创建一个进行加工处理的流。
在流的定义中,指定一个主题来写入加工后的消息。
CREATE STREAM STREAM2
WITH (KAFKA_TOPIC='topic2', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO')
AS SELECT
STRUCT(COL1:=STREAM1.COL1) AS COL1,
STREAM1.COL2,
STREAM1.COL3
FROM
STREAM1
PARTITION BY
STRUCT(COL1:=STREAM1.COL1)
EMIT CHANGES;
ksql> SELECT * FROM STREAM2;
+-------------------------------+-------------------------------+-------------------------------+
|COL1 |COL2 |COL3 |
+-------------------------------+-------------------------------+-------------------------------+
|{COL1=a1} |a |test1 |
|{COL1=a2} |aa |test2 |
|{COL1=a3} |aaa |test3 |
可以确认通过Stream2向Topic2写入了带有键的消息。
ksql> print 'topic2' from beginning;
Key format: AVRO or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/14 06:38:11.074 Z, key: {"COL1": "a1"}, value: {"COL2": "a", "COL3": "test1"}, partition: 0
rowtime: 2023/08/14 06:38:16.039 Z, key: {"COL1": "a2"}, value: {"COL2": "aa", "COL3": "test2"}, partition: 0
rowtime: 2023/08/14 06:38:27.083 Z, key: {"COL1": "a3"}, value: {"COL2": "aaa", "COL3": "test3"}, partition: 0
我们可以从Topic2的架构中确认也有键。
$ curl -sX GET http://localhost:8081/subjects/topic2-key/versions/latest | jq .schema -r | jq
{
"type": "record",
"name": "Stream2Key",
"namespace": "io.confluent.ksql.avro_schemas",
"fields": [
{
"name": "COL1",
"type": [
"null",
"string"
],
"default": null
}
]
}
$ curl -sX GET http://localhost:8081/subjects/topic2-value/versions/latest | jq .schema -r | jq
{
"type": "record",
"name": "KsqlDataSourceSchema",
"namespace": "io.confluent.ksql.avro_schemas",
"fields": [
{
"name": "COL2",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "COL3",
"type": [
"null",
"string"
],
"default": null
}
]
}
通过ksqlDB的流处理,可以对没有键的消息进行加工,并将有键的消息写入到Topic中。如果在已写入有键消息的Topic上使用JDBC Sink Connector,就可以根据消息进行INSERT、UPDATE和DELETE操作,实现与后续协作数据库中的表的交互。
如果使用ksqlDB,你可以轻松地实现这样的功能,而不需要使用编程语言来实现应用程序,只需使用类似SQL的实现方式。