使用NiFi的API以编程方式来控制流程
Apache NiFi(以下简称为NiFi)是与Apache Kafka、Apache Storm一起,并包含在Hortonworks DataFlow(HDF)套件中的开源软件,用于数据收集和流式分析。
NiFi提供了各种处理器,可以与RDBMS、AWS、Azure等云服务、Hadoop生态系统等多样化的数据存储进行连接,使系统间的数据协同成为无需编程即可实现的。
样本数据流程「现在的邮政编码」
我计划为Advent Calendar写几篇文章,所以我制作了一个示例数据流。关于这个数据流的详细信息,我会在另一天介绍。
这个数据流使用了ZipCloud邮编搜索API和OpenWeatherMap天气信息API,收集与邮编相关的信息,并将其保存到JSON文件中。也可以称之为信息的”丰富化”。
我們參考了免費天氣預報API OpenWeatherMap,該API可在OpenWeatherMap上使用。
这是NiFi的数据流程。我们将它整理在ProcessGroup中,并准备了一个名为“调查”的InputPort,以便从父流程中调用。
利用NiFi的API的主题今天是什么?
好吧,NiFi的Web用户界面非常丰富,可以做很多事情,但是经常有人问,UI的操作是否无法自动化?
对于这个问题的答案是:“可以!使用 NiFi 的 REST API!”
通过浏览器在NiFi的Web UI上执行的操作都是通过调用NiFi的REST API实现的。通过在脚本中执行这些操作,可以实现自动化。
实施一发即中的流程
一個需求NiFi自動化的原因是想要像既有的ETL一樣以工作單位為基礎進行執行。NiFi可以一直啟動流程,以流式方式進行事件驅動處理,所以曾經熟悉使用像ETL的工作排程的人可能會感到有點困惑。
在NiFi中,只需进行一次操作,就可以从某个地方获取数据,稍加处理,然后将数据保存到这个数据库中并结束。
NiFi就像一个持续运行的流程,它从某处获取新数据,稍作处理,然后保存到这个数据库中。
让我举一个具体的例子吧。
让我们假设我们想要测试前面提到的”邮政编码现在”。我们只需传递一次输入的FlowFile,并希望确认其结果。在这种情况下,我们可以使用GenerateFlowFile来生成测试用的FlowFile。
启动这个GenerateFlowFile的问题在于NiFi的调度器。
如果在NiFi中将Scheduling Strategy设置为Timer驱动,它将按照运行计划的设置间隔进行调度。默认情况下,间隔为0秒,只要线程有空余,它将始终运行。
由于本次目的是希望只执行一次,所以将它设定为1d,并在其后一天才会再次执行。以这种状态开始,我们可以生成一个测试用的FlowFile,并在之后的一天内什么也不做。
换句话说,如果保持现状,明天的测试会再次运行。此外,要在同一天执行第二次及以后的测试,则需要停止GenerateFlowFile处理器,并再次启动它。
希望能够传达出”只运行一次的Job处理? 这不太像NiFi”这样的意思。
通过NiFi的REST API进行解决
尽管如此,由于各种原因,可能会有人想要实施一次性的操作或自动化。在这种情况下,可以使用REST API。
在使用浏览器操作NiFi Web UI时,若结合浏览器的开发者工具使用,可以观察到进行了什么操作后会发送怎样的HTTP请求。
右键点击「テスト开始」的GenerateFlowFile并选择Start,将发送一个PUT请求到http://localhost:8080/nifi-api/processors/e0f759d1-0158-1000-3ec4-997043625a4c。
当我们检查从浏览器发送的请求的有效负载时,可以看到JSON中包含了state: “RUNNING”。这样就可以通过它来操作处理器的启动/停止状态。
顺便提一下,REST API的端点列表详细说明在NiFi的REST API文档中。
用cURL命令打也可以实现这个,但是能更方便地执行就更好了。
我尝试创建了一个适用于JavaScript的API客户端。
由于我经常需要用REST API进行NiFi的自动化测试,因此我在Node.js中编写了一个用于执行自动化脚本的API客户端。
请查看这些代码以查看可用的Function列表。
使用方法:
# Githubからプロジェクトをクローン
git clone https://github.com/ijokarumawak/nifi-api-client-js.git
# インストール
$ cd nifi-api-client-js
$ npm install
假设将项目克隆到/tmp文件夹,现在我们要创建一个使用该项目的JS文件。
/tmp/test-zipcode-now.js
/tmp/nifi-api-client-js (クローンしたAPIクライアント)
现将test-zipcode-now.js文件的内容更改为如下所示:
// クライアントライブラリを読み込む
var nifiApiClient = require('./nifi-api-client-js');
// 操作するNiFiの接続先設定
var conf = {
host: 'localhost',
port: 8080
};
// クライアントを初期化
var nifiApi = new nifiApiClient.NiFiApi(conf);
// 引数でstart/stopを指定
var running = (process.argv.length > 2 && 'start' === process.argv[2]);
// ProcessorのUUIDを指定して操作
nifiApi.updateProcessorState('e0f759d1-0158-1000-3ec4-997043625a4c', running, (err) => {
if (err) return console.log('Failed.', err);
console.log('Processor is now', running ? 'running' : 'stopped');
});
然后,使用以下命令,我成功地启动和停止了处理器!
# スタート!
$ node test-zipcode-now.js start
Processor is now running
# ストップ!
$ node test-zipcode-now.js stop
Processor is now stopped
当您确认NiFi的用户界面后,可以确认它已经启动了。顺便提一下,点击相关的处理器即可在操作面板中查看UUID。
综合一下
你对此有什么看法呢?使用NiFi的REST API可以做什么呢?可以实现UI上能做的一切!我个人也正在利用这个框架来创建一个NiFi系统测试项目。如果对你有参考价值就太好了。