Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
CQRS+ES(再)入門
Search
Sponsored
·
SiteGround - Reliable hosting with speed, security, and support you can count on.
→
かとじゅん
April 25, 2019
Programming
14k
25
Share
Embed
Copy iframe code
Copy JS code
Copy link
Start on current slide
CQRS+ES(再)入門
かとじゅん
April 25, 2019
More Decks by かとじゅん
See All by かとじゅん
TAKTでAI駆動開発の品質を設計する
j5ik2o
6
1.2k
終盤で崩壊させないAI駆動開発
j5ik2o
3
2.8k
CQRS/ESになぜアクターモデルが必要なのか
j5ik2o
0
2k
メッセージ駆動が可能にする結合の最適化
j5ik2o
10
7k
曖昧なプロンプトでも正しいコードが書ける理由
j5ik2o
0
520
AIコーディングエージェントの現実と設計品質の重要性
j5ik2o
0
160
なぜイベント駆動が必要なのか - CQRS/ESで解く複雑系システムの課題 -
j5ik2o
17
8.3k
アクターシステムに頼らずEvent Sourcingする方法について
j5ik2o
8
1.7k
メッセージとイベントを中核に置いたシステム設計の有用性について
j5ik2o
12
4.4k
Other Decks in Programming
See All in Programming
スマートグラスで並列バイブコーディング
hyshu
0
120
ユニットテストの先へ:テスト技法で要求・仕様を整理するJava開発実践 / Beyond_Unit_Testing_Practical_Java_Development_Techniques_for_Organizing_Requirements_and_Specifications
shimashima35
0
390
Make SRE Operations Easier with Azure SRE Agent
kkamegawa
0
5.3k
Spring Security 実践 ─ GraphQL APIで実務に役立つ 認証・認可 を学ぶ
wagyu
0
220
LLMによるContent Moderationの本番運用の裏側と品質担保への挑戦
suikabar
2
560
Vite+ Unified Toolchain for the Web
naokihaba
0
270
脅威をエンジニアリングの糧にして――現場編 / Turning Threats into Engineering Fuel — Field Edition
nrslib
0
270
作って学ぶ、 JSX (TSX) ランタイムの基本
syumai
7
1.6k
Copilot CLI の継戦能力を高める コンテキスト管理
nozomutu
1
1.2k
ECSアプリログをFireLensでコスト削減しようとしたけど諦めた話 in Fargate×Node.js
akihisaikeda
2
4k
Language Server 使ってる? 〜VSCode と Zed の場合〜 / Are you using a Language Server? ~For VS Code and Zed~
handlename
0
780
正しくソフトウェアを作る、前提を疑うための認知の視点 / doubt-premise
minodriven
20
6.5k
Featured
See All Featured
Tips & Tricks on How to Get Your First Job In Tech
honzajavorek
1
540
Navigating Algorithm Shifts & AI Overviews - #SMXNext
aleyda
1
1.3k
Building Flexible Design Systems
yeseniaperezcruz
330
40k
Dealing with People You Can't Stand - Big Design 2015
cassininazir
367
27k
Statistics for Hackers
jakevdp
799
230k
The SEO identity crisis: Don't let AI make you average
varn
0
490
Large-scale JavaScript Application Architecture
addyosmani
515
110k
Believing is Seeing
oripsolob
1
140
Making the Leap to Tech Lead
cromwellryan
135
9.9k
Designing Powerful Visuals for Engaging Learning
tmiket
1
410
Navigating Team Friction
lara
192
16k
[RailsConf 2023 Opening Keynote] The Magic of Rails
eileencodes
31
10k
Transcript
CQRS+ES( 再 ) 入門 突撃!! 隣のアーキテクチャ かとじゅん(@j5ik2o) 突撃!! 隣のアーキテクチャ 1
/ 38
Chatwork テックリード github/j5ik2o j5ik2o/reactiveawsclients AWS SDK for Java のソースコードからScala ラッ
パーと拡張メソッドを自動生成します j5ik2o/akkapersistencedynamodb 本家より高機能かつシンプルなプラグインを提供 j5ik2o/akkasimpleclusterk8s akkacluster をk8s で立ち上げるためのサンプル 翻訳レビュー エリックエヴァンスのドメイン駆動設計 Akka 実践バイブル 自己紹介 QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 2 / 38
最近の発表ネタ 1. ドメインモデリングの始め方 AWS Dev Day Tokyo 2018 ドメインオブジェクトの発見・実装・リファクタリングの方法論をカバー
2. Scala でのドメインモデリングのやり方 Scala 関西Summit 2018 1. のスライドと同様の観点だが、より実装技法寄りの議論をカバー 3. Scala コードとともに考えるドメインモデリング Scala 福岡 2019 ドメインイベントを使ったモデリングと実装、集約を跨がる整合性問題をカバー QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 3 / 38
アジェンダ CQRS とかEvent Sourcing(ES) とかよく聞くけどよくわからない!初めて聞く人向けの資料です 1. CQRS+ES 2. Akka 永続化アクター
アクターのシャーディング QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 4 / 38
CQRS+ES QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 5 / 38
Command and Query Responsibility Segregation コマンド・クエリ責務分離 2010 年 Greg Young
氏が考案したパターン。 1997 年にBertrand Meyer 氏が考案したコマンドクエリ分離 原則(CommandQuery Separation:CQS) をアーキテクチャ に適用したものがCQRS 。 「あらゆるメソッドは、アクションを実行するコマン ドか、呼び出し元にデータを返すクエリかのいずれか であって、両方を行ってはならない。これは、質問を することで回答を変化させてはならないということ だ。」 CQRS とは QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 6 / 38
詳しくはこちら参照してください ChatWork の新メッセージングシステムを支える技術 Apache Kafka 分散メッセージングシステムの構築と活用 Chatwork のバックエンドも CQRS+ES QRS+ES(
再) 入門 突撃!! 隣のアーキテクチャ 7 / 38
モデル状態のすべてはデータベースにある 最新を取得しなければならない 更新の衝突回避にはロックが必要 ドメインオブジェクトの部分はテーブルモジュール、トラ ンザクションスクリプトになることもある。ドメインオブ ジェクトが必須ではない DTO アップ / ダウンアーキテクチャとも呼ばれる
伝統的なアプローチ QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 8 / 38
伝統的なアプローチの善し悪し シンプルでチームがスケールしやすい 後継者への教育も簡単。チームメンバーが増えてもスケールしやすい。デフォルトとのアーテクチャでとして扱いや すい ツールのサポートがあるがDDD には向かない 伝統的なスタイルでドメインオブジェクトとデータのマッピングをサポートするツール(ORM など) がある。DTO との
マッピングコードを大量削除できる。しかしこのアーテクチャでは DDD を適用するのは難しい … RDBMS に頼りっきりなのでスケールしにくい スケーリングでRDBMS がボトルネックになる( ここでいうボトルネックは書き込みのこと) 。ほとんどのケースで垂直 にスケールアップしてコストがかさみます。 これまでのアプローチでは、ドメイン駆動設計とスケーラビリティという観点では問題がある( ただ、8 割の案件ではこういう ことが要求されないのも事実) QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 9 / 38
CRUD はなぜ DDD に向かないのか CRUD の4 つの動詞はデータ指向 CREATE= 作成する, R=
読み込む, U= 更新する, D= 削除する エンドポイントーアプリケーションサービスードメインがこの動詞に汚染されてしまう この動詞のおかげでユビキタス言語もあいまいになりがち。設計の意図が失われている! def createPurchaseItem(dto: CreateDto): Unit def updatePurchaseItem(dto: UpdateDto): Unit def deletePurchaseItem(dto: UpdateDto): Unit 商品の注文なのか?商品の返品なのか?ユビキタス言語の動詞がよくわからない ドメインモデルはDTO との相互変換のためにgetter/setter を持たざるを得なくなる Getter/Setter を避けて役に立つドメインオブジェクトを作る 最終的にはビジネスロジックがアプリケーションサービスに流出、ひどければシステム外に流出することさえある ソースコードのあちこちをみたとしても、設計の意図を汲み取ることは難しい QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 10 / 38
アプリケーションサーバが何をするのかを示したコマンド ( 販売完了、購入命令の承認など) を送信する。対話の意図 を表現できる。 ユビキタス言語にフォーカスできるようになる。例えば、 「CreateUser なのか、RegisterUser なのか」など。 コマンドによって意図の明白なインターフェイスを作り上
げる /* * createItem(itemDto: ItemDto) のようなモデルではなく * サーバに商品を購入したいと命令するコマンドでモデリングする */ case class PurchaseItemCommand( id: PurchaseItemCommandId, itemId: ItemId, quantity: Quantity, customerAccountId: CustomerAccountId, purchaseAt: Instant) データ指向からコマンド指向へ QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 11 / 38
コマンドとクエリで要件が異なる Command Query 一貫性 結果整合性よりトランザクション整合性を扱うこ と多い ほとんどの場合結果整合を使う データ形式 トランザクション処理を行い正規化されたデータ
を保存することが好まれる( 集約単位など) 非正規化したデータ形式を取得することが好まれ る( クライント都合のレスポンスなど) スケーラビリティ 全体のリクエスト比率とごく少数のトランザクシ ョン処理しかしない。必ずしもスケーラビリティ は重要ではない 全体のかなりのリクエスト比率を占める処理を行 うため、クエリ側はスケーラビリティが重要 検索・レポートやトランザクション処理を両立する単一モデルは不可能ではないか?! QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 12 / 38
DTO アップ / ダウンアーキテクチャの問題点 ドメインモデルを単純に投影したDTO を返す ドメイン上に以下のような問題がおきる リポジトリがクエリ要件をカバーするため、ページ番号やソート情報も扱う DTO を生成するために、ドメインオブジェクトはgetter
経由で内部ステートを露出する DTO を生成する際、複数の集約(Aggregate) を読み出し、N+1 を含むクエリを発行する。多くのフィールドは非正規化 のために捨てられる ( リードモデルのN+1 問題とCQRS) オブジェクトモデルでクエリを処理しDTO に変換するため、クエリの最適化が非常に困難になる QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 13 / 38
CQRS ではどうなるか QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 14 / 38
データを取得するメソッドを持ち、クライアントの画面に 表示するために必要なDTO を返す CQRS ではDTO を生成するのにドメインオブジェクトが不 要になるという大きな価値をもたらす 具体的には方法はいろいろある 集約に対応するテーブルから必要な情報を取得する SQL(
もくしはビューを事前に定義) を発行するDAO を 使う 集約の更新イベントをハンドリングし、リードモデル をDB に構築する 読み出しに発生するDTO 変換ギャップ( インピーダンスミ スマッチの一種) が解決される クエリサイド QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 15 / 38
これまでのデータ中心の設計ではデータの読み込みも行っ ていたが、クエリ側に分離した ドメインはコマンドの処理だけに焦点をあてる。ドメイン オブジェクトは内部状態を晒す必要なくなる。リポジトリ ではgetById を除くとクエリ用メソッドはごくわずかです ( コマンドのためのクエリメソッドはいくつかあるだろう) コマンドサイド QRS+ES(
再) 入門 突撃!! 隣のアーキテクチャ 16 / 38
これはあくまで一例だが、コマンドとクエリ要件は異なる のだから、それに適したデータストレージを用意するのは 自然 統合の手段としては、ドメインイベントが最良の手段 コマンドとクエリを統合する QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ
17 / 38
過去にに発生した出来事のこと= イベント ドメインエキスパートが関心を持つイベント= ドメインイ ベント 一般的には過去形の動詞で表現される CustomerRelocated CargoShipped イベントからコマンドを想像することができる( 貧血症回
避!) 。 RelocateCustomer ShipCargo イベントとコマンドは似ているが、人間が扱う言語として は別モノ コマンドは拒否されることがある イベントは既に起こったことを示す ドメインイベントとは QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 18 / 38
イベントソーシング イベントがあれば現在の状態がわかる 取引もイベントソーシング。モデルとして特別なものではない イベントはイミュータブルな歴史を表現している 訂正は赤黒訂正( 通常伝票は黒インク/ 訂正は赤インクから由来) 700 番で0001 番の受注と取り消し、伝票番号701
番で修正後のデータを登録している git もイベントソーシング。打ち消しはrevert コミットを追加する QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 19 / 38
パフォーマンスとスケーラビリティ (1/2) ドメインイベントは追加のみ。つまりロックレスでストレージに保存できる データをストレージにプッシュする特性に強いDB を選ぶ( たとえばDynamoDB やKafka) パーティションは集約ID を使う。集約の数多くなっても構造を変更するこがあっても一貫したパーティショニングが可能 //
aggregateId によってライトDB を切り替えることで書き込みを簡単に分散できる case class WalletDeposited(id: WalletEventId, aggregateId: WalletId, ..., occurredAt: Instant) QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 20 / 38
パフォーマンスとスケーラビリティ (2/2) ドメインイベントを永続化するために大袈裟なツールは不要。インピーダンスミスマッチを解消しなくてよい { "partitionKey": "2048", "sequenceNo": 20832, "event": {
"type": "WalletDeposited", "id": 20832, "aggregateId": "2048", ... "occurredAt": ... } } QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 21 / 38
パフォーマンスとスケーラビリティ (3/3) 保存されたイベント列から最新状態を得るには、イベント列を状態変更のメソッドに渡すだけでよい。しかし自前での実 装はいろいろ面倒 // 関数型ではfold の中でイベントに応じたコマンドを実行するだけ case class WalletImpl(events:
WalletEvents, snapshotBalance: Money = Money.zero(Money.JPY) // イベント列が長大なときのショートカット! ) extends Wallet { // イベントから最新状態を導出( しかも、状態をlazy loading できるとは!) override lazy val balance: Money = { events.foldLeft(snapshotBalance) { case (r, MoneyDeposited(_, _, _, _, _, money, _)) => r.plus(money) case (r, MoneyPaid(_, _, _, _, _, money, _, _)) => r.minus(money) case (r, MoneyPaymentReceived(_, _, _, _, _, money, _, _)) => r.plus(money) case (r, _) => r } } } QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 22 / 38
CQRS で変わるドメインモデリング チャットルームの直近(24hr 以内) のメッセージは編集可能だが、それ以降は編集できないケース // コマンド側 case class Room(id:
RoomId, title: Title, members: Members, recentMessageIds: MessageIds) { // クエリを想定すると全Messages を持ちたくなるがここでは不要 def addMessage(message: Message, senderId: MemberId): Room = ... def updateMessage(message: Message, senderId: MemberId): Room = { // 不変条件を検証する if (!recentMemssages.contains(message.id)) throw new UpdateMessageException() else persistEvents(toEvent(message)) { // イベントを追記しリードスタックへ送る lastestMessageIds.append(message.id) } } } // クエリ側 // 全メッセージ集合を保持することはドメインモデルの責務ではなかった! val messageDtos = MessageDao.findAllWithOffsetLimit(100, 100) 副作用が伴うビジネスルールに専念するドメインモデリングが可能となる QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 23 / 38
CQRS+ES で解決できること CQRS CRUD 依存によるドメインモデル 貧血症の回避 C/Q を分離することで設計がシンプルになる(C/Q でインピーダンスミスマッチを解消しなくてよい) ドメインモデルが本来の責務に集中できる
Event Sourcing ドメインイベントをロックレスで書き込める( ただし前提がある) 処理能力が不足してもパーティションに応じて書き込み能力をスケールアウトできる イミュターブルなドメインイベントはあらゆる場面で安全に扱うことができる 境界づけられたコンテキストを跨ぐ場合に、通信手段の一つと使える( マイクロサービス文脈) QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 24 / 38
Akka QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 25 / 38
Akka とは? アクターモデルが中心になったツールキットです。フレームワークではない リアクティブ宣言 読みましょう(https://www.reactivemanifesto.org/ja) CPU コア数の増大、レイテンシーの短縮、ゼロダウンタイム、終わりのないデータ処理 Akka はリアクティブ宣言をフルサポートしているツールキット( リアクティブシステムを作るためのツールキット)
障害に強くてめちゃくちゃスケールするやつ 主要なコンポーネント akkaactor akkastream akkahttp akkacluster(akkaclustersharding, akkaclustertools) akkapersistence akkapersistencequery QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 26 / 38
プログラミングモデルの違い 従来 Akka スケーリング スケールアップ目的のマルチスレッディング、 ス ケールアウト目的のRPC 、データベースで共有さ れた変更可能な状態が混在する
スケールアップ・アウトもメッセージの送受信、 共有された状態はなくイミュータブルなドメイン イベントを保存する インタラクティブな情報 提供 現在の値をポーリングで取得する イベントが発生したときにプッシュされる( ただ し興味のあるものだけ) ネットワークを使ったス ケールアウト 同期的なRPC, ブロッキングI/O 非同期なメッセージ送受信, ノンブロッキングI/O 障害の対処方法 すべての例外を制御し、すべてが正常なときに稼 働しつづける let it crash をポリシーとして、障害を切り離し正 常な部分は稼働し続ける QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 27 / 38
クラスター上でシャーディングされた永続化ア クターとして常駐させる( メッセージがしばらく 来ない場合はアクターを停止できます!) アクターでコマンドが受け入れられるとインメ モリ状態が更新され、ドメインイベントがスト レージに追加保存されます アクターはリプレイ( 再生) される時以外はDB
を 読み込まず、インメモリに最新状態がありま す。アクターがミニキャッシュ化される 故障したノードにあったアクターは、別のノー ド上リプレイされます CRUD からの解放 !!! node Shard Region shard 集約 Actor ID=1 集約 Actor ID=3 shard 集約 Actor ID=5 node Shard Region shard 集約 Actor ID=2 集約 Actor ID=4 クラスター上に ID=1 のアクターは 一つしかいない。 メッセージのトラ ンザクションはア クターが担保する node Controller Shard Coordinator インメモリ状態保持 コマンドによる状態遷移 ドメインイベントの保存 イベント列によるリプレイ ライトに特化した DB を使う。集約ID ごとにパーティショ ニング可能 CardCreated ItemAdded 2 Socks Item 137 ItemAdded 4 Shirts Item 354 CardCreated ItemAdded 2 Socks Item 137 CardCreated ItemAdded 2 Socks Item 137 ItemRemoved 2 Socks Item 137 ItemAdded 4 Shirts Item 354 Shipping Information Added CardCreated CardCreated ItemAdded 2 Socks Item 137 Akka による CQRS+Event Sourcing QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 28 / 38
集約アクターのためのアクターヒエラルキー WalletAggregate Wallet PersistentWalletAggregate WalletAggregates ShardedWalletAggregates WalletAggregate Wallet PersistentWalletAggregate WalletAggregate
Wallet PersistentWalletAggregate WalletAggregates WalletAggregate Wallet PersistentWalletAggregate メッセージブローカー メッセージブローカー 永続化機能の提供 集約機能の提供 シャーディング機能の提供 QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 29 / 38
CQRS+ES はスケーラビリティだけでなくドメインモデリ ングも劇的に変える CQRS+ES システムを組むならAkka がおすすめ まとめ QRS+ES( 再) 入門
突撃!! 隣のアーキテクチャ 30 / 38
( おまけ ) CQRS+ES with Akka Typed QRS+ES( 再) 入門
突撃!! 隣のアーキテクチャ 31 / 38
// 従来のUntyped Actor class MyActor extends Actor { val log
= Logging(context.system, this) def receive = { case "test" => log.info("received test") case _ => log.info("received unknown message") } } val myActor = system.actorOf(Props[MyActor], name = "my-actor") myActor ! "test" // received test myActor ! "abc" // received unknown message myActor ! 1L // received unknown message // Typed Actor object MyActor { def behavior = Beahaviors.setup { ctx => Beahaviors.receiveMessage[String]{ case "test" => ctx.log.info("received test") case _ => ctx.log.info("received unknown message") } } } val myActor = ActorSystem[String](MyActor.behavior, name = "my-actor") myActor ! "test" // received test myActor ! "abc" // received unknown message myActor ! 1L // compile error Akka Typed とは 従来のアクターは送受信するメッセージがAny でしたが、タイプセーフにできるようになりました。 QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 32 / 38
集約アクター WalletAggregate { private def onUninitialized(id: WalletId) = Behaviors.receiveMessage[CommandRequest] {
case CreateWalletRequest(_, walletId, createdAt, replyTo) if walletId == id => replyTo.foreach(_ ! CreateWalletSucceeded) onInitialized(id, Wallet(walletId, Balance.zero)) // ドメイン状態を生成し状態遷移 } private def onInitialized(id: WalletId, wallet: Wallet) = Behaviors.receiveMessage[CommandRequest] { case GetBalanceRequest(_, walletId, replyTo) if walletId == id => replyTo ! GetBalanceResponse(wallet.balance) Behaviors.same // 状態変更なし case DepositRequest(_, walletId, money, instant, replyTo) if walletId == id => wallet.deposit(money, instant) match { // 状態はドメインオブジェクト case Left(t) => replyTo.foreach(_ ! DepositFailed(t.getMessage)) Behaviors.same case Right(newWallet) => replyTo.foreach(_ ! DepositSucceeded) fireEventToSubscribers(WalletDeposited(walletId, money, instant)) onInitialized(id, newWallet, subscribers) } } // アクターの振るまいを返す def behavior(id: WalletId): Behavior[CommandRequest] = onUninitialized(id) } QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 33 / 38
永続化アクターの実装 PersistentWalletAggregate { case class State(childRef: ActorRef[CommandRequest]) def behavior(id: WalletId):
Behavior[CommandRequest] = // 永続化機能だけを提供するプロキシー Behaviors.supervise(Behaviors.setup[CommandRequest] { ctx => val childRef = ctx.spawn(WalletAggregate.behavior(id), WalletAggregate.name(id)) ctx.watch(childRef) EventSourcedBehavior[CommandRequest, Event, State]( persistenceId = PersistenceId("p-" + id.toString), // 集約ID でパーティショニングされる emptyState = State(childRef), commandHandler = { case (state, commandRequest: CommandRequest with ToEvent) => state.childRef ! commandRequest Effect.persist(commandRequest.toEvent) // コマンドに対応するイベントを生成し永続化する }, eventHandler = { (state, event) => state.childRef ! event.toCommandRequest // イベントからコマンドを生成し子アクターの状態を変更する state } ).receiveSignal { case (_, Terminated(c)) if c == childRef => Behaviors.stopped // 子アクターが停止したら自分も停止する。運命共同体 } }).onFailure[Throwable](file:///Users/j5ik2o/Source/slides/totsugeki-tonari-no-architecture/SupervisorStrategy.stop) // 子アクター } QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 34 / 38
メッセージブローカー object WalletAggregates { def behavior(behaviorF: WalletId => Behavior[CommandRequest]): Behavior[CommandRequest]
= { Behaviors.setup { ctx => def createAndSend(walletId: WalletId): ActorRef[CommandRequest] = { ctx.child(WalletAggregate.name(walletId)) match { // 集約ID の名前を持つ子アクターを探す case None => // なければ子アクターを作成 ctx.spawn(behaviorF(walletId), name = WalletAggregate.name(walletId)) case Some(ref) => // あれば子アクターの参照取得 ref.asInstanceOf[ActorRef[CommandRequest]] } } Behaviors.receiveMessage[CommandRequest] { msg => createAndSend(msg.walletId) ! msg // 子アクターにメッセージを転送 Behaviors.same } } } } QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 35 / 38
アクターのクラスターシャーディング object ShardedWalletAggregates { val TypeKey: EntityTypeKey[CommandRequest] = EntityTypeKey[CommandRequest](file:///Users/j5ik2o/Source/slides/totsugeki-tonari-no- private
def behavior(receiveTimeout: FiniteDuration): EntityContext => Behavior[CommandRequest] = { entityContext => Behaviors.setup[CommandRequest] { ctx => // メッセージブローカを生成 val childRef = ctx.spawn(WalletAggregates.behavior(PersistentWalletAggregate.behavior), name = WalletAggregates.name) ctx.setReceiveTimeout(receiveTimeout, Idle) // 指定時間 メッセージを受信しなければIdle をself に送信 Behaviors.receiveMessage[CommandRequest] { case Idle => // ShardRegion に終了を依頼する entityContext.shard ! ClusterSharding.Passivate(ctx.self); Behaviors.same case Stop => // 停止メッセージを受信したら終了する Behaviors.stopped case msg => // メッセージをプロキシーする childRef ! msg; Behaviors.same } } } def initEntityActor(clusterSharding: ClusterSharding, receiveTimeout: FiniteDuration ): ActorRef[ShardingEnvelope[CommandRequest]] = clusterSharding.init( Entity(typeKey = TypeKey, createBehavior = behavior(receiveTimeout)).withStopMessage(Stop) ) } QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 36 / 38
利用例 val system = ActorSystem.wrap(untyped.ActorSystem("wallet-system")) val clusterSharding = ClusterSharding(system) ShardedWalletAggregates.initEntityActor(clusterSharding,
1 hours) val walletId = wallet.newULID val walletRef: EntityRef[WalletProtocol.CommandRequest] = clusterSharding.entityRefFor(ShardedWalletAggregates.TypeKey, walletId.toString) // ローカルかリモートかを気にせず、メッセージ送信 walletRef ! CreateWalletRequest(wallet.newULID, walletId, Instant.now, None) walletRef ! DepositRequest(wallet.newULID, walletId, Money(BigDecimal(100)), Instant.now, None) sys.addShutdownHook { system.terminate() Await.result(system.whenTerminated, Duration.Inf) } 参考資料 Greg Young 流CQRS の和訳版 / Sipadan2003 Greg Young 流CQRS Mark Nijhof / Digtal Romanticism QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 37 / 38
一緒に働くエンジニアを募集しています! http://corp.chatwork.com/ja/recruit/ QRS+ES( 再) 入門 突撃!! 隣のアーキテクチャ 38 / 38