通过 MiNiFi 获取传感器数据并传输到 NiFi,然后将数据写入 HDFS 和 Hive

下面是连接传感器到树莓派并使用MiNiFi追踪传感器数据的步骤,然后将其传输到NiFi,并在NiFi中将其写入HDFS和Hive的介绍。

我希望实现的目标

    • MiNiFiでセンサーデータを取得し、NiFiに転送する

 

    • NiFiでセンサーデータをRawデータとしてKafka経由でHDFSに保存する

 

    • NiFiでセンサーデータをHiveテーブルに保存する

 

    NiFiでデータを加工し、温度が閾値を超えたら、Slackアラート通知する

环境信息

    • センサーをラズパイに接続

 

    • ラズパイでセンサーデータを取るPythonスクリプト

 

    HDP3.1(Hadoop 3.1.1, Hive 3.1.0) & HDF 3.3.1(NiFi 1.8.0, Kafka 2.0.0)、クラスタは同一Ambariで管理

构成

enter image description here

整个数据流

enter image description here

让我们试试吧。

1、MiNiFi的配置设置

在树莓派自身的操作系统安装和与传感器连接完成后(这里略去不提),进行MiNiFi的安装。
MiNiFi的Tar文件可在Hortonworks网站上找到。
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.1/release-notes/content/hdf_repository_locations.html

sudo su -
cd /home/pi

#HortonworksのサイトからMiNiFiのtarファイルをダウンロードする
curl -O http://public-repo-1.hortonworks.com/HDF/3.3.1.0/minifi-0.6.0.3.3.1.0-10-bin.tar.gz
curl -O http://public-repo-1.hortonworks.com/HDF/3.3.1.0/minifi-toolkit-0.6.0.3.3.1.0-10-bin.tar.gz

#ファイル解答し、シンボリックリンクを作成する
tar -zxvf minifi-0.6.0.3.3.1.0-10-bin.tar.gz
tar -zxvf minifi-toolkit-0.6.0.3.3.1.0-10-bin.tar.gz
ln -s minifi-0.6.0.3.3.1.0-10 minifi
ln -s minifi-toolkit-0.6.0.3.3.1.0-10 tool_minifi

#MiNiFiをインストールする
./minifi/bin/minifi.sh install
#必要に応じてリモートNiFiホストを/etc/hostsに追加する

使用Python脚本获取传感器数据。

Python脚本会在/home/pi/bme280-data文件夹中生成类似于2019-06-04.csv这样的文件。

数据内容为:以逗号分隔的传感器ID、日期、时间、气压、温度、湿度。

#coding: utf-8
import bme280_custom
import datetime
import os

dir_path = '/home/pi/bme280-data’
now = datetime.datetime.now()
filename = now.strftime('%Y-%m-%d’)
label = now.strftime('%H:%M:%S’)
csv = bme280_custom.readData()
if not os.path.exists('/home/pi/bme280-data’):
  os.makedirs('/home/pi/bme280-data’)
f = open('/home/pi/bme280-data/'+filename+'.csv','a’)
f.write('1,'+filename +","+label+","+csv+"\n")
f.close()

请使用Cron或其他定时执行器,将该脚本定期执行(例如每隔10秒执行一次)。

3、使用NiFi创建数据流,并将其分发给MiNiFi。

由于在MiNiFi中直接创建数据流比较困难,通常的做法是在NiFi中创建,并以模板(.xml)的形式导出,然后使用MiNiFi工具包将.xml转换为.yml。
本次我们将使用TailFile处理器和RemoteProcessGroup。

enter image description here
sudo su -
cd /home/pi

#MiNiFi toolkitでxmlからymlに変換する
./tool_minifi/bin/config.sh transform /home/pi/sensor_minifi4.xml ./sensor_minifi4.yml

#既存のconfig.ymlファイルをバックアップし、新しいymlで上書きする
cp -p minifi/conf/config.yml minifi/conf/config.yml.bk
cp -p sensor_minifi4.yml minifi/conf/config.yml

#MiNiFiプロセスを起動する。他にstop, restartなどオプションがある
./minifi/bin/minifi.sh start

一旦达到这一点,MiNiFi就能够将传感器数据传输到NiFi。

使用NiFi将传感器数据作为原始数据通过Kafka传输并保存到HDFS中。

enter image description here
enter image description here

用NiFi将传感器数据保存到Hive表中。

在这里,我们将使用Hive Streaming将传感器数据实时添加到Hive表中。
要实现这一目标,Hive表需要满足一些要求。
请参阅此处的 “StreamingDataIngest-StreamingRequirements” 以获取详细信息。

1,ACIDサポートのため、hive-site.xmlに以下3つのパラメータを設定(HDP3.1ではすでに設定ずみ)
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager 
hive.compactor.initiator.on = true 
hive.compactor.worker.threads > 0

2,テーブル作成時に以下を含むことが必要
STORED AS ORC
tblproperties("transactional"="true")
CLUSTERED BY (cloumn-name) INTO <num> BUCKETS

将按照以下方式创建表格(sensor_data_user1)。

CREATE TABLE sensor_data_user1(id int,time_str string, pressure double, temperature double, humidity double)
PARTITIONED BY(date_str string) 
CLUSTERED BY (id) INTO 5 BUCKETS 
STORED AS ORC 
tblproperties("transactional" = "true");
enter image description here

接下来我将解释处理器。

image.png

Avro模式如下所示。

    {
         "type": "record",
         "namespace": "sensor_data_schema1",
         "name": "sensor_data_schema1",
         "fields": [
               { "name": "id", "type": "int" },
           { "name": "date_str", "type": "string" },
           { "name": "time_str", "type": "string" },
           { "name": "pressure", "type": "double" },
           { "name": "temperature", "type": "double" },
           { "name": "humidity", "type": "double" }
         ]
    } 
enter image description here
enter image description here

6、使用NiFi加工数据,当温度超过阈值时,发送Slack告警通知。

enter image description here
image.png
enter image description here
image.png
image.png
enter image description here
enter image description here

最后的备忘录:事前准备。 de : .)

# Kafka topic作成
cd /usr/hdp/current/kafka-broker
./kafka-topics.sh --create --zookeeper hdp-srv1.demotest.com:218,hdp-srv2.demotest.com:2181,hdp-srv3.demotest.com:2181 --replication-factor 3 --partitions 1 --topic sensor_data_user1

# HDFS folder作成
sudo su - hdfs 
hdfs dfs -mkdir -p /tmp/sensor_data/user1
hdfs dfs -chmod -R 777 /tmp/sensor_data/user1

# sensor_data_user1テーブルにNiFiユーザーにrwx権限追加
hdfs dfs -setfacl -m -R user:nifi:rwx /warehouse/tablespace/managed/hive/sensor_data_user1
广告
将在 10 秒后关闭
bannerAds