使用Nifi + Kafka + Strom 进行数据处理实践(1)~Nifi基础部分~
主题
我打算在这个新版本中参考以下内容进行对Nifi、Kafka和Storm的验证。
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Apache-NiFi-Dataflow-Automation-Concepts
主要讨论的是Nifi这部分。
Apache NiFi 是什么?
用于自动化和管理系统之间的数据流的开源软件(OSS)
Nifi的基本操作
登陆
Nifi使用的是运行Nifi的服务器上的9090端口。
创建流程组
请拖动红色圈出的部分,将其移至方格图上,并释放。
这次,我们将其命名为HTTP API。
按下ADD按钮,稍作等待后将生成以下的Process Group。
然后,双击进入其中。
定义处理
进入HTTP API后,左下角的Operator会变成HTTP API。
添加HandleHttpRequest处理器
在搜索框中输入”Handle”后,会出现”HandleHttpRequest”,双击它即可。
(增加之后)
当显示如上图所示时,请右击已添加的HandleHttpRequest处理器,并选择”配置”。
请转到”属性”选项卡,将”Listening Port”更改为9095。
然后请选择”应用”。
添加 ReplaceText
在这里,设置返回HTTP响应体的字符串。
与之前一样,添加处理器(Processor)。
对于新增的Processor,右键点击并选择“配置”。
移至属性选项卡,将替换值更改为{“Result”: “成功”},
将替换策略更改为“始终替换”,
然后选择“应用”。
添加HandleHttpResponse处理器
接下来,我们要添加HandleHttpResponce处理器。
(经过追加)
然后,我们将设置HandleHttpResponse的状态代码。
在HandleHttpResponse上右键单击,选择配置。
然后,在属性中将HTTP状态代码固定为202(接受)。
连接三个处理器
NiFi 数据流是通过将处理器与关系连接来生成流动性。
您可以从处理器的中心拖动鼠标,然后将其释放在目标处理器上以进行连接。
确认From Processor和To Processor,选择Success作为For relationships,并选择ADD。
(连接后)
将Failure传递给LogAttribute
如果处理器处理失败,传入的 FlowFile 将会流向 failure 的 Relationship。
为了能够查看失败内容,我们会添加一个 LogAttribute 处理器。
(加上之后)
然后,我们通过“关系”进行连接。
自动终止不需要的关系
如果NiFi内部的FlowFile到达了没有下一个目的地(即最终节点),它们将被删除。
在本次配置中,HandleHttpResponse和LogAttribute中没有定义处理成功后的操作。
因此,需要添加一个自动清除NifiFlow的设置。
在与以下的关系上,使用鼠标右键点击处理器,并选择配置。在“设置”选项卡中的“自动终止关系”中,勾选成功(`success`)。
-
- HandleHttpResponse: success
- LogAttribute: success
创建Http Context Map
在HandleHttpRequest上右击,选择Configure。
然后从PROPERTIES选项卡的Http Context Map下拉菜单中选择Create new service…。
然后,屏幕会显示如下的界面。
选择“CREATE”。
选择APPLY后,屏幕将显示如下内容。
选择右侧或中间类似于雷电图标,将显示以下界面,然后选择启用。
在HandleHttpRequest的HTTP上下文映射中,同样指定ControllerService。这样,处理器所需的配置就完成了。
开始流程
请点击左边的”操作”按钮,并开始流程。
首先,在格子上单击一次,然后取消选择特定的处理器,再进行此步骤。
只要所有启动成功,就会呈现如下情况。
用cURL进行简单测试
curl -i -X POST -H "Content-type: application/json" -d '{"name": "Nifi", "process": "test"}' localhost:9095
HTTP/1.1 202 Accepted
Date: Mon, 14 Oct 2019 10:47:51 GMT
Transfer-Encoding: chunked
Server: Jetty(9.4.11.v20180605)
{"Result": "succeeded"}
在Nifi中,可以从输入/输出端口查看处理流程。
数据转换
制造处理器
在将收到的POST数据注册到Kafka之前,我们将进行数据格式的整理。
增加下一个处理器:
-
- EvaluateJsonPath: FlowFileのcontentをJSONとしてパースし、JSONPathで任意の要素の値を抽出します。
-
- 抽出結果をFlowFileのAttributeに保存します。
-
- ReplaceText: Kafkaに登録する際、FlowFileのContentがメッセージのvalueとなります。
- 抽出したJSON内の値を再びFlowFileのContentに戻します。
对JsonPath进行评估
进行Process的添加。
(增添)
然后,通过右键单击选择“配置”。
在“属性”中进行以下设置。
以下两个会通过点击右上角的“+”按钮进行添加:
– message.key
$.name
– message.value
$.age
通过这一方法,message.key: $.name(即Json中name对应的value)以键值对形式存储在NifiFlow中。
message.value也是同样方式。
更换文字
进行添加操作。
(增加之後)
之后,使用鼠标右键点击选择“配置”。
在“属性”中进行以下设定。
增加输出端口
处理的连接
我們將連接所創建的這個過程,並且執行之前進行的測試。
在EvaluateJsonPath和ReplaceText之间的关系中,选择匹配项。
那么,我来进行启动并进行测试。
测试结果会怎么样呢?
curl -i -X POST -H "Content-type: application/json" -d '{"name": "Nifi", "process": "test"}' localhost:9095
HTTP/1.1 202 Accepted
Date: Mon, 14 Oct 2019 13:21:56 GMT
Transfer-Encoding: chunked
Server: Jetty(9.4.11.v20180605)
{"Result": "succeeded"}
刚刚与之前相同的结果。
那么,关于这次创建的流程,它的流入和流出都是1。
我們試著在 Output Port 的前一個 Connection 上按右鍵,然後選擇 List Queue。
下一次
下一次,我想通过这个Nifi向Kafka发送消息。