【Kubernetes】使用Kubernetes构建最强ETL基础设施【Airflow】

首先

去年年底我试着用了一下Airflow,觉得非常方便。
最近经常听到Airflow这个词,我尝试入门它了!在EC2上尝试运行了一下【持续集成/持续交付】。

于是作为下一步,我尝试与Kubernetes进行集成。我搜索了一下,似乎有一个被称为“Airflow on Kubernetes”的项目。据我所查,日语的文章并不多,我希望这篇文章对大家有帮助。

以下是对上述内容的中文表达:

=====2020年1月7日 更新=====
以下是相关文章的链接:
【在Kubernetes上使用Airflow】关于构建Docker镜像和部署Pod的机制
【在Kubernetes上使用Airflow】使用KubernetesPodOperator的方法

Airflow在Kubernetes上是什么?

Airflow是一种能够进行任务调度和GUI管理执行的工具,可以通过构建集群来处理构建的DAG。而在Kubernetes上部署Airflow的Pod,可以通过使用Kubernetes集群来实现Airflow所管理的处理。

可以利用现有的Kubernetes环境构建ETL基础设施,而无需专门组建集群,这样可以保证空气流通。

建立环境

我們可以快速地通過兩個步驟來建立環境,但是由於有一些陷阱,我們也會整理它們。
步驟一:建立 Kubernetes 集群
步驟二:在 Kubernetes 上部署 Airflow

步骤1:建立Kubernetes集群

我认为有很多种方法可以做到这一点,所以您可以选择您喜欢的方法进行操作。
我在以下的文章中使用kubeadm在AWS上的CentOS 7上安装了Kubernetes 1.11。

请留意

我认为,如果一开始尝试使用t2.micro进行构建,则由于内存错误,pod可能会被驱逐并无法正常运行。因此,至少使用t2.small来构建主节点/工作节点应该是个好主意。

第二步:在Kubernetes中部署Airflow。

首先,需要在主机上安装Git。

sudo yum install git

使用Git命令克隆官方的Airflow仓库。

git clone https://github.com/apache/incubator-airflow.git

接下来,切换到目录并执行以下脚本。

cd incubator-airflow/scripts/ci/kubernetes
./docker/build.sh

运行这个脚本会生成Airflow的镜像。当然接下来需要部署,这里需要注意的是:

注意要点

不仅主节点,所有工作节点都需要airflow的映像。如果没有这个映像,将无法进行部署。

当所有节点都成功创建了Airflow的镜像后,我们将在主节点及以下节点执行命令。

./kube/deploy.sh -d persistent_mode

最终,Airflow的Pod部署状态将通过echo持续输出。如果所有节点上都已创建了Airflow镜像,那么我认为Airflow和PostgreSQL的Pod应该已经成功部署了。

如果出现问题,可以使用Ctl+C停止输出,然后使用kubectl describe pod/[aiflow的pod名]查看无法部署的原因。

现在环境建设已经完成,接下来我们将开始实际运行。

实践!Kubernetes 上的 Airflow。

只要到了这一步,就简单了。只需在Airflow的管理页面上运行所创建的DAG即可。

空气流管理界面

通过将部署的Airflow Pod进行端口转发,可以访问管理界面。

kubectl port-forward [Airflowのポッド名] 8080:8080 --address="0.0.0.0"

由于Airflow的管理应用程序在8080端口运行,因此需要将8080端口进行端口转发。为了接受来自外部的访问,还需要使用选项—address=”0.0.0.0″。

如果能够通过访问 http://[kubernetes的master的IP]:8080/ 并打开以下界面,那就代表成功了。

スクリーンショット 2019-02-12 22.28.36.png

主要是在这里进行DAG的管理,但是这里列出的示例脚本在哪儿呢?

答案就在Airflow的容器中。

DAG脚本的管理

进入Airflow Pod的命令如下所示。

kubectl exec -it [Airflowポッド名] /bin/bash

DAG脚本的管理位于以下目录路径中。

root@[Airflowポッド名]:~#cd root/airflow/dags/
root@[Airflowポッド名]:~/airflow/das/# ls
__init__.py                               example_docker_operator.py              example_passing_params_via_test_command.py   example_xcom.py
__init__.pyc                              example_docker_operator.pyc             example_passing_params_via_test_command.pyc  example_xcom.pyc
dev_etl.py                                example_http_operator.py                example_python_operator.py                   libs
dev_etl.pyc                               example_http_operator.pyc               example_python_

请问上述文件名是否与管理界面的DAG名称相符?如果是的话,将生成的DAG脚本放入此目录即可在管理界面中执行。(dev_etl.py是我自己编写的DAG。)

在Airflow的pod中不能使用vim,因此建议使用apt-get install vim命令安装vim,或者使用kubectl cp [脚本名] [Airflow pod名称]:/root/airflow/dags/命令将脚本发送到pod中。

DAG执行

现在只需执行了。

只需将左侧的开关打开即可执行。打开后,DAG将根据Schedule栏中的时间进行执行。如果想立即执行,在Links栏中按下最左侧的按钮,无论Schedule如何,都会立即执行。

试试运行一个名为example_bash_operator的DAG。

スクリーンショット 2019-02-12 22.49.27.png

我认为,在DAG开始执行之后,通过kubectl get pod命令来检查pod,结果应如下所示。

スクリーンショット 2019-02-12 22.50.04.png

我可以看到已部署了一个名为DAG任务ID-~的Pod名称。

顺便提一下,这个DAG脚本是用以下的Python代码编写的。

from builtins import range
from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

dag = DAG(
    dag_id='example_bash_operator',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
)

run_this_last = DummyOperator(
    task_id='run_this_last',
    dag=dag,
)

# [START howto_operator_bash]
run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    dag=dag,
)
# [END howto_operator_bash]

run_this >> run_this_last

for i in range(3):
    task = BashOperator(
        task_id='runme_' + str(i),
        bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
        dag=dag,
    )
    task >> run_this

# [START howto_operator_bash_template]
also_run_this = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag,
)
# [END howto_operator_bash_template]
also_run_this >> run_this_last

if __name__ == "__main__":
    dag.cli()

无需特殊设置,Airflow管理并执行的DAG将自动部署在Kubernetes上,正如从此脚本所了解的。

然而,上述部署的容器镜像是什么呢?

使用Airflow生成DAG时指定图像

我个人的调查结果是“airflow:latest”。
嗯!这个是我在调查中发现的,在部署中需要使用的镜像就是Airflow on Kubernetes的默认镜像。

以下是指定了 airflow:latest 的DAG 脚本。

from __future__ import print_function
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from libs.helper import print_stuff
from airflow.models import DAG
import os

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='dev_etl', default_args=args,
    schedule_interval=None
)


def test_volume_mount():
    print()    


# You can use annotations on your kubernetes pods!
start_task = PythonOperator(
    task_id="start_task", python_callable=print_stuff, dag=dag,
    executor_config={
        "KubernetesExecutor": {
            "annotations": {"test": "annotation"}
        }
    }
)

# You can mount volume or secret to the worker pod
second_task = BashOperator(
    task_id="four_task", bash_command='python /tmp/script/airflow_on_kubernetes.py', dag=dag,
    executor_config={"KubernetesExecutor": {"image": "airflow:latest"}}
)

start_task.set_downstream(second_task)

执行结果与之前的结果没有变化。接下来将这个image更换成Docker Hub上的另一个image并尝试执行,可能会出现错误。(我自己也多次遇到错误。)

Airflow的一个遗憾之处是当pod未部署时无法输出日志(Airflow执行的DAG点击→TreeView红色方块点击→查看日志)。因此,要找出为什么指定不同的镜像会失败花费了很长时间。

偶然在部署pod失败时尝试使用describe输出日志,结果显示出了“airflow命令找不到”的错误,我得出了这个结论。

所以,如果以后要指定图像,可能要么以这个Airflow图像为基础,要么创建一个能使用Airflow命令的图像。

当我们能够指定映像后,下一步我们想要尝试将其挂载到主机上。

使用Airflow的话,可以生成已在主机上进行挂载设置的DAG。

以下是一个简单的ETL的DAG示例,将数据从数据库服务器中获取的值写入到最后挂载的目录中保存的文件中。

from __future__ import print_function
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from libs.helper import print_stuff
from airflow.models import DAG
import os

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='dev_etl', default_args=args,
    schedule_interval=None
)

# You can use annotations on your kubernetes pods!
start_task = PythonOperator(
    task_id="start_task", python_callable=print_stuff, dag=dag,
    executor_config={
        "KubernetesExecutor": {
            "annotations": {"test": "annotation"}
        }
    }
)

# You can mount volume or secret to the worker pod
second_task = BashOperator(
    task_id="run_python", bash_command='python /tmp/script/airflow_on_kubernetes.py', dag=dag,
    executor_config={
        "KubernetesExecutor": {
            "image": "airflow:latest",
            "volumes": [
                {
                    "name": "example-kubernetes-test-volume",
                    "hostPath": {"path": "/home/centos/output/"},
                },
            ],
            "volume_mounts": [
                {
                    "mountPath": "/tmp/",
                    "name": "example-kubernetes-test-volume",
                },
            ]
        }
    }
)

start_task.set_downstream(second_task)

上述的是DAG脚本。顺便提一下,出现在Airflow管理界面上的名称是指的dag_id(而非脚本名)。

重点在于volumes和volume_mounts的设置。
通过volumes的hostPath设置要挂载的主机路径。通过volume_mounts的mountPath指定容器中的路径。

提前将airflow_on_kubernetes.py文件放置在所有主机(worker节点)的/home/centos/output目录中。

# coding:utf-8
import sqlalchemy
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
from logip import *
from setting2 import session as se2

if __name__=='__main__':
    with open('/tmp/db.txt', 'w') as f:
        for record in se2.query(LogIP).all():
            f.write(record.ip)
            f.write('\n')

在脚本中,将文件保存在/tmp/db.txt中。

当执行这个命令时,会在/home/centos/output/目录下生成db.txt文件,这证明容器已经成功挂载到主机上了。(请按照需要将”sqlalchemy”部分进行相应修改)另外,还可以确认容器可以访问位于其他服务器上的数据库。

最后。

我已经在Kubernetes上使用Airflow执行的DAG部署到了Kubernetes。

个人来说,我很遗憾的是,如果Pod没有部署,它就无法在Airflow上的日志中输出。但是,能够省去编写yaml文件的工作确实很方便,我认为这一点很不错。

由于挂载容易,因此将脚本分布到工作节点上,并从Pod侧引用和执行该脚本的流程既高效又可以通过Airflow的View日志查看Pod上的错误,这对于持续集成/持续部署也非常有用。

我希望可以帮助那些想要接触Airflow on Kubernetes的人们。

广告
将在 10 秒后关闭
bannerAds