在ksqlDB中给Kafka消息添加键的方法

Kafka的消息由头部、键和值组成。
当需要在主题内唯一标识消息、启用按键最新消息、根据主题消息更新或删除数据库记录时,键就变得重要起来。

如果源服务或连接器不处理消息的键,则如果不进行任何操作,将向Kafka发送没有键的消息。
下面是使用ksqlDB流为消息添加键的示例,将Db2作为数据库,Kafka使用Confluent。

你正在尝试做什么?

image.png

创建一个针对Topic1中没有键的消息的Stream1。

以下是Topic1的内容:
可以确认没有钥匙。

ksql> print 'topic1' from beginning;
Key format: ¯\_()_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/14 06:38:11.074 Z, key: <null>, value: {"col1": "a1", "col2": "a", "col3": "test1"}, partition: 0
rowtime: 2023/08/14 06:38:16.039 Z, key: <null>, value: {"col1": "a2", "col2": "aa", "col3": "test2"}, partition: 0
rowtime: 2023/08/14 06:38:27.083 Z, key: <null>, value: {"col1": "a3", "col2": "aaa", "col3": "test3"}, partition: 0

为了在ksqlDB的流中处理消息,需要创建一个对应原始主题的流,以便从处理流中引用它进行加工处理。
Stream1对于原始主题的定义内容如下所示:

CREATE STREAM STREAM1
  WITH (KAFKA_TOPIC='topic1', VALUE_FORMAT='AVRO');
ksql> SELECT * FROM STREAM1;
+-------------------------------+-------------------------------+-------------------------------+
|COL1                           |COL2                           |COL3                           |
+-------------------------------+-------------------------------+-------------------------------+
|a1                             |a                              |test1                          |
|a2                             |aa                             |test2                          |
|a3                             |aaa                            |test3                          |

创建一个名为Stream2的流,通过获取Stream1的消息并在消息中添加键值。

创建一个进行加工处理的流。
在流的定义中,指定一个主题来写入加工后的消息。

CREATE STREAM STREAM2
  WITH (KAFKA_TOPIC='topic2', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO')
  AS SELECT
    STRUCT(COL1:=STREAM1.COL1) AS COL1,
    STREAM1.COL2,
    STREAM1.COL3
  FROM
    STREAM1
  PARTITION BY
    STRUCT(COL1:=STREAM1.COL1)
  EMIT CHANGES;
ksql> SELECT * FROM STREAM2;
+-------------------------------+-------------------------------+-------------------------------+
|COL1                           |COL2                           |COL3                           |
+-------------------------------+-------------------------------+-------------------------------+
|{COL1=a1}                      |a                              |test1                          |
|{COL1=a2}                      |aa                             |test2                          |
|{COL1=a3}                      |aaa                            |test3                          |

可以确认通过Stream2向Topic2写入了带有键的消息。

ksql> print 'topic2' from beginning;
Key format: AVRO or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/14 06:38:11.074 Z, key: {"COL1": "a1"}, value: {"COL2": "a", "COL3": "test1"}, partition: 0
rowtime: 2023/08/14 06:38:16.039 Z, key: {"COL1": "a2"}, value: {"COL2": "aa", "COL3": "test2"}, partition: 0
rowtime: 2023/08/14 06:38:27.083 Z, key: {"COL1": "a3"}, value: {"COL2": "aaa", "COL3": "test3"}, partition: 0

我们可以从Topic2的架构中确认也有键。

$ curl -sX GET http://localhost:8081/subjects/topic2-key/versions/latest | jq .schema -r | jq
{
  "type": "record",
  "name": "Stream2Key",
  "namespace": "io.confluent.ksql.avro_schemas",
  "fields": [
    {
      "name": "COL1",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}
$ curl -sX GET http://localhost:8081/subjects/topic2-value/versions/latest | jq .schema -r | jq
{
  "type": "record",
  "name": "KsqlDataSourceSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "fields": [
    {
      "name": "COL2",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "COL3",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

通过ksqlDB的流处理,可以对没有键的消息进行加工,并将有键的消息写入到Topic中。如果在已写入有键消息的Topic上使用JDBC Sink Connector,就可以根据消息进行INSERT、UPDATE和DELETE操作,实现与后续协作数据库中的表的交互。

如果使用ksqlDB,你可以轻松地实现这样的功能,而不需要使用编程语言来实现应用程序,只需使用类似SQL的实现方式。

广告
将在 10 秒后关闭
bannerAds