Building a Metrics Monitoring Environment with Fluentd, Kafka, InfluxDB, and Grafana
Introduction
When building a metrics monitoring environment, I wanted to try a fairly modern OSS stack, so I put together Fluentd, Kafka, InfluxDB, and Grafana. Note that this article does not cover the installation steps for each middleware. Instead, it focuses on collecting server metrics with Fluentd, sending them to Kafka as a Kafka producer, and then streaming them from Kafka into InfluxDB as a Kafka consumer.
Overview
The responsibilities of each middleware are as follows:
- Fluentd: collects metrics on each server (Kafka producer) and forwards records from Kafka into InfluxDB (Kafka consumer)
- Kafka: buffers the metric stream and keeps it replicated across brokers
- InfluxDB: stores the time-series metrics (dual-written to two instances)
- Grafana: visualizes the data stored in InfluxDB
Simple redundancy with the OSS version of InfluxDB, which has no clustering
Before v0.12, the OSS version of InfluxDB had a clustering feature, but it was removed as of v0.12; going forward, clustering is offered only through InfluxDB Enterprise and InfluxDB Cloud.
A detailed write-up is available here:
- https://yakst.com/en/posts/3870
Since we are using the OSS version of InfluxDB, we use Kafka to dual-write to two databases, which gives a reasonable degree of fault tolerance. With Kafka in the pipeline, metric data is replicated across the Kafka cluster and retained for the configured retention period before being written into InfluxDB. We also run Fluentd as the Kafka consumer and scale it out across multiple hosts via a consumer group. As a result, wherever a failure occurs between the monitored client and InfluxDB, the data remains safely on some node, avoiding a single point of failure.
Fluentd configuration (Kafka producer)
This time we collect metrics with Fluentd using dstat and df. If dstat is not installed, install it first. Fluentd v0.12 or later is also required. The applications and Fluentd plugins to install are listed below.
Applications to install beforehand
- dstat
- Fluentd (v0.12 or later)
Fluentd plugins to install
- fluent-plugin-record-reformer
- fluent-plugin-dstat
- fluent-plugin-kafka
Set up the Fluentd configuration file as follows.
<source>
@type dstat
tag raw_dstat
option -cmldrn
delay 10
tmp_file /tmp/dstat_all.csv
</source>
<source>
@type exec
tag raw_df
command df -TP | sed 1d | sed 's/%//' | sed 's/\s\+/\t/g'
run_interval 10s
format tsv
keys device,type,size,used,available,capacity,mounted_on
</source>
<filter raw_df>
@type record_transformer
enable_ruby true
<record>
hostname ${hostname}
</record>
</filter>
<match raw_dstat raw_df>
@type kafka_buffered
brokers kafkabroker001:9092,kafkabroker002:9092
default_topic metrics-topic
flush_interval 60
buffer_type file
buffer_path /tmp/td-agent.*.buffer
output_data_type json
output_include_tag true
output_include_time true
</match>
Here the brokers are assumed to be kafkabroker001:9092 and kafkabroker002:9092, and the topic name is metrics-topic. For df we can collect each device's capacity, but the parsed record does not include the hostname, so we add it with the record_transformer filter.
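The shell pipeline in the raw_df source above (df -TP | sed 1d | sed 's/%//' | sed 's/\s\+/\t/g') can be sketched in Python to show what Fluentd's tsv parser ends up seeing. The sample df output below is illustrative, not taken from a real host.

```python
import re

# The keys declared in the <source> block, in column order.
KEYS = ["device", "type", "size", "used", "available", "capacity", "mounted_on"]

def parse_df_output(df_output: str):
    """Mimic the sed pipeline: drop the header, strip '%', split on whitespace."""
    records = []
    for line in df_output.splitlines()[1:]:      # sed 1d: skip the header line
        line = line.replace("%", "", 1)          # sed 's/%//': strip the percent sign
        fields = re.sub(r"\s+", "\t", line.strip()).split("\t")  # whitespace -> tabs
        records.append(dict(zip(KEYS, fields)))
    return records

sample = (
    "Filesystem     Type 1024-blocks    Used Available Capacity Mounted on\n"
    "/dev/sda1      ext4    41152736 9231234  29806442      24% /\n"
)
print(parse_df_output(sample))
# → [{'device': '/dev/sda1', 'type': 'ext4', 'size': '41152736', 'used': '9231234',
#     'available': '29806442', 'capacity': '24', 'mounted_on': '/'}]
```

Everything arrives as strings at this stage; the numeric conversion happens later on the consumer side with `.to_f` in record_reformer.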
Fluentd configuration (Kafka consumer)
Next we use Fluentd as the Kafka consumer: it pulls data from Kafka and dual-writes it to two InfluxDB instances.
Fluentd plugins to install
- fluent-plugin-record-reformer
- fluent-plugin-influxdb
- fluent-plugin-kafka
Set up the Fluentd configuration file as follows.
<source>
@type kafka_group
brokers kafkabroker001:9092,kafkabroker002:9092
consumer_group consumer_group_001
topics metrics-topic
format json
</source>
<match metrics-topic>
@type record_reformer
tag ${record['tag']}
enable_ruby true
auto_typecast true
</match>
<match raw_dstat>
@type copy
<store>
@type record_reformer
tag cpu
enable_ruby true
auto_typecast true
renew_record true
<record>
host ${record['hostname']}
usr ${record['dstat']['total_cpu_usage']['usr'].to_f}
sys ${record['dstat']['total_cpu_usage']['sys'].to_f}
idl ${record['dstat']['total_cpu_usage']['idl'].to_f}
wai ${record['dstat']['total_cpu_usage']['wai'].to_f}
hiq ${record['dstat']['total_cpu_usage']['hiq'].to_f}
siq ${record['dstat']['total_cpu_usage']['siq'].to_f}
time ${record['time']}
</record>
</store>
<store>
@type record_reformer
tag mem
enable_ruby true
auto_typecast true
renew_record true
<record>
host ${record['hostname']}
used ${record['dstat']['memory_usage']['used'].to_f}
buff ${record['dstat']['memory_usage']['buff'].to_f}
cach ${record['dstat']['memory_usage']['cach'].to_f}
free ${record['dstat']['memory_usage']['free'].to_f}
time ${record['time']}
</record>
</store>
<store>
@type record_reformer
tag load
enable_ruby true
auto_typecast true
renew_record true
<record>
host ${record['hostname']}
1m ${record['dstat']['load_avg']['1m'].to_f}
5m ${record['dstat']['load_avg']['5m'].to_f}
15m ${record['dstat']['load_avg']['15m'].to_f}
time ${record['time']}
</record>
</store>
<store>
@type record_reformer
tag disk
enable_ruby true
auto_typecast true
renew_record true
<record>
host ${record['hostname']}
read ${record['dstat']['dsk/total']['read'].to_f}
writ ${record['dstat']['dsk/total']['writ'].to_f}
time ${record['time']}
</record>
</store>
<store>
@type record_reformer
tag diskio
enable_ruby true
auto_typecast true
renew_record true
<record>
host ${record['hostname']}
read ${record['dstat']['io/total']['read'].to_f}
writ ${record['dstat']['io/total']['writ'].to_f}
time ${record['time']}
</record>
</store>
<store>
@type record_reformer
tag net
enable_ruby true
auto_typecast true
renew_record true
<record>
host ${record['hostname']}
recv ${record['dstat']['net/total']['recv'].to_f}
send ${record['dstat']['net/total']['send'].to_f}
time ${record['time']}
</record>
</store>
</match>
<match raw_df>
@type record_reformer
tag df
enable_ruby true
auto_typecast true
renew_record true
<record>
host ${record['hostname']}
size ${record['size'].to_f}
used ${record['used'].to_f}
available ${record['available'].to_f}
capacity ${record['capacity'].to_f}
device ${record['device']}
type ${record['type']}
mounted_on ${record['mounted_on']}
time ${record['time']}
</record>
</match>
<filter df cpu mem load disk diskio net>
@type record_transformer
renew_time_key time
</filter>
<filter df cpu mem load disk diskio net>
@type record_transformer
remove_keys time
</filter>
<match df cpu mem load disk diskio net>
@type copy
<store>
@type influxdb
host influxdb001
port 8086
dbname metrics_db
user kafka
password kafka
use_ssl false
verify_ssl false
tag_keys ["host", "device"]
time_precision s
flush_interval 10s
</store>
<store>
@type influxdb
host influxdb002
port 8086
dbname metrics_db
user kafka
password kafka
use_ssl false
verify_ssl false
tag_keys ["host", "device", "type", "mounted_on"]
time_precision s
flush_interval 10s
</store>
</match>
The InfluxDB hosts are assumed to be influxdb001 and influxdb002, both with a database named metrics_db. Keys such as host and device are registered as InfluxDB tags, so their values are stored as strings.
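For intuition, here is a rough sketch (not the fluent-plugin-influxdb implementation) of how a reformed record maps onto InfluxDB's line protocol: keys listed in tag_keys become indexed tags, and everything else becomes a field. Real line protocol also escapes special characters and quotes string field values, which this toy version skips.

```python
def to_line_protocol(measurement, record, tag_keys, timestamp):
    """Build a simplified InfluxDB line-protocol point from a flat record."""
    # Tags are indexed and always stored as strings.
    tags = ",".join(f"{k}={record[k]}" for k in tag_keys if k in record)
    # Every remaining key becomes a field value.
    fields = ",".join(f"{k}={v}" for k, v in record.items() if k not in tag_keys)
    head = f"{measurement},{tags}" if tags else measurement
    return f"{head} {fields} {timestamp}"

# A cpu record as produced by the record_reformer stores above (values illustrative).
point = to_line_protocol(
    "cpu", {"host": "web001", "usr": 1.5, "sys": 0.5}, ["host"], 1472601600)
print(point)  # → cpu,host=web001 usr=1.5,sys=0.5 1472601600
```

With time_precision set to s, the trailing timestamp is interpreted as Unix seconds.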
Kafka and InfluxDB
Create the Kafka topic and the InfluxDB database in advance. Tuning details are out of scope for this article.
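As a sketch, the provisioning could look like the following. The ZooKeeper address and the partition/replication counts are assumptions; adjust them to your cluster (with two brokers as above, the replication factor cannot exceed 2).

```shell
# Create the topic on the Kafka cluster (hypothetical ZooKeeper host).
kafka-topics.sh --create \
  --zookeeper zookeeper001:2181 \
  --topic metrics-topic \
  --partitions 4 \
  --replication-factor 2

# OSS InfluxDB has no clustering, so create the database on both instances.
influx -host influxdb001 -execute 'CREATE DATABASE metrics_db'
influx -host influxdb002 -execute 'CREATE DATABASE metrics_db'
```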
Visualization with Grafana
When connecting Grafana to InfluxDB, if you place a load balancer in front of the two InfluxDB instances and point Grafana at the load balancer, you can keep viewing the monitoring data in Grafana even if one InfluxDB instance goes down.
Closing thoughts
Some people will say "don't bother with all this, just use Telegraf", and that is fair. But if you already use Fluentd for other log collection and don't want to install Telegraf on top of it, then for the case where you really do want to do metrics monitoring with Fluentd, I think this approach is a reasonable one. It would be smarter still if there were a Fluentd plugin that read /proc directly instead of relying on dstat or df; this time I simply tried to see whether it could be done with what already exists.
I wrote this article to celebrate the release of InfluxDB v1.0.0.