使用Fluentd,Kafka,InfluxDB和Grafana搭建指标监控环境

首先

在搭建度量监测环境时,我考虑使用最现代化的OSS组合,于是尝试了Fluentd、Kafka、InfluxDB和Grafana。需要注意的是,本文未记录各中间件的搭建步骤。更倾向于简单描述通过Fluentd获取服务器度量指标数据,作为Kafka的生产者将数据传送至Kafka,再将数据作为Kafka消费者流向InfluxDB的部分。

总结

各个中间件的职责如下:

MiddlewareRoleFluentdKafka Producer & ConsumerKafkaMessaging QueueInfluxDBTSDBGrafanaWeb Interface
qiita-metrics.png

在没有集群功能的开源版InfluxDB中实现简单冗余

在v0.12以前,OSS版InfluxDB原本就具有集群功能,但自v0.12开始,OSS版InfluxDB的集群功能已被废除,今后将通过Influx Enterprise和Influx Cloud提供该功能。

请务必参考这里的详细日语文章。

    https://yakst.com/en/posts/3870

由于本次使用的是OSS版InfluxDB,我们选择了Kafka来实现对数据库的双写,并且保证了相当程度的容错性。通过使用Kafka,指标数据在被投入InfluxDB之前会在Kafka集群中进行冗余存储,以保持在保留期内。同时,使用Fluentd作为Kafka Consumer,并通过Consumer Group实现多台的扩展。这样一来,无论从监控的客户端到InfluxDB的哪个环节出现故障,数据都能够可靠地保留在某个节点上,避免了单点故障。

Fluentd的配置(Kafka生产者)

本次我们使用Fluentd来获取度量数据的方法包括dstat和df。如果没有安装dstat,需要提前安装。此外,还需要安装Fluentd的v0.12或更高版本。以下是需要安装的应用程序和Fluent的插件列表。

预先安装的应用程序
    • dstat

 

    Fluentd (v0.12 or later)
需要安装的Fluentd插件。
    • fluent-plugin-record-reformer

 

    • fluent-plugin-dstat

 

    fluent-plugin-kafka

将Fluentd的配置文件设置为以下内容。

<source>
  @type dstat
  tag raw_dstat
  option -cmldrn
  delay 10
  tmp_file /tmp/dstat_all.csv
</source>

<source>
  @type exec
  tag raw_df
  command df -TP | sed 1d | sed 's/%//' | sed 's/\s\+/\t/g'
  run_interval 10s
  format tsv
  keys device,type,size,used,available,capacity,mounted_on
</source>

<filter raw_df>
  @type record_transformer
  enable_ruby true
  <record>
    hostname ${hostname}
  </record>
</filter>

<match raw_dstat raw_df>
  @type kafka_buffered
  brokers kafkabroker001:9092,kafkabroker002:9092
  default_topic metrics-topic
  flush_interval 60
  buffer_type file
  buffer_path /tmp/td-agent.*.buffer
  output_data_type json
  output_include_tag true
  output_include_time true
</match>

假设Broker是kafkabroker001:9092和kafkabroker002:9092,Topic名称为metrics-topic。关于df,我们能获取到设备的容量,但是Fluentd的标签名变成了df.<设备名>,并且获取到的json中没有设备名,因此我们需要使用reord_reformaer进行修改。

Fluentd的配置(Kafka消费者)

这次,我们将使用Fluentd作为Kafka的消费者。作为消费者,我们从Kafka中获取数据并将其进行双重发布到InfluxDB。

需要进行安装的Fluentd插件
    • fluent-plugin-record-reformer

 

    • fluent-plugin-influxdb

 

    fluent-plugin-kafka

将Fluentd的配置文件conf设置如下。

<source>
  @type kafka_group
  brokers kafkabroker001:9092,kafkabroker002:9092
  consumer_group consumer_group_001
  topics metrics-topic
  format json
</source>

<match metrics-topic>
  @type record_reformer
  tag ${record['tag']}
  enable_ruby true
  auto_typecast true
</match>

<match raw_dstat>
  @type copy
  <store>
    @type record_reformer
    tag cpu
    enable_ruby true
    auto_typecast true
    renew_record true
    <record>
      host   ${record['hostname']}
      usr    ${record['dstat']['total_cpu_usage']['usr'].to_f}
      sys    ${record['dstat']['total_cpu_usage']['sys'].to_f}
      idl    ${record['dstat']['total_cpu_usage']['idl'].to_f}
      wai    ${record['dstat']['total_cpu_usage']['wai'].to_f}
      hiq    ${record['dstat']['total_cpu_usage']['hiq'].to_f}
      siq    ${record['dstat']['total_cpu_usage']['siq'].to_f}
      time   ${record['time']}
    </record>
  </store>
  <store>
    @type record_reformer
    tag mem
    enable_ruby true
    auto_typecast true
    renew_record true
    <record>
      host   ${record['hostname']}
      used   ${record['dstat']['memory_usage']['used'].to_f}
      buff   ${record['dstat']['memory_usage']['buff'].to_f}
      cach   ${record['dstat']['memory_usage']['cach'].to_f}
      free   ${record['dstat']['memory_usage']['free'].to_f}
      time   ${record['time']}
    </record>
  </store>
  <store>
    @type record_reformer
    tag load
    enable_ruby true
    auto_typecast true
    renew_record true
    <record>
      host   ${record['hostname']}
      1m     ${record['dstat']['load_avg']['1m'].to_f}
      5m     ${record['dstat']['load_avg']['5m'].to_f}
      15m    ${record['dstat']['load_avg']['15m'].to_f}
      time   ${record['time']}
    </record>
  </store>
  <store>
    @type record_reformer
    tag disk
    enable_ruby true
    auto_typecast true
    renew_record true
    <record>
      host   ${record['hostname']}
      read   ${record['dstat']['dsk/total']['read'].to_f}
      writ   ${record['dstat']['dsk/total']['writ'].to_f}
      time   ${record['time']}
    </record>
  </store>
  <store>
    @type record_reformer
    tag diskio
    enable_ruby true
    auto_typecast true
    renew_record true
    <record>
      host   ${record['hostname']}
      read   ${record['dstat']['io/total']['read'].to_f}
      writ   ${record['dstat']['io/total']['writ'].to_f}
      time   ${record['time']}
    </record>
  </store>
  <store>
    @type record_reformer
    tag net
    enable_ruby true
    auto_typecast true
    renew_record true
    <record>
      host   ${record['hostname']}
      recv   ${record['dstat']['net/total']['recv'].to_f}
      send   ${record['dstat']['net/total']['send'].to_f}
      time   ${record['time']}
    </record>
  </store>
</match>

<match raw_df>
  @type record_reformer
  tag df
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host       ${record['hostname']}
    size       ${record['size'].to_f}
    used       ${record['used'].to_f}
    available  ${record['available'].to_f}
    capacity   ${record['capacity'].to_f}
    device     ${record['device']}
    type       ${record['type']}
    mounted_on ${record['mounted_on']}
    time       ${record['time']}
  </record>
</match>

<filter df cpu mem load disk diskio net>
  @type record_transformer
  renew_time_key time
</filter>

<filter df cpu mem load disk diskio net>
  @type record_transformer
  remove_keys time
</filter>

<match df cpu mem load disk diskio net>
  @type copy
  <store>
    @type influxdb
    host influxdb001
    port 8086
    dbname metrics_db
    user kafka
    password kafka
    use_ssl false
    verify_ssl false
    tag_keys ["host", "device"]
    time_precision s
    flush_interval 10s
  </store>
  <store>
    @type influxdb
    host influxdb002
    port 8086
    dbname metrics_db
    user kafka
    password kafka
    use_ssl false
    verify_ssl false
    tag_keys ["host", "device", "type", "mounted_on",]
    time_precision s
    flush_interval 10s
  </store>
</match>

假设InfluxDB的主机名分别为influxdb001和influxdb002,并且假设数据库名称均为metrics_db。另外,在InfluxDB的标签中,我们使用字符串类型将host和device设置为了获取值。

卡夫卡和InfluxDB

请提前创建Kafka的Topic和InfluxDB的数据库。在本文中,不会详细讨论调优的细节。

Grafana 可视化工具

grafana.png

如果在从Grafana连接到InfluxDB时,在InfluxDB的前面放置一个负载均衡器,并设置Grafana连接到负载均衡器,即使其中一个InfluxDB由于故障而崩溃,仍然可以继续通过Grafana查看监控数据。

最后

如果有人告诉我不要做这么麻烦的事,直接用Telegraf,那也没什么关系。但是,如果原本已经用Fluentd做了其他日志收集的目的,又不想额外再安装Telegraf,那么在确实想用Fluentd进行指标监控的情况下,本次的做法是不是合理的方法呢?如果有Fluentd插件可以不用dstat或df,而是直接检查/proc目录下的情况,那可能会更智能一些。这次只是尝试着使用已有的东西来实现是否可行罢了。

为庆祝InfluxDB v1.0.0的发布,这次我写了这篇文章。

广告
将在 10 秒后关闭
bannerAds