Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Airflow入門

takashi oya
December 15, 2022
4.2k

 Airflow入門

DeNA / MoT共同のAI技術共有会で発表した、Airflow入門資料です。

takashi oya

December 15, 2022
Tweet

Transcript

  1. 4 • Education / Research ◦ 早稲田大学高等学院 (2013.4 - 2016.3)

    ◦ 早稲田大学先進理工学部応用物理学科 (2016.4 - 2020.3) ▪ 首席 ◦ 早稲田大学先進理工学術院 (略) 森島研 (2020.4 - 2022.3) ▪ 研究分野: Audio-Visual Learning • 国際会議でfull paper 2本 • Work Experience ◦ ALBERT (2017.2 - 2017.11) [バイト] ▪ 動物の皮膚病診断とか、諸々 ◦ DeNA (2018.6 - 2018.9, 2020.8 - 2020.8) [バイト] ▪ 主にベイスターズ関連 ◦ DeNA 新卒入社 (2022.4 - 現在) [正社員] ▪ IRIAM、ベイスターズ関係のPJ 自己紹介
  2. 5 • Kaggle ◦ Grandmaster ▪ 0勝5敗 (🥇0🥈2🥉1) ▪ 結果を残せず

    ◦ テーブルデータメイン ▪ 仕事では computer vision多め ◦ 入社してからやってない • 趣味 ◦ 囲碁(野狐8段) ◦ マスターデュエル ▪ ソリティアが非常に得意 自己紹介
  3. • Airflowを発表テーマとして選んだ理由 ◦ 7月頃から運用業務でAirflowを使用 ▪ ただ、大規模な変更を加えることはなかった ◦ 来年から本格的に触る予定 ▪ Udemy等で勉強

    • 間違っているところがあったら教えて頂けると有り難いです🙏 ◦ (皆さんの方が詳しそう) • 本来であれば、コード多めのハンズオンをやりたかったのですが ◦ 発表資料が膨れ上がってしまい断念(次回やるかも) 雑談
  4. • しかし。。。 ◦ 処理がたくさんあったりするとなかなか大変 ▪ エラーして落ちた部分から再実行したいんだけど?😡 ▪ ログの管理が面倒なんだけど😭 ▪ ん?どのタスクがどれくらい時間かかってるの?これ?😩

    ▪ タスク間の依存関係ってどうなってるんだっけ?😥 ▪ 正直、ここらへん全部GUIで出来たら嬉しいんだけどなあ。。 こんな時にどうしたら良い? Airflowを使ってみよう!
  5. • ワークフローをスケジューリング/モニタリングするためのプラットフォーム ◦ 本家はOSS, written in python ◦ GCPやAWS上には有料のフルマネージドサービスが存在 ▪

    GCP上ならGoogle Cloud Composer ▪ AWS上ならManaged Workflows for Apache Airflow (MWAA) ◦ 特に定期的なタスクの実行に最適 ▪ 毎日朝8時にデータの存在を確認して前処理 & 推論を行いたい等 Airflowとは? 公式ロゴ
  6. • Airflowを使う際の注意点 ◦ 毎秒単位の処理が必要な場合は使わない ▪ ストリームデータ処理に対しては不適 ◦ Airflow”内で”重い処理を行わない ▪ AirflowはOrchestrator

    • 自身は重い処理を行わず、他を管理するのが仕事 ▪ Airflow上で大きいデータの処理を行うのはアンチパターン • 簡単にmemory errorになる • 代わりに、例えばSparkにjobをsubmitする Airflowとは?
  7. • Scheduler ◦ タスクのスケジューリングを行ってくれる ▪ 全タスクとDAGをモニタリング ▪ 依存するタスクが実行されたら次のタスクを開始 ▪ 一定時間ごとにタスクがトリガーされたかを確認

    • Triggerer ◦ タスクのトリガーを行う ◦ 主にDeferrable Operatorという特殊なオペレータのために用いられる Airflowの構成要素: Scheduler, Triggerer
  8. • Metastore ◦ 様々なmetadataを保持するデータベース ▪ パーミッションやAirflowの設定 ▪ XComs (タスク間で受け渡す情報) や各DAGの情報

    ▪ DAG code ◦ SQLAlchemyで操作可能 ▪ Postgres、MySQL、MSSQL, SQLiteなどが使える Airflowの構成要素: Metadata database (Metastore)
  9. • Executor ◦ どういう形でタスクが実行されるかを定義する ▪ Sequential Executor • 一度の一つのタスクを実行 •

    スケールしない ▪ Local Executor • マルチプロセスで動作 ▪ CeleryExecutor • Celeryクラスタ上で実行 Airflowの構成要素: Executor
  10. • Task ◦ Airflowの実行単位。Taskが組み合わさってDAGを構成する。 ◦ Operatorを呼ぶことによって生成する • Task Instance ◦

    DAGの実行時に生成されるタスクのインスタンスのこと ▪ 例えば、8月11日実行分の ”ping” のtask instanceとか Airflowの基本概念: Task, Task instance with DAG("my-dag") as dag: ping = SimpleHttpOperator(endpoint="http://example.com/update/") email = EmailOperator(to="[email protected]", subject="Update complete") ping >> email <dag.py>
  11. • Operator ◦ タスクが実際に何を行うかを規定するもの(3種類) ▪ 1. Action Operators: 何らかのアクションを行うオペレータ •

    PythonOperator ◦ Pythonで書かれた関数を実行 • BashOperator ◦ Bash commandを実行 • SimpleHTTPOperator ◦ HTTPリクエストを行う • etc. Airflowの基本概念: Operator
  12. • Operator ◦ タスクが実際に何を行うかを規定するもの(3種類) ▪ 2. Transfer Operators: データの移動を行うオペレータ •

    S3ToGCSOperator ◦ S3からGCSにデータを移動 • S3ToRedShiftOperator ◦ S3からRedshiftにデータを移動 • OracleToGCSOperator ◦ OracleからGCSにデータを移動 • etc. Airflowの基本概念: Operator
  13. • Operator ◦ タスクが実際に何を行うかを規定するもの(3種類) ▪ 3. Sensor Operators: 条件が満たされるまで待機するオペレータ •

    SqlSensor ◦ 特定の条件が満たされるまで30秒ごとにSQL文を実行し待機 • HttpSensor ◦ HTTPレスポンスが返ってくるまで待機 • FileSensor ◦ 特定のファイルかフォルダが出現するまで待機 • etc. Airflowの基本概念: Operator
  14. Airflow: 全体の動作の流れ DAG Folder Scheduler MetaStore Executor Web Server add

    files (e.g. dag.py) DAG Runオブジェクトを生成 DAG Run パース 情報を取得して反映
  15. Airflow: 全体の動作の流れ DAG Folder Scheduler MetaStore Executor Web Server add

    files (e.g. dag.py) DAG Run パース Task instance 情報を取得して反映
  16. Airflow: 全体の動作の流れ DAG Folder Scheduler MetaStore Executor Web Server add

    files (e.g. dag.py) DAG Run パース Task instance Task instance 情報を取得して反映
  17. Airflow: 全体の動作の流れ DAG Folder Scheduler MetaStore Executor Web Server add

    files (e.g. dag.py) DAG Run パース Task instance Task instance 情報を取得して反映
  18. Airflow: 全体の動作の流れ DAG Folder Scheduler MetaStore Executor Web Server add

    files (e.g. dag.py) DAG Run パース Task instance Task instance 情報を取得して反映
  19. Airflow: 全体の動作の流れ DAG Folder Scheduler MetaStore Executor Web Server add

    files (e.g. dag.py) DAG Run パース Task instance Task instance 情報を取得して反映 <失敗>
  20. Airflow: 全体の動作の流れ DAG Folder Scheduler MetaStore Executor Web Server add

    files (e.g. dag.py) DAG Run パース Task instance Task instance 情報を取得して反映 <成功>
  21. Airflow: 全体の動作の流れ DAG Folder Scheduler MetaStore Executor Web Server add

    files (e.g. dag.py) DAG Run パース Task instance Task instance 情報を取得して反映 <終了>
  22. • 公式のdocker-compose.yamlをダウンロード ◦ 適当なフォルダ (e.g. airflow-materials) を作ってそこに配置 • .envファイルを作成し、以下を記述 •

    以下のコマンドを実行 Dockerを使った導入 AIRFLOW_IMAGE_NAME=apache/airflow:2.4.2 AIRFLOW_UID=50000 $ docker-compose up -d Status: Downloaded newer image for apache/airflow:2.4.2 Creating materials_redis_1 ... done Creating materials_postgres_1 ... done Creating materials_airflow-init_1 ... done Creating materials_airflow-worker_1 ... done Creating materials_airflow-webserver_1 ... done Creating materials_airflow-scheduler_1 ... done Creating materials_airflow-triggerer_1 ... done
  23. • 基本は、dagsというフォルダの下にdag.pyを配置する ◦ dag.pyの中でDAGを定義 ▪ まずは必要なoperatorなどをimport Airflow: 基本的な記法 """Example DAG

    demonstrating the usage of the BashOperator.""" from __future__ import annotations import datetime import pendulum from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator
  24. • 基本は、dagsというフォルダの下にdag.pyを配置する ◦ dag.pyの中でDAGを定義 ▪ With DAG()で囲うと、それが一つのDAGになる Airflow: 基本的な記法 with

    DAG( dag_id='example_bash_operator', schedule='0 0 * * *', start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, dagrun_timeout=datetime.timedelta(minutes=60), tags=['example', 'example2'], params={"example_key": "example_value"}, ) as dag:
  25. • AがBに依存 • AがBとCに依存 • AとBがCとDに依存 Airflow: 依存関係の表現 op1 >>

    op2 op4 >> [op3, op2] from airflow.models.baseoperator import cross_downstream # Replaces # [op1, op2] >> op3 # [op1, op2] >> op4 cross_downstream([op1, op2], [op3, op4])
  26. • Chainを使った記法 • 上流タスクと下流タスクを明示的に設定 Airflow: 依存関係の表現 from airflow.models.baseoperator import cross_downstream

    # Replaces # [op1, op2] >> op3 # [op1, op2] >> op4 cross_downstream([op1, op2], [op3, op4]) first_task.set_downstream(second_task, third_task) third_task.set_upstream(fourth_task)
  27. • タスク間で(比較的小さい)データの受け渡しを行う Airflow: XComs (クロスコミュニケーションズ) def _task1(ti): ti.xcom_push(key='example', value=0) def

    _task2(ti): print(ti.xcom_pull(key='example', task_ids='task1')) with DAG("xcom_dag", start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag: task1 = PythonOperator( task_id='task1', python_callable=_task1 ) task2 = PythonOperator( task_id='task2', python_callable=_task2 ) task1 >> task2
  28. • タスクのグループ化 ◦ groups/フォルダに以下を配置 Airflow: TaskGroupsの使用 <groups/group1.py> from airflow.operators.bash import

    BashOperator from airflow.utils.task_group import TaskGroup def tasks_1(): with TaskGroup("tasks_1", tooltip="tasks 1") as group: task_1a = BashOperator( task_id='task_1a', bash_command='echo 0' ) task_1b = BashOperator( task_id='task_1b', bash_command='echo 0' ) return group
  29. • タスクのグループ化 ◦ groups/フォルダに以下を配置 Airflow: TaskGroupsの使用 <groups/group2.py> from airflow.operators.bash import

    BashOperator from airflow.utils.task_group import TaskGroup def tasks_2(): with TaskGroup("tasks_2", tooltip="tasks 2") as group: task_2a = BashOperator( task_id='task_2a', bash_command='echo 1' ) task_2b = BashOperator( task_id='task_2b', bash_command='echo 1' ) return group
  30. • タスクのグループ化 Airflow: TaskGroupsの使用 <dag.py> from airflow import DAG from

    airflow.operators.bash import BashOperator from groups.group1 import tasks_1 from groups.group2 import tasks_2 from datetime import datetime with DAG('group_dag', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag: first_tasks = tasks_1() mid_task = BashOperator( task_id='mid task', bash_command='echo 2' ) second_tasks = tasks_2() first_tasks >> mid_task >> second_tasks
  31. • タスクのグループ化 ◦ Subdagを使う手もあるが、基本的には非推奨 ▪ Airflow 2.2でdepreciatedになった ▪ Cloud Composerでも明確に非推奨

    • “SubDAG は使用しないでください。” ▪ Astronomerでも明確に非推奨 • “Don't use SubDAGs.” ▪ 公式Docsでも問題点が書かれている • “Using LocalExecutor can be problematic as it may over-subscribe your worker, running multiple tasks in a single slot.” Airflow: TaskGroupsの使用
  32. • Airflowとは ◦ ワークフローを管理するためのOSS • Airflowの構成要素/基本概念 ◦ Web Server, Scheduler,

    MetaStore, worker, Executorなど ◦ DAGを用いてタスクの依存関係を記述する • GUI ◦ ログを簡単に確認出来たり、ボトルネックを洗い出せたり便利! • コードの書き方 ◦ シフト演算子、XComs, TaskGroupなど まとめ
  33. • 特に参考にした資料 ◦ 公式ドキュメント ◦ Airflow入門講座 ▪ 英語で有料ですが、わかりやすいのでオススメ ▪ 更新頻度も高

    • 他に参考にした資料 ◦ Airflowのsubdagは注意点多いよ ◦ Operators in Apache Airflow ◦ Airflow - データパイプラインのスケジュールと監視をプログラムしてみた 参考資料