Apache Airflow / Cloud Composer笔记
DAG的简单构成
公式文件
Apache Airflow
http://airflow.apache.org
バージョン毎の使えたり使えなかったりが多いので使っているバージョンで固定して調べるのが良さそう。
https://airflow.readthedocs.io/en/1.10.4/
Google Cloud Composer
Airflowのマネージドサービス
https://cloud.google.com/composer/docs/concepts/overview
创建本地开发环境(使用Docker)
拉取并使用这个图片很方便。
- https://hub.docker.com/r/puckel/docker-airflow/
检查工作流程。
可以使用python dags/hoge.py进行检查。
使用airflow CLI的`list_dags`命令可以看到与上述相同的结果,但是它不能链接到源DAG,所以不能检查子DAG文件。所以,也许最好是获取文件名并分别运行`python xxx.py`,这样可以更全面地覆盖。
- https://airflow.apache.org/tutorial.html#running-the-script
想要在Slack上通知(使用SlackWebHookOperator、SlackPostAPIOperator)。
-
- SlackWebHookOperator
WebHook URL作成、Airflow Conn 作る必要ある。
attachments 書けない(1.10.2時点)
slack用のパッケージインストールなどは不要。
SlackPostAPIOperator
Airflow Connは不要。slack API tokenを引数にわたす必要ある。
slackclient(<2.0)のインストールが必要。
attachements書ける。
不管选择哪种方法,都希望能够使用类似Terraform的工具进行连接或添加Python包进行配置,这让人感到困惑。
我想要钩子并通知有关工作的成功与否以及重试状态。
在 BaseOperator 中有一个名为 on_{success|failure|retry}_callback 的参数,你只需要将通知的方法作为参数传递即可。
- https://airflow.apache.org/howto/operator/dingding.html
我想在JST时区处理日期数据(用户定义的过滤器)。
参考文献
- https://stackoverflow.com/questions/47112291/how-to-trigger-daily-dag-run-at-midnight-local-time-instead-of-midnight-utc-time/47227399#47227399
我想以SubDAG的结果和依赖关系创建另一个DAG(ExternalTaskSensor)。
我想要执行不依赖于CloudComposer的作业(KubernetesPodOperator)。
-
- image_pull_policy の初期値は IfNotPresent なので Always にする。
- 未指定だと、imageを最新にしても元のimageを消しても最新にならないので注意。
引用来源
-
- https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator
- https://kubernetes.io/docs/concepts/configuration/overview/#container-images
其他
-
- schedulerによる実行時の execution_date と Web UIのtriggerから実行するときのそれに入る値にズレがあるのが違和感。
スケジューリングの動作についてわかりやかった説明
https://medium.com/programming-soda/590ad9f9fb80
AirflowSkipException についてもあとで書く。