Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
PHPでアクターモデルを活用したSagaパターンの実践法 / php-saga-pattern...
Search
Sponsored
·
Ship Features Fearlessly
Turn features on and off without deploys. Used by thousands of Ruby developers.
→
yuuki takezawa
March 21, 2025
Technology
2.5k
0
Share
Embed
Copy iframe code
Copy JS code
Copy link
Start on current slide
PHPでアクターモデルを活用したSagaパターンの実践法 / php-saga-pattern-with-actor-model
phperkaigi2025
yuuki takezawa
March 21, 2025
More Decks by yuuki takezawa
See All by yuuki takezawa
なぜAI時代に 「イベント」を中心に考えるのか? / Why focus on "events" in the age of AI?
ytake
4
2.1k
PHP ステートレス VS ステートフル 状態管理と並行性 / php-stateless-stateful
ytake
0
320
PHPでアクターモデルを理解・体験しよう / Understand and experience the actor model in PHP
ytake
2
930
再考 アクターモデル/ reconsider actor model
ytake
0
1.6k
GoとアクターモデルでES+CQRSを実践! / proto_actor_es_cqrs
ytake
1
660
Phluxorでアクターモデルを 理解・体験しよう / toolkit-for-flexible-actor-models-in-php-phluxor
ytake
1
380
オブジェクトのおしゃべり大失敗 メッセージングアンチパターン集 / messaging anti-pattern collection
ytake
2
1.3k
DRE/SREのプラクティス融合によるクラウドネイティブなデータ基盤作り / dre_sre
ytake
0
1k
技術的負債と向き合う取り組みでよかったもの / positive_efforts_to_tackle_technical_debt
ytake
10
4k
Other Decks in Technology
See All in Technology
Claude Codeをどのように キャッチアップしているか
oikon48
6
3.9k
ACE-Step-1.5で見る 音楽生成AIのしくみと“破綻だけ直す”Retake機能の開発【zennfes spring 2026 登壇資料】
personabb
1
130
AIっぽい文章を採点して人間らしく直すアプリを作ってみた
yama3133
2
130
Dario Amodi『Policy on the AI Exponential』を理解する
nagatsu
0
220
Snowflakeと仲良くなる第一歩
coco_se
4
430
MIERUNE JCT 発表資料「宇宙から伊能忠敬ごっこ」
syuchimu
0
210
2026 TECHFRESH 畢業分享會 - 開發日常大解密!從領域驅動到企業級上線
line_developers_tw
PRO
0
790
RSA暗号を手計算したくなること、ありますよね?? (20260615_orestudy6_rsa)
thousanda
0
240
自宅LLMの話
jacopen
1
250
EventBridge Connection
_kensh
5
690
やさしいA2A入門
minorun365
PRO
12
1.7k
Chainlitで作るお手軽チャットUI
ynt0485
0
200
Featured
See All Featured
Optimizing for Happiness
mojombo
378
71k
Put a Button on it: Removing Barriers to Going Fast.
kastner
60
4.3k
The AI Revolution Will Not Be Monopolized: How open-source beats economies of scale, even for LLMs
inesmontani
PRO
3
3.5k
GraphQLの誤解/rethinking-graphql
sonatard
75
12k
Digital Projects Gone Horribly Wrong (And the UX Pros Who Still Save the Day) - Dean Schuster
uxyall
0
1.7k
The Pragmatic Product Professional
lauravandoore
37
7.3k
Mozcon NYC 2025: Stop Losing SEO Traffic
samtorres
1
250
Build your cross-platform service in a week with App Engine
jlugia
234
18k
Designing for Timeless Needs
cassininazir
1
250
Fireside Chat
paigeccino
42
3.9k
Noah Learner - AI + Me: how we built a GSC Bulk Export data pipeline
techseoconnect
PRO
0
200
Agile that works and the tools we love
rasmusluckow
331
21k
Transcript
PHPͰΞΫλʔϞσϧΛ ׆༻ͨ͠ Sagaύλʔϯͷ࣮ફ๏ PHPerKaigi 2025 ytake
Pro fi le • ᖒ ༗و a.k.a ytake • ઍגࣜձࣾ
CTO גࣜձࣾACESɺגࣜձࣾωοτϓϩςΫγϣϯζٕज़ސ • Go / Scala / Kotlin
https://github.com/ytake/ phluxor-saga-example
SagaύλʔϯͬͯͳΜͩʁ
Sagaύλʔϯͱ • ෳͷαʔϏεʢཧతʹ͔Ε͍ͯΔͷʣʹ·͕ͨΔॲཧͰ Ұͭʹ·ͱ·ͬͨτϥϯβΫγϣϯͷΑ͏ʹѻ͏ͨΊͷύλʔϯ • ్தͷॲཧ͕ࣦഊͨ͠߹ɺ γεςϜશମΛໃ६ͷͳ͍ঢ়ଶʹͨ͢Ίͷ ʮิঈτϥϯβΫγϣϯʯΛ࣮ߦ
None
None
None
X
X
ฦ • ϨγʔτΛͱʹɺฦɾฦۚͯ͠Β͏ ߪೖͷࣄ࣮͕͍ͬͯΔ • ͓ళʹΓɺ͓ۚࣗͷखݩʹฦͬͯ͘Δ
࣮ફ͢Δʹɾɾɾʁ
ࣦഊ͔Βͷճ෮ઓུ • ࣦഊͨ͠߹ʹલͷঢ়ଶʹͤΔ͜ͱ • ్தͷॲཧ͕ࣦഊͨ͠߹ɺϦτϥΠ͢Δ • ଞͷϥϯλΠϜͰॲཧ͕ߦΘΕͯໃ६͕ൃੜ͠ͳ͍Α͏ʹ͢Δ
࣮͢Δ߹ • ΠϕϯτιʔγϯάͰલͷঢ়ଶʹͤΔΑ͏ʹ • Πϕϯτঢ়ଶͷεφοϓγϣοτʹ • ෭߹ͤΛߦͬͯঢ়ଶ෮ݩ͢Δܗʹ͠ͳ͍ʂ
None
None
None
ؾΛ͚ͭΑ͏ • ରͷͷҎ֎௨ৗʹՔಇ • ଞͷͷΛελοΫͤ͞ͳ͍Α͏ʹ • ࣌ܥྻ͕લޙ͠ͳ͍Α͏ʹ
ૹۚͰߟ͑ͯΈΑ͏
γφϦΦ
ۚમৼସγφϦΦ • Account1 ͔Β Account2 10ԁΛૹۚ͢ΔγφϦΦ • ޱ࠲͕ϦϞʔταʔϏεͱͯ͠ಈ࡞͢Δ߹ʹى͜Γ͏Δ ϦΫΤετڋ൱ɺΫϥογϡɺϏδʔঢ়ଶɺλΠϜΞτͳͲΛ
ߟྀ
None
ޭ • Account1 ͷߴ 0 ԁɺAccount2 ͷߴ 20 ԁ •
ظ௨Γʹ͓ޓ͍ͷॲཧ͕ਖ਼ৗʹऴΘΔ
ࣦഊ • Account1 ͱ Account2 ͷߴͦΕͧΕ 10 ԁ (Ұ؏ੑͷ͋Δঢ়ଶ) •
ظ௨Γʹ͓ޓ͍ͷঢ়ଶ͕ͱʹΔ
ࣦഊγφϦΦͷରԠ:ϦτϥΠ • Ұ࣌తͳΤϥʔʢϏδʔঢ়ଶɺλΠϜΞτʣʹରͯ͠ɺ ҰఆճϦτϥΠΛࢼΈΔ
ิঈॲཧ (ϩʔϧόοΫ) • Debit ͕ޭ͕ͨ͠ Credit ͕ࣦഊͨ͠߹ɺ Debit ΛऔΓফͨ͢Ίʹ Credit
Λ Account1 ʹ͢
ΤεΧϨʔγϣϯ • ϦτϥΠิঈॲཧ͕͏·͍͔͘ͳ͍߹ɺ खಈհೖ͕ඞཁͱͳΔՄೳੑ͕͋ΔͨΊɺ ΤεΧϨʔγϣϯઓུΛઃ͚Δ
ႈੑ • Account αʔϏε͕ϦτϥΠ࣌ʹಉ͡ϝοηʔδΛ ෳճड͚औͬͯಉ݁͡Ռ͕ಘΒΕΔΑ͏ʹɺ ႈͰ͋Δඞཁ͕͋Γ • ॲཧࡁΈͷϝοηʔδΛอ͠ɺ ॏෳͨ͠ϝοηʔδʹରͯ͠ಉ͡ԠΛฦ͢
ΦʔέετϨʔγϣϯͷSaga࣮
ΞΫλʔϞσϧͰ࣮ફʂ
Phluxor
Similar to akka, pekko and Proto Actor
ɹPHPΛ௨ͯ͡ ΞΫλʔϞσϧ͕ ମݧͰ͖ΔπʔϧΩοτ *༻࣮ફଞݴޠΛ͓નΊ͠·͢
ΞΫλʔϞσϧͷ͓͞Β͍
None
None
None
ߏ • Runner - సૹϓϩηεΛ։࢝͠ɺ݁ՌΛऩू͢ΔओཁͳΞΫλʔ • Account - ߴΛ࣋ͪɺೖۚ/ग़ۚૢ࡞Λॲཧ͢ΔΞΫλʔ •
TransferProcess - ΞΧϯτؒͷૹۚΛ੍ޚ͢ΔΞΫλʔ • AccountProxy - TransferProcessͱAccountΞΫλʔؒͷ௨৴Λհ
None
None
TransferProcessΞΫλʔ
None
TransferProcess ΞΫλʔ • సૹΛཧ͢ΔΛ࣋ͭ • Process Manager • ྃͨ͠εςοϓΛ͠ɺࣦഊ࣌ʹิঈાஔΛద༻ •
BehaviorɺPersistenceɺSupervisionར༻ʂ
Βͳ͍༻ޠ͔Γɾɾ
࣮ફ͍ͨ͠ํͷͨΊʹղઆʂ
Process Manager
None
ΑΓཧղ͢ΔͨΊʹ
Behavior
Behavior • ΞΫλʔ͕ड৴͢Δϝοηʔδʹର͢Δॲཧͷఆٛ • ΞΫλʔ ঢ়ଶΛ࣋ͨͣʢεςʔτϨεʣͰͳ͍ • ΞΫλʔঢ়ଶΛ࣋ͪಈతʹ ৼΔ͍Λมߋ ͢Δ
Become / Unbecome • Become ৽͍͠ৼΔ͍ʹʮมԽʯ͢Δʢྫ: ঢ়ଶભҠΛ͏ಈ࡞ʣ มԽҎ߱ͷϝοηʔδ ৽͍͠ৼΔ͍Ͱॲཧ •
Unbecome ҎલͷৼΔ͍ʹʮΔʯɺ·ͨσϑΥϧτͷಈ࡞ʹϦηοτ
None
None
class TransferProcess implements ActorInterface, PersistentInterface { use Mixin; private bool
$processCompleted = false; private bool $restarting = false; private bool $stopping = false; public function __construct( private readonly Ref $from, private readonly Ref $to, private readonly float $amount, private readonly float $availability, private readonly Behavior $behavior = new Behavior() ) { $this->behavior->become( new ReceiveFunction( fn($context) => $this->starting($context) ) ); }
class TransferProcess implements ActorInterface, PersistentInterface { use Mixin; private bool
$processCompleted = false; private bool $restarting = false; private bool $stopping = false; public function __construct( private readonly Ref $from, private readonly Ref $to, private readonly float $amount, private readonly float $availability, private readonly Behavior $behavior = new Behavior() ) { $this->behavior->become( new ReceiveFunction( fn($context) => $this->starting($context) ) ); } 1FSTJTUFODFΛ͏͜ͱͰɺ ΞΫλʔ͕ࣗͲ͏ͳ͔ͬͨΛอ ͭ·Γ&WFOU4PVSDJOH
class TransferProcess implements ActorInterface, PersistentInterface { use Mixin; private bool
$processCompleted = false; private bool $restarting = false; private bool $stopping = false; public function __construct( private readonly Ref $from, private readonly Ref $to, private readonly float $amount, private readonly float $availability, private readonly Behavior $behavior = new Behavior() ) { $this->behavior->become( new ReceiveFunction( fn($context) => $this->starting($context) ) ); } 1IMVYPSͰ.JYJO5SBJUʹجຊ࣮͋Γ ར༻͢Δͱঢ়ଶ෮ݩͳͲΛࣗಈͰߦ͏
class TransferProcess implements ActorInterface, PersistentInterface { use Mixin; private bool
$processCompleted = false; private bool $restarting = false; private bool $stopping = false; public function __construct( private readonly Ref $from, private readonly Ref $to, private readonly float $amount, private readonly float $availability, private readonly Behavior $behavior = new Behavior() ) { $this->behavior->become( new ReceiveFunction( fn($context) => $this->starting($context) ) ); } ΞΫλʔͷৼΔ͍Λมߋ͢Δ #FIBJWPS
class TransferProcess implements ActorInterface, PersistentInterface { use Mixin; private bool
$processCompleted = false; private bool $restarting = false; private bool $stopping = false; public function __construct( private readonly Ref $from, private readonly Ref $to, private readonly float $amount, private readonly float $availability, private readonly Behavior $behavior = new Behavior() ) { $this->behavior->become( new ReceiveFunction( fn($context) => $this->starting($context) ) ); } ͜ͷΞΫλʔͷॳظঢ়ଶͱͯ͠ TUBSUJOHϝιου͕ಈ͘
None
private function starting(ContextInterface $context): void { if ($context->message() instanceof Started)
{ $context->spawnNamed($this->tryDebit($this->from, -$this->amount), 'DebitAttempt'); $this->persistEvent(new ProtoBuf\TransferStarted()); } } private function tryDebit(Ref $targetActor, float $amount): Props { return Props::fromProducer( fn() => new AccountProxy( $targetActor, fn($sender) => new Debit($amount, $sender) ) ); }
private function starting(ContextInterface $context): void { if ($context->message() instanceof Started)
{ $context->spawnNamed($this->tryDebit($this->from, -$this->amount), 'DebitAttempt'); $this->persistEvent(new ProtoBuf\TransferStarted()); } } private function tryDebit(Ref $targetActor, float $amount): Props { return Props::fromProducer( fn() => new AccountProxy( $targetActor, fn($sender) => new Debit($amount, $sender) ) ); } %FCJUΞΫλʔΛੜ
private function starting(ContextInterface $context): void { if ($context->message() instanceof Started)
{ $context->spawnNamed($this->tryDebit($this->from, -$this->amount), 'DebitAttempt'); $this->persistEvent(new ProtoBuf\TransferStarted()); } } private function tryDebit(Ref $targetActor, float $amount): Props { return Props::fromProducer( fn() => new AccountProxy( $targetActor, fn($sender) => new Debit($amount, $sender) ) ); } సૹ։࢝ΠϕϯτΛอ
private function starting(ContextInterface $context): void { if ($context->message() instanceof Started)
{ $context->spawnNamed($this->tryDebit($this->from, -$this->amount), 'DebitAttempt'); $this->persistEvent(new ProtoBuf\TransferStarted()); } } private function tryDebit(Ref $targetActor, float $amount): Props { return Props::fromProducer( fn() => new AccountProxy( $targetActor, fn($sender) => new Debit($amount, $sender) ) ); } "DDPVOU1SPYZΛੜ͠ɺ ग़ۚϝοηʔδΛૹ৴
Persistence
Persistence • ΞΫλʔͷӬଓԽΛߦ͏ • ΞΫλʔͷϥΠϑαΠΫϧͱରԠ͓ͯ͠Γɺ ىಈ࣌ʹӬଓԽ͞Εͨঢ়ଶʹΞΫλʔΛ෮ݩ͢Δ • δϟʔφϧͱεφοϓγϣοτΛΈ߹Θͤͯ࠷Ͱ෮ݩ͞ΕΔ • ࠷৽ঢ়ଶΛ͚࣋ͭͩɺͰͳ͍ͷ͕ϙΠϯτ
PersistEvent
private function applyEvent(Message $event): void { switch (true) { case
$event instanceof ProtoBuf\TransferStarted: $this->behavior->become( new ReceiveFunction( fn($context) => $this->awaitingDebitConfirmation($context) ) ); break; case $event instanceof ProtoBuf\AccountDebited: $this->behavior->become( new ReceiveFunction( fn($context) => $this->awaitingCreditConfirmation($context) ) ); break; case $event instanceof ProtoBuf\CreditRefused: $this->behavior->become( new ReceiveFunction( fn($context) => $this->rollingBackDebit($context) ) ); break; case $event instanceof ProtoBuf\AccountCredited: case $event instanceof ProtoBuf\DebitRolledBack: case $event instanceof ProtoBuf\TransferFailed: $this->processCompleted = true; break; } }
private function applyEvent(Message $event): void { switch (true) { case
$event instanceof ProtoBuf\TransferStarted: $this->behavior->become( new ReceiveFunction( fn($context) => $this->awaitingDebitConfirmation($context) ) ); break; case $event instanceof ProtoBuf\AccountDebited: $this->behavior->become( new ReceiveFunction( fn($context) => $this->awaitingCreditConfirmation($context) ) ); break; case $event instanceof ProtoBuf\CreditRefused: $this->behavior->become( new ReceiveFunction( fn($context) => $this->rollingBackDebit($context) ) ); break; case $event instanceof ProtoBuf\AccountCredited: case $event instanceof ProtoBuf\DebitRolledBack: case $event instanceof ProtoBuf\TransferFailed: $this->processCompleted = true; break; } } ͍͔ͭ͘ͷΠϕϯτΛӬଓԽͨ͠ޙʹ ࣗΞΫλʔͷৼΔ͍Λมߋ͢Δ
private function applyEvent(Message $event): void { switch (true) { case
$event instanceof ProtoBuf\TransferStarted: $this->behavior->become( new ReceiveFunction( fn($context) => $this->awaitingDebitConfirmation($context) ) ); break; case $event instanceof ProtoBuf\AccountDebited: $this->behavior->become( new ReceiveFunction( fn($context) => $this->awaitingCreditConfirmation($context) ) ); break; case $event instanceof ProtoBuf\CreditRefused: $this->behavior->become( new ReceiveFunction( fn($context) => $this->rollingBackDebit($context) ) ); break; case $event instanceof ProtoBuf\AccountCredited: case $event instanceof ProtoBuf\DebitRolledBack: case $event instanceof ProtoBuf\TransferFailed: $this->processCompleted = true; break; } } ૹ͕ۚ։࢝͢Δͱɺ Ҿ͖མͱ͠ʹؔ͢ΔৼΔ͍ʹมԽ
private function applyEvent(Message $event): void { switch (true) { case
$event instanceof ProtoBuf\TransferStarted: $this->behavior->become( new ReceiveFunction( fn($context) => $this->awaitingDebitConfirmation($context) ) ); break; case $event instanceof ProtoBuf\AccountDebited: $this->behavior->become( new ReceiveFunction( fn($context) => $this->awaitingCreditConfirmation($context) ) ); break; case $event instanceof ProtoBuf\CreditRefused: $this->behavior->become( new ReceiveFunction( fn($context) => $this->rollingBackDebit($context) ) ); break; case $event instanceof ProtoBuf\AccountCredited: case $event instanceof ProtoBuf\DebitRolledBack: case $event instanceof ProtoBuf\TransferFailed: $this->processCompleted = true; break; } } ೖۚʹؔ͢ΔৼΔ͍ʹมԽ
private function applyEvent(Message $event): void { switch (true) { case
$event instanceof ProtoBuf\TransferStarted: $this->behavior->become( new ReceiveFunction( fn($context) => $this->awaitingDebitConfirmation($context) ) ); break; case $event instanceof ProtoBuf\AccountDebited: $this->behavior->become( new ReceiveFunction( fn($context) => $this->awaitingCreditConfirmation($context) ) ); break; case $event instanceof ProtoBuf\CreditRefused: $this->behavior->become( new ReceiveFunction( fn($context) => $this->rollingBackDebit($context) ) ); break; case $event instanceof ProtoBuf\AccountCredited: case $event instanceof ProtoBuf\DebitRolledBack: case $event instanceof ProtoBuf\TransferFailed: $this->processCompleted = true; break; } } ೖ͕ࣦۚഊ͢Δͱ ϩʔϧόοΫͷৼΔ͍ʹ
private function applyEvent(Message $event): void { switch (true) { case
$event instanceof ProtoBuf\TransferStarted: $this->behavior->become( new ReceiveFunction( fn($context) => $this->awaitingDebitConfirmation($context) ) ); break; case $event instanceof ProtoBuf\AccountDebited: $this->behavior->become( new ReceiveFunction( fn($context) => $this->awaitingCreditConfirmation($context) ) ); break; case $event instanceof ProtoBuf\CreditRefused: $this->behavior->become( new ReceiveFunction( fn($context) => $this->rollingBackDebit($context) ) ); break; case $event instanceof ProtoBuf\AccountCredited: case $event instanceof ProtoBuf\DebitRolledBack: case $event instanceof ProtoBuf\TransferFailed: $this->processCompleted = true; break; } } ਖ਼ৗʹೖग़͕ۚྃɺ ϩʔϧόοΫ͕ྃɺ ࢦఆճҎ্ૹۚʹࣦഊ͢Δͱऴྃ
None
Supervision
Supervision • ΞΫλʔࢠؔΛ࣋ͪɺࢠΛࢹ͢ΔΛ࣋ͭ • ࢠΞΫλʔͷΤϥʔൃੜ࣌ɺΞΫλʔ͕ઓུʹج͖ͮ ࢠΞΫλʔͷ࠶ىಈ෮چΛཧ • ͕μϯ͢ΔͱԼͷࢠΞΫλʔμϯ
Supervision Strategies • OneForOneStrategy • AllForOneStrategy • ExponentialBackoffStrategy • RestartStrategy
None
None
readonly class TransferFactory { public function __construct( private ContextInterface $context,
private float $availability, private int $retryAttempts, private ProviderInterface $provider, ) { } public function createTransfer( string $actorName, Ref $fromAccount, Ref $toAccount, float $amount, ): SpawnResult { $props = Props::fromProducer( fn() => new TransferProcess($fromAccount, $toAccount, $amount, $this->availability), Props::withReceiverMiddleware( new EventSourcedFactory($this->provider) ), Props::withSupervisor( new OneForOneStrategy( $this->retryAttempts, new DateInterval('PT10S'), new DefaultDecider(), ) ) ); return $this->context->spawnNamed($props, $actorName); } }
readonly class TransferFactory { public function __construct( private ContextInterface $context,
private float $availability, private int $retryAttempts, private ProviderInterface $provider, ) { } public function createTransfer( string $actorName, Ref $fromAccount, Ref $toAccount, float $amount, ): SpawnResult { $props = Props::fromProducer( fn() => new TransferProcess($fromAccount, $toAccount, $amount, $this->availability), Props::withReceiverMiddleware( new EventSourcedFactory($this->provider) ), Props::withSupervisor( new OneForOneStrategy( $this->retryAttempts, new DateInterval('PT10S'), new DefaultDecider(), ) ) ); return $this->context->spawnNamed($props, $actorName); } } ΞΫλʔੜ࣌ʹ োઓུΛࢦࣔ
readonly class TransferFactory { public function __construct( private ContextInterface $context,
private float $availability, private int $retryAttempts, private ProviderInterface $provider, ) { } public function createTransfer( string $actorName, Ref $fromAccount, Ref $toAccount, float $amount, ): SpawnResult { $props = Props::fromProducer( fn() => new TransferProcess($fromAccount, $toAccount, $amount, $this->availability), Props::withReceiverMiddleware( new EventSourcedFactory($this->provider) ), Props::withSupervisor( new OneForOneStrategy( $this->retryAttempts, new DateInterval('PT10S'), new DefaultDecider(), ) ) ); return $this->context->spawnNamed($props, $actorName); } } 0OF'PS0OF ࢠΞΫλʔͷ͏ͪରͷҰ͚ͭͩ
readonly class TransferFactory { public function __construct( private ContextInterface $context,
private float $availability, private int $retryAttempts, private ProviderInterface $provider, ) { } public function createTransfer( string $actorName, Ref $fromAccount, Ref $toAccount, float $amount, ): SpawnResult { $props = Props::fromProducer( fn() => new TransferProcess($fromAccount, $toAccount, $amount, $this->availability), Props::withReceiverMiddleware( new EventSourcedFactory($this->provider) ), Props::withSupervisor( new OneForOneStrategy( $this->retryAttempts, new DateInterval('PT10S'), new DefaultDecider(), ) ) ); return $this->context->spawnNamed($props, $actorName); } } ඵؒͷ͏ͪࢦఆճ ճ ֘ͷΞΫλʔΛ࠶ىಈ͢Δ
readonly class TransferFactory { public function __construct( private ContextInterface $context,
private float $availability, private int $retryAttempts, private ProviderInterface $provider, ) { } public function createTransfer( string $actorName, Ref $fromAccount, Ref $toAccount, float $amount, ): SpawnResult { $props = Props::fromProducer( fn() => new TransferProcess($fromAccount, $toAccount, $amount, $this->availability), Props::withReceiverMiddleware( new EventSourcedFactory($this->provider) ), Props::withSupervisor( new OneForOneStrategy( $this->retryAttempts, new DateInterval('PT10S'), new DefaultDecider(), ) ) ); return $this->context->spawnNamed($props, $actorName); } } ϝοηʔδΛอͯ͠ ΞΫλʔΛ෮ݩ͢Δ
readonly class TransferFactory { public function __construct( private ContextInterface $context,
private float $availability, private int $retryAttempts, private ProviderInterface $provider, ) { } public function createTransfer( string $actorName, Ref $fromAccount, Ref $toAccount, float $amount, ): SpawnResult { $props = Props::fromProducer( fn() => new TransferProcess($fromAccount, $toAccount, $amount, $this->availability), Props::withReceiverMiddleware( new EventSourcedFactory($this->provider) ), Props::withSupervisor( new OneForOneStrategy( $this->retryAttempts, new DateInterval('PT10S'), new DefaultDecider(), ) ) ); return $this->context->spawnNamed($props, $actorName); } } ࢠΞΫλʔΛ࠶ىಈ͢Δͱɺ ϝοηʔδʹରԠͨ͠ॲཧΛ෦Ͱߦ͍ ࠷৽ঢ়ଶʹ෮ݩ োൃੜ࣌ͷঢ়ଶʹΔʂ
TransferProcess + State Machine + Persistence
TransferProcess ฒߦͰଘࡏ • ֤ग़ۚ͝ͱʹಠཱͯ͠ੜɾঢ়ଶڞ༗͞Εͳ͍ • ࢦఆ͞Εͨϝοηʔδ͕དྷͳ͍ݶΓৼΔ͍มΘΒͳ͍ɾΒͳ͍ • ӬଓԽ͞Ε͍ͯΔͨΊɺ࠶ىಈ͢ΔͱͦΕͧΕͷ࠷৽ঢ়ଶ෮ݩ • Error
Kernel Patternར༻ͰোΛΓ͠
Error Kernel Pattern?
None
AccountΞΫλʔ
AccountΞΫλʔ • ग़ۚͱೖ͕ۚͦΕͧΕݸͷΞΫλʔͰɺ FromAccount{*} / ToAccount{*} ͱಠཱͯ͠ੜ͞ΕΔ • ͦΕͧΕcoroutineͰ੍ޚ͞ΕΔΞΫλʔͰ ϝοηʔδͷΓऔΓΛߦ͏
class Account implements ActorInterface { private float $balance = 10.0;
private array $processedMessages = []; public function __construct( private readonly float $serviceUptime, private readonly float $refusalProbability, private readonly float $busyProbability ) { } public function receive(ContextInterface $context): void { $message = $context->message(); switch (true) { case $message instanceof Message\Credit: case $message instanceof Message\Debit: $this->handleBalanceChange($context, $message); break; case $message instanceof Message\GetBalance: $context->respond($this->balance); break; } }
class Account implements ActorInterface { private float $balance = 10.0;
private array $processedMessages = []; public function __construct( private readonly float $serviceUptime, private readonly float $refusalProbability, private readonly float $busyProbability ) { } public function receive(ContextInterface $context): void { $message = $context->message(); switch (true) { case $message instanceof Message\Credit: case $message instanceof Message\Debit: $this->handleBalanceChange($context, $message); break; case $message instanceof Message\GetBalance: $context->respond($this->balance); break; } } ΞΫλʔʹϝοηʔδ͕౸ୡ͢Δͱ ࣮ߦ͞ΕΔSFDFJWFϝιου
class Account implements ActorInterface { private float $balance = 10.0;
private array $processedMessages = []; public function __construct( private readonly float $serviceUptime, private readonly float $refusalProbability, private readonly float $busyProbability ) { } public function receive(ContextInterface $context): void { $message = $context->message(); switch (true) { case $message instanceof Message\Credit: case $message instanceof Message\Debit: $this->handleBalanceChange($context, $message); break; case $message instanceof Message\GetBalance: $context->respond($this->balance); break; } } ೖग़ۚϝοηʔδʹରԠ
class Account implements ActorInterface { private float $balance = 10.0;
private array $processedMessages = []; public function __construct( private readonly float $serviceUptime, private readonly float $refusalProbability, private readonly float $busyProbability ) { } public function receive(ContextInterface $context): void { $message = $context->message(); switch (true) { case $message instanceof Message\Credit: case $message instanceof Message\Debit: $this->handleBalanceChange($context, $message); break; case $message instanceof Message\GetBalance: $context->respond($this->balance); break; } } ߴௐʹؔ͢Δॲཧ
class Account implements ActorInterface { private float $balance = 10.0;
private array $processedMessages = []; public function __construct( private readonly float $serviceUptime, private readonly float $refusalProbability, private readonly float $busyProbability ) { } public function receive(ContextInterface $context): void { $message = $context->message(); switch (true) { case $message instanceof Message\Credit: case $message instanceof Message\Debit: $this->handleBalanceChange($context, $message); break; case $message instanceof Message\GetBalance: $context->respond($this->balance); break; } } ߴ֬ೝͷϝοηʔδ͕དྷΔͱ ૹ৴ݩʹฦ٫
private function handleBalanceChange( ContextInterface $context, Message\ChangeBalance $message ): void {
if ($this->alreadyProcessed($message->replyTo)) { $context->send($message->replyTo, $this->processedMessages[(string) $message->replyTo]); return; } if ($message instanceof Message\Debit && ($message->amount + $this->balance) < 0) { $context->send($message->replyTo, new Message\InsufficientFunds()); return; } if ($this->refusePermanently()) { $this->processedMessages[(string) $message->replyTo] = new Message\Refused(); $context->send($message->replyTo, new Message\Refused()); return; } if ($this->isBusy()) { $context->send($message->replyTo, new Message\ServiceUnavailable()); return; } if ($this->shouldFailBeforeProcessing()) { $context->send($message->replyTo, new Message\InternalServerError()); return; } usleep(random_int(0, 150) * 1000); $this->balance += $message->amount; $this->processedMessages[(string) $message->replyTo] = new Message\Ok(); if ($this->shouldFailAfterProcessing()) { $context->send($message->replyTo, new Message\InternalServerError()); return; } $context->send($message->replyTo, new Message\Ok()); }
private function handleBalanceChange( ContextInterface $context, Message\ChangeBalance $message ): void {
if ($this->alreadyProcessed($message->replyTo)) { $context->send($message->replyTo, $this->processedMessages[(string) $message->replyTo]); return; } if ($message instanceof Message\Debit && ($message->amount + $this->balance) < 0) { $context->send($message->replyTo, new Message\InsufficientFunds()); return; } if ($this->refusePermanently()) { $this->processedMessages[(string) $message->replyTo] = new Message\Refused(); $context->send($message->replyTo, new Message\Refused()); return; } if ($this->isBusy()) { $context->send($message->replyTo, new Message\ServiceUnavailable()); return; } if ($this->shouldFailBeforeProcessing()) { $context->send($message->replyTo, new Message\InternalServerError()); return; } usleep(random_int(0, 150) * 1000); $this->balance += $message->amount; $this->processedMessages[(string) $message->replyTo] = new Message\Ok(); if ($this->shouldFailAfterProcessing()) { $context->send($message->replyTo, new Message\InternalServerError()); return; } $context->send($message->replyTo, new Message\Ok()); } ߴௐʹؔ͢Δॲཧ
private function handleBalanceChange( ContextInterface $context, Message\ChangeBalance $message ): void {
if ($this->alreadyProcessed($message->replyTo)) { $context->send($message->replyTo, $this->processedMessages[(string) $message->replyTo]); return; } if ($message instanceof Message\Debit && ($message->amount + $this->balance) < 0) { $context->send($message->replyTo, new Message\InsufficientFunds()); return; } if ($this->refusePermanently()) { $this->processedMessages[(string) $message->replyTo] = new Message\Refused(); $context->send($message->replyTo, new Message\Refused()); return; } if ($this->isBusy()) { $context->send($message->replyTo, new Message\ServiceUnavailable()); return; } if ($this->shouldFailBeforeProcessing()) { $context->send($message->replyTo, new Message\InternalServerError()); return; } usleep(random_int(0, 150) * 1000); $this->balance += $message->amount; $this->processedMessages[(string) $message->replyTo] = new Message\Ok(); if ($this->shouldFailAfterProcessing()) { $context->send($message->replyTo, new Message\InternalServerError()); return; } $context->send($message->replyTo, new Message\Ok()); } ҾͰͦΕͧΕͷঢ়ଶΛ γϛϡϨʔτ
private function handleBalanceChange( ContextInterface $context, Message\ChangeBalance $message ): void {
if ($this->alreadyProcessed($message->replyTo)) { $context->send($message->replyTo, $this->processedMessages[(string) $message->replyTo]); return; } if ($message instanceof Message\Debit && ($message->amount + $this->balance) < 0) { $context->send($message->replyTo, new Message\InsufficientFunds()); return; } if ($this->refusePermanently()) { $this->processedMessages[(string) $message->replyTo] = new Message\Refused(); $context->send($message->replyTo, new Message\Refused()); return; } if ($this->isBusy()) { $context->send($message->replyTo, new Message\ServiceUnavailable()); return; } if ($this->shouldFailBeforeProcessing()) { $context->send($message->replyTo, new Message\InternalServerError()); return; } usleep(random_int(0, 150) * 1000); $this->balance += $message->amount; $this->processedMessages[(string) $message->replyTo] = new Message\Ok(); if ($this->shouldFailAfterProcessing()) { $context->send($message->replyTo, new Message\InternalServerError()); return; } $context->send($message->replyTo, new Message\Ok()); } ϝοηʔδʹؚ·ΕΔૹ৴ઌʹ ฦ৴
private function handleBalanceChange( ContextInterface $context, Message\ChangeBalance $message ): void {
if ($this->alreadyProcessed($message->replyTo)) { $context->send($message->replyTo, $this->processedMessages[(string) $message->replyTo]); return; } if ($message instanceof Message\Debit && ($message->amount + $this->balance) < 0) { $context->send($message->replyTo, new Message\InsufficientFunds()); return; } if ($this->refusePermanently()) { $this->processedMessages[(string) $message->replyTo] = new Message\Refused(); $context->send($message->replyTo, new Message\Refused()); return; } if ($this->isBusy()) { $context->send($message->replyTo, new Message\ServiceUnavailable()); return; } if ($this->shouldFailBeforeProcessing()) { $context->send($message->replyTo, new Message\InternalServerError()); return; } usleep(random_int(0, 150) * 1000); $this->balance += $message->amount; $this->processedMessages[(string) $message->replyTo] = new Message\Ok(); if ($this->shouldFailAfterProcessing()) { $context->send($message->replyTo, new Message\InternalServerError()); return; } $context->send($message->replyTo, new Message\Ok()); } ΞΫλʔ͕ࣗঢ়ଶཧ
AccountProxy ΞΫλʔ • TransferProcessΞΫλʔʹੜ͞ΕΔΞΫλʔ • AccountΞΫλʔͱͷ௨৴Λ୲͢ΔΞΫλʔ • AccountΞΫλʔʹԿ͔͕ى͖ͨ߹ʹͰ͋Δ TransferProcessʹରԠΛҕৡ͢Δʢ࠶ىಈͳͲΛߦ͏ʣ
Error Kernel Pattern • Reactive Design Patterns • োͱ৴པੑͷཁ͕͍݅ࠞͬͯ͟Δ߹ʹ ར༻Ͱ͖Δύλʔϯ(Akka,OTPͳͲͰ͓ͳ͡Έ)
• ఀࢭ͍͚ͯ͠ͳ͍ͷ͕͍ࠞͬͯ͟Δ߹ʹִ͢Δ
None
4BHB࣮ߦ
5SBOTGFS1SPDFTT͕
"DDPVOU1SPYZͱ"DDPVOU ࢠؔͰͳ͍ "DDPVOU1SPYZ͕μϯͯ͠ "DDPVOUӨڹͳ͠
"DDPVOUΞΫλʔͷ ࣮ߦͰ͖ͳ͍ཧ༝Λड৴ ࣗΛμϯͤ͞Δ
ࢠΞΫλʔͷμϯΛݕ ઓུʹج͍ͮͯ "DDPVOU1SPYZΞΫλʔ࠶ੜ ࠶ॲཧ࣮ߦ
௨৴ͱೖग़ۚΛ͍ͯ͠Δύλʔϯ ΞΫλʔϞσϧͳΒͰͷͷ
readonly class AccountProxy implements ActorInterface { public function __construct( private
Ref $target, private Closure $createMessage ) { } public function receive(ContextInterface $context): void { $message = $context->message(); switch (true) { case $message instanceof Started: $context->send($this->target, ($this->createMessage)($context->self())); $context->setReceiveTimeout(new DateInterval('PT2S')); break; case $message instanceof Refused: case $message instanceof Ok: $context->cancelReceiveTimeout(); $context->send($context->parent(), $message); break; case $message instanceof InsufficientFunds: case $message instanceof InternalServerError: case $message instanceof ReceiveTimeout: case $message instanceof ServiceUnavailable: throw new RuntimeException('Unexpected message'); } } }
readonly class AccountProxy implements ActorInterface { public function __construct( private
Ref $target, private Closure $createMessage ) { } public function receive(ContextInterface $context): void { $message = $context->message(); switch (true) { case $message instanceof Started: $context->send($this->target, ($this->createMessage)($context->self())); $context->setReceiveTimeout(new DateInterval('PT2S')); break; case $message instanceof Refused: case $message instanceof Ok: $context->cancelReceiveTimeout(); $context->send($context->parent(), $message); break; case $message instanceof InsufficientFunds: case $message instanceof InternalServerError: case $message instanceof ReceiveTimeout: case $message instanceof ServiceUnavailable: throw new RuntimeException('Unexpected message'); } } } "DUPS͕ىಈ͢ΔͱྲྀΕͯ͘Δ ෦ϝοηʔδ 4UBSUFE
readonly class AccountProxy implements ActorInterface { public function __construct( private
Ref $target, private Closure $createMessage ) { } public function receive(ContextInterface $context): void { $message = $context->message(); switch (true) { case $message instanceof Started: $context->send($this->target, ($this->createMessage)($context->self())); $context->setReceiveTimeout(new DateInterval('PT2S')); break; case $message instanceof Refused: case $message instanceof Ok: $context->cancelReceiveTimeout(); $context->send($context->parent(), $message); break; case $message instanceof InsufficientFunds: case $message instanceof InternalServerError: case $message instanceof ReceiveTimeout: case $message instanceof ServiceUnavailable: throw new RuntimeException('Unexpected message'); } } } -FU*U$SBTI ͜Ε·Ͱͷ1)1ͱҟͳΔߟ͑ํ
None
None
RunnerΞΫλʔ
None
RunnerΞΫλʔ • SagaΛ࣮ߦ͢Δׂ • Scatter Gather PatternͰ͕ͯ͢ऴΘΔ·Ͱͭ • ࢦఆAccountΞΫλʔͱTransferProcessΞΫλʔੜ •
TransferProcessΞΫλʔͷϦϑΝϨϯεΛRunner͕อ࣋
Scatter Gatherʁ
None
Scatter Gather • ෳʹϒϩʔυΩϟετޙɺฦ৴ΛҰͭʹू͢Δ • ΞΫλʔϞσϧͳΒͰͷύλʔϯ • ϩʔΧϧϦϞʔτͱͷΈ߹ΘͤͰར༻
$inMemoryProvider = $this->inMemoryProvider(); (new ForWithProgress( $this->numberOfIterations, $this->intervalBetweenConsoleUpdates, true, false ))->everyNth(
fn($i) => print("Started {$i}/{$this->numberOfIterations} processes\n"), function ($i, $nth) use ($context, $inMemoryProvider) { $fromAccount = $this->createAccount($context, "FromAccount{$i}"); $toAccount = $this->createAccount($context, "ToAccount{$i}"); $actorName = "Transfer Process {$i}"; $factory = new TransferFactory( $context, $this->uptime, $this->retryAttempts, $inMemoryProvider ); $transfer = $factory->createTransfer($actorName, $fromAccount, $toAccount, 10); $this->transfers[] = (string) $transfer->getRef(); if ($i === $this->numberOfIterations && !$nth) { print("Started {$i}/{$this->numberOfIterations} processes\n"); } } );
$inMemoryProvider = $this->inMemoryProvider(); (new ForWithProgress( $this->numberOfIterations, $this->intervalBetweenConsoleUpdates, true, false ))->everyNth(
fn($i) => print("Started {$i}/{$this->numberOfIterations} processes\n"), function ($i, $nth) use ($context, $inMemoryProvider) { $fromAccount = $this->createAccount($context, "FromAccount{$i}"); $toAccount = $this->createAccount($context, "ToAccount{$i}"); $actorName = "Transfer Process {$i}"; $factory = new TransferFactory( $context, $this->uptime, $this->retryAttempts, $inMemoryProvider ); $transfer = $factory->createTransfer($actorName, $fromAccount, $toAccount, 10); $this->transfers[] = (string) $transfer->getRef(); if ($i === $this->numberOfIterations && !$nth) { print("Started {$i}/{$this->numberOfIterations} processes\n"); } } ); 'SPN"DDPVOU9ͱ5P"DDPVOU9ͱ͍͏໊લͰ "DDPVOUΞΫλʔΛੜ
$inMemoryProvider = $this->inMemoryProvider(); (new ForWithProgress( $this->numberOfIterations, $this->intervalBetweenConsoleUpdates, true, false ))->everyNth(
fn($i) => print("Started {$i}/{$this->numberOfIterations} processes\n"), function ($i, $nth) use ($context, $inMemoryProvider) { $fromAccount = $this->createAccount($context, "FromAccount{$i}"); $toAccount = $this->createAccount($context, "ToAccount{$i}"); $actorName = "Transfer Process {$i}"; $factory = new TransferFactory( $context, $this->uptime, $this->retryAttempts, $inMemoryProvider ); $transfer = $factory->createTransfer($actorName, $fromAccount, $toAccount, 10); $this->transfers[] = (string) $transfer->getRef(); if ($i === $this->numberOfIterations && !$nth) { print("Started {$i}/{$this->numberOfIterations} processes\n"); } } ); 5SBOTGFS1SPDFTTΞΫλʔʹ ̎ͭͷ"DDPVOUΞΫλʔϦϑΝϨϯε "DDPVOUΞΫλʔؒͰ૬ޓॲཧ
None
ΤϯτϦϙΠϯτ • ΞΫλʔγεςϜͷىಈ • RunnerΞΫλʔͷੜͱࢹɺ Runnerͷ࠶ࢼߦʹؔ͢ΔΛ࣋ͭ • ͦΕҎ֎Կͳ͠ʂ
go(function () { $system = ActorSystem::create(); $system->getLogger()->info('Starting'); $numberOfTransfers = 10;
$intervalBetweenConsoleUpdates = 1; $uptime = 99.99; $retryAttempts = 3; $refusalProbability = 0.01; $busyProbability = 0.01; $props = ActorSystem\Props::fromProducer( fn() => new Runner( $numberOfTransfers, $intervalBetweenConsoleUpdates, $uptime, $refusalProbability, $busyProbability, $retryAttempts ), ActorSystem\Props::withSupervisor( new ActorSystem\Strategy\OneForOneStrategy( $retryAttempts, new DateInterval('PT10S'), new ActorSystem\Supervision\DefaultDecider(), ) ), ); $system->root()->spawnNamed($props, 'runner'); });
go(function () { $system = ActorSystem::create(); $system->getLogger()->info('Starting'); $numberOfTransfers = 10;
$intervalBetweenConsoleUpdates = 1; $uptime = 99.99; $retryAttempts = 3; $refusalProbability = 0.01; $busyProbability = 0.01; $props = ActorSystem\Props::fromProducer( fn() => new Runner( $numberOfTransfers, $intervalBetweenConsoleUpdates, $uptime, $refusalProbability, $busyProbability, $retryAttempts ), ActorSystem\Props::withSupervisor( new ActorSystem\Strategy\OneForOneStrategy( $retryAttempts, new DateInterval('PT10S'), new ActorSystem\Supervision\DefaultDecider(), ) ), ); $system->root()->spawnNamed($props, 'runner'); }); 3VOOFSΞΫλʔੜ
go(function () { $system = ActorSystem::create(); $system->getLogger()->info('Starting'); $numberOfTransfers = 10;
$intervalBetweenConsoleUpdates = 1; $uptime = 99.99; $retryAttempts = 3; $refusalProbability = 0.01; $busyProbability = 0.01; $props = ActorSystem\Props::fromProducer( fn() => new Runner( $numberOfTransfers, $intervalBetweenConsoleUpdates, $uptime, $refusalProbability, $busyProbability, $retryAttempts ), ActorSystem\Props::withSupervisor( new ActorSystem\Strategy\OneForOneStrategy( $retryAttempts, new DateInterval('PT10S'), new ActorSystem\Supervision\DefaultDecider(), ) ), ); $system->root()->spawnNamed($props, 'runner'); }); 3VOOFSΞΫλʔʹର͢Δઓུ
go(function () { $system = ActorSystem::create(); $system->getLogger()->info('Starting'); $numberOfTransfers = 10;
$intervalBetweenConsoleUpdates = 1; $uptime = 99.99; $retryAttempts = 3; $refusalProbability = 0.01; $busyProbability = 0.01; $props = ActorSystem\Props::fromProducer( fn() => new Runner( $numberOfTransfers, $intervalBetweenConsoleUpdates, $uptime, $refusalProbability, $busyProbability, $retryAttempts ), ActorSystem\Props::withSupervisor( new ActorSystem\Strategy\OneForOneStrategy( $retryAttempts, new DateInterval('PT10S'), new ActorSystem\Supervision\DefaultDecider(), ) ), ); $system->root()->spawnNamed($props, 'runner'); }); نఆ݅Ҏ্ࣦഊ͢Δͱ ଞͷγεςϜʹΤεΧϨʔτ͢ΔͳͲ͕Մೳ
None
·ͱΊ • ΞΫλʔϞσϧΛ׆༻ͨ͠ิঈτϥϯβΫγϣϯ • ϦτϥΠઓུ • ΤϥʔΧʔωϧύλʔϯͱಜ • BehaviorʹΑΔState Machine
• Persistenceͷ׆༻
Let It Crash