使用Fluentd + Elasticsearch + Kibana来对YAMAHA RTX1100的syslog进行类似的可视化

简述

我试着使用 fluentd + elasticsearch + kibana 将路由器的数据包过滤器中被拦截的IP日志等可视化。
使用了 fluentd 插件(fluent-plugin-geoip)来通过IP获取位置信息。

image
貼り付けた画像_2016_02_25_2_35.jpg

设定

条件。

    • ルータ(RTX1100)IP: 192.168.1.1

 

    • サーバ

OS: Ubuntu 14.04.3 LTS Trusty
IP: 192.168.1.100
kibana, fluentd, elasticsearchを動かす
LANからのアクセスのみ想定故、サーバのFWはガラ空き

安装软件包

公式的步骤可以作为参考。

    • fluentd

 

    • elasticsearch

kibana

ただし、OS起動時に自動起動させるようにするには、init.dスクリプトをどっかから拾ってくる必要がある

Kibana Beta init script
kibana4_init

Apache2反向代理设置

当启动Elasticsearch和Kibana时,它们分别在5601和9200端口上进行监听,为了方便起见,请随意进行反向代理设置。

<Location /kibana>
        Require ip 127 192.168.1
        ProxyPass http://localhost:5601
        ProxyPassReverse http://localhost:5601
</Location>

<Location /elasticsearch>
        Require ip 127 192.168.1
        ProxyPass http://localhost:9200
        ProxyPassReverse http://localhost:9200
</Location>

更改RTX1100配置

添加一个设置,可以将路由器的syslog发送到服务器中。

#
# SYSLOG configuration
#
syslog local address 192.168.1.1
syslog host 192.168.1.100
syslog info on
syslog notice on

Elasticsearch配置

据说Elasticsearch会自动进行字符串的元素解析,在使用Kibana进行可视化时,有人说“不将其设为未分析(not analyzed)会影响性能”,所以要准备一些未分析的字段。另外,将地理信息放入字段(geo_location)的类型设为geo_point。

$ curl -XPUT 192.168.1.100:9200/rtx1100-*/ -d "`cat elasticsearch_default_template.json`"

# lasticsearch_default_template.json
{
    "template": "rtx1100-*",
    "mappings": {
        "_default_": {
            "dynamic_templates": [
                {
                    "string_template" : {
                        "match" : "*",
                        "mapping": {
                            "type": "string",
                            "fields": {
                                "full": {
                                    "type": "string",
                                    "index": "not_analyzed"
                                }
                            }
                        },
                        "match_mapping_type": "string"
                    }
                }
            ],
            "properties": {
                "@timestamp": { "type": "date", "index": "not_analyzed" },
                "geo_location": {"type" : "geo_point" }
            }
        }
    }
}

选择一种方法将以下句子以中国语言中本地化的方式进行改写:

配置Fluid模块

安装 fluentd 插件。

使用下面的插件。

fluent/fluent-plugin-rewrite-tag-filter

record内のfieldを用いてtagを書き換えられたりできるフィルター

uken/fluent-plugin-elasticsearch

fluentdからelasticsearchに流しこめる

y-ken/fluent-plugin-geoip

IPから地理情報を取得できるすごいやつ

repeatedly/fluent-plugin-multi-format-parser

複数のパーサーを順番に試してくれるパーサー

通常だとパーサーは一つしか指定できない

tagomoris/fluent-plugin-parser

じゃなくての中でパースできるようにするやつ

hiraro/fluent-plugin-with-extra-fields-parser

パースした内容とは別に、指定した余計なフィールドをrecordにぶっ込めるようにするパーサー
いい感じのプラグインがないっぽいので作った

流畅d的设定文件

    • RTX1100から流れてきたsyslogを正規表現でマッチさせて分解

マッチしなかったそれ以外の形式のログは、分解しない

ログの種類ごとに(elasticsearchの)indexを分けて格納

# 動的フィルタ => index: rtx1100-inspect
- [INSPECT] PP[01][out][105] TCP xxx.xxx.xxx.xxx:xxxxx > yyy.yyy.yyy.yyy:yyyyy (yyyy/MM/dd hh:mm:ss)

# フィルタリジェクト => index: rtx1100-reject
# リジェクト系だけはfluent-plugin-geoipで発信源の地理情報を取得する
- PP[01] Rejected at IN(xxxx) filter: TCP xxx.xxx.xxx.xxx:xxxxx > yyy.yyy.yyy.yyy:yyyyy

# コンソールログイン/ログアウト => index: rtx1100-console
- Login succeeded for TELNET: xxx.xxx.xxx.xxx
- Logout from TELNET: xxx.xxx.xxx.xxx

# VPN接続/切断 => index: rtx1100-tunnel
- [L2TP] TUNNEL[1] connected from xxx.xxx.xxx.xxx
- [L2TP] TUNNEL[1] disconnect tunnel xxxxx complete

# その他 => rtx1100-other
# /etc/td-agent/td-agent.conf

####
## Source descriptions:
##

## syslog
<source>
  type syslog
  tag raw.rtx1100
  format none
</source>

####
## Output descriptions:
##

<match raw.rtx1100.**>
  type parser
  format multi_format
  key_name message
  remove_prefix raw
  add_prefix parsed
  <pattern>
      format with_extra_fields
      base_format /^\[INSPECT\]\s+(?<target>.+)\[(?<direction>.+)\]\[(?<filter_num>\d+)\]\s+(?<proto>.+)\s+(?<src_ip>.+):(?<src_port>.+)\s+>\s+(?<dest_ip>.+):(?<dest_port>.+)\s+\((?<time>.+)\)$/
      time_format '%Y/%m/%d %H:%M:%S'
      extra_fields { "log_type": "inspect" }
  </pattern>
  <pattern>
      format with_extra_fields
      base_format /^(?<target>.+)\s+Rejected\s+at\s+(?<direction>.+)\((?<filter_num>\d+)\)\s+filter:\s+(?<proto>.+)\s+(?<src_ip>.+):(?<src_port>.+)\s+>\s+(?<dest_ip>.+):(?<dest_port>.+)$/ 
      extra_fields { "log_type": "reject" }
  </pattern>
  <pattern>
      format with_extra_fields
      base_format /^Logout\s+from\s+(?<proto>.+):\s+(?<ip>.+)$/
      extra_fields { "log_type": "console_logout" }
  </pattern>
  <pattern>
      format with_extra_fields
      base_format /^Login\s+succeeded\s+for\s+(?<proto>.+):\s+(?<ip>.+)$/ 
      extra_fields { "log_type": "console_login" }
  </pattern>
  <pattern>
      format with_extra_fields
      base_format /^\[(?<proto>.+)\]\s+(?<tunnel>.+)\s+connected\s+from\s+(?<src_ip>.+)$/ 
      extra_fields { "log_type": "tunnel_connect" }
  </pattern>
  <pattern>
      format with_extra_fields
      base_format /^\[(?<proto>.+)\]\s+(?<tunnel>.+)\s+disconnect\s+tunnel\s+\d+\s+complete$/ 
      extra_fields { "log_type": "tunnel_disconnect" }
  </pattern>  
  <pattern>
      format with_extra_fields
      base_format /^(?<msg>.+)$/
      extra_fields { "log_type": "other" }
  </pattern>  
</match>

<match parsed.rtx1100.**>
  type rewrite_tag_filter
  rewriterule1 log_type   ^inspect$       rtx1100.inspect
  rewriterule2 log_type   ^reject$        temp.rtx1100.reject
  rewriterule3 log_type   ^console_(.+)$  rtx1100.console.$1
  rewriterule4 log_type   ^tunnel_(.+)$   rtx1100.tunnel.$1
  rewriterule5 log_type   ^other$         rtx1100.other
</match>

<match rtx1100.inspect.**>
  type elasticsearch
  logstash_format true
  logstash_prefix rtx1100-inspect
  include_tag_key true
  tag_key @log_name
  hosts localhost:9200
  buffer_type memory
  num_threads 1
  flush_interval 60
  retry_wait 1.0
  retry_limit 17
</match>

<match temp.rtx1100.reject.**>
  type  geoip
  geoip_lookup_key src_ip
  <record>
    geo_location  '{ "lat" : ${latitude["src_ip"]}, "lon" : ${longitude["src_ip"]} }'
    country_code  ${country_code["src_ip"]}
  </record>
  remove_tag_prefix temp.
  skip_adding_null_record  true
  flush_interval 1s
</match>
<match rtx1100.reject.**>
  type elasticsearch
  logstash_format true
  logstash_prefix rtx1100-reject
  include_tag_key true
  tag_key @log_name
  hosts localhost:9200
  buffer_type memory
  num_threads 1
  flush_interval 60
  retry_wait 1.0
  retry_limit 17
</match>

<match rtx1100.console.**>
  type elasticsearch
  logstash_format true
  logstash_prefix rtx1100-console
  include_tag_key true
  tag_key @log_name
  hosts localhost:9200
  buffer_type memory
  num_threads 1
  flush_interval 60
  retry_wait 1.0
  retry_limit 17
</match>

<match rtx1100.tunnel.**>
  type elasticsearch
  logstash_format true
  logstash_prefix rtx1100-tunnel
  include_tag_key true
  tag_key @log_name
  hosts localhost:9200
  buffer_type memory
  num_threads 1
  flush_interval 60
  retry_wait 1.0
  retry_limit 17
</match>

<match rtx1100.other.**>
  type elasticsearch
  logstash_format true
  logstash_prefix rtx1100-other
  include_tag_key true
  tag_key @log_name
  hosts localhost:9200
  buffer_type memory
  num_threads 1
  flush_interval 60
  retry_wait 1.0
  retry_limit 17
</match>

参考资料、引文、参考文献

    • 今日から始めるfluentd × Elasticsearch × kibana – カジュアルな解析・高速化 – 株式会社エウレカ

 

    • Fluentd、ElasticSearch、Kibana4によるログ分析環境の構築 | hrendoh’s memo

 

    • Kibana+Elasticsearchで文字列の完全一致と部分一致検索の両方を実現する – Qiita

 

    プロダクション環境でElasticsearch+kibana(fluentd)でログ可視化運用をしてみてわかった事 – shnagaiのインフラ備忘録
广告
将在 10 秒后关闭
bannerAds