Apache Airflow 并行处理实践
首先
继续进行Apache Airflow的实践,在The Complete Hands-On Introduction to Apache Airflow中加深理解。在本文中,我们将在使用默认设置的SQLite和SequentialExecutor进行确认后,将其更改为PostgreSQL和LocalExecutor,并尝试并行处理。
默认设置的确认
在Apache Airflow中,当初始化时会进行默认配置。可以通过以下命令来查看配置。
$ airflow config get-value core sql_alchemy_conn
sqlite:////home/airflow/airflow/airflow.db
$ airflow config get-value core executor
SequentialExecutor
通过查看这个,我们可以知道 DB 采用了 SQLite,执行器为 SequentialExecutor。我们可以先保持默认配置来确认它的行为如何。
我创建了一个简单的有向无环图。
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {"start_date": datetime(2020, 1, 1)}
with DAG(
"parallel_dag", schedule_interval="@daily", default_args=default_args, catchup=False
) as dag:
task_1 = BashOperator(task_id="task_1", bash_command="sleep 3")
task_2 = BashOperator(task_id="task_2", bash_command="sleep 3")
task_3 = BashOperator(task_id="task_3", bash_command="sleep 3")
task_4 = BashOperator(task_id="task_4", bash_command="sleep 3")
task_1 >> [task_2, task_3] >> task_4
运行airflow的webserver和scheduler,并访问http://localhost:8080。在DAGs中选择parallel_dag,并选择Graph View,页面将显示如下内容。
看这个图表,似乎已经实现了并行处理,但是当点击左上角的切换按钮进行执行后转到甘特图界面时,可以确认并行处理并未进行,而是按顺序执行。
这是由于默认的 Executor 是 Sequantial Executor 所导致的。这些设置被记录在 ~/airflow/airflow.cfg 中。
进行并行处理的配置
为了进行并行处理,需要对并行处理进行支持的数据库和执行器进行更改。在这里,选择使用PostgreSQL作为数据库,并将执行器从序列执行器更改为本地执行器。
$ sudo apt update
$ sudo apt install postgresql
为了连接,设置用户postgres的密码。
$ sudo -u postgres psql
postgres=# ALTER USER postgres PASSWORD 'postgres';
ALTER ROLE
安装用于操作Postgres的包。
$ pip install 'apache-airflow[postgres]'
我們將根據以下進行設置變更。首先,我們將調整數據庫配置如下。詳細的設置格式請參考 Database Urls。
...
# sql_alchemy_conn = sqlite:////home/airflow/airflow/airflow.dbost:port/database
sql_alchemy_conn = postgresql+psycopg2://postgres:postgres@localhost/postgres
如果在反映更改后,以下命令能够顺利执行,那么数据库已经成功配置为PostgreSQL。
$ airflow db check
接着这一点,也需要更改 Executor.
...
# executor = SequentialExecutor
executor = LocalExecutor
如果能够更改设置,那么初始化Postgres DB。如果airflow webserver和airflow scheduler已经启动,则先停止它们。
$ airflow db init
...
$ airflow users create -u admin -p admin -r Admin -f admin -l admin -e admin@airflow.com
...
Admin user admin created
由于进行了DB的初始化和新用户的创建,因此执行airflow webserver和airflow scheduler,并访问http://localhost:8080。如果仍然以前的用户登录状态,请先退出登录,然后使用admin/admin重新登录。从DAG中选择parallel_dag,并选择Graph View,将显示如下屏幕。
可以确认它正在并行处理。
最后
使用PostgreSQL和LocalExecutor进行并行处理的尝试。接下来,我希望继续加深理解。