在使用docker-compose实现Airflow时需要注意的事项

因为个人学习的需要,我实施了Airflow,这里记录了我当时的体会。希望这能帮到遇到同样问题的人们。

前提 (Qiantí)

    • LocalExecutorを使う

 

    • MySQLコンテナーとAirflowコンテナー

 

    AirflowからRedshiftにアクセス

Docker镜像 (Docker Image)

使用Python 3.7.7作为基础,在其中安装Airflow版本1.10.10。

    • Dockerfileを自分で書いてみるため

ついでにentrypoint.shも

学習目的なので、puckel/docker-airflowは遠慮したい

めちゃくちゃ参考にはする

公式Dockerイメージのmasterやlatestはバージョンが2.0で開発版2

空气流动增强版

在扩展Airflow的插件中,包括了从MySQL等数据库系到GCP的访问等功能。
根据官方的Dockerfile,如下所示,

ARG AIRFLOW_EXTRAS="async,aws,azure,celery,dask,elasticsearch,gcp,kubernetes,mysql,postgres,redis,slack,ssh,statsd,virtualenv"

由于生成 FERNET_KEY,因此 crypto在技术上是必需的。

由于后端数据库使用MySQL和Redshift进行连接,所以还需要相关的psycopg2。

入口点.sh

在这里,像文档和puckel一样,我们生成FERNET_KEY,并通过加密来保护连接,这样比在airflow.cfg中直接写入更安全。

“${AIRFLOW__CORE__FERNET_KEY:=${FERNET_KEY:=$(python -c “from cryptography.fernet import Fernet; FERNET_KEY = Fernet.generate_key().decode(); print(FERNET_KEY)”)}}”

“${AIRFLOW__CORE__FERNET_KEY:=${FERNET_KEY:=$(python -c “import cryptography.fernet as f; fernet_key = f.Fernet.generate_key().decode(); print(fernet_key)”)}}”

“${AIRFLOW__CORE__FERNET_KEY:=${FERNET_KEY:=$(python -c “import cryptography.fernet as f; key = f.Fernet.generate_key().decode(); print(key)”)}}”

使用nc (netcat) 命令来确认数据库是否已启动。

在docker-compose.yml中指定depends_on: mysql,仅是等待容器启动,不能确保数据库已经启动。puckel在entrypoint.sh脚本中使用nc命令来检查与数据库的连接是否建立,这对我来说非常有参考价值。

wait_for_port() {
  local name="$1" host="$2" port="$3"
  local j=0
  while ! nc -z "$host" "$port" >/dev/null 2>&1 < /dev/null; do
    j=$((j+1))
    if [ $j -ge $TRY_LOOP ]; then
      echo >&2 "$(date) - $host:$port still not reachable, giving up"
      exit 1
    fi
    echo "$(date) - waiting for $name... $j/$TRY_LOOP"
    sleep 5
  done
}

数据库的配置

MySQL的环境配置

设置以下环境变量(参考)。这些是数据库端的设置,但将使用相同的用户名和密码从Airflow访问。
– MYSQL_ROOT_PASSWORD
– MYSQL_DATABASE
– MYSQL_USER
– MYSQL_PASSWORD

我的.cnf文件

如Airflow数据库后端的说明所述,如果使用MySQL,则需要设置explicit_defaults_for_timestamp=1。
此外,还需要添加处理多字节字符的设置。

[mysqld]
character-set-server=utf8mb4
explicit_defaults_for_timestamp=1

[client]
default-character-set=utf8mb4

访问数据库

AIRFLOW__CORE__SQL_ALCHEMY_CONN的中文翻译可以是:空气流动__核心__SQL_ALCHEMY_CONN

默认情况下使用sqlite,但可以根据数据库和驱动程序进行更改。使用SqlAlchemy文档的写法作为参考。

    • MySQL + mysqlclient

mysql+mysqldb://user:password@host:port/db

PostgreSQL + psycopg2

postgresql+psycopg2://user:password@host:port/db

主机是在docker-compose.yml文件中指定的container_name,端口通常为3306(MySQL)或5432(PostgreSQL)。用户名和数据库名称是上面设置的内容。

使用环境变量

在哪里编写Airflow的各种配置设置?

正如文档中所述,可以在多个地方进行设置,并且环境变量具有优先权。例如,AWS的认证可以使用环境变量。对于需要静态的访问数据库等内容,可能适合在airflow.cfg中进行设置。对于生产环境,应该与团队成员制定规则。

    1. 设为环境变量

 

    1. 设为命令行环境变量

 

    1. 在airflow.cfg中设置

 

    1. 在airflow.cfg中的命令

 

    Airflow的内置默认值

越靠上的事物越優先考慮。

在哪里定义环境变量。

这也有很多种选择。这边应该更倾向于下面的那个。

    1. Dockerfile:对于变化较少的情况,可以使用默认设置。

entrypoint.sh:同上。

docker-compose.yml:可以根据其他容器进行更改,更加灵活。

.env文件:在docker-compose.yml中指定为env_file时,将在容器启动时被读取。不希望在Git中保留的认证信息可以写在这里。

DB的配置和SqlAlchemy的配置在docker-compose.yml文件中进行。

考虑到使用相同的设置,同样的描述位置可以使管理和维护更加容易。

version: "3.7"
services:
    mysql:
        image: mysql:5.7
        container_name: mysql
        environment:
            - MYSQL_ROOT_PASSWORD=password
            - MYSQL_USER=airflow
            - MYSQL_PASSWORD=airflow
            - MYSQL_DATABASE=airflow
        volumes:
            - ./mysql.cnf:/etc/mysql/conf.d/mysql.cnf:ro
        ports:
            - "3306:3306"
    airflow:
        build: .
        container_name: airflow
        depends_on:
            - mysql
        environment:
            - AIRFLOW_HOME=/opt/airflow
            - AIRFLOW__CORE__LOAD_EXAMPLES=False
            - AIRFLOW__CORE__EXECUTOR=LocalExecutor
            - AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+mysqldb://airflow:airflow@mysql:3306/airflow
            - MYSQL_PORT=3306
            - MYSQL_HOST=mysql
# 略

在中国,AWS和Redshift的连接通过.env文件来实现。

AWS的访问密钥和秘密密钥具有很高的机密性,因此不想将它们写入docker-compose.yml或entrypoint.sh。可能可以考虑将其写入airflow.cfg,但实际上可能需要与开发团队商讨。

暂且不管,通过GUI点点输入并不现代化。

在写作时,参考文献并按照以下的方式写作。

Conn IdConn TypeLoginPasswordHostPortSchemaEnvironment Variableredshift_conn_idpostgresawsuserpasswordyour-cluster-host5439devAIRFLOW_CONN_REDSHIFT_CONN_ID=postgres://awsuser:password@your-cluster-host:5439/devaws_conn_idawsyour-access-keyyour-secret-key

AIRFLOW_CONN_AWS_CONN_ID=aws://your-access-key:your-secret-key@

即使ID是小写字母,但在环境变量名称中会变成大写字母。

在AWS密钥中,即使没有主机也需要在末尾添加@。如果没有@会导致错误。另外,如果密钥中包含冒号或斜杠,则无法正确解析,建议重新生成密钥。

如果你想知道以GUI输入的连接URI的格式,只需要按照以下方式进行输出即可。

from airflow.hooks.base_hook import BaseHook

conn = BaseHook.get_connection('postgres_conn_id')
print(f"AIRFLOW_CONN_{conn.conn_id.upper()}='{conn.get_uri()}'")

通过环境变量设置Airflow的Key-Value。

与连接一样,还可以设置键值对。设置方法如下:

    1. 在图形用户界面(GUI)中进行设置。

 

    1. 在.py代码中进行设置。

 

    通过环境变量进行设置。

如果是代码的情况下,

from airflow.models import Variable
Variable.set(key="foo", value="bar")

如果是环境变量的情况

KeyValueEnvironment VariablefoobarAIRFLOW_VAR_FOO=bar

无论键是否为小写,环境变量的名称都会变为大写。

结束

我先给你介绍一下代码库。

我目前是Udacity的数据工程师。我接触过Cassandra,Redshift,Spark和Airflow。虽然被告知需要花费5个月的时间,但我只用了3个月就完成了,所以可能更好选择按月付费。此外,它会定期打折50%,建议您在那时注册。否则就太贵了。

在我撰写文章的过程中,我尝试了apache/airflow:1.10.10。使用docker run -it –name test -p 8080 -d apache/airflow:1.10.10 “”命令,可以启动一个打开bash的容器,因此可以灵活地操作,如docker exec test airflow initdb等。

广告
将在 10 秒后关闭
bannerAds