Upgrade to Pro — share decks privately, control downloads, hide ads and more …

2017-09-09 Scala Kansai Summit - Akka Streams

kamijin_fanta
September 09, 2017

2017-09-09 Scala Kansai Summit - Akka Streams

2017-09-09 Scala Kansai Summit @kamijin_fanta

グラフを知って理解するAkka Streams

https://github.com/kamijin-fanta/slide/blob/master/2017-09-09%20ScalaKansai-AkkaStreams/README.md

kamijin_fanta

September 09, 2017
Tweet

More Decks by kamijin_fanta

Other Decks in Technology

Transcript

  1. άϥϑΛ஌ͬͯཧղ͢Δ"LLB4USFBNT "LLB4USFBNTͱ͸ Introduction "LLB ͷ  ͭͷϓϩμΫτɻ"LLB"DUPS ͱ͸ߟ͑ํ͕ҧ͏ɻ ಛ௃ 

    ετϦʔϜॲཧ ʢ௿஗Ԇɾແݶͷσʔλॲཧʣ  όοΫϓϨογϟʔʹΑΔϑϩʔ੍ޚ  ܕ҆શ  άϥϑΛ༻͍ͨγϯϓϧͳॲཧهड़ 1 import akka.actor.ActorSystem
  2. άϥϑΛ஌ͬͯཧղ͢Δ"LLB4USFBNT άϥϑΛ஌ͬͯཧղ͢Δ"LLB4USFBNT ओͳάϥϑͷߏ੒ཁૉ Components Source Flow Sink Լྲྀ͔ΒͷཁٻʹԠ͡ɺΤϨϝϯτͷग़ྗΛߦ͏ɻ ্ྲྀ͔ΒͷΤϨϝϯτΛड͚औΓॲཧΛߦ͍ɺԼྲྀʹྲྀ͢ɻ 'MPX

    ͸ NBQpMUFS ౳͔Β؆୯ʹ࣮૷Ͱ͖Δɻ ্ྲྀ͔ΒͷΤϨϝϯτΛड͚औΓɺԿΒ͔ͷॲཧΛߦ͏ɻ 1 import akka.NotUsed 2 import akka.actor.ActorSystem 3 import akka.stream.ActorMaterializer 4 import akka.stream.scaladsl.{Flow, Keep, Source}
  3. άϥϑΛ஌ͬͯཧղ͢Δ"LLB4USFBNT Fan-in Fan-out ෳ਺ͷೖྗ  ग़ྗϙʔτΛ࣋ͭ 'BOJO'BOPVU ΋ଘࡏ͢Δɻ ෦඼ͷೖޱΛ JOMFUɾग़ޱΛ

    PVUMFU ͱݺͼɺ߹ΘͤͯϙʔτͱݺͿɻ ೖग़ྗΛߦ͏ΤϨϝϯτͷܕ͕ࢦఆ͞Εɺܕ͕Ұகͨ͠ PVUMFUJOMFU ͕઀ଓͰ͖Δɻ άϥϑͷೖޱ ɾ ग़ޱ Outlet / Inlet Source Flow Sink PVUMFU ͕Ұͭ PVUMFU JOMFU ֤  ͭ JOMFU ͕  ͭ
  4. άϥϑΛ஌ͬͯཧղ͢Δ"LLB4USFBNT άϥϑͷ֎΁ม਺ͷड౉͠Λߦ͏࢓૊ΈΛ .BUFSJBMJ[FE7BMVFT ͱݺͿɻ ୅දతͳ .BUFSJBMJ[FE7BMVFT ͷར༻ྫ Source Flow Flow

    Flow Source Flow killSwitch.shutdown() Future Source Sink Sink Sink Flow ดͨ͡άϥϑ΁ͷ௨৴खஈ Materialized values Source.actorRef KillSwitches.single Sink.fold Actor Tell "DUPS3FG Λฦ͢ "DUPS ͔Βૹ৴ͨ͠஋͕ 4PVSDF ͷ஋ʹͳΔ LJMM4XJUDI ΦϒδΣΫτΛฦ͢ ֎෦͔ΒετϦʔϜΛऴྃͰ͖Δ 'VUVSF Λฦ͢ ετϦʔϜऴྃޙʹɺूܭͷ݁ՌΛ౉͢
  5. άϥϑΛ஌ͬͯཧղ͢Δ"LLB4USFBNT ͢΂ͯͷάϥϑʹ .BUFSJBMJ[FE7BMVFT ͸༗Δ͕ɺ࢖༻͠ͳ͍΋ͷ΋ଟ͍ɻ $PNCJOFS ࠞ߹ث Λ࢖༻ͯ͠ɺ.BUFSJBMJ[FE7BMVFT ͷऔࣺબ୒Λߦ͏ɻ Source Flow

    Sink )JOU .BU Λ࢖༻͠ͳ͍৔߹ɺ/PU6TFE ͱͯ͠ܕ͕ࢦఆ͞Ε͍ͯΔ 3JHIU ্ྲྀଆ ɾ-FGU Լྲྀ ɾ#PUI ྆ํ #PUI ΛબΜͩࡍɺλϓϧͰ྆ํͷ஋͕ฦ͞ΕΔ .BUFSJBMJ[FE7BMVFTͷऔࣺબ୒ Combiner Keep.right Keep.both
  6. άϥϑΛ஌ͬͯཧղ͢Δ"LLB4USFBNT d ͷ 4PVSDF Λ࡞Γɺ*OU Λ  ഒ͢Δ 'MPX Λܦ༝͠ɺQSJOUMO

    ͢Δ 4JOL ʹߦ͘ ॲཧͷ෼཭͕ग़དྷ͍ͯͯ៉ྷɾ࠶ར༻͠΍͍͢ ࣮૷ྫجຊ Basic Example 5 object Basic { 6 def main(args: Array[String]): Unit = { 7 implicit val system = ActorSystem() 8 implicit val mat = ActorMaterializer() 9 10 val src = Source(1 to 5) 11 val doubleFlow = Flow[Int].map(x => x * 2) 12 val printSink = Sink.foreach(println) 13 14 val runnableGraph = src via doubleFlow to printSink 15 16 runnableGraph.run() 17 18 Thread.sleep(100) 19 system.terminate() 20 } 21 } FYBNQMFTTSDNBJOTDBMB#BTJDTDBMB
  7. άϥϑΛ஌ͬͯཧղ͢Δ"LLB4USFBNT ࣮૷ྫ.BUFSJBMJ[FEWBMVFT Materialized values Examples (SBQI ͷ֎ͱ௨৴͢ΔͨΊʹ .BUFSJBMJ[FEWBMVFT Λ࢖༻͍ͯ͠Δ 7

    8 object Mat { 9 def main(args: Array[String]): Unit = { 0 implicit val system = ActorSystem() 1 implicit val mat = ActorMaterializer() 2 implicit val ctx = system.dispatcher 3 4 val src: Source[Int, NotUsed] = Source(1 to 20) 5 val primeFilterFlow: Flow[Int, Int, NotUsed] = Flow[Int].filter { 6 case i if i <= 1 => false 7 case i if i == 2 => true 8 case i => !(2 until i).exists(x => i % x == 0) 9 } 0 val collectIntSink: Sink[Int, Future[Set[Int]]] = 1 Sink.fold(Set[Int]()){ case (a, b) => a + b } 2 3 val runnableGraph: RunnableGraph[Future[Set[Int]]] = 4 (src via primeFilterFlow toMat collectIntSink)(Keep.right) 5 6 val future: Future[Set[Int]] = runnableGraph.run() 7 future.onComplete { x => 8 println(x) 9 system.terminate() 0 } 1 } 2 } ˡૉ਺͚ͩ௨͢ϑΟϧλ ˡདྷͨ஋Λूܭͯ͠ 4FU ʹ٧ΊΔ ˡूܭ஋Λड͚औΔͨΊ .BU ࢖༻ ˡ ͷ਺ྻ FYBNQMFTTSDNBJOTDBMB.BUTDBMB
  8. άϥϑΛ஌ͬͯཧղ͢Δ"LLB4USFBNT 'MPXͷςετྫ Flow Tests ֤ύʔπͷ࣮૷͕୯७ͳͷͰςετ͕༰қɹUFTULJU ΋༻ҙ͞Ε͍ͯΔ 8 class FlowTestSpec extends

    FunSpec { 9 implicit val system = ActorSystem() 10 implicit val mat = ActorMaterializer() 11 12 describe("flow basic test") { 13 it("use probe") { 14 val primeFilterFlow: Flow[Int, Int, NotUsed] = Flow[Int].filter { 15 case i if i <= 1 => false 16 case i if i.==(2) => true 17 case i => !(2 until i).exists(x => i % x == 0) 18 } 19 20 val testSrc = Source(1 to 20) 21 val probe = testSrc 22 .via(primeFilterFlow) 23 .toMat(TestSink.probe[Int])(Keep.right) 24 .run() 25 26 probe.request(8) 27 probe.expectNext(2, 3, 5, 7, 11, 13, 17, 19) 28 } 29 } 30 } FYBNQMFTTSDUFTUTDBMB'MPX5FTU4QFDTDBMB ˡ1SPCF Λड͚औΔͨΊʹ .BU Λ࢖༻
  9. άϥϑΛ஌ͬͯཧղ͢Δ"LLB4USFBNT Default Akka Streams Kafka alpakka Reactive Streams ఏڙ͞Ε͍ͯΔ4PVSDF4JOL Libraries

    σϑΥϧτͰଟ͘ͷ *0ɾίϨΫγϣϯ͔Β 4PVSDF4JOL Λ࡞ΕΔ Source.tick / FileIO / TCP, TLS "LLB4USFBNT ͷ෼ࢄΛߦ͍͍ͨ৔߹ʹ࢖͏͜ͱ΋ଟ͍ IUUQTHJUIVCDPNBLLBSFBDUJWFLBGLB Producer / Consumer +BWB Ͱಋೖ༧ఆͷετϦʔϜ "1* %BUBCBTFɺݕࡧΤϯδϯ౳΁ͷ઀ଓΛߦ͍΍͍͢ Slick / elasticsearch σϑΥϧτͰଟ͘ͷ *0ɾίϨΫγϣϯ͔Β 4PVSDF4JOL Λ࡞ΕΔ AMQP / MQTT / AWS / Azure / HBase
  10. άϥϑΛ஌ͬͯཧղ͢Δ"LLB4USFBNT  4JOL ্͕ྲྀʹ 1VMM ΤϨϝϯτͷཁٻ ͢Δ   'MPX

    ͕ 1VMM Λड͚औΓ্ྲྀʹ 1VMM Λड͚ྲྀ͢   4PVSDF ͕ 1VMM Λड͚औΓɺΤϨϝϯτΛ 1VTI ͢Δ   'MPX ͕ΤϨϝϯτͷ 1VTI Λड͚औΓɺඞཁͳॲཧΛߦ͍Լྲྀʹ 1VTI ͢Δ  4JOL ͕ΤϨϝϯτͷ 1VTI Λड͚औΓɺඞཁͳॲཧΛߦ͍  ʹ໭Δ 1. pull(in) 2. onPull() 3. pull(in) 4. onPull() 8. onPush() 7. push(out, elem) 6. onPush() 5. push(out, elem) Source Flow Sink جຊతͳಈ࡞ͷྲྀΕ Element Flows Լྲྀ͔Β࢝·ΔཁٻͰόοΫϓϨογϟʔΛ࣮ݱ
  11. άϥϑΛ஌ͬͯཧղ͢Δ"LLB4USFBNT .BQΛάϥϑͱ࣮ͯ͠૷ Implement Map 1 import akka.stream._ 2 import akka.stream.stage.{GraphStage,

    GraphStageLogic, InHandler, OutHandler} 3 4 case class MapGraph[In, Out](val fn: In => Out) extends GraphStage[FlowShape[In, Out]] { 5 private val in = Inlet[In]("in") 6 private val out = Outlet[Out]("out") 7 8 override val shape = FlowShape(in, out) 9 10 override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { 11 setHandler(in, new InHandler { 12 override def onPush(): Unit = push(out, fn(grab(in))) 13 }) 14 setHandler(out, new OutHandler { 15 override def onPull(): Unit = pull(in) 16 }) 17 } 18 } FYBNQMFTTSDNBJOTDBMB.BQ(SBQITDBMB ˡ্ྲྀ͔Β஋Λ HSBC ͠ɺ ɹGO Λద༻͠ɺԼྲྀʹ QVTI ͢Δ ˡJOMFUPVUMFU ϙʔτΛએݴ ˡ'MPX Λએݴ ˡԼྲྀ͔Βͷ QVMM Λ্ྲྀʹड͚ྲྀ͢