将从Metricbeat获取的数据导入到InfluxDB中

首先

我試著做了標題上所述的事情。

InfluxDB支持使用InfluxDB行协议或非嵌套的Json进行数据插入。然而,由Metricbeat输出的数据是嵌套的,因此我尝试使用Fluentd将其转换为非嵌套的Json格式,并将其放入InfluxDB。

Metricbeat使用的版本是v5.0.0,而InfluxDB使用的版本是v1.0.0。

目标数据源

我尝试使用metricbeat.yml中的cpu、load、diskio、filesystem、memory和network项目作为数据源。

#------------------------------- System Module -------------------------------
- module: system
  metricsets:
    # CPU stats
    - cpu

    # System Load stats
    - load

    # Per CPU core stats
    #- core

    # IO stats
    - diskio

    # Per filesystem stats
    - filesystem

    # File system summary stats
    #- fsstat

    # Memory stats
    - memory

    # Network stats
    - network

    # Per process stats
    #- process

生成的数据以以下Json格式输出。这个Json是load模块的示例。

{
    "@timestamp": "2016-05-23T08:05:34.853Z",
    "beat": {
        "hostname": "host.example.com",
        "name": "host.example.com"
    },
    "metricset": {
        "host": "localhost",
        "module": "system",
        "name": "load",
        "rtt": 115
    },
    "system": {
        "load": {
            "1": 1.09,
            "15": 0.49,
            "5": 0.65,
            "norm": {
                "1": 0.545,
                "15": 0.245,
                "5": 0.325
            }
        }
    },
    "type": "metricsets"
}

参考来源:https://www.elastic.co/guide/en/beats/metricbeat/master/metricbeat-metricset-system-load.html

出处:https://www.elastic.co/guide/en/beats/metricbeat/master/metricbeat-metricset-system-load.html

数据中继

将Metricbeat的数据传输到Kafka。使用Fluentd作为Kafka Consumer。通过Fluentd,消费和整理数据,并将其推送到InfluxDB中。

这次使用的Kafka版本是v0.10.0。因此,在metricbeat.yml文件中需要记录使用的Kafka版本。

  # Kafka version metricbeat is assumed to run against. Defaults to the oldest
  # supported stable version (currently version 0.8.2.0)
  version: 0.10.0

数据整理

Fluentd用于从Kafka消费和数据整理,并将其传输到InfluxDB。Fluentd的配置可以如下所示。

<source>
  @type kafka_group
  brokers kafkabroker001:9092,kafkabroker002:9092
  consumer_group consumer_group_001
  topics metrics-topic
  format json
</source>

<match metrics-topic>
  @type record_reformer
  tag ${record['metricset']['name']}
  enable_ruby true
  auto_typecast true
</match>

<match cpu>
  @type record_reformer
  tag beat_cpu
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host              ${record['beat']['hostname']}
    user_pct          ${record['system']['cpu']['user']['pct'].to_f}
    system_pct        ${record['system']['cpu']['system']['pct'].to_f}
    nice_pct          ${record['system']['cpu']['nice']['pct'].to_f}
    steal_pct         ${record['system']['cpu']['steal']['pct'].to_f}
    idle_pct          ${record['system']['cpu']['idle']['pct'].to_f}
    iowait_pct        ${record['system']['cpu']['iowait']['pct'].to_f}
    irq_pct           ${record['system']['cpu']['irq']['pct'].to_f}
    softirq_pct       ${record['system']['cpu']['softirq']['pct'].to_f}
    time              ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>

<match memory>
  @type record_reformer
  tag beat_memory
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host              ${record['beat']['hostname']}
    actual_free       ${record['system']['memory']['actual']['free'].to_f}
    actual_used_bytes ${record['system']['memory']['actual']['used']['bytes'].to_f}
    actual_used_pct   ${record['system']['memory']['actual']['used']['pct'].to_f}
    free              ${record['system']['memory']['free'].to_f}
    swap_free         ${record['system']['memory']['swap']['free'].to_f}
    swap_total        ${record['system']['memory']['swap']['total'].to_f}
    swap_used_bytes   ${record['system']['memory']['swap']['used']['bytes'].to_f}
    swap_used_pct     ${record['system']['memory']['swap']['used']['pct'].to_f}
    total             ${record['system']['memory']['total'].to_f}
    used_bytes        ${record['system']['memory']['used']['bytes'].to_f}
    used_pct          ${record['system']['memory']['used']['pct'].to_f}
    time              ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>

<match load>
  @type record_reformer
  tag beat_load
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host              ${record['beat']['hostname']}
    load_1            ${record['system']['load']['1'].to_f}
    load_15           ${record['system']['load']['15'].to_f}
    load_5            ${record['system']['load']['5'].to_f}
    time              ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>

<match diskio>
  @type record_reformer
  tag beat_diskio
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host              ${record['beat']['hostname']}
    io_time           ${record['system']['diskio']['io']['time'].to_f}
    name              ${record['system']['diskio']['name'].to_f}
    read_bytes        ${record['system']['diskio']['read']['bytes'].to_f}
    read_count        ${record['system']['diskio']['read']['count'].to_f}
    read_time         ${record['system']['diskio']['read']['time'].to_f}
    write_bytes       ${record['system']['diskio']['write']['bytes'].to_f}
    write_count       ${record['system']['diskio']['write']['count'].to_f}
    write_time        ${record['system']['diskio']['write']['time'].to_f}
    time              ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>

<match filesystem>
  @type record_reformer
  tag beat_filesystem
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host              ${record['beat']['hostname']}
    available         ${record['system']['filesystem']['available'].to_f}
    device_name       ${record['system']['filesystem']['device_name'].to_f}
    files             ${record['system']['filesystem']['files'].to_f}
    free              ${record['system']['filesystem']['free'].to_f}
    free_files        ${record['system']['filesystem']['free_files'].to_f}
    mount_point       ${record['system']['filesystem']['mount_point'].to_f}
    total             ${record['system']['filesystem']['total'].to_f}
    used_bytes        ${record['system']['filesystem']['used']['bytes'].to_f}
    used_pct          ${record['system']['filesystem']['used']['pct'].to_f}
    time              ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>

<match network>
  @type record_reformer
  tag beat_network
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host              ${record['beat']['hostname']}
    name              ${record['system']['network']['name'].to_f}
    in_bytes          ${record['system']['network']['in']['bytes'].to_f}
    in_dropped        ${record['system']['network']['in']['dropped'].to_f}
    in_errors         ${record['system']['network']['in']['errors'].to_f}
    in_packets        ${record['system']['network']['in']['packets'].to_f}
    out_bytes         ${record['system']['network']['out']['bytes'].to_f}
    out_dropped       ${record['system']['network']['out']['dropped'].to_f}
    out_errors        ${record['system']['network']['out']['errors'].to_f}
    out_packets       ${record['system']['network']['out']['packets'].to_f}
    time              ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>

<match beat_cpu beat_memory beat_load beat_diskio beat_filesystem beat_network>
  @type influxdb
  host influxdb001
  port 8086
  dbname metrics_db
  user kafka
  password kafka
  use_ssl false
  verify_ssl false
  tag_keys ["host"]
  time_precision s
  flush_interval 10s
</match>

有一个稍微困扰的问题是,在record_reformer中如果不重新命名标签,Fluentd会陷入无限循环。

<match cpu>
  @type record_reformer
  tag cpu
<match cpu>
  @type record_reformer
  tag beat_cpu

在InfluxDB中,会创建名为beat_XXX的Measurement。

最后

由于Metricbeat v5的发布,我突发奇想尝试了一下。不过,我是否能够找到一种更好的方法来格式化嵌套的Json呢…

广告
将在 10 秒后关闭
bannerAds