Apache Airflow 实践教程
首先
为了构建机器学习流水线,我学习了Apache Airflow。我试着通过官方教程等进行实际操作,但理解进展不大,所以我购买了《The Complete Hands-On Introduction to Apache Airflow》。虽然是英语的,但它有字幕,是非常好的教材,我推荐它。在本文中,我将根据这门课程介绍流水线构建的一系列步骤。
环境搭建
我使用 venv 构建了虚拟环境。Apache Airflow 的版本和依赖关系都根据这里的要求进行了调整(截至2022年1月为止,版本为2.1.0)。
$ python3 -m venv sandbox
$ source sandbox/bin/activate
(sandbox) $ sandbox/bin/python3 -m pip install --upgrade pip
(sandbox) $ pip install apache-airflow==2.1.0 --constraint https://gist.githubusercontent.com/marclamberti/742efaef5b2d94f44666b0aec020be7c/raw/21c88601337250b6fd93f1adceb55282fb07b7ed/constraint.txt
(sandbox) $ airflow db init
(sandbox) $ airflow users create --username admin --password admin --firstname Taro
--lastname Yamada --role Admin --email admin@airflow.com
运行 airflow webserver 和 airflow scheduler,访问 http://localhost:8080,打开以下画面。
稍后,由于界面上还有一些操作需要进行,所以请保持打开状态。
管线概览
由于已创建了名为~/airflow的目录,因此在其中创建一个名为dags的目录。DAG是Directed Acyclic Graph的缩写,翻译成中文为有向非循环图。这是在图形理论中出现的术语,简而言之,表示只能单向前进且没有循环的结构。通过dag目录来构建管道。
(airflow-test) $ cd ~/airflow
(airflow-test) $ mkdir dags
在本文中,我们最终将建立如下所示的流水线。
以下是每个任务的简要说明。
-
- 创建表格:创建一个用于保存数据的表格。
API是否可用:等待Random User Generator网站的API变为可用状态。
提取用户:调用API获取用户信息。
处理用户:从获取的用户信息中提取所需的数据并保存到CSV文件中。
存储用户:将CSV文件中的数据存储到表格中。
建立流水线
在 airflow/dags/user_processing.py 文件中创建任务。
0. 创建DAG对象的实例
在按顺序创建任务之前,首先要创建包含任务的DAG实例。在以下的“with”语法中对每个任务进行定义。
from datetime import datetime
from airflow.models import DAG
default_args = {
"start_date": datetime(2020, 1, 1),
}
with DAG(
"user_processing",
schedule_interval="@daily",
default_args=default_args,
catchup=False,
) as dag:
# define tasks/operators
创建表格
创建一个用于保存数据的表格。由于使用了SQLite,因此需要使用SqliteOperator,可以通过以下命令进行安装。
(sandbox) $ pip install 'apache-airflow-providers-sqlite'
Airflow有许多提供商,根据需要需要安装。有关详细信息,请参考提供商软件包参考资料。
在下面,定义一个任务。如果表不存在,则创建一个具有6个列的表格。
...
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
...
with DAG(...) as dag:
creating_table = SqliteOperator(
task_id="creating_table",
sqlite_conn_id="db_sqlite",
sql="""
CREATE TABLE IF NOT EXISTS users (
firstname TEXT NOT NULL,
lastname TEXT NOT NULL,
country TEXT NOT NULL,
username TEXT NOT NULL,
password TEXT NOT NULL,
email TEXT NOT NULL PRIMARY KEY
);
""",
)
需要与数据库建立连接才能连接到表格,因此需要从之前打开的画面中创建。选择顶部的 [Admin] -> [Connections],然后点击 [+] 按钮进行新建。配置请参考以下图片。
将Conn Id设置为sqlite_conn_id的同名。将Host指定为airflow.db的路径。
如果可以做到这一点,就进行任务测试,以确保任务能够运行正常。以下是测试命令。如果没有出现错误,则说明已正确创建。
(sandbox) $ airflow tasks test user_processing creating_table 2020-01-01
2. API是否可用
为了创建一个等待 API 可用的处理过程,使用 Sensor。以下是 Sensor 的官方文档中的描述。
传感器是一种特定类型的运算符,会一直运行,直到满足特定的条件。例如,一个特定的文件降落在HDFS或S3中,一个分区出现在Hive中,或者是某个特定的时间。传感器是从BaseSensorOperator派生而来的,会在指定的poke_interval间隔内运行poke方法,直到它返回True为止。
基本上,它是用于等待某事发生并保证其行动的工具,就像这次一样。由于使用HTTP,因此请使用以下命令来安装提供程序。
(sandbox) $ pip install apache-airflow-providers-http==2.0.0
使用 HttpSensor 的代码如下所示。
...
from airflow.providers.http.sensors.http import HttpSensor
...
with DAG(...) as dag:
...
is_api_available = HttpSensor(
task_id="is_api_available", http_conn_id="user_api", endpoint="api/"
)
建立与API的连接。与SQLite相同,按照以下图片所示进行设置。
在这里对刚刚创建的任务进行测试。
(sandbox) $ airflow tasks test user_processing is_api_available 2020-01-01
3. 提取用户
定义一个从API获取数据的任务。
...
import json
from airflow.providers.http.operators.http import SimpleHttpOperator
...
with DAG(...) as dag:
...
extracting_user = SimpleHttpOperator(
task_id="extracting_user",
http_conn_id="user_api",
endpoint="api/",
method="GET",
response_filter=lambda response: json.loads(response.text),
log_response=True,
)
只需要一个选项:与之前一样进行测试将获得以下结果。可以确认获得与Random User Generator API相似的结果。
(sandbox) $ airflow tasks test user_processing extracting_user 2020-01-01
...
[2022-01-21 06:56:21,045] {http.py:115} INFO - {"results":[{"gender":"male","name":{"title":"Mr","first":"Darren","last":"King"},"location":{"street":{"number":6614,"name":"Bridge Road"},"city":"Exeter","state":"Clwyd","country":"United Kingdom","postcode":"K86 3ZN","coordinates":{"latitude":"6.5705","longitude":"-158.8391"},"timezone":{"offset":"-3:30","description":"Newfoundland"}},"email":"darren.king@example.com","login":{"uuid":"384a152f-66a9-472d-a592-e1f8557e564f","username":"purplecat905","password":"flyers","salt":"yyPLDi3z","md5":"495955a7b7b4fc0664ac0fc580f2cca4","sha1":"f85a89f3cfb86675bc42b5c0d787e247bbb0ad5e","sha256":"b0824af8b0fcc5de0a996b3f6c41e7ea6a916a749b0aeeb6c648264f7f3c4447"},"dob":{"date":"1951-03-25T05:24:14.327Z","age":71},"registered":{"date":"2019-01-22T19:37:00.577Z","age":3},"phone":"017683 00410","cell":"0752-503-650","id":{"name":"NINO","value":"PJ 90 72 17 S"},"picture":{"large":"https://randomuser.me/api/portraits/men/56.jpg","medium":"https://randomuser.me/api/portraits/med/men/56.jpg","thumbnail":"https://randomuser.me/api/portraits/thumb/men/56.jpg"},"nat":"GB"}],"info":{"seed":"53304cf938cd3a25","results":1,"page":1,"version":"1.3"}}
...
4. 处理用户
由于获得了上述数据,我们希望能够适当地处理并提取仅需要注册到表中的数据。使用PythonOperator在Python脚本中执行处理,因此需要通过python_callable定义执行函数。
在本次情况下,需要通过 extracting_user 获取的数据,所以要使用 Xcoms。Xcoms 是 Cross Communications 的缩写,它是一种允许任务之间进行相互通信的机制。详细信息请参考官方文档。由于任务的返回值默认是被推送的,因此可以通过 xcom_pull() 接收数据。(*请注意,禁止传递像数据框这样大的数据。)
...
from airflow.operators.python import PythonOperator
from pandas import json_normalize
...
def _processing_user(ti):
users = ti.xcom_pull(task_ids=["extracting_user"])
if not len(users) or "results" not in users[0]:
raise ValueError("User is Empy")
user = users[0]["results"][0]
processed_user = json_normalize(
{
"firstname": user["name"]["first"],
"lastname": user["name"]["last"],
"country": user["location"]["country"],
"username": user["login"]["username"],
"password": user["login"]["password"],
"email": user["email"],
}
)
processed_user.to_csv("/tmp/processed_user.csv", index=None, header=False)
with DAG(...) as dag:
...
processing_user = PythonOperator(
task_id="processing_user", python_callable=_processing_user
)
使用以下命令进行测试,确认从 cat tmp/processed_user.csv 提取的数据已存储到文件中。
(sandbox) $ airflow tasks test user_processing processing_user 2020-01-01
...
(sandbox) $ cat /tmp/processed_user.csv
Darren,King,United Kingdom,purplecat905,flyers,darren.king@example.com
5. 存储用户
最后用 BashOperator 将 CSV 数据保存到数据库中。省略了详细说明。
...
from airflow.operators.bash import BashOperator
...
with DAG(...) as dag:
...
storing_user = BashOperator(
ask_id="storing_user",
bash_command='echo -e ".separator ","\n.import /tmp/processed_user.csv users" | sqlite3 /home/airflow/airflow/airflow.db',
)
可以使用以下命令来确认数据是否存储在数据库中。
(sandbox) $ airflow tasks test user_processing storing_user 2020-01-01
...
(sandbox) $ splite3 airflow.db
sqlite> select * from users;
Ana|Blanchard|France|sadbutterfly288|bounty|ana.blanchard@example.com
6. 依赖关系
完成以上步骤后,返回浏览器,从[DAG]列表中选择’user_processing’,然后点击[Graph View]。这样,会呈现如下图所示的状态。
在这个阶段,任务只是零散地创建,并且需要定义它们的执行顺序。如果需要的话,可以这样描述:
...
with DAG(...) as dag:
...
(
creating_table
>> is_api_available
>> extracting_user
>> processing_user
>> storing_user
)
这个顺序被称为依赖关系,可以通过使用>>来定义。刷新页面后,可以确认已创建了像附件一样的流水线。按下左上角的切换按钮,可以执行整个流水线。
最后
使用Apache Airflow进行简单的管道构建。我们想要考虑使用各种提供商以及处理用于机器学习的大数据。