Upgrade to Pro — share decks privately, control downloads, hide ads and more …

ベアメタルで実現するSpark&Trino on K8sなデータ基盤

Sponsored · Ship Features Fearlessly Turn features on and off without deploys. Used by thousands of Ruby developers.

ベアメタルで実現するSpark&Trino on K8sなデータ基盤

More Decks by システム開発部広報委員会

Other Decks in Programming

Transcript

  1. #trinodb 事業紹介(データプラットフォーム事業) 5 広告を出したい「広告主」向け DSP SSP 広告を出して欲しい「Webメディア」向け 広告 メディア ユーザー

    ユーザーB 広告主/代理店 リアルタイムで取引 (RTB) 広告出稿料 広告表示 広告代理店 広告主    : 広告 A 広告 B    : AD ユーザーA : ニュース グルメ : Data Management Platform 提携企業DB 位置情報 データ紐づけ 提携企業DB EC購買 Web行動 提携企業DB リアル購買
  2. #trinodb 現行のデータ基盤の概要 広告配信サーバ 各種サーバ Data Lake(CDH v5.16.2) 分析(CDH v6.3.2) Streaming

    ログ転送 Spark Streaming (リアルタイム加工) ワークフロー ジョブ実行 ジョブ実行 Impala 参照 分析クエリ実行 参照 約13TB/dayを処理 理論値で 約2ペタバイト 保持している 平均流量は 秒間Gbitオーダー
  3. #trinodb 現行のデータ基盤の概要 広告配信サーバ 各種サーバ Data Lake(CDH v5.16.2) 分析(CDH v6.3.2) Streaming

    ログ転送 Spark Streaming (リアルタイム加工) ワークフロー ジョブ実行 ジョブ実行 Impala 平均流量: 秒間Gbitオーダー 参照 分析クエリ実行 参照 今日はここの話
  4. #trinodb 現行のデータ基盤の課題 1. CDH無償版の提供が終了しているので継続して利用出来ない ◦ 有償の後継版Cloudera CDPも検討したが費用面がクリア出来ず見送り (Google Cloudなども検討したが、費用や技術課題がクリア出来ず見送り。5年償却で見るとクラウドは高い。) 2.

    ComputeとStorageを分離してNode配置出来ないので サーバスペックが過剰になりがち ◦ YARNのNode ManegerとHDFSは分離して配置出来ない ◦ ComputeスケールさせたいだけなのにStorageもスケールするので非効率 3. Impalaの統計情報の運用が非常に煩雑かつ有効に利用出来ない ◦ 大規模テーブルの場合、ほぼ使えない ◦ 統計情報が利用できないので効率の悪いクエリになりがちでImpalaを活かしきれない 4. ETL/ELT処理で利用しているMapReduceベースのHiveが遅い ◦ 本来はMapReduceではなく、Tez・LLAPを使うべきだがCDHが古くて利用できない 5. テーブル構造が複雑なので、SQLベースでETL/ELT処理するのが辛い ◦ 複雑なクエリになりがちで、改修に難易度が高く手間がかかる
  5. #trinodb 新しいデータ基盤に求める事 1. ComputeとStorageを分離したい 😆 HiveテーブルからIcebergテーブルに変更し、HDFSからS3互換のアプライアンスに置き換えること でYARN・Zookeeperに依存しなくなり分離が可能になった(構成要素も減ったので構築も楽になった) 2. ETL/ELT処理は、SQLベースではなく、Programmableに処理したい 😆

    Sparkを使ってスクリプトベースに処理することで、複雑なSQLでの処理が不要になった 3. SQLエンジンは大規模なテーブルでも統計情報を更新・有効活用が出来ること 😆 Trino&Icebergを使うことで、Hive・Impalaに依存せずに、柔軟に統計情報の更新・利用する 4. Hiveテーブルの様にオンラインで柔軟なスキーマ進化が可能であること 😆 Iceberg特有のスキーマ進化(orパーティション進化)により、以前より柔軟な運用が可能になる
  6. #trinodb アドホック分析用としてのTrinoで工夫したこと • Kubernetes(RKE2)を使うことでクラスタの構築やアップデートを楽にした ◦ Trino自体の構成がCoordinator・Workerと構成がシンプルでPersistentVolume (PV)が不要。そ の為、マニフェストをシンプルに保てるのでK8sでの運用はさほど辛くない。 ◦ RKE2のsystem-upgrade-controllerがあるので設定書いてapplyするとローリングアップグレー

    ドしてくれるので便利 https://docs.rke2.io/upgrade/automated_upgrade/ (もしくはRancher Web UIからポチーがもっと簡単) • Rancherを使ってK8sクラスタ管理することで管理コストを下げ利便性を向上 • Helm ChartにはTrino公式のものよりもこなれている github.com/valeriano-manassero/helm-charts を使用した ◦ JVMのヒープサイズの指定を-Xmx/-Xmsではな く-XX:MaxRAMPercentage/-XX:InitialRAMPercentageを使って使用可能なメモリーに対する 割合で指定するように変更 ◦ Affinityを利用して、CoordinatorとWorkerポッドの同居を禁止し、 Workerポッドはなるべく同一Nodeに2個以上配置しないようにする
  7. #trinodb アドホック分析用としてのTrinoで工夫したこと • Fault-tolerant executionを使い、 搭載メモリ以上のクエリを利用できるようにした ◦ 搭載メモリの10倍以上のクエリでも実行可能になった ※ただし、実行時間は延びる ◦

    Exhange managerを有効にしてTASKリトライポリシーを 使用しました。 ◦ 中間データ用ストレージはS3以外にHDFSにも対応。超便利。 Stage間の中間データを分散ストレージ に保存し、途中でTaskがクラッシュして もクエリはクラッシュせずに中間データ を使って復旧する
  8. #trinodb アドホック分析用としてのTrinoで工夫したこと • Spill-to-diskを使って、OOMを起きにくくするようにした ◦ Helm Chartを改修して、Coordinator・WorkerポッドにemptyDirボリュームをマウントすること で、OOMで落ちても復帰時に再利用出来るようにした • Icebergテーブルで利用するカタログにはRESTカタログを用うことで、

    TrinoやSparkなどからIcebergテーブルを利用しやすくした • Hive→Icebergテーブル移管の際は、IcebergのSparkのadd_filesプロシージャを 使うことで、Icebergテーブル側に過去分のデータをコピーを不要にした ◦ HiveテーブルはHDFS上にあるので、add_filesプロシージャで出来たIcebergテーブルの データは、S3とHDFSの両方を参照することになるので、カタログ(REST)の io-implプロパティにorg.apache.iceberg.io.ResolvingFileIOを利用することで両方に対応した
  9. #trinodb S3 /db1/tbl1 /db1/tbl1/k1=B 補足:add_filesプロシージャって? 移行元のデータをIcebergテーブルにコピー せずに参照出来るようにするIcebergの Sparkのプロシージャ。 数万パーティションある様なテーブルの場 合、一度にコピーするには時間がかかるが、

    これなら移行元の更新を止めずに移行が可 能。 パーティション単位で実行が可能なので、 移行&検証が終わるまでの期間はadd_fileを 使い追加分を更新し、準備が終わったら Icebergテーブルにデータ書いていけば良い ので移行作業の効率が良い。 Metadata File Metadata File Manifest List Manifest List Manifest File Manifest File Iceberg Catalog db1.tbl1 現Metadata Path Manifest File Hive Metastore db1.tbl1 /db1/tbl1/k1=A /db1/tbl1/k1=B/k2=1 /db1/tbl1/k1=A/k2=1 /db1/tbl1/k1=A/k2=2 File File File File File File File File File 参照してるだけなので コピーがいらない
  10. Icebergテーブルの場合 カタログから欲しいFileの全パスとメタデータを取 得し(取得方法はカタログ実装によりけり)、その 情報を元にTrino側で処理を実施。 Icebergは欲しいFileパスが①の段階で確認出来る (HiveはHDFSディレクトリまでなので②のタイミングで探 す必要がある) 補足:TrinoはどうやってHiveやIcebergテーブルを参照するの? Hiveテーブルの場合 thrift経由でHive

    Metastoreに対し て、メタデータとデータの格納先ディ レクトリを取得するだけで、 実際のデータ処理はTrino側で実施。 その為、YARNなどは一切関与しない。 Hive Metastore db1.tbl1 /db1/tbl1 /db1/tbl1/k1=A /db1/tbl1/k1=A/k2=1 File File File ①データのディレクトリ情報 とメタデータを取得 ②Trinoがディレクトリ 情報から欲しいFileを 探して処理する S3 Storege(S3) Metadata File Manifest List Iceberg Catalog db1.tbl1 現Metadata Path Manifest File File File File ①欲しいFileのパス とメタデータを取得 ②Trinoだけで  Fileを処理する
  11. #trinodb 現状、困っていること 1. Icebergのadd_filesプロシージャが TrinoのIcebergコネクターから利用できない ◦ Add `add_files` procedure in

    Iceberg connector · Issue #11744 · trinodb/trino 2. Icebergのadd_filesプロシージャを使ったテーブルにて、 参照元のHiveテーブルにtimestamp型があった場合、 そのままでは以下のエラーが出て参照出来ない ◦ エラー文:Query 20231002_061128_00067_tqdd2 failed: Unsupported Trino column type (timestamp(6) with time zone) for Parquet column ([update_time] optional int96 update_time = 8) ◦ Trino Iceberg not honoring existing timestamp column type name of the table created outside Trino (e.g. Spark) stored in HMS · Issue #11442 ◦ Hive/Impalaで利用しているTIMESTAMP型はINT96でParquetファイルで書き込みしている がTrinoで利用しているParquetライブラリは新しくINT96に対応していない事が影響
  12. #trinodb 今後の予定 1. 9月末にリリースのあったTrino Gateway trinodb/trino-gateway を利用する • Trinoクラスタを2系統用意してTrino Gateway経由で利用する

    • Trinoの設定反映やアップグレードの際に片系ずつ実施する事が可能になるのでサービスの ダウンタイムをなくすことが出来る • TrinoのCoordinatorはHA構成が取れない(補足を参照)ので、 Trinoサービスとしての可用性向上の目的の意味もある 2. 利用状況に合わせたResource groupsとSession property managerの設計 • クエリ実行時間の制限(連続XX時間まで) • ユーザに合わせたクエリ種別の制限 ◦ XXXユーザは特定のテーブルにはSELECTのみに限定 • 分析クラスタ内でのバッチへのリソース割当てを最優先にする 3. Icebergの統計情報を使ったパフォーマンス改善と運用整備
  13. #trinodb • Fault Tolerant Executionに関する情報 ◦ wikiにある公式ドキュメントには無い詳細な説明 https://github.com/trinodb/trino/wiki/Fault-Tolerant-Execution ◦ Trino

    | Using Trino as a batch processing engine https://trino.io/blog/2022/06/24/trino-meetup-extract-trino-load.html ◦ Trino | Project Tardigrade delivers ETL at Trino speeds to early users https://trino.io/blog/2022/05/05/tardigrade-launch.html • TrinoのHAに関連する情報 ◦ Can you set up Trino in HA mode? - Trino - Starburst forum https://www.starburst.io/community/forum/t/can-you-set-up-trino-in-ha-mode/31 ◦ High Availability · Issue #391 · trinodb/trino https://github.com/trinodb/trino/issues/391 • Icebergについて深く知る事が出来る良い記事 ◦ Apache Iceberg: An Architectural Look Under the Covers | Dremio https://www.dremio.com/resources/guides/apache-iceberg-an-architectural-look-under-the-covers/
  14. #trinodb • Hive→Iceberg移管に関して参考になるブログ記事 ◦ How to Migrate a Hive Table

    to an Iceberg Table | Dremio https://www.dremio.com/blog/how-to-migrate-a-hive-table-to-an-iceberg-table/ ◦ Migrating a Hive Table to an Iceberg Table Hands-on Tutorial | Dremio https://www.dremio.com/blog/migrating-a-hive-table-to-an-iceberg-table-hands-on-tutorial/ • 利用しているIcebergのREST Catalog実装 ◦ https://github.com/tabular-io/iceberg-rest-image ◦ Iceberg's REST Catalog: A Spark Demo • Tabular https://tabular.io/blog/rest-catalog-docker/ • Icebergを知りたいならここから始めると参考になる記事 ◦ Apache Iceberg 101 - Your Guide to Learning Apache Iceberg Concepts and Practices | Dremio https://www.dremio.com/blog/apache-iceberg-101-your-guide-to-learning-apache-iceberg-concept s-and-practices/ ◦ Apache Iceberg FAQ | Dremio https://www.dremio.com/blog/apache-iceberg-faq/#h-what-is-a-data-lakehouse
  15. #trinodb • ParquetファイルのINT96関連情報 ◦ parquet-format/LogicalTypes.md https://github.com/xhochy/parquet-format/blob/cb4727767823ae201fd567f67825cc22834c20e9 /LogicalTypes.md#int96-timestamps-also-called-impala_timestamp ◦ Parquet: Support

    filter operations on int96 timestamps by thesquelched · Pull Request #2563 · apache/iceberg https://github.com/apache/iceberg/pull/2563 ◦ 'NOT_SUPPORTED: Unsupported Trino column type (date) for Parquet column ([today] optional int64 today (TIMESTAMP(MICROS,false))) · Issue #17733 · trinodb/trino https://github.com/trinodb/trino/issues/17733 • S3互換ストレージと言えばMinIO以外にもApache Ozoneもあるよ(宣伝) ◦ S3互換のオブジェクトストレージ Apache Ozoneに関する情報(随時更新) - Qiita https://qiita.com/yassan168/items/1e3c000284ae6fc8448c
  16. #trinodb • RKE2 ◦ https://docs.rke2.io/ • Rancherを利用したモニタリング&アラート ◦ https://ranchermanager.docs.rancher.com/pages-for-subheaders/monitoring-and-aler ting

    • Rancher ◦ https://www.rancher.com/ ◦ 日本のユーザコミュニティもあるのでよろしくです。 ▪ https://rancherjp.connpass.com/