Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Pub/Sub で実装するワーカープールパターン - BigQuery リバースETLジョブの...

Avatar for TVer Inc. TVer Inc. PRO
January 28, 2026
47

Pub/Sub で実装するワーカープールパターン - BigQuery リバースETLジョブの並行数制御-

Avatar for TVer Inc.

TVer Inc. PRO

January 28, 2026
Tweet

More Decks by TVer Inc.

Transcript

  1. ©2024 TVer INC. 髙品純大 • 株式会社 TVer 広告プロダクト本部所属 • 担当領域

    ◦ SRE ◦ 広告配信システムのバックエンド 自己紹介 2
  2. ©2024 TVer INC. 背景:BigQuery リバースETLの「蛇口全開」 Cloud Bigtable Cloud Storage BigQuery

    Eventarc Workflows Cloud Run Apply Bulk with Reverse ETL object.v1.finalized • Cloud Storage へのファイルアップロードをトリガーに、BigQuery から Bigtable へデータを 書き込むリバースETLジョブが実行されるパイプライン • Cloud Storage イベント発生とほぼ同時に、パイプラインが多重起動する • パイプライン側は Bigtalbe に全力で書き込もうとする(蛇口全開) Multiple Pipelines 5
  3. ©2024 TVer INC. 課題:BigQuery と Bigtable のリソース飽和 • 複数のリバースETLジョブから Bigtable

    に 書き込みリクエストが集中する • Bigtable サービスの書き込みキューが溢れて BigQuery にスロットリングエラーが返り、 BigQuery のクエリジョブもエラー終了する • リバースETLを実行するには BigQuery Enterprise Edition の予約スロットが必要 • パイプラインの多重起動により、大量の スロット割り当て待ちタスクが発生して スロットが競合 • 結果的にクエリジョブの処理遅延が起きる BigQuery スロット競合による処理遅延 Bigtable の書き込みスロットリング 6
  4. ©2024 TVer INC. 課題の要因と対策 • Bigtable テーブルを、数百万から数千万行を更新するジョブが 大量に並行実行されると想定しておらず、BigQuery と Bigtable

    にかかる負荷を予測できなかった • イベントを即時処理するアーキテクチャは、ダウンストリーム (BigQuery, Bigtable)の限界を超えてしまうので、流量制限が 必要 7
  5. ©2024 TVer INC. リアーキテクチャ: Worker Pool パターン Cloud Bigtable Cloud

    Storage BigQuery Eventarc Workflows Pub/Sub Google Kubernetes Engine publish message(job) pull message Apply Bulk with Reverse ETL Multiple Pipelines • Push 型から Pull 型へ変更 • Workflows は Pub/Sub にジョブを Push して終了 • Cloud Run Job の単発ジョブから、Pub/Sub をサブスクライブする GKE の常駐ワーカーに 変更し、同時に処理するメッセージを制御しながら Pull させる 8
  6. ©2024 TVer INC. Worker Pool パターン • コンセプト ◦ あらかじめ決めた数の

    Worker (リソース) がキューから ジョブやタスクを消化していく構造 • 言語によって呼び名は違っても同様のコンセプトは昔から存在 する ◦ Java 風に言うと Worker Thread ◦ Go だと Worker Pool 9
  7. ©2024 TVer INC. Pub/Sub Client(Subscriber) の同時処理メッセージ数制御 (Go) Pub/Sub Client(Subscriber) の

    MaxOustandingMessages により、 同時に処理するメッセージの最大数 を制御できる この設定は Worker 数の上限と同義 const maxOutstanding = 3 type Worker struct { subscriber *pubsub.Subscriber } func NewWorker(pubsub *pubsub.Client, subscription string) *Worker { subscriber := pubsub.Subscriber(subscription) subscriber.ReceiveSettings.MaxOutstandingMessages = maxOutstanding // this line return &Worker{ subscriber: subscriber, } } 10
  8. ©2024 TVer INC. subscriber.Receive() に渡す コールバック関数の内側にビジネス ロジックを書く コールバックは goroutine で実行さ

    れるが、 goroutine 数 =< maxOustandingMessages となって goroutine (worker) が増え過ぎる ことがない func (w *Worker) Run(ctx context.Context) error { var err error err = w.subscriber.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { err := w.processJob(m) if err != nil { m.Nack() return } m.Ack() }) if err != nil && !errors.Is(err, context.Canceled) { return fmt.Errorf("unexpected error: %w", err) } else { log.Println("subscriber.Receive exited by context.Canceled") } return nil } Pub/Sub Client(Subscriber) を使用した Worker Pool 実装 (Go) 11
  9. ©2024 TVer INC. なぜ Pub/Sub を使うのか 言語標準の機能(Go の channel など)だけでも

    Worker Pool を実装 することは勿論可能だが、Pub/Sub を使うメリットは大きい • 耐久性 • スケーラビリティ • 抽象化 13