使用Nifi + Kafka + Strom 进行数据处理实践(2) ~向Kafka发送消息~

主题

本次我们将继续参考以下链接,按照上次的方式进行进展。
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Apache-NiFi-Dataflow-Automation-Concepts

如果您之前没有看过上一期的话,我希望您可以从这里确认一下。

这次我们来看看卡夫卡。

卡夫卡是谁?

image.png

Kafka是一个能够高速处理大量消息的分布式消息系统,具备各种作为消息队列所需的功能。

大致上分为Producer(写入)、Topic(队列)和Consumer(读取)三部分构成。
Producer将数据写入Topic,而Consumer则从Topic中获取数据。

创建报告流程组。

这一次,我们将在前几次的HTTP API之外创建一个名为Reporting的进程组。

請允許我借用原文的詞句,但僅提供一個翻譯選項:

引用自參考文章,

在一个单一的团队中进行工作,当数据流变得庞大时,管理会变得复杂。在教程中,我们将其分为以下两部分:

    • データを外部から収集し、共通のフォーマットに変換する部分

 

    共通のフォーマットのデータを入力として、Kafkaにメッセージ登録を行う部分

通过这样做,在增加接收TCP、MQTT等消息的路由时,只需实现转换部分即可。

image.png

流程的布置

添加输入端口

image.png
image.png

发布Kafka_2_0的新增

image.png

和上次一样,从“配置”里进入“属性”,然后进行以下设置。

    • Kafka Brokers

 

    • localhost:6667

 

    • Topic Name

 

    • input

 

    • Delivery Guarantee

 

    • Guarantee Replicated Delivery

 

    • Kafka Key

 

    ${message.key}

Kafka消息具有键和值。键使用传入的FlowFile的message.key属性。值为FlowFile的内容。

如果使用ambari从kafka安装kafka brokers,则可以在Kafka > Configs中确认kafka brokers的端口。

image.png

在设置选项卡中,将”Automatically terminate relationships”选项中的”success”打勾。

image.png

将数据从HTTP API传输到Reporting。

image.png

在测试中通过控制台消费者来进行确认。

我們可以使用SSH登入到伺服器並執行以下指令來啟動Console Consumer。

cd /usr/hdp/current/kafka-broker
./bin/kafka-console-consumer.sh --topic input --bootstrap-server localhost:6667 --new-consumer

我会在终端上进行以下操作。

# curl -i -X POST -H "Content-type: application/json" -d '{"name": "C", "age": 20}' localhost:9095

#### Postした側 ####
HTTP/1.1 202 Accepted
Date: Tue, 15 Oct 2019 13:59:47 GMT
Transfer-Encoding: chunked
Server: Jetty(9.4.11.v20180605)

{"Result": "succeeded"}

#### consumer ####
20

按下Ctrl+C

Processed a total of 1 messages

我认为应该这样表示。

现在,我们可以在Nifi中操作,对Post的数据进行修改并发送到Kafka,直至接收到该消息的流程。

下一次

接下来,我们将把这条消息发送给Storm进行实时分析。

广告
将在 10 秒后关闭
bannerAds