Creating Workflows with Apache Airflow (1)
Setting Up the Airflow Environment
Following the official Airflow documentation, download the docker-compose.yaml file.
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.2/docker-compose.yaml'
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 11632 100 11632 0 0 25841 0 --:--:-- --:--:-- --:--:-- 25848
Build the Airflow environment with Docker Compose.
$ docker-compose up -d
Creating network "airflow_default" with the default driver
Pulling airflow-init (apache/airflow:2.6.2)...
2.6.2: Pulling from apache/airflow
759700526b78: Pull complete
0167f939c9b5: Pull complete
e460d26209ef: Pull complete
835221e3ec74: Pull complete
116eb26d30a0: Pull complete
5284abf95bb7: Pull complete
bcf24152424e: Pull complete
84a6c4d06a75: Pull complete
28328c623bce: Pull complete
8e39b8606b68: Pull complete
5cd403287497: Pull complete
835c1622ff4b: Pull complete
1b382cb186b7: Pull complete
0ca6778a4ad0: Pull complete
913a51e57913: Pull complete
eea8ac944152: Pull complete
01ff5004b362: Pull complete
0aad50ab46fb: Pull complete
4f4fb700ef54: Pull complete
Digest: sha256:7dbd78fc92b15c92edc222a2fc5096ac22acd46f0e5f2e1ac9de55ada671ef93
Status: Downloaded newer image for apache/airflow:2.6.2
Creating airflow_redis_1 ... done
Creating airflow_postgres_1 ... done
Creating airflow_airflow-init_1 ... done
Creating airflow_airflow-scheduler_1 ... done
Creating airflow_airflow-triggerer_1 ... done
Creating airflow_airflow-webserver_1 ... done
Creating airflow_airflow-worker_1 ... done
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
ac67b37461ee apache/airflow:2.6.2 "/usr/bin/dumb-init …" 5 minutes ago Up 5 minutes (healthy) 8080/tcp airflow_airflow-worker_1
dfaeb9997132 apache/airflow:2.6.2 "/usr/bin/dumb-init …" 5 minutes ago Up 5 minutes (healthy) 0.0.0.0:8080->8080/tcp, :::8080->8080/tcp airflow_airflow-webserver_1
ef3ec278ebc5 apache/airflow:2.6.2 "/usr/bin/dumb-init …" 5 minutes ago Up 5 minutes (healthy) 8080/tcp airflow_airflow-triggerer_1
4399d4d2f373 apache/airflow:2.6.2 "/usr/bin/dumb-init …" 5 minutes ago Up 5 minutes (healthy) 8080/tcp airflow_airflow-scheduler_1
1457368877ce redis:latest "docker-entrypoint.s…" 5 minutes ago Up 5 minutes (healthy) 6379/tcp airflow_redis_1
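Open airflow-webserver in a browser at http://localhost:8080 (the port published by the webserver container above) and log in. The docker-compose.yaml defaults are user airflow and password airflow.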
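To check from a script that the environment is ready, you can query the webserver's /health endpoint (the same endpoint the docker-compose.yaml healthcheck calls). The sketch below assumes the response is JSON with "metadatabase" and "scheduler" entries, each carrying a "status" field; fetch_health and is_healthy are hypothetical helper names.

```python
import json
import urllib.request


def fetch_health(base_url: str = "http://localhost:8080") -> dict:
    """Fetch the webserver health report (assumes the /health endpoint)."""
    with urllib.request.urlopen(f"{base_url.rstrip('/')}/health", timeout=10) as resp:
        return json.load(resp)


def is_healthy(report: dict) -> bool:
    """True only if both the metadata DB and the scheduler report 'healthy'."""
    return all(
        report.get(component, {}).get("status") == "healthy"
        for component in ("metadatabase", "scheduler")
    )
```

Against a running environment, call is_healthy(fetch_health()). Keeping the parsing separate from the HTTP call makes the check easy to reuse in a retry loop while the containers start.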
Trial Run of a DAG
Checking the tutorial DAG
On the DAG details page, click the [<> Code] link at the top to view the DAG's source code.
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
### Tutorial Documentation
Documentation that goes along with the Airflow tutorial located
[here](https://airflow.apache.org/tutorial.html)
"""
from __future__ import annotations

# [START tutorial]
# [START import_module]
from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

# [END import_module]

# [START instantiate_dag]
with DAG(
    "tutorial",
    # [START default_args]
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'trigger_rule': 'all_success'
    },
    # [END default_args]
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    # [END instantiate_dag]

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    # [START basic_task]
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    # [END basic_task]

    # [START documentation]
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    # [END documentation]

    # [START jinja_template]
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )
    # [END jinja_template]

    t1 >> [t2, t3]
# [END tutorial]
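The tutorial defines three tasks, t1, t2 and t3. t1 runs first, and then t2 and t3 run in parallel; this ordering is declared by the last line, t1 >> [t2, t3].
Task | task_id | Main processing
t1 | print_date | Runs the bash date command and prints the current date and time to standard output
t2 | sleep | Runs the bash sleep command to sleep for 5 seconds
t3 | templated | Prints the Jinja template date variable ds, then uses an Airflow macro to add 7 days to that date and prints the result; this is repeated 5 times in a loop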
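The expression t1 >> [t2, t3] works because Airflow operators overload Python's >> operator to record dependencies. The toy class below is not Airflow's implementation; it is a hypothetical Task stand-in showing how __rshift__ can accept a single task or a list and return its right-hand side (so a >> b >> c chains), plus a small topological grouping (Kahn's algorithm) that recovers the order "t1 first, then t2 and t3 in parallel".

```python
from __future__ import annotations


class Task:
    """Hypothetical stand-in for an operator; it only tracks downstream tasks."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: list[Task] = []

    def __rshift__(self, other: Task | list[Task]) -> Task | list[Task]:
        # Accept either a single task or a list of tasks on the right-hand side.
        targets = other if isinstance(other, list) else [other]
        self.downstream.extend(targets)
        # Returning the right-hand side lets expressions chain: a >> b >> c.
        return other


def execution_waves(tasks: list[Task]) -> list[list[str]]:
    """Group task_ids into waves; tasks in the same wave have no mutual dependency."""
    indegree = {t.task_id: 0 for t in tasks}
    for t in tasks:
        for d in t.downstream:
            indegree[d.task_id] += 1
    wave = [t for t in tasks if indegree[t.task_id] == 0]
    waves = []
    while wave:
        waves.append(sorted(t.task_id for t in wave))
        next_wave = []
        for t in wave:
            for d in t.downstream:
                indegree[d.task_id] -= 1
                if indegree[d.task_id] == 0:
                    next_wave.append(d)
        wave = next_wave
    return waves


t1, t2, t3 = Task("print_date"), Task("sleep"), Task("templated")
t1 >> [t2, t3]
print(execution_waves([t1, t2, t3]))  # [['print_date'], ['sleep', 'templated']]
```

Airflow's real scheduler also considers trigger rules, pools and retries; this sketch only illustrates how the dependency graph is declared and ordered.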
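Run the tutorial DAG.
On the DAG details page, turn the toggle switch ON. (In this docker-compose setup, newly created DAGs start out paused.)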
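As an alternative to the UI toggle, the stable REST API can unpause a DAG with PATCH /api/v1/dags/{dag_id} and the body {"is_paused": false}. This is a minimal sketch assuming basic auth with the default airflow / airflow credentials and the webserver on localhost:8080; build_unpause_request is a hypothetical helper that only builds the request, so nothing is sent until you pass it to urlopen.

```python
import base64
import json
import urllib.request


def build_unpause_request(
    base_url: str, dag_id: str, user: str, password: str
) -> urllib.request.Request:
    """Build (but do not send) a PATCH request that sets is_paused to false."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}",
        data=json.dumps({"is_paused": False}).encode(),
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
        method="PATCH",
    )


req = build_unpause_request("http://localhost:8080", "tutorial", "airflow", "airflow")
# To send it to a running environment:
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp)["is_paused"])
```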
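Check the results of the DAG run.
Checking the print_date task
The date command prints the current date and time. The container clock is in UTC, so the output 12:04:20 UTC is the same instant as the 21:04:20 JST in the log timestamps.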
[2023-06-24, 21:04:20 JST] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'date']
[2023-06-24, 21:04:20 JST] {subprocess.py:86} INFO - Output:
[2023-06-24, 21:04:20 JST] {subprocess.py:93} INFO - Sat Jun 24 12:04:20 UTC 2023
[2023-06-24, 21:04:20 JST] {subprocess.py:97} INFO - Command exited with return code 0
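To double-check that the two clocks in this log describe the same moment, convert the UTC output of date to JST. This sketch uses a fixed UTC+9 offset instead of a time zone database lookup, which is an assumption that holds for Japan since it does not observe daylight saving time.

```python
from datetime import datetime, timedelta, timezone

JST = timezone(timedelta(hours=9), "JST")  # fixed UTC+9 offset

# Output of `date` inside the container, copied from the log above.
utc_output = "Sat Jun 24 12:04:20 UTC 2023"

utc_time = datetime.strptime(utc_output, "%a %b %d %H:%M:%S UTC %Y").replace(
    tzinfo=timezone.utc
)
jst_time = utc_time.astimezone(JST)
print(jst_time.strftime("%Y-%m-%d, %H:%M:%S %Z"))  # 2023-06-24, 21:04:20 JST
```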
Checking the sleep task
The log timestamps show that the task slept for 5 seconds (from 21:04:22 to 21:04:27).
[2023-06-24, 21:04:22 JST] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'sleep 5']
[2023-06-24, 21:04:22 JST] {subprocess.py:86} INFO - Output:
[2023-06-24, 21:04:27 JST] {subprocess.py:97} INFO - Command exited with return code 0
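The 5-second gap can also be measured programmatically from the log. The sketch below assumes every line starts with a "[YYYY-MM-DD, HH:MM:SS TZ]" prefix as in the log above, and since that prefix has one-second resolution, so does the result; elapsed_seconds is a hypothetical helper name.

```python
import re
from datetime import datetime

# Matches the "[2023-06-24, 21:04:22 JST]" prefix used in the task log above.
TIMESTAMP = re.compile(r"^\[(\d{4}-\d{2}-\d{2}, \d{2}:\d{2}:\d{2}) \w+\]")


def elapsed_seconds(start_line: str, end_line: str) -> float:
    """Seconds between the timestamps that prefix two log lines."""

    def parse(line: str) -> datetime:
        match = TIMESTAMP.match(line)
        if match is None:
            raise ValueError(f"no timestamp prefix in: {line!r}")
        return datetime.strptime(match.group(1), "%Y-%m-%d, %H:%M:%S")

    return (parse(end_line) - parse(start_line)).total_seconds()


start = "[2023-06-24, 21:04:22 JST] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'sleep 5']"
end = "[2023-06-24, 21:04:27 JST] {subprocess.py:97} INFO - Command exited with return code 0"
print(elapsed_seconds(start, end))  # 5.0
```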
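Checking the templated task
In the same way, check the execution log of the templated task.
The log shows the date and the date 7 days later printed five times. ds is the run's logical date; for a scheduled run with a one-day schedule this is the start of the run's data interval, which is why it is 2023-06-23 even though the run executed on 2023-06-24.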
[2023-06-24, 21:04:22 JST] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', '\n\n echo "2023-06-23"\n echo "2023-06-30"\n\n echo "2023-06-23"\n echo "2023-06-30"\n\n echo "2023-06-23"\n echo "2023-06-30"\n\n echo "2023-06-23"\n echo "2023-06-30"\n\n echo "2023-06-23"\n echo "2023-06-30"\n']
[2023-06-24, 21:04:22 JST] {subprocess.py:86} INFO - Output:
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-23
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-30
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-23
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-30
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-23
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-30
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-23
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-30
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-23
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-30
[2023-06-24, 21:04:22 JST] {subprocess.py:97} INFO - Command exited with return code 0
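What the Jinja template renders can be reproduced in plain Python. The ds_add below is a local stand-in that mirrors what airflow.macros.ds_add does (parse a YYYY-MM-DD string, add a number of days, format it back); it is redefined here so the sketch runs without Airflow or Jinja installed, and ds is fixed to the value seen in this run.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def ds_add(ds: str, days: int) -> str:
    """Add `days` to a YYYY-MM-DD date string (stand-in for airflow.macros.ds_add)."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


def render_templated_command(ds: str, repeats: int = 5) -> list[str]:
    """Plain-Python equivalent of the {% for %} loop in templated_command."""
    lines = []
    for _ in range(repeats):
        lines.append(f'echo "{ds}"')
        lines.append(f'echo "{ds_add(ds, 7)}"')
    return lines


for line in render_templated_command("2023-06-23"):
    print(line)
```

Because the date arithmetic goes through datetime, month and year boundaries are handled automatically, for example ds_add("2023-12-28", 7) gives "2024-01-04".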
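Run the DAG through the REST API. In the request below, dag_run_id and note hold the placeholder value "string"; since dag_run_id must be unique within a DAG, change it before triggering the DAG again.
$ ENDPOINT_URL="http://localhost:8080"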
$ curl -X POST --user "airflow:airflow" "${ENDPOINT_URL}/api/v1/dags/tutorial/dagRuns" \
-H "Content-Type: application/json" -d '{"conf": {}, "dag_run_id": "string", "logical_date": "2023-06-25T02:10:14.275Z", "note": "string"}'
{
"conf": {},
"dag_id": "tutorial",
"dag_run_id": "string",
"data_interval_end": "2023-06-25T02:10:14.275000+00:00",
"data_interval_start": "2023-06-24T02:10:14.275000+00:00",
"end_date": null,
"execution_date": "2023-06-25T02:10:14.275000+00:00",
"external_trigger": true,
"last_scheduling_decision": null,
"logical_date": "2023-06-25T02:10:14.275000+00:00",
"note": "string",
"run_type": "manual",
"start_date": null,
"state": "queued"
}
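The same POST to /api/v1/dags/tutorial/dagRuns can be built with the Python standard library. This is a minimal sketch assuming basic auth with the airflow / airflow credentials and the endpoint used above; build_trigger_request and the dag_run_id value are hypothetical, and the request is only sent if you uncomment the final lines.

```python
from __future__ import annotations

import base64
import json
import urllib.request


def build_trigger_request(
    base_url: str,
    dag_id: str,
    user: str,
    password: str,
    logical_date: str,
    dag_run_id: str,
    conf: dict | None = None,
) -> urllib.request.Request:
    """Build (but do not send) a POST to /api/v1/dags/{dag_id}/dagRuns."""
    body = {"dag_run_id": dag_run_id, "logical_date": logical_date, "conf": conf or {}}
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns",
        data=json.dumps(body).encode(),
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
        method="POST",
    )


req = build_trigger_request(
    "http://localhost:8080",
    "tutorial",
    "airflow",
    "airflow",
    logical_date="2023-06-25T02:10:14.275Z",
    dag_run_id="manual__2023-06-25T02:10:14.275Z",  # hypothetical id; must be unique per DAG
)
# Uncomment to send it to a running environment; a new run starts in the "queued" state.
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp)["state"])
```

Calling rstrip('/') on the base URL avoids the double slash that a trailing "/" in the endpoint would otherwise produce.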