Apache Airflow 并行处理实践

首先

继续进行Apache Airflow的实践,在The Complete Hands-On Introduction to Apache Airflow中加深理解。在本文中,我们将在使用默认设置的SQLite和SequentialExecutor进行确认后,将其更改为PostgreSQL和LocalExecutor,并尝试并行处理。

默认设置的确认

在Apache Airflow中,当初始化时会进行默认配置。可以通过以下命令来查看配置。

$ airflow config get-value core sql_alchemy_conn
sqlite:////home/airflow/airflow/airflow.db
$ airflow config get-value core executor
SequentialExecutor

通过查看这个,我们可以知道 DB 采用了 SQLite,执行器为 SequentialExecutor。我们可以先保持默认配置来确认它的行为如何。

我创建了一个简单的有向无环图。

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {"start_date": datetime(2020, 1, 1)}

with DAG(
    "parallel_dag", schedule_interval="@daily", default_args=default_args, catchup=False
) as dag:

    task_1 = BashOperator(task_id="task_1", bash_command="sleep 3")

    task_2 = BashOperator(task_id="task_2", bash_command="sleep 3")

    task_3 = BashOperator(task_id="task_3", bash_command="sleep 3")

    task_4 = BashOperator(task_id="task_4", bash_command="sleep 3")

    task_1 >> [task_2, task_3] >> task_4

运行airflow的webserver和scheduler,并访问http://localhost:8080。在DAGs中选择parallel_dag,并选择Graph View,页面将显示如下内容。

parallel_dag.png

看这个图表,似乎已经实现了并行处理,但是当点击左上角的切换按钮进行执行后转到甘特图界面时,可以确认并行处理并未进行,而是按顺序执行。

gantt.png

这是由于默认的 Executor 是 Sequantial Executor 所导致的。这些设置被记录在 ~/airflow/airflow.cfg 中。

进行并行处理的配置

为了进行并行处理,需要对并行处理进行支持的数据库和执行器进行更改。在这里,选择使用PostgreSQL作为数据库,并将执行器从序列执行器更改为本地执行器。

$ sudo apt update
$ sudo apt install postgresql

为了连接,设置用户postgres的密码。

$ sudo -u postgres psql
postgres=# ALTER USER postgres PASSWORD 'postgres';
ALTER ROLE

安装用于操作Postgres的包。

$ pip install 'apache-airflow[postgres]'

我們將根據以下進行設置變更。首先,我們將調整數據庫配置如下。詳細的設置格式請參考 Database Urls。

...

# sql_alchemy_conn = sqlite:////home/airflow/airflow/airflow.dbost:port/database
sql_alchemy_conn = postgresql+psycopg2://postgres:postgres@localhost/postgres

如果在反映更改后,以下命令能够顺利执行,那么数据库已经成功配置为PostgreSQL。

$ airflow db check

接着这一点,也需要更改 Executor.

...

# executor = SequentialExecutor
executor = LocalExecutor

如果能够更改设置,那么初始化Postgres DB。如果airflow webserver和airflow scheduler已经启动,则先停止它们。

$ airflow db init
...
$ airflow users create -u admin -p admin -r Admin -f admin -l admin -e admin@airflow.com
...
Admin user admin created

由于进行了DB的初始化和新用户的创建,因此执行airflow webserver和airflow scheduler,并访问http://localhost:8080。如果仍然以前的用户登录状态,请先退出登录,然后使用admin/admin重新登录。从DAG中选择parallel_dag,并选择Graph View,将显示如下屏幕。

parallel_gantt.png

可以确认它正在并行处理。

最后

使用PostgreSQL和LocalExecutor进行并行处理的尝试。接下来,我希望继续加深理解。

广告
将在 10 秒后关闭
bannerAds