使用Filebeat和Logstash从多个文件中读取数据
首先
要做的事情
使用filebeat×logstash将具有不同领域的两种类型的文件进行读取。启动logstash和filebeat将使用docker-compose。
此外,针对每个文件,我们将指定索引,并验证其在kibana中的显示。
不做的事情
我们不使用常用的”logstash 多 pipeline”功能,用于接收多个数据输入。
由于本次只有两种类型的文件输入,所以我们不使用多个 pipeline,而是使用标签(tag)对文件进行分类的方式来执行。
环境 – .
工具版本
-
- logstash:8.5.2
-
- filebeat:8.5.2
-
- elasticsearch:8.5.2
-
- kibana:8.5.2
-
- docker:20.10.22
- docker-compose:1.29.2
目录结构
.
├ docker-compose.yml
├ filebeat
│ ├ conf
│ ├ └ filebeat.yml
│ └ log
│ ├ データファイル1.csv
│ └ データファイル2.csv
├ logstash
│ └ pipeline
│ └ logstash.conf
└ elasticsearch
└ data
使用数据
準備两种具有不同领域的CSV文件。
这次我们假设有一个带有登录功能的网页,并使用以下字段的超简易日志文件。
-
- log_login.csv
Date:ログイン日時
userID:ユーザID
state:ログイン状態
log_viewpage.csv
Date:メッセージ送信日時
userID:ユーザID
page:閲覧している画面
每个都有不同的字段,分别为”state”和”page”。
操作步骤
以下是进行文件创建的步骤:
1. 创建filebeat.yml。
2. 创建logstash.conf。
3. 创建docker-compose.yml。
4. 在kibana中进行确认。
创建filebeat.yml文件
对于由Filebeat收集的文件,我们会为每个文件添加标签,并将它们发送到Logstash。
filebeat.inputs:
- type: log
enabled: true
tags: ["LOGIN"]
paths:
- /usr/share/filebeat/log/log_login*.csv
- type: log
enabled: true
tags: ["VIEWPAGE"]
paths:
- /usr/share/filebeat/log/log_viewpage*.csv
output.logstash:
hosts: ["logstash:5044"]
创建logstash.conf文件。
Filebeat会对带有标签的各种文件进行处理,并将其发送到Elasticsearch。
input {
# input from Filebeat
beats {
port => 5044
}
}
filter {
# ファイルごとに読み込むfieldを指定
if "LOGIN" in [tags]{
csv {
columns => ["Date","userID","state"]
skip_header => true # ヘッダーの行読み込みを飛ばす
}
}
if "VIEWPAGE" in [tags] {
csv {
columns => ["Date","userID","page"]
skip_header => true # ヘッダーの行読み込みを飛ばす
}
}
date {
match => ["Date", "yyyy-MM-dd HH:mm:ss:SS"] # Dateを@timestampに置換
timezone =>"Asia/Tokyo"
}
mutate{
remove_field => ["input", "@version", "host", "ecs", "agent","tags", "event", "log","source","highlight","Date"]
}
}
output {
# ファイルごとに異なるindexを指定
if "LOGIN" in [tags]{
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "samplelogin_index"
}
}
if "VIEWPAGE" in [tags]{
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "sampleviewpage_index"
}
}
}
创建一个docker-compose.yml文件。
我們將建立Filebeat、Logstash、Elasticsearch、Kibana的配置。
version: "3"
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.5.2
container_name: elasticsearch
environment:
- discovery.type=single-node
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- ES_JAVA_OPTS=-Xms2g -Xmx2g
- xpack.security.enabled=false
ulimits:
memlock:
soft: -1
hard: -1
ports:
- 9200:9200
volumes:
- ./elasticsearch/data:/usr/share/elasticsearch/data
- /var/run/docker.sock:/var/run/docker.sock
restart: always
logstash:
image: docker.elastic.co/logstash/logstash:8.5.2
container_name: logstash
ports:
- 5044:5044
environment:
- LS_JAVA_OPTS=-Xms1g -Xmx1g
volumes:
- ./logstash/pipeline:/usr/share/logstash/pipeline
restart: always
filebeat:
image: docker.elastic.co/beats/filebeat:8.5.2
container_name: filebeat
entrypoint: "filebeat -e -strict.perms=false"
volumes:
- ./filebeat/conf/filebeat.yml:/usr/share/filebeat/filebeat.yml
- ./filebeat/log:/usr/share/filebeat/log
- /var/run/docker.sock:/var/run/docker.sock
restart: always
kibana:
image: docker.elastic.co/kibana/kibana:8.5.2
container_name: kibana
environment:
- ELASTICSEARCH_HOST=https://elasticsearch:9200
- "I18N_LOCALE=ja-JP"
ports:
- 5601:5601
restart: always
用 Kibana 进行查看
最后
这次我们介绍了一种相对简单的方法,可以从多个文件中读取数据。
这次我们尝试了两种csv文件,但我认为无论是csv文件还是json文件等不同文件格式,都可以以相同的方式输入。
此外,虽然这次的文件数量不多还可以,但如果要导入更多种类的文件和数据,我认为使用multiple pipeline会更好。
请参考
-
- 絶対的に使った方がいいLogstashのMultiple Pipelinesについて書いてみた
-
- Elasticsearch + Kibana ローカル環境構築方法メモ
- Elastic Stack を Docker で構築し IIS ログを分析する