Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再...
Search
MonotaRO
PRO
May 23, 2024
Technology
870
5
Share
Embed
Copy iframe code
Copy JS code
Copy link
Start on current slide
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する
MonotaRO
PRO
May 23, 2024
More Decks by MonotaRO
See All by MonotaRO
AIでSDLCは変わろうとしている では、人と組織をどう再設計すべきか
monotaro
PRO
0
200
モノタロウにおけるSREの現在地:モダナイゼーションの過程で変化していく組織に、SREはどう向き合ったか
monotaro
PRO
1
610
AIと共に、組織をどう進化させるか?
monotaro
PRO
1
360
ビジネスを駆動するアーキテクチャへ ~AI-Agentという新しいアクター
monotaro
PRO
2
16k
事業成長を支えるためのデータ アーキテクチャの取り組み - Data Engineering Summit
monotaro
PRO
0
560
AIと共に進化するモノタロウ - AI駆動開発 Conference Autumn 2025
monotaro
PRO
8
4.1k
映えないObservability
monotaro
PRO
2
890
Datadogを活用した マイクロサービスの可観測性向上 ~モノタロウの導入効果と実践ノウハウ~
monotaro
PRO
0
300
FastAPIの魔法をgRPC/Connect RPCへ
monotaro
PRO
1
2.4k
Other Decks in Technology
See All in Technology
Oracle AI Database@Google Cloud:サービス概要のご紹介
oracle4engineer
PRO
6
1.6k
ACE-Step-1.5で見る 音楽生成AIのしくみと“破綻だけ直す”Retake機能の開発【zennfes spring 2026 登壇資料】
personabb
1
540
いまさら聞けない「仕様駆動開発入門」 〜AI活用時代の開発プロセスを考える〜
findy_eventslides
2
160
Flow 不死:AI 時代 DevOps 的不變本質
cheng_wei_chen
2
360
クラウドファンディング版StackChan 3体(4体)をインタラクティブな体験型作品にして展示もした話 / スタックチャンお誕生日会2026
you
PRO
0
100
GitHub Copilot app最速の発信の裏側
tomokusaba
1
200
Kubernetesにおける学習基盤とLLMOpsの概要
ry
1
320
データレイクの「見えない問題」を可視化する
sansantech
PRO
1
120
Kiroで書いた 設計書 が AI レビューの 採点基準 になる
ezaki
0
130
脆弱性対応、どこで線を引くか
rymiyamoto
1
420
ロボティクスの技術 / Robotics Technology
ks91
PRO
0
110
10年間のブログ発信を振り返って見えたWebアプリケーションエンジニアとしての軌跡
stefafafan
0
170
Featured
See All Featured
[SF Ruby Conf 2025] Rails X
palkan
2
1.1k
Stewardship and Sustainability of Urban and Community Forests
pwiseman
0
230
Jess Joyce - The Pitfalls of Following Frameworks
techseoconnect
PRO
1
170
We Analyzed 250 Million AI Search Results: Here's What I Found
joshbly
1
1.4k
30 Presentation Tips
portentint
PRO
1
330
Redefining SEO in the New Era of Traffic Generation
szymonslowik
1
340
Winning Ecommerce Organic Search in an AI Era - #searchnstuff2025
aleyda
1
2k
brightonSEO & MeasureFest 2025 - Christian Goodrich - Winning strategies for Black Friday CRO & PPC
cargoodrich
3
730
Lessons Learnt from Crawling 1000+ Websites
charlesmeaden
PRO
1
1.3k
How to Align SEO within the Product Triangle To Get Buy-In & Support - #RIMC
aleyda
2
1.5k
Hiding What from Whom? A Critical Review of the History of Programming languages for Music
tomoyanonymous
2
860
More Than Pixels: Becoming A User Experience Designer
marktimemedia
3
440
Transcript
BigQueryとCloud Composerを使って大 規模バッチ処理をデータパイプラインに 再構築する 1 © 2023 MonotaRO Co., Ltd.
All Rights Reserved. Masato Nakamura(@masahito) #bq_sushi #19@Google渋谷ストリーム 2024/05/23
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 { "name":"Masato Nakamura", "SNS": {
"X(a.k.a. Twitter)": "masahito", "GitHub": "masahitojp" }, "loves": [ "Python", "JS", "Java/Scala", "Coffee☕ ", "大規模データ処理" ] } 2 自己紹介(self-introduction)
わたしたちについて 3 BtoB を対象に、 自ら間接資材の在庫を持ち、 自らオンラインで売るEC 企業 コールセンター、商 品
採 用、物 流、 マーケティング、データサイエンス、 IT など多くの業務とシステムを 自社開発、自社運用している フルスタック EC カンパニー 事業紹介
わたしたちについて 4 事業紹介 商品点数 2,217万点 ユーザー数 約910万件 売上 2542億円
グローバルに サービス展開 (2023年度実績)
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 今回の発表では BigQueryでのバッチ処理の構築/改善 についてお話しします。 5
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 本日お話しすること • BigQueryを利用した大規模データパイプラインの構築は 作るだけではなく、改善しつづける必要があるよ •
「何を」行うか(データの移動と変換)に集中しよう • ワークフローエンジン(Apache Airflow)を利用すると運 用で苦労しない機能が揃っているよ 6
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 アジェンダ 1. バッチ処理 から ワークフローへ
2. ワークフローエンジン Apache Airflow紹介 3. 安定運用のために改善し続ける 7
バッチ処理 からワークフローへ 8
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 事例:MonotaROでの商品情報基盤 • 商品情報の検索用のミドルウェアの移行中 ◦ Solr
-> Elasticsearchへ ▪ 検索 カテゴリーの記事 - MonotaRO Tech Blog • 商品情報データ作成処理も再実装して移行中 ◦ 商品情報 約2,217万 ◦ 商品属性名のバリエーション 約2.6万 9
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 事例:MonotaROでの商品情報基盤 • 結論:商品情報データ作成処理の移行のために全体の処 理を見直しました。 ◦
商品情報データ作成処理の業務ロジック部分を BigQuery+SQLで実装 ◦ ワークフローエンジンを利用する 10
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 事例:MonotaROでの商品情報基盤 11 • 商品情報データ作成処理は(Python+BigQuery)のバッ チ処理で作っていた
BigQuery ソース データ BigQuery 商品情報 データ データ変換 検索エンジン 商品情報 データ SQLをテーブルの依 存関係に従って順番 に実行 業務ロジック(変換) をPython+SQLで実 装 エラーが起きたら再 実行 外部システムとの連 携処理 やらないといけないことがたくさんある 😅
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 事例:MonotaROでの商品情報基盤 12 • やらないといけないことを整理 ◦
業務ロジックとそれ以外の処理をまずは分けた ワークフローエンジン BigQuery ソース データ BigQuery 商品情報 データ データ変換 検索エンジン 商品情報 データ 業務ロジック 注力したいのはここ! タスク依存関係の解決 外部システムとの 連携処理 リトライ処理
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 13 • 処理の移行をするために、あらためて処理を見直す ワークフローエンジン BigQuery
ソース データ BigQuery 商品情報 データ データ変換 検索エンジン 商品情報 データ 検索エンジンへの投入は システムを分ける 業務ロジック 事例:MonotaROでの商品情報基盤 注力したいのはここ! タスク依存関係の解決 リトライ処理 業務ロジックとは別でバッチ処理の機能として考えられる部分 外部システムとの 連携処理
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 14 • 処理全体をフルスタックのワークフローエンジンに のせ、業務ロジックに集中すると決めた ワークフローエンジン
BigQuery ソース データ BigQuery 商品情報 データ データ変換 検索エンジン 商品情報 データ タスク依存関係の解 決 外部システムとの連 携処理 ワークフローエンジンで実装する リトライ 業務ロジック 事例:MonotaROでの商品情報基盤 注力したいのはここ! 検索エンジンへの投入は システムを分ける
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 15 • 業務ロジックに集中することで移行がスムーズに! タスク依存関係の解決や エラー処理はワークフローエ
ンジンでカバー ワークフローエンジン BigQuery ソース データ BigQuery 商品情報 データ データ変換 業務ロジックを実装する ことに専念 BigQuery+SQLで実装 Elasticsearch 商品情報 データ 事例:MonotaROでの商品情報基盤
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 • 「何を」行うか(データの移動と変換)<- これが大事 • 「どのように」行われるか
◦ (依存関係の管理、タスクのスケジューリング、エ ラーハンドリングなど) 16 [バッチ処理 から ワークフローへ]捉え方を変えることの効果
ワークフローエンジンCloud Composer(Apache Airflow)紹介 17
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 • ワークフローエンジンは、特定の業務プロセスやタスクの実行を自動 化、管理、監視するためのツールまたはソフトウェアプラットフォー ムです。 •
Python では以下が有名です。 ◦ Apache Airflow, Luigi, Prefect etc • クラウドサービスだと ◦ GCP Workflows, AWS step functions etc 18 ワークフローエンジンとは?
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 Apache Airflow • 2014年にAirbnb社が開発したオープンソースのワークフローエンジン、2016年より Apache財団に寄贈。開発言語は
Python で、処理もPythonで書ける。 • ETL(ELT)ツールと呼ばれることもある ◦ Extract、Transform、Loadの略で、複数のシステムからデータを抽出し、抽出 したデータを変換/加工した上でデータウェアハウス等へ渡す処理、およびそれ を支援するソフトウェアのこと 19
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 Apache Airflow • 2014年にAirbnb社が開発したオープンソースのワークフローエンジン、2016年より Apache財団に寄贈。開発言語は
Python で、処理もPythonで書ける。 • ETL(ELT)ツールと呼ばれることもある ◦ Extract、Transform、Loadの略で、複数のシステムからデータを抽出し、抽出 したデータを変換/加工した上でデータウェアハウス等へ渡す処理、およびそれ を支援するソフトウェアのこと ◦ -> 今回は商品情報データの変換処理(TL)なので用途に丁度あう! 20
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 Apache Airflow • 特徴: 各クラウドサービスとの連携がプラグインとして提
供されていて、主要な機能がすでに機能として使える状 態。外部システムとの連携を作りやすい。 ◦ Google Cloud - BigQuery, Cloud Dataflow, Workflows 21
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 Apache AirflowとDAG • DAG (Directed
acyclic graph; 有向非巡回グラフ)を利用 してワークフローを構築 ◦ 「依存のあるタスクをどの順番で実行するか」 22
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 Apache AirflowとDAG • 利用しているDAGの例 #
BigQueryに配置しているソースデータから新しいデータを作成 with DAG(dag_id='my_dag', ...) as dag: wait_table = BigQueryTableExistenceSenosr( # 中間テーブル作成完了を待つ task_id='wait_table',... ) execute = BigQueryInsertJobOperator( # 中間テーブルを参照して新たにテーブル作成 task_id='extract_and_transform_and_load',... ) wait_table >> execute ## タスクを列挙して、実行の順番を定義する 23
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 Apache Airflow の構成 • スケジューラー
• ワーカー • メタデータ・データベース • ウェブサーバー • UI 24
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 Managed service利用の勧め • Apache Airflowの安定運用は難しい😅
◦ 構成要素を全部安定運用できる前提 ◦ ワーカーのスケールを考えるとK8sでの運用が必要 • マネージドサービスでの運用をお勧めしたい ◦ Google Cloud : Cloud Composer ◦ Amazon Managed Workflows for Apache Airflow 25
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 Managed service利用の勧め • MonotaROの商品情報データ作成処理ではGoogle Cloud
& Cloud Composer を採用しています • 採用したポイント ◦ 商品情報生成処理ではBigQueryを採用 ◦ Google Cloudの他サービスとの親和性の高さ 26
安定運用のために改善し続ける 27
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 運用を改善し続けるために気をつけていること • 保守性を上げるため、処理を再利用可能にする • Apache
Airflowのバージョンアップ • アラートを意識してログを出力する • リリースを自動化する • 本番環境でのミスを減らす ◦ リリース自動化, 開発環境, ユニットテスト 28
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 保守性を上げるため、処理を再利用可能に • ワークフローの処理は同じような処理を書くことが多い ◦ ログへの出力,
BQでの処理実行 etc • DAGで頻出するパターンの処理を関数でまとめておき、 再利用すると良い ◦ TaskGroupの機能を使って処理をまとめておくとさら に良い 29
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 TaskGroupの勧め def check_source_and_create_table(wait_tables: List[str], sql_file)
-> TaskGroup: with TaskGroup(group_id=group_id) as task_group: # 中間テーブル作成完了を待つ wait_tables = BigQueryTableExistenceSensor(..) # 中間テーブルを参照して新たにテーブル作成 create_table = BigQueryInsertJobOperator(...) wait_tables >> create_table return task_group with DAG() as dag .... ham = check_source_and_create_table([‘ham’], ‘./create_ham_table.sql’) spam = check_source_and_create_table([‘spam’], ‘./create_spam_table.sql’) eggs = check_source_and_create_table([‘eggs’], ‘./create_eggs_table.sql’) start >> [ham, spam, eggs] >> end 関数にまとめることで、 DAG の可読性が上がる 30
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 TaskGroupの勧め with DAG() as dag
.... ham = check_source_and_create_table([‘ham’], ‘./create_ham_table.sql’) spam = check_source_and_create_table([‘spam’], ‘./create_spam_table.sql’) eggs = check_source_and_create_table([‘eggs’], ‘./create_eggs_table.sql’) start >> [ham, spam, eggs] >> end 31 DAGも分かりやすくなり チームで保守しやすくなる 😄
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 Apache Airflowのバージョンアップ • 各クラウドサービスでの新しいプロダクトを使う際には Apache
Airflowのアップデートが必要です • Cloud Composerではサポート期限が設定されているので 定期的にアップデートしましょう。 • セキュリティのアップデート、UIの改善が行われており ますので定期的な更新をお勧めします 32
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 アラートを意識してログを出力する • Cloud ComposerではログをCloud Loggingに直接出力で
きる • 上手くログを出すことでチームの運用が楽になるアラー トを簡単に作れる 33
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 事例紹介: ログとアラート • 移行前のシステムとデータ比較している.失敗した時にア ラートを出している。
失敗したことはわかるが、どこかは ログを追わないとわからない 😭 調査には半日かかることも 😭 34
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 事例紹介: ログとアラート # import Python
logging module. import logging def _check_diff_field(**kwargs): # 差分を確認する if diff_set: # Data Comparison Warning: という文字列ログを残すことで Cloud Monitoring 側でアラート通知を行う logger.warning(f"Data Comparison Warning:[{diff_table_name.id}]:{diff_set}") return None check_diff_data = PythonOperator( task_id='check_diff_data', python_callable=_check_diff_field) 35
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 事例紹介: ログとアラート 差異が出たデータをアラート に出力することでログを見な くても原因が分かりやすくなっ
た😄 36
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 本番環境でのミスを減らす-リリースを自動化 • リリースミス/ロールバックも自動化することでリリース の負荷を減らせる ◦
Cloud ComposerではGCSにDAGを配置することでリ リースが可能なため、VCS(Githubなど) とCloud buildを連携することで実現 37 Git tagの作成 デプロイ
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 本番環境でのミスを減らす-ロジックのユニットテスト導入 • データの変更が意図通りであることを確認する ◦ 「商品に対して販売価格を計算する」のようにある程度の
粒度でクエリを分割し処理をおこなっているイメージ。各 クエリに対して動作を確認する。 ◦ -> 実行するBigQueryのクエリに対して、ユニットテスト を準備する ◦ -> クエリはGitHubで管理しているのでPR時点で自動実行 38
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 • ユニットテストのフロー 1. 必要なデータを insert
2. クエリを実行 3. 実行結果を結果テーブルから selectしてassert/error関数で検証 39 事例紹介: ロジックのユニットテスト導入
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 本番環境でのミスを減らす-開発環境の準備 • 本番でのDAGエラーを防ぐ ◦ お金はかかりますが環境は複数用意しておくことでリ
スクを軽減できます。 ◦ Cloud Composerの場合、 ローカル開発 CLI ツールを 利用して本番に出す前に動作確認することも可能で す。 40
今後の取り組み 41
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 今後の取り組み • 基盤として顧客の検索体験向上やサイトUXの高度化の施 策実行しやすくする ◦
移行が進んだことで施策の実行を増やす • データ(DaaS)としてのユーザ体験の向上(データUX向上) ◦ 検索に利用するデータとして開発がスタートしたが、分析/推薦 など別の用途でも利用が拡大している。データの利用法などの整 備が必要なタイミング ▪ Dataplexなど検討中(特にデータリネージが欲しい) 42
まとめ 43
BigQueryとCloud Composerを使って大規模バッチ処理をデータパイプラインに再構築する #bq_sushi #19@Google渋谷ストリーム 2024/05/23 まとめ • BigQueryを利用した大規模データパイプラインの構築 は作るだけではなく、運用を改善しつづける必要があ るよ
• 「何を」行うか(データの移動と変換)に集中しよう • ワークフローエンジン(Apache Airflow)を利用すると 運用で苦労しない機能が揃っているよ 44
ありがとうございました 45
Any Other Questions? 46
47 © 2020 MonotaRO Co., Ltd. All Rights Reserved.