Using Redis Streams to Build More Resilient Services

Redis, the Swiss Army knife of databases, received a new data type in version 5.0 that is worth a closer look: Redis Streams. In this session, Thomas explains how he used Redis Streams to build a more fault-tolerant and scalable service with Symfony and the Symfony Messenger component.

Thomas Schedler

November 12, 2019

Transcript

  1. PRESENTED BY Thomas Schedler

    Hi, I’m Thomas Schedler (chirimoya, [email protected])
    • Co-founder & CEO of Sulu GmbH
    • More than 15 years of experience in web technologies & development
    • PHP, Symfony, React, SQL, Redis, Elasticsearch, …
    • Open source enthusiast
    • Loves cooking and mountains
  2. Agenda:

    1 The story of how I came across Redis Streams
    2 Introduction to Redis Stream commands
    3 Real-world application using Redis Streams
  3. Tell, don’t ask.

    Generate an optimised read model for your service by consuming domain events from a central stream.
  4. tail -f -n 1

    Probably not the most robust implementation for a real-world application.
  5. “In computer science, a stream is a sequence of data elements made available over time.”
  6. Stream

    [Diagram: a stream of records numbered 00 to 18. Labels: truncatable records, consumer 1 position, consumer 2 position, new record, producer emitting records.]
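
The idea behind the stream diagram (one append-only sequence, one read position per consumer, and old records that become truncatable once every consumer has passed them) can be sketched in plain PHP. This is a toy in-memory model for illustration only: the class `ToyStream` and its methods are hypothetical names, and it says nothing about how Redis stores streams internally.

```php
<?php
// Toy model: an append-only list of records plus one read offset per consumer.
final class ToyStream
{
    /** @var list<string> records in append order */
    private array $records = [];

    /** @var array<string, int> next offset to read, keyed by consumer name */
    private array $positions = [];

    /** Append a record and return its offset. */
    public function append(string $record): int
    {
        $this->records[] = $record;

        return count($this->records) - 1;
    }

    /**
     * Return up to $max records the consumer has not seen yet
     * and advance only that consumer's position.
     *
     * @return list<string>
     */
    public function poll(string $consumer, int $max = 10): array
    {
        $from = $this->positions[$consumer] ?? 0;
        $batch = array_slice($this->records, $from, $max);
        $this->positions[$consumer] = $from + count($batch);

        return $batch;
    }

    /** Number of leading records already read by every known consumer. */
    public function truncatable(): int
    {
        return $this->positions === [] ? 0 : min($this->positions);
    }
}
```

Each consumer advances independently, so a slow consumer never blocks a fast one; it only delays the point up to which records could be truncated.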
  7. Connect to Redis

    This assumes that the Redis server and a client are installed. Keep in mind that Streams require Redis 5.0 or later.

    PHP
        $redis = new \Redis();
        $redis->connect('127.0.0.1', 6379);

    CLI
        > redis-cli -h 127.0.0.1 -p 6379
  8. Add message to stream

    A stream element consists of one or more key-value pairs. Let's add elements to the stream:

    PHP
        $redis->xAdd('mystream', '*', [
            'KEY' => 'VALUE'
        ]);
        // MESSAGE ID (e.g. 1526919030474-55)
        $redis->xLen('mystream');
        // (integer) 1

    CLI
        > XADD mystream * KEY VALUE
        # MESSAGE ID (e.g. 1526919030474-55)
        > XLEN mystream
        # (integer) 1
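
Message IDs such as `1526919030474-55` consist of a millisecond timestamp and a sequence number, and entries are ordered by that pair. The sketch below shows one way to parse and compare such IDs in PHP. The function names are hypothetical, and it assumes both parts fit into a PHP integer on a 64-bit platform.

```php
<?php
/**
 * Split a complete stream ID into [milliseconds, sequence].
 *
 * @return array{0: int, 1: int}
 */
function parseStreamId(string $id): array
{
    if (!preg_match('/^(\d+)-(\d+)$/', $id, $m)) {
        throw new InvalidArgumentException("Not a complete stream ID: $id");
    }

    return [(int) $m[1], (int) $m[2]];
}

/** Return -1, 0 or 1, ordering by timestamp first and sequence second. */
function compareStreamIds(string $a, string $b): int
{
    // Arrays of equal length are compared element by element.
    return parseStreamId($a) <=> parseStreamId($b);
}
```

A plain string comparison would not be reliable here, because `9-0` sorts after `10-0` as a string although it is the older entry.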
  9. Read message from stream

    Read messages from one or multiple streams, returning only entries with an ID greater than the last received ID reported by the caller.

    PHP
        $redis->xRead(['mystream' => 0], 2);
        // MESSAGES
        // [
        //   'STREAMNAME' => [
        //     'MESSAGEID' => [
        //       'KEY' => 'VALUE'
        //     ],
        //     'MESSAGEID' => [
        //       'KEY' => 'VALUE'
        //     ]
        //   ]
        // ]

    CLI
        > XREAD COUNT 2 STREAMS mystream 0
        # MESSAGES
        # STREAMNAME [
        #   MESSAGEID [
        #     KEY
        #     VALUE
        #   ]
        # ]
  10. Block for data

    To avoid polling at a fixed or adaptive interval, the command can block until new entries are pushed into the requested stream.

    PHP
        $redis->xRead(['mystream' => '$'], 1, 0);
        // MESSAGES
        // [
        //   'STREAMNAME' => [
        //     'MESSAGEID' => [
        //       'KEY' => 'VALUE'
        //     ]
        //   ]
        // ]

    CLI
        > XREAD BLOCK 0 STREAMS mystream $
        # MESSAGES
        # STREAMNAME [
        #   MESSAGEID [
        #     KEY
        #     VALUE
        #   ]
        # ]
  11. Every client receives every entry arriving in a stream.

    How can we scale reads?
  12. Consumer Groups

    [Diagram: a stream of records numbered 00 to 18. Labels: truncatable records, group a consumer 1 position, group a consumer 2 position, group b consumer 1 position, group b consumer 2 position, new record.]
  13. Create a Consumer Group

    Redis Streams provide the concept of consumer groups, which allow multiple consumers to share the processing of the same stream (load balancing).

    PHP
        $redis->xGroup(
            'CREATE',
            'mystream',
            'mygroup',
            '$'
        );
        // true on success

    CLI
        > XGROUP CREATE mystream mygroup $
        # OK
  14. Read data via Consumer Group

    Redis Streams provide the concept of consumer groups, which allow multiple consumers to share the processing of the same stream (load balancing).

    PHP
        // ID 0: entries already delivered to this consumer but not yet acknowledged
        $redis->xReadGroup(
            'mygroup', 'myconsumer',
            ['mystream' => 0]
        );
        // MESSAGES

        // ID >: entries never delivered to any consumer of the group
        $redis->xReadGroup(
            'mygroup', 'myconsumer',
            ['mystream' => '>']
        );

    CLI
        > XREADGROUP
          GROUP mygroup myconsumer
          STREAMS mystream 0
        # MESSAGES
        > XREADGROUP
          GROUP mygroup myconsumer
          STREAMS mystream >
  15. Data processed confirmation

    Consumer groups require explicit acknowledgement of the messages successfully processed by the consumer.

    PHP
        $redis->xAck(
            'mystream',
            'mygroup',
            ['message-id']
        );
        // 1/0

    CLI
        > XACK mystream mygroup message-id
        # 1/0
  16. Failure Processing

    • Identify the messages that have been delivered but not confirmed
    • Change the owner of these messages and redeliver them

    PHP
        $redis->xPending('mystream', 'mygroup');
        // MESSAGE IDs
        $redis->xClaim(
            'mystream',
            'mygroup',
            'myconsumer',
            3600, // minimum idle time in milliseconds
            ['message-id']
        );

    CLI
        > XPENDING mystream mygroup
        # MESSAGE IDs
        > XCLAIM mystream mygroup myconsumer 3600 message-id
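
The interplay of delivery, acknowledgement and claiming can be easier to follow with a small model. The sketch below is a simplified in-memory imitation of a consumer group's pending list, not the Redis implementation. The class `ToyConsumerGroup` and all of its method names are hypothetical, and time is passed in explicitly as milliseconds so the behaviour stays deterministic.

```php
<?php
final class ToyConsumerGroup
{
    /** @var array<string, array<string, string>> entry ID => fields, in insertion order */
    private array $entries = [];

    /** @var array<string, array{consumer: string, deliveredAt: int}> delivered but unacknowledged */
    private array $pending = [];

    /** Number of entries already handed out to some consumer. */
    private int $delivered = 0;

    public function add(string $id, array $fields): void
    {
        $this->entries[$id] = $fields;
    }

    /** Roughly like reading with ">": hand out the next never-delivered entry. */
    public function readNew(string $consumer, int $nowMs): ?string
    {
        $ids = array_keys($this->entries);
        if (!isset($ids[$this->delivered])) {
            return null;
        }
        $id = $ids[$this->delivered++];
        $this->pending[$id] = ['consumer' => $consumer, 'deliveredAt' => $nowMs];

        return $id;
    }

    /** Roughly like XACK: remove the entry from the pending list. */
    public function ack(string $id): bool
    {
        $wasPending = isset($this->pending[$id]);
        unset($this->pending[$id]);

        return $wasPending;
    }

    /** @return list<string> IDs that are delivered but not acknowledged */
    public function pendingIds(): array
    {
        return array_keys($this->pending);
    }

    public function ownerOf(string $id): ?string
    {
        return $this->pending[$id]['consumer'] ?? null;
    }

    /**
     * Roughly like XCLAIM: transfer ownership of pending entries that have
     * been idle for at least $minIdleMs.
     *
     * @param list<string> $ids
     * @return list<string> IDs whose owner changed
     */
    public function claim(string $newConsumer, int $minIdleMs, array $ids, int $nowMs): array
    {
        $claimed = [];
        foreach ($ids as $id) {
            $entry = $this->pending[$id] ?? null;
            if ($entry !== null && $nowMs - $entry['deliveredAt'] >= $minIdleMs) {
                $this->pending[$id] = ['consumer' => $newConsumer, 'deliveredAt' => $nowMs];
                $claimed[] = $id;
            }
        }

        return $claimed;
    }
}
```

The minimum idle time guards against two workers claiming the same message at once: the first successful claim resets the idle time, so a second claim with the same threshold finds nothing to take over.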
  17. Worked pretty well out of the box, but the API is quite low-level.

    Let's look for a higher-level abstraction.
  18. Symfony Messenger Component

    • The Messenger component helps applications send and receive messages (message bus)
    • By default, messages are handled as soon as they are dispatched
    • If you want to handle a message asynchronously, you can configure a transport
  19. declare(strict_types=1);

    namespace HandcraftedInTheAlps\Bundle\RedisTransportBundle\Transport;

    use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
    use Symfony\Component\Messenger\Transport\TransportInterface;

    class RedisStreamTransportFactory implements TransportFactoryInterface
    {
        public function createTransport(string $dsn, array $options): TransportInterface
        {
            $parsedUrl = parse_url($dsn);
            ...

            return new RedisStreamTransport($parsedUrl['host'], $parsedUrl['port'], ...);
        }

        public function supports(string $dsn, array $options): bool
        {
            return 0 === mb_strpos($dsn, 'redis-stream://');
        }
    }
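
The factory on this slide relies on `parse_url()` and elides the rest. As an illustration of what validating such a DSN could look like, here is a standalone helper. Everything in it is an assumption made for the example and not the bundle's actual behaviour: the function name `parseRedisStreamDsn`, the idea of taking the stream name from the URL path, and the defaults (port 6379, stream name `messages`).

```php
<?php
/**
 * Parse a DSN such as "redis-stream://127.0.0.1:6379/mystream".
 *
 * @return array{host: string, port: int, stream: string}
 */
function parseRedisStreamDsn(string $dsn): array
{
    $parts = parse_url($dsn);

    if ($parts === false
        || ($parts['scheme'] ?? '') !== 'redis-stream'
        || !isset($parts['host'])
    ) {
        throw new InvalidArgumentException(sprintf('Unsupported DSN "%s".', $dsn));
    }

    return [
        'host' => $parts['host'],
        // Hypothetical default: fall back to the standard Redis port.
        'port' => $parts['port'] ?? 6379,
        // Hypothetical convention: the path names the stream.
        'stream' => ltrim($parts['path'] ?? '', '/') ?: 'messages',
    ];
}
```

Checking the scheme and host up front avoids undefined-index notices when a DSN omits the port or is not a `redis-stream://` URL at all.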
  20. Real-world showcase: Ticketing Application

    [Diagram labels: Ticketing Application, Ticket, Access, Order, Event, ..., Access System, REST API, Message Bus, Contingent]
  21. Tell, don’t ask.

    Domain Events are used to capture things that can trigger a change to the state of your application.
  22. class DomainEventMessage
    {
        ...

        public function __construct(
            string $name,
            string $type,
            string $id,
            array $payload = [],
            ?\DateTimeImmutable $created = null
        ) {
            $this->name = $name;
            $this->type = $type;
            $this->id = $id;
            $this->payload = $payload;
            $this->created = $created ?? new \DateTimeImmutable();
        }

        ...
    }
  23. public function cancelTicket(Ticket $ticket): void
    {
        $this->workflowRegistry->get($ticket)->apply($ticket, Ticket::TRANSITION_CANCEL);

        $newTicketId = Uuid::uuid4()->toString();
        $this->createTicket($newTicketId, $ticket);

        $this->domainEventCollector->push(
            new DomainEventMessage(
                Ticket::class,
                'canceled',
                $ticket->getId(),
                [
                    'newTicketId' => $newTicketId,
                ]
            )
        );
    }
  24. class TicketCanceledDomainEventMessageHandler extends AbstractTicketDomainEventMessageHandler
    {
        public function __invoke(DomainEventMessage $message): void
        {
            if ('canceled' !== $message->getType() || Ticket::class !== $message->getName()) {
                return;
            }

            $ticketIds = [$message->getId()];
            $newTicketId = $message->getPayload()['newTicketId'] ?? '';

            if ($newTicketId) {
                $ticketIds[] = $newTicketId;
            } else {
                $this->logger->error('Could not find newTicketId in canceled for access system.', [
                    'canceledTicketId' => $message->getId(),
                ]);
            }

            $this->syncTickets($ticketIds);
        }
    }
  25. Trim data from the stream

    When the ~ argument is used, trimming is performed only when Redis is able to remove a whole macro node.

    PHP
        $redis->xTrim('mystream', 1000, true);

    CLI
        > XTRIM mystream MAXLEN ~ 1000
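
To get a feeling for what "removing only whole macro nodes" means for the resulting stream length, the following sketch models it with simple arithmetic. It is a deliberately simplified model, not the Redis algorithm: it assumes that all of the oldest nodes are completely full and that each node holds 100 entries (node size is a server setting, so treat the number as an assumption). Both function names are hypothetical.

```php
<?php
/**
 * Entries left after approximate trimming: whole nodes are dropped from the
 * old end as long as the stream does not shrink below $maxLen.
 */
function remainingAfterApproxTrim(int $length, int $maxLen, int $nodeSize = 100): int
{
    if ($nodeSize < 1) {
        throw new InvalidArgumentException('Node size must be positive.');
    }

    while ($length - $nodeSize >= $maxLen) {
        $length -= $nodeSize;
    }

    return $length;
}

/** Entries left after exact trimming (no ~): never more than $maxLen. */
function remainingAfterExactTrim(int $length, int $maxLen): int
{
    return min($length, $maxLen);
}
```

In this model a stream of 1250 entries trimmed to approximately 1000 keeps 1050 entries, because dropping a third node would leave fewer than 1000. The stream can therefore stay slightly above the requested length, but never falls below it as a result of trimming.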
  26. class DomainEventHandler implements MessageHandlerInterface
    {
        /**
         * @var ElasticsearchStorage
         */
        private $storage;

        public function __construct(ElasticsearchStorage $storage)
        {
            $this->storage = $storage;
        }

        public function __invoke(DomainEventMessage $message): void
        {
            $this->storage->store($message);
        }
    }