symfony console messenger:consume async A message identifies the task to be done and contains the task's context Message handling can happen asynchronously
} #[AsMessageHandler] class SomeHandler { public function __invoke(SomeMessage $message) { error_log($message->message); } } Handling messages The handling logic is wrapped in another service A serializable data object
of trial email shouldn't be sent now... Envelopes add metadata to the message $endOfTrial = (new \DateTimeImmutable())->modify('+30 days'); $msg = Envelope::wrap($msg)->with(DelayStamp::delayUntil($endOfTrial)); ... but scheduled in 30 days
int $userId) { } } #[AsMessageHandler] class EndOfTrialMessageHandler { public function __invoke(EndOfTrialMessage $message) { // Get the User from DB and send them an email error_log('Processing '.$message->userId); } }
= new MessageBus([ new SendMessageMiddleware( new SendersLocator([ EndOfTrialMessage::class => ['mem'], ], new class(['mem' => fn () => $transport]) implements ContainerInterface { use ServiceLocatorTrait; }, , ), new HandleMessageMiddleware(new HandlersLocator([ EndOfTrialMessage::class => [new EndOfTrialMessageHandler()], ])), ]); PSR Container Implements the interface as an array of callables
the workers'); $receivers = ['mem' => $transport]; $worker = new Worker($receivers, $bus); $worker->run(); Dispatch messages Process incoming messages forever Check it actually works
Envelope::wrap($msg)->with(DelayStamp::delayUntil($cleanupDate)); Is it the best way? 1. Should discard converted trials 2. No need to act precisely at +30 days 3. More e ff i cient as batch
EndOfTrialMessageHandler()], [$schedule], ); $scheduler->run(); Runs forever and will handle messages as per the trigger definition Only useful when using the component standalone
the same Messenger concepts Messages, handlers, envelopes, stamps, ... 2. The scheduler can be integrated with Messenger via a special transport Triggers and message generators 3. Scheduler uses the same Messenger worker: messenger:consume Time limit, memory management, signal handling, ... The beauty of the new component ❤
public function getSchedule(): Schedule { return (new Schedule()) ->add(RecurringMessage::every('1 week', new EndOfTrialMessage())) ; } } This automatically wires the Scheduler with Messenger Like before 1
= 0; $next = new \DateTimeImmutable(); while ($i++ < 20) { $next = $msg->getTrigger()->getNextRunDate($next); if (!$next) { break; } echo $next->format('Y-m-d H:i:s')."\n"; } That's what makes the Scheduler tick!
Messages are handled immediately when generated (like the sync transport) Messages are never "dispatched" They are generated by the transport directly Messages are handled one by one For all recurring messages configured on a schedule Scale by using di ff erent workers for di ff erence schedules Or use more than one worker for a schedule
on time or at the required frequency It's up to you to configure schedules properly Create more than one schedule to con fi gure them di ff erently You can then scale each worker and schedule independently
EndOfTrialCleanupMessage EndOfTrialCleanupMessage 2023-03-24 13:00 handled Next message will be handled just after the current one A message that takes a lot of time to handle 10 minutes 10 minutes Recurring message EndOfTrialCleanupMessage 2023-03-24 13:20 EndOfTrialCleanupMessage handled The backlog keeps growing until the worker is restarted
down SignupEmailMessage for User 1337 handled SignupEmailMessage for User 42 queued SignupEmailMessage for User 42 handled "Regular" message Another message
__construct( private CacheInterface $cache, ) { } public function getSchedule(): Schedule { $msg = new EndOfTrialMessage(); return (new Schedule()) ->add(RecurringMessage::every('2 seconds', $msg)) ->stateful($this->cache) ; } } When missing messages is not an option, Stateful schedules will catch up.
by running 2 workers running at the same Worker 1 Worker 2 handled EndOfTrialCleanupMessage handled Oops, the message is generated and handled twice! 2023-03-24 13:10
__construct( private CacheInterface $cache, private LockFactory $lockFactory, ) { } public function getSchedule(): Schedule { $msg = new EndOfTrialMessage(); return (new Schedule()) ->add(RecurringMessage::every('2 seconds', $msg)) ->stateful($this->cache) ->lock($this->lockFactory->createLock('default-scheduler')) ; } } Don't use a lock without a cache
2 workers running at the same Worker 1 Worker 2 handled not generated The lock prevents the message to be generated multiple times 2023-03-24 13:10 2023-03-24 13:10
A message that takes a lot of time to execute EndOfTrialCleanupMessage 2023-03-24 13:10 not generated The lock prevents the next message to be generated as another is still handled Worker 1 Worker 2
'async'); Message will be handled asynchronously ⚠ Lock is only partially supported The same message is probably not generated twice But several messages can be handled in //
LockFactory $lockFactory, ) { $this->lock = $lockFactory->createLock('update-doc'); } public function __invoke(UpdateDocMessage $message) { if (!$this->lock->acquire(false)) { echo "Doc is already being generated, skip...\n"; return; } try { // do something } finally { $this->lock->release(); } } } Locking schedules Handling the lock in the handler works great!
Symfony Console commands 2. Cron support on Windows? 3. Cron is yet another moving part to monitor 4. Cron max frequency limited to one minute 5. Cron needs tooling for reporting (errors, ...) 6. Cron does not support multiple "workers", locks, states, ... Cron is still probably best for any recurring command not defined in Symfony
nal and provide feedback! ‣ ProcessMessage and ProcessMessageHandler ‣ RecurringRule support for triggers (I have a branch for that) ‣ debug:scheduler command ‣ ... Next Steps? The main concepts and low-level infrastructure are well-defined or don't complain :)
Lock On a single worker Catch up missed messages when downtime Not needed On several workers Not recommended Not recommended Avoid messages to be executed several times