Importing Metricbeat data into InfluxDB
Introduction
I tried doing exactly what the title says.
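InfluxDB ingests data in its line protocol, where each point is flat (a measurement, a set of tags, and a set of fields), so JSON has to be un-nested before it can be written. The data Metricbeat outputs is nested JSON, so I used Fluentd to flatten it and then write it into InfluxDB.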
I used Metricbeat v5.0.0 and InfluxDB v1.0.0.
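One generic way to un-nest such a record is to join the nested keys into one key per leaf value. The Ruby sketch below (Ruby being the language Fluentd and record_reformer's ${...} expressions use) assumes "_" as the separator. flatten_record is a hypothetical helper, and it is not what the Fluentd configuration in this post does: that configuration maps each field by hand.

```ruby
# Hypothetical helper: flatten a nested Hash (e.g. a parsed Metricbeat event)
# into a single-level Hash, joining the key path with "_".
def flatten_record(hash, prefix = nil, out = {})
  hash.each do |key, value|
    # e.g. "system" -> "load" -> "1" becomes "system_load_1"
    full_key = prefix ? "#{prefix}_#{key}" : key.to_s
    if value.is_a?(Hash)
      flatten_record(value, full_key, out) # recurse into nested objects
    else
      out[full_key] = value                # leaf value: keep it unchanged
    end
  end
  out
end

# A trimmed-down version of the load event shown later in this post.
nested = {
  "beat"   => { "hostname" => "host.example.com" },
  "system" => { "load" => { "1" => 1.09, "norm" => { "1" => 0.545 } } }
}
flat = flatten_record(nested)
flat.each { |k, v| puts "#{k}=#{v}" }
# beat_hostname=host.example.com
# system_load_1=1.09
# system_load_norm_1=0.545
```

The generated key names (system_load_1 and so on) are only one possible convention; the hand-written mapping used later picks shorter names such as load_1.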
Data sources
As data sources, I enabled the cpu, load, diskio, filesystem, memory, and network metricsets of the system module in metricbeat.yml.
#------------------------------- System Module -------------------------------
- module: system
  metricsets:
    # CPU stats
    - cpu
    # System Load stats
    - load
    # Per CPU core stats
    #- core
    # IO stats
    - diskio
    # Per filesystem stats
    - filesystem
    # File system summary stats
    #- fsstat
    # Memory stats
    - memory
    # Network stats
    - network
    # Per process stats
    #- process
Each event is emitted as JSON in the following format. This example comes from the load metricset.
{
  "@timestamp": "2016-05-23T08:05:34.853Z",
  "beat": {
    "hostname": "host.example.com",
    "name": "host.example.com"
  },
  "metricset": {
    "host": "localhost",
    "module": "system",
    "name": "load",
    "rtt": 115
  },
  "system": {
    "load": {
      "1": 1.09,
      "15": 0.49,
      "5": 0.65,
      "norm": {
        "1": 0.545,
        "15": 0.245,
        "5": 0.325
      }
    }
  },
  "type": "metricsets"
}
Source: https://www.elastic.co/guide/en/beats/metricbeat/master/metricbeat-metricset-system-load.html
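For the load sample above, the pipeline should end up producing a single flat InfluxDB point. The Ruby sketch below builds that point by hand as a line-protocol string. It uses the same field names and second-precision timestamp as the load section of the Fluentd configuration later in this post. It is only an illustration: the actual write is done by the Fluentd InfluxDB output plugin, and the sketch omits line-protocol escaping of spaces, commas, and equals signs.

```ruby
require "json"
require "time"

# The load event shown above, trimmed to the fields this sketch uses.
event = JSON.parse(<<~EVENT)
  {
    "@timestamp": "2016-05-23T08:05:34.853Z",
    "beat": { "hostname": "host.example.com" },
    "system": { "load": { "1": 1.09, "15": 0.49, "5": 0.65 } }
  }
EVENT

# Same mapping as the load section of the Fluentd config:
# host becomes a tag, the three load averages become float fields.
measurement = "beat_load"
tags = { "host" => event["beat"]["hostname"] }
fields = {
  "load_1"  => event["system"]["load"]["1"].to_f,
  "load_15" => event["system"]["load"]["15"].to_f,
  "load_5"  => event["system"]["load"]["5"].to_f
}
# Second precision (time_precision s); the fractional part is truncated.
time = Time.parse(event["@timestamp"]).to_i

# Line protocol: <measurement>,<tag set> <field set> <timestamp>
# (no escaping of special characters in this sketch)
tag_str   = tags.map { |k, v| "#{k}=#{v}" }.join(",")
field_str = fields.map { |k, v| "#{k}=#{v}" }.join(",")
line = "#{measurement},#{tag_str} #{field_str} #{time}"
puts line
# beat_load,host=host.example.com load_1=1.09,load_15=0.49,load_5=0.65 1463990734
```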
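Data relay
Metricbeat ships its data to Kafka. Fluentd acts as the Kafka consumer: it consumes the data, reshapes it, and pushes it into InfluxDB.
The Kafka version used here is v0.10.0, so it has to be stated explicitly in metricbeat.yml. Otherwise Metricbeat assumes the oldest supported stable version (currently 0.8.2.0), as the comment on the setting explains.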
# Kafka version metricbeat is assumed to run against. Defaults to the oldest
# supported stable version (currently version 0.8.2.0)
version: 0.10.0
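Data reshaping
Fluentd consumes the data from Kafka, reshapes it, and sends it to InfluxDB. The first <match> retags each event with its metricset name (metricset.name). Each per-metricset <match> then builds a flat record and retags it as beat_<metricset>. The last <match> writes those records to InfluxDB. The configuration looks like this: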
<source>
  @type kafka_group
  brokers kafkabroker001:9092,kafkabroker002:9092
  consumer_group consumer_group_001
  topics metrics-topic
  format json
</source>
<match metrics-topic>
  @type record_reformer
  tag ${record['metricset']['name']}
  enable_ruby true
  auto_typecast true
</match>
<match cpu>
  @type record_reformer
  tag beat_cpu
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host ${record['beat']['hostname']}
    user_pct ${record['system']['cpu']['user']['pct'].to_f}
    system_pct ${record['system']['cpu']['system']['pct'].to_f}
    nice_pct ${record['system']['cpu']['nice']['pct'].to_f}
    steal_pct ${record['system']['cpu']['steal']['pct'].to_f}
    idle_pct ${record['system']['cpu']['idle']['pct'].to_f}
    iowait_pct ${record['system']['cpu']['iowait']['pct'].to_f}
    irq_pct ${record['system']['cpu']['irq']['pct'].to_f}
    softirq_pct ${record['system']['cpu']['softirq']['pct'].to_f}
    time ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>
<match memory>
  @type record_reformer
  tag beat_memory
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host ${record['beat']['hostname']}
    actual_free ${record['system']['memory']['actual']['free'].to_f}
    actual_used_bytes ${record['system']['memory']['actual']['used']['bytes'].to_f}
    actual_used_pct ${record['system']['memory']['actual']['used']['pct'].to_f}
    free ${record['system']['memory']['free'].to_f}
    swap_free ${record['system']['memory']['swap']['free'].to_f}
    swap_total ${record['system']['memory']['swap']['total'].to_f}
    swap_used_bytes ${record['system']['memory']['swap']['used']['bytes'].to_f}
    swap_used_pct ${record['system']['memory']['swap']['used']['pct'].to_f}
    total ${record['system']['memory']['total'].to_f}
    used_bytes ${record['system']['memory']['used']['bytes'].to_f}
    used_pct ${record['system']['memory']['used']['pct'].to_f}
    time ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>
<match load>
  @type record_reformer
  tag beat_load
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host ${record['beat']['hostname']}
    load_1 ${record['system']['load']['1'].to_f}
    load_15 ${record['system']['load']['15'].to_f}
    load_5 ${record['system']['load']['5'].to_f}
    time ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>
<match diskio>
  @type record_reformer
  tag beat_diskio
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host ${record['beat']['hostname']}
    io_time ${record['system']['diskio']['io']['time'].to_f}
    name ${record['system']['diskio']['name']}
    read_bytes ${record['system']['diskio']['read']['bytes'].to_f}
    read_count ${record['system']['diskio']['read']['count'].to_f}
    read_time ${record['system']['diskio']['read']['time'].to_f}
    write_bytes ${record['system']['diskio']['write']['bytes'].to_f}
    write_count ${record['system']['diskio']['write']['count'].to_f}
    write_time ${record['system']['diskio']['write']['time'].to_f}
    time ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>
<match filesystem>
  @type record_reformer
  tag beat_filesystem
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host ${record['beat']['hostname']}
    available ${record['system']['filesystem']['available'].to_f}
    device_name ${record['system']['filesystem']['device_name']}
    files ${record['system']['filesystem']['files'].to_f}
    free ${record['system']['filesystem']['free'].to_f}
    free_files ${record['system']['filesystem']['free_files'].to_f}
    mount_point ${record['system']['filesystem']['mount_point']}
    total ${record['system']['filesystem']['total'].to_f}
    used_bytes ${record['system']['filesystem']['used']['bytes'].to_f}
    used_pct ${record['system']['filesystem']['used']['pct'].to_f}
    time ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>
<match network>
  @type record_reformer
  tag beat_network
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host ${record['beat']['hostname']}
    name ${record['system']['network']['name']}
    in_bytes ${record['system']['network']['in']['bytes'].to_f}
    in_dropped ${record['system']['network']['in']['dropped'].to_f}
    in_errors ${record['system']['network']['in']['errors'].to_f}
    in_packets ${record['system']['network']['in']['packets'].to_f}
    out_bytes ${record['system']['network']['out']['bytes'].to_f}
    out_dropped ${record['system']['network']['out']['dropped'].to_f}
    out_errors ${record['system']['network']['out']['errors'].to_f}
    out_packets ${record['system']['network']['out']['packets'].to_f}
    time ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>
<match beat_cpu beat_memory beat_load beat_diskio beat_filesystem beat_network>
  @type influxdb
  host influxdb001
  port 8086
  dbname metrics_db
  user kafka
  password kafka
  use_ssl false
  verify_ssl false
  tag_keys ["host", "name", "device_name", "mount_point"]
  time_precision s
  flush_interval 10s
</match>
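The two-stage routing in this configuration can be sketched in plain Ruby to make the data flow explicit. This is a simulation under stated assumptions, not how Fluentd or record_reformer is implemented. RULES, reform_load, and route are hypothetical names, only the load mapping is reproduced, and events with no matching rule are simply dropped.

```ruby
require "time"

# Stage 2 for the "load" metricset: build the flat record (mirrors <match load>).
def reform_load(r)
  {
    "host"    => r["beat"]["hostname"],
    "load_1"  => r["system"]["load"]["1"].to_f,
    "load_15" => r["system"]["load"]["15"].to_f,
    "load_5"  => r["system"]["load"]["5"].to_f,
    "time"    => Time.parse(r["@timestamp"]).to_i
  }
end

# Hypothetical rule table: metricset name -> reformer. Only load is shown;
# cpu, memory, diskio, filesystem and network would follow the same pattern.
RULES = { "load" => method(:reform_load) }.freeze

# Simulates <match metrics-topic> followed by the per-metricset <match>.
# Returns [new_tag, flat_record], or nil if no rule exists for the metricset.
def route(record)
  tag = record["metricset"]["name"]  # stage 1: retag by metricset name
  rule = RULES[tag]
  return nil if rule.nil?            # this sketch drops unmatched events
  ["beat_#{tag}", rule.call(record)] # stage 2: retag as beat_<name>
end

event = {
  "@timestamp" => "2016-05-23T08:05:34.853Z",
  "beat"       => { "hostname" => "host.example.com" },
  "metricset"  => { "name" => "load" },
  "system"     => { "load" => { "1" => 1.09, "15" => 0.49, "5" => 0.65 } }
}
tag, flat = route(event)
puts tag                 # beat_load
puts flat.keys.join(",") # host,load_1,load_15,load_5,time
```

When writing these mappings, keep in mind that in Ruby, .to_f on a non-numeric string returns 0.0 ("sda".to_f == 0.0). String-valued fields such as device names and mount points should therefore be passed through without it.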
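One slightly tricky point: if record_reformer re-emits an event without changing its tag, the re-emitted event matches the same <match> again, and Fluentd falls into an infinite loop.
Loops forever (the tag stays cpu):
<match cpu>
  @type record_reformer
  tag cpu
Works (the event is retagged as beat_cpu):
<match cpu>
  @type record_reformer
  tag beat_cpu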
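The loop can be reproduced with a toy model in Ruby. This is a hypothetical simulation, not Fluentd's router. emit, the rule hashes, and the MAX_HOPS guard are made up for illustration, and each rule only records which tag it re-emits with.

```ruby
# Toy router (not Fluentd): each rule maps the tag it matches to the tag
# it re-emits with, the way record_reformer re-emits under a new tag.
MAX_HOPS = 5 # artificial guard so the demo terminates

def emit(tag, rules, hops = 0)
  return [:loop, hops] if hops >= MAX_HOPS  # guard tripped: would loop forever
  next_tag = rules[tag]
  return [:delivered, tag] if next_tag.nil? # no re-emitting rule: reaches the output
  emit(next_tag, rules, hops + 1)           # re-emit and match again
end

bad  = { "cpu" => "cpu" }      # <match cpu> ... tag cpu
good = { "cpu" => "beat_cpu" } # <match cpu> ... tag beat_cpu

p emit("cpu", bad)  # [:loop, 5]
p emit("cpu", good) # [:delivered, "beat_cpu"]
```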
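In InfluxDB, this creates one measurement per metricset, named beat_XXX (for example beat_cpu), because the InfluxDB output plugin uses the Fluentd tag as the measurement name.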
Closing thoughts
Metricbeat v5 had just been released, so I tried this on a whim. Still, I wonder whether there is a better way to reshape nested JSON...