立即将Avro格式的消息发送到Kafka的示例命令

这是一个在技术验证和调查过程中非常有用的样例命令,当你需要快速将Avro格式的消息发送到Kafka主题时。

将带有Avro格式的消息(包含键)写入到主题中。

kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic sample-topic \
  --property parse.key=true \
  --property key.schema='{"type":"record",
                          "name":"myrecord",
                          "fields":[{"name":"col1","type":"string"}]}' \
  --property key.separator=" " \
  --property value.schema='{"type":"record",
                            "name":"myrecord",
                            "fields":[{"name":"col1","type":"string"},
                                      {"name":"col2","type":"string"},
                                      {"name":"col3","type":"string"}]}'

在执行上述命令后,命令行会要求输入,请输入以下消息并发送到Kafka的Topic。※每条消息后按Enter(换行)。

{"col1":"a1"} {"col1":"a1","col2":"a","col3":"test1"}
{"col1":"a2"} {"col1":"a2","col2":"aa","col3":"test2"}
{"col1":"a3"} {"col1":"a3","col2":"aaa","col3":"test3"}

当输入完成后使用Ctrl+C来结束。

(如果要用一句话概括)

echo '{"col1":"a4"} {"col1":"a4","col2":"aaaa","col3":"test4"}' | \
kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic sample-topic \
  --property parse.key=true \
  --property key.schema='{"type":"record",
                          "name":"myrecord",
                          "fields":[{"name":"col1","type":"string"}]}' \
  --property key.separator=" " \
  --property value.schema='{"type":"record",
                            "name":"myrecord",
                            "fields":[{"name":"col1","type":"string"},
                                      {"name":"col2","type":"string"},
                                      {"name":"col3","type":"string"}]}'

如果想要循环地间歇性地写入消息,可以参考这个来构建。

从主题中读取带有Avro格式的消息(包含键)

kafka-avro-console-consumer \
  --from-beginning --topic sample-topic \
  --bootstrap-server localhost:9092 \
  --property print.key=true \
  --property schema.registry.url=http://localhost:8081
{"col1":"a1"}	{"col1":"a1","col2":"a","col3":"test1"}
{"col1":"a2"}	{"col1":"a2","col2":"aa","col3":"test2"}
{"col1":"a3"}	{"col1":"a3","col2":"aaa","col3":"test3"}
{"col1":"a4"}	{"col1":"a4","col2":"aaaa","col3":"test4"}

使用 Ctrl+C 来结束。

让我们来确认一下在该主题下发布的消息的架构。

使用Confluent的Schema Registry。

确认主题

curl -sX GET http://localhost:8081/subjects | jq . | grep sample-topic
  "sample-topic-key",
  "sample-topic-value",

确认键架构

curl -sX GET http://localhost:8081/subjects/sample-topic-key/versions/latest | jq .schema -r | jq
{
  "type": "record",
  "name": "myrecord",
  "fields": [
    {
      "name": "col1",
      "type": "string"
    }
  ]
}

确认值模式

curl -sX GET http://localhost:8081/subjects/sample-topic-value/versions/latest | jq .schema -r | jq
{
  "type": "record",
  "name": "myrecord",
  "fields": [
    {
      "name": "col1",
      "type": "string"
    },
    {
      "name": "col2",
      "type": "string"
    },
    {
      "name": "col3",
      "type": "string"
    }
  ]
}
广告
将在 10 秒后关闭
bannerAds