Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Symfony Messenger et ses messages: à la queleuleu…. Et s’il était temps de grouper ?

Symfony Messenger et ses messages: à la queleuleu…. Et s’il était temps de grouper ?

Symfony Messenger est un outil qui s’avère très utile dès lors qu’il s’agit de faire transiter des informations. Par le biais de ce qu’il considère comme des messages et à l’aide de différents concepts, ce composant nous a grandement aidé à lisser la charge d’informations à gérer par le biais d’un traitement direct ou délayé ou encore à communiquer avec différentes tierces parties.

Traitant par défaut les messages un par un par le biais de ce qu’on appelle un handler, ce fonctionnement n’est pas « l’idéal » dans toutes les situations: des problèmes pourraient nous amener à devoir au contraire grouper.

Comment Symfony Messenger peut nous aider à adapter ce fonctionnement de base et répondre donc à ce besoin?

Dans ce talk, nous découvrirons une manière élégante d’y répondre via un combo gagnant, sans oublier d'aborder le pourquoi et le comment on l’utiliserait. On approchera également les différentes problématiques qu'il pourrait générer et comment les contourner si besoin, le tout, basé sur un cas d'utilisation concret.

a_guilhem

May 24, 2023
Tweet

More Decks by a_guilhem

Other Decks in Programming

Transcript

  1. Qu’allons nous voir ? Symfony Messenger: quelques rappels Système de

    batch: comment ça marche Et d’un point de vue pratique ?
  2. Quelques notions - Message: Tout objet PHP qui puisse être

    sérialisé - Bus: Où les messages sont dispatchés - Middleware - Stamps - Handlers: Service en charge de traiter un type de message - Transport: Permet de stocker le message en attente d'un traitement ultérieur (async) - Worker: Consommer les messages du transport - Events built-in
  3. Asynchrone ou Synchrone, faites votre choix ! - Différents types

    de transport - Une configuration rapide - Possibilité de créer son propre transport - Un « In memory » bien utile pour les tests transports: async: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: … check_delayed_interval: retry_strategy: max_retries: 3 multiplier: 2 failed: 'doctrine://default? queue_name=failed' sync: 'sync://' test: 'in-memory://'
  4. Au niveau du worker WorkerMessageReceivedEvent Boucle/Envelope/ Receiver Handle Ack shouldStop

    ? Dispatch events respectifs aux situations Acked or rejected by receiver Dispatch events correspondants
  5. Ne sautons pas non plus trop vite sur les conclusions

    ! Dépend du contexte d’utilisation Pas de bon ou mauvais processus en soi Dépend de la logique métier contenu dans le(s) handler(s) et de son but et de la fréquence/nombre des messages dispatchés Exemple: limitation du nombre d’appels à une API, nécessité d’identifier rapidement les doublons, grouper les envois d’email similaires, améliorer les performances et ou requêtes en base de données etc ….
  6. …qui sera mergée dans Symfony 5.4 - Traiter plusieurs messages

    à la fois - Introduit le combo « Trait-Interface » BatchHandlerInterface - BatchHandlerTrait - Pas exclusif du traitement en mode synchrone - Sait différencier si à batcher ou non
  7. interface BatchHandlerInterface { /** * @param Acknowledger|null $ack The function

    to call to ack/ nack the $message * * @return mixed The number of pending messages in the batch if $ack is not null */ // public function __invoke(object $message, Acknowledger $ack = null): mixed; /** * Flushes any pending buffers. * * @param bool $force Whether flushing is required; it can be skipped if not */ public function flush(bool $force): void; } BatchHandlerInterface Par exclusif du mode par défaut
  8. BatchHandlerTrait Message Message + ? Acknowledger Rassemble & identifie si

    mode batch Pas Acknowledger Doit implémenter méthode abstraite du trait *: [jobs] Appelle Vérifie Peut surcharger en 5.4 Peut surcharger depuis 6.3 Pour déterminer taille batch Si oui
  9. Prise en charge du mode par défaut par le BatchHandlerTrait

    Quid sur le fait de prendre également en charge le mode par défaut dans le trait ?
  10. Batch Messages Quid du nombre de messages dispatchés avant de

    se mettre en idle inférieur au nombre sur lequel il flush ? Mais ne vous inquiétez pas ! Ça le prend en charge
  11. Batch ou pas batch : une vérification au niveau HandleMessageMiddleware

    * * … * handleMessageMiddleware Bus Si handler de type BatchHandlerInterface + AckStamp Et passe le message en premier argument Le passe en deuxième argument
  12. Buffered ou flushed : le HandleMessageMiddleware le sait …… Avec

    un Acknowledger en 2nd argument Is acknowledged or not ? Si pas « ack » ajoute un NoAutoAckStamp Is acknowledged si message a été traité (ack or nack) dans le handler
  13. Un ajout d’information utile au worker Lorsqu’il va faire un

    contrôle de la situation après dispatch des messages dans le bus pour être appréhendés Distinction avec ou sans NoAutoAckStamp « Acks » « unacks » Faire un état des lieux + dispatch des events correspondants Enlève le AckStamp
  14. Et les « unacks » alors ? Quand shouldStop ou

    quand worker idle « unacks » Avec FlushBatchHandlerStamp direction le HandleMessageMiddleware Pour noAutoAckStamps Appelle flush méthode du handler correspondant
  15. Et en pratique ça donne quoi ? - A la

    base des fichiers sont importés régulièrement et contiennent des demandes d’aides qui sont dispatchées au fur et à mesure - Les demandeurs peuvent: - appartenir à un groupe identifié - Être sans groupe mais acceptent d’être ‘aidés’ en même temps que d’autres personnes - Veulent que leur cas soit traité individuellement Le but est d’envoyer les demandes à ceux qui sont prêt à aider et d’éviter les doublons / répétitions sur un même groupe
  16. Configuration messenger messenger: failure_transport: failed transports: async: dsn: ‘%env(MESSENGER_TRANSPORT_DSN)%' options:

    … retry_strategy: … failed: 'doctrine://default?queue_name=failed' sync: 'sync://' routing: App\Messenger\Message\GatherByGroupRequestMessage: sync App\Messenger\Message\GatherSameGroupMessage: async App\Messenger\Message\SendRequestAsLoneWolfMessage: sync
  17. Le message pour « async » transport class GatherSameGroupMessage {

    public function __construct( private readonly int $id, private readonly string $name,… ) {} /** * @return int */ public function getId(): int { return $this->id; } /** * @return string */ public function getName(): string { return $this->name; } // et tous les autres getters nécessaires }
  18. Un des messages pour « sync » transport class SendRequestAsLoneWolfMessage

    { public function __construct( private readonly int $id, private readonly string $sector,… ) {} /** * @return int */ public function getId(): int { return $this->id; } /** * @return string */ public function getSector(): string { return $this->sector; } // autres getters nécessaires }
  19. Handler pour message individuel class SendRequestAsLoneWolfHandler implements MessageHandlerInterface { public

    function __construct(…){} public function __invoke(SendQuestRequestAsLoneWolfMessage $message) { $requester = $this->requestRepository->find($message->getId()); $referent = []; // check if has a referent. If referent not found, seek by sector // send 1 email for all available helpers + notif sent to user by email and/ or phone. $this->requestRepository->updateOnceSentRequest([$message->getId()]); } }
  20. Pas de batch pour les messages de type « lone

    wolf »? On pourrait… Mais aucun avantage particulier/significatif à gagner
  21. class GatherSameGroupHandler implements BatchHandlerInterface { use BatchHandlerTrait; // construct method

    public function __invoke(GatherSameGroupMessage $groupMessage, Acknowledger $ack = null) { return $this->handle($groupMessage, $ack); } private function process(array $jobs): void { $result = $this->groupRequestCollection; foreach ($jobs as $element) { $result->add($element); } $this->groupRequestCollection->checkFullGroup(); } } Messages à batcher async mode: handler
  22. Messages à batcher async mode: handler Process method Par nom

    groupe/ secteurs etc et éviter doublons Si oui pour les requêtes/groupes identifiés Ack ou nack pour chacun des messages constituant groupe
  23. Messages à batcher async mode: handler Process method Si worker

    idle ou arrêté => sera flush Avec aide des workers events il a été décidé de pouvoir flusher peu importe limite atteinte en forçant sur 0 Mais pb car peut être palier non atteint Finalise les deux paliers de process méthode En orange: mécanisme par défaut
  24. Batch pour async transport: avantages - Identifier et former facilement

    des groupes et éviter des doublons - Avoir l’assurance que même si certains groupes sont incomplets avant que le worker se mette en idle ou stoppe ils seront flushés - Ack ou nack pertinent quand sont vraiment traités - Mettre à jour tout le monde d’un coup et s’éviter des contraintes de limite - Un gain de performance
  25. Messages à batcher sync mode (sans transport) $this->bus->dispatch( new GatherByGroupRequestMessage(

    $request->getId(), $request->getSector(), … ), ************ ); $this->bus->dispatch( new GatherByGroupRequestMessage( $request->getId(), $request->getSector(), … ), [new AckStamp(static function (): void {})] ); Marche mais messages appréhendés un par un Messages appréhendés en batch
  26. Pas de transport Retour sur le quid du nombre de

    messages dispatchés avant de se mettre en idle inférieur au nombre sur lequel il flush ? Comment pourront ils être donc flush ? Messages à batcher sync mode (sans transport)
  27. On force le flush ! class GatherByGroupRequestHandler implements BatchHandlerInterface {

    use BatchHandlerTrait; … public function forceFlushAndPurgeLeft() { $this->flush(true); } } Force le flush du trait
  28. Pour conclure - Pour les cas qui lui convienne on

    en sort de grands avantages: - performance - souplesse/recul pour mieux appréhender le flux du message et sa manipulation - s’éviter des contraintes que le traitement par défaut ne résout pas - Mais si pas nécessité, pas obligé de l’utiliser Comme toujours une bonne balance !
  29. Mais, vous savez, moi je ne crois pas qu’il y

    ait de bon ou de mauvais processus en soi. Moi, si je devais résumer la vie de Symfony Messenger aujourd’hui avec vous, je dirais que c’est d’abord des recherches, une documentation qui m’a tendu la main, peut-être à un moment où je ne pouvais pas, où j’étais seule dans mes pensées. Et c’est assez curieux de se dire que les hasards, les frustrations, les documentations forgent une destinée… Parce que quand on a le goût de la chose, quand on a le goût de la chose bien faite, le beau geste, parfois on ne trouve pas le processus en face, je dirais, le processus qui vous aide à avancer. Alors ce n’est pas mon cas, comme je le disais là, puisque moi au contraire, j’ai pu ; et je dis Merci au batch de Messenger, je dis merci à la communauté Symfony… Et pour finir …