使用Kafka通过NiFi进行消费,将数据通过WebSocket传递到Zeppelin,在D3中实时绘制图形

这是Hortonworks Advent Calendar 12/21的文章。

继12/3 NiFi 1.1.0的WebSocket支持之后,这次还是关于WebSocket的内容。

这是一个以教程形式提供学习Zeppelin、NiFi、Kafka和D3等多种知识的划算选择。

前几天,我收到一个朋友的建议,他说:“如果能够通过NiFi的WebSocket,在Zeppelin上实时显示图表,那不是很酷吗?” 这就是我开始进行的原因。

首先,从设计图的形象开始。

リアルタイムグラフ描画

飞艇(左上)是一个方便的工具,可以将多个图表以笔记本的形式汇总,并在成员之间共享和分析可视化数据。

这次我们将在Zeppelin中显示一个像图中那样的饼图。要在饼图中显示的数据将通过NiFi通过WebSocket推送。

既然我们可以从服务器端进行推送,那么让我们试试从NiFi接收Kafka消息。我们设想了一种使用场景,即将诸如Spark和Storm等流分析结果发布到Kafka,并在Zeppelin中实时显示。

要试用,您需要安装和启动Zeppelin、NiFi、Kafka和Web浏览器。安装和启动各个软件只需几个命令,非常简单。

搭建环境

Zeppelin的安装和启动

从Zeppelin的下载页面下载二进制包,并解压到任意位置。执行以下命令后,您可以通过8080端口访问Zeppelin的用户界面:

$ cd zeppelin-0.6.2-bin-all
$ ./bin/zeppelin-daemon.sh start

安装和启动 Kafka

按照Kafka的QuickStart指南,下载并解压到合适的位置,然后使用以下命令首先启动Zookeeper:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

接下来打开另一个终端窗口,启动Kafka服务器。

$ bin/kafka-server-start.sh config/server.properties

我们再打开一个终端,创建一个新的主题吧。


$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# ちゃんと作成できたか確認
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test

让我们使用QuickStart中提供的命令,通过终端确认是否可以发送和接收消息。

安装和启动NiFi

从NiFi的下载页面下载二进制文件,并将其解压到适当的位置。

在展开的目录中有一个设置文件,让我们稍微修改一下。Zeppelin和NiFi的默认HTTP监听端口都是8080,所以我们要更改NiFi的端口。

$ vi conf/nifi.properties

# web properties #
nifi.web.war.directory=./lib
nifi.web.http.host=
nifi.web.http.port=8081 // 8080以外に変える

然后,启动NiFi。

$ ./bin/nifi.sh start
# 起動には少々時間がかかるので、ログを見ときましょう
$ tail -f logs/nifi-app.log
(次のログが表示されれば、起動が完了した合図です)
2016-12-20 14:58:48,166 INFO [main] org.apache.nifi.web.server.JettyServer NiFi has started. The UI is available at the following URLs:
2016-12-20 14:58:48,166 INFO [main] org.apache.nifi.web.server.JettyServer http://127.0.0.1:8081/nifi

现在让我们在浏览器中通过localhost:8081访问NiFi的用户界面。接下来,我们来构建NiFi的流程吧!

创建一个NiFi流程

首先需要数据流,从Kafka到NiFi。

接下来,我们将构建一个相对复杂的数据流。即使一开始无法完全设计好,但通过动态调整和逐步搭建流程的方式,这正是NiFi的优势所在。现在我们有一个已准备好的Kafka主题,让我们从那里开始获取数据吧。

image

将画面上部排列的图标中最左边的Processor图标拖放到白色画布上。

当在搜索框中输入”kafka”,会出现各种结果。针对每个Kafka版本,都有相应的处理器。

image
Kafkaプロセッサ0.8.xGetKafka, PutKafka0.9.xConsumeKafka, PublishKafka0.10.xConsumeKafka_0_10, PublishKafka_0_10

为了跟上Kafka快速的版本升级,巧妙地使用NiFi也是不错的选择。在NiFi中,可以创建适应多个Kafka版本的数据流。

好的,因为这次使用的是Kafka 0.10.x,所以请双击选择ConsumeKafka_0_10。

恭喜!现在您已成功添加了NiFi处理器。

image

将鼠标悬停在警告图标上时,您将收到 “主题名称”、 “组ID”和”关系成功”未设置的提示。请右击处理器,然后点击”配置”。

image

在PROPERTIES选项卡中,点击Topic Name(s)的Value列,将先前创建的主题名称test输入到Group ID中,可以输入任何内容。在这里我们使用了Consumer Group的名称nifi。

image

如果设置完毕,请点击”应用”按钮关闭对话框。

image

以下是這句話的中文同義句: 「這個警告意味著關係成功是不可能的。這意味著接收到的消息從Kafka沒有設定轉發目標,我們需要創建一個。」

在这种情况下非常方便的是UpdateAttribute处理器。让我们添加UpdateAttribute处理器吧。

然后,将鼠标悬停在ConsumeKafka_0_10中间,会显示一个圆形箭头,将其拖动并放置在UpdateAttribute处理器上。

image

然后,会显示“创建连接”的对话框。点击“添加”来创建连接。

image

通过这样操作,成功地创建了Connection的连接。由于已完成ConsumeKafka_0_10的设置,警告图标已消失,并且变为停止状态。

image

UpdateAttribute 警告无需在意,可以继续使用。尝试启动 ConsumeKafka_0_10!

image

处理器图标变成了播放中的图标,并偶尔在右上方能看到数字1。这表示活动线程数。在NiFi中,同一流程中的多个处理器会并行运行。

image

向Kafka主题发送消息

NiFi已准备就绪,现在让我们将消息发送到Kafka。请在控制台输入以下命令:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
(何かテキストを入力してEnterすると送信される)
Hello Kafka.

在等待下一个输入的状态下,保持该命令一直运行。

当我返回到NiFi的用户界面时,它显示”Queued 1″并标记为”success”!似乎接收到了某些东西。

image

右击连接,选择“列表队列”。

image

当然,您可以在成功的消息留存在NiFi中进行确认。在NiFi中,将这些数据称为FlowFile。

image

点击左侧的信息图标,您可以查看FlowFile的各种信息。

迷你测验

我们可以在NiFi的用户界面上确认从控制台发送的字符串。您知道可以在哪里查看吗?让我们探索一下UI吧 🙂

使用NiFi通过WebSocket将数据推送

卡夫卡(Kafka)-> NiFi的路径已经开辟,原始图像中绘制的右下部分的饼已经完成!接下来我们要创建通过WebSocket将接收到的消息推送给客户端的部分。

简化话题,首先从单一客户开始。

突然创建复杂的流程是一件困难的事情。首先,我们可以使用简化的流程来制作原型。

WebSocket可以处理多个客户端。NiFi的WebSocket组件通过ID管理每个客户端的会话。由于处理多个客户端需要一些技巧,所以我们可以先推迟处理,并将其设计为可以与单个客户端进行交互的流程。

将NiFi变成WebSocket服务器。

请参考我之前的文章,详细介绍了NiFi的WebSocket实现。

为了整理WebSocket周围的流程,让我们创建一个处理组。

image

在画布上放置一个”Process Group”图标,并随意为其命名。

image
image

当您双击所创建的进程组时,您可以进入其中。在这里,我们将进行WebSocket服务器的实现配置。

请使用原生的中文进行以下释义,只需要给出一个选项:

ListenWebSocket和JettyWebSocketServer。

放置ListenWebSocket处理器,并在”配置”中显示”PROPERTIES”选项卡。从WebSocket服务器控制器服务中选择”创建新服务”。

image

创建JettyWebSocketServer,并将Server URL Path指定为/realtime-data。

image

申请并完成处理器的设置。

请再次打开ListenWebSocket的属性,然后点击旁边显示的箭头,再点击编辑图标,跳转到ControllerService的设置页面。

image

将Listen Port设置为9091并点击APPLY以使设置生效。然后点击闪电图标以启用此ControllerService。

image

再次从ListenWebSocket的Configure中打开SETTINGS选项卡。由于二进制消息和文本消息本次不会被接收和使用,所以将自动关闭。

image

与ConsumeKafka相同,connected之前暂时放置UpdateAttribute,并启动ListenWebSocket。

image

现在,已经准备好接受WebSocket客户端的连接了!

websocket.org的回音测试

使用websocket.org的Echo Test进行连接确认。

请在 Location 中输入连接到 NiFi 的 URL。连接后,如果日志中显示“CONNECTED”,则表示连接成功。

image

在NiFi的一侧,客户端连接的事件应该会生成为FlowFile。当从队列中查看内容时,应该会设置以下属性:

image

如果有这些信息,我们可以将消息返回给连接的WebSocket客户端。
让我们将这些信息保存在NiFi的缓存中。

将WebSocket会话ID存储到分布式缓存中。

来吧,让我们尝试一下使用NiFi标准提供的DistributedCache机制。它类似于在NiFi上运行的内存中的键值存储。简单地说,它就是一个关联数组。您可以使用任意的键来存储任意的值,并且可以在流程的各个地方进行共享。

首先,让我们创建一个DistributedCache服务器。点击UI左侧的操作面板的设置图标,将显示ControllerService的列表页面。

image
image

这里显示了刚刚创建的JettyWebSocketServer,对吗?点击加号图标,我们添加一个DistributedMapCacheServer。

image
image

DistributedMapCacheServer的配置可以保持默认值,接下来我们需要添加连接到该缓存服务器的DistributedMapCacheClient。将Server Hostname设置为localhost。

image

让我们启用已创建的缓存服务器和客户端服务。

image

准备好缓存服务器后,让我们存储数据。我们要捕获WebSocket客户端连接时生成的会话ID。由于缓存服务器会存储FlowFile的内容部分,所以我们需要提取websocket.session.id的属性值,并将其抽取到内容部分中。

使用ReplaceText处理器来实现此功能。该处理器能够用于替换文本格式FlowFile内容中的字符串。将Replacement Strategy设置为Always Replace,将Replacement Value设置为${websocket.session.id}。这是NiFi表达式,将返回FlowFile属性的值。详细信息将在以后的机会中提供。使用此设置,将使用相应的属性值替换FlowFile内容的值。

image

添加PutDistributedMapCache处理器来连接ReplaceText的输出。缓存的键是固定值,设置为websocket.session.id。还要指定之前创建的DistributedMapCacheClientService。

image

虽然变得很长,但是我认为按照这个过程进行下去,会得到如下图所示的流程。将“ListenWebSocket”的“connected”部分的蓝色点拖动,将连接目标移动到“ReplaceText”中。这也是NiFi独有的特点!

image

替换文本(ReplaceText)和分布式映射缓存放置(PutDistributedMapCache)之间的故障关系已自动终止。流程已按照以下方式完成:

image

这个生成的WebSocket会话ID已缓存在缓存中!

将从Kafka获取的消息推送到WebSocket客户端。

让我们来尝试向WebSocket客户端推送消息吧!我们将使用PutWebSocket处理器。让我们查看一下设置, 默认情况下是这样的:

image

会话ID、控制器服务ID和端点ID是根据传入的FlowFile进行解析的。这在处理多个客户端时非常有效,但由于本次是单个客户端的原型,因此我们可以设定为固定值。

点击ControllerService列表中的JettyWebSocketServer信息图标,将其SETTINGS选项卡中的Id复制。

image
image

将已复制的ControllerService的Id设置为PutWebSocket处理器的WebSocket ControllerService Id。WebSocket端点Id只需要指定ListenWebSocket处理器正在等待的路径,所以我们设为/realtime-data吧。

image

只要有WebSocket Session Id,就可以向WebSocket连接的对方返回消息。

因为我们已经在DistributedCache中保存了会话ID,所以现在需要配置FetchDistributedMapCache来提取它。在Cache Entry Identifier中,我们应该指定固定值为websocket.session.id。在存储会话ID到Distributed Cache Service时,我们可以再次利用之前用于存储会话ID的ControllerService。

为了把从缓存中获取的会话ID保存在websocket.session.id属性中,需要设置将缓存值放入属性的步骤。
这样一来,可以在FlowFile的内容中保留从Kafka接收到的消息,并且还可以将WebSocket的会话ID添加到属性中。

image

目前为止,我们在名为WebSocket Server的处理器组内定义了一系列的处理器。而ConsumeKafka位于根处理器组中。为了在处理器组之间传递流文件,我们需要在最前面添加一个输入端口。因此,我们将输入端口命名为push,用于将传递过来的流文件推送给客户端。

让我们开始推送、获取分布式地图缓存和插入WebSocket。

image

接下来,让我们回到根Process Group和NiFi Flow,将从ConsumeKafka_0_10获取的数据传输到WebSocket Server,将整个流程连接起来。

image

那么,让我们尝试使用Kafka的控制台生产者(console-producer)从控制台发送消息。同时,我们也会确认通过WebSocket是否可以接收到Echo Test的消息。

image

哇!终于联系上了!

在Zeppelin上显示图表

终于到了使用Zeppelin的时候了!从Zeppelin的用户界面(UI)开始,我们创建一个新的笔记本吧。笔记本的名称可以任意取。

image

让我们试一试,在Zeppelin中显示饼图。输入如下图所示的代码,点击开始即可显示饼图:

image

这段代码是将具有键值对的二维表形式的数据传递给Zeppelin。然后,Zeppelin可以通过表格、柱状图、饼图等多种方式来显示该数据。

这次,如果能以这种方式传递通过WebSocket接收到的数据就好了,但是我不知道具体的方法。暂且使用%angular解释器,因为它可以自由地定制输出,我会采用这种方法。如果有人知道更简单的方法,请告诉我。

使用Angular解释器执行任意的Javascript代码。

在Zeppelin中,有许多预置的解释器可以使用。当像%angular这样指定开头时,可以切换执行代码的引擎。

本次我们将使用Angular来编写JavaScript代码。虽然说起来,我们并不会太多地使用Angular本身。。

让我们试试运行以下代码。在script标签中执行JavaScript代码,并准备一个用于显示输出结果的div来执行。运行后,结果将显示如下图所示。

image

使用D3绘制饼图

好的,现在让我们使用D3绘制饼图。虽然代码比较长,但请尝试将以下代码复制粘贴到Zeppelin中。为了确保在同一笔记本中创建多个段落时不重复,graphId会根据顺序进行编号。


%angular

<script>
var graphId = '#realtime-graph-2';
var w = 600;
var h = 400;
var r = Math.min(w, h) / 2;
var color = d3.scale.category20c();

var svg = d3.select(graphId).append('svg')
            .attr('width', w)
            .attr('height', h)
            .append('g')
            .attr('transform', 'translate(' + r + ',' + r + ')');

var pie = d3.layout.pie()
            .sort(null)
            .value(function(d) {return d.value});

var arc = d3.svg.arc()
            .outerRadius(r);


var updatePiChart = function(dataPoints) {

    var arcs = svg.selectAll('.slice')
        .data(pie(dataPoints));


    // Add new slices if needed.
    var g = arcs.enter()
        .append('g')
        .attr('class', 'slice');
    g.append('path');
    g.append('text')
        .attr('text-anchor', 'middle');

    // Update existing and added slices.
    arcs.select('path')
        .attr('fill', function(d, i){
            return color(i);
        })
        .attr('d', function(d) {
            return arc(d);
        });

    arcs.select('text')
        .attr('transform', function(d) {
            d.innerRadius = 0;
            d.outerRadius = r;
            return 'translate(' + arc.centroid(d) + ")";
        })
        .text(function(d){
            return d.data.label;
        });

    // Remove exitted slices.
    arcs.exit().remove();
}

</script>

<div id="realtime-graph-2">
</div>

D3这部分有点复杂,我想解释一下,但会在另一个机会上…我已经更新了updatePiChart函数,传入数据后会绘制饼图。在Zeppelin上执行这段代码,会生成updatePiChart函数,你可以试着从Web浏览器的开发者工具中运行它。

image

dataPoints是一个数组,存储了包含标签和数值的对象。

[{"label": "a", "value": 1},
 {"label": "b", "value": 2},
 {"label": "c", "value": 3}]

你能成功显示图表吗?如果你更改了dataPoints的内容并执行updatePiChart函数,我认为饼图会被更新。

只要把从WebSocket接收到的数据传输过来就可以了!

使用WebSocket与NiFi建立连接

在先前的代码中,我们将添加与NiFi建立WebSocket连接的代码。必需的代码如下所示:

%angular

<script>
var graphId = '#realtime-graph-2';

// ...

var updatePiChart = function(dataPoints) {
 // ...
}


// ここから追加
var wsUri = "ws://localhost:9091/realtime-data";

websocket = new WebSocket(wsUri);
websocket.onopen = function(evt) {console.log('connected')};
websocket.onerror = function(evt) {console.log('ERR', evt)};
websocket.onmessage = function(evt) {
    console.log(evt)
    var dataPoints = JSON.parse(evt.data);
    updatePiChart(dataPoints);
};
</script>

<div id="realtime-graph-2">
</div>

卡夫卡 -> 尼菲 ->(WebSocket)-> 泽菲林 -> D3!!

这样一切准备就绪了!通过Zeppelin执行段落并使用WebSocket连接NiFi。

接下来,让我们使用console-publisher将dataPoints的JSON字符串发送到Kafka。

NiFi成为WebSocket服务器,成功实现实时图表更新!

image

支持多个WebSocket客户端

为了简化问题,在本文中仅需考虑单一客户端。我们将WebSocket会话ID存储在分布式缓存中,并在从Kafka接收消息时使用它。

如果这样的话,当另一个客户端通过WebSocket连接时,缓存中的WebSocket会话ID会被更新,导致无法在多个客户端上使用。

在NiFi的流程中,可以实现以下循环的一个解决方案:

当WebSocket客户端连接时,将循环不断地执行此操作。它将接收到的JSON数据和接收时间戳一起存储在分布式缓存中。

在循环流文件中,包含WebSocket会话ID、控制器服务ID、终端点ID和最后的推送时间戳。

当下检查缓存时,如果信息比上次更新的信息要新,就向WebSocket客户端发送推送,否则什么都不做继续循环。

如果构建这样的流程,每当客户端连接时都会创建一个Flow File,并且客户端的信息将保持在该Flow File内,因此能够支持多个客户端。

我已经在Realtime Data Visualization with Zeppelin via NiFi WebSocket的Gist中发布了支持多个客户端的NiFi模板文件。如果您有兴趣,请下载并导入到NiFi中,然后探索其中的内容。

总结

在本次教程中,我们尝试构建一个通过组合各种开源软件来显示实时图表的机制。

不仅可以将NiFi用作数据收集工具,还可以从NiFi发送信息。而且,我们只需构建NiFi流程,而无需在服务器端进行任何编码。

如果您在年底和年初期间想尝试一些新鲜事物,务必试一试!

广告
将在 10 秒后关闭
bannerAds