Apache Airflow / Cloud Composer笔记

DAG的简单构成

公式文件

Apache Airflow

http://airflow.apache.org
バージョン毎の使えたり使えなかったりが多いので使っているバージョンで固定して調べるのが良さそう。

https://airflow.readthedocs.io/en/1.10.4/

Google Cloud Composer

Airflowのマネージドサービス
https://cloud.google.com/composer/docs/concepts/overview

创建本地开发环境(使用Docker)

拉取并使用这个图片很方便。

    https://hub.docker.com/r/puckel/docker-airflow/

检查工作流程。

可以使用python dags/hoge.py进行检查。

使用airflow CLI的`list_dags`命令可以看到与上述相同的结果,但是它不能链接到源DAG,所以不能检查子DAG文件。所以,也许最好是获取文件名并分别运行`python xxx.py`,这样可以更全面地覆盖。

    https://airflow.apache.org/tutorial.html#running-the-script

想要在Slack上通知(使用SlackWebHookOperator、SlackPostAPIOperator)。

    • SlackWebHookOperator

WebHook URL作成、Airflow Conn 作る必要ある。
attachments 書けない(1.10.2時点)
slack用のパッケージインストールなどは不要。

SlackPostAPIOperator

Airflow Connは不要。slack API tokenを引数にわたす必要ある。
slackclient(<2.0)のインストールが必要。
attachements書ける。

不管选择哪种方法,都希望能够使用类似Terraform的工具进行连接或添加Python包进行配置,这让人感到困惑。

我想要钩子并通知有关工作的成功与否以及重试状态。

在 BaseOperator 中有一个名为 on_{success|failure|retry}_callback 的参数,你只需要将通知的方法作为参数传递即可。

    https://airflow.apache.org/howto/operator/dingding.html

我想在JST时区处理日期数据(用户定义的过滤器)。

参考文献

    https://stackoverflow.com/questions/47112291/how-to-trigger-daily-dag-run-at-midnight-local-time-instead-of-midnight-utc-time/47227399#47227399

我想以SubDAG的结果和依赖关系创建另一个DAG(ExternalTaskSensor)。

我想要执行不依赖于CloudComposer的作业(KubernetesPodOperator)。

    • image_pull_policy の初期値は IfNotPresent なので Always にする。

 

    未指定だと、imageを最新にしても元のimageを消しても最新にならないので注意。

引用来源

    • https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator

 

    https://kubernetes.io/docs/concepts/configuration/overview/#container-images

其他

    • schedulerによる実行時の execution_date と Web UIのtriggerから実行するときのそれに入る値にズレがあるのが違和感。

スケジューリングの動作についてわかりやかった説明
https://medium.com/programming-soda/590ad9f9fb80

AirflowSkipException についてもあとで書く。

广告
将在 10 秒后关闭
bannerAds