Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Akka Cluster 超入門 - 2019 Fringe81 大新年勉強会

Akka Cluster 超入門 - 2019 Fringe81 大新年勉強会

Shinichi Morimoto

January 09, 2019
Tweet

More Decks by Shinichi Morimoto

Other Decks in Programming

Transcript

  1. Akka Clusterͱ͸ʁ  Ϋϥελϝϯόʔγοϓ  ෳ਺ͷϊʔυͰՔಇ͢ΔͷͰ଱ো֐ੑΛ࣋ͭ  ෛՙ෼ࢄ  ϧʔςΟϯάʹΑͬͯෛՙ෼ࢄͰ͖Δ

     ϊʔυύʔςΟγϣχϯά  ಛఆͷϩʔϧͷϊʔυʹͷΈΞΫλʔΛ഑ஔͰ͖Δ  ύʔςΟγϣϯϙΠϯτ  ΞΫλʔΛαϒπϦʔʹ෼ׂͯ͠ɺଞͷϊʔυʹ഑ஔͰ͖Δ BLLBDMVTUFSϞδϡʔϧ͸্هͷػೳΛఏڙ͢Δ
  2. Akka Clusterͱ͸ʁ  Ϋϥελϝϯόʔγοϓ  ෳ਺ͷϊʔυͰՔಇ͢ΔͷͰ଱ো֐ੑΛ࣋ͭ  ෛՙ෼ࢄ  ϧʔςΟϯάʹΑͬͯෛՙ෼ࢄͰ͖Δ

     ϊʔυύʔςΟγϣχϯά  ಛఆͷϩʔϧͷϊʔυʹͷΈΞΫλʔΛ഑ஔͰ͖Δ  ύʔςΟγϣϯϙΠϯτ  ΞΫλʔΛαϒπϦʔʹ෼ׂͯ͠ɺଞͷϊʔυʹ഑ஔͰ͖Δ BLLBDMVTUFSϞδϡʔϧ͸্هͷػೳΛఏڙ͢Δ
  3. Ϋϥελϝϯόʔγοϓ γʔυϊʔυ γʔυϊʔυ ϊʔυ "ϩʔϧ ϊʔυ "ϩʔϧ ϊʔυ #ϩʔϧ ϊʔυ

    #ϩʔϧ  γʔυϊʔυ  Ϋϥελʔͷ؅ཧΛ࢘Δ  ϝϯόʔͷ+PJO΍-FBWFͳͲ  ϊʔυ  "DUPS͕ಈ͘ϊʔυ  ϩʔϧͱ͍͏໾ׂ͝ͱͷλ άΈ͍ͨͳ΋ͷΛ෇༩͢Δ ͜ͱ͕Ͱ͖Δ
  4. Ϋϥελͷىಈ-௥Ճ-཭୤ͷྲྀΕ akka { actor { provider = cluster } remote

    { artery { enabled = on transport = tcp canonical.hostname = "127.0.0.1" canonical.port = 0 } } cluster { seed-nodes = [ "akka://[email protected]:2551", "akka://[email protected]:2552"] # auto downing is NOT safe for production deployments. # you may want to use it during development, read more about it in the docs. auto-down-unreachable-after = 10s } } BQQMJDBUJPODPOG
  5. Ϋϥελͷىಈ-௥Ճ-཭୤ͷྲྀΕ TBNQMFDMVTUFS object SimpleClusterApp { def main(args: Array[String]): Unit =

    { if (args.isEmpty) startup(Seq("2551", "2552", "0")) else startup(args) } def startup(ports: Seq[String]): Unit = { ports foreach { port => // Override the configuration of the port val config = ConfigFactory.parseString(s""" akka.remote.artery.canonical.port=$port """).withFallback(ConfigFactory.load()) // Create an Akka system val system = ActorSystem("ClusterSystem", config) // Create an actor that handles cluster domain events system.actorOf(Props[SimpleClusterListener], name = "clusterListener") } } }
  6. ClusterͷαϯϓϧΛಈ͔ͯ͠ΈΑ͏ γʔυϊʔυ #BDLFOE γʔυϊʔυ 'SPOUFOE ϊʔυ #BDLFOE ϊʔυ #BDLFOE ϊʔυ

    'SPOUFOE 'SPOUFOE #BDLFOE IFMMP+PC*E )FMMP+PC*E ઌ಄Λେจࣈʹม׵ ఆظతʹϝοηʔδΛBTL
  7. ClusterͷαϯϓϧΛಈ͔ͯ͠ΈΑ͏ #BDLFOE 'SPOUFOE 'SPOUFOE $MVTUFSʹ+PJOͨ͠ࡍʹ 'SPOUFOEʹϝοηʔδΛ ૹΔ #BDLFOE͔Βϝοηʔδ ͕ಧ͍ͨΒɺCBDLFOET ʹ௥Ճ͢Δ

    #BDLFOE͔Βϝοηʔδ ͕ಧ͍ͨΒɺCBDLFOET ʹ௥Ճ͢Δ #BDLFOE͕৽نʹΫϥελʹ ௥Ճ͞Εͨࡍͷڍಈ #BDLFOE3FHJTUSBUJPO
  8. ClusterͷαϯϓϧΛಈ͔ͯ͠ΈΑ͏ class TransformationFrontend extends Actor { var backends = IndexedSeq.empty[ActorRef]

    var jobCounter = 0 def receive = { case job: TransformationJob if backends.isEmpty => sender() ! JobFailed("Service unavailable, try again later", job) case job: TransformationJob => jobCounter += 1 backends(jobCounter % backends.size) forward job case BackendRegistration if !backends.contains(sender()) => context watch sender() backends = backends :+ sender() case Terminated(a) => backends = backends.filterNot(_ == a) } } 'SPOUFOEͷίʔυ
  9. ClusterͷαϯϓϧΛಈ͔ͯ͠ΈΑ͏ 'SPOUFOEͷίʔυ object TransformationFrontend { def main(args: Array[String]): Unit =

    { // Override the configuration of the port when specified as program argument val port = if (args.isEmpty) "0" else args(0) val config = ConfigFactory.parseString(s""" akka.remote.artery.canonical.port=$port """) .withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")) .withFallback(ConfigFactory.load()) val system = ActorSystem("ClusterSystem", config) val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend") val counter = new AtomicInteger import system.dispatcher system.scheduler.schedule(2.seconds, 2.seconds) { implicit val timeout = Timeout(5 seconds) (frontend ? TransformationJob("hello-" + counter.incrementAndGet())) onSuccess { case result => println(result) } } } }
  10. ClusterͷαϯϓϧΛಈ͔ͯ͠ΈΑ͏ #BDLFOEͷίʔυ class TransformationBackend extends Actor { val cluster =

    Cluster(context.system) // subscribe to cluster changes, MemberUp // re-subscribe when restart override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp]) override def postStop(): Unit = cluster.unsubscribe(self) def receive = { case TransformationJob(text) => sender() ! TransformationResult(text.toUpperCase) case state: CurrentClusterState => state.members.filter(_.status == MemberStatus.Up) foreach register case MemberUp(m) => register(m) } def register(member: Member): Unit = if (member.hasRole("frontend")) context.actorSelection(RootActorPath(member.address) / "user" / "frontend") ! BackendRegistration }
  11. ClusterͷαϯϓϧΛಈ͔ͯ͠ΈΑ͏ #BDLFOEͷίʔυ object TransformationBackend { def main(args: Array[String]): Unit =

    { // Override the configuration of the port when specified as program argument val port = if (args.isEmpty) "0" else args(0) val config = ConfigFactory.parseString(s""" akka.remote.artery.canonical.port=$port """) .withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")) .withFallback(ConfigFactory.load()) val system = ActorSystem("ClusterSystem", config) system.actorOf(Props[TransformationBackend], name = "backend") } }