Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Zenoh Advanced Features Workshop

Zenoh Advanced Features Workshop

Explore the advanced features of Zenoh guided by our team, unlocking new options for high-performance and scalable applications.

Topics Covered:
- Protocol Updates: Exploring the latest enhancements to the Zenoh protocol and their implications for performance and reliability
- Shared Memory: Leveraging shared memory for ultra-low-latency communication and efficient data handling
- Interceptors: Understanding how to customise and extend Zenoh’s functionality with interceptors.
- Serialization/Deserialization Framework: Delving into Zenoh’s flexible and powerful framework for optimising data encoding and decoding.

Avatar for Angelo Corsaro

Angelo Corsaro

May 09, 2025
Tweet

Video

Transcript

  1. CEO / CTO ZettaScale Technology Angelo Corsaro, PhD Zenoh’s Advanced

    Features Architect ZettaScale Technology Olivier Hécart
  2. Broadcast Router Peer Peer Router Data fi ltering put() Data

    SUB B/* Peer SUB A/* Peer SUB C/* Peer PUB A/1 PUB W/1
  3. Zenoh v0.x Router Peer Peer Router SUB B/* Peer SUB

    A/* Peer SUB C/* Peer PUB A/1 PUB W/1 Data fi ltering put() Data Subscribtions ← Sub A/* ← Sub B/* ← Sub C/* ← Sub A/* ← Sub B/* ← Sub C/* ← Sub A/* ← Sub B/* ← Sub C/*
  4. Zenoh v1.x using put() & get() Router Peer Peer Router

    SUB B/* Peer SUB A/* Peer SUB C/* Peer PUB A/1 PUB W/1 Data fi ltering put() Data Subscribtions Data ← Sub A/* ← Sub B/* ← Sub C/*
  5. Zenoh v1.x using declare_publisher() & declare_querier() Router Peer Peer Router

    SUB B/* Peer SUB A/* Peer SUB C/* Peer PUB A/1 PUB W/1 Data fi ltering publisher.put() Data Subscribtions Subscription fi ltering declare_publisher() Subscribtions Interest Interest A/1 → ← Sub A/* ← Sub A/* ← Sub B/* ← Sub C/* Interest W/1 →
  6. Subscribing to Subscriptions - Dominic Cobb Interest Message 7 6

    5 4 3 2 1 0 +-+-+-+-+-+-+-+-+ |Z|Mod|INTEREST | +-+-+-+---------+ ~ id:z32 ~ +---------------+ |A|M|N|R|T|Q|S|K| if Mod!=Final +---------------+ ~ key_scope:z16 ~ if Mod!=Final && R==1 +---------------+ ~ key_suffix ~ if Mod!=Final && R==1 +---------------+ && N==1 -- <u8;z16> ~ [int_exts] ~ if Z==1 +---------------+ - Mode 0b00: Final - Mode 0b01: Current - Mode 0b10: Future - Mode 0b11: CurrentFuture - if K==1 interest for key expressions - if S==1 interest for subscribers - if Q==1 interest for queryables - if T==1 interest for liveliness tokens - if R==1 key expression, else all key expressions - if N==1 key expr has name/suffix - if M==1 sender mapping, else receiver mapping - if A==1 aggregated replies
  7. Interest Behavior A B | INTEREST | |------------------>| -- Mode:

    CurrentFuture | | Target: Subscribers | | | DECL SUBSCRIBER | |<------------------| | DECL SUBSCRIBER | |<------------------| | DECL SUBSCRIBER | |<------------------| | | | DECL FINAL | |<------------------| -- interest_id field set | | | DECL SUBSCRIBER | |<------------------| | UNDECL SUBSCRIBER | |<------------------| | | | ... | | | | INTEREST FINAL | |------------------>| -- Mode: Final | | Stops subscriber decls/undecls | |
  8. Cold Start Improvements Zenoh v1.1.1 Single network interface scan Gossip

    target option Minimize routes precomputation Zenoh v1.2.1 Lazy routes computation main branch Auto-connect strategy option
  9. Gossip Scouting Peer Peer Router Peer Rendez-vous point Peer Multiple

    rendez-vous points can be used for fault tolerance i
  10. Gossip Scouting Peer Peer Router Peer Rendez-vous point Peer Peer

    f00d tcp/x.x.x.x:x Peer c00l tcp/z.z.z.z:z
  11. Gossip Con fi guration scouting: { gossip: { enabled: true,

    multihop: false, autoconnect: { router: [], peer: ["router", “peer"] }, }, },
  12. Gossip Target Peer Peer Router Rendez-vous point Peer scouting: {

    gossip: { enabled: true, /// Which type of Zenoh instances /// to send gossip messages to. target: { router: ["router", “peer"], peer: ["router"], }, }, }, Gossip No Gossip since Zenoh v1.1.1
  13. Connectivity Status and Events https://github.com/eclipse-zenoh/roadmap/blob/main/rfcs/ALL/Connectivity Status and Events.md Status: get

    @/*/session/** Events: sub @/*/session/** key: @/<local_zid>/session/transport/unicast/<remote_zid> kind: PUT/DELETE value: { zid: <remote_zid>, whatami: router, is_qos: true, is_shm: true }
  14. Matching Status and Events https://github.com/eclipse-zenoh/roadmap/blob/main/rfcs/ALL/Matching Status.md let publisher = session.declare_publisher("key/expression").await.unwrap();

    let matching_subscribers: bool = publisher.matching_status().await.unwrap().matching_subscribers(); let publisher = session.declare_publisher("key/expression").await.unwrap(); let matching_listener = publisher.matching_listener().await.unwrap(); while let Ok(matching_status) = matching_listener.recv_async().await { if matching_status.matching_subscribers() { println!("Publisher has matching subscribers."); } else { println!("Publisher has NO MORE matching subscribers."); } } Status: Events:
  15. Liveliness Tokens https://github.com/eclipse-zenoh/roadmap/blob/main/rfcs/ALL/Liveliness.md let token = session.liveliness().declare_liveliness("group1/member1").unwrap(); let tokens =

    session.liveliness().get("group1/*").unwrap(); while let Ok(token) = tokens.recv() { if let Ok(sample) = token.sample { println!("Alive token '{}'", sample.key_expr.as_str(),), } } Declare token: Get tokens: let subscriber = session.liveliness().declare_subscriber("group1/*").unwrap(); while let Ok(change) = subscriber.recv() { match change.kind { SampleKind::Put => println!("Alive token '{}'", change.key_expr.as_str()), SampleKind::Delete => println!("Dropped token '{}'", change.key_expr.as_str()), } } Sub to tokens:
  16. Shared Memory Highlights No topological constraints Distributed memory management Safe

    mutability Shared memory Providers Custom Layout Minimal system and network overhead for both normal and fail- recovery operations
  17. Shared Memory Architecture SHM Backend SHM Provider POSIX GPU …

    let backend = PosixShmProviderBackend::builder() .with_size(65536) .unwrap() .wait() .unwrap(); let provider = ShmProviderBuilder::builder() .protocol_id::<POSIX_PROTOCOL_ID>() .backend(backend) .wait(); let buf = provider.alloc(512).wait().unwrap(); Application can allocate shared memory bu ff er from selected providers/backend. There are no limitations on the number of provider/ backend used by an application
  18. Layout Layout can be speci fi ed if required let

    layout = provider .alloc(512) .with_alignment(AllocAlignment::new(2).unwrap()) .into_layout() .unwrap(); let buf = layout.alloc().wait().unwrap()
  19. Garbage Collection & Defragmentation Garbage collection and defragmentation can be

    triggered explicitly, as well as controlled via policies through the allocation let a_buf = buffer_layout .alloc() .with_policy::<BlockOn<Defragment<GarbageCollect>>>() .wait() .unwrap(); let b_buf = buffer_layout .alloc() .with_policy::<Deallocate<1000, Defragment<GarbageCollect>>>() .wait() .unwrap();
  20. Pub/Sub/Query with Shared Memory Buffers A Shared Memory bu ff

    er can be used with no distinction from regular bu ff ers with any Zenoh API — put, get, reply, query attachment, etc. Zenoh under-the-hood decides how this a should be “communicated” to relevant parties Zenoh understand when the bu ff er can be “shared” or when it needs to be sent over the network or to a process that is not on the same memory domain Receiving applications, do not need to know they are dealing with a shared memory bu ff er — unless this is important to them
  21. Interceptor Framework Zenoh is equipped with an interceptor framework that

    gives the ability to intercept and operate on protocol messages on ingress and egress At the current stage available interceptors are de fi ned at compile time and activated by con fi guration Eth Wi f Serial Zenoh Eth Wi f Serial ingress egress Interceptors
  22. Downsampling Interceptors One of the available interceptors allows to do

    fl ow-control on an interface basis (ingress/egress) downsampling: [ { "id": "en0FlowControl", interfaces: [ "en0" ], flow: "egress", rules: [ { key_expr: “demo/*", freq: 1.0 }, ], }, ], Eth Wi f Zenoh Eth Wi f ingress egress Interceptors
  23. Access Control Interceptor The Access Control Interceptor allows to control

    the fl ow of zenoh messages based on interfaces and certi fi cate names Router Router Subscriber sensor/** Publisher sensor/temp Subscriber sensor/** Publisher sensor/temp Put Put Put Put Put Put ACL con fi gured ACL con fi gured
  24. Access Control Interceptor The Access Control Interceptor allows to control

    the fl ow of zenoh messages based on interfaces and certi fi cate names Router Router Subscriber sensor/** sensor/temp Subscriber sensor/** sensor/temp Put Put Put Put ACL con fi gured ACL con fi gured Put Put
  25. Example Flow Control downsampling: [ { "id": "en0FlowControl", interfaces: [

    "en0" ], flow: "egress", rules: [ { key_expr: “demo/*", freq: 1.0 }, ], }, ],
  26. Zenoh Serialization — Rationale There is no lack of great

    serialization frameworks and Zenoh serialization does not try to replace your favorite from general purpose use cases The main reasons to add basic support for serialization in Zenoh where: - Ensuring that you don’t need to add in another dependency in deeply embedded platforms - Providing a certi fi ed stack that included su ffi cient elements of serialization - Batteries included for those that are just starting
  27. What is Supported? Zenoh serialization provides time and wire e

    ff i cient support for: Primitive Types: integer, fl oating points, boolean string Container Types: Tuple, Arrays, Vectors and Maps<K,V> — for any K and V for which serialization is supported
  28. Simple API z_serialize/z_deserialize can be used to serialize let input

    = 1234_u32; let payload = z_serialize(&input); let output: u32 = z_deserialize(&payload).unwrap(); assert_eq!(input, output); // Vec let input = vec![0.0f32, 1.5, 42.0]; let payload = z_serialize(&input); let output: Vec<f32> = z_deserialize(&payload).unwrap(); assert_eq!(input, output); // HashMap let mut input: HashMap<u32, String> = HashMap::new(); input.insert(0, String::from("abc")); input.insert(1, String::from("def")); let payload = z_serialize(&input); let output: HashMap<u32, String> = z_deserialize(&payload).unwrap(); assert_eq!(input, output); // Tuple let input = (0.42f64, "string".to_string()); let payload = z_serialize(&input); let output: (f64, String) = z_deserialize(&payload).unwrap(); assert_eq!(input, output); // Array (handled as variable-length sequence, not as tuple) let input = [0.0f32, 1.5, 42.0]; let payload = z_serialize(&input); let output: [f32; 3] = z_deserialize(&payload).unwrap(); assert_eq!(input, output); // can also be deserialized as a vec let output: Vec<f32> = z_deserialize(&payload).unwrap(); assert_eq!(input.as_slice(), output);