Reactive in Reverse, a guest talk by Daniel Spiewak

The New York Times Developers invited Daniel Spiewak to give a guest talk on reactive programming at TimesOpen, our public developer event series. Daniel is a software developer based out of Boulder, CO. Over the years, he has worked with Java, Scala, Ruby, C/C++, ML, Clojure and several experimental languages. He currently spends most of his free time researching parser theory and methodologies, particularly areas where the field intersects with functional language design, domain-specific languages and type theory.

The New York Times Developers

September 10, 2014

    • Akka streams • "Coreactive" streams • Haskell's Kmett's Machines • scalaz-stream
    • Multi-output trivial; multi-input hard • Pull streams • "Turn the crank" from the end and request data • Multi-output hard; multi-input trivial Pull vs Push
    design • More intuitive control flow (imperatively) • Pull streams • Backpressure is trivial (it "just works") • More declarative control, which can be weird Pull vs Push
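As a rough illustration of the pull model, here is a standard-library sketch. It does not use the scalaz-stream API, and `PullSketch`, `source` and `produced` are names invented for this example. A Scala `Iterator` only does work when the consumer asks for the next element, so a consumer that asks less often automatically slows the producer down.

```scala
object PullSketch {
  // Counts how many elements the producer has actually computed.
  var produced = 0

  // A lazy, potentially infinite source: nothing runs until someone pulls.
  def source: Iterator[Int] = Iterator.from(0).map { n =>
    produced += 1
    n * 2
  }

  // The consumer "turns the crank" from the end and asks for three elements only.
  def firstThree: List[Int] = source.take(3).toList
}
```

After `PullSketch.firstThree` returns, `produced` reflects only the elements that were requested, even though the source itself is unbounded.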
    Process[Task, A] • A strict sequence of actions
    immediately • No more memory leaks! • Easy to move tasks between thread pools • Better thread utilization • Explicit parallelism
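The idea that a task is only a description until something runs it can be sketched with a tiny hand-rolled type. This is a toy under stated assumptions: `LazyTask` is a hypothetical name, not scalaz's `Task`, and it has no error handling, asynchrony or thread-pool support. It only shows that building the description performs no effects until `run()` is called.

```scala
object LazyTaskSketch {
  // A description of a computation; the thunk is executed only by run().
  final class LazyTask[A](thunk: () => A) {
    def run(): A = thunk()
    def map[B](f: A => B): LazyTask[B] = new LazyTask(() => f(thunk()))
    def flatMap[B](f: A => LazyTask[B]): LazyTask[B] =
      new LazyTask(() => f(thunk()).run())
  }

  object LazyTask {
    // Suspends the by-name argument instead of evaluating it.
    def delay[A](a: => A): LazyTask[A] = new LazyTask(() => a)
  }

  // Records effects so we can see when they happen.
  val log = scala.collection.mutable.ListBuffer.empty[String]

  // Building this value logs nothing; the effects fire on program.run().
  val program: LazyTask[Int] = for {
    x <- LazyTask.delay { log += "first"; 1 }
    y <- LazyTask.delay { log += "second"; 2 }
  } yield x + y
}
```

The log stays empty until `LazyTaskSketch.program.run()` is invoked, at which point the two steps execute strictly in order.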
    | 1 => Task now 1 case n => { for { x <- fib(n - 1) y <- fib(n - 2) } yield x + y } } fib(42).run
    | 1 => Task now 1 case n => { val ND = Nondeterminism[Task] for { pair <- ND.both(fib(n - 1), fib(n - 2)) (x, y) = pair } yield x + y } } fib(42).run
    => f onComplete { case Success(v) => cb(\/.right(v)) case Failure(e) => cb(\/.left(e)) } } }
  16. Concepts: Process • An ordered sequence of actions • Ask for an action…then the next…then the next • If you can't keep up, you ask less frequently • Easy to merge (just ask for data from either "side") • Explicit parallelism
    Task delay { val svc = url(s"http://api.stuff.com/record/$num") Task fork futureToTask(Http(svc OK as.String)) } fetch.join }
  18. val nums: Process[Task, Int] = Process.range(0, 10)
      val adjusted = nums map { _ * 2 } filter { _ < 10 }
      val pages = adjusted flatMap { num => Process.eval(fetchUrl(num)) }
      val found = pages find { _ contains "Waldo!" }
      val stuff: Task[Unit] = (found to io.stdOutLines).run
      stuff.run
    Int] = Process.range(11, 20) val nums: Process[Task, Int] = nums1 interleave nums2 ...
  20. val i = new AtomicInteger
      val read = Task delay { i.getAndIncrement() }
      val src = Process.eval(read).repeat
      val left = src map { i => s"left: $i" }
      val right = src map { i => s"right: $i" }
      left interleave right to io.stdOutLines
  21. left: 0
      right: 1
      left: 2
      right: 3
      left: 4
      right: 5
      left: 6
      right: 7
      left: 8
      right: 9
      left: 10
      right: 11
      left: 12
      right: 13
      ...
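The alternating numbers arise because both branches pull from the same effectful source, one element at a time. A standard-library approximation reproduces the start of that output. The `interleave` helper below is written for this sketch and is not scalaz-stream's combinator, and `InterleaveSketch` and `firstFour` are invented names.

```scala
import java.util.concurrent.atomic.AtomicInteger

object InterleaveSketch {
  // Alternates pulls between two iterators, starting with the left one.
  def interleave[A](l: Iterator[A], r: Iterator[A]): Iterator[A] =
    new Iterator[A] {
      private var takeLeft = true
      def hasNext: Boolean = if (takeLeft) l.hasNext else r.hasNext
      def next(): A = {
        val a = if (takeLeft) l.next() else r.next()
        takeLeft = !takeLeft
        a
      }
    }

  def firstFour: List[String] = {
    val i = new AtomicInteger
    // Each call builds a fresh lazy view over the same shared counter.
    def src: Iterator[Int] = Iterator.continually(i.getAndIncrement())
    val left = src map { n => s"left: $n" }
    val right = src map { n => s"right: $n" }
    interleave(left, right).take(4).toList
  }
}
```

Because each pull increments the shared counter exactly once, the left branch sees the even numbers and the right branch sees the odd ones.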
    wimp val read: Task[Message] = Task delay { queue.take() } val src: Process[Task, Message] = Process.eval(read).repeat ... // bounded queues are for wimps...
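For comparison, the bounded-queue variant the slide jokes about can be sketched with the standard library alone. This is an assumption-based illustration, not code from the talk: `QueueSourceSketch` is a hypothetical name, messages are plain strings, and the capacity of 2 is arbitrary. With a bounded queue, a producer that outruns the consumer is refused (or blocked, if it uses `put`) until the consumer pulls again.

```scala
import java.util.concurrent.ArrayBlockingQueue

object QueueSourceSketch {
  // Capacity 2: a third offer is refused until the consumer takes something.
  val queue = new ArrayBlockingQueue[String](2)

  // Pull-based source: each next() blocks until a message is available.
  def src: Iterator[String] = Iterator.continually(queue.take())
}
```

Only pull as many elements from `src` as you know have been enqueued when experimenting on a single thread, since `take()` blocks on an empty queue.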
    to a channel • Writing to disk • …or all of the above • What is a sink anyway? • A stream of functions!
  27. def write(str: String): Task[Unit] = Task delay { println(str) }
      val sink: Sink[Task, String] = Process.constant(write _)
      val src = Process.range(0, 10) map { _.toString }
      val results = src zip sink flatMap { case (str, f) => Process eval f(str) }
      val universe: Task[Unit] = results.run
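The "sink is a stream of functions" idea also works without any streaming library. In this sketch (plain Scala iterators, with `SinkSketch`, `written` and `runAll` as invented names, and writes collected in a buffer instead of printed), the sink is an endless iterator that keeps handing out the same write function, and draining the stream means zipping data with functions and applying each pair.

```scala
object SinkSketch {
  // Collects "written" lines so the effect is observable.
  val written = scala.collection.mutable.ListBuffer.empty[String]

  def write(str: String): Unit = { written += str; () }

  // A sink modelled as an endless stream of the same function.
  def sink: Iterator[String => Unit] =
    Iterator.continually((s: String) => write(s))

  def runAll(): Unit = {
    val src = Iterator.range(0, 10) map { _.toString }
    // Pair each element with the next function from the sink and apply it.
    (src zip sink) foreach { case (str, f) => f(str) }
  }
}
```

The zip ends when the finite source is exhausted, so the infinite sink never causes the loop to run forever.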
    = ... val src = Process.range(0, 10) map { _.toString } val results = src zip stdOut zip channel flatMap { case ((str, f1), f2) => { for { _ <- Process eval f1(str) _ <- Process eval f2(str) } yield () } } val universe: Task[Unit] = results.run
    = ... val src = Process.range(0, 10) map { _.toString } val results = src observe stdOut to channel val universe: Task[Unit] = results.run
    Racing two streams into one • Turning a stream "sideways" • Almost everything implemented on top of wye
  32. wye

    = ... val merged: Process[Task, Message] = left.wye(right)(wye.merge)
    = ... val merged: Process[Task, Message] = left merge right // should be "race"
    = ... // oh NOES! teh symbols cometh! val merged: Process[Task, Message \/ Line] = left either right
    nums map { _ * 2 } filter { _ < 10 } val pages = adjusted flatMap { num => Process.eval(fetchUrl(num)) }
    nums map { _ * 2 } filter { _ < 10 } val pages: Process[Task, Task[String]] = adjusted map { num => fetchUrl(num) } val parallel: Process[Task, String] = pages.gather(4)
    chunk of stream may be truncated • Great for finite streams! • Causes DEADLOCK on infinite streams • Don't use if you source from a queue!
    nums map { _ * 2 } filter { _ < 10 } val pages: Process[Task, Process[Task, String]] = adjusted map { num => Process.eval(fetchUrl(num)) } val parallel: Process[Task, String] = merge.mergeN(pages)
    Process • Uses a variable bounded queue • Races all input streams • Up to n at a time • Almost always what you really want
    OSS soon™! • Would also work with scalaz-nio • Uses scodec • Use this. Use it. It's amazing. • Demonstrates the power of Process abstraction
    Pipe inbound data to a relay queue • Pipe relay queue into the outbound channel • …including all history! • Continue until client closes connection
    handlers = Netty server address map { client => for { Exchange(src, sink) <- client in = src to relay.publish out = relay.subscribe to sink _ <- in merge out } yield () } val server: Task[Unit] = merge.mergeN(handlers).run
    server (as UTF-8) • Pipe server response to standard output • Continue until user fail-sauce Ctrl-C kills us
    = { val decoder = decode.many[String] val encoder = encode.many[String] val Exchange(src, sink) = ex val src2 = src flatMap decoder.decode val sink2 = sink pipeIn encoder.encoder Exchange(src2, sink2) }
    Exchange(src, sink) = transcode(rawData) in = src to io.stdOutLines out = io.stdInLines to sink _ <- in merge out } yield () val client: Task[Unit] = clientP.run
    is pure and encapsulated from networking • Backpressure "just works" (sort of) • Our Topic is unbounded, because I'm lazy • Handshaking would be almost trivial • Client and server logic looks almost the same!
    understand complex logic! • No more puzzling about state or resource leaks • Simple and easy combinators scale well • You know almost everything you need