立即将Avro格式的消息发送到Kafka的示例命令
这是一个在技术验证和调查过程中非常有用的样例命令,当你需要快速将Avro格式的消息发送到Kafka主题时。
将带有Avro格式的消息(包含键)写入到主题中。
kafka-avro-console-producer \
--broker-list localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic sample-topic \
--property parse.key=true \
--property key.schema='{"type":"record",
"name":"myrecord",
"fields":[{"name":"col1","type":"string"}]}' \
--property key.separator=" " \
--property value.schema='{"type":"record",
"name":"myrecord",
"fields":[{"name":"col1","type":"string"},
{"name":"col2","type":"string"},
{"name":"col3","type":"string"}]}'
在执行上述命令后,命令行会要求输入,请输入以下消息并发送到Kafka的Topic。※每条消息后按Enter(换行)。
{"col1":"a1"} {"col1":"a1","col2":"a","col3":"test1"}
{"col1":"a2"} {"col1":"a2","col2":"aa","col3":"test2"}
{"col1":"a3"} {"col1":"a3","col2":"aaa","col3":"test3"}
当输入完成后使用Ctrl+C来结束。
(如果要用一句话概括)
echo '{"col1":"a4"} {"col1":"a4","col2":"aaaa","col3":"test4"}' | \
kafka-avro-console-producer \
--broker-list localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic sample-topic \
--property parse.key=true \
--property key.schema='{"type":"record",
"name":"myrecord",
"fields":[{"name":"col1","type":"string"}]}' \
--property key.separator=" " \
--property value.schema='{"type":"record",
"name":"myrecord",
"fields":[{"name":"col1","type":"string"},
{"name":"col2","type":"string"},
{"name":"col3","type":"string"}]}'
如果想要循环地间歇性地写入消息,可以参考这个来构建。
从主题中读取带有Avro格式的消息(包含键)
kafka-avro-console-consumer \
--from-beginning --topic sample-topic \
--bootstrap-server localhost:9092 \
--property print.key=true \
--property schema.registry.url=http://localhost:8081
{"col1":"a1"} {"col1":"a1","col2":"a","col3":"test1"}
{"col1":"a2"} {"col1":"a2","col2":"aa","col3":"test2"}
{"col1":"a3"} {"col1":"a3","col2":"aaa","col3":"test3"}
{"col1":"a4"} {"col1":"a4","col2":"aaaa","col3":"test4"}
使用 Ctrl+C 来结束。
让我们来确认一下在该主题下发布的消息的架构。
使用Confluent的Schema Registry。
确认主题
curl -sX GET http://localhost:8081/subjects | jq . | grep sample-topic
"sample-topic-key",
"sample-topic-value",
确认键架构
curl -sX GET http://localhost:8081/subjects/sample-topic-key/versions/latest | jq .schema -r | jq
{
"type": "record",
"name": "myrecord",
"fields": [
{
"name": "col1",
"type": "string"
}
]
}
确认值模式
curl -sX GET http://localhost:8081/subjects/sample-topic-value/versions/latest | jq .schema -r | jq
{
"type": "record",
"name": "myrecord",
"fields": [
{
"name": "col1",
"type": "string"
},
{
"name": "col2",
"type": "string"
},
{
"name": "col3",
"type": "string"
}
]
}