Upgrade to Pro — share decks privately, control downloads, hide ads and more …

dbt-coreで実現するCore DataMartsの データモデリング〜Cloud Com...

pokoyakazan
October 24, 2024
800

dbt-coreで実現するCore DataMartsの データモデリング〜Cloud Composer編〜 / Core Datamarts Modeling With dbt Core On Cloud Composer

pokoyakazan

October 24, 2024
Tweet

Transcript

  1. dbt-coreで実現するCore DataMartsの データモデリング〜Cloud Composer編〜 2024/10/22 ZOZO Tech Meetup ~データガバナンス /

    データマネジメント~ 株式会社ZOZO
 技術本部 データシステム部 データ基盤ブロック
 奥山 喬史 
 Copyright © ZOZO, Inc.

  2. © ZOZO, Inc. 株式会社ZOZO 技術本部データシステム部 データ基盤ブロック 奥山 喬史 - 2018年4月

    ~ 2022年1月: Web企業でデータエンジニア - 2022年2月 ~ : ZOZOでデータエンジニア - BigQueryで構築されたデータ基盤の運用保守 - データパイプライン、データマート更新ジョブの運用開発 - 個人活動: - 「ぽこやかざん」という名前でラジオ投稿したり大喜利したり - 「下町モルモット」というコンビで漫才したり - 日本一になりました! - 「アマ芸人日本一が決定!社会人コンビに栄冠「人生が動きそうなワクワクがあ ります」昨年優勝者はR-1決勝で注目」というタイトルで、Yahoo!ニュースに掲載 (2024年8月18日付) 2
  3. © ZOZO, Inc. 3 目次 - データマート集計基盤の紹介 - dbt導入 -

    データマート集計基盤へのdbt導入 〜方針・設計編〜 - データマート集計基盤へのdbt導入 〜実装編〜 - 運用の工夫・つまずきポイント - まとめ・今後について
  4. © ZOZO, Inc. 5 データマート集計基盤 • データマート: ◦ データ基盤利用者が作成したSQLファイル内のクエリで日次更新されるBigQueryテーブル ◦

    ※SQLファイルにはSELECT文のみ記述、UPDATEやDELETEといったDMLは書かない • データマート集計基盤: ◦ Cloud Composer(Google Cloud提供のApache Airflowマネージドサービス)上に構築 ◦ 1,100超のデータマートを依存関係を保ちながら、余計な待ち時間なく更新していくシステム
  5. © ZOZO, Inc. 6 依存関係を保ちながら • 「FROM, JOINの後ろのマート」→「SQLを実行するマート」となるよう依存関係を構築している # table1.sql

    SELECT * FROM `project.dataset.existing_table1`; # table2.sql SELECT * FROM `project.dataset.existing_table2`; # table3.sql SELECT * FROM `project.dataset.table2`; # table4.sql SELECT * FROM `project.dataset.table1` UNION ALL SELECT * FROM `project.dataset.table3`; # table5.sql SELECT * FROM `project.dataset.table3`; (?i)(?<=FROM|JOIN)[\s \n]*`(.+?)` 正規表現で参照先データマートを抽出 例: existing_table[12]というソーステーブルと下記SQLのtable[1-5]というデータマートがある場合
  6. © ZOZO, Inc. 10 dbt導入によるデータマート整備 • 体系的なモデリングに則り品質担保されたデータマートをdbtで作成したい • 1,100超のデータマート全てをdbtモデル化したくない ◦

    SQLさえ書けば自動でデータマートを作成・更新してくれる既存システムは利便性が高い ◦ ビジネスサイドの社員も含む550人以上の利用者がいるため、dbtですべてモデリングし直す のは非現実的 • 2種類のデータマートを目的に応じて使い分ける ◦ SQLデータマート ▪ これまでのSQLによって更新されるデータマート ▪ レポーティング用途 ◦ dbtデータマート ▪ dbtによって更新されるデータマート ▪ 集計定義を統制して品質担保
  7. © ZOZO, Inc. 18 Cloud Composerからdbtデータマート更新 • dbt単体ではできない実行制御: ◦ モデルごと自動リトライ

    ◦ 依存関係による待ち合わせ制御 • → 既にデータマート集計基盤でこれらの機構は実装済み • → データマート集計基盤でdbtデータマートの更新もできるようにする • また、dbtデータマートを参照するSQLデータマートが存在するため、1つのAirflow Dagに統一 • ※Cosmosについても導入検討・検証をしたが、上記dbtデータマート→SQLデータマートの依存関係をはじめ、 今後もビジネスロジックに沿った改良が発生していく可能性があるため自分達で全て実装するようにした
  8. © ZOZO, Inc. 19 データマート集計Dagの設計 • データマートごとにタスクグループ(データマートタスクグループ)を作成 • 1つのタスクグループ内に update_datamart

    タスクと data_quality_check タスクを定義 ◦ update_datamart: データマート更新(dbtの場合 dbt run --select “${model_name}”) ◦ data_quality_check: Errorデータ品質チェック ▪ ※ SQLデータマートの場合、Dataplexによるデータ品質チェックを行っている(本発表では割愛) • dbt_test_warning: ◦ 全てのdbtデータマートタスクグループ完了後に実行されるWarningデータ品質チェック
  9. © ZOZO, Inc. 20 Errorデータ品質チェックとWarningデータ品質チェック • Errorデータ品質チェック ◦ severity:errorのテストをデータマートごとに単体で実施 ◦

    各データマートの更新タスクの直後に実行 ◦ エラーが発生したら後続のデータマート更新処理は停止 • Warningデータ品質チェック ◦ severity:warnのテストを全てのデータマートに対して実施 ◦ 全てのdbtデータマートタスクグループ完了後に実行 ◦ warningが発生しても後続タスクは続行 dbt test --select "${対象dbtモデル名},config.severity:error" dbt test --select "config.severity:warn"
  10. © ZOZO, Inc. 22 データマート集計Dagの実装 • ポイント: ◦ dbtデータマートタスクグループの定義 ◦

    タスク間の依存関係定義 • manifest.json を解析 ◦ dbtコマンド実行後にtargetディレクトリ配下に作成されるjsonファイル ◦ 各dbtモデルの詳細や依存関係などの情報が記載されている
  11. © ZOZO, Inc. 23 dbtデータマートタスクグループの定義 class DbtModel(): def __init__(**kwargs): ...

    @property def project_id(self): return self._project_id @property def unique_id(self): return self._unique_id @property def dataset(self): return self._dataset @property def table(self): return self._table @property def depends_on_models(self): return self._depends_on_models def table_id(self): return f'{self._project_id}.{self._dataset}.{self._table}' ... • 各dbtモデルの情報を保持する DbtModelクラスを定義 ※ 辞書でもOK
  12. © ZOZO, Inc. 24 dbtデータマートタスクグループの定義 • manifest.jsonをロード • 各モデルの情報を取得 •

    モデルごとにDbtModelクラスを インスタンス化 • DbtModelインスタンスが要素となる dbt_modelsリストを作成 with open('target/manifest') as f: manifest_dict = json.load(f) dbt_models = [] # manifest.jsonに記載の各nodeでループ for node in manifest_dict["nodes"].keys(): if node.split('.')[0] == "model": # model情報を取得 model_conf = manifest_dict["nodes"][node] unique_id = model_conf['unique_id'] project_id = model_conf['database'] dataset = model_conf['schema'] table = model_conf['name'] depends_on_models = model_conf['depends_on']['nodes'] # 取得した情報を元に DbtModelをインスタンス化 dbt_model = DbtModel( project_id, unique_id, dataset, table, depends_on_models, ) # dbt_modelsリストに格納 dbt_models.append(dbt_model)
  13. © ZOZO, Inc. 25 dbtデータマートタスクグループの定義 # タスクを格納する辞書 task_dict = {}

    # dbtモデルごとのループ for dbt_model in dbt_models: model = dbt_model.table # モデルごとにタスクグループ作成 with TaskGroup(group_id=model) as task_dict[model]: # データマート更新タスク定義 dbt_run_command = f'dbt run --select "{model}"' update_datamart = BashOperator( task_id='update_datamart', bash_command=dbt_run_command, ) # データ品質チェックタスク定義 dbt_test_command = f'dbt test --select "{model},config.severity:error"' data_quality_check = BashOperator( task_id='data_quality_check', bash_command=dbt_test_command, ) update_datamart >> data_quality_check • dbt_modelsリスト内の dbtモデルでループ • dbtモデルごとに タスクグループを定義
  14. © ZOZO, Inc. 27 タスク間の依存関係定義: ①dbt → dbt # dbtモデルごとのループ

    for dbt_model in dbt_models: # dbtモデルが依存するnodeのunique_idでループ for depends_on_node_unique_id in dbt_model.depends_on_models: # 依存先nodeがmodelの場合依存関係定義 if depends_on_node_unique_id.split('.')[0] == "model": depends_on_model = depends_on_node_unique_id.split('.')[-1] task_dict[depends_on_model] >> task_dict[dbt_model.table]
  15. © ZOZO, Inc. 28 タスク間の依存関係定義: ②Source → dbt # 待ち処理を行うSourceのリスト

    sources = [] # 依存関係リスト dependencies = [] # dbtモデルごとのループ for dbt_model in dbt_models: # dbtモデルのunique_idを取得 dbt_model_unique_id = dbt_model.unique_id # dbtモデルが依存するモデルの unique_idでループ for depends_on_node_unique_id in dbt_model.depends_on_models: # 依存するモデルがSourceの場合、情報を取得 if depends_on_node_unique_id.split('.')[0] == 'source': source_conf = manifest_dict["sources"][depends_on_node_unique_id] unique_id = source_conf['unique_id'] project_id = source_conf['database'] dataset = source_conf['schema'] table = source_conf['name'] # DbtModelクラス同様、Source用クラスで情報を保持 dbt_source = DbtSource(project_id, unique_id, dataset, table) # DbtSourceインスタンスごとにリストに追加 sources.append(dbt_source) # 依存関係リストに追加 dependencies.append({'before': dbt_source.table, 'after': dbt_model.table})
  16. © ZOZO, Inc. 29 タスク間の依存関係定義: ②Source → dbt # Sourceのリストから待ち処理を行うタスクを生成

    for source in sources: task_dict[source.table] = PythonOperator( task_id=f'wait_{source.table}', python_callable=_wait_source_created, ) # 依存関係リストから依存関係を定義 for dependency in dependencies: before = dependency['before'] after = dependency['after'] task_dict[before] >> task_dict[after]
  17. © ZOZO, Inc. 30 タスク間の依存関係定義: ③dbt → SQL # SQLデータマートごとのループ

    for datamart in sql_datamarts:    """ SQLを解析してFROM,JOINの後にくるテーブル IDを取得し、 取得したテーブルIDをdepends_on_table_idsリストに格納する処理 datamart: DbtModelクラス同様、SQLデータマート用クラスで情報を保持している """ # FROM,JOINの後にくるテーブル IDでループ for depends_on_table_id in depends_on_table_ids: # dbtモデルごとのループ for other_datamart in dbt_models: # FROM,JOINの後にくるテーブル IDとdbtモデルのテーブルIDを比較 # 一致した場合、依存関係を定義 if depends_on_table_id == other_datamart.table_id(): task_dict[other_datamart.table] >> task_dict[datamart.table]
  18. © ZOZO, Inc. 32 Elementaryを使った実行履歴・テスト結果の可視化 • Elementary(edrコマンド)の主なコマンド ◦ edr monitor:

    dbt コマンドで発生したエラー履歴を調査、まだ通知していないものがあれば通知 ◦ edr send-report: 実行履歴やエラー履歴を可視化できるダッシュボードを作成 • edr monitor実行タイミング ◦ dbtデータマートタスクグループ内のタスクがエラー終了した時(on_failure_callback) ◦ dbt_test_warningの直後 • edr send-report実行タイミング ◦ 全てのdbt関連タスクの完了後 edr monitor edr send-report edr monitor エラー発生時
  19. © ZOZO, Inc. 33 PyPIパッケージのメンテナンス • Cloud Composerにはデフォルトで大量のPyPIパッケージが入っており、追加パッケージを入れよ うとすると依存関係が崩れて上手くインストールできないケースがある ◦

    → Elementaryがインストールできなかった • 導入当初、Cloud Composerにデフォルトで入っているdbt-coreのバージョンが古かった • dbtモデル開発用リポジトリで使用されているPyPIパッケージとバージョンを合わせるのが大変 • dbtモデル開発用リポジトリで使われているPython仮想環境をCloud Composer上で再現するよう にした dbt_run_command = ( 'cd ${DBT_PROJECT_DIR} && ' f'pipenv sync && ' f'pipenv run dbt run --select "{dbt_model.table}"' ) update_datamart = BashOperator( task_id='update_datamart', bash_command=dbt_run_command, )
  20. © ZOZO, Inc. 35 まとめ • 体系的なモデリングに則り品質担保されたデータの提供を目的としてdbtが導入された • dbtモデル更新をdbt単体で行うには課題があった ◦

    モデルごとに自動リトライができない ◦ 依存関係による待ち合わせ制御ができない • Cloud Composerで構築しているデータマート集計基盤からdbtを実行するようにして課題解決 • Cloud Composerからdbtを実行するにはmanifest.jsonの解析が有用
  21. © ZOZO, Inc. 36 今後の展望 • Backfill機能導入 ◦ 近々リリース予定 •

    SQLデータマートのデータ品質チェックジョブの利用拡大 ◦ 現在Dataplexを試験的に運用中 • データマート集計基盤で扱えるツールの拡大 ◦ BigQuery ML など
  22. © ZOZO, Inc. 39 その他つまずきポイント: 他モデルと比較するデータ品質チェック relation_ships, equal_rowcountなど他のモデルとの比較をするテストの場合に比較対象のモデルのテストも動いてしまう $ dbt

    test --select "dbt_table1" --vars "{'execution_date': '20240213'}" ~~~~ 03:33:46 5 of 13 PASS accepted_values_col1__False__1__2 ........... [PASS in 1.69s] 03:33:46 13 of 13 START test unique_dbt_table1_col2 ............................... [RUN] ### dbt_table2 の relation_ships テストが動いてしまっている ### 03:33:46 10 of 13 ERROR relationships_dbt_table2_col2__col2__ref_dbt_table2_ [ERROR in 0.47s] 03:33:46 1 of 13 PASS accepted_values_dbt_table1_col3__False__1__2__3__4__5__6__7__8 [PASS in 1.74s] ~~~ models: - name: dbt_table1 columns: - name: col2 tests: - relationships: to: ref('dbt_table2') field: col2 --exclude "*relationships_*ref_${model}_ *dbt_utils_equal_rowcount_*ref_${model}_" を指定