Upgrade to Pro — share decks privately, control downloads, hide ads and more …

SQLクエリ解析によるE2Eデータリネージの実現 / E2E-data-lineage

SQLクエリ解析によるE2Eデータリネージの実現 / E2E-data-lineage

PyCon JP 2022 登壇資料 https://2022.pycon.jp/timetable?id=QRYKXS

Toshifumi Tsutsumi

October 15, 2022
Tweet

More Decks by Toshifumi Tsutsumi

Other Decks in Programming

Transcript

  1. #pyconjp_5 このトークで話すこと 5 [ ] データリネージとはなにか [ ] なぜSQLなのか [

    ] E2Eの範囲はどこまでか [ ] Pythonでどのように実現するのか [ ] どのように運用するのか SQLクエリ 解析による E2E データリネージ の実現
  2. #pyconjp_5 データリネージ(Data lineage)とは 8 データの系譜を明らかにすること lineage: 血統、家柄 データ活用の場面で使われる メタデータ(データを説明するためのデータ)の一種 注:

    このトークにおける用語の定義は下記とします - データ: データリネージの対象 - データイベント: データに対するCRUDの総称 - リネージ情報: データリネージによって得られる情報
  3. #pyconjp_5 データリネージ手法の分類: データイベント 12 データイベントをトリガーにリネージ情報を作成 Open Lineage*1 データリネージ フレームワーク 収集用

    API を設けて、 データイベントを起きたときに リクエストを送信してもらう Marquez*2 Open Lineage に則って 作成したリネージ情報を蓄積して可視化 図は https://openlineage.io/ より引用 *1 https://openlineage.io/ *2 https://github.com/MarquezProject/marquez
  4. #pyconjp_5 データリネージ手法の分類: 複合型 15 Transform*1 専用ツールの1機能としてデータリネージを提供 dbt https://www.getdbt.com/ Dataform https://dataform.co/

    Data observability を軸にしたサービス・ソフトウェア Atlan https://atlan.com/ Monte Carlo Data https://www.montecarlodata.com/ OpenMetadata https://docs.open-metadata.org/ * データ構造を組み替えること。一般に Extract, Transform, Load の工程をまとめて ETLと呼びます
  5. #pyconjp_5 このトークで話すこと 17 [✔] データリネージとはなにか [ ] なぜSQLなのか [ ]

    E2Eの範囲はどこまでか [ ] Pythonでどのように実現するのか [ ] どのように運用するのか SQLクエリ 解析による E2E データリネージ の実現
  6. #pyconjp_5 動き1: RDBMS以外での利用 23 列指向データベースへの問い合わせ言語 BigQuery documentation | Google Cloud

    Amazon Redshift SQL Key-Value Store への問い合わせ言語 PartiQL - a SQL-compatible query language for Amazon DynamoDB GQL Reference | Cloud Datastore Documentation
  7. #pyconjp_5 動き1: RDBMS以外での利用 24 分散処理 Apache Hive Apache Spark SQL

    Trino(Presto) データパイプラインの構築 Quickstart: Create a Dataflow pipeline using SQL | Google Cloud 機械学習モデルの学習と推論 What is BigQuery ML? | Google Cloud Redshift ML - Amazon Web Services
  8. #pyconjp_5 動き2: SQLをテンプレートからコンパイル: dbt 25 https://docs.getdbt.com/guides/getting-started/learning-more/using-jinja#set-variabl es-at-the-top-of-a-model より引用 # SQL

    に Jinja template が埋め込まれている {%- set payment_methods = ["bank_transfer", "credit_card", "gift_card"] -%} select order_id, {%- for payment_method in payment_methods %} sum(case when payment_method = '{{payment_method}}' then amount end) as {{payment_method}}_amount {%- if not loop.last %},{% endif -%} {% endfor %} from {{ ref('raw_payments') }} group by 1
  9. #pyconjp_5 動き3: Transform を CREATE TABLE AS SELECT で行う 26

    データウェアハウスのコンピューティング能力向上 and コスト低下により、 Transform をデータウェアハウスで行うことが容易になった CREATE TABLE AS SELECT で Transform を行う場合、 クエリ実行後の抽出結果がそのままテーブル*になる この場合、SELECT 文はテーブルを説明する情報 といえる * ビューやマテリアライズドビューという選択肢もあります
  10. #pyconjp_5 このトークで話すこと 33 [✔] データリネージとはなにか [✔] なぜSQLなのか [✔] E2Eの範囲はどこまでか [

    ] Pythonでどのように実現するのか [ ] どのように運用するのか SQLクエリ 解析による E2E データリネージ の実現
  11. #pyconjp_5 特徴 36 Python 製のデータリネージ OSS MIT License PyPI で公開中。

    $ pip install stairlight SQLをもとにしたリネージ情報の抽出に特化 SELECT 文を含む SQL 文字列をデータリネージの対象にできる JSON 文字列を標準出力へ流す コマンドとしても、ライブラリとしても利用可
  12. #pyconjp_5 なぜ Python を選んだのか 37 データ分析領域でよく利用されている言語 ビジュアライズ系のフレームワーク、ライブラリが充実 リネージ情報の抽出に特化する → グラフ表示は他を頼る

    テンプレート SQL で jinja2*1 が使われているケースが見られる - dbt - Apache Airflow*2 : Python製のワークフローエンジン *1 https://jinja.palletsprojects.com/en/3.1.x/ *2 https://airflow.apache.org/
  13. #pyconjp_5 前提: SELECT クエリの実行結果 → ”テーブル”と表現 38 SELECT 文はテーブルを説明する情報である このトークでは、SELECT

    クエリによる抽出結果* のような 表形式であらわせるデータ全般 を便宜上 “テーブル” と表現します データベースにおける永続テーブル、一時テーブル 出力ファイル インメモリデータなど * Stairlight はクエリを発行しないので、正確には「発行した場合に抽出されるデータ」を指します
  14. #pyconjp_5 : テーブル : 参照 データパイプラインは 上流から下流へデータが流れていく グラフ構造*1を成していると考える Stairlight は、このグラフの

    ノード(テーブル)と エッジ(テーブル間の参照関係)を見つける*2 データのつながりを見つける select … from … select … from … select … from … select … from … select … from … select … from … left join … on … 39 *1 同一テーブルに対する複数箇所での参照や、自身への参照がありうるため、正確には有向多重グラフです。 *2 Stairlight(階段灯)は、テーブル のどこに 参照関係 があるのかを知らせる役割をもちます 風来のシレン5 の”ドコ?カイ弾”です フロア 階段
  15. #pyconjp_5 下記に保存されている、SELECT 文を含む文字列を解析対象にできる データソース サポート対象 40 データソース DataSource Type 補足情報

    ローカルファイル File Pathlib を使用 Amazon S3 S3 Amazon Managed Workflows for Apache Airflow(MWAA) も対応可 Google Cloud Storage GCS Google Cloud Composer も対応可 dbt dbt `dbt compile` した結果を解析 現時点では Google BigQuery のみ対応 Redash Redash 現時点では Saved query のみ対応
  16. #pyconjp_5 Step1: データソースの指定 # stairlight.yaml サンプル Include: # 抽出対象の条件 -

    TemplateSourceType: File # データソース種別 FileSystemPath: "./tests/sql" # 抽出対象とするパス Regex: ".*/*.sql$" # 抽出対象とするファイル(正規表現) DefaultTablePrefix: "PROJECT_A" # テーブル名称のデフォルトPrefix Exclude: # 抽出対象外の条件 - TemplateSourceType: File Regex: "main/exclude.sql$" Settings: MappingPrefix: "mapping" # マッピング設定ファイルの名称 43
  17. #pyconjp_5 Step2: SQL 文とテーブル名称のマッピング 44 $ stairlight map mapping.yaml の雛形をつくる

    mapping.yaml では、SELECT 文とテーブル名称のマッピングを行う データソースから SELECT 文を読み取って、Jinja variables を探す すでに mapping.yaml があれば、未設定のパラメータだけを抽出する この時点ではリネージ情報は生成していない
  18. #pyconjp_5 Jinja templating SQL 文にある Variables に対して、mapping.yaml でパラメータを 複数設定していれば、その数だけ SQL

    文が存在するものとみなす 45 Mapping: - TemplateSourceType: GCS Uri: "gs://stairlight/sql/sample_02.sql" Tables: - TableName: "PROJECT_A.DATASET_B.TABLE_D" Parameters: DESTINATION_TABLE: TABLE_D - TableName: "PROJECT_A.DATASET_B.TABLE_E" Parameters: DESTINATION_TABLE: TABLE_E sample_02.sql (TABLE_D) sample_02.sql (TABLE_E)
  19. #pyconjp_5 Step3: リネージ情報の出力 46 $ stairlight stairlight.yaml と mapping.yaml が揃っている状態で実行すると

    検出したリネージ情報をメモリに蓄積して、JSON 文字列として まとめて標準出力する mapping.yaml にあるパラメータを適用しつつ、SQL文を コンパイルしてから読み込む
  20. #pyconjp_5 リネージ情報のデータ構造 抜粋 47 { "PROJECT_A.DATASET_B.TABLE_C": { # テーブル名称 "PROJECT_A.DATASET_B.TABLE_D":

    { # 上流テーブル名称 "TemplateSourceType": "File", # タイプ "Key": "sql/sample_01.sql", "Uri": "/foo/bar/stairlight/sql/sample_01.sql", # SQLがある場所 "Lines": [{ # SQLでの参照箇所 "LineNumber": 6, "LineString": " PROJECT_A.DATASET_B.TABLE_D" }] } } } TABLE_D TABLE_C
  21. #pyconjp_5 リネージ情報の保存*1 $ stairlight --save results/20221015.json リネージ情報の読み込み*2 $ stairlight --load

    results/20221015.json 読み込みファイルを複数指定すると、リネージ情報をマージする $ stairlight --load a.json --load b.json --save c.json リネージ情報の保存と読み込み 48 *1 ローカルのほか、S3 や GCS へも出力可 *2 load オプションを指定した場合、データソースへのアクセスはしない
  22. #pyconjp_5 依存関係をもつテーブルの検索 50 table_name から見て ひとつ(上|下)に位置するテーブルを検索 $ stairlight (up|down) -t

    table_name table_name が直接的もしくは間接的に依存しているテーブルを 再帰的に検索 $ stairlight up -t table_name -r table_name に対して、直接的もしくは間接的に依存しているテーブルを 再帰的に検索 $ stairlight down -t table_name -r
  23. #pyconjp_5 YAML 設定値は Dataclass に格納 # src/stairlight/source/file/config.py から一部抜粋 # stairlight.yaml

    の Include セクションで TemplateSourceType が File の場合 @dataclass class StairlightConfigIncludeFile(StairlightConfigInclude): TemplateSourceType: str = source_type.FILE.value FileSystemPath: str | None = None Regex: str | None = None DefaultTablePrefix: str | None = None 52
  24. #pyconjp_5 SQL 文からの Jinja variables 検出 53 # src/stairlight/source/template.py から一部抜粋

    # 正規表現で頑張っている @staticmethod def detect_jinja_params(template_str: str) -> list: jinja_expressions = "".join( re.findall("{{[^}]*}}", template_str, re.IGNORECASE) ) return re.findall("[^{} ]+", jinja_expressions, re.IGNORECASE)
  25. #pyconjp_5 未設定パラメータの検出 54 # src/stairlight/map.py から一部抜粋 # set 型の差集合で重複除去 def

    detect_unmapped_params( self, template: Template, table_attributes: MappingConfigMappingTable ) -> list[str]: (中略) unmapped_params: list[str] = list( set(template_params) - set(mapped_params) - set(ignore_params) ) return unmapped_params
  26. #pyconjp_5 リネージ情報の保持と検索 55 リネージ情報は、プログラムの中では dict で保持している Key: 下流のテーブル名称 Value: 上流のテーブル名称と、その検出位置の情報

    テーブル名称によるリネージ情報の検索を速くするため テーブル数は多くても数百〜数千のオーダーであると想定したので データベースを用意しない構成を選択
  27. #pyconjp_5 再帰的な探索におけるテーブル循環検出 56 例えば、テーブルA と テーブルA’をマージして テーブルA としてつくりなおすケースを考える “テーブルAに依存するテーブル”を再帰的に探索すると(-r オプション)

    RecursionError が発生するまで頑張ってしまうので、打ち切りたい こういうことなのだが リネージ情報では循環表現になる TABLE_A (new) TABLE_A' TABLE_A (old) TABLE_A TABLE_A'
  28. #pyconjp_5 フロイドの循環検出法 57 任意の数列に対する循環検出アルゴリズム 再帰的な探索の都度、これまで検出したテーブルを 単方向連結リストにして、循環が発生していないかチェックする リストの先頭から同時に動き出す “fast” と “slow”

    を考える fast: 一度に2つ進む slow: 一度に1つ進む 先を進んでいるはずの fast が slow に追いついたら そのリストは循環しているので、探索処理を打ち切る https://github.com/tosh2230/stairlight/blob/37a8a5da0b04dbf24dcffc26e485cbbff90 c9881/src/stairlight/stairlight.py#L513-L539
  29. #pyconjp_5 このトークで話すこと 58 [✔] データリネージとはなにか [✔] なぜSQLなのか [✔] E2Eの範囲はどこまでか [✔]

    Pythonでどのように実現するのか [ ] どのように運用するのか SQLクエリ 解析による E2E データリネージ の実現
  30. #pyconjp_5 $ stairlight | less $ stairlight | jq '.["PROJECT_d.DATASET_e.TABLE_f"][]

    \ | select(.TemplateSourceType == "File")' Stairlight がローカルにインストールされていれば、 コマンドを実行してリネージ情報を見ることができる 影響調査や設計のときにさっと確認するのに便利 Pipe でつないで別のコマンドへ流すことも可能 Shell でのコマンド実行 60
  31. #pyconjp_5 設定用のファイル、特に mapping.yaml の内容は データソースの更新に追従している状態を保つことが望ましい たとえば、アプリケーションリポジトリ main branch への Pull

    Request マージをトリガーにリネージ情報を更新する 持続可能なドキュメンテーションの難しさという課題の解決を試みる リネージ情報を最新に保つ 62
  32. #pyconjp_5 CI/CD サービスによる継続的なデータリネージ案 63 アプリケーション、データリネージ それぞれの別々に リポジトリがあるとする 下記の処理を CI/CD サービスで行う

    1. stairlight map コマンドで未設定パラメータを検出 2. stairlight --load オプション複数指定によって 既存の mapping.yaml とマージ 3. Pull Request 作成 アプリケーションの Pull Request と データリネージの Pull Request が紐づいて便利なのでは !?
  33. #pyconjp_5 Streamlit 65 Web アプリケーションを Python で手軽に実装できる手段として Streamlit*1 がある UI

    のコントロールやその入力値の取得を直感的に実装できる 次のスライドで、graphviz*2, Stairlight を使って リネージ情報をグラフ表示するサンプルアプリケーションを紹介 *1 https://streamlit.io/2 *2 https://github.com/xflr6/graphviz
  34. #pyconjp_5 リネージ情報との組み合わせ例 68 組み合わせは無限大! 組み合わせるもの 見れるもの (なし) リネージグラフ データカタログ テーブルごとのメタデータ

    (例: データオーナーやテーブル定義) データ参照ログ データ利用状況 事業におけるクリティカルパスはどこか 利用されていない経路はないか データ更新ログ データ更新遅延の有無
  35. #pyconjp_5 このトークで話すこと 69 [✔] データリネージとはなにか [✔] なぜSQLなのか [✔] E2Eの範囲はどこまでか [✔]

    Pythonでどのように実現するのか [✔] どのように運用するのか SQLクエリ 解析による E2E データリネージ の実現 💮
  36. #pyconjp_5 今後の展望 72 - 継続的なリネージ情報の更新について、具体的な実装例の追加 - データソースの拡充 - sqlparse*1 の導入

    SQL をトークンレベルで解析して情報量を増やす - カラムリネージの実現 テーブルリネージ情報からドリルダウンして、列同士の関係性を把握 sqlparse とテーブル定義情報*2 を組み合わせれば実装できるはず... *1 https://github.com/andialbrecht/sqlparse *2 SELECT 文でアスタリスクを使用しているときに必要になりそう
  37. #pyconjp_5 SQLスクリプト上でのテーブルや共通テーブル式の依存関係を可視化する, Dentsu Digital Tech Blog Overview of The Modern

    Data Stack / モダンデータスタック概論, Satoshi Hirose, Speaker Deck フロイドの循環検出法, Wikipedia Python最新バージョン対応!より良い型ヒントの書き方, 寺田学, Gihyo.jp 参考資料 74
  38. #pyconjp_5 下記のパッケージを使って開発 CI に組み込んでいるため、Push するたびに起動 flake8*1 : Linter black*2 :

    Formatter isort*3 : Import の順番をソート Linter, Formatter 77 *1 https://github.com/pycqa/flake8 *2 https://github.com/psf/black *3 https://github.com/PyCQA/isort
  39. #pyconjp_5 Type hints 78 型ヒント*1 可読性の向上 IDEでのコード自動補完 mypy*2 による静的型チェック ユニットテストではカバーしきれなかった型の整合性を

    チェックできるようになった これも CI に組み込んでいる *1 https://docs.python.org/3/library/typing.html *2 https://github.com/python/mypy
  40. #pyconjp_5 Extras でインストール対象を指定 Stairlight をインストールする時に Extras を指定しない場合 ローカルからの読み込みだけができる $ pip

    install stairlight 下記のようにインストールすると ローカル, S3, GCS からの読み込みができる $ pip install “stairlight[s3, gcs]” 81
  41. #pyconjp_5 Extras でインストール対象を指定 82 # pyproject.toml から抜粋 # データソースごとに Extra

    を設定 [tool.poetry.extras] gcs = ["google-cloud-storage", "protobuf"] redash = ["psycopg2", "SQLAlchemy"] dbt-bigquery = [ "dbt-core", "dbt-bigquery", "google-cloud-bigquery", "protobuf", "networkx" ] s3 = ["boto3", "boto3-stubs"]