Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Server-Sent Events in Go

Oleg Kovalov
February 20, 2020

Server-Sent Events in Go

Oleg Kovalov

February 20, 2020
Tweet

More Decks by Oleg Kovalov

Other Decks in Programming

Transcript

  1. Me - Gopher for ~4 years - Open source contributor

    - Engineer at allegro.pl core team Website: https://olegk.dev Twitter: @oleg_kovalov Github: @cristaloleg 2
  2. - Senior Software Engineer - Server-Sent Events - Streaming SIMD

    Extensions - Shanghai Stock Exchange - Small-scale enterprise What is SSE? 7
  3. - Senior Software Engineer - Server-Sent Events - Streaming SIMD

    Extensions - Shanghai Stock Exchange - Small-scale enterprise - Skyrim Special Edition - ... What is SSE? 8
  4. - Pull - Hey, Server, do you have something? -

    Push - Hey, Client, I have something! Real-time data delivery 13
  5. - Pull - Hey, Server, do you have something? -

    Push - Hey, Client, I have something! - Bidirectional - Server <-> Client Real-time data delivery 14
  6. - Pull - Hey, Server, do you have something? -

    Push - Hey, Client, I have something! - Bidirectional - Server <-> Client - Unidirectional - Server -> Client or Client -> Server Real-time data delivery 15
  7. - Pull - Hey, Server, do you have something? -

    Push - Hey, Client, I have something! - Bidirectional - Server <-> Client - Unidirectional - Server -> Client or Client -> Server - ... Real-time data delivery 16
  8. - Long/short Polling - WebSocket - Server-Sent Events - MQTT

    and sooooo on... There are few ways 21
  9. WebSocket 26 - Made in 2008 - Bidirectional Server-Client protocol

    - Supported by all browsers (even IE) - And binary data
  10. - Made in 2006 - Simple Server-Client protocol over HTTP

    - Only Server -> Client direction Server-Sent Events? 30
  11. - Made in 2006 - Simple Server-Client protocol over HTTP

    - Only Server -> Client direction - Supported by all browsers (sorry, no IE) Server-Sent Events? 31
  12. - Made in 2006 - Simple Server-Client protocol over HTTP

    - Only Server -> Client direction - Supported by all browsers (sorry, no IE) - No binary data Server-Sent Events? 32
  13. - Made in 2006 - Simple Server-Client protocol over HTTP

    - Only Server -> Client direction - Supported by all browsers (sorry, no IE) - No binary data Debut in Opera, September 1, 2006 Server-Sent Events? 33
  14. Flow - GET /sse - <Do an Upgrade procedure> -

    Content-Type: “text/event-stream” - <Push the data> 38
  15. Flow - GET /sse - <Do an Upgrade procedure> -

    Content-Type: “text/event-stream” - <Push the data> Other checks if you want, it stills a HTTP 39
  16. - ID - uniqueness - Event - as a type

    SSE protocol is simple 42
  17. - ID - uniqueness - Event - as a type

    - Data - main part SSE protocol is simple 43
  18. - ID - uniqueness - Event - as a type

    - Data - main part - Retry - refresh rate SSE protocol is simple 44
  19. - ID - uniqueness - Event - as a type

    - Data - main part - Retry - refresh rate id: 54\n event: hey-you\n data: yes that’s a data\n data: <whatever-you-want>\n\n SSE protocol is simple 45
  20. - `Last-Event-ID` header - A last known ID for client

    - Decide on your own what to do next A cozy thing 46
  21. http.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) { stream, err := sse.UpgradeHTTP(r,

    w) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } Let’s upgrade to SSE 49
  22. http.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) { stream, err := sse.UpgradeHTTP(r,

    w) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } var data struct { Text string `json:"text"` Timestamp int64 `json:"ts"` } Let’s upgrade to SSE 50
  23. http.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) { stream, err := sse.UpgradeHTTP(r,

    w) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } var data struct { Text string `json:"text"` Timestamp int64 `json:"ts"` } for { data.Timestamp = time.Now().Unix() stream.WriteJSON(data) time.Sleep(time.Second) } }) Let’s upgrade to SSE 51
  24. func UpgradeHTTP(r *http.Request, w http.ResponseWriter) (*Stream, error) { hj, ok

    := w.(http.Hijacker) if !ok { return nil, ErrNotHijacker } Inside the UpgradeHTTP 53
  25. func UpgradeHTTP(r *http.Request, w http.ResponseWriter) (*Stream, error) { hj, ok

    := w.(http.Hijacker) if !ok { return nil, ErrNotHijacker } // Hijack() (net.Conn, *bufio.ReadWriter, error) // // The Hijacker interface is implemented by ResponseWriters // that allow an HTTP handler to take over the connection. Inside the UpgradeHTTP 54
  26. func UpgradeHTTP(r *http.Request, w http.ResponseWriter) (*Stream, error) { hj, ok

    := w.(http.Hijacker) if !ok { return nil, ErrNotHijacker } // Hijack() (net.Conn, *bufio.ReadWriter, error) // // The Hijacker interface is implemented by ResponseWriters // that allow an HTTP handler to take over the connection. // // After a call to Hijack, the original Request.Body must not // be used. Inside the UpgradeHTTP 55
  27. func UpgradeHTTP(r *http.Request, w http.ResponseWriter) (*Stream, error) { hj, ok

    := w.(http.Hijacker) if !ok { return nil, ErrNotHijacker } _, bw, err := hj.Hijack() if err != nil { retu rn nil, err } Inside the UpgradeHTTP 56
  28. func UpgradeHTTP(r *http.Request, w http.ResponseWriter) (*Stream, error) { hj, ok

    := w.(http.Hijacker) if !ok { return nil, ErrNotHijacker } _, bw, err := hj.Hijack() if err != nil { retu rn nil, err } httpWriteResponseUpgrade(bw.Writer) Inside the UpgradeHTTP 57
  29. func UpgradeHTTP(r *http.Request, w http.ResponseWriter) (*Stream, error) { hj, ok

    := w.(http.Hijacker) if !ok { return nil, ErrNotHijacker } _, bw, err := hj.Hijack() if err != nil { retu rn nil, err } httpWriteResponseUpgrade(bw.Writer) if err := bw.Flush(); err != nil { return nil, err } return &Stream{bw: bw}, nil } Inside the UpgradeHTTP 58
  30. const ( textStatusLine = "HTTP/1.1 200\r\n" colonAndSpace = ": "

    ) func httpWriteResponseUpgrade(bw *bufio.Writer) { bw.WriteString(textStatusLine) httpWriteHeader(bw, "Cache-Control", "no-cache") httpWriteHeader(bw, "Content-Type", "text/event-stream") bw.WriteString(crlf) } Improvise. Adopt. BufferedWriter. 63
  31. const ( textStatusLine = "HTTP/1.1 200\r\n" colonAndSpace = ": "

    ) func httpWriteResponseUpgrade(bw *bufio.Writer) { bw.WriteString(textStatusLine) httpWriteHeader(bw, "Cache-Control", "no-cache") httpWriteHeader(bw, "Content-Type", "text/event-stream") bw.WriteString(crlf) } func httpWriteHeader(bw *bufio.Writer, key, value string) { bw.WriteString(key) bw.WriteString(colonAndSpace) bw.WriteString(value) bw.WriteString(crlf) } Improvise. Adopt. BufferedWriter. 64
  32. func UpgradeHTTP(*http.Request, http.ResponseWriter) (*Stream, error) {...} type Stream struct {

    bw *bufio.ReadWriter } func (s *Stream) SetEvent(event string) error { data := []byte("event:" + event + "\n") _, err := s.bw.Write(data) return err } Stream from a user perspective 68
  33. func UpgradeHTTP(*http.Request, http.ResponseWriter) (*Stream, error) {...} type Stream struct {

    bw *bufio.ReadWriter } func (s *Stream) SetEvent(event string) error { data := []byte("event:" + event + "\n") _, err := s.bw.Write(data) return err } func (s *Stream) WriteBytes(data []byte) error { return s.writeData(data) } Stream from a user perspective 69
  34. func UpgradeHTTP(*http.Request, http.ResponseWriter) (*Stream, error) {...} type Stream struct {

    bw *bufio.ReadWriter } func (s *Stream) SetEvent(event string) error { data := []byte("event:" + event + "\n") _, err := s.bw.Write(data) return err } func (s *Stream) WriteBytes(data []byte) error { return s.writeData(data) } and other methods: SetRetry, WriteJSON, WriteFloat... Stream from a user perspective 70
  35. <!DOCTYPE html> <html> <head> <title>SSE Demo</title> <script type="module" defer> //

    next slide, I promise </script> </head> <body> <pre id="contentbox">Waiting for data…<br></pre> </body> </html> BRUTAL MINIMALISM 72
  36. <!DOCTYPE html> <html> <head> <title>SSE Demo</title> <script type="module" defer> //

    next slide, I promise </script> </head> <body> <pre id="contentbox">Waiting for data…<br></pre> </body> </html> BRUTAL MINIMALISM 73 Google: This is a motherf***ing website.
  37. let contentbox = document.querySelector("#contentbox") let src = new EventSource("http://127.0.0.1:31337/sse") src.addEventListener("close",

    e => { src.close(); contentbox.innerText += "close connection\n"; }); A bit of JavaScript, no node_modules! 76
  38. let contentbox = document.querySelector("#contentbox") let src = new EventSource("http://127.0.0.1:31337/sse") src.addEventListener("close",

    e => { src.close(); contentbox.innerText += "close connection\n"; }); src.addEventListener("message", e => { let data = JSON.parse(e.data) contentbox.innerText += data.time + "\n"; }); A bit of JavaScript, no node_modules! 77
  39. Who needs TCP when we’re almost in HTTP3 era? 81

    - Better resource control - Less allocations
  40. Who needs TCP when we’re almost in HTTP3 era? 82

    - Better resource control - Less allocations - Mechanical sympathy
  41. Who needs TCP when we’re almost in HTTP3 era? 83

    - Better resource control - Less allocations - Mechanical sympathy See Github: - valyala/fasthttp - HTTP/1.1 - gobwas/ws - WebSocket
  42. Simple Go HTTP server ... func main() { http.Handle("/", yoHandler)

    http.ListenAndServe(":31337", nil) } func yoHandler(w http.ResponseWriter, r *http.Request) { io.WriteString(w, "hey ʕ◔ϖ◔ʔ") } 84
  43. // Serve accepts incoming connections on the Listener l, creating

    a // new service goroutine for each. The service goroutines read requests and // then call srv.Handler to reply to them. // func (srv *Server) Serve(l net.Listener) error { // ... for { rw, e := l.Accept() // ... c := srv.newConn(rw) go c.serve(connCtx) } } But what happens inside? 85
  44. func main() { ln, err := net.Listen("tcp", ":8080") if err

    != nil { log.Fatal(err) } defer ln.Close() for { conn, err := ln.Accept() if err != nil { log.Println(err) continue } go handle(conn) } } Well, it’s just a TCP server 86
  45. func main() { ln, err := net.Listen("tcp", ":8080") if err

    != nil { log.Fatal(err) } defer ln.Close() for { conn, err := ln.Accept() if err != nil { log.Println(err) continue } go handle(conn) // or just an Upgrade! } } Well, it’s just a TCP server 87
  46. func (u Upgrader) Upgrade(conn io.ReadWriter) (*Stream, error) { br :=

    bufio.NewReaderSize(conn, 1000) bw := bufio.NewWriterSize(conn, 1000) rl, err := readLine(br) if err != nil { return nil, err } req, err := httpParseRequestLine(rl) if err != nil { return nil, err } // ... } Low-level Upgrader 88
  47. func (u Upgrader) Upgrade(conn io.ReadWriter) (*Stream, error) { br :=

    bufio.NewReaderSize(conn, 1000) bw := bufio.NewWriterSize(conn, 1000) rl, err := readLine(br) if err != nil { return nil, err } req, err := httpParseRequestLine(rl) if err != nil { return nil, err } // ... } Low-level Upgrader 89
  48. func (u Upgrader) Upgrade(conn io.ReadWriter) (*Stream, error) { // ...

    for err == nil { line, errRead := readLine(br) if errRead != nil { return nil, errRead } if len(line) == 0 { break // Blank line, no more lines to read. } k, v, ok := httpParseHeaderLine(line) if !ok { err = errors.New("ErrMalformedRequest"); break } // ... } } Low-level Upgrader (cont.) 90
  49. - That’s basically all - Stream works with bufio.Writer -

    Which is independent of a type of a Writer Wanna cool thing? 94
  50. - Good question - Partially available - https://github.com/cristalhq/sse - Eventually

    everything will be on Github Where is the code Lebowski? 98
  51. - Good question - Partially available - https://github.com/cristalhq/sse - Eventually

    everything will be on Github - I mean code... Where is the code Lebowski? 99
  52. - Make better open source - Simple API - No

    dependencies (mostly) cristaltech? cristalhq? wut? 103
  53. - Make better open source - Simple API - No

    dependencies (mostly) - Learn new things & have fun cristaltech? cristalhq? wut? 104
  54. - Make better open source - Simple API - No

    dependencies (mostly) - Learn new things & have fun - Don’t forget to rest sometimes cristaltech? cristalhq? wut? 105
  55. - Aliaksandr Valialkin (fasthttp) - Sergey Kamardin (ws) - One

    of the best Go libraries (IMO, but who cares) https://github.com/valyala/fasthttp https://github.com/gobwas/ws Thanks 106