Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Implementing a Worker Pool in 4 Acts

Implementing a Worker Pool in 4 Acts

This talk is a walkthrough on how to build a worker pool. The talk is broken up into four parts, starting with the simplest implementation and leading up to the final product. Through this talk, the audience will see OTP principles demonstrated in actual code, and how GenServers and Supervisors fit together to provide concurrency and robustness.

Benjamin Tan Wei Hao

June 09, 2017
Tweet

More Decks by Benjamin Tan Wei Hao

Other Decks in Programming

Transcript

  1. Implementing A Worker Pool Application in 4 acts or: How I finally grokked OTP

    Benjamin Tan Wei Hao @bentanweihao Erlang User Conference 2017 9th June 2017
  2. Poolboy is a lightweight, generic pooling library for Erlang with a

    focus on simplicity, performance, and rock-solid disaster recovery. < 400 lines!
  3. iex(1)> {:ok, pid} = ChuckFetcher.start_link(:ok) iex(2)> ChuckFetcher.fetch(pid) "Chuck Norris can

    instantiate an abstract class." iex(1)> pid = Pooly.checkout("ChuckNorris") #PID<0.180.0> iex(2)> ChuckFetcher.fetch(pid) "Chuck Norris can unit test entire applications with a single assert." iex(3)> Pooly.checkin("ChuckNorris", pid) :ok Creating a Process: Checking Out & In a Process:
  4. iex(1)> {:ok, pid} = ChuckFetcher.start_link(:ok) iex(2)> ChuckFetcher.fetch(pid) "Chuck Norris can

    instantiate an abstract class." iex(1)> pid = Pooly.checkout("ChuckNorris") #PID<0.180.0> iex(2)> ChuckFetcher.fetch(pid) "Chuck Norris can unit test entire applications with a single assert." iex(3)> Pooly.checkin("ChuckNorris", pid) :ok Creating a Process: Checking Out & In a Process:
  5. iex(1)> {:ok, pid} = ChuckFetcher.start_link(:ok) iex(2)> ChuckFetcher.fetch(pid) "Chuck Norris can

    instantiate an abstract class." iex(1)> pid = Pooly.checkout("ChuckNorris") #PID<0.180.0> iex(2)> ChuckFetcher.fetch(pid) "Chuck Norris can unit test entire applications with a single assert." iex(3)> Pooly.checkin("ChuckNorris", pid) :ok Creating a Process: Checking Out & In a Process:
  6. Version 1 Type of Pool Single Multiple Creation of Workers

    Fixed Dynamic Consumer Recovery No Yes Worker Recovery No Yes Queueing for busy workers No Yes
  7. -module(supervisor). -behaviour(gen_server). -export([start_link/2, start_link/3, start_child/2, restart_child/2, delete_child/2, terminate_child/2, which_children/1, count_children/1,

    check_childspecs/1, get_childspec/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, format_status/2]). -export([try_again_restart/2]).
  8. defmodule Pooly.WorkerSupervisor do use Supervisor def start_link({_,_,_} = mfa) do

    Supervisor.start_link(__MODULE__, mfa) end def init({m,f,a}) do worker_opts = [restart: :permanent, shutdown: 5000, function: f] children = [worker(m, a, worker_opts)] opts = [strategy: :simple_one_for_one, max_restarts: 5, max_seconds: 5] supervise(children, opts) end end
  9. defmodule Pooly.WorkerSupervisor do use Supervisor def start_link({_,_,_} = mfa) do

    Supervisor.start_link(__MODULE__, mfa) end def init({m,f,a}) do worker_opts = [restart: :permanent, shutdown: 5000, function: f] children = [worker(m, a, worker_opts)] opts = [strategy: :simple_one_for_one, max_restarts: 5, max_seconds: 5] supervise(children, opts) end end def start_link({_,_,_} = mfa) do Supervisor.start_link(__MODULE__, mfa) end def init({m,f,a}) do end
  10. defmodule Pooly.WorkerSupervisor do use Supervisor def start_link({_,_,_} = mfa) do

    Supervisor.start_link(__MODULE__, mfa) end def init({m,f,a}) do worker_opts = [restart: :permanent, shutdown: 5000, function: f] children = [worker(m, a, worker_opts)] opts = [strategy: :simple_one_for_one, max_restarts: 5, max_seconds: 5] supervise(children, opts) end end worker_opts = [restart: :permanent, shutdown: 5000, function: f]
  11. defmodule Pooly.WorkerSupervisor do use Supervisor def start_link({_,_,_} = mfa) do

    Supervisor.start_link(__MODULE__, mfa) end def init({m,f,a}) do worker_opts = [restart: :permanent, shutdown: 5000, function: f] children = [worker(m, a, worker_opts)] opts = [strategy: :simple_one_for_one, max_restarts: 5, max_seconds: 5] supervise(children, opts) end end children = [worker(m, a, worker_opts)] opts = [strategy: :simple_one_for_one, max_restarts: 5, max_seconds: 5]
  12. defmodule Pooly.Server do use GenServer import Supervisor.Spec defmodule State do

    defstruct sup: nil, size: nil, mfa: nil end def start_link(sup, pool_config) do GenServer.start_link(__MODULE__, [sup, pool_config], name: __MODULE__) end def init([sup, pool_config]) when is_pid(sup) do init(pool_config, %{sup: sup}) end def init([{:mfa, mfa}|rest], state) do: init(rest, %{state | mfa: mfa}) def init([{:size, s}|rest], state), do: init(rest, %{state | size: s}) def init([_|rest], state), do: init(rest, state) def init([], state) do send(self, :start_worker_supervisor) {:ok, state} end end
  13. defmodule Pooly.Server do use GenServer import Supervisor.Spec defmodule State do

    defstruct sup: nil, size: nil, mfa: nil end def start_link(sup, pool_config) do GenServer.start_link(__MODULE__, [sup, pool_config], name: __MODULE__) end def init([sup, pool_config]) when is_pid(sup) do init(pool_config, %{sup: sup}) end def init([{:mfa, mfa}|rest], state) do: init(rest, %{state | mfa: mfa}) def init([{:size, s}|rest], state), do: init(rest, %{state | size: s}) def init([_|rest], state), do: init(rest, state) def init([], state) do send(self, :start_worker_supervisor) {:ok, state} end end def start_link(sup, pool_config) do GenServer.start_link(__MODULE__, [sup, pool_config], name: __MODULE__) end
  14. defmodule Pooly.Server do use GenServer import Supervisor.Spec defmodule State do

    defstruct sup: nil, size: nil, mfa: nil end def start_link(sup, pool_config) do GenServer.start_link(__MODULE__, [sup, pool_config], name: __MODULE__) end def init([sup, pool_config]) when is_pid(sup) do init(pool_config, %{sup: sup}) end def init([{:mfa, mfa}|rest], state) do: init(rest, %{state | mfa: mfa}) def init([{:size, s}|rest], state), do: init(rest, %{state | size: s}) def init([_|rest], state), do: init(rest, state) def init([], state) do send(self, :start_worker_supervisor) {:ok, state} end end def start_link(sup, pool_config) do GenServer.start_link(__MODULE__, [sup, pool_config], name: __MODULE__) end def init([sup, pool_config]) when is_pid(sup) do init(pool_config, %{sup: sup}) end
  15. defmodule Pooly.Server do use GenServer import Supervisor.Spec defmodule State do

    defstruct sup: nil, size: nil, mfa: nil end def start_link(sup, pool_config) do GenServer.start_link(__MODULE__, [sup, pool_config], name: __MODULE__) end def init([sup, pool_config]) when is_pid(sup) do init(pool_config, %{sup: sup}) end def init([{:mfa, mfa}|rest], state) do: init(rest, %{state | mfa: mfa}) def init([{:size, s}|rest], state), do: init(rest, %{state | size: s}) def init([_|rest], state), do: init(rest, state) def init([], state) do send(self, :start_worker_supervisor) {:ok, state} end end def init([{:mfa, mfa}|rest], state) do: init(rest, %{state | mfa: mfa}) def init([{:size, s}|rest], state), do: init(rest, %{state | size: s}) def init([_|rest], state), do: init(rest, state)
  16. defmodule Pooly.Server do use GenServer import Supervisor.Spec defmodule State do

    defstruct sup: nil, size: nil, mfa: nil end def start_link(sup, pool_config) do GenServer.start_link(__MODULE__, [sup, pool_config], name: __MODULE__) end def init([sup, pool_config]) when is_pid(sup) do init(pool_config, %{sup: sup}) end def init([{:mfa, mfa}|rest], state) do: init(rest, %{state | mfa: mfa}) def init([{:size, s}|rest], state), do: init(rest, %{state | size: s}) def init([_|rest], state), do: init(rest, state) def init([], state) do send(self, :start_worker_supervisor) {:ok, state} end end def init([], state) do
  17. defmodule Pooly.Server do use GenServer import Supervisor.Spec defmodule State do

    defstruct sup: nil, size: nil, mfa: nil end def start_link(sup, pool_config) do GenServer.start_link(__MODULE__, [sup, pool_config], name: __MODULE__) end def init([sup, pool_config]) when is_pid(sup) do init(pool_config, %{sup: sup}) end def init([{:mfa, mfa}|rest], state) do: init(rest, %{state | mfa: mfa}) def init([{:size, s}|rest], state), do: init(rest, %{state | size: s}) def init([_|rest], state), do: init(rest, state) def init([], state) do send(self, :start_worker_supervisor) {:ok, state} end end def init([], state) do send(self, :start_worker_supervisor) {:ok, state} end
  18. defmodule Pooly.Server do defstruct State do sup: nil, worker_sup: nil,

    size: nil, workers: nil, mfa: nil end def handle_info(:start_worker_supervisor, state) do %{sup: sup, mfa: mfa, size: size} = state {:ok, worker_sup} = Supervisor.start_child(sup, supervisor_spec(mfa)) workers = prepopulate(size, worker_sup) {:noreply, %{state | worker_sup: worker_sup, workers: workers}} end defp prepopulate(size, sup) when size > 1 do 1..size |> Enum.map(fn _ -> new_worker(sup) end) end defp prepopulate(_size, _sup), do: [] defp new_worker(sup) do {:ok, worker} = Supervisor.start_child(sup, [[]]) worker end defp supervisor_spec(mfa) do supervisor(Pooly.WorkerSupervisor, [mfa], [restart: :temporary]) end end
  19. defmodule Pooly.Server do defstruct State do sup: nil, worker_sup: nil,

    size: nil, workers: nil, mfa: nil end def handle_info(:start_worker_supervisor, state) do %{sup: sup, mfa: mfa, size: size} = state {:ok, worker_sup} = Supervisor.start_child(sup, supervisor_spec(mfa)) workers = prepopulate(size, worker_sup) {:noreply, %{state | worker_sup: worker_sup, workers: workers}} end defp prepopulate(size, sup) when size > 1 do 1..size |> Enum.map(fn _ -> new_worker(sup) end) end defp prepopulate(_size, _sup), do: [] defp new_worker(sup) do {:ok, worker} = Supervisor.start_child(sup, [[]]) worker end defp supervisor_spec(mfa) do supervisor(Pooly.WorkerSupervisor, [mfa], [restart: :temporary]) end end sup: nil %{sup: sup = state def handle_info(:start_worker_supervisor, state) do
  20. defmodule Pooly.Server do defstruct State do sup: nil, worker_sup: nil,

    size: nil, workers: nil, mfa: nil end def handle_info(:start_worker_supervisor, state) do %{sup: sup, mfa: mfa, size: size} = state {:ok, worker_sup} = Supervisor.start_child(sup, supervisor_spec(mfa)) workers = prepopulate(size, worker_sup) {:noreply, %{state | worker_sup: worker_sup, workers: workers}} end defp prepopulate(size, sup) when size > 1 do 1..size |> Enum.map(fn _ -> new_worker(sup) end) end defp prepopulate(_size, _sup), do: [] defp new_worker(sup) do {:ok, worker} = Supervisor.start_child(sup, [[]]) worker end defp supervisor_spec(mfa) do supervisor(Pooly.WorkerSupervisor, [mfa], [restart: :temporary]) end end sup: nil %{sup: sup = state state {:ok, worker_sup} = Supervisor.start_child(sup, supervisor_spec(mfa)) mfa: mfa defp supervisor_spec(mfa) do supervisor(Pooly.WorkerSupervisor, [mfa], [restart: :temporary]) end
  21. defmodule Pooly.Server do defstruct State do sup: nil, worker_sup: nil,

    size: nil, workers: nil, mfa: nil end def handle_info(:start_worker_supervisor, state) do %{sup: sup, mfa: mfa, size: size} = state {:ok, worker_sup} = Supervisor.start_child(sup, supervisor_spec(mfa)) workers = prepopulate(size, worker_sup) {:noreply, %{state | worker_sup: worker_sup, workers: workers}} end defp prepopulate(size, sup) when size > 1 do 1..size |> Enum.map(fn _ -> new_worker(sup) end) end defp prepopulate(_size, _sup), do: [] defp new_worker(sup) do {:ok, worker} = Supervisor.start_child(sup, [[]]) worker end defp supervisor_spec(mfa) do supervisor(Pooly.WorkerSupervisor, [mfa], [restart: :temporary]) end end sup: nil state {:ok, worker_sup} = Supervisor.start_child(sup, supervisor_spec(mfa)) defp supervisor_spec(mfa) do supervisor(Pooly.WorkerSupervisor, [mfa], [restart: :temporary]) end workers = prepopulate(size, worker_sup) %{sup: sup, mfa: mfa, size: size} = state
  22. defmodule Pooly.Server do defstruct State do sup: nil, worker_sup: nil,

    size: nil, workers: nil, mfa: nil end def handle_info(:start_worker_supervisor, state) do %{sup: sup, mfa: mfa, size: size} = state {:ok, worker_sup} = Supervisor.start_child(sup, supervisor_spec(mfa)) workers = prepopulate(size, worker_sup) {:noreply, %{state | worker_sup: worker_sup, workers: workers}} end defp prepopulate(size, sup) when size > 1 do 1..size |> Enum.map(fn _ -> new_worker(sup) end) end defp prepopulate(_size, _sup), do: [] defp new_worker(sup) do {:ok, worker} = Supervisor.start_child(sup, [[]]) worker end defp supervisor_spec(mfa) do supervisor(Pooly.WorkerSupervisor, [mfa], [restart: :temporary]) end end workers = prepopulate(size, worker_sup) defp prepopulate(size, sup) when size > 1 do 1..size |> Enum.map(fn _ -> new_worker(sup) end) end defp prepopulate(_size, _sup), do: [] defp new_worker(sup) do {:ok, worker} = Supervisor.start_child(sup, [[]]) worker end
  23. defmodule Pooly.Server do defstruct State do sup: nil, worker_sup: nil,

    size: nil, workers: nil, mfa: nil end def handle_info(:start_worker_supervisor, state) do %{sup: sup, mfa: mfa, size: size} = state {:ok, worker_sup} = Supervisor.start_child(sup, supervisor_spec(mfa)) workers = prepopulate(size, worker_sup) {:noreply, %{state | worker_sup: worker_sup, workers: workers}} end defp prepopulate(size, sup) when size > 1 do 1..size |> Enum.map(fn _ -> new_worker(sup) end) end defp prepopulate(_size, _sup), do: [] defp new_worker(sup) do {:ok, worker} = Supervisor.start_child(sup, [[]]) worker end defp supervisor_spec(mfa) do supervisor(Pooly.WorkerSupervisor, [mfa], [restart: :temporary]) end end defp supervisor_spec(mfa) do supervisor(Pooly.WorkerSupervisor, [mfa], [restart: :temporary]) end
  24. defmodule Pooly.Server do defstruct State do sup: nil, worker_sup: nil,

    size: nil, workers: nil, mfa: nil end def handle_info(:start_worker_supervisor, state) do %{sup: sup, mfa: mfa, size: size} = state {:ok, worker_sup} = Supervisor.start_child(sup, supervisor_spec(mfa)) workers = prepopulate(size, worker_sup) {:noreply, %{state | worker_sup: worker_sup, workers: workers}} end defp prepopulate(size, sup) when size > 1 do 1..size |> Enum.map(fn _ -> new_worker(sup) end) end defp prepopulate(_size, _sup), do: [] defp new_worker(sup) do {:ok, worker} = Supervisor.start_child(sup, [[]]) worker end defp supervisor_spec(mfa) do supervisor(Pooly.WorkerSupervisor, [mfa], [restart: :temporary]) end end defp supervisor_spec(mfa) do supervisor(Pooly.WorkerSupervisor, [mfa], [restart: :temporary]) end
  25. Worker Check-Out defmodule Pooly.Server do def handle_call(:checkout, {from_pid, _ref}, state)

    do %{workers: workers, monitors: monitors} = state case workers do [worker|rest] -> ref = Process.monitor(from_pid) true = :ets.insert(monitors, {worker, ref}) {:reply, worker, %{state | workers: rest}} [] -> {:reply, :noproc, state} end end end
  26. Worker Check-In defmodule Pooly.Server do def handle_cast({:checkin, worker}, state) do

    %{workers: workers, monitors: monitors} = state case :ets.lookup(monitors, worker) do [{pid, ref}] -> true = Process.demonitor(ref) true = :ets.delete(monitors, pid) {:noreply, %{state | workers: [pid|workers]}} [] -> {:noreply, state} end end end
  27. defmodule Pooly.Supervisor do use Supervisor def start_link(pool_config) do Supervisor.start_link(__MODULE__, pool_config)

    end def init(pool_config) do children = [ worker(Pooly.Server, [self, pool_config]) ] opts = [strategy: :one_for_all] supervise(children, opts) end end TOP-LEVEL SUPERVISOR
  28. defmodule Pooly.Supervisor do use Supervisor def start_link(pool_config) do Supervisor.start_link(__MODULE__, pool_config)

    end def init(pool_config) do children = [ worker(Pooly.Server, [self, pool_config]) ] opts = [strategy: :one_for_all] supervise(children, opts) end end TOP-LEVEL SUPERVISOR def init(pool_config) do children = [ worker(Pooly.Server, [self, pool_config]) ] opts = [strategy: :one_for_all] supervise(children, opts) end
  29. Version 1 Type of Pool Single Multiple Creation of Workers

    Fixed Dynamic Consumer Recovery No Yes Worker Recovery No Yes Queueing for busy workers No Yes
  30. Version 2 Type of Pool Single Multiple Creation of Workers

    Fixed Dynamic Consumer Recovery No Yes Worker Recovery No Yes Queueing for busy workers No Yes
  31. def handle_info({:DOWN, ref, _, _, _}, state) do %{monitors: monitors,

    workers: workers} = state case :ets.match(monitors, {:"$1", ref}) do [[pid]] -> true = :ets.delete(monitors, pid) new_state = %{state | workers: [pid|workers]} {:noreply, new_state} [[]] -> {:noreply, state} end end defmodule Pooly.Server do end
  32. def handle_info({:DOWN, ref, _, _, _}, state) do %{monitors: monitors,

    workers: workers} = state case :ets.match(monitors, {:"$1", ref}) do [[pid]] -> true = :ets.delete(monitors, pid) new_state = %{state | workers: [pid|workers]} {:noreply, new_state} [[]] -> {:noreply, state} end end defmodule Pooly.Server do end def handle_info({:DOWN, ref, _, _, _}, state) do end
  33. def handle_info({:DOWN, ref, _, _, _}, state) do %{monitors: monitors,

    workers: workers} = state case :ets.match(monitors, {:"$1", ref}) do [[pid]] -> true = :ets.delete(monitors, pid) new_state = %{state | workers: [pid|workers]} {:noreply, new_state} [[]] -> {:noreply, state} end end case :ets.match(monitors, {:"$1", ref}) do [[pid]] -> true = :ets.delete(monitors, pid) defmodule Pooly.Server do end def handle_info({:DOWN, ref, _, _, _}, state) do
  34. def handle_info({:DOWN, ref, _, _, _}, state) do %{monitors: monitors,

    workers: workers} = state case :ets.match(monitors, {:"$1", ref}) do [[pid]] -> true = :ets.delete(monitors, pid) new_state = %{state | workers: [pid|workers]} {:noreply, new_state} [[]] -> {:noreply, state} end end case :ets.match(monitors, {:"$1", ref}) do [[pid]] -> true = :ets.delete(monitors, pid) new_state = %{state | workers: [pid|workers]} {:noreply, new_state} defmodule Pooly.Server do end
  35. def handle_info({:DOWN, ref, _, _, _}, state) do %{monitors: monitors,

    workers: workers} = state case :ets.match(monitors, {:"$1", ref}) do [[pid]] -> true = :ets.delete(monitors, pid) new_state = %{state | workers: [pid|workers]} {:noreply, new_state} [[]] -> {:noreply, state} end end case :ets.match(monitors, {:"$1", ref}) do [[]] -> {:noreply, state} end defmodule Pooly.Server do end
  36. defmodule Pooly.Server do def init([sup, pool_config]) when is_pid(sup) do Process.flag(:trap_exit,

    true) monitors = :ets.new(:monitors, [:private]) init(pool_config, %State{sup: sup, monitors: monitors}) end end
  37. defmodule Pooly.Server do def init([sup, pool_config]) when is_pid(sup) do Process.flag(:trap_exit,

    true) monitors = :ets.new(:monitors, [:private]) init(pool_config, %State{sup: sup, monitors: monitors}) end end Process.flag(:trap_exit, true)
  38. def handle_info({:EXIT, pid, _reason}, state) do %{monitors: monitors, workers: workers,

    worker_sup: worker_sup} = state case :ets.lookup(monitors, pid) do [{pid, ref}] -> true = Process.demonitor(ref) true = :ets.delete(monitors, pid) new_state = %{state | workers: [new_worker(worker_sup)|workers]} {:noreply, new_state} [] -> {:noreply, state} end end case :ets.lookup(monitors, pid) do [{pid, ref}] -> [] -> end def handle_info({:EXIT, pid, _reason}, state) do end
  39. def handle_info({:EXIT, pid, _reason}, state) do %{monitors: monitors, workers: workers,

    worker_sup: worker_sup} = state case :ets.lookup(monitors, pid) do [{pid, ref}] -> true = Process.demonitor(ref) true = :ets.delete(monitors, pid) new_state = %{state | workers: [new_worker(worker_sup)|workers]} {:noreply, new_state} [] -> {:noreply, state} end end case :ets.lookup(monitors, pid) do end [] -> {:noreply, state}
  40. def handle_info({:EXIT, pid, _reason}, state) do %{monitors: monitors, workers: workers,

    worker_sup: worker_sup} = state case :ets.lookup(monitors, pid) do [{pid, ref}] -> true = Process.demonitor(ref) true = :ets.delete(monitors, pid) new_state = %{state | workers: [new_worker(worker_sup)|workers]} {:noreply, new_state} [] -> {:noreply, state} end end case :ets.lookup(monitors, pid) do [{pid, ref}] -> true = Process.demonitor(ref) true = :ets.delete(monitors, pid)
  41. def handle_info({:EXIT, pid, _reason}, state) do %{monitors: monitors, workers: workers,

    worker_sup: worker_sup} = state case :ets.lookup(monitors, pid) do [{pid, ref}] -> true = Process.demonitor(ref) true = :ets.delete(monitors, pid) new_state = %{state | workers: [new_worker(worker_sup)|workers]} {:noreply, new_state} [] -> {:noreply, state} end end new_state = %{state | workers: [new_worker(worker_sup)|workers]} {:noreply, new_state} case :ets.lookup(monitors, pid) do end
  42. Version 2 Type of Pool Single Multiple Creation of Workers

    Fixed Dynamic Consumer Recovery No Yes Worker Recovery No Yes Queueing for busy workers No Yes
  43. Version 3 Type of Pool Single Multiple Creation of Workers

    Fixed Dynamic Consumer Recovery No Yes Worker Recovery No Yes Queueing for busy workers No Yes
  44. defmodule Pooly do use Application def start(_type, _args) do pools_config

    = [ [name: "Pool1", mfa: {SampleWorker, :start_link, []}, size: 2], [name: "Pool2", mfa: {SampleWorker, :start_link, []}, size: 3], [name: "Pool3", mfa: {SampleWorker, :start_link, []}, size: 4], ] start_pools(pools_config) end end
  45. Adding the top level supervisor defmodule Pooly.Supervisor do use Supervisor

    def start_link(pools_config) do Supervisor.start_link(__MODULE__, pools_config, name: __MODULE__) end def init(pools_config) do children = [ supervisor(Pooly.PoolsSupervisor, []), worker(Pooly.Server, [pools_config]) ] opts = [strategy: :one_for_all] supervise(children, opts) end end
  46. Adding the top level supervisor defmodule Pooly.Supervisor do use Supervisor

    def start_link(pools_config) do Supervisor.start_link(__MODULE__, pools_config, name: __MODULE__) end def init(pools_config) do children = [ supervisor(Pooly.PoolsSupervisor, []), worker(Pooly.Server, [pools_config]) ] opts = [strategy: :one_for_all] supervise(children, opts) end end name: __MODULE__
  47. Adding the top level supervisor defmodule Pooly.Supervisor do use Supervisor

    def start_link(pools_config) do Supervisor.start_link(__MODULE__, pools_config, name: __MODULE__) end def init(pools_config) do children = [ supervisor(Pooly.PoolsSupervisor, []), worker(Pooly.Server, [pools_config]) ] opts = [strategy: :one_for_all] supervise(children, opts) end end children = [ supervisor(Pooly.PoolsSupervisor, []), worker(Pooly.Server, [pools_config]) ]
  48. Adding the top level supervisor defmodule Pooly.Supervisor do use Supervisor

    def start_link(pools_config) do Supervisor.start_link(__MODULE__, pools_config, name: __MODULE__) end def init(pools_config) do children = [ supervisor(Pooly.PoolsSupervisor, []), worker(Pooly.Server, [pools_config]) ] opts = [strategy: :one_for_all] supervise(children, opts) end end opts = [strategy: :one_for_all] children = [ supervisor(Pooly.PoolsSupervisor, []), worker(Pooly.Server, [pools_config]) ]
  49. defmodule Pooly.PoolsSupervisor do use Supervisor def start_link do Supervisor.start_link(__MODULE__, [],

    name: __MODULE__) end def init(_) do opts = [strategy: :one_for_one] supervise([], opts) end end Adding the POOLS supervisor
  50. defmodule Pooly.PoolsSupervisor do use Supervisor def start_link do Supervisor.start_link(__MODULE__, [],

    name: __MODULE__) end def init(_) do opts = [strategy: :one_for_one] supervise([], opts) end end Adding the POOLS supervisor name: __MODULE__
  51. defmodule Pooly.PoolsSupervisor do use Supervisor def start_link do Supervisor.start_link(__MODULE__, [],

    name: __MODULE__) end def init(_) do opts = [strategy: :one_for_one] supervise([], opts) end end Adding the POOLS supervisor supervise([], opts) Empty child spec!
  52. defmodule Pooly.PoolsSupervisor do use Supervisor def start_link do Supervisor.start_link(__MODULE__, [],

    name: __MODULE__) end def init(_) do opts = [strategy: :one_for_one] supervise([], opts) end end Adding the POOLS supervisor supervise([], opts) opts = [strategy: :one_for_one]
  53. defmodule Pooly.Server do use GenServer import Supervisor.Spec def start_link(pools_config) do

    GenServer.start_link(__MODULE__, pools_config, name: __MODULE__) end def checkout(pool_name) do GenServer.call(:"#{pool_name}Server", :checkout) end def checkin(pool_name, worker_pid) do GenServer.cast(:"#{pool_name}Server", {:checkin, worker_pid}) end def init(pools_config) do pools_config |> Enum.each(fn(pool_config) -> send(self, {:start_pool, pool_config}) end) {:ok, pools_config} end def handle_info({:start_pool, pool_config}, state) do {:ok, _pool_sup} = Supervisor.start_child(Pooly.PoolsSupervisor, supervisor_spec(pool_config)) {:noreply, state} end defp supervisor_spec(pool_config) do opts = [id: :"#{pool_config[:name]}Supervisor"] supervisor(Pooly.PoolSupervisor, [pool_config], opts) end end
  54. defmodule Pooly.Server do use GenServer import Supervisor.Spec def start_link(pools_config) do

    GenServer.start_link(__MODULE__, pools_config, name: __MODULE__) end def checkout(pool_name) do GenServer.call(:"#{pool_name}Server", :checkout) end def checkin(pool_name, worker_pid) do GenServer.cast(:"#{pool_name}Server", {:checkin, worker_pid}) end def init(pools_config) do pools_config |> Enum.each(fn(pool_config) -> send(self, {:start_pool, pool_config}) end) {:ok, pools_config} end def handle_info({:start_pool, pool_config}, state) do {:ok, _pool_sup} = Supervisor.start_child(Pooly.PoolsSupervisor, supervisor_spec(pool_config)) {:noreply, state} end defp supervisor_spec(pool_config) do opts = [id: :"#{pool_config[:name]}Supervisor"] supervisor(Pooly.PoolSupervisor, [pool_config], opts) end end def checkout(pool_name) do GenServer.call(:"#{pool_name}Server", :checkout) end def checkin(pool_name, worker_pid) do GenServer.cast(:"#{pool_name}Server", {:checkin, worker_pid}) end
  55. defmodule Pooly.Server do use GenServer import Supervisor.Spec def start_link(pools_config) do

    GenServer.start_link(__MODULE__, pools_config, name: __MODULE__) end def checkout(pool_name) do GenServer.call(:"#{pool_name}Server", :checkout) end def checkin(pool_name, worker_pid) do GenServer.cast(:"#{pool_name}Server", {:checkin, worker_pid}) end def init(pools_config) do pools_config |> Enum.each(fn(pool_config) -> send(self, {:start_pool, pool_config}) end) {:ok, pools_config} end def handle_info({:start_pool, pool_config}, state) do {:ok, _pool_sup} = Supervisor.start_child(Pooly.PoolsSupervisor, supervisor_spec(pool_config)) {:noreply, state} end defp supervisor_spec(pool_config) do opts = [id: :"#{pool_config[:name]}Supervisor"] supervisor(Pooly.PoolSupervisor, [pool_config], opts) end end def init(pools_config) do pools_config |> Enum.each(fn(pool_config) -> send(self, {:start_pool, pool_config}) end) {:ok, pools_config} end
  56. defmodule Pooly.Server do use GenServer import Supervisor.Spec def start_link(pools_config) do

    GenServer.start_link(__MODULE__, pools_config, name: __MODULE__) end def checkout(pool_name) do GenServer.call(:"#{pool_name}Server", :checkout) end def checkin(pool_name, worker_pid) do GenServer.cast(:"#{pool_name}Server", {:checkin, worker_pid}) end def init(pools_config) do pools_config |> Enum.each(fn(pool_config) -> send(self, {:start_pool, pool_config}) end) {:ok, pools_config} end def handle_info({:start_pool, pool_config}, state) do {:ok, _pool_sup} = Supervisor.start_child(Pooly.PoolsSupervisor, supervisor_spec(pool_config)) {:noreply, state} end defp supervisor_spec(pool_config) do opts = [id: :"#{pool_config[:name]}Supervisor"] supervisor(Pooly.PoolSupervisor, [pool_config], opts) end end send(self, {:start_pool, pool_config}) def handle_info({:start_pool, pool_config}, state) do {:ok, _pool_sup} = Supervisor.start_child(Pooly.PoolsSupervisor, supervisor_spec(pool_config)) {:noreply, state} end
  57. defmodule Pooly.Server do use GenServer import Supervisor.Spec def start_link(pools_config) do

    GenServer.start_link(__MODULE__, pools_config, name: __MODULE__) end def checkout(pool_name) do GenServer.call(:"#{pool_name}Server", :checkout) end def checkin(pool_name, worker_pid) do GenServer.cast(:"#{pool_name}Server", {:checkin, worker_pid}) end def init(pools_config) do pools_config |> Enum.each(fn(pool_config) -> send(self, {:start_pool, pool_config}) end) {:ok, pools_config} end def handle_info({:start_pool, pool_config}, state) do {:ok, _pool_sup} = Supervisor.start_child(Pooly.PoolsSupervisor, supervisor_spec(pool_config)) {:noreply, state} end defp supervisor_spec(pool_config) do opts = [id: :"#{pool_config[:name]}Supervisor"] supervisor(Pooly.PoolSupervisor, [pool_config], opts) end end send(self, {:start_pool, pool_config}) def handle_info({:start_pool, pool_config}, state) do {:ok, _pool_sup} = Supervisor.start_child(Pooly.PoolsSupervisor, supervisor_spec(pool_config)) {:noreply, state} end defp supervisor_spec(pool_config) do opts = [id: :"#{pool_config[:name]}Supervisor"] supervisor(Pooly.PoolSupervisor, [pool_config], opts) end Unique spec ID!
  58. defmodule Pooly.PoolSupervisor do use Supervisor def start_link(pool_config) do Supervisor.start_link(__MODULE__, pool_config,

    name: :"#{pool_config[:name]}Supervisor") end def init(pool_config) do children = [ worker(Pooly.PoolServer, [self, pool_config]) ] supervise(children, strategy: :one_for_all) end end
  59. defmodule Pooly.PoolSupervisor do use Supervisor def start_link(pool_config) do Supervisor.start_link(__MODULE__, pool_config,

    name: :"#{pool_config[:name]}Supervisor") end def init(pool_config) do children = [ worker(Pooly.PoolServer, [self, pool_config]) ] supervise(children, strategy: :one_for_all) end end def init(pool_config) do children = [ worker(Pooly.PoolServer, [self, pool_config]) ] supervise(children, strategy: :one_for_all) end
  60. defmodule Pooly.PoolSupervisor do use Supervisor def start_link(pool_config) do Supervisor.start_link(__MODULE__, pool_config,

    name: :"#{pool_config[:name]}Supervisor") end def init(pool_config) do children = [ worker(Pooly.PoolServer, [self, pool_config]) ] supervise(children, strategy: :one_for_all) end end name: :"#{pool_config[:name]}Supervisor"
  61. Implementing the pool server defmodule Pooly.PoolServer do defmodule State do

    defstruct pool_sup: nil, worker_sup: nil, monitors: nil, size: nil, workers: nil, name: nil, mfa: nil end end
  62. Implementing the pool server defmodule Pooly.PoolServer do defmodule State do

    defstruct pool_sup: nil, worker_sup: nil, monitors: nil, size: nil, workers: nil, name: nil, mfa: nil end end pool_sup: nil, worker_sup: nil
  63. Implementing the pool server defmodule Pooly.PoolServer do defmodule State do

    defstruct pool_sup: nil, worker_sup: nil, monitors: nil, size: nil, workers: nil, name: nil, mfa: nil end end monitors: nil, size: nil, workers: nil, name: nil, mfa: nil
  64. defmodule Pooly.WorkerSupervisor do use Supervisor def start_link(pool_server, {_,_,_} = mfa)

    do Supervisor.start_link(__MODULE__, [pool_server, mfa]) end def init([pool_server, {m,f,a}]) do Process.link(pool_server) worker_opts = [restart: :temporary, shutdown: 5000, function: f] children = [worker(m, a, worker_opts)] opts = [strategy: :simple_one_for_one, max_restarts: 5, max_seconds: 5] supervise(children, opts) end end
  65. defmodule Pooly.WorkerSupervisor do use Supervisor def start_link(pool_server, {_,_,_} = mfa)

    do Supervisor.start_link(__MODULE__, [pool_server, mfa]) end def init([pool_server, {m,f,a}]) do Process.link(pool_server) worker_opts = [restart: :temporary, shutdown: 5000, function: f] children = [worker(m, a, worker_opts)] opts = [strategy: :simple_one_for_one, max_restarts: 5, max_seconds: 5] supervise(children, opts) end end def start_link(pool_server, {_,_,_} = mfa) do Supervisor.start_link(__MODULE__, [pool_server, mfa]) end
  66. defmodule Pooly.WorkerSupervisor do use Supervisor def start_link(pool_server, {_,_,_} = mfa)

    do Supervisor.start_link(__MODULE__, [pool_server, mfa]) end def init([pool_server, {m,f,a}]) do Process.link(pool_server) worker_opts = [restart: :temporary, shutdown: 5000, function: f] children = [worker(m, a, worker_opts)] opts = [strategy: :simple_one_for_one, max_restarts: 5, max_seconds: 5] supervise(children, opts) end end def start_link(pool_server, {_,_,_} = mfa) do Supervisor.start_link(__MODULE__, [pool_server, mfa]) end def init([pool_server, {m,f,a}]) do Process.link(pool_server) end
  67. Handling a crash when worker supervisor goes down. defmodule Pooly.PoolServer

    do def handle_info({:EXIT, worker_sup, reason}, state) do %{worker_sup: ^worker_sup} = state {:stop, reason, state} end end
  68. Handling a crash when worker supervisor goes down. defmodule Pooly.PoolServer

    do def handle_info({:EXIT, worker_sup, reason}, state) do %{worker_sup: ^worker_sup} = state {:stop, reason, state} end end reason {:stop, reason, state}
  69. Handling a crash when worker supervisor goes down. defmodule Pooly.PoolServer

    do def handle_info({:EXIT, worker_sup, reason}, state) do %{worker_sup: ^worker_sup} = state {:stop, reason, state} end end worker_sup %{worker_sup: ^worker_sup} = state
  70. Version 3 Type of Pool Single Multiple Creation of Workers

    Fixed Dynamic Consumer Recovery No Yes Worker Recovery No Yes Queueing for busy workers No Yes
  71. Version 4 Type of Pool Single Multiple Creation of Workers

    Fixed Dynamic Consumer Recovery No Yes Worker Recovery No Yes Queueing for busy workers No Yes
  72. defmodule Pooly do def start(_type, _args) do pools_config = [

    [name: "ChuckNorris", mfa: {ChuckFetcher, :start_link, []}, size: 2, max_overflow: 3 ], [name: "StarWars", mfa: {SwapiFetcher, :start_link, []}, size: 4, max_overflow: 3 ] ] start_pools(pools_config) end end
  73. defmodule Pooly.PoolServer do defmodule State do defstruct pool_sup: nil, worker_sup:

    nil, monitors: nil, size: nil, workers: nil, name: nil, mfa: nil, overflow: nil, max_overflow: nil end end end
  74. defmodule Pooly.PoolServer do defmodule State do defstruct pool_sup: nil, worker_sup:

    nil, monitors: nil, size: nil, workers: nil, name: nil, mfa: nil, overflow: nil, max_overflow: nil end end end overflow: nil, max_overflow: nil
  75. defmodule Pooly.PoolServer do def handle_call(:checkout, {from_pid, _ref} = from, state)

    do %{worker_sup: worker_sup, workers: workers, monitors: monitors, overflow: overflow, max_overflow: max_overflow} = state case workers do [worker|rest] -> # ... {:reply, worker, %{state | workers: rest}} [] when max_overflow > 0 and overflow < max_overflow -> {worker, ref} = new_worker(worker_sup, from_pid) true = :ets.insert(monitors, {worker, ref}) {:reply, worker, %{state | overflow: overflow+1}} [] -> {:reply, :full, state}; end end end Overflow: Handling Worker Checkouts
  76. defmodule Pooly.PoolServer do def handle_call(:checkout, {from_pid, _ref} = from, state)

    do %{worker_sup: worker_sup, workers: workers, monitors: monitors, overflow: overflow, max_overflow: max_overflow} = state case workers do [worker|rest] -> # ... {:reply, worker, %{state | workers: rest}} [] when max_overflow > 0 and overflow < max_overflow -> {worker, ref} = new_worker(worker_sup, from_pid) true = :ets.insert(monitors, {worker, ref}) {:reply, worker, %{state | overflow: overflow+1}} [] -> {:reply, :full, state}; end end end Overflow: Handling Worker Checkouts defmodule Pooly.PoolServer do def handle_call(:checkout, {from_pid, _ref} = from, state) do end case workers do end [] when max_overflow > 0 and overflow < max_overflow ->
  77. defmodule Pooly.PoolServer do def handle_call(:checkout, {from_pid, _ref} = from, state)

    do %{worker_sup: worker_sup, workers: workers, monitors: monitors, overflow: overflow, max_overflow: max_overflow} = state case workers do [worker|rest] -> # ... {:reply, worker, %{state | workers: rest}} [] when max_overflow > 0 and overflow < max_overflow -> {worker, ref} = new_worker(worker_sup, from_pid) true = :ets.insert(monitors, {worker, ref}) {:reply, worker, %{state | overflow: overflow+1}} [] -> {:reply, :full, state}; end end end Overflow: Handling Worker Checkouts defmodule Pooly.PoolServer do def handle_call(:checkout, {from_pid, _ref} = from, state) do end case workers do end [] when max_overflow > 0 and overflow < max_overflow -> {worker, ref} = new_worker(worker_sup, from_pid) true = :ets.insert(monitors, {worker, ref})
  78. defmodule Pooly.PoolServer do def handle_call(:checkout, {from_pid, _ref} = from, state)

    do %{worker_sup: worker_sup, workers: workers, monitors: monitors, overflow: overflow, max_overflow: max_overflow} = state case workers do [worker|rest] -> # ... {:reply, worker, %{state | workers: rest}} [] when max_overflow > 0 and overflow < max_overflow -> {worker, ref} = new_worker(worker_sup, from_pid) true = :ets.insert(monitors, {worker, ref}) {:reply, worker, %{state | overflow: overflow+1}} [] -> {:reply, :full, state}; end end end Overflow: Handling Worker Checkouts defmodule Pooly.PoolServer do def handle_call(:checkout, {from_pid, _ref} = from, state) do end case workers do end [] when max_overflow > 0 and overflow < max_overflow -> {worker, ref} = new_worker(worker_sup, from_pid) true = :ets.insert(monitors, {worker, ref}) {:reply, worker, %{state | overflow: overflow+1}}
  79. Overflow: Handling Worker CHECKINS Previously: {:noreply, %{state | workers: [pid|workers]}}

    Now: Worker dismissal: Unlink + Terminate Child
  80. defp handle_checkin(pid, state) do %{worker_sup: worker_sup, workers: workers, monitors: monitors,

    waiting: waiting, overflow: overflow} = state if overflow > 0 do :ok = dismiss_worker(worker_sup, pid) %{state | waiting: empty, overflow: overflow-1} else %{state | waiting: empty, workers: [pid|workers], overflow: 0} end end defp dismiss_worker(sup, pid) do true = Process.unlink(pid) Supervisor.terminate_child(sup, pid) end
  81. defp handle_checkin(pid, state) do %{worker_sup: worker_sup, workers: workers, monitors: monitors,

    waiting: waiting, overflow: overflow} = state if overflow > 0 do :ok = dismiss_worker(worker_sup, pid) %{state | waiting: empty, overflow: overflow-1} else %{state | waiting: empty, workers: [pid|workers], overflow: 0} end end defp dismiss_worker(sup, pid) do true = Process.unlink(pid) Supervisor.terminate_child(sup, pid) end if overflow > 0 do :ok = dismiss_worker(worker_sup, pid) %{state | waiting: empty, overflow: overflow-1} defp dismiss_worker(sup, pid) do true = Process.unlink(pid) Supervisor.terminate_child(sup, pid) end
  82. defp handle_checkin(pid, state) do %{worker_sup: worker_sup, workers: workers, monitors: monitors,

    waiting: waiting, overflow: overflow} = state if overflow > 0 do :ok = dismiss_worker(worker_sup, pid) %{state | waiting: empty, overflow: overflow-1} else %{state | waiting: empty, workers: [pid|workers], overflow: 0} end end defp dismiss_worker(sup, pid) do true = Process.unlink(pid) Supervisor.terminate_child(sup, pid) end else %{state | waiting: empty, workers: [pid|workers], overflow: 0}
  83. HANDLING WORKER EXITS defmodule Pooly.PoolServer do defp handle_worker_exit(pid, state) do

    %{worker_sup: worker_sup, workers: workers, monitors: monitors, overflow: overflow} = state if overflow > 0 do %{state | overflow: overflow-1} else %{state | workers: [new_worker(worker_sup)|workers]} end end end defp handle_checkin(pid, state) do if overflow > 0 do :ok = dismiss_worker(worker_sup, pid) %{state | waiting: empty, overflow: overflow-1} else %{state | waiting: empty, workers: [pid|workers], overflow: 0} end end
  84. HANDLING WORKER EXITS defmodule Pooly.PoolServer do defp handle_worker_exit(pid, state) do

    %{worker_sup: worker_sup, workers: workers, monitors: monitors, overflow: overflow} = state if overflow > 0 do %{state | overflow: overflow-1} else %{state | workers: [new_worker(worker_sup)|workers]} end end end if overflow > 0 do %{state | overflow: overflow-1}
  85. HANDLING WORKER EXITS defmodule Pooly.PoolServer do defp handle_worker_exit(pid, state) do

    %{worker_sup: worker_sup, workers: workers, monitors: monitors, overflow: overflow} = state if overflow > 0 do %{state | overflow: overflow-1} else %{state | workers: [new_worker(worker_sup)|workers]} end end end else %{state | workers: [new_worker(worker_sup)|workers]}
  86. HANDLING WORKER EXITS defmodule Pooly.PoolServer do def handle_info({:EXIT, pid, _reason},

    state) do %{monitors: monitors, workers: workers, worker_sup: worker_sup} = state case :ets.lookup(monitors, pid) do [{pid, ref}] -> # ... new_state = handle_worker_exit(pid, state) {:noreply, new_state} _ -> {:noreply, state} end end end
  87. defmodule Pooly.PoolServer do defmodule State do defstruct ..., waiting: nil,

    overflow: nil, max_overflow: nil end def init([pool_sup, pool_config]) when is_pid(pool_sup) do Process.flag(:trap_exit, true) monitors = :ets.new(:monitors, [:private]) waiting = :queue.new state = %State{pool_sup: pool_sup, monitors: monitors, waiting: waiting, overflow: 0} init(pool_config, state) end end
  88. defmodule Pooly.PoolServer do defmodule State do defstruct ..., waiting: nil,

    overflow: nil, max_overflow: nil end def init([pool_sup, pool_config]) when is_pid(pool_sup) do Process.flag(:trap_exit, true) monitors = :ets.new(:monitors, [:private]) waiting = :queue.new state = %State{pool_sup: pool_sup, monitors: monitors, waiting: waiting, overflow: 0} init(pool_config, state) end end defstruct ..., waiting: nil, overflow: nil, max_overflow: nil waiting = :queue.new
  89. defmodule Pooly.PoolServer do defmodule State do defstruct ..., waiting: nil,

    overflow: nil, max_overflow: nil end def init([pool_sup, pool_config]) when is_pid(pool_sup) do Process.flag(:trap_exit, true) monitors = :ets.new(:monitors, [:private]) waiting = :queue.new state = %State{pool_sup: pool_sup, monitors: monitors, waiting: waiting, overflow: 0} init(pool_config, state) end end state = %State{pool_sup: pool_sup, monitors: monitors, waiting: waiting, overflow: 0} init(pool_config, state) waiting = :queue.new defstruct ..., waiting: nil, overflow: nil, max_overflow: nil
  90. QUEUING: CHECKING OUT def handle_call({:checkout, block}, {from_pid, _ref} = from,

    state) do %{worker_sup: worker_sup, workers: workers, monitors: monitors, waiting: waiting, overflow: overflow, max_overflow: max_overflow} = state case workers do [worker|rest] -> # .. [] when max_overflow > 0 and overflow < max_overflow -> # … [] when block == true -> ref = Process.monitor(from_pid) waiting = :queue.in({from, ref}, waiting) {:noreply, %{state | waiting: waiting}, :infinity} [] -> {:reply, :full, state} end end
  91. QUEUING: CHECKING OUT %{worker_sup: worker_sup, workers: workers, monitors: monitors, waiting:

    waiting, overflow: overflow, max_overflow: max_overflow} = state def handle_call({:checkout, block}, {from_pid, _ref} = from, state) do %{worker_sup: worker_sup, workers: workers, monitors: monitors, waiting: waiting, overflow: overflow, max_overflow: max_overflow} = state case workers do [worker|rest] -> # .. [] when max_overflow > 0 and overflow < max_overflow -> # … [] when block == true -> ref = Process.monitor(from_pid) waiting = :queue.in({from, ref}, waiting) {:noreply, %{state | waiting: waiting}, :infinity} [] -> {:reply, :full, state} end end
  92. QUEUING: CHECKING OUT {from_pid, _ref} = from [] when block

    == true -> ref = Process.monitor(from_pid) waiting = :queue.in({from, ref}, waiting) {:noreply, %{state | waiting: waiting}, :infinity} case workers do end def handle_call({:checkout, block}, {from_pid, _ref} = from, state) do %{worker_sup: worker_sup, workers: workers, monitors: monitors, waiting: waiting, overflow: overflow, max_overflow: max_overflow} = state case workers do [worker|rest] -> # .. [] when max_overflow > 0 and overflow < max_overflow -> # … [] when block == true -> ref = Process.monitor(from_pid) waiting = :queue.in({from, ref}, waiting) {:noreply, %{state | waiting: waiting}, :infinity} [] -> {:reply, :full, state} end end
  93. def handle_checkin(pid, state) do %{worker_sup: worker_sup, workers: workers, monitors: monitors,

    waiting: waiting, overflow: overflow} = state case :queue.out(waiting) do {{:value, {from, ref}}, left} -> true = :ets.insert(monitors, {pid, ref}) GenServer.reply(from, pid) %{state | waiting: left} {:empty, empty} when overflow > 0 -> :ok = dismiss_worker(worker_sup, pid) %{state | waiting: empty, overflow: overflow-1} {:empty, empty} -> %{state | waiting: empty, workers: [pid|workers], overflow: 0} end end QUEUING: CHECKING IN
  94. def handle_checkin(pid, state) do %{worker_sup: worker_sup, workers: workers, monitors: monitors,

    waiting: waiting, overflow: overflow} = state case :queue.out(waiting) do {{:value, {from, ref}}, left} -> true = :ets.insert(monitors, {pid, ref}) GenServer.reply(from, pid) %{state | waiting: left} {:empty, empty} when overflow > 0 -> :ok = dismiss_worker(worker_sup, pid) %{state | waiting: empty, overflow: overflow-1} {:empty, empty} -> %{state | waiting: empty, workers: [pid|workers], overflow: 0} end end QUEUING: CHECKING IN case :queue.out(waiting) do end
  95. def handle_checkin(pid, state) do %{worker_sup: worker_sup, workers: workers, monitors: monitors,

    waiting: waiting, overflow: overflow} = state case :queue.out(waiting) do {{:value, {from, ref}}, left} -> true = :ets.insert(monitors, {pid, ref}) GenServer.reply(from, pid) %{state | waiting: left} {:empty, empty} when overflow > 0 -> :ok = dismiss_worker(worker_sup, pid) %{state | waiting: empty, overflow: overflow-1} {:empty, empty} -> %{state | waiting: empty, workers: [pid|workers], overflow: 0} end end QUEUING: CHECKING IN case :queue.out(waiting) do end {{:value, {from, ref}}, left} -> true = :ets.insert(monitors, {pid, ref})
  96. def handle_checkin(pid, state) do %{worker_sup: worker_sup, workers: workers, monitors: monitors,

    waiting: waiting, overflow: overflow} = state case :queue.out(waiting) do {{:value, {from, ref}}, left} -> true = :ets.insert(monitors, {pid, ref}) GenServer.reply(from, pid) %{state | waiting: left} {:empty, empty} when overflow > 0 -> :ok = dismiss_worker(worker_sup, pid) %{state | waiting: empty, overflow: overflow-1} {:empty, empty} -> %{state | waiting: empty, workers: [pid|workers], overflow: 0} end end QUEUING: CHECKING IN case :queue.out(waiting) do end {{:value, {from, ref}}, left} -> GenServer.reply(from, pid) %{state | waiting: left}
  97. def handle_checkin(pid, state) do %{worker_sup: worker_sup, workers: workers, monitors: monitors,

    waiting: waiting, overflow: overflow} = state case :queue.out(waiting) do {{:value, {from, ref}}, left} -> true = :ets.insert(monitors, {pid, ref}) GenServer.reply(from, pid) %{state | waiting: left} {:empty, empty} when overflow > 0 -> :ok = dismiss_worker(worker_sup, pid) %{state | waiting: empty, overflow: overflow-1} {:empty, empty} -> %{state | waiting: empty, workers: [pid|workers], overflow: 0} end end QUEUING: CHECKING IN case :queue.out(waiting) do end {:empty, empty} when overflow > 0 -> :ok = dismiss_worker(worker_sup, pid) %{state | waiting: empty, overflow: overflow-1}
  98. def handle_checkin(pid, state) do %{worker_sup: worker_sup, workers: workers, monitors: monitors,

    waiting: waiting, overflow: overflow} = state case :queue.out(waiting) do {{:value, {from, ref}}, left} -> true = :ets.insert(monitors, {pid, ref}) GenServer.reply(from, pid) %{state | waiting: left} {:empty, empty} when overflow > 0 -> :ok = dismiss_worker(worker_sup, pid) %{state | waiting: empty, overflow: overflow-1} {:empty, empty} -> %{state | waiting: empty, workers: [pid|workers], overflow: 0} end end QUEUING: CHECKING IN case :queue.out(waiting) do end {:empty, empty} -> %{state | waiting: empty, workers: [pid|workers], overflow: 0}
  99. def transaction(pool_name, fun, timeout) do worker = checkout(pool_name, true, timeout)

    try do fun.(worker) after checkin(pool_name, worker) end end TRANSACTIONS
  100. def transaction(pool_name, fun, timeout) do worker = checkout(pool_name, true, timeout)

    try do fun.(worker) after checkin(pool_name, worker) end end pool_name TRANSACTIONS
  101. def transaction(pool_name, fun, timeout) do worker = checkout(pool_name, true, timeout)

    try do fun.(worker) after checkin(pool_name, worker) end end fun TRANSACTIONS
  102. def transaction(pool_name, fun, timeout) do worker = checkout(pool_name, true, timeout)

    try do fun.(worker) after checkin(pool_name, worker) end end timeout TRANSACTIONS
  103. def transaction(pool_name, fun, timeout) do worker = checkout(pool_name, true, timeout)

    try do fun.(worker) after checkin(pool_name, worker) end end worker = checkout(pool_name, true, timeout) TRANSACTIONS
  104. def transaction(pool_name, fun, timeout) do worker = checkout(pool_name, true, timeout)

    try do fun.(worker) after checkin(pool_name, worker) end end fun.(worker) TRANSACTIONS
  105. def transaction(pool_name, fun, timeout) do worker = checkout(pool_name, true, timeout)

    try do fun.(worker) after checkin(pool_name, worker) end end try do fun.(worker) after checkin(pool_name, worker) end TRANSACTIONS
  106. TRANSACTIONS tasks = 1..5 |> Enum.map(fn(_) -> Task.async(fn -> Pooly.transaction("ChuckNorris",

    fn(worker_pid) -> ChuckFetcher.fetch(worker_pid) end, 5_000) end) end) tasks |> Enum.map(&Task.await(&1, 5_000))
  107. TRANSACTIONS tasks = 1..5 |> Enum.map(fn(_) -> Task.async(fn -> Pooly.transaction("ChuckNorris",

    fn(worker_pid) -> ChuckFetcher.fetch(worker_pid) end, 5_000) end) end) tasks |> Enum.map(&Task.await(&1, 5_000)) Pooly.transaction("ChuckNorris", fn(worker_pid) -> end, 5_000)
  108. TRANSACTIONS tasks = 1..5 |> Enum.map(fn(_) -> Task.async(fn -> Pooly.transaction("ChuckNorris",

    fn(worker_pid) -> ChuckFetcher.fetch(worker_pid) end, 5_000) end) end) tasks |> Enum.map(&Task.await(&1, 5_000)) Pooly.transaction("ChuckNorris", fn(worker_pid) -> ChuckFetcher.fetch(worker_pid) end, 5_000)
  109. TRANSACTIONS tasks = 1..5 |> Enum.map(fn(_) -> Task.async(fn -> Pooly.transaction("ChuckNorris",

    fn(worker_pid) -> ChuckFetcher.fetch(worker_pid) end, 5_000) end) end) tasks |> Enum.map(&Task.await(&1, 5_000)) Task.async(fn -> Pooly.transaction("ChuckNorris", fn(worker_pid) -> ChuckFetcher.fetch(worker_pid) end, 5_000) end)
  110. TRANSACTIONS tasks = 1..5 |> Enum.map(fn(_) -> Task.async(fn -> Pooly.transaction("ChuckNorris",

    fn(worker_pid) -> ChuckFetcher.fetch(worker_pid) end, 5_000) end) end) tasks |> Enum.map(&Task.await(&1, 5_000))
  111. THE CHEATSHEET GenServer Initialization def start_link(opts \\ []) do GenServer.start_link(__MODULE__,

    :ok, opts) end CLIENT def init(:ok) do state = init_state() {:ok, state} end CALLBACK {:ok, state} {:ok, state, 5_000} {:ok, state, :hibernate} {:stop, reason*} :ignore RETURN VALUES Synchronous Operation def sync_op(pid, args) do GenServer.call(pid, {:sync_op, args}) end CLIENT def handle_call({:sync_op, args}, from, state) do new_state = f(state, args) {:reply, :ok, new_state} end CALLBACK {:reply, reply, new_state} {:reply, reply, new_state, 5_000} {:reply, reply, new_state, :hibernate} {:noreply, new_state} {:noreply, new_state, 5_000} {:noreply, new_state, :hibernate} {:stop, reason*, reply, new_state} {:stop, reason*, new_state} RETURN VALUES Asynchronous Operation def async_op(pid, args) do GenServer.cast(pid, {:async_op, args}) end CLIENT def handle_cast({:async_op, args}, state) do new_state = f(state, args) {:noreply, new_state} end CALLBACK {:noreply, new_state} {:noreply, new_state, 5_000} {:noreply, new_state, :hibernate} {:stop, reason*, new_state} RETURN VALUES Returns {:ok, pid} Version 1.0 Copyright © Benjamin Tan Wei Hao. Free to use without modification for non-commercial applications. Resources http://bit.ly/genservercheatsheet http://bit.ly/supcheatsheet