【彻底解释】Airflow Fluentd Elasticsearch Docker的协作方式
简要概述
如果您使用Airflow的远程日志记录功能,可以将任务日志发送到S3或GCS等对象存储中,并且还可以使用能够进行全文搜索的Elasticsearch。但是,要将日志转发到Elasticsearch,还需要使用Fluentd或Logstash等日志收集工具,并且需要花费时间来理解配置和进行操作确认。
在这篇文章中,我们将以官方的 docker-compose.yaml 为基础,搭建一个使用 Airflow、Fluentd、Elasticsearch 的开发环境,并解释如何连接每个组件以及可能遇到的陷阱。
首先,让我们掌握整体的流程。
概括来说,以上的图表表明:
-
- Airflow 将任务日志写入标准输出
-
- Docker 的日志驱动将日志传输给 Fluentd
-
- Fluentd 将日志传输给 Elasticsearch
- Airflow 从 Elasticsearch 获取日志
迷人之处在于,Airflow的日志如何传输到Elasticsearch。虽然可以直接从Elasticsearch获取日志,但无法直接将日志传送到Elasticsearch。要将任务日志传输到Elasticsearch,需要在其间插入日志收集工具,如日志驱动程序和Fluentd。
接下来,让我们来看一下各个组件的设置方法。
弹性搜索
首先,需要准备一个用于测试的 Elasticsearch 环境。将以下内容添加到 docker-compose.yaml 文件中,将会启动一个单节点的 Elasticsearch 集群。如果想要了解更多有关配置项的详细信息,可以参考官方文档。需要注意的是,由于是用于测试的环境,所以安全性配置被放宽了。
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.0.1
volumes:
- elasticsearch-volume:/usr/share/elasticsearch/data
environment:
- "discovery.type=single-node"
- "xpack.security.enabled=false" # 注意:本番向けではない
healthcheck:
test:
[
"CMD",
"curl",
"--fail",
"localhost:9200/_cluster/health?wait_for_status=yellow&timeout=10s"
]
interval: 10s
restart: always
volumes:
elasticsearch-volume:
Airflow 和 Elasticsearch 的协同配合
为了与Elasticsearch集成,需要将Airflow任务日志的输出目标更改为stdout,输入目标更改为Elasticsearch。由于Elasticsearch的提供者套件中有ElasticsearchTaskHandler,因此可以使用以下四个配置项来替换默认的任务处理程序。
stdout
にするelasticsearch.hostelasticsearch:9200読み込み先のホスト名を指定するelasticsearch.json_formattrue書き込み/読み込み形式を JSON にする可以在docker-compose.yaml中按照以下方式进行配置。
---
x-airflow-common:
&airflow-common
image: apache/airflow:2.2.4
environment:
&airflow-common-env
AIRFLOW__LOGGING__REMOTE_LOGGING: "true"
AIRFLOW__ELASTICSEARCH__WRITE_STDOUT: "true"
AIRFLOW__ELASTICSEARCH__HOST: "elasticsearch:9200"
AIRFLOW__ELASTICSEARCH__JSON_FORMAT: "true"
Airflow 和 Fluentd 的集成
在将Airflow任务日志的输出定向到stdout之后,需要通过日志驱动程序将其传输到接下来创建的Fluentd服务中。由于任务日志是由Airflow的worker输出的,所以需要更改docker-compose.yaml文件中airflow-worker服务的logging配置。
services:
airflow-worker:
# 省略
logging:
driver: "fluentd"
options:
fluentd-address: localhost:24224
tag: airflow.worker
depends_on:
# 省略
fluentd:
condition: service_healthy
我已经将驱动程序设置为Fluentd,并指定了Fluentd的主机名。虽然看起来很直观,但你是否注意到在fluentd-address中指定了localhost的地方?原因是,日志驱动程序通过服务上下文之外向Fluentd发送数据。换句话说,fluentd-address指定的值是从Docker主机看到的Fluentd服务的地址。
为了从Fluentd端识别任务日志,将标签设置为airflow.worker。
Fluentd 和 Elasticsearch 的协作
为了将日志从Fluentd传输到Elasticsearch,需要一个用于日志收集/传输的Fluentd服务。在本节中,将按顺序说明如何创建Fluentd容器映像、编写配置文件以及docker-compose.yaml的服务定义方法。
创建 Fluentd 的容器镜像
继承公式容器映像,安装elasticsearch和prometheus插件。本次安装中不打算使用prometheus进行指标收集,而是希望将其用作健康检查端点。
FROM fluent/fluentd:v1.14.5-debian-1.0
USER root
RUN apt-get -y update \
&& apt-get -y install gcc make curl \
&& gem install fluent-plugin-elasticsearch:5.2.0 fluent-plugin-prometheus:2.0.2 --no-document
USER fluent
编写Fluentd的配置文件
在fluent.conf文件中设置以解析来自日志驱动程序的任务日志,并将其传输到elasticsearch。
├── docker-compose.yaml
└── fluentd
├── Dockerfile
└── conf
└── fluent.conf
内容如下。
# (1)
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
# (2)
<filter airflow.worker>
@type parser
key_name log
reserve_data true
remove_key_name_field true
<parse>
@type json
</parse>
</filter>
# (3)
<match airflow.worker>
@type elasticsearch
host elasticsearch
port 9200
logstash_format true
logstash_prefix fluentd.${tag}
logstash_dateformat %Y%m%d
include_tag_key true
tag_key @log_name
flush_interval 1s
</match>
# (4)
<source>
@type prometheus
bind 0.0.0.0
port 24231
metrics_path /metrics
</source>
补充上述设置的内容是:
-
- 使用指令和转发输入插件,提供从日志驱动程序接收数据的TCP端点。
使用过滤器指令,将airflow.worker的日志解析为JSON格式。
使用匹配指令和elasticsearch输出插件,将airflow.worker的日志转发到elasticsearch。
使用prometheus输入插件,提供供Prometheus服务器抓取的度量HTTP端点。
定义Fluentd的服务
最后,我们会在 docker-compose.yaml 文件中定义一个使用 Fluentd 容器镜像和配置文件的服务。
services:
fluentd:
build: ./fluentd
volumes:
- ./fluentd/conf:/fluentd/etc
ports:
- 24224:24224
restart: always
healthcheck:
test: [ "CMD", "curl", "--fail", "localhost:24231/metrics" ]
interval: 10s
start_period: 30s
建立环境
使用docker compose up命令启动环境后,等待约2分钟然后打开http://localhost:8080,将会显示Airflow的登录页面。
結束
在这篇文章中,我们解释了如何将Airflow,Fluentd和Elasticsearch进行集成,并介绍了每个组件的设置步骤。虽然说明是基于docker compose的开发环境,但这些知识也可以在AWS Elastic Container Service(ECS)或Kubernetes等容器执行平台上发挥作用,希望您能充分利用。