使用Kafka通过NiFi进行消费,将数据通过WebSocket传递到Zeppelin,在D3中实时绘制图形
这是Hortonworks Advent Calendar 12/21的文章。
继12/3 NiFi 1.1.0的WebSocket支持之后,这次还是关于WebSocket的内容。
这是一个以教程形式提供学习Zeppelin、NiFi、Kafka和D3等多种知识的划算选择。
前几天,我收到一个朋友的建议,他说:“如果能够通过NiFi的WebSocket,在Zeppelin上实时显示图表,那不是很酷吗?” 这就是我开始进行的原因。
首先,从设计图的形象开始。
飞艇(左上)是一个方便的工具,可以将多个图表以笔记本的形式汇总,并在成员之间共享和分析可视化数据。
这次我们将在Zeppelin中显示一个像图中那样的饼图。要在饼图中显示的数据将通过NiFi通过WebSocket推送。
既然我们可以从服务器端进行推送,那么让我们试试从NiFi接收Kafka消息。我们设想了一种使用场景,即将诸如Spark和Storm等流分析结果发布到Kafka,并在Zeppelin中实时显示。
要试用,您需要安装和启动Zeppelin、NiFi、Kafka和Web浏览器。安装和启动各个软件只需几个命令,非常简单。
搭建环境
Zeppelin的安装和启动
从Zeppelin的下载页面下载二进制包,并解压到任意位置。执行以下命令后,您可以通过8080端口访问Zeppelin的用户界面:
$ cd zeppelin-0.6.2-bin-all
$ ./bin/zeppelin-daemon.sh start
安装和启动 Kafka
按照Kafka的QuickStart指南,下载并解压到合适的位置,然后使用以下命令首先启动Zookeeper:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
接下来打开另一个终端窗口,启动Kafka服务器。
$ bin/kafka-server-start.sh config/server.properties
我们再打开一个终端,创建一个新的主题吧。
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# ちゃんと作成できたか確認
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
让我们使用QuickStart中提供的命令,通过终端确认是否可以发送和接收消息。
安装和启动NiFi
从NiFi的下载页面下载二进制文件,并将其解压到适当的位置。
在展开的目录中有一个设置文件,让我们稍微修改一下。Zeppelin和NiFi的默认HTTP监听端口都是8080,所以我们要更改NiFi的端口。
$ vi conf/nifi.properties
# web properties #
nifi.web.war.directory=./lib
nifi.web.http.host=
nifi.web.http.port=8081 // 8080以外に変える
然后,启动NiFi。
$ ./bin/nifi.sh start
# 起動には少々時間がかかるので、ログを見ときましょう
$ tail -f logs/nifi-app.log
(次のログが表示されれば、起動が完了した合図です)
2016-12-20 14:58:48,166 INFO [main] org.apache.nifi.web.server.JettyServer NiFi has started. The UI is available at the following URLs:
2016-12-20 14:58:48,166 INFO [main] org.apache.nifi.web.server.JettyServer http://127.0.0.1:8081/nifi
现在让我们在浏览器中通过localhost:8081访问NiFi的用户界面。接下来,我们来构建NiFi的流程吧!
创建一个NiFi流程
首先需要数据流,从Kafka到NiFi。
接下来,我们将构建一个相对复杂的数据流。即使一开始无法完全设计好,但通过动态调整和逐步搭建流程的方式,这正是NiFi的优势所在。现在我们有一个已准备好的Kafka主题,让我们从那里开始获取数据吧。
将画面上部排列的图标中最左边的Processor图标拖放到白色画布上。
当在搜索框中输入”kafka”,会出现各种结果。针对每个Kafka版本,都有相应的处理器。
为了跟上Kafka快速的版本升级,巧妙地使用NiFi也是不错的选择。在NiFi中,可以创建适应多个Kafka版本的数据流。
好的,因为这次使用的是Kafka 0.10.x,所以请双击选择ConsumeKafka_0_10。
恭喜!现在您已成功添加了NiFi处理器。
将鼠标悬停在警告图标上时,您将收到 “主题名称”、 “组ID”和”关系成功”未设置的提示。请右击处理器,然后点击”配置”。
在PROPERTIES选项卡中,点击Topic Name(s)的Value列,将先前创建的主题名称test输入到Group ID中,可以输入任何内容。在这里我们使用了Consumer Group的名称nifi。
如果设置完毕,请点击”应用”按钮关闭对话框。
以下是這句話的中文同義句: 「這個警告意味著關係成功是不可能的。這意味著接收到的消息從Kafka沒有設定轉發目標,我們需要創建一個。」
在这种情况下非常方便的是UpdateAttribute处理器。让我们添加UpdateAttribute处理器吧。
然后,将鼠标悬停在ConsumeKafka_0_10中间,会显示一个圆形箭头,将其拖动并放置在UpdateAttribute处理器上。
然后,会显示“创建连接”的对话框。点击“添加”来创建连接。
通过这样操作,成功地创建了Connection的连接。由于已完成ConsumeKafka_0_10的设置,警告图标已消失,并且变为停止状态。
UpdateAttribute 警告无需在意,可以继续使用。尝试启动 ConsumeKafka_0_10!
处理器图标变成了播放中的图标,并偶尔在右上方能看到数字1。这表示活动线程数。在NiFi中,同一流程中的多个处理器会并行运行。
向Kafka主题发送消息
NiFi已准备就绪,现在让我们将消息发送到Kafka。请在控制台输入以下命令:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
(何かテキストを入力してEnterすると送信される)
Hello Kafka.
在等待下一个输入的状态下,保持该命令一直运行。
当我返回到NiFi的用户界面时,它显示”Queued 1″并标记为”success”!似乎接收到了某些东西。
右击连接,选择“列表队列”。
当然,您可以在成功的消息留存在NiFi中进行确认。在NiFi中,将这些数据称为FlowFile。
点击左侧的信息图标,您可以查看FlowFile的各种信息。
迷你测验
我们可以在NiFi的用户界面上确认从控制台发送的字符串。您知道可以在哪里查看吗?让我们探索一下UI吧 🙂
使用NiFi通过WebSocket将数据推送
卡夫卡(Kafka)-> NiFi的路径已经开辟,原始图像中绘制的右下部分的饼已经完成!接下来我们要创建通过WebSocket将接收到的消息推送给客户端的部分。
简化话题,首先从单一客户开始。
突然创建复杂的流程是一件困难的事情。首先,我们可以使用简化的流程来制作原型。
WebSocket可以处理多个客户端。NiFi的WebSocket组件通过ID管理每个客户端的会话。由于处理多个客户端需要一些技巧,所以我们可以先推迟处理,并将其设计为可以与单个客户端进行交互的流程。
将NiFi变成WebSocket服务器。
请参考我之前的文章,详细介绍了NiFi的WebSocket实现。
为了整理WebSocket周围的流程,让我们创建一个处理组。
在画布上放置一个”Process Group”图标,并随意为其命名。
当您双击所创建的进程组时,您可以进入其中。在这里,我们将进行WebSocket服务器的实现配置。
请使用原生的中文进行以下释义,只需要给出一个选项:
ListenWebSocket和JettyWebSocketServer。
放置ListenWebSocket处理器,并在”配置”中显示”PROPERTIES”选项卡。从WebSocket服务器控制器服务中选择”创建新服务”。
创建JettyWebSocketServer,并将Server URL Path指定为/realtime-data。
申请并完成处理器的设置。
请再次打开ListenWebSocket的属性,然后点击旁边显示的箭头,再点击编辑图标,跳转到ControllerService的设置页面。
将Listen Port设置为9091并点击APPLY以使设置生效。然后点击闪电图标以启用此ControllerService。
再次从ListenWebSocket的Configure中打开SETTINGS选项卡。由于二进制消息和文本消息本次不会被接收和使用,所以将自动关闭。
与ConsumeKafka相同,connected之前暂时放置UpdateAttribute,并启动ListenWebSocket。
现在,已经准备好接受WebSocket客户端的连接了!
websocket.org的回音测试
使用websocket.org的Echo Test进行连接确认。
请在 Location 中输入连接到 NiFi 的 URL。连接后,如果日志中显示“CONNECTED”,则表示连接成功。
在NiFi的一侧,客户端连接的事件应该会生成为FlowFile。当从队列中查看内容时,应该会设置以下属性:
如果有这些信息,我们可以将消息返回给连接的WebSocket客户端。
让我们将这些信息保存在NiFi的缓存中。
将WebSocket会话ID存储到分布式缓存中。
来吧,让我们尝试一下使用NiFi标准提供的DistributedCache机制。它类似于在NiFi上运行的内存中的键值存储。简单地说,它就是一个关联数组。您可以使用任意的键来存储任意的值,并且可以在流程的各个地方进行共享。
首先,让我们创建一个DistributedCache服务器。点击UI左侧的操作面板的设置图标,将显示ControllerService的列表页面。
这里显示了刚刚创建的JettyWebSocketServer,对吗?点击加号图标,我们添加一个DistributedMapCacheServer。
DistributedMapCacheServer的配置可以保持默认值,接下来我们需要添加连接到该缓存服务器的DistributedMapCacheClient。将Server Hostname设置为localhost。
让我们启用已创建的缓存服务器和客户端服务。
准备好缓存服务器后,让我们存储数据。我们要捕获WebSocket客户端连接时生成的会话ID。由于缓存服务器会存储FlowFile的内容部分,所以我们需要提取websocket.session.id的属性值,并将其抽取到内容部分中。
使用ReplaceText处理器来实现此功能。该处理器能够用于替换文本格式FlowFile内容中的字符串。将Replacement Strategy设置为Always Replace,将Replacement Value设置为${websocket.session.id}。这是NiFi表达式,将返回FlowFile属性的值。详细信息将在以后的机会中提供。使用此设置,将使用相应的属性值替换FlowFile内容的值。
添加PutDistributedMapCache处理器来连接ReplaceText的输出。缓存的键是固定值,设置为websocket.session.id。还要指定之前创建的DistributedMapCacheClientService。
虽然变得很长,但是我认为按照这个过程进行下去,会得到如下图所示的流程。将“ListenWebSocket”的“connected”部分的蓝色点拖动,将连接目标移动到“ReplaceText”中。这也是NiFi独有的特点!
替换文本(ReplaceText)和分布式映射缓存放置(PutDistributedMapCache)之间的故障关系已自动终止。流程已按照以下方式完成:
这个生成的WebSocket会话ID已缓存在缓存中!
将从Kafka获取的消息推送到WebSocket客户端。
让我们来尝试向WebSocket客户端推送消息吧!我们将使用PutWebSocket处理器。让我们查看一下设置, 默认情况下是这样的:
会话ID、控制器服务ID和端点ID是根据传入的FlowFile进行解析的。这在处理多个客户端时非常有效,但由于本次是单个客户端的原型,因此我们可以设定为固定值。
点击ControllerService列表中的JettyWebSocketServer信息图标,将其SETTINGS选项卡中的Id复制。
将已复制的ControllerService的Id设置为PutWebSocket处理器的WebSocket ControllerService Id。WebSocket端点Id只需要指定ListenWebSocket处理器正在等待的路径,所以我们设为/realtime-data吧。
只要有WebSocket Session Id,就可以向WebSocket连接的对方返回消息。
因为我们已经在DistributedCache中保存了会话ID,所以现在需要配置FetchDistributedMapCache来提取它。在Cache Entry Identifier中,我们应该指定固定值为websocket.session.id。在存储会话ID到Distributed Cache Service时,我们可以再次利用之前用于存储会话ID的ControllerService。
为了把从缓存中获取的会话ID保存在websocket.session.id属性中,需要设置将缓存值放入属性的步骤。
这样一来,可以在FlowFile的内容中保留从Kafka接收到的消息,并且还可以将WebSocket的会话ID添加到属性中。
目前为止,我们在名为WebSocket Server的处理器组内定义了一系列的处理器。而ConsumeKafka位于根处理器组中。为了在处理器组之间传递流文件,我们需要在最前面添加一个输入端口。因此,我们将输入端口命名为push,用于将传递过来的流文件推送给客户端。
让我们开始推送、获取分布式地图缓存和插入WebSocket。
接下来,让我们回到根Process Group和NiFi Flow,将从ConsumeKafka_0_10获取的数据传输到WebSocket Server,将整个流程连接起来。
那么,让我们尝试使用Kafka的控制台生产者(console-producer)从控制台发送消息。同时,我们也会确认通过WebSocket是否可以接收到Echo Test的消息。
哇!终于联系上了!
在Zeppelin上显示图表
终于到了使用Zeppelin的时候了!从Zeppelin的用户界面(UI)开始,我们创建一个新的笔记本吧。笔记本的名称可以任意取。
让我们试一试,在Zeppelin中显示饼图。输入如下图所示的代码,点击开始即可显示饼图:
这段代码是将具有键值对的二维表形式的数据传递给Zeppelin。然后,Zeppelin可以通过表格、柱状图、饼图等多种方式来显示该数据。
这次,如果能以这种方式传递通过WebSocket接收到的数据就好了,但是我不知道具体的方法。暂且使用%angular解释器,因为它可以自由地定制输出,我会采用这种方法。如果有人知道更简单的方法,请告诉我。
使用Angular解释器执行任意的Javascript代码。
在Zeppelin中,有许多预置的解释器可以使用。当像%angular这样指定开头时,可以切换执行代码的引擎。
本次我们将使用Angular来编写JavaScript代码。虽然说起来,我们并不会太多地使用Angular本身。。
让我们试试运行以下代码。在script标签中执行JavaScript代码,并准备一个用于显示输出结果的div来执行。运行后,结果将显示如下图所示。
使用D3绘制饼图
好的,现在让我们使用D3绘制饼图。虽然代码比较长,但请尝试将以下代码复制粘贴到Zeppelin中。为了确保在同一笔记本中创建多个段落时不重复,graphId会根据顺序进行编号。
%angular
<script>
var graphId = '#realtime-graph-2';
var w = 600;
var h = 400;
var r = Math.min(w, h) / 2;
var color = d3.scale.category20c();
var svg = d3.select(graphId).append('svg')
.attr('width', w)
.attr('height', h)
.append('g')
.attr('transform', 'translate(' + r + ',' + r + ')');
var pie = d3.layout.pie()
.sort(null)
.value(function(d) {return d.value});
var arc = d3.svg.arc()
.outerRadius(r);
var updatePiChart = function(dataPoints) {
var arcs = svg.selectAll('.slice')
.data(pie(dataPoints));
// Add new slices if needed.
var g = arcs.enter()
.append('g')
.attr('class', 'slice');
g.append('path');
g.append('text')
.attr('text-anchor', 'middle');
// Update existing and added slices.
arcs.select('path')
.attr('fill', function(d, i){
return color(i);
})
.attr('d', function(d) {
return arc(d);
});
arcs.select('text')
.attr('transform', function(d) {
d.innerRadius = 0;
d.outerRadius = r;
return 'translate(' + arc.centroid(d) + ")";
})
.text(function(d){
return d.data.label;
});
// Remove exitted slices.
arcs.exit().remove();
}
</script>
<div id="realtime-graph-2">
</div>
D3这部分有点复杂,我想解释一下,但会在另一个机会上…我已经更新了updatePiChart函数,传入数据后会绘制饼图。在Zeppelin上执行这段代码,会生成updatePiChart函数,你可以试着从Web浏览器的开发者工具中运行它。
dataPoints是一个数组,存储了包含标签和数值的对象。
[{"label": "a", "value": 1},
{"label": "b", "value": 2},
{"label": "c", "value": 3}]
你能成功显示图表吗?如果你更改了dataPoints的内容并执行updatePiChart函数,我认为饼图会被更新。
只要把从WebSocket接收到的数据传输过来就可以了!
使用WebSocket与NiFi建立连接
在先前的代码中,我们将添加与NiFi建立WebSocket连接的代码。必需的代码如下所示:
%angular
<script>
var graphId = '#realtime-graph-2';
// ...
var updatePiChart = function(dataPoints) {
// ...
}
// ここから追加
var wsUri = "ws://localhost:9091/realtime-data";
websocket = new WebSocket(wsUri);
websocket.onopen = function(evt) {console.log('connected')};
websocket.onerror = function(evt) {console.log('ERR', evt)};
websocket.onmessage = function(evt) {
console.log(evt)
var dataPoints = JSON.parse(evt.data);
updatePiChart(dataPoints);
};
</script>
<div id="realtime-graph-2">
</div>
卡夫卡 -> 尼菲 ->(WebSocket)-> 泽菲林 -> D3!!
这样一切准备就绪了!通过Zeppelin执行段落并使用WebSocket连接NiFi。
接下来,让我们使用console-publisher将dataPoints的JSON字符串发送到Kafka。
NiFi成为WebSocket服务器,成功实现实时图表更新!
支持多个WebSocket客户端
为了简化问题,在本文中仅需考虑单一客户端。我们将WebSocket会话ID存储在分布式缓存中,并在从Kafka接收消息时使用它。
如果这样的话,当另一个客户端通过WebSocket连接时,缓存中的WebSocket会话ID会被更新,导致无法在多个客户端上使用。
在NiFi的流程中,可以实现以下循环的一个解决方案:
当WebSocket客户端连接时,将循环不断地执行此操作。它将接收到的JSON数据和接收时间戳一起存储在分布式缓存中。
在循环流文件中,包含WebSocket会话ID、控制器服务ID、终端点ID和最后的推送时间戳。
当下检查缓存时,如果信息比上次更新的信息要新,就向WebSocket客户端发送推送,否则什么都不做继续循环。
如果构建这样的流程,每当客户端连接时都会创建一个Flow File,并且客户端的信息将保持在该Flow File内,因此能够支持多个客户端。
我已经在Realtime Data Visualization with Zeppelin via NiFi WebSocket的Gist中发布了支持多个客户端的NiFi模板文件。如果您有兴趣,请下载并导入到NiFi中,然后探索其中的内容。
总结
在本次教程中,我们尝试构建一个通过组合各种开源软件来显示实时图表的机制。
不仅可以将NiFi用作数据收集工具,还可以从NiFi发送信息。而且,我们只需构建NiFi流程,而无需在服务器端进行任何编码。
如果您在年底和年初期间想尝试一些新鲜事物,务必试一试!