Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Akkaによるスレッドの使い方

Sponsored · Your Podcast. Everywhere. Effortlessly. Share. Educate. Inspire. Entertain. You do you. We'll handle the rest.

 Akkaによるスレッドの使い方

Avatar for リチャード 伊真岡

リチャード 伊真岡

April 02, 2018

More Decks by リチャード 伊真岡

Transcript

  1. > Akka is a toolkit for building highly concurrent, distributed,

    and resilient message-driven applications for Java and Scala
  2. •Thread •join •Runnable/Callable •Lock •synchronized •Atomic variables •volatile •ThreadLocal •DynamicVariable

    •ConcurrentHashMap •ConcurrentLinkedQueue •ForkJoin •ThreadPool •Executor/ExecutorService •ExecutionContext •Future •Thread intereference •Memory Consistency error •Dead lock •Live lock •Starvation •InterruptedException •Scheduling
  3. import scala.collection class MyActor extends Actor { //mutable state val

    state1: mutable.Map[String, String] = … val state2: mutable.ListBuffer[String] = … def receive = { //immutable message case MyMessage(msg) => procedure(msg) } def procedure(msg: String) { //࣍ͷϝοηʔδ͔ΒׂΓࠐ·Εͳ͍ } }
  4. import scala.collection class MyActor extends Actor { //mutable state val

    state1: mutable.Map[String, String] = … val state2: mutable.ListBuffer[String] = … def receive = { //immutable message case MyMessage(msg) => procedure(msg) } def procedure(msg: String) { //࣍ͷϝοηʔδ͔ΒׂΓࠐ·Εͳ͍ } }
  5. import scala.collection class MyActor extends Actor { //mutable state val

    state1: mutable.Map[String, String] = … val state2: mutable.ListBuffer[String] = … def receive = { //immutable message case MyMessage(msg) => procedure(msg) } def procedure(msg: String) { //࣍ͷϝοηʔδ͔ΒׂΓࠐ·Εͳ͍ } }
  6. import scala.collection class MyActor extends Actor { //mutable state val

    state1: mutable.Map[String, String] = … val state2: mutable.ListBuffer[String] = … def receive = { //immutable message case MyMessage(msg) => procedure(msg) } def procedure(msg: String) { //࣍ͷϝοηʔδ͔ΒׂΓࠐ·Εͳ͍ } }
  7. •Thread •join •Runnable/Callable •Lock •synchronized •Atomic variables •volatile •ThreadLocal •DynamicVariable

    •ConcurrentHashMap •ConcurrentLinkedQueue •ForkJoin •ThreadPool •Executor/ExecutorService •ExecutionContext •Future •Thread intereference •Memory Consistency error •Dead lock •Live lock •Starvation •InterruptedException •Scheduling ๨Ε͍͍ͯ
  8. •Thread •join •Runnable/Callable •Lock •synchronized •Atomic variables •volatile •ThreadLocal •DynamicVariable

    •ConcurrentHashMap •ConcurrentLinkedQueue •ForkJoin •ThreadPool •Executor/ExecutorService •ExecutionContext •Future •Thread intereference •Memory Consistency error •Dead lock •Live lock •Starvation •InterruptedException •Scheduling ๨Ε͍͍ͯ $PODVSSFODZಛ༗ͷ໰୊͸Ͳ͏ͯ͠΋ ى͜Γ·͢
  9. •Thread •join •Runnable/Callable •Lock •synchronized •Atomic variables •volatile •ThreadLocal •DynamicVariable

    •ConcurrentHashMap •ConcurrentLinkedQueue •ForkJoin •ThreadPool •Executor/ExecutorService •ExecutionContext •Future •Thread intereference •Memory Consistency error •Dead lock •Live lock •Starvation •InterruptedException •Scheduling ๨Ε͍͍ͯ Ұ෦ͷ௿Ϩϕϧͳॲཧ͔Β͸ ղ์͞ΕΔ
  10. •Thread •join •Runnable/Callable •Lock •synchronized •Atomic variables •volatile •ThreadLocal •DynamicVariable

    •ConcurrentHashMap •ConcurrentLinkedQueue •ForkJoin •ThreadPool •Executor/ExecutorService •ExecutionContext •Future •Thread intereference •Memory Consistency error •Dead lock •Live lock •Starvation •InterruptedException •Scheduling "LLB͕ड͚࣋ͬͯ͘ΕΔ Ұ෦ͷ௿Ϩϕϧͳॲཧ͔Β͸ ղ์͞ΕΔ
  11. •Thread •join •Runnable/Callable •Lock •synchronized •Atomic variables •volatile •ThreadLocal •DynamicVariable

    •ConcurrentHashMap •ConcurrentLinkedQueue •ForkJoin •ThreadPool •Executor/ExecutorService •ExecutionContext •Future •Thread intereference •Memory Consistency error •Dead lock •Live lock •Starvation •InterruptedException •Scheduling ͦͷ෦෼Λݟ͍͖ͯ·͠ΐ͏ "LLB͕ड͚࣋ͬͯ͘ΕΔ
  12. class LocalActorRef override def !(message: Any) (implicit sender: ActorRef =

    Actor.noSender): Unit = actorCell.sendMessage(message, sender)
  13. class LocalActorRef override def !(message: Any) (implicit sender: ActorRef =

    Actor.noSender): Unit = actorCell.sendMessage(message, sender)
  14. trait Dispatch def sendMessage(msg: Envelope): Unit = try { val

    msgToDispatch = if (system.settings.SerializeAllMessages) serializeAndDeserialize(msg) else msg dispatcher.dispatch(this, msgToDispatch) } catch handleException
  15. trait Dispatch def sendMessage(msg: Envelope): Unit = try { val

    msgToDispatch = if (system.settings.SerializeAllMessages) serializeAndDeserialize(msg) else msg dispatcher.dispatch(this, msgToDispatch) } catch handleException
  16. trait Dispatch def sendMessage(msg: Envelope): Unit = try { val

    msgToDispatch = if (system.settings.SerializeAllMessages) serializeAndDeserialize(msg) else msg dispatcher.dispatch(this, msgToDispatch) } catch handleException
  17. class Dispatcher def dispatch( receiver: ActorCell, invocation: Envelope ): Unit

    = { val mbox = receiver.mailbox mbox.enqueue(receiver.self, invocation) registerForExecution(mbox, true, false) }
  18. class Dispatcher def dispatch( receiver: ActorCell, invocation: Envelope ): Unit

    = { val mbox = receiver.mailbox mbox.enqueue(receiver.self, invocation) registerForExecution(mbox, true, false) }
  19. abstract class Mailbox def enqueue( receiver: ActorRef, msg : Envelope

    ): Unit = messageQueue.enqueue(receiver, msg)
  20. abstract class Mailbox def enqueue( receiver: ActorRef, msg : Envelope

    ): Unit = messageQueue.enqueue(receiver, msg)
  21. https://doc.akka.io/docs/akka/2.5/mailboxes.html SingleConsumerOnlyUnboundedMailbox This queue may or may not be faster

    than the default one depending on your use-case— be sure to benchmark properly! NonBlockingBoundedMailbox Backed by a very efficient Multiple-Producer Single-Consumer queue …etc ଞʹ΋MessageQueue৭ʑ͋Γ·͢
  22. class Dispatcher def dispatch( receiver: ActorCell, invocation: Envelope ): Unit

    = { val mbox = receiver.mailbox mbox.enqueue(receiver.self, invocation) registerForExecution(mbox, true, false) }
  23. class Dispatcher def dispatch( receiver: ActorCell, invocation: Envelope ): Unit

    = { val mbox = receiver.mailbox mbox.enqueue(receiver.self, invocation) registerForExecution(mbox, true, false) }
  24. class Dispatcher override def registerForExecution( mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint:

    Boolean ): Boolean = { if (mbox.canBeScheduledForExecution( hasMessageHint, hasSystemMessageHint)) { if (mbox.setAsScheduled()) { … executorService execute mbox … … }
  25. class Dispatcher override def registerForExecution( mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint:

    Boolean ): Boolean = { if (mbox.canBeScheduledForExecution( hasMessageHint, hasSystemMessageHint)) { if (mbox.setAsScheduled()) { … executorService execute mbox … … }
  26. class Dispatcher override def registerForExecution( mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint:

    Boolean ): Boolean = { if (mbox.canBeScheduledForExecution( hasMessageHint, hasSystemMessageHint)) { if (mbox.setAsScheduled()) { … executorService execute mbox … … }
  27. abstract class Mailbox abstract class Mailbox( val messageQueue: MessageQueue )

    extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable { ... }
  28. class Dispatcher override def registerForExecution( mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint:

    Boolean ): Boolean = { if (mbox.canBeScheduledForExecution( hasMessageHint, hasSystemMessageHint)) { if (mbox.setAsScheduled()) { … executorService execute mbox … … }
  29. abstract class Mailbox abstract class Mailbox( val messageQueue: MessageQueue )

    extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable { ... }
  30. LET IT CRASH blog 2012 … resulted in improved implementation

    of fork join pool … randomized queing and stealing
  31. abstract class Mailbox abstract class Mailbox( val messageQueue: MessageQueue )

    extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable { ... } ExecutorServiceͷ executeϝιουΛ࢖͏ͱ Runnableͷrunϝιου͕ݺ͹ΕΔ
  32. abstract class Mailbox override final def run(): Unit = {

    try { if (!isClosed) { //Volatile read, needed here processAllSystemMessages() //First, deal with any system messages processMailbox() //Then deal with messages } } finally { setAsIdle() //Volatile write, needed here dispatcher.registerForExecution(this, false, false) } }
  33. abstract class Mailbox override final def run(): Unit = {

    try { if (!isClosed) { //Volatile read, needed here processAllSystemMessages() //First, deal with any system messages processMailbox() //Then deal with messages } } finally { setAsIdle() //Volatile write, needed here dispatcher.registerForExecution(this, false, false) } }
  34. abstract class Mailbox @tailrec private final def processMailbox( left: Int

    = java.lang.Math.max(dispatcher.throughput, 1), deadlineNs: Long = …): Unit = if (shouldProcessMessage) { val next = dequeue() if (next ne null) { … actor invoke next … processMailbox(left - 1, deadlineNs) } }
  35. abstract class Mailbox @tailrec private final def processMailbox( left: Int

    = java.lang.Math.max(dispatcher.throughput, 1), deadlineNs: Long = …): Unit = if (shouldProcessMessage) { val next = dequeue() if (next ne null) { … actor invoke next … processMailbox(left - 1, deadlineNs) } }
  36. abstract class Mailbox @tailrec private final def processMailbox( left: Int

    = java.lang.Math.max(dispatcher.throughput, 1), deadlineNs: Long = …): Unit = if (shouldProcessMessage) { val next = dequeue() if (next ne null) { … actor invoke next … processMailbox(left - 1, deadlineNs) } } //receiveϝιουΛݺͿ class MyActor extends Actor { def receive = { … } }
  37. abstract class Mailbox @tailrec private final def processMailbox( left: Int

    = java.lang.Math.max(dispatcher.throughput, 1), deadlineNs: Long = …): Unit = if (shouldProcessMessage) { val next = dequeue() if (next ne null) { … actor invoke next … processMailbox(left - 1, deadlineNs) } }
  38. rerence.conf # Throughput defines the number of messages # that

    are processed in a batch # before the thread is returned to the pool. # Set to 1 for as fair as possible. throughput = 5
  39. abstract class Mailbox @tailrec private final def processMailbox( left: Int

    = java.lang.Math.max(dispatcher.throughput, 1), deadlineNs: Long = …): Unit = if (shouldProcessMessage) { val next = dequeue() if (next ne null) { … actor invoke next … processMailbox(left - 1, deadlineNs) } }