Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Кто отправит outbox? Валентин Удальцов, автор к...

Кто отправит outbox? Валентин Удальцов, автор канала Пых

Доклад с митапа Пыхап от канала Пых и Lamoda Tech

Валентин Удальцов, автор канала Пых, рассказывает, как эффективно отправлять сообщения, сохранённые вместе со стейтом

Avatar for Lamoda Tech

Lamoda Tech

June 24, 2025
Tweet

More Decks by Lamoda Tech

Other Decks in Technology

Transcript

  1. 2

  2. 4

  3. 13

  4. function handled(string $queue, string $messageId): bool { $result = $connection->execute(

    <<<SQL insert into deduplicator (message_id, queue) values (?, ?) on conflict (message_id, queue) do nothing SQL, [$messageId, $queue], ); return $result->affectedRows === 0; } 15
  5. 29

  6. 31

  7. 32

  8. 34

  9. incoming_message_id queue outgoing_messages 01976697-7324-70bf… y.events [{...}, {...}, …] 01976697-7324-70bf… z.events

    [{...}, {...}, …] 01976697-abc7-7025… z.events [{...}, {...}, …] … … … 35
  10. 36

  11. begin transaction; insert into deduplicator (message_id, queue) values (?, ?)

    on conflict (message_id, queue) do nothing; -- if (not handled) { -- handle insert into outbox (incoming_message_id, queue, outgoing_messages) values (?, ?, ?); -- else { select outgoing_messages from outbox where incoming_message_id = ?; -- } commit; -- publish events delete from outbox where incoming_message_id = ?; 42
  12. select outgoing_messages from outbox where incoming_message_id = ?; -- if

    (does not exist) { begin transaction; -- handle insert into outbox (incoming_message_id, queue, outgoing_messages) values (?, ?, ?); commit; -- } -- publish events update outbox set outgoing_messages = '[]' where incoming_message_id = ?; 45
  13. 49

  14. 50

  15. 52

  16. 54

  17. 55

  18. 56

  19. 58

  20. 59

  21. 60

  22. Настраиваем базу данных create database app; \c app create table

    outbox ( message_id uuid primary key, message jsonb ); alter table outbox replica identity full; create publication sequin_pub for table outbox; select pg_create_logical_replication_slot('sequin_slot', 'pgoutput'); 61
  23. Настраиваем Sequin databases: - name: app username: postgres password: postgres

    hostname: postgres database: app port: 5432 slot_name: sequin_slot publication_name: sequin_pub 65
  24. Настраиваем Sequin sinks: - name: outbox database: app source: include_table_names:

    - public.outbox actions: - insert transform: record.message destination: type: rabbitmq host: rabbitmq port: 5672 exchange: events 66
  25. 67

  26. 69

  27. 70

  28. 71

  29. Материалы • Udi Dahan: Reliable Messaging Without Distributed Transactions •

    NServiceBus: Outbox • NServiceBus: Transactional session • Лекция про MessageBus • Debezium • Sequin • Демо-проект c outbox на Sequin 72