Upgrade to Pro — share decks privately, control downloads, hide ads and more …

ReproでのicebergのStreaming Writeの検証と実運用にむけた取り組み

ReproでのicebergのStreaming Writeの検証と実運用にむけた取り組み

iceberg meetup

Avatar for Tomohiro Hashidate

Tomohiro Hashidate

January 21, 2026
Tweet

More Decks by Tomohiro Hashidate

Other Decks in Technology

Transcript

  1. メンテナンスタスク メンテナンスタスクは以下の様なやり⽅で実施。 • expire_snapshot: Glueに任せる • rewrite_data_files: Sparkで実⾏する • rewrite_manifests:

    Sparkで実⾏する • delete_orphan_files: Glueに任せる 全部Glueの機能で完結すれば楽だったのだが、rewrite_data_filesの負荷が⾼過ぎてGlueの 機能ではコンピューティングリソースが不⾜し失敗する。 14
  2. データ規模 • 全体のデータ量: 2TB程 • レコード数: 200億件超 • 更新頻度: 5000件

    / sec bucketingの結果、バラつきはあるが⼤体1パーティションが平均1GBぐらいになる様にして 検証。(バケットサイズ、ファイルサイズは実運⽤時には要調整) 15
  3. DDL spark-sqlで実⾏する。 CREATE TABLE IF NOT EXISTS glue.testdb_production.test_tables ( app_id

    bigint NOT NULL, user_id bigint NOT NULL, key string NOT NULL, value string ) USING iceberg PARTITIONED BY (bucket(32, app_id), bucket(8, key)) TBLPROPERTIES ( 'write.object-storage.enabled'='true', 'write.delete.mode'='merge-on-read', 'write.update.mode'='merge-on-read', 'write.merge.mode'='merge-on-read', 'history.expire.max-snapshot-age-ms'='86400000' ) LOCATION 's3://repro-experimental-store/production/testdb_production/test_tables'; ALTER TABLE glue.testdb_production.test_tables WRITE ORDERED BY insight_id, key; ALTER TABLE glue.testdb_production.test_tables SET IDENTIFIER FIELDS insight_id, user_id, key; 16
  4. Flinkによる書き込み設定 confluentのschema registryを利⽤したKafkaトピックからのストリーム書き込みを⾏うた め、以下の準備をする。 # confluent registrtyに対応したflink sql connectorのjarファイルをDL wget

    https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro-confluent-registry/1.20.0/flink-sql-avro-confluent-registry-1.20.0.jar # flinkセッション起動 flink-yarn-session -Dparallelism.default=2 -d # flink SQLの実行 flink-sql-client -j flink-sql-avro-confluent-registry-1.20.0.jar -f insert.sql 23
  5. flink-sqlでチェックポイント間隔を調整 SET state.backend.type = 'rocksdb'; SET execution.checkpointing.storage = 'filesystem'; SET

    execution.checkpointing.dir = 's3://repro-experimental-store/production/flink-checkpoints'; SET execution.checkpointing.savepoint-dir = 's3://repro-experimental-store/production/flink-checkpoints'; SET execution.checkpointing.num-retained = '1'; SET execution.checkpointing.interval = '15min'; SET execution.checkpointing.timeout = '10min'; SET execution.checkpointing.min-pause = '1min'; クエリ⾃体は単純にKafkaからconfluet-schema-registryを利⽤してデータを取得、upsert で書き込む単純なクエリを利⽤した。 execution.checkpointing.interval がicebergのcommit間隔になる。今回は調整の結果15 分とした。 24
  6. commit間隔とファイル数の関係 1タスクで15分に1回コミットなので、15分に凡そ1パーティションに1ファイルづつparquet ファイルが増えていく。 24時間で 4(1hで4回) * 24h * 2(data file

    & delete file) = 96ファイル程増えることになる。 並列数がもっと必要であれば、その分作成されるファイル数も増える。 これを定期的なcompactionで解消できるかどうかを検証した。 25
  7. GlueカタログとViewについて Trinoのiceberg catalogをGlue Catalogに設定して構成していた場合、icebergテーブルが所 属しているcatalog及びdatabaseに対してviewを⽣成するとGlue Data Catalog側に永続化 されることが分かった。 そのためクラスタの停⽌や⼊れ替えを伴ってもviewを再作成する⼿間はかからないことも確 認できた。

    将来的にはApache Flussを活⽤できたりすると効率が良さそうと考えているが、現状では Trinoからのクエリがサポートされていないしバックエンドのicebergサポートも計画中とい う感じなので、今後の展開に注⽬していきたい。 30
  8. 総評 • Flinkによる書き込みは動きがシンプルなので動作⾃体は⾮常に軽い。 • 15分単位のupsert書き込みを継続している状態で、compactionの実⾏が遅れると 割と顕著にパフォーマンスに影響を与えることが分かった。compactionの定期実 ⾏はやはり重要。 • compactionはかなり処理負荷が⾼くメモリもCPUリソースもかなり必要になる。 •

    特に今のicebergで普及しているテーブル仕様のバージョン(v2)とsparkの実装で は、⾮常に⼤量の⼩さなファイル(特にdeleteファイル)が存在すると、 compactionに膨⼤なメモリが必要になるため、compactionが長期に渡って実 ⾏されないとテーブルのメンテナンス⾃体が困難になる可能性があって危険。 現時点でcompactionの所要時間をそれなりに抑えて安定して実⾏できる様にするにはかなり ⼤きめのsparkクラスタがいる。 31
  9. 実装済みのモニタリング・監視 • 各種AWSリソース、アラート定義のterraform化 • flinkがちゃんと動作していることを監視し、マシンリソースと書き込みペースの メトリックをdatadogで取得できる様にする。 • EventBridgeとStepFunctionで定期的にcompactionのためのEMRを動かすデプロ イスクリプトと権限設定。 ◦

    compactionの失敗時にアラート • icebergのテーブルメタデータによるモニタリング ◦ パーティションごとのファイル数やファイルサイズ合計をメタテーブルから 取得、datadogに送信するシェルスクリプトを書いて書き込みクラスタの systemd timerで定期実⾏。 33