Upgrade to Pro — share decks privately, control downloads, hide ads and more …

dbtのドメイン分割による データ基盤の改善とDigdagとの連携

dbtのドメイン分割による データ基盤の改善とDigdagとの連携

Treasure Data Tech Talk 2024(https://techplay.jp/event/938776)での発表資料

<前編>
dbtのドメイン分割によるデータ基盤の改善
<後編>
既存Digdagプロジェクトのリファクタリングとdbt導入

Satoshi Akama

April 24, 2024
Tweet

Other Decks in Programming

Transcript

  1. © 2024 Treasure Data, Inc. Confidential 2 About myself Satoshi

    Akama Embulkを使ったバルクロード基盤の開発 ↓ Data teamで社内のデータ基盤の開発・運用 ミッション • 社内のデータ基盤の開発・運用 裏ミッション? • TDのプラットフォーム自体やTD発のOSS(Digdag、Embulk等)のドッグフーディング • 社内や社外コミュニティにフィードバック・知見を広める Data team
  2. © 2024 Treasure Data, Inc. Confidential 3 Agenda <前編> •

    dbtのドメイン分割によるデータ基盤の改善 <後編> • 既存Digdagプロジェクトのリファクタリングとdbt導入
  3. © 2024 Treasure Data, Inc. Confidential 元々のデータ解析基盤の構造 5 Extract/Load Transform

    Export GDPR等の制約によりPIIを含むデータ処理はUS/EU/Tokyo/Korea毎リージョン内で行う …etc
  4. © 2024 Treasure Data, Inc. Confidential 解決すべきだった課題 6 • Transformのプロセスが複雑化

    • ワークフロー間の依存関係が分からない • スキーマ定義が謎 • 人間には理解しづらいSQLがある ◦ SQL complexity Score高すぎるSQLファイルが沢山 https://learn.microsoft.com/en-us/visualstudio/code-quality/code-metrics-maintainability-index-range-and-meaning?view=vs-2022 Complexity Score Complexity Level 1 - 10 • Simple procedure, little risk 11 - 40 • More complex, moderate risk 41 - 100 • Complex, high risk 101 - 250 • Untestable code, very high risk > 250 • Unreadable code, extremely high risk ↓1920行あってComplexity Score=855.8610なSQLファイル
  5. © 2024 Treasure Data, Inc. Confidential dbtの導入 7 • ELT(Extract/Load/Transform)においてTransformのプロセスを担当

    • SQLファイルを使ってデータの流れを定義 ◦ TDでは事例がないが Pythonでモデルを書くことも可能らしい。 SQLでやるよりPandasで処理した 方がいいケースなど • macroやJinjaテンプレートによりモデルの簡略化 • モデル間の依存関係を管理できる仕組みがある • Transformのプロセスが複雑化 →dbtでモデル化して簡略化 • ワークフロー間の依存関係が分からない →lineage graphで依存関係の可視化 • スキーマ定義が謎 →dbtのモデルで管理 • 人間には理解しづらい SQLがある →dbtのJinjaテンプレート/macro機能で 簡略化、複数モデルに分割
  6. © 2024 Treasure Data, Inc. Confidential To be 8 Extract/Load

    Transform Export US/EU/Tokyo/Korea …etc
  7. © 2024 Treasure Data, Inc. Confidential How to run dbt

    with TD/Digdag? • dbt Trinoアダプターを使う ◦ https://github.com/starburstdata/dbt-trino 9 td: target: default_target outputs: default_target: type: trino method: none user: "{{ env_var('TD_API_KEY') }}" database: td-presto # e.g. api-presto.treasuredata.com host: "{{ env_var('TD_PRESTO_ENDPOINT') }}" port: 443 schema: "{{ var('target_schema') }}" threads: 4 http_scheme: https prepared_statements_enabled: false dbt_profiles.yml # dbt_profiles.ymlで定義したprofile名 profile: 'td' # これがないとSQLのシンタックスエラーが起きる quoting: database: true schema: false identifier: false dbt_project.yml
  8. © 2024 Treasure Data, Inc. Confidential Treasure Dataではdbt Trino v1.5以降が動かない

    • v1.5以降にviewを使用する記述がmacroに入っていてそこでエラー • dbtのmacroは同名のmacroを定義してoverrideできる • 3つのmacro内からviewを参照していたりCREATE VIEWしている記述等を削除 10 {% macro trino__list_relations_without_caching(relation) %} # left join system.metadata.materialized_views mv を削除 {% endmacro %} {% macro trino__create_view_as(relation, sql) -%} # create or replace view を削除 # create table if not exists を追加 {% endmacro %} macros/override_dbt_global_adapter.sql - ファイル名はなんでもOK {% macro trino__get_catalog(information_schema, schemas) -%} # metadataを取得する一部のAS句を削除 {% endmacro %} macros/override_dbt_global_catalog.sql - ファイル名はなんでもOK
  9. © 2024 Treasure Data, Inc. Confidential TDではdbt Trino v1.5以降が動かない -

    search_order 11 dispatch: - macro_namespace: trino search_order: [ 'dbt_project', # 自身のmacros/ディレクトリに新しいmacroファイルを入れた場合 'dbt_td_utils', # TDオリジナルdbt packageのmacros/ディレクトリに新しいmacroファイルを入れた場合 'dbt_trino', # dbt trinoのオリジナルmacro定義がある場所 'dbt' ] dbt_project.ymlで”trino”のmacro_namespaceに対してsearch_orderで読み込みの優先度を付ける dbt_project.yml
  10. © 2024 Treasure Data, Inc. Confidential How to call dbt

    from Digdag? 12 コンテナ立ち上げ Pythonスクリプトを実行 dbtを実行 TD platform上とローカル両方で動作させるため &後述するテストを流すためにこういう実装になっている dbt Cloudとか使える状況であればここまでしなくていいかも? +run_dbt: py>: util.dbt_runner _export: cmd: - deps - seed - run _env: … GITHUB_APP_SECRET_KEY: ${secret:td.github_app_secret_key} extra_selector: ${'tag:account_' + ACCOUNT} docker: image: "digdag/digdag-python:3.9" build: - pip install urllib3==1.24 pytz==2019.3 dbt-trino==1.7.1 click==8.1.0 jwt==1.3.1 run_dbt.dig
  11. © 2024 Treasure Data, Inc. Confidential ビジネスドメイン毎にdbt package分割 13 それぞれの分野のエキスパート(both

    エンジニア/エンジニア以外)が見る範囲を最小化 共同での作業がやりやすくなる
  12. © 2024 Treasure Data, Inc. Confidential ビジネスドメイン毎にdbt package分割 - GitHubの別レポで管理

    14 物理的にGitHubの別レポジトリで管理することでコラボレーション相手が見るべき範囲を最小化 packages: - package: dbt-labs/dbt_utils version: 1.1.1 # dbt Package hubで配布されているのでversionを使う - git: "https://github.com/treasure-data/dbt_td_utils.git" # 社内Utilityライブラリ revision: main # 社内パッケージなのでtag or branch name - git: "https://github.com/treasure-data/dbt_domain_example1.git" revision: main - git: "https://github.com/treasure-data/dbt_domain_example2.git" revision: main - git: "https://github.com/treasure-data/dbt_domain_example3.git" revision: main 親のdbt projectのdbt_packages.yml ※URLは説明のため簡略化。実際は GitHub access tokenを使用するために以下のようになる https://x-access-token:{{env_var('TD_GITHUB_ACCESS_TOKEN')}}@github.com/treasure-data/dbt_domain_example1.git
  13. © 2024 Treasure Data, Inc. Confidential 各ドメインのdbt modelの構造は公式ルールをゆるく採用 15 •

    最初は公式ルールを採用するか迷ったが、”オレオレ”ルールを作って将来技術的負債化するより良いという判断 • 新しく関わる人に「dbtのサイトに載っているルールでやってます」と言えばOK→教育コスト低い • https://docs.getdbt.com/best-practices/how-we-structure/1-guide-overview models/ staging / ソースから直接データを取得、単純な一次集計  intermediate/  中間/二次集計。単純なドメインには存在しないことも marts/   BIや外部のworkflowからの直接依存がある • 各モデルは*__models.ymlでスキーマを管理 • 配下のディレクトリ名やSQLファイル名は各開発者に任せ ている 開発中に依存関係が分からなくなって 循環参照作ってしまうことがなくなった
  14. © 2024 Treasure Data, Inc. Confidential dbtのJinjaテンプレート/macroを利用してSQLを簡略化 16 {% for

    metrics in [ 'metric_a', 'metric_b', 'metric_c', 'metric_d', ] %} SELECT time, '{{metrics}}' AS metric, ... FROM ... {% if not loop.last %} UNION ALL {% endif %} {% endfor %} Jinjaテンプレートを使った書き換え例 {{ auto_config(refresh_time_range=in_prev_nday(n=var('backfill_d ays', var('default_backfill_days')))) }} SELECT … FROM ... WHERE {{ in_prev_nday( n=var( 'backfill_days', var('default_backfill_days' ) )) }} # TD_TIME_RANGE('time', ‘...’, ‘...’) へ置き換えられる 自前macroを使った書き換え例
  15. © 2024 Treasure Data, Inc. Confidential ref(), source()によるモデル間依存関係の定義 17 SELECT

    ... FROM # 自パッケージ内のモデルの参照 {{ ref('service_tagged_infra_cost') }} # LEFT JOIN # 自分以外のパッケージ(他のドメイン@GiHub別レポジトリ)内のモデルの参照 {{ ref('dbt_domain_external1', 'engineering_team_ownership') }} ON service = user_service ref() - 他のモデルへの依存を定義 source() - TDテーブルの直接参照、stagingディレクトリ以下のモデルによくある SELECT ... FROM # TDのcost.resource_custom_scriptionテーブルを直接参照 {{ source('cost', 'resource_custom_scripting') }}
  16. © 2024 Treasure Data, Inc. Confidential exposures定義による外部からの依存の定義 18 exposures: -

    name: engineering_metric_gsheet_report label: Engineering Metric Weekly Report type: application maturity: medium url: https://docs.google.com/spreadsheets/d/abcdefg description: '{{ doc("engineering_metric_gsheet_report") }}' depends_on: # 依存しているdbtモデル - ref('operational_metrics_aggregation') - ref('operational_metrics_aggregation_weekly_time_series') - ref('deployment_metric') owner: name: Data Team BIツール、GoogleSheets、他の外部アプリケーション等からの被依存関係を定義できる dbt_domain_example_1/models/marts/path/to/_operational_metrics__exposures.yml
  17. © 2024 Treasure Data, Inc. Confidential スキーマ管理と依存/被依存関係管理 20 モデルのスキーマ定義 依存/被依存関係

    他のモデルから参照されている 他のモデルへの依存 モデルのスキーマ
  18. © 2024 Treasure Data, Inc. Confidential データのクオリティの可視化 - 実装 21

    SLO(Service Level Objective)を定義し、ビジネスドメイン/モデル毎に3メトリクスの達成率をDataDog上で可視化 • Availability - 任意の期間の想定される全タイムスタンプのデータがあるか • Freshness - 直近のデータがあるかどうか • Uniqueness - 任意の期間のデータのユニーク性が確保できているか 最近派手にmodelを変えていたので数値が落ちてます :)
  19. © 2024 Treasure Data, Inc. Confidential データのクオリティの可視化 - 実装 22

    dbtにはsingular testとgeneric testの2つのテストがあり、後者のgeneric testの機能を使って実装 {% test freshness_test(model, time_field='time', latency='49h') %} ... SELECT * FROM ( SELECT COUNT({{ time_field }}) != 0 AS result FROM {{ model }} WHERE TD_TIME_RANGE({{ time_field }},...) ) WHERE result = false dbt_td_utilsパッケージに実装したFreshness test version: 2 models: - name: cost_allocation description: dummy description tests: - dbt_td_utils.freshness_test: latency: 52h 各*__models.ymlからの呼び出し
  20. © 2024 Treasure Data, Inc. Confidential 課題&Tips 23 ドメイン分割によりCIが複雑化する •

    各ドメインモデルのバリデーションするにも親パッケージの dbt_project.yml、 dbt_profiles.ymlが必要 • 他のドメインへの依存がある場合もある • 親のCIをkickするのがいいと思うが、未着手 https://docs.github.com/ja/apps/creating-github-apps/authenticating-with-a-github-app/authenticating-as-a-github-app GitHubのaccess tokenが必要 • Personal access tokenだと管理を作成した本人しかできない • GitHub Appsを作成するのが現実解 ◦ 最終設定にGitHub OrgのAdmin権限必要なので、 ◦ 個人アカウントで作った後Orgに移管するという面倒さがある
  21. © 2024 Treasure Data, Inc. Confidential 状況と解決すべきだった課題 25 • ルートにworkflow定義が108個

    ◦ 最上位のworkflowがコケた時のリトライのコストが高い • ローカルでのテストが辛い • 過去の日付でのworkflowの実行ができない • アップストリームのデータパイプラインの影響でコケる 依存している
  22. © 2024 Treasure Data, Inc. Confidential リファクタリング&dbt導入の手順 26 Digdagプロジェクトとしてのリファクタリング Step

    1 dbtのモデルで生成されたDBに参照先を変更 Digdagとdbtの連携 多すぎるワークフローの集約 dbtのgeneric testを使ったSource testingを追加 Step 2 Step 3 Step 4 Step 5
  23. © 2024 Treasure Data, Inc. Confidential Digdagプロジェクトとしてのリファクタリング 27 Step 1

    DEBUG: True SITE: aws STAGE: development TD_API_ENDPOINT: api.treasuredata.com EMBULK_COMMAND: preview -G DB_SUFFIX: "" params/env.dig ローカルでテストできないと辛いので、production/developmenbt判定を入れてローカルでの破壊的な操作を防ぐ DEBUG: False SITE: aws STAGE: production TD_API_ENDPOINT: api.treasuredata.com EMBULK_COMMAND: run DB_SUFFIX: "" params/env_production.dig CIでパッケージをproductionにデプロイする際に”mv env_production.dig env.dig”してやる
  24. © 2024 Treasure Data, Inc. Confidential Digdagプロジェクトとしてのリファクタリング 28 Step 1

    example.dig prod/devによってINSERT/UPDATE/CREATE TABLEしているDB名を変更 - SELECT先は変えない schedule: daily>: 01:30:00 _export: !include : params/env.dig td: database: target_db # SQL内で${td.database}で参照可能 endpoint: ${TD_API_ENDPOINT} _secrets: td: apikey: td.apikey_1 +setup: +rewrite_td_database_if_local: _env: PYTHONPATH: ./scripts py>: util.rewrite_td_database_if_local # params/env.digの値に応じてtd.databaseの値を書き換えるPythonスクリプト
  25. © 2024 Treasure Data, Inc. Confidential Digdagプロジェクトとしてのリファクタリング 29 Step 1

    example.dig ${DIGDAG_ENV}でprod/devで処理を分岐 _export: !include : 'lib/env.dig' … +step1: +sub_task: if>: ${DIGDAG_ENV == 'production'} _do: # productionでしか実行したくない処理とか # 実行時間の長い依存ワークフローの実行=requireとか require>: accounts # BIツールへのexportとか # Slackへの通知とか _else_do: # production/devで分岐したい何かの処理
  26. © 2024 Treasure Data, Inc. Confidential Digdagプロジェクトとしてのリファクタリング 30 Step 1

    example.sql ワークフローの過去のタイムスタンプでの実行が可能なようにsession_date、sesion_unixtimeを使う SELECT # Bad NOW() AS current_timestamp # Good - 上と結果のフォーマットは違うが... '${session_unixtime}' AS unixtime WHERE # Bad TD_INTERVAL(time, '-1d') # Good TD_TIME_RANGE(time, TD_TIME_ADD('${session_date}', '-1d'), '${session_date}')
  27. © 2024 Treasure Data, Inc. Confidential 多すぎるワークフローの集約 31 Step 2

    パターン1 - タスク・サブタスクを追加 ビジネスドメイン毎、機能毎に集約 _export: !include : 'lib/env.dig' … +original_task: +sub_task_add_1: +task_add_1: +task_add_2 パターン2 - for_eachでうまく回す +task: _parallel: true for_each>: tables_configs: - name: table1 - name: table2 - name: table3 - name: table4 - name: table5 _do: +insert_into: td>: path/to/queries/${tables_configs.name}.sql insert_into: ${tables_configs.name} ワークフロー定義の行数が減ることが多い。 実行失敗したループ内の一部だけ再実行はできないので、実行時間が 長いものをサブタスクに入れるとリトライコストが高い 王道、手堅い
  28. © 2024 Treasure Data, Inc. Confidential 多すぎるワークフローの集約 32 Step 2

    パターン3 - requireする ビジネスドメイン毎、機能毎に集約 _export: !include : 'lib/env.dig' … +task require>: another_workflow1 require>: another_workflow2 手っ取り早い。多数のワークフローがルートディレクトリにゴロゴロ転がっているよりは、リトライコストが低い
  29. © 2024 Treasure Data, Inc. Confidential Digdagとdbtの連携 - To be

    33 Step 3 ワークフロー実行前にアップストリームのデータステータスをチェック (以前からある、dbtのmodelで生成されていないDB含む) 最終的にはdbtのmodelで生成されたDB/tableに参照先を変える
  30. © 2024 Treasure Data, Inc. Confidential Digdagからdbtを実行できるようにする 34 Step 3

    コンテナを立ち上げてsubmodule内のdbtを実行するPythonスクリプトを実行、Pythonスクリプトからdbtを実行 前編で出てきたdbt projectをGit submoduleとして追加。 +run_dbt: py>: dbt.dbt_parent_workflow.util.dbt_runner _export: cmd: - deps - test _env: PYTHONDONTWRITEBYTECODE: 1 TD_API_KEY: ${secret:td.apikey} TD_PRESTO_ENDPOINT: ${TD_PRESTO_ENDPOINT} GITHUB_APP_SECRET_KEY: ${secret:td.github_app_secret_key} DBT_PROJECT_DIR: ./dbt/dbt_parent_workflow/dbt_project DBT_PROFILES_DIR: ./dbt/dbt_parent_workflow/dbt_profile extra_selector: tag:something,package:${dbt_package},+exposure:${dbt_exposures} git submodule add https://github.com/treasure-data/dbt_parent_workflow.git dbt
  31. © 2024 Treasure Data, Inc. Confidential dbtのgeneric testを使ったSource testingを追加 35

    Step 4 dbt_domain_example1.git/ models/marts/external_workflow1/exposures.yml exposures: - name: external_workflow1 Label: something type: application maturity: medium url: https://github.com/treasure-data/external_workflow1/example.dig description: something depends_on: - source('production', 'example_table') # TD内のDB/source - source('production_tokyo_ec2', 'example_table') version: 2 vars: - &default range: 1d - name: production schema: production tables: - name: example_table tests: # 実行時点-1dのデータが存在すること - dbt_td_utils.data_arrival_test: <<: *default models/staging/example/_example__sources.yml ※過渡期なので dbtのモデル化が済んでいないデータソースを source()で参照してい るが、最終的には ref()でdbtのモデルを参照するのが望ましい “While possible, it is highly unlikely you will ever need an exposure to depend on a source directly” https://docs.getdbt.com/docs/build/exposures
  32. © 2024 Treasure Data, Inc. Confidential Digdagのワークフローからsource testingを呼び出し 36 Step

    4 execute_dbt_test.dig - submodule内のdbtを実行するPythonスクリプトをkickするワークフローを一つ用意する (このDigdagプロジェクト内で共有される) このDigdagプロジェクト内で、前編で出てきたdbtプロジェクトのdeps/testコマンドが動くようになる +run_dbt: py>: dbt.dbt_parent_workflow.util.dbt_runner _export: cmd: - deps - test _env: PYTHONDONTWRITEBYTECODE: 1 TD_API_KEY: ${secret:td.apikey} TD_PRESTO_ENDPOINT: ${TD_PRESTO_ENDPOINT} GITHUB_APP_SECRET_KEY: ${secret:td.github_app_secret_key} TD_PROD_DB_ENV: production_${SITE.replace('-','_')} DBT_PROJECT_DIR: ./dbt/dbt_parent_workflow/dbt_project DBT_PROFILES_DIR: ./dbt/dbt_parent_workflow/dbt_profile extra_selector: tag:account_${ACCOUNT},package:${dbt_package},+exposure:${dbt_exposures}
  33. © 2024 Treasure Data, Inc. Confidential dbtのsource testingを個別のワークフローからcall 37 Step

    4 +run_dbt_test: +sub_task_exmaple: +wait_dependencies: _retry: limit: 50 interval: 300 +run_dbt_test: _export: # 参照するexposuresの定義名 dbt_package: dbt_domain_example1 dbt_exposures: external_workflow1 call>: dbt_test rerun_on: failed +second_task: … example_workflow.dig - リファクタリング対象 実際のワークフローの動き 依存先ドメインモデルのexposures定義内のGeneric testの結果がTrueにな るまで+run_dbt_testタスクが繰り返される ↓ 依存先データが全てreadyとなるまで後続のワークフロータスクは実行されな い。左の例で言うと+second_task以降は実行されない td_wait>: オペレータでも同じようなことはできるが依存先をどこかで一元管理 したい。逆にtd_wait>: オペレータはコンテナの立ち上げは必要ないので軽量
  34. © 2024 Treasure Data, Inc. Confidential dbtのモデルで生成されたDBに参照先を変更 38 Step 5

    あとは単純にSQL内の参照先を変えて終わり 結果として • dbtのモデルを参照することで実装がスリムになった、今後は加速するはず • ワークフロー実行前にdbtモデルで生成された以外のものも含むデータのテストを実装
  35. The power to use every bit of privacy-compliant data to

    serve with relevance. © 2024 Treasure Data, Inc. Confidential 40