
Conquering concurrency with functional programming


Some people claim functional programming is useless because in the end there will be side effects - or else your program won't do anything useful. That's not true, and as it turns out purely functional programming is really good at solving some real-world problems, like concurrency.
I'll talk about how shared mutable state, queues, and streams can be used in purely functional programs, and why such a solution might be preferred over the classical ways of managing concurrent state.


Jakub Kozłowski

April 06, 2019

Transcript

  1. Conquering concurrency with functional programming

    Jakub Kozłowski
  2. State, state... cats.data.State?

    case class State[S, A](run: S => (S, A)) // *simplified*
    ...does not work with concurrency
  3. Shared state in pure FP: when a state monad won't do

    https://vimeo.com/294736344
    "Passing something from a function to another is the most sequential thing you can think of" - Fabio Labella
  4. ???

  5. class UserCart { private var cart = Cart.Empty def getSize():

    Int = cart.size def put(item: Cart.Item): Unit = { if (!cart.contains(item)) cart = cart.appendItem(item) } }
  6. class UserCart { private var cart = Cart.Empty def getSize():

    Int = cart.size def put(item: Cart.Item): Unit = { if (!cart.contains(item)) cart = cart.appendItem(item) } } userCart.put(new Cart.Item(data)) //item isn't in cart yet //item added Thread 1
  7. class UserCart { private var cart = Cart.Empty def getSize():

    Int = cart.size def put(item: Cart.Item): Unit = { if (!cart.contains(item)) cart = cart.appendItem(item) } } userCart.put(new Cart.Item(data)) //item isn't in cart yet //item added Thread 1 userCart.put(new Cart.Item(data)) //item isn't in cart yet //item added Thread 2
  8. class UserCart { private var cart = Cart.Empty def getSize():

    Int = cart.size def put(item: Cart.Item): Unit = { if (!cart.contains(item)) cart = cart.appendItem(item) } } userCart.put(new Cart.Item(data)) //item isn't in cart yet //item added Thread 1 userCart.put(new Cart.Item(data)) //item isn't in cart yet //item added Thread 2 userCart.getSize() // ???
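The unlucky schedule on this slide can be replayed deterministically. The sketch below is a single-threaded illustration rather than the talk's code: `Cart` is a hypothetical stand-in (a vector of item names), and the two "threads" are simulated by running both checks before either write. It shows one possible outcome of the race, in which the same item ends up in the cart twice.

```scala
object CartRaceSketch {
  // Hypothetical stand-in for the talk's Cart: an immutable vector of item names.
  final case class Cart(items: Vector[String]) {
    def contains(item: String): Boolean = items.contains(item)
    def appendItem(item: String): Cart = Cart(items :+ item)
    def size: Int = items.size
  }

  // Replays the schedule by hand: both "threads" perform the check
  // before either of them performs the write.
  def unluckyInterleaving(item: String): Int = {
    var cart = Cart(Vector.empty)
    val thread1SeesItem = cart.contains(item) // false: item isn't in cart yet
    val thread2SeesItem = cart.contains(item) // false: item isn't in cart yet
    if (!thread1SeesItem) cart = cart.appendItem(item) // thread 1 adds the item
    if (!thread2SeesItem) cart = cart.appendItem(item) // thread 2 adds it again
    cart.size
  }
}
```

With real threads the result depends on scheduling, which is why `getSize()` on the slide is marked `???`.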
  9. class UserCart { private var cart = Cart.Empty def getSize():

    Int = cart.size def put(item: Cart.Item): Unit = { if (!cart.contains(item)) cart = cart.appendItem(item) } } Solution 1: locks
  10. class UserCart { private var cart = Cart.Empty def getSize():

    Int = cart.size def put(item: Cart.Item): Unit = { if (!cart.contains(item)) cart = cart.appendItem(item) } } Solution 1: locks
  11. class UserCart { private var cart = Cart.Empty def getSize():

    Int = cart.size def put(item: Cart.Item): Unit = this.synchronized { if (!cart.contains(item)) cart = cart.appendItem(item) } } Solution 1: locks Not atomic
  12. class UserCart { private var cart = Cart.Empty def getSize():

    Int = cart.size def put(item: Cart.Item): Unit = this.synchronized { if (!cart.contains(item)) cart = cart.appendItem(item) } } Solution 1: locks Not atomic ·Blocking ·Easy to break (e.g. deadlocks) ·Non-compositional
  13. object CartActor { case object GetSize case class Put(item: Cart.Item)

    } class CartActor extends Actor { private var cart = Cart.Empty import CartActor._ def receive: Receive = { case GetSize => sender() ! cart.size case Put(item) => if (!cart.contains(item)) cart = cart.appendItem(item) } } Solution 2: actor model
  14. object CartActor { case object GetSize case class Put(item: Cart.Item)

    } class CartActor extends Actor { private var cart = Cart.Empty import CartActor._ def receive: Receive = { case GetSize => sender() ! cart.size case Put(item) => if (!cart.contains(item)) cart = cart.appendItem(item) } } Solution 2: actor model · Low level, advanced construct · Imposed "push" mindset · Complex in testing
  15. object CartActor { case object GetSize case object Clear case

    class Put(item: Cart.Item) } class CartActor extends Actor { private var cart = Cart.Empty import CartActor._ def receive: Receive = { case GetSize => sender() ! cart.size case Put(item) => if (!cart.contains(item)) cart = cart.appendItem(item) case Clear => cart = Cart.Empty } } Solution 3: atomic references class UserCart { private val cartRef = new AtomicReference(Cart.Empty) def getSize(): Long = cartRef.get().size def put(item: Cart.Item): Unit = cartRef.updateAndGet { cart => if (!cart.contains(item)) cart.appendItem(item) else cart } }
  16. Solution 3: atomic references class UserCart { private val cartRef

    = new AtomicReference(Cart.Empty) def getSize(): Long = cartRef.get().size def put(item: Cart.Item): Unit = cartRef.updateAndGet { cart => if (!cart.contains(item)) cart.appendItem(item) else cart } } ·Side-effecting ·Synchronous updates only ·Shared state is implicit
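A minimal runnable sketch of the atomic-reference approach, using only the JDK. The cart is modelled here as a `Vector[String]`, which is an assumption made for illustration and not the talk's `Cart` type. The check and the write happen inside one `updateAndGet` call, so concurrent `put` calls for the same item cannot both append it.

```scala
import java.util.concurrent.atomic.AtomicReference

object AtomicCartSketch {
  // Hypothetical cart: item names in a Vector instead of the talk's Cart type.
  final class UserCart {
    private val cartRef = new AtomicReference[Vector[String]](Vector.empty)

    def getSize(): Int = cartRef.get().size

    def put(item: String): Unit = {
      // The function may be retried under contention, so it must be side-effect free.
      cartRef.updateAndGet(cart => if (cart.contains(item)) cart else cart :+ item)
      ()
    }
  }

  // Starts `threads` threads that all put the same item, then reports the cart size.
  def putFromManyThreads(threads: Int, item: String): Int = {
    val cart = new UserCart
    val workers = (1 to threads).map(_ => new Thread(() => cart.put(item)))
    workers.foreach(_.start())
    workers.foreach(_.join())
    cart.getSize()
  }
}
```

The drawbacks listed on the slide still apply: `put` is a side-effecting `Unit` method, and nothing in the types says that state is shared.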
  17. Solution 3: pure atomic references class UserCart { private val

    cartRef = new AtomicReference(Cart.Empty) def getSize(): Long = cartRef.get().size def put(item: Cart.Item): Unit = cartRef.updateAndGet { cart => if (!cart.contains(item)) cart.appendItem(item) else cart } }
  18. Solution 3: pure atomic references

    class UserCart private(cartRef: Ref[IO, Cart]) {
      val getSize: IO[Long] = cartRef.get.map(_.size)

      def put(item: Cart.Item): IO[Unit] = cartRef.update { cart =>
        if (!cart.contains(item)) cart.appendItem(item) else cart
      }
    }

    object UserCart {
      val create: IO[UserCart] = Ref[IO].of(Cart.Empty).map { new UserCart(_) }
    }
  19. Solution 3: pure atomic references class UserCart private(cartRef: Ref[IO, Cart])

    { val getSize: IO[Long] = cartRef.get.map(_.size) def put(item: Cart.Item): IO[Unit] = cartRef.update { cart => if (!cart.contains(item)) cart.appendItem(item) else cart } } object UserCart { val create: IO[UserCart] = Ref[IO].of(Cart.Empty).map { new UserCart(_) } } ·Still only synchronous updates (which is actually kinda cool*)
  20. Solution 3: pure atomic references class UserCart private(cartRef: Ref[IO, Cart])

    { val getSize: IO[Long] = cartRef.get.map(_.size) def put(item: Cart.Item): IO[Unit] = cartRef.update { cart => if (!cart.contains(item)) cart.appendItem(item) else cart } } object UserCart { val create: IO[UserCart] = Ref[IO].of(Cart.Empty).map { new UserCart(_) } } ·Still only synchronous updates (which is actually kinda cool*)
  21. Solution 3: pure atomic references class UserCart private(cartRef: Ref[IO, Cart])

    { val getSize: IO[Long] = cartRef.get.map(_.size) def put(item: Cart.Item): IO[Unit] = cartRef.update { cart => if (!cart.contains(item)) cart.appendItem(item) else cart } } object UserCart { val create: IO[UserCart] = Ref[IO].of(Cart.Empty).map { new UserCart(_) } } ·Still only synchronous updates (which is actually kinda cool*)
  22. Replacing any or all occurrences of an expression x in

    a program p with the value of x doesn't change the program. Example val pi = 3 //precise approximation (pi + 1, pi)
  23. Replacing any or all occurrences of an expression x in

    a program p with the value of x doesn't change the program. Example val pi = 3 //precise approximation (pi + 1, pi)
  24. Replacing any or all occurrences of an expression x in

    a program p with the value of x doesn't change the program. Example val pi = 3 //precise approximation (pi + 1, pi) == (3 + 1, 3)
  25. Referential transparency is broken with side effects!

    val msg = StdIn.readLine()
    (msg, msg) == (StdIn.readLine(), StdIn.readLine())

    val msg = Future(StdIn.readLine())
    (msg, msg) == (Future(StdIn.readLine()), Future(StdIn.readLine()))
    (yes, Future too!)
  26. ...but it works with IO/Task/ZIO

    val msg = IO(StdIn.readLine())
    (msg, msg) == (IO(StdIn.readLine()), IO(StdIn.readLine()))
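The difference between the eager and the suspended version can be checked without any library. The sketch below uses a hypothetical `MiniIO` (a bare wrapper around a thunk, not the cats-effect `IO`) and a counter instead of `StdIn.readLine()`, so that the results are observable.

```scala
object SubstitutionSketch {
  // Hypothetical minimal IO: a description of an effect that runs only on demand.
  final case class MiniIO[A](unsafeRun: () => A)

  // Runs both descriptions, left to right.
  def runBoth[A](pair: (MiniIO[A], MiniIO[A])): (A, A) =
    (pair._1.unsafeRun(), pair._2.unsafeRun())

  // Eager effect, named: the counter is bumped once.
  def eagerNamed(): (Int, Int) = {
    var counter = 0
    def next(): Int = { counter += 1; counter }
    val n = next()
    (n, n)
  }

  // Eager effect, inlined: the counter is bumped twice, so the result differs.
  def eagerInlined(): (Int, Int) = {
    var counter = 0
    def next(): Int = { counter += 1; counter }
    (next(), next())
  }

  // Suspended effect, named: nothing runs until runBoth executes each element.
  def suspendedNamed(): (Int, Int) = {
    var counter = 0
    val next = MiniIO(() => { counter += 1; counter })
    runBoth((next, next))
  }

  // Suspended effect, inlined: same behavior as the named version.
  def suspendedInlined(): (Int, Int) = {
    var counter = 0
    runBoth((MiniIO(() => { counter += 1; counter }), MiniIO(() => { counter += 1; counter })))
  }
}
```

Naming the suspended value or inlining its definition gives the same program, which is the substitution property from the earlier slides.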
  27. Ref creation needs to be suspended val ref = Ref.unsafe[IO,

    Int](0) val prog = for { _ <- ref.update(_ + 1) v <- ref.get } yield v prog .unsafeRunSync //1
  28. Ref creation needs to be suspended val ref = Ref.unsafe[IO,

    Int](0) val prog = for { _ <- ref.update(_ + 1) v <- ref.get } yield v prog .unsafeRunSync //1
  29. Ref creation needs to be suspended val prog = for

    { _ <- Ref.unsafe[IO, Int](0).update(_ + 1) v <- Ref.unsafe[IO, Int](0).get } yield v prog .unsafeRunSync //0
  30. Ref creation needs to be suspended val prog = for

    { _ <- Ref.unsafe[IO, Int](0).update(_ + 1) v <- Ref.unsafe[IO, Int](0).get } yield v prog .unsafeRunSync //0
  31. A Ref can be shared inside IO val refIO =

    Ref[IO].of(0) def prog(ref: Ref[IO, Int]) = for { _ <- ref.update(_ + 1) v <- ref.get } yield v refIO .flatMap(prog) .unsafeRunSync //1
  32. A Ref can be shared inside IO def prog(ref: Ref[IO,

    Int]) = for { _ <- ref.update(_ + 1) v <- ref.get } yield v Ref[IO].of(0) .flatMap(prog) .unsafeRunSync //1
  33. A Ref can be shared inside IO def prog(ref: Ref[IO,

    Int]) = for { _ <- ref.update(_ + 1) v <- ref.get } yield v Ref[IO].of(0) .flatMap(prog) .unsafeRunSync //1
  34. A Ref can be shared inside IO def prog(ref: Ref[IO,

    Int]) = for { _ <- ref.update(_ + 1) v <- ref.get } yield v Ref[IO].of(0) .flatMap(prog) .unsafeRunSync //1 Ref[IO].of(0) .flatMap { ref => for { _ <- ref.update(_ + 1) v <- ref.get } yield v } .unsafeRunSync //1 ==
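The `Ref` slides can be reproduced with a small stdlib-only model. `MiniIO` and `MiniRef` below are hypothetical stand-ins for cats-effect's `IO` and `Ref[IO, Int]`, written only to make the behavior runnable here. `MiniRef.unsafe` allocates a cell immediately, while `MiniRef.of` suspends the allocation.

```scala
import java.util.concurrent.atomic.AtomicInteger

object RefCreationSketch {
  // Hypothetical minimal IO with just enough structure for for-comprehensions.
  final case class MiniIO[A](unsafeRun: () => A) {
    def map[B](f: A => B): MiniIO[B] = MiniIO(() => f(unsafeRun()))
    def flatMap[B](f: A => MiniIO[B]): MiniIO[B] = MiniIO(() => f(unsafeRun()).unsafeRun())
  }

  // Hypothetical stand-in for Ref[IO, Int], backed by an AtomicInteger.
  final class MiniRef(cell: AtomicInteger) {
    def update(f: Int => Int): MiniIO[Unit] =
      MiniIO { () =>
        cell.updateAndGet(n => f(n))
        ()
      }
    val get: MiniIO[Int] = MiniIO(() => cell.get())
  }

  object MiniRef {
    def unsafe(initial: Int): MiniRef = new MiniRef(new AtomicInteger(initial)) // allocates now
    def of(initial: Int): MiniIO[MiniRef] = MiniIO(() => unsafe(initial))       // allocates when run
  }

  def incrementAndRead(ref: MiniRef): MiniIO[Int] =
    for {
      _ <- ref.update(_ + 1)
      v <- ref.get
    } yield v

  // One eagerly created cell, shared by name.
  def unsafeShared: Int = incrementAndRead(MiniRef.unsafe(0)).unsafeRun()

  // The same program with the creation inlined: two unrelated cells.
  def unsafeInlined: Int = {
    val prog = for {
      _ <- MiniRef.unsafe(0).update(_ + 1)
      v <- MiniRef.unsafe(0).get
    } yield v
    prog.unsafeRun()
  }

  // Suspended creation: sharing happens through flatMap, so inlining is safe.
  def suspendedShared: Int = MiniRef.of(0).flatMap(incrementAndRead).unsafeRun()
}
```

`unsafeShared` and `unsafeInlined` disagree, matching the `//1` and `//0` results on the slides, while the suspended version keeps working when `incrementAndRead` is inlined into the `flatMap`.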
  35. Why is referential transparency useful?

    - Fearless refactoring
    - Compositionality
    - Explicit, controllable dependencies
    - Explicit effects
  36. Task: build a combinator

    def racePairKeepLeft[A, B](left: IO[A], right: IO[B]): IO[A]

    1. Left completes first - cancel right
    2. Right completes first - keep left running
    3. The result must maintain cancelability in all cases
  37. Cancelation val a = IO.sleep(5.seconds) >> veryExpensiveJob val b =

    IO.sleep(1.second) >> IO.raiseError(new Throwable("Oh no!")) (a, b).parTupled (1 to 100).toList.parTraverse(veryExpensive)
  38. Cancelation val a = IO.sleep(5.seconds) >> veryExpensiveJob val b =

    IO.sleep(1.second) >> IO.raiseError(new Throwable("Oh no!")) (a, b).parTupled (1 to 100).toList.parTraverse(veryExpensive)
  39. Direct implementation with racePair def racePairKeepLeft[A, B](left: IO[A], right: IO[B]):

    IO[A] = { left .racePair(right) .bracketCase { case Left((left, rightFiber)) => rightFiber.cancel.as(left).uncancelable case Right((leftFiber, _)) => leftFiber.join.guaranteeCase { case ExitCase.Canceled => leftFiber.cancel case _ => IO.unit } } { case (Left((_, rightFiber)), ExitCase.Canceled) => rightFiber.cancel case (Right((leftFiber, _)), ExitCase.Canceled) => leftFiber.cancel case _ => IO.unit } }
  40. Direct implementation with racePair def racePairKeepLeft[A, B](left: IO[A], right: IO[B]):

    IO[A] = { left .racePair(right) .bracketCase { case Left((left, rightFiber)) => rightFiber.cancel.as(left).uncancelable case Right((leftFiber, _)) => leftFiber.join.guaranteeCase { case ExitCase.Canceled => leftFiber.cancel case _ => IO.unit } } { case (Left((_, rightFiber)), ExitCase.Canceled) => rightFiber.cancel case (Right((leftFiber, _)), ExitCase.Canceled) => leftFiber.cancel case _ => IO.unit } }
  41. Direct implementation with racePair def racePairKeepLeft[A, B](left: IO[A], right: IO[B]):

    IO[A] = { left .racePair(right) .bracketCase { case Left((left, rightFiber)) => rightFiber.cancel.as(left).uncancelable case Right((leftFiber, _)) => leftFiber.join.guaranteeCase { case ExitCase.Canceled => leftFiber.cancel case _ => IO.unit } } { case (Left((_, rightFiber)), ExitCase.Canceled) => rightFiber.cancel case (Right((leftFiber, _)), ExitCase.Canceled) => leftFiber.cancel case _ => IO.unit } }
  42. Deferred - purely functional promise

    abstract class Deferred[F[_], A] {
      def get: F[A]
      def complete(a: A): F[Unit]
    }

    object Deferred {
      def apply[F[_], A](implicit F: Concurrent[F]): F[Deferred[F, A]]
    }
  43. Implementation with Deferred

    /**
      * Left completes first - cancel right
      * Right completes first - keep left running
      * The result must maintain cancelability in all cases
      **/
    def racePairKeepLeft[A, B](left: IO[A], right: IO[B]): IO[A] = {
      Deferred[IO, Unit].flatMap { leftCompleted =>
        (left <* leftCompleted.complete(())) <& (right race leftCompleted.get)
      }
    }

    a <* b - run a, then b, keep the result of a
    a <& b - run a and b in parallel, keep result of a (if one fails the other one is canceled)
    race - run both sides in parallel, when one succeeds cancel the other
  44. Implementation with Concurrent.memoize

    object Concurrent {
      def memoize[F[_], A](f: F[A])(implicit F: Concurrent[F]): F[F[A]] =
        Ref.of[F, Option[Deferred[F, Either[Throwable, A]]]](None).map { ref =>
          Deferred[F, Either[Throwable, A]].flatMap { d =>
            ref
              .modify {
                case None => Some(d) -> f.attempt.flatTap(d.complete)
                case s @ Some(other) => s -> other.get
              }
              .flatten
              .rethrow
          }
        }
    }

    def racePairKeepLeft[A, B](left: IO[A], right: IO[B]): IO[A] = {
      Concurrent.memoize(left).flatMap { leftM =>
        leftM <& (right race leftM)
      }
    }
  45. Referential transparency makes concurrency bearable

    def unbounded[F[_]: Concurrent]: Resource[F, Manager[F]] = {
      val id: F[Unique] = Sync[F].delay(new Unique)
      Resource {
        Ref[F].of(Map.empty[Unique, Fiber[F, Unit]]).map { tasks =>
          new Manager[F] {
            override def safeStart[A](fa: F[A]): F[Unit] =
              Deferred[F, Unit].flatMap { isManaged =>
                id.flatMap { taskId =>
                  (isManaged.get >> fa.attempt >> tasks.update(_ - taskId)).start.flatMap { fiber =>
                    tasks.update(_ + (taskId -> fiber.void)) >> isManaged.complete(())
                  }.uncancelable
                }
              }
          } -> tasks.get.flatMap {
            _.toList.traverse_ { case (_, task) => task.cancel: F[Unit] }
          }
        }
      }
    }
  46. Referential transparency makes concurrency bearable

    def unbounded[F[_]: Concurrent]: Resource[F, Manager[F]] = {
      val id: F[Unique] = Sync[F].delay(new Unique)
      Resource {
        Ref[F].of(Map.empty[Unique, Fiber[F, Unit]]).map { tasks =>
          val cancelAllTasks = tasks.get.flatMap {
            _.toList.traverse_ { case (_, task) => task.cancel: F[Unit] }
          }
          new Manager[F] {
            override def safeStart[A](fa: F[A]): F[Unit] =
              Deferred[F, Unit].flatMap { isManaged =>
                val markManaged = isManaged.complete(())
                id.flatMap { taskId =>
                  val unregister = tasks.update(_ - taskId)
                  val runJob = isManaged.get >> fa.attempt >> unregister
                  runJob.start.flatMap { fiber =>
                    val register = tasks.update(_ + (taskId -> fiber.void))
                    register >> markManaged
                  }.uncancelable
                }
              }
          } -> cancelAllTasks
        }
      }
    }
  47. Build your own concurrent algebras

    - Circuit breakers
    - Caches
    - Job queues
    - Your domain-specific in-memory state
    - More
  48. Functional concurrency is cool

    Try it at home:
    - https://typelevel.org/cats-effect/datatypes/io.html
    - https://typelevel.org/cats-effect/concurrency/

    Use IO in production! (we do)
    Don't get discouraged (it takes a while to get comfortable with)
  49. What next?

    Exercises:
    - https://typelevel.org/cats-effect/tutorial/tutorial.html
    - https://olegpy.com/cats-effect-exercises/
    - http://degoes.net/articles/zio-challenge

    More:
    - https://fs2.io/
    - https://typelevel.org/cats-effect/#libraries
    - A bunch of links on the next slide
    - Ask on gitter! https://gitter.im/typelevel/cats-effect
  50. Thank you

    Slides: bit.ly/2YdpmxE
    Some code: github.com/kubukoz/concurrency-fun
    My twitter: @kubukoz
    My blog: blog.kubukoz.com