Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
10/29 Airflowの基礎を学ぶハンズオンワークショップ
Search
Hank Ehly
October 28, 2022
Technology
310
0
Share
Embed
Copy iframe code
Copy JS code
Copy link
Start on current slide
10/29 Airflowの基礎を学ぶハンズオンワークショップ
Hank Ehly
October 28, 2022
More Decks by Hank Ehly
See All by Hank Ehly
Fivetranでデータ移動を自動化する
hankehly
0
650
Celeryの紹介と本番運用のTips
hankehly
0
1.6k
ChatGPTを活用した 便利ツールの紹介
hankehly
1
1.4k
Efficient Energy Analytics with Airflow, Spark, and MLFlow
hankehly
0
400
Deferrable Operators入門
hankehly
0
760
【初心者/ハンズオン】Dockerコンテナの基礎知識
hankehly
0
590
Compositeパターン: オブジェクトの階層関係をエレガントに表現する方法
hankehly
0
350
システム/データ品質保証のための Airflow 活用法
hankehly
0
680
海外の記事からコードレビューのBest Practiceを集めてみました
hankehly
0
1k
Other Decks in Technology
See All in Technology
20260619 私の日常業務での生成 AI 活用
masaruogura
1
230
AIはどのように 組織のアジリティを変えるのか?
junki
4
1.1k
就職⽀援サービスにおけるキャリアアドバイザーのシフトスケジューリング
recruitengineers
PRO
1
150
エラーバジェットのアラートのタイミングを考える.pdf
kairim0
0
180
ACE-Step-1.5で見る 音楽生成AIのしくみと“破綻だけ直す”Retake機能の開発【zennfes spring 2026 登壇資料】
personabb
1
540
2026 TECHFRESH 畢業分享會 - AI-Native 重塑軟體工程與虛擬講師
line_developers_tw
PRO
0
1.3k
気軽に使える"情報のハブ"としてのNotion活用 〜フロー情報の集積点 と、 Claude Code × Notion AI〜
syucream
1
160
2026TECHFRESH畢業分享會 - 葬送的通靈師:化系統與用戶雜訊成行動訊號
line_developers_tw
PRO
0
1.3k
人材育成分科会.pdf
_awache
4
300
Chainlitで作るお手軽チャットUI
ynt0485
0
280
アンオフィシャルな、オフィシャルからのお願い
wyamazak_devrel
0
140
自分が詳しくない領域でAIを使う #プロヒス2026
konifar
17
5.9k
Featured
See All Featured
Taking LLMs out of the black box: A practical guide to human-in-the-loop distillation
inesmontani
PRO
3
2.3k
世界の人気アプリ100個を分析して見えたペイウォール設計の心得
akihiro_kokubo
PRO
71
40k
[RailsConf 2023] Rails as a piece of cake
palkan
59
6.7k
GraphQLとの向き合い方2022年版
quramy
50
15k
Build The Right Thing And Hit Your Dates
maggiecrowley
39
3.2k
Evolution of real-time – Irina Nazarova, EuRuKo, 2024
irinanazarova
9
1.4k
Unsuck your backbone
ammeep
672
58k
Sam Torres - BigQuery for SEOs
techseoconnect
PRO
0
290
KATA
mclloyd
PRO
35
15k
Unlocking the hidden potential of vector embeddings in international SEO
frankvandijk
0
850
Conquering PDFs: document understanding beyond plain text
inesmontani
PRO
4
2.8k
VelocityConf: Rendering Performance Case Studies
addyosmani
333
25k
Transcript
Airflowの基礎を学ぶハンズオ ンワークショップ Tokyo Apache Airflow Meetup 2022/10/29 1
• Hank Ehly(名:ハンク 姓:イーリー) • ENECHANGE株式会社 • https://qiita.com/hankehly • https://github.com/hankehly
• https://connpass.com/user/hankehly 自己紹介 2
スポンサーのご紹介 https://www.astronomer.io/ 3 https://enechange.co.jp/recruit/require ment/django-engineer/ Airflowを活用し、電力時系列データの分析を 中心としたアプリケーション企画 /開発
アジェンダ • 課題 #1 - Example DAGを実行する ◦ Airflowの起動方法 /
DAG実行 / ログ確認 • 課題 #2 - 簡易的なDAGを作る ◦ DAG/Taskの定義方法 / XComやDAGパラメータの使い方 • 課題 #3 - 高度なDAGを作る ◦ SQLite DBにWeb APIから取得したデータを挿入する 4
課題 #1 - Example DAGを実行する • Airflowを起動する • Example DAGを有効にして実行する
• タスクログを確認する 5
Airflow を Python 仮想環境 (venv) にインストールする場合 6 # venv モジュールのインストール
(Debian/Ubuntu の場合) $ sudo apt-get install -y python3-pip python3-venv # venv を作成し、中に入る $ python3 -m venv (任意のパス) # venv の作成 $ source (上で指定したパス)/bin/activate # venv に入る $ pip install --upgrade pip setuptools wheel # pipやその関連ツール自体をアップグレードしておく ... Successfully installed pip-22.3 setuptools-65.5.0 wheel-0.37.1 # 以下, venv に入った状態で Airflow のインストールを行う # venv から出る $ deactivate # 不要になった venv を削除 $ rm -rf (上で指定したパス)
課題 #1 - Example DAGを実行する Airflowを起動する (Standalone) Quick Start -
Airflow Documentation # Airflowのプロジェクトディレクトリを作る mkdir ~/airflow-test && cd airflow-test export AIRFLOW_HOME=~/airflow-test # Pythonパッケージをインストールする AIRFLOW_VERSION=2.4.2 PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)" CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERS ION}/constraints-${PYTHON_VERSION}.txt" pip install "apache-airflow==${AIRFLOW_VERSION}" "apache-airflow-providers-sqlite" --constraint "${CONSTRAINT_URL}" # standaloneコマンドは、DB初期化 / ユーザー作成 / 環境を起動してくれます。 # コンソールに出力されるユーザー名/PWをコピーして、ブラウザで localhost:8080 を開いてログインする airflow standalone 7 前提: • Windowsの方はWSL2 • Python 3.7以上 • Airflowバージョン 2.4以上 ※一部の Linux ディストリビューション( CentOS 7・8、Ubuntu 18.04 など)は、標準で付属する Python のバージョンが 3.6 の場合があります。 Python 3.6 向けの Airflow 2.3 以降の constraints が提供されていない ため、2.3 未満の constraints(2.2.5 が最新)を使う必要があります。
課題 #1 - Example DAGを実行する Airflowを起動する (Docker) Running Airflow in
Docker - Airflow Documentation # Airflowのプロジェクトディレクトリを作り、公式 docker-compose.yml をダウンロードす る mkdir ~/airflow-test && cd airflow-test curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.4.2/docker-compose.yaml' # 環境を立ち上げて、ブラウザで localhost:8080 を開いて、ユーザー名 :airflow / PW:airflow でログインする docker compose up -d 8 前提: • Windowの方 WSL2 を使用する • Python 3.7以上 • Airflowバージョン 2.4以上
課題 #1 - Example DAGを実行する DAGを有/無効化する 過去の実行履歴 実行スケジュール 手動実行する 9
課題 #1 - Example DAGを実行する タスクログ画面 タスクインスタンスのコンテキストメニュー ホーム画面 Graph画面 10
課題 #2 - 簡易的なDAGを作る • DAGを定義する • Pythonタスクを作る • Bashタスクを作る
• XComでタスク間にデータを渡す • DAGパラメータを出力する 11
課題 #2 - 簡易的なDAGを作る / DAGを定義する import datetime from airflow
import DAG with DAG( dag_id="exercise_02", description="A simple tutorial DAG", schedule="0 * * * *", # schedule_interval="0 * * * *", # Airflow 2.4未満の場合 start_date=datetime.datetime(2021, 1, 1), tags=["example"], ) as dag: pass 12
課題 #2 - 簡易的なDAGを作る / Pythonタスクを作る import datetime from airflow
import DAG with DAG( dag_id="exercise_02", description="A simple tutorial DAG", schedule="0 * * * *", # schedule_interval="0 * * * *", # Airflow 2.4未満の場合 start_date=datetime.datetime(2021, 1, 1), tags=["example"], ) as dag: @dag.task def compute_random_number(): import random num = random.randint(1, 10) return num random_number = compute_random_number() 13
課題 #2 - 簡易的なDAGを作る / Bashタスクを作る import datetime from airflow
import DAG from airflow.operators.bash import BashOperator with DAG( dag_id="exercise_02", description="A simple tutorial DAG", schedule="0 * * * *", # schedule_interval="0 * * * *", # Airflow 2.4未満の場合 start_date=datetime.datetime(2021, 1, 1), tags=["example"], ) as dag: @dag.task def compute_random_number(): import random num = random.randint(1, 10) return num random_number = compute_random_number() bash_task_1 = BashOperator( task_id="bash_task_1", bash_command="echo 'hello world'", ) random_number >> bash_task_1 14
課題 #2 - 簡易的なDAGを作る / XComでタスク間にデータを渡す import datetime from airflow
import DAG from airflow.decorators import task from airflow.operators.bash import BashOperator with DAG( dag_id="exercise_02", description="A simple tutorial DAG", schedule="0 * * * *", # schedule_interval="0 * * * *", # Airflow 2.4未満の場合 start_date=datetime.datetime(2021, 1, 1), tags=["example"], ) as dag: @dag.task def compute_random_number(): import random num = random.randint(1, 10) return num random_number = compute_random_number() bash_task_1 = BashOperator( task_id="bash_task_1", bash_command="echo 'random number: {{ ti.xcom_pull(\"compute_random_number\") }}'", ) random_number >> bash_task_1 15 Templates reference
課題 #2 - 簡易的なDAGを作る / DAGパラメータを出力する import datetime from airflow
import DAG from airflow.operators.bash import BashOperator with DAG( dag_id="exercise_02", description="A simple tutorial DAG", schedule="0 * * * *", # schedule_interval="0 * * * *", # Airflow 2.4未満の場合 start_date=datetime.datetime(2021, 1, 1), tags=["example"], ) as dag: @dag.task def compute_random_number(): import random num = random.randint(1, 10) return num random_number = compute_random_number() bash_task_1 = BashOperator( task_id="bash_task_1", bash_command="echo 'random number: {{ ti.xcom_pull(\"compute_random_number\") }}'", ) bash_task_2 = BashOperator( task_id="bash_task_2", bash_command="echo 'message: {{ dag_run.conf[\"message\"] }}'", ) random_number >> bash_task_1 >> bash_task_2 16 Templates reference
課題 #3 - 高度なDAGを作る 事前準備 • SQLiteをインストールする • SQLite DBを作る
Airflow開発 • SQLite テーブルを作る • Web APIからデータを取得する ◦ https://jsonplaceholder.typicode.com/users • SQLiteテーブルにデータを挿入する 17 # ubuntu $ sudo apt-get install -y sqlite3 # mac $ brew install sqlite