使用Nifi + Kafka + Strom 进行数据处理实践(2) ~向Kafka发送消息~
主题
本次我们将继续参考以下链接,按照上次的方式进行进展。
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Apache-NiFi-Dataflow-Automation-Concepts
如果您之前没有看过上一期的话,我希望您可以从这里确认一下。
这次我们来看看卡夫卡。
卡夫卡是谁?

Kafka是一个能够高速处理大量消息的分布式消息系统,具备各种作为消息队列所需的功能。
大致上分为Producer(写入)、Topic(队列)和Consumer(读取)三部分构成。
Producer将数据写入Topic,而Consumer则从Topic中获取数据。
创建报告流程组。
这一次,我们将在前几次的HTTP API之外创建一个名为Reporting的进程组。
請允許我借用原文的詞句,但僅提供一個翻譯選項:
引用自參考文章,
在一个单一的团队中进行工作,当数据流变得庞大时,管理会变得复杂。在教程中,我们将其分为以下两部分:
-
- データを外部から収集し、共通のフォーマットに変換する部分
- 共通のフォーマットのデータを入力として、Kafkaにメッセージ登録を行う部分
通过这样做,在增加接收TCP、MQTT等消息的路由时,只需实现转换部分即可。

流程的布置
添加输入端口


发布Kafka_2_0的新增

和上次一样,从“配置”里进入“属性”,然后进行以下设置。
-
- Kafka Brokers
-
- localhost:6667
-
- Topic Name
-
- input
-
- Delivery Guarantee
-
- Guarantee Replicated Delivery
-
- Kafka Key
- ${message.key}
Kafka消息具有键和值。键使用传入的FlowFile的message.key属性。值为FlowFile的内容。
如果使用ambari从kafka安装kafka brokers,则可以在Kafka > Configs中确认kafka brokers的端口。

在设置选项卡中,将”Automatically terminate relationships”选项中的”success”打勾。

将数据从HTTP API传输到Reporting。

在测试中通过控制台消费者来进行确认。
我們可以使用SSH登入到伺服器並執行以下指令來啟動Console Consumer。
cd /usr/hdp/current/kafka-broker
./bin/kafka-console-consumer.sh --topic input --bootstrap-server localhost:6667 --new-consumer
我会在终端上进行以下操作。
# curl -i -X POST -H "Content-type: application/json" -d '{"name": "C", "age": 20}' localhost:9095
#### Postした側 ####
HTTP/1.1 202 Accepted
Date: Tue, 15 Oct 2019 13:59:47 GMT
Transfer-Encoding: chunked
Server: Jetty(9.4.11.v20180605)
{"Result": "succeeded"}
#### consumer ####
20
按下Ctrl+C
Processed a total of 1 messages
我认为应该这样表示。
现在,我们可以在Nifi中操作,对Post的数据进行修改并发送到Kafka,直至接收到该消息的流程。
下一次
接下来,我们将把这条消息发送给Storm进行实时分析。