【彻底解释】Airflow Fluentd Elasticsearch Docker的协作方式

简要概述

如果您使用Airflow的远程日志记录功能,可以将任务日志发送到S3或GCS等对象存储中,并且还可以使用能够进行全文搜索的Elasticsearch。但是,要将日志转发到Elasticsearch,还需要使用Fluentd或Logstash等日志收集工具,并且需要花费时间来理解配置和进行操作确认。

在这篇文章中,我们将以官方的 docker-compose.yaml 为基础,搭建一个使用 Airflow、Fluentd、Elasticsearch 的开发环境,并解释如何连接每个组件以及可能遇到的陷阱。

首先,让我们掌握整体的流程。

airflow_elasticsearch.png

概括来说,以上的图表表明:

    1. Airflow 将任务日志写入标准输出

 

    1. Docker 的日志驱动将日志传输给 Fluentd

 

    1. Fluentd 将日志传输给 Elasticsearch

 

    Airflow 从 Elasticsearch 获取日志

迷人之处在于,Airflow的日志如何传输到Elasticsearch。虽然可以直接从Elasticsearch获取日志,但无法直接将日志传送到Elasticsearch。要将任务日志传输到Elasticsearch,需要在其间插入日志收集工具,如日志驱动程序和Fluentd。

接下来,让我们来看一下各个组件的设置方法。

コンポーネントバージョンosmacOS 12.2.1airflow2.2.4elasticsearch8.0.1fluentd1.14.5docker20.10.12docker compose2.2.3

弹性搜索

首先,需要准备一个用于测试的 Elasticsearch 环境。将以下内容添加到 docker-compose.yaml 文件中,将会启动一个单节点的 Elasticsearch 集群。如果想要了解更多有关配置项的详细信息,可以参考官方文档。需要注意的是,由于是用于测试的环境,所以安全性配置被放宽了。

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.0.1
    volumes:
      - elasticsearch-volume:/usr/share/elasticsearch/data
    environment:
      - "discovery.type=single-node"
      - "xpack.security.enabled=false" # 注意:本番向けではない
    healthcheck:
      test:
        [
          "CMD",
          "curl",
          "--fail",
          "localhost:9200/_cluster/health?wait_for_status=yellow&timeout=10s"
        ]
      interval: 10s
    restart: always
volumes:
  elasticsearch-volume:

Airflow 和 Elasticsearch 的协同配合

为了与Elasticsearch集成,需要将Airflow任务日志的输出目标更改为stdout,输入目标更改为Elasticsearch。由于Elasticsearch的提供者套件中有ElasticsearchTaskHandler,因此可以使用以下四个配置项来替换默认的任务处理程序。

項目設定説明logging.remote_loggingtrueリモートロギング機能を有効にするelasticsearch.write_stdouttrueタスクログの出力先を stdout にするelasticsearch.hostelasticsearch:9200読み込み先のホスト名を指定するelasticsearch.json_formattrue書き込み/読み込み形式を JSON にする
如果elasticsearch.json_format保持为false,可能会出现协作无法顺利进行的情况。

可以在docker-compose.yaml中按照以下方式进行配置。

---
x-airflow-common:
  &airflow-common
  image: apache/airflow:2.2.4
  environment:
    &airflow-common-env
    AIRFLOW__LOGGING__REMOTE_LOGGING: "true"
    AIRFLOW__ELASTICSEARCH__WRITE_STDOUT: "true"
    AIRFLOW__ELASTICSEARCH__HOST: "elasticsearch:9200"
    AIRFLOW__ELASTICSEARCH__JSON_FORMAT: "true"

Airflow 和 Fluentd 的集成

在将Airflow任务日志的输出定向到stdout之后,需要通过日志驱动程序将其传输到接下来创建的Fluentd服务中。由于任务日志是由Airflow的worker输出的,所以需要更改docker-compose.yaml文件中airflow-worker服务的logging配置。

services:
  airflow-worker:
    # 省略
    logging:
      driver: "fluentd"
      options:
        fluentd-address: localhost:24224
        tag: airflow.worker
    depends_on:
      # 省略
      fluentd:
        condition: service_healthy

我已经将驱动程序设置为Fluentd,并指定了Fluentd的主机名。虽然看起来很直观,但你是否注意到在fluentd-address中指定了localhost的地方?原因是,日志驱动程序通过服务上下文之外向Fluentd发送数据。换句话说,fluentd-address指定的值是从Docker主机看到的Fluentd服务的地址。

为了从Fluentd端识别任务日志,将标签设置为airflow.worker。

Fluentd 和 Elasticsearch 的协作

为了将日志从Fluentd传输到Elasticsearch,需要一个用于日志收集/传输的Fluentd服务。在本节中,将按顺序说明如何创建Fluentd容器映像、编写配置文件以及docker-compose.yaml的服务定义方法。

创建 Fluentd 的容器镜像

继承公式容器映像,安装elasticsearch和prometheus插件。本次安装中不打算使用prometheus进行指标收集,而是希望将其用作健康检查端点。

FROM fluent/fluentd:v1.14.5-debian-1.0
USER root
RUN apt-get -y update \
    && apt-get -y install gcc make curl \
    && gem install fluent-plugin-elasticsearch:5.2.0 fluent-plugin-prometheus:2.0.2 --no-document
USER fluent

编写Fluentd的配置文件

在fluent.conf文件中设置以解析来自日志驱动程序的任务日志,并将其传输到elasticsearch。

需要将文件名设为 fluent.conf。
├── docker-compose.yaml
└── fluentd
    ├── Dockerfile
    └── conf
        └── fluent.conf

内容如下。

# (1)
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

# (2)
<filter airflow.worker>
  @type parser
  key_name log
  reserve_data true
  remove_key_name_field true
  <parse>
    @type json
  </parse>
</filter>

# (3)
<match airflow.worker>
  @type elasticsearch
  host elasticsearch
  port 9200
  logstash_format true
  logstash_prefix fluentd.${tag}
  logstash_dateformat %Y%m%d
  include_tag_key true
  tag_key @log_name
  flush_interval 1s
</match>

# (4)
<source>
  @type prometheus
  bind 0.0.0.0
  port 24231
  metrics_path /metrics
</source>

补充上述设置的内容是:

    1. 使用指令和转发输入插件,提供从日志驱动程序接收数据的TCP端点。

使用过滤器指令,将airflow.worker的日志解析为JSON格式。

使用匹配指令和elasticsearch输出插件,将airflow.worker的日志转发到elasticsearch。

使用prometheus输入插件,提供供Prometheus服务器抓取的度量HTTP端点。

定义Fluentd的服务

最后,我们会在 docker-compose.yaml 文件中定义一个使用 Fluentd 容器镜像和配置文件的服务。

services:
  fluentd:
    build: ./fluentd
    volumes:
      - ./fluentd/conf:/fluentd/etc
    ports:
      - 24224:24224
    restart: always
    healthcheck:
      test: [ "CMD", "curl", "--fail", "localhost:24231/metrics" ]
      interval: 10s
      start_period: 30s

建立环境

使用docker compose up命令启动环境后,等待约2分钟然后打开http://localhost:8080,将会显示Airflow的登录页面。

如果使用默认的docker-compose.yaml配置,容器有可能无法通过健康检查。您可以将scheduler | triggerer | webserver | worker | flower服务的健康检查间隔和超时分别设置为30秒来避免此问题。
airflow_task_log.png

結束

在这篇文章中,我们解释了如何将Airflow,Fluentd和Elasticsearch进行集成,并介绍了每个组件的设置步骤。虽然说明是基于docker compose的开发环境,但这些知识也可以在AWS Elastic Container Service(ECS)或Kubernetes等容器执行平台上发挥作用,希望您能充分利用。

广告
将在 10 秒后关闭
bannerAds