Creating a Workflow with Apache Airflow (1)

Setting up the Airflow environment

Download the docker-compose.yaml file, following the instructions on the official Airflow website.

$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.2/docker-compose.yaml'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 11632  100 11632    0     0  25841      0 --:--:-- --:--:-- --:--:-- 25848

Build the Airflow environment with docker compose.

$ docker-compose up -d
Creating network "airflow_default" with the default driver
Pulling airflow-init (apache/airflow:2.6.2)...
2.6.2: Pulling from apache/airflow
759700526b78: Pull complete
0167f939c9b5: Pull complete
e460d26209ef: Pull complete
835221e3ec74: Pull complete
116eb26d30a0: Pull complete
5284abf95bb7: Pull complete
bcf24152424e: Pull complete
84a6c4d06a75: Pull complete
28328c623bce: Pull complete
8e39b8606b68: Pull complete
5cd403287497: Pull complete
835c1622ff4b: Pull complete
1b382cb186b7: Pull complete
0ca6778a4ad0: Pull complete
913a51e57913: Pull complete
eea8ac944152: Pull complete
01ff5004b362: Pull complete
0aad50ab46fb: Pull complete
4f4fb700ef54: Pull complete
Digest: sha256:7dbd78fc92b15c92edc222a2fc5096ac22acd46f0e5f2e1ac9de55ada671ef93
Status: Downloaded newer image for apache/airflow:2.6.2
Creating airflow_redis_1    ... done
Creating airflow_postgres_1 ... done
Creating airflow_airflow-init_1 ... done
Creating airflow_airflow-scheduler_1 ... done
Creating airflow_airflow-triggerer_1 ... done
Creating airflow_airflow-webserver_1 ... done
Creating airflow_airflow-worker_1    ... done
$ docker ps
CONTAINER ID   IMAGE                  COMMAND                  CREATED         STATUS                   PORTS                                       NAMES
ac67b37461ee   apache/airflow:2.6.2   "/usr/bin/dumb-init …"   5 minutes ago   Up 5 minutes (healthy)   8080/tcp                                    airflow_airflow-worker_1
dfaeb9997132   apache/airflow:2.6.2   "/usr/bin/dumb-init …"   5 minutes ago   Up 5 minutes (healthy)   0.0.0.0:8080->8080/tcp, :::8080->8080/tcp   airflow_airflow-webserver_1
ef3ec278ebc5   apache/airflow:2.6.2   "/usr/bin/dumb-init …"   5 minutes ago   Up 5 minutes (healthy)   8080/tcp                                    airflow_airflow-triggerer_1
4399d4d2f373   apache/airflow:2.6.2   "/usr/bin/dumb-init …"   5 minutes ago   Up 5 minutes (healthy)   8080/tcp                                    airflow_airflow-scheduler_1
1457368877ce   redis:latest           "docker-entrypoint.s…"   5 minutes ago   Up 5 minutes (healthy)   6379/tcp                                    airflow_redis_1

Access the airflow-webserver in a browser.
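Before opening the UI, you can sanity-check the webserver from the command line via its unauthenticated `/health` endpoint (a minimal sketch; assumes the default compose port mapping on localhost:8080):

```python
# Sanity check against the Airflow webserver's /health endpoint.
# Assumes the default docker-compose port mapping (localhost:8080).
import json
import urllib.request

HEALTH_URL = "http://localhost:8080/health"

def summarize_health(payload: dict) -> str:
    """Condense the /health JSON into a one-line component=status summary."""
    return ", ".join(
        f"{component}={info.get('status')}" for component, info in payload.items()
    )

try:
    with urllib.request.urlopen(HEALTH_URL, timeout=5) as resp:
        print(summarize_health(json.load(resp)))
except OSError as exc:  # connection refused, timeout, etc.
    print(f"webserver not reachable: {exc}")
```

A healthy environment reports `healthy` for both the metadatabase and the scheduler.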


Test-running a DAG

Checking the tutorial DAG


On the DAG detail page, click the [<> Code] link at the top to view the DAG's source code.

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
### Tutorial Documentation
Documentation that goes along with the Airflow tutorial located
[here](https://airflow.apache.org/tutorial.html)
"""
from __future__ import annotations

# [START tutorial]
# [START import_module]
from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

# [END import_module]


# [START instantiate_dag]
with DAG(
    "tutorial",
    # [START default_args]
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'trigger_rule': 'all_success'
    },
    # [END default_args]
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    # [END instantiate_dag]

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    # [START basic_task]
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    # [END basic_task]

    # [START documentation]
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    # [END documentation]

    # [START jinja_template]
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )
    # [END jinja_template]

    t1 >> [t2, t3]
# [END tutorial]

The tutorial defines three tasks, t1, t2, and t3. t1 runs first, and then t2 and t3 run in parallel.

| Task | task_id | Main processing |
| --- | --- | --- |
| t1 | print_date | Runs the bash `date` command and prints the current date and time to stdout |
| t2 | sleep | Runs the bash `sleep` command to sleep for 5 seconds |
| t3 | templated | Loops 5 times, echoing the Jinja date variable `ds` and then, using an Airflow macro, `ds` plus 7 days |

Run the tutorial DAG.

On the DAG detail page, flip the toggle to ON to unpause the DAG.


Check the results of the DAG run.


Checking the print_date task


The `date` command prints the current date and time. Note that the container runs in UTC, so the command output (12:04 UTC) is nine hours behind the JST timestamps (21:04) shown in the log.

[2023-06-24, 21:04:20 JST] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'date']
[2023-06-24, 21:04:20 JST] {subprocess.py:86} INFO - Output:
[2023-06-24, 21:04:20 JST] {subprocess.py:93} INFO - Sat Jun 24 12:04:20 UTC 2023
[2023-06-24, 21:04:20 JST] {subprocess.py:97} INFO - Command exited with return code 0

Checking the sleep task


The timestamps in the log show the task slept for 5 seconds: the command started at 21:04:22 and exited at 21:04:27.

[2023-06-24, 21:04:22 JST] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'sleep 5']
[2023-06-24, 21:04:22 JST] {subprocess.py:86} INFO - Output:
[2023-06-24, 21:04:27 JST] {subprocess.py:97} INFO - Command exited with return code 0

Checking the templated task

Likewise, check the execution log of the templated task.


You can clearly see the logical date (`ds`, here 2023-06-23) and that date plus 7 days, each printed 5 times.

[2023-06-24, 21:04:22 JST] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', '\n\n    echo "2023-06-23"\n    echo "2023-06-30"\n\n    echo "2023-06-23"\n    echo "2023-06-30"\n\n    echo "2023-06-23"\n    echo "2023-06-30"\n\n    echo "2023-06-23"\n    echo "2023-06-30"\n\n    echo "2023-06-23"\n    echo "2023-06-30"\n']
[2023-06-24, 21:04:22 JST] {subprocess.py:86} INFO - Output:
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-23
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-30
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-23
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-30
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-23
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-30
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-23
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-30
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-23
[2023-06-24, 21:04:22 JST] {subprocess.py:93} INFO - 2023-06-30
[2023-06-24, 21:04:22 JST] {subprocess.py:97} INFO - Command exited with return code 0
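The rendered command seen in the log can be reproduced locally. The sketch below uses `ds_add`, a hypothetical stand-in for Airflow's `macros.ds_add`, to show how `{{ ds }}` and `{{ macros.ds_add(ds, 7) }}` expand for this run:

```python
# Stand-in for how Airflow renders the templated bash_command:
# {{ ds }} becomes the run's logical date, and macros.ds_add(ds, 7)
# adds 7 days. ds_add below is a local replica, not the Airflow macro.
from datetime import date, timedelta

def ds_add(ds: str, days: int) -> str:
    """Add `days` to a YYYY-MM-DD date string (mimics macros.ds_add)."""
    return (date.fromisoformat(ds) + timedelta(days=days)).isoformat()

ds = "2023-06-23"  # the logical date Airflow injected for this run
rendered = "\n".join(
    line
    for _ in range(5)
    for line in (f'echo "{ds}"', f'echo "{ds_add(ds, 7)}"')
)
print(rendered)
```

The output matches the log above: five pairs of `echo "2023-06-23"` and `echo "2023-06-30"`.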

Trigger the DAG via the REST API.

$ ENDPOINT_URL="http://localhost:8080"
$ curl -X POST --user "airflow:airflow" "${ENDPOINT_URL}/api/v1/dags/tutorial/dagRuns" \
 -H "Content-Type: application/json" -d '{"conf": {},  "dag_run_id": "string",  "logical_date": "2023-06-25T02:10:14.275Z",  "note": "string"}'

{
  "conf": {},
  "dag_id": "tutorial",
  "dag_run_id": "string",
  "data_interval_end": "2023-06-25T02:10:14.275000+00:00",
  "data_interval_start": "2023-06-24T02:10:14.275000+00:00",
  "end_date": null,
  "execution_date": "2023-06-25T02:10:14.275000+00:00",
  "external_trigger": true,
  "last_scheduling_decision": null,
  "logical_date": "2023-06-25T02:10:14.275000+00:00",
  "note": "string",
  "run_type": "manual",
  "start_date": null,
  "state": "queued"
}
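The same call can be made from Python with only the standard library. This is a sketch mirroring the curl example above (same endpoint, credentials, and payload); `dag_run_id` is omitted so Airflow generates a unique one instead of conflicting with the earlier run:

```python
# Trigger the tutorial DAG via the REST API, mirroring the curl example.
import base64
import json
import urllib.request

ENDPOINT_URL = "http://localhost:8080"
AUTH = base64.b64encode(b"airflow:airflow").decode()  # basic auth header value

payload = json.dumps({
    "conf": {},
    "logical_date": "2023-06-25T02:10:14.275Z",
    "note": "string",
}).encode()

req = urllib.request.Request(
    f"{ENDPOINT_URL}/api/v1/dags/tutorial/dagRuns",
    data=payload,
    headers={
        "Content-Type": "application/json",
        "Authorization": f"Basic {AUTH}",
    },
    method="POST",
)
try:
    with urllib.request.urlopen(req, timeout=5) as resp:
        run = json.load(resp)
        print(run["dag_run_id"], run["state"])
except OSError as exc:  # connection refused, timeout, HTTP error, etc.
    print(f"request failed: {exc}")
```

On success the response is the same JSON shown above, with the new run in the `queued` state.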