Upgrade to Pro — share decks privately, control downloads, hide ads and more …

暗号資産販売所のプライシング機能開発で ActorModel風なものを実装した話

Sponsored · SiteGround - Reliable hosting with speed, security, and support you can count on.

暗号資産販売所のプライシング機能開発で ActorModel風なものを実装した話

Avatar for Yuta Miyake

Yuta Miyake

June 12, 2024
Tweet

More Decks by Yuta Miyake

Other Decks in Technology

Transcript

  1. 4 LP プライシングの仕組み Mercoin バグレート 判定 ベースレート 生成 CQ生成 ASK:505万

    Bid:485万 お客様 ASK:500万 Bid:490万 ASK:500万 Bid:490万 ASK:510万 Bid:480万 ASK:2000万 Bid:200万 ASK:500万 Bid:490万 ASK:510万 Bid:480万 ❌ 高速化のために複数のQuoteをオンメモリで保持 し、一定間隔でベースレートを生成する
  2. 7 Goの非同期・並行並列処理 • Goは、シンプルな記述で非同期処理や並行処理が書けつつ、高いマルチコア性能を引き出すこ とが得意な言語。コードはgoroutine (G) という軽量スレッドで動いていて、ランタイムがGを 少数のOSスレッドに効率的に多重化することで、マルチコアをうまく活用できる仕組みになっ ている •

    ランタイムは、デフォルトでCPUコア数分のOSスレッドが常にGを実行している状態が維持さ れるようにスケジュールする • ネットワークI/Oが呼ばれると、Gが一時中断され、他のGがスケジュールされる。ランタイム にはイベントループが実装されていて、待機していたI/Oが完了すると一時中断していたGが再 びどれかのOSスレッドにスケジュールされる。 • CGOやファイル操作によって長時間ブロックされると、別のidle OSスレッド(なければ作ら れ)に他のGがスケジュールされる • 10msのタイムスライスが設けられており、それを超えると他のGに実行権限が移行する • 重厚なランタイムのおかげで、非同期や並行にしたい関数にgoをつけるだけで、効率的なマル チタスキングとCPU資源の公平な活用が可能
  3. 9 • アクターモデルとは、並行計算モデルの一種 (1973) • アクターという独立した並行動作可能な計算主体が、非同期にメッセージをやり取りすること で協調動作するモデル • 各アクターは1つのメールボックスを持ち、受信したメッセージを逐次処理する •

    受け取ったメッセージに対してできることは3つ • このモデルの良さの1つは、並行処理が管理しやすいこと Actor Model アクター 送信 生成 状態更新 mailbox アクター mailbox アクター mailbox ◦ 状態の隠蔽(競合状態がない) ◦ データ競合やロックが起きにくい ◦ 逐次処理が理解しやすい ◦ 内部状態の更新をする/次の振る舞いを規定する ◦ メッセージをアクターに送信する(自身も含む) ◦ アクターを生成する
  4. 10 • Goのライブラリ • 今回これらを直接は使わなかった • そのため、最低限のシンプルなものを作る方針で開発を始めた Actor Model in

    Go ◦ 最終的な実装はprotoactorから根幹部分であるメッセージ処理周りを移植したような ものに近い ◦ ergo - Erlang/OTPにインスパイアされたアクターベースフレームワーク ◦ prootoactor-go - Akka.NETの作者がリライトしたライブラリ ◦ GoのActorModelのライブラリは、本番導入事例少め、やりたいことに対して機能が 豊富すぎる ◦ そもそもActorModelの知見不足だった ◦ 依存ライブラリが増えるとアプリケーションが不安定になる
  5. 11 MessageProcessor すべてのActorは、MessageProcessor I/Fを実装する。PrcessMessageはメッセージを処理するコー ルバック関数で、逐次に呼ばれる。 type MessageProcessor interface { ProcessMessage

    (actx lib.ActorContext , message any) } 引数のActorContextは、そのActorの情報を保持していたり、他のActorとのやり取りを可能にするも の。 メッセージには、自身も含めてActorが送信したものが来るほか、起動後のStartや終了前のStopと いったシステムメッセージも送られてくる。
  6. 12 例:Event Eventは、イベントが発生した際にSlack通知等を行うActor。 func (e *Event) ProcessMessage (actx lib.ActorContext ,

    message any){ switch m := message.(type) { case *valueobject.Event: e.NotifyEvent(actx, m) // 通知処理 case *lib.Start: actx.Send(request.Writer , &PodStartEvent{}) // writerへ送信(非同期化) case *lib.Stop: actx.Send(request.Writer , &PodStopEvent{}) // writerへ送信(非同期化) default: } request.Writerは、globalな Actor識別子、DIの関係で必要だった
  7. 13 ActorContext type ActorContext interface { actorInfo actorSender actorSpawner }

    type actorInfo interface { Logger() *zap.Logger … Memory() memory.SharedMemory Stats() statsd.ClientInterface } type actorSender interface { Send(*ActorID, any) // メッセージ送信する SendAfter(*ActorID, any, time.Duration) func() bool // 一定時間後に送信する … Sender() *ActorID     // 送信者情報を返す Self() *ActorID // 自身の情報を返す } type actorSpawner interface { Spawn(props *Props) Actor                 // Actorを生成する }
  8. 14 SharedMemory Actor間でデータ共有するもう一つの方法。KVストア。用途は、動的な設定、テスト用変数、リー ダー状態などの保持。正直、アクターモデルの良さを殺しているが、すべてメッセージにすると開発コ ストが高くなったりするため最小限ならよしとしている。 type SharedMemoryAccessor interface { Get(key

    StateKey) (any, bool) Set(key StateKey, value any) Pop(key StateKey) (any, bool) Upsert(key StateKey, value any, cb UpsertCb) } type SharedMemory interface { SharedMemoryAccessor Load(ctx context.Context, r io.Reader) error Store(ctx context.Context) (io.Reader, error) Clear() } 内部実装はconcurrent-map (shard毎ロック)。公式のsync.Mapはwrite-once-read-many最適され てたりするのでやめた。 concurrent-swiss-map (最適なメモリレイアウト+ SSEの活用等で高速lookup が可能なSwissMap のスレッドセーフ版)はまだちゃんとさわれていないが感触早い。 Load/Storeはリー ダー状態をDBへ永続化し新リーダーに継承させるための機能で、 gobでシリアライズしている。
  9. 15 例:ConfigSyncer ConfigSyncerは、動的設定を可能にするActor。定期的にDBにある設定をオンメモリに反映する。 Start時にExec(処理開始のためのメッセージ)を自身に送信、その後Execを受信すると設定をDBか ら読み取りSharedMemoryを更新、SendAfterで一定期間後に送信されるようにする。 type Exec struct{} func (s

    *ConfigSyncer) ProcessMessage (actx lib.ActorContext, message any){ switch message.(type) { case *lib.Start: actx.Send(actx.Self(), &Exec{}) case *lib.Stop: s.stopTimer() case *Exec: ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() err := s.run(ctx) s.stopTimer = actx. SendAfter(actx.Self(), &Exec{}, s.config.Interval )
  10. 16 例:Writer WriterはDBへの永続化を担当するActor。ワーカープール(pond)を使って同時実行制御しながら永 続化を行い、リトライ可能な失敗をした場合、自身に再送信する。 func (r *Writer) ProcessMessage (actx lib.ActorContext,

    message any){ switch m := message.(type) { case *lib.Start: r.writerPool = r.writerPoolProducer () case *lib.Stop: r.writerPool.Stop() default: r.writerPool.Submit(func() { err := r.ProcessWriteMessage (actx, m) // 永続化処理 … if // リトライ可能なら再送信 actx. Logger().Warn("failed to write message; message will be requeued" … actx. Send(actx.Self(), m) }
  11. 17 Actorの生成と停止 Actorの生成方法は2つ。ProcessMessageに渡ってくるActorContextを使うか、 RootActorContextを使うか。子Actorを作るなら前者、ルートレベルのActor(ライフサイクルが サービス起動開始から終了まで)を作るには後者を利用する。 type Actor interface { ID()

    *ActorID Send(any) Stop() GracefulStop() WaitStop() } root := lib.NewRootActorContext () event := &Event {} // ProcessMessage を実装している eventActor := root.Spawn(lib.NewProps(request.Event , event, WithMailbox(1000)) Spawnが呼ばれると、Actorが初期化され、対応するgoroutineが起動、Startメッセージが送信さ れ、最終的に以下のActorが返ってくる。 停止するには、StopかGracefulStopを呼び、さらに停止完了を待ちたい場合、続けて WaitStop。
  12. 18 メッセージ処理ループ 肝心のメッセージ処理はmailbox.ProcessLoopから呼ばれる。このループ処理は、各Actorに対応す るgoroutineが実行する。mailboxにはメッセージ種別にFIFOキューがあり、システムメッセージが 優先的に処理される。panicが発生した際は、panicStrategy (restart/resume/stop)に応じて goroutineの再起動等が行われる。 func (w *mailbox)

    ProcessLoop() { var msg any defer func() { if r := recover(); r != nil { w.handler.HandlePanic(r, msg) // パニック処理 } }() for { if msg = w.systemQueue.Pop(); msg != nil { // システムメッセージ処理 … w.handler.HandleSystemMessage (msg) continue } } if msg, _ = w.userQueue.Pop(); msg != nil { // ユーザーメッセージ処理 w.handler.HandleUserMessage (msg) // 最終的にProcessMessage が呼ばれる } else { … …
  13. 20 MPMC リングバッファ Actorのmailbox(userQueue)に対しては、複数のActorが書き込みを行えて、1つのActorが読み取り を担当するため、最低限、Multi Producer Single Consumer (MPSC) キューが必要となる。

    現時点では、Actorの拡張性も考えて、MPMC リングバッファを使用している。素朴に実装するなら 粗くMutexを使ってリングバッファ全体をロックするのが簡単かつ安全だが、マルチスレッド環境で パフォーマンスを制限するため、Mutexの代わりにsync/atomicを使った Workiva/go-datastructuresのMPMC実装を使用している。
  14. 22 生産と消費 (SPSC前提) 3 2 1 0 4 5 6

    7 position: 0+1 data: *** position: 1+1 data: *** readIdx 0 writeIdx 2 3 2 1 0 4 5 6 7 position: 1+7 data: nil position: 2+7 data: nil readIdx 2 writeIdx 2 2つ生産後の状態 2つ消費後の状態 処理の大まかな流れは生産も消費も同じで、まずpositionとidxを比べて作業できるか判断し、NGな ら待機、OKなら作業開始、最後に作業完了を示すためにpositionを更新する 生産者はwriteIdx == node.positionなら書き込めると判断し、writeIdxを+1、dataを書き込み、最 後にposition+1 消費者はreadIdx == node.position+1なら読み取れると判断し、readIdxを+1、dataを読みとり、 最後にpositionを1周進ませる
  15. 24 RingBuffer (簡易版) type RingBuffer struct { _ [CacheLinePadSize]byte //

    padding, false-sharing を防ぐための最適化 writeIdx uint64 _ [CacheLinePadSize]byte readIdx uint64 _ [CacheLinePadSize]byte mask uint64 // bitmask, 効率的な折り返し計算のために使用する _ [CacheLinePadSize]byte nodes []node } type node struct { position uint64 data any } func New(size uint64) { size = roundUp(size) // サイズを2のべき乗にする rb := &RingBuffer{nodes: make([]node, size)} for i := uint64(0); i < size; i++ { // プリアロケーション rb.nodes[i] = node{position: i} } rb.mask = size - 1 }
  16. 25 Enqueue (簡易版) func (rb *RingBuffer) Enqueue(item interface{}) error {

    var n *node write := atomic.LoadUint64(&rb.writeIdx) L: for { n = &rb.nodes[write&rb.mask] // maskとANDでモジュロ演算と同じ効果が得られる seq := atomic.LoadUint64(&n.position) if seq == write && atomic.CompareAndSwapUint64 (&rb.writeIdx, write, write+1) { break L } else { write = atomic.LoadUint64(&rb.writeIdx) } runtime.Gosched() } n.data = item atomic.StoreUint64(&n.position, write+1) return nil }
  17. 26 Dequeue (簡易版) func (rb *RingBuffer) Dequeue() (interface{}, error) {

    var n *node read := atomic.LoadUint64(&rb.readIdx) L: for { n = &rb.nodes[read&rb.mask] seq := atomic.LoadUint64(&n.position) if seq == (read + 1) && atomic.CompareAndSwapUint64 (&rb.readIdx, read, read+1) { break L } else { read = atomic.LoadUint64(&rb.readIdx) } runtime.Gosched() } data := n.data n.data = nil atomic.StoreUint64(&n.position, read+rb.mask+1) return data, nil }
  18. 27 Benchmark: channelより性能良い? ❯ go test -test.bench BenchmarkChannelNonBlocking goos: darwin

    goarch: arm64 pkg: ringbuffer BenchmarkChannelNonBlocking1P1C- 10 9697224 134.3 ns/op 0 B/op 0 allocs/op BenchmarkChannelNonBlocking2P1C- 10 6220912 192.5 ns/op 0 B/op 0 allocs/op BenchmarkChannelNonBlocking3P1C- 10 4917793 262.6 ns/op 0 B/op 0 allocs/op PASS ok ringbuffer 5.443s ❯ go test -test.bench BenchmarkMPMCRingBufferCASSpin goos: darwin goarch: arm64 pkg: ringbuffer BenchmarkMPMCRingBufferCASSpin1P1C- 10 17458861 67.87 ns/op 8 B/op 0 allocs/op BenchmarkMPMCRingBufferCASSpin2P1C- 10 11925332 99.71 ns/op 15 B/op 1 allocs/op BenchmarkMPMCRingBufferCASSpin3P1C- 10 7679577 144.1 ns/op 23 B/op 2 allocs/op PASS ok ringbuffer 4.871s 簡易版RingBufferのコードやテストコードは githubにあります
  19. 29 References • https://go.dev/src/runtime/proc.go (preemption source code) • The Go

    Programming Language and Environment • Analysis of the Go runtime scheduler • The Go scheduler • Go が他の多くの言語での非同期プログラミングよりも優れている理由 • Futures, Async, and Actors • メッセージとイベントを中核に置いたシステム設計の有用性について • 徐々に高度になるリングバッファの話