Upgrade to Pro — share decks privately, control downloads, hide ads and more …

TrinoとIcebergで ログ基盤の構築 / 2023-10-05 Trino Prest...

TrinoとIcebergで ログ基盤の構築 / 2023-10-05 Trino Presto Meetup

kamijin_fanta

October 05, 2023
Tweet

More Decks by kamijin_fanta

Other Decks in Programming

Transcript

  1. 自己紹介 上條 忠久 Kamijo Tadahisa • さくらインターネット株式会社 クラウド事業本部 SRE室 •

    ITエンジニアとしてサービス開発・運用 ◦ Go, TypeScript, Python, Kotlin ◦ Linux, Nomad, Ansible, Prometheus, React
  2. さくらインターネット SRE室 • 2022年7月に発足・現在メンバーは6人 ◦ 一緒に働く方、絶賛募集中 • 全社でSREの取り組みがより評価されることを目的 • 主な取り組み

    ◦ さくらのクラウド 開発・運用 ◦ アジャイル文化の醸成 / Enabling SRE ◦ Kubernetes基盤の開発 ◦ 監視基盤の開発 • 現在社内各チームで監視基盤が個別に構築運用されている ◦ Prometheus, Elastic Stack, Loki, etc… ◦ 社内向けログ基盤を提供することで、運用レベルの底上げ・運用コスト削減を目的 ◦ OSS運用なども行ったが、マルチテナント提供・ライセンス体系の問題 ◦ Trino+Icebergの構成で開発を開始
  3. Apache Iceberg overview • ストレージフォーマット ◦ ビッグデータ・データレイク構築 ◦ 仕様とデータを読み書きする低レイヤーライブラリ (Java,Python)の提供

    ◦ クエリエンジンTrino, Presto, Spark, Hive, Flink, Impala, 他から同じデータを参照 • Netflix→Apache Software Foundation • 機能 ◦ 高い信頼性: Seriallizable isolation, Snapshot, Atomic mutation ◦ 費用対効果の高いストレージ : Object Storage, Parquet ◦ パフォーマンス最適化 : Partitioning, Clustering, CoW/MoR切り替え ◦ スキーマ変更: Schema Evolution, Table, Partitioning
  4. Before Iceberg • S3, HDFS→Hive Metastore→Trino,Presto,Athena,Spark • 課題感 ◦ 同時読み書き

    ◦ スキーマ変更 ◦ レコード単位の読み書き編集 ◦ 過去状態の復元 ◦ Metastoreのパフォーマンス問題 • Open Table Format ◦ 従来の課題を解決するテーブルフォーマット ◦ Apache Iceberg, Apache Hudi
  5. Iceberg in Trino • Icebergの一通りの機能をSQLで操作 • 他のデータソースとの JOINが可能 ◦ Hive,

    MySQL, Google BigQuery, etc… CREATE TABLE t1 (a int, b int); INSERT INTO t1 VALUES (1, 2); SELECT * FROM t1; UPDATE t1 SET b = 3 WHERE a = 1; DELETE FROM t1 WHERE a = 1;
  6. partitioning, sorted_by CREATE TABLE example ( c1 INTEGER, c2 DATE,

    c3 DOUBLE ) WITH ( partitioning = ARRAY[ 'c1', 'month(c2)' ], sorted_by = ARRAY[ 'c3' ] ); INSERT INTO example VALUES (1, DATE '2023-01-01', 1.0), (1, DATE '2023-01-02', 2.0), (1, DATE '2023-02-01', 3.0), (1, DATE '2023-02-02', 4.0), (2, DATE '2023-01-01', 5.0), (2, DATE '2023-01-02', 6.0), (2, DATE '2023-02-01', 7.0), (2, DATE '2023-02-02', 8.0); SELECT * FROM example WHERE c1 = 1 AND c2 = '2023-01-01'; c1=1, 1月 c1=1, 2月 c1=2, 1月 c1=2, 2月 c1=1, 1月をク エリ対象
  7. Iceberg Internal • Icebergの仕組みをざっくりと解説 • 目的 ◦ パフォーマンスの特性を理解する ◦ Java

    Clientを直接利用することが出来る ◦ テーブル構造のデバッグを行うことが出来る
  8. Internal - Layout • データの階層構造 ◦ catalog ◦ metadata file

    ◦ manifest list ◦ manifest file ◦ data files • CatalogはDBに保存 • その他はS3等に保存 ◦ AWS S3, Google GCS, Hadoop ◦ メタデータ・実データを低コスト・スケーラビリ ティが高いストレージに保存可能 $ tree /mnt/s3/default/t1 . |-- data | |-- 20230928_101505_00007_k5eys-54e49815-ddcf-48b7-9596-ba766979a688.parquet | |-- 20231002_083351_00006_jpegd-d2a1e1ac-1bef-4fab-981c-98c86462c630.parquet | `-- 20231002_083432_00010_jpegd-7c0bd2af-85a5-44a4-a179-82c9a1965139.parquet `-- metadata |-- 00000-1afbaa09-8ed5-4868-9952-02fdd22ccbd6 .metadata.json |-- 00001-5e8fc4c9-962c-4958-a134-ab1a13bfea99 .metadata.json |-- 00002-704c029e-e5c8-46ca-9ea1-7187930be895 .metadata.json |-- 733ba142-1754-4330-ad61-59f58bfd1d71 -m0.avro |-- 733ba142-1754-4330-ad61-59f58bfd1d71 -m1.avro |-- 7f9e5444-9486-4172-aade-288819b03d49 -m0.avro |-- snap-1463702341869305305-1-e07c019a-2f9b-4eab-94b1-38f083598652.avro |-- snap-4566183149179530475-1-b3e2bfc0-0b45-477f-9d38-e2c445eda04c.avro `-- snap-7319754993849090227-1-7f9e5444-9486-4172-aade-288819b03d49.avro
  9. Internal - catalog • 主な役割 ◦ メタデータの格納位置の保持 ◦ トランザクションの提供 •

    バックエンド選択肢 ◦ MySQL, PostgreSQL, SQLite (jdbc) ◦ Hive Metastore, Glue, Snowflake ◦ DynamoDB ◦ REST API mysql> SHOW CREATE TABLE iceberg_tables; CREATE TABLE iceberg_tables ( catalog_name varchar NOT NULL, -- example: rest_backend table_namespace varchar NOT NULL, -- example: default table_name varchar NOT NULL, -- example: t1 metadata_location varchar DEFAULT NULL, -- example: s3://bucket/.../00001-.....metadata.json previous_metadata_location varchar DEFAULT NULL, -- example: s3://bucket/.../00000-.....metadata.json PRIMARY KEY ( catalog_name,table_namespace,table_name ) );
  10. Internal - metadata file • テーブル構造の保持 ◦ スキーマ・パーティション・ソート ◦ 途中で変更された際は複数定義される

    • スナップショットの保持 ◦ 利用可能なスナップショット一覧 ◦ manifest listのパスの記録 $ jq '.' \ /mnt/s3/default/t1/metadata/00001-.....metadata.json { "schemas": [...], "partition-specs" : [...], "sort-orders" : [...], "snapshots": [ { "snapshot-id" : 1463702341869305300 , "manifest-list" : "s3://bucket/.../metadata/snap-1463...8652.avro" , ... }, { "snapshot-id" : 8429213630642387000 , "parent-snapshot-id" : 1463702341869305300 , "manifest-list": "s3://bucket/.../metadata/snap-8429...ef78.avro", ... } ], ... }
  11. Internal - manifest list • 含まれるパーティションの値・範囲 • manifestへの参照 $ avrocat

    \ /mnt/s3/default/t1/metadata/snap-8429...ef78.avro \ | jq '.' { "manifest_path": "s3://bucket/.../metadata/a2c1...-m0.avro", "manifest_length" : 6683, "partition_spec_id" : 0, "content": 0, "sequence_number" : 2, "min_sequence_number" : 2, "added_snapshot_id" : 8429213630642387000 , "added_data_files_count" : 1, "existing_data_files_count" : 0, "deleted_data_files_count" : 0, "added_rows_count" : 1, "existing_rows_count" : 0, "deleted_rows_count" : 0, "partitions": { "array": [] } } { "manifest_path" : ...
  12. Internal - manifest file • 実データファイルへの参照 • 統計情報 ◦ 値の上限・下限・値の数・Null数・NaN数

    • memo ◦ status: 1=ADDED, 2=DELETED $ avrocat \ /mnt/s3/default/t1/metadata/a2c1...-m0.avro \ | jq '.' { "status": 1, "snapshot_id" : { "long": 8429213630642387000 }, "data_file" : { "file_path": "s3://bucket/default/t1/data/2023...a688.parquet", "file_format" : "PARQUET", "lower_bounds" : { "array": [ { "key": 1, "value": "\u0001" }, { "key": 2, "value": "\u0002" } ] }, "upper_bounds" : { "array": [ { "key": 1, "value": "\u0001" }, { "key": 2, "value": "\u0002" } ] }, "value_counts" : { ... }, ... } } { "status": ...
  13. Internal - data files • 実際に書き込まれた値 • フォーマットはユーザが指定可能 ◦ Parquet,

    ORC, Avro $ pqrs cat \ /mnt/s3/default/t1/data/2023...a688.parquet {a: 1, b: 2}
  14. Internal - file layout Partition無し Partition有り $ tree example/ example/

    |-- data | |-- c1= 1 | | |-- c2_month= 2023-01 | | | `-- 20231002_100950_00021_jpegd-346ffc2d-5e11-473e-bf51-30d0898bfdc8.parquet | | `-- c2_month= 2023-02 | | `-- 20231002_100950_00021_jpegd-4029a34f-c9b1-4512-9a2d-e30bc544a163.parquet | `-- c1= 2 | |-- c2_month= 2023-01 | | `-- 20231002_100950_00021_jpegd-02a4022b-c5c3-4367-85ae-fa22904bcdf2.parquet | `-- c2_month= 2023-02 | `-- 20231002_100950_00021_jpegd-391d3d88-af2e-4e63-ba48-f6be548d8243.parquet `-- metadata |-- 00000-da3c553c-0ddc-4b15-85db-0ba0faf757ab .metadata.json |-- 00001-31e284a9-429c-4fa0-a1b2-87499b41b0d4 .metadata.json |-- 00002-c0c18a49-da83-4997-bd78-886db53c53c8 .metadata.json |-- e4737012-26a8-452d-a9d8-17733150fd4c -m0.avro |-- snap-2797793416297343028-1-e4737012-26a8-452d-a9d8-17733150fd4c.avro `-- snap-7900299843327693345-1-4ddcd5de-0345-4fe3-946c-a2607c15bdb2.avro $ tree t1/ t1/ |-- data | |-- 20230928_101505_00007_k5eys-54e49815-ddcf-48b7-9596-ba766979a688.parquet | |-- 20231002_083351_00006_jpegd-d2a1e1ac-1bef-4fab-981c-98c86462c630.parquet | `-- 20231002_083432_00010_jpegd-7c0bd2af-85a5-44a4-a179-82c9a1965139.parquet `-- metadata |-- 00000-1afbaa09-8ed5-4868-9952-02fdd22ccbd6 .metadata.json |-- 00001-5e8fc4c9-962c-4958-a134-ab1a13bfea99 .metadata.json |-- 00002-704c029e-e5c8-46ca-9ea1-7187930be895 .metadata.json |-- 733ba142-1754-4330-ad61-59f58bfd1d71 -m0.avro |-- snap-1463702341869305305-1-e07c019a-2f9b-4eab-94b1-38f083598652.avro `-- snap-4566183149179530475-1-b3e2bfc0-0b45-477f-9d38-e2c445eda04c.avro Partitionを含む場合は、パーティション毎の ディレクトリ(prefix)が作成される
  15. Performance • 概要 ◦ 実行計画は分散実行可能 ◦ ファイルはImmutableでキャッシュも可能 • 実行計画 ◦

    manifest list/fileに値のヒントが含まれる ▪ パーティション値の範囲 ▪ 値の数・下限・上限・ Null数 ◦ メタ情報だけでフィルタできる場合も多く Scan量を削減可能
  16. Maintenance • データファイルのマージ ◦ より大きい少ないファイルにマージ ◦ ALTER TABLE example EXECUTE

    optimize(file_size_threshold => '100MB'); • スナップショットの削除 ◦ 古いスナップショット・データを削除 ◦ ALTER TABLE example EXECUTE expire_snapshots(retention_threshold => '7d'); • 孤立したファイルの削除 ◦ ジョブの失敗などで発生するファイルを削除 ◦ ALTER TABLE example EXECUTE remove_orphan_files(retention_threshold => '7d');
  17. Java Clientでの書き込み • Java Clientで直接データ追加 ◦ Trino,Spark等を利用しない方法 ▪ クエリエンジンが利用しているAPI ◦

    Mavenでライブラリ配布 ◦ コード例はKotlin • 大きな流れ ◦ Writerの準備 ◦ レコード書き込み ◦ Writerを終了 (datafiles書き込み) ◦ Appenderにdatafilesを追加 ◦ Appenderを終了 (metadata書き込み) val appenderFactory = GenericAppenderFactory(table.schema(), table.spec()) val fileFactory = OutputFileFactory .builderFor(table, 0, 0) .format(FileFormat.PARQUET) .build() val writer = RowDataPartitionedFanoutWriter( table.spec(), FileFormat.PARQUET, appenderFactory, fileFactory, table.io(), Long.MAX_VALUE, table.schema(), ) writer.write(record) val writeResult = writer.complete() val append = table.newAppend() for (dataFile in writeResult.dataFiles()) { append.appendFile(dataFile) } append.commit() https://iceberg.apache.org/javadoc/1.3.1/org/apache/iceberg/data/GenericAppenderFactory.html https://iceberg.apache.org/javadoc/1.3.1/org/apache/iceberg/io/PartitionedFanoutWriter.html
  18. Java Clientでの読み取り • TableScan (ファイル単位) ◦ フィルタを元にメタデータから読み取りに必要 なファイルを決定 ◦ ファイルのリストまたは、大小のファイルが組

    み合わさったタスクのリストを出力 ◦ クエリエンジンの内部でも利用される • TableScanIterable (行単位) ◦ Parquet, Avro, ORCファイルの読み取り ◦ イテレータでレコードを読み出し val scaner = table.newScan() .filter(Expressions.equal( "id", 5)) .select("id", "name", "age") val files = scan.planFiles() val tasks = scan.planTasks() val result = IcebergGenerics.read(table) .where(Expressions.lessThan( "id", 5)) .build() https://iceberg.apache.org/javadoc/1.3.1/org/apache/iceberg/TableScan.html https://iceberg.apache.org/javadoc/1.3.1/org/apache/iceberg/data/IcebergGenerics.ScanBuilder.html
  19. ログ基盤でのモデリング • 実際にモデリング ◦ 冒頭で紹介した社内の開発中ログ基盤 ◦ (実際はさらに複雑な構成になっています) • 大まかな仕様 ◦

    テナントID毎にストア・クエリは分離 ◦ 時系列順でクエリされることが多い (例:10/3~10/4のデータ) • Partition ◦ tenant, timestampを元に毎日生成 ◦ 100テナント・1年 36,500パーティション • Sort ◦ timestampを元にソート • 備考 ◦ パーティション・ソート共に変更可能 CREATE TABLE log_store ( tenant INTEGER, timestamp TIMESTAMP(6), level INTEGER, -- DEBUG:0, INFO:1, WARN:2, ERROR:3 message VARCHAR ) WITH ( partitioning = ARRAY[ 'tenant', 'day(timestamp)' ], sorted_by = ARRAY[ 'timestamp' ] );
  20. ログ基盤の構成 Write Client Ingester (Kotlin) Shipper (Kotlin) Object Storage REST

    Catalog (MySQL) Query Gateway (Go) Maintainer (Go) gRPC Publish Consume Write Iceberg Table Catalog Operations Issue Maintenance Command HTTP API Read Client HTTP API 性能改善を目的にKafka導入 • Icebergはコミットが衝突すると処理時間が伸びる • ファイルサイズをある程度制御したい
  21. Iceberg, Trino • Icebergは高機能・高性能なストレージフォーマット ◦ 高い信頼性: Seriallizable isolation, Snapshot, Atomic

    mutation ◦ 費用対効果の高いストレージ : Object Storage, Parquet ◦ パフォーマンス最適化 : Partitioning, Clustering, CoW/MoR切り替え ◦ スキーマ変更: Schema Evolution, Table, Partitioning • Icebergは内部に階層構造を持つ ◦ スナップショット等の高度な機能を提供しているが、見通しが良い • Trinoから簡単にIcebergが扱える ◦ SQLインタフェースで性能チューニングも可能 • さくらで開発中の社内向けログ基盤はIceberg, Trinoを採用 ◦ 手軽にスケーラビリティの高いサービスを構築可能
  22. Try Trino & Iceberg! • 今回紹介した内容が一通り試せるDocker Compose構成 ◦ https://github.com/kamijin-fanta/iceberg-sandbox-kit ◦

    Linuxマシンでgit clone, makeだけでクラスタが起動 ▪ Trino, Iceberg REST Catalog, MySQL, MinIO ▪ デバッグ用コンテナに avrocat, jq, pqsr等がインストール済み ◦ 詳しい使い方はREADME.md参照