Apache Airflow 实践教程

首先

为了构建机器学习流水线,我学习了Apache Airflow。我试着通过官方教程等进行实际操作,但理解进展不大,所以我购买了《The Complete Hands-On Introduction to Apache Airflow》。虽然是英语的,但它有字幕,是非常好的教材,我推荐它。在本文中,我将根据这门课程介绍流水线构建的一系列步骤。

环境搭建

我使用 venv 构建了虚拟环境。Apache Airflow 的版本和依赖关系都根据这里的要求进行了调整(截至2022年1月为止,版本为2.1.0)。

$ python3 -m venv sandbox
$ source sandbox/bin/activate
(sandbox) $ sandbox/bin/python3 -m pip install --upgrade pip
(sandbox) $ pip install apache-airflow==2.1.0 --constraint https://gist.githubusercontent.com/marclamberti/742efaef5b2d94f44666b0aec020be7c/raw/21c88601337250b6fd93f1adceb55282fb07b7ed/constraint.txt
(sandbox) $ airflow db init
(sandbox) $ airflow users create --username admin --password admin --firstname Taro
--lastname Yamada --role Admin --email admin@airflow.com

运行 airflow webserver 和 airflow scheduler,访问 http://localhost:8080,打开以下画面。

airflow_home.png

稍后,由于界面上还有一些操作需要进行,所以请保持打开状态。

管线概览

由于已创建了名为~/airflow的目录,因此在其中创建一个名为dags的目录。DAG是Directed Acyclic Graph的缩写,翻译成中文为有向非循环图。这是在图形理论中出现的术语,简而言之,表示只能单向前进且没有循环的结构。通过dag目录来构建管道。

(airflow-test) $ cd ~/airflow
(airflow-test) $ mkdir dags

在本文中,我们最终将建立如下所示的流水线。

user_processing.png

以下是每个任务的简要说明。

    1. 创建表格:创建一个用于保存数据的表格。

API是否可用:等待Random User Generator网站的API变为可用状态。

提取用户:调用API获取用户信息。

处理用户:从获取的用户信息中提取所需的数据并保存到CSV文件中。

存储用户:将CSV文件中的数据存储到表格中。

建立流水线

在 airflow/dags/user_processing.py 文件中创建任务。

0. 创建DAG对象的实例

在按顺序创建任务之前,首先要创建包含任务的DAG实例。在以下的“with”语法中对每个任务进行定义。

from datetime import datetime

from airflow.models import DAG

default_args = {
    "start_date": datetime(2020, 1, 1),
}


with DAG(
    "user_processing",
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:

    # define tasks/operators

创建表格

创建一个用于保存数据的表格。由于使用了SQLite,因此需要使用SqliteOperator,可以通过以下命令进行安装。

(sandbox) $ pip install 'apache-airflow-providers-sqlite'

Airflow有许多提供商,根据需要需要安装。有关详细信息,请参考提供商软件包参考资料。

在下面,定义一个任务。如果表不存在,则创建一个具有6个列的表格。

...
from airflow.providers.sqlite.operators.sqlite import SqliteOperator

...

with DAG(...) as dag:

    creating_table = SqliteOperator(
        task_id="creating_table",
        sqlite_conn_id="db_sqlite",
        sql="""
            CREATE TABLE IF NOT EXISTS users (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL,
                username TEXT NOT NULL,
                password TEXT NOT NULL,
                email TEXT NOT NULL PRIMARY KEY
            );
            """,
    )

需要与数据库建立连接才能连接到表格,因此需要从之前打开的画面中创建。选择顶部的 [Admin] -> [Connections],然后点击 [+] 按钮进行新建。配置请参考以下图片。

sqlite_conn.png

将Conn Id设置为sqlite_conn_id的同名。将Host指定为airflow.db的路径。

如果可以做到这一点,就进行任务测试,以确保任务能够运行正常。以下是测试命令。如果没有出现错误,则说明已正确创建。

(sandbox) $ airflow tasks test user_processing creating_table 2020-01-01

2. API是否可用

为了创建一个等待 API 可用的处理过程,使用 Sensor。以下是 Sensor 的官方文档中的描述。

传感器是一种特定类型的运算符,会一直运行,直到满足特定的条件。例如,一个特定的文件降落在HDFS或S3中,一个分区出现在Hive中,或者是某个特定的时间。传感器是从BaseSensorOperator派生而来的,会在指定的poke_interval间隔内运行poke方法,直到它返回True为止。

基本上,它是用于等待某事发生并保证其行动的工具,就像这次一样。由于使用HTTP,因此请使用以下命令来安装提供程序。

(sandbox) $ pip install apache-airflow-providers-http==2.0.0

使用 HttpSensor 的代码如下所示。

...
from airflow.providers.http.sensors.http import HttpSensor

...

with DAG(...) as dag:

    ...

    is_api_available = HttpSensor(
        task_id="is_api_available", http_conn_id="user_api", endpoint="api/"
    )

建立与API的连接。与SQLite相同,按照以下图片所示进行设置。

api_conn.png

在这里对刚刚创建的任务进行测试。

(sandbox) $ airflow tasks test user_processing is_api_available 2020-01-01

3. 提取用户

定义一个从API获取数据的任务。

...
import json
from airflow.providers.http.operators.http import SimpleHttpOperator

...

with DAG(...) as dag:

    ...

    extracting_user = SimpleHttpOperator(
        task_id="extracting_user",
        http_conn_id="user_api",
        endpoint="api/",
        method="GET",
        response_filter=lambda response: json.loads(response.text),
        log_response=True,
    )

只需要一个选项:与之前一样进行测试将获得以下结果。可以确认获得与Random User Generator API相似的结果。

(sandbox) $ airflow tasks test user_processing extracting_user 2020-01-01
...
[2022-01-21 06:56:21,045] {http.py:115} INFO - {"results":[{"gender":"male","name":{"title":"Mr","first":"Darren","last":"King"},"location":{"street":{"number":6614,"name":"Bridge Road"},"city":"Exeter","state":"Clwyd","country":"United Kingdom","postcode":"K86 3ZN","coordinates":{"latitude":"6.5705","longitude":"-158.8391"},"timezone":{"offset":"-3:30","description":"Newfoundland"}},"email":"darren.king@example.com","login":{"uuid":"384a152f-66a9-472d-a592-e1f8557e564f","username":"purplecat905","password":"flyers","salt":"yyPLDi3z","md5":"495955a7b7b4fc0664ac0fc580f2cca4","sha1":"f85a89f3cfb86675bc42b5c0d787e247bbb0ad5e","sha256":"b0824af8b0fcc5de0a996b3f6c41e7ea6a916a749b0aeeb6c648264f7f3c4447"},"dob":{"date":"1951-03-25T05:24:14.327Z","age":71},"registered":{"date":"2019-01-22T19:37:00.577Z","age":3},"phone":"017683 00410","cell":"0752-503-650","id":{"name":"NINO","value":"PJ 90 72 17 S"},"picture":{"large":"https://randomuser.me/api/portraits/men/56.jpg","medium":"https://randomuser.me/api/portraits/med/men/56.jpg","thumbnail":"https://randomuser.me/api/portraits/thumb/men/56.jpg"},"nat":"GB"}],"info":{"seed":"53304cf938cd3a25","results":1,"page":1,"version":"1.3"}}
...

4. 处理用户

由于获得了上述数据,我们希望能够适当地处理并提取仅需要注册到表中的数据。使用PythonOperator在Python脚本中执行处理,因此需要通过python_callable定义执行函数。

在本次情况下,需要通过 extracting_user 获取的数据,所以要使用 Xcoms。Xcoms 是 Cross Communications 的缩写,它是一种允许任务之间进行相互通信的机制。详细信息请参考官方文档。由于任务的返回值默认是被推送的,因此可以通过 xcom_pull() 接收数据。(*请注意,禁止传递像数据框这样大的数据。)

...
from airflow.operators.python import PythonOperator
from pandas import json_normalize

...


def _processing_user(ti):
    users = ti.xcom_pull(task_ids=["extracting_user"])
    if not len(users) or "results" not in users[0]:
        raise ValueError("User is Empy")
    user = users[0]["results"][0]
    processed_user = json_normalize(
        {
            "firstname": user["name"]["first"],
            "lastname": user["name"]["last"],
            "country": user["location"]["country"],
            "username": user["login"]["username"],
            "password": user["login"]["password"],
            "email": user["email"],
        }
    )
    processed_user.to_csv("/tmp/processed_user.csv", index=None, header=False)

with DAG(...) as dag:

    ...

    processing_user = PythonOperator(
        task_id="processing_user", python_callable=_processing_user
    )

使用以下命令进行测试,确认从 cat tmp/processed_user.csv 提取的数据已存储到文件中。

(sandbox) $ airflow tasks test user_processing processing_user 2020-01-01
...
(sandbox) $ cat /tmp/processed_user.csv
Darren,King,United Kingdom,purplecat905,flyers,darren.king@example.com

5. 存储用户

最后用 BashOperator 将 CSV 数据保存到数据库中。省略了详细说明。

...
from airflow.operators.bash import BashOperator

...


with DAG(...) as dag:

    ...

    storing_user = BashOperator(
        ask_id="storing_user",
        bash_command='echo -e ".separator ","\n.import  /tmp/processed_user.csv users" | sqlite3 /home/airflow/airflow/airflow.db',
    )

可以使用以下命令来确认数据是否存储在数据库中。

(sandbox) $ airflow tasks test user_processing storing_user 2020-01-01
...
(sandbox) $ splite3 airflow.db
sqlite> select * from users;
Ana|Blanchard|France|sadbutterfly288|bounty|ana.blanchard@example.com

6. 依赖关系

完成以上步骤后,返回浏览器,从[DAG]列表中选择’user_processing’,然后点击[Graph View]。这样,会呈现如下图所示的状态。

only_tasks.png

在这个阶段,任务只是零散地创建,并且需要定义它们的执行顺序。如果需要的话,可以这样描述:

...


with DAG(...) as dag:

    ...

    (
        creating_table
        >> is_api_available
        >> extracting_user
        >> processing_user
        >> storing_user
    )

这个顺序被称为依赖关系,可以通过使用>>来定义。刷新页面后,可以确认已创建了像附件一样的流水线。按下左上角的切换按钮,可以执行整个流水线。

user_processing.png

最后

使用Apache Airflow进行简单的管道构建。我们想要考虑使用各种提供商以及处理用于机器学习的大数据。

广告
将在 10 秒后关闭
bannerAds