通过 MiNiFi 获取传感器数据并传输到 NiFi,然后将数据写入 HDFS 和 Hive
下面是连接传感器到树莓派并使用MiNiFi追踪传感器数据的步骤,然后将其传输到NiFi,并在NiFi中将其写入HDFS和Hive的介绍。
我希望实现的目标
-
- MiNiFiでセンサーデータを取得し、NiFiに転送する
-
- NiFiでセンサーデータをRawデータとしてKafka経由でHDFSに保存する
-
- NiFiでセンサーデータをHiveテーブルに保存する
- NiFiでデータを加工し、温度が閾値を超えたら、Slackアラート通知する
环境信息
-
- センサーをラズパイに接続
-
- ラズパイでセンサーデータを取るPythonスクリプト
- HDP3.1(Hadoop 3.1.1, Hive 3.1.0) & HDF 3.3.1(NiFi 1.8.0, Kafka 2.0.0)、クラスタは同一Ambariで管理
构成
整个数据流
让我们试试吧。
1、MiNiFi的配置设置
在树莓派自身的操作系统安装和与传感器连接完成后(这里略去不提),进行MiNiFi的安装。
MiNiFi的Tar文件可在Hortonworks网站上找到。
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.1/release-notes/content/hdf_repository_locations.html
sudo su -
cd /home/pi
#HortonworksのサイトからMiNiFiのtarファイルをダウンロードする
curl -O http://public-repo-1.hortonworks.com/HDF/3.3.1.0/minifi-0.6.0.3.3.1.0-10-bin.tar.gz
curl -O http://public-repo-1.hortonworks.com/HDF/3.3.1.0/minifi-toolkit-0.6.0.3.3.1.0-10-bin.tar.gz
#ファイル解答し、シンボリックリンクを作成する
tar -zxvf minifi-0.6.0.3.3.1.0-10-bin.tar.gz
tar -zxvf minifi-toolkit-0.6.0.3.3.1.0-10-bin.tar.gz
ln -s minifi-0.6.0.3.3.1.0-10 minifi
ln -s minifi-toolkit-0.6.0.3.3.1.0-10 tool_minifi
#MiNiFiをインストールする
./minifi/bin/minifi.sh install
#必要に応じてリモートNiFiホストを/etc/hostsに追加する
使用Python脚本获取传感器数据。
Python脚本会在/home/pi/bme280-data文件夹中生成类似于2019-06-04.csv这样的文件。
数据内容为:以逗号分隔的传感器ID、日期、时间、气压、温度、湿度。
#coding: utf-8
import bme280_custom
import datetime
import os
dir_path = '/home/pi/bme280-data’
now = datetime.datetime.now()
filename = now.strftime('%Y-%m-%d’)
label = now.strftime('%H:%M:%S’)
csv = bme280_custom.readData()
if not os.path.exists('/home/pi/bme280-data’):
os.makedirs('/home/pi/bme280-data’)
f = open('/home/pi/bme280-data/'+filename+'.csv','a’)
f.write('1,'+filename +","+label+","+csv+"\n")
f.close()
请使用Cron或其他定时执行器,将该脚本定期执行(例如每隔10秒执行一次)。
3、使用NiFi创建数据流,并将其分发给MiNiFi。
由于在MiNiFi中直接创建数据流比较困难,通常的做法是在NiFi中创建,并以模板(.xml)的形式导出,然后使用MiNiFi工具包将.xml转换为.yml。
本次我们将使用TailFile处理器和RemoteProcessGroup。
sudo su -
cd /home/pi
#MiNiFi toolkitでxmlからymlに変換する
./tool_minifi/bin/config.sh transform /home/pi/sensor_minifi4.xml ./sensor_minifi4.yml
#既存のconfig.ymlファイルをバックアップし、新しいymlで上書きする
cp -p minifi/conf/config.yml minifi/conf/config.yml.bk
cp -p sensor_minifi4.yml minifi/conf/config.yml
#MiNiFiプロセスを起動する。他にstop, restartなどオプションがある
./minifi/bin/minifi.sh start
一旦达到这一点,MiNiFi就能够将传感器数据传输到NiFi。
使用NiFi将传感器数据作为原始数据通过Kafka传输并保存到HDFS中。
用NiFi将传感器数据保存到Hive表中。
在这里,我们将使用Hive Streaming将传感器数据实时添加到Hive表中。
要实现这一目标,Hive表需要满足一些要求。
请参阅此处的 “StreamingDataIngest-StreamingRequirements” 以获取详细信息。
1,ACIDサポートのため、hive-site.xmlに以下3つのパラメータを設定(HDP3.1ではすでに設定ずみ)
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
hive.compactor.initiator.on = true
hive.compactor.worker.threads > 0
2,テーブル作成時に以下を含むことが必要
STORED AS ORC
tblproperties("transactional"="true")
CLUSTERED BY (cloumn-name) INTO <num> BUCKETS
将按照以下方式创建表格(sensor_data_user1)。
CREATE TABLE sensor_data_user1(id int,time_str string, pressure double, temperature double, humidity double)
PARTITIONED BY(date_str string)
CLUSTERED BY (id) INTO 5 BUCKETS
STORED AS ORC
tblproperties("transactional" = "true");
接下来我将解释处理器。
Avro模式如下所示。
{
"type": "record",
"namespace": "sensor_data_schema1",
"name": "sensor_data_schema1",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "date_str", "type": "string" },
{ "name": "time_str", "type": "string" },
{ "name": "pressure", "type": "double" },
{ "name": "temperature", "type": "double" },
{ "name": "humidity", "type": "double" }
]
}
6、使用NiFi加工数据,当温度超过阈值时,发送Slack告警通知。
最后的备忘录:事前准备。 de : .)
# Kafka topic作成
cd /usr/hdp/current/kafka-broker
./kafka-topics.sh --create --zookeeper hdp-srv1.demotest.com:218,hdp-srv2.demotest.com:2181,hdp-srv3.demotest.com:2181 --replication-factor 3 --partitions 1 --topic sensor_data_user1
# HDFS folder作成
sudo su - hdfs
hdfs dfs -mkdir -p /tmp/sensor_data/user1
hdfs dfs -chmod -R 777 /tmp/sensor_data/user1
# sensor_data_user1テーブルにNiFiユーザーにrwx権限追加
hdfs dfs -setfacl -m -R user:nifi:rwx /warehouse/tablespace/managed/hive/sensor_data_user1