Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
Elixir GenStage & Flow
Search
Tymon Tobolski
May 26, 2017
Programming
2
1k
Elixir GenStage & Flow
Poznań Elixir Meetup, 25.05.2017
Tymon Tobolski
May 26, 2017
Tweet
Share
More Decks by Tymon Tobolski
See All by Tymon Tobolski
Only possible with Elixir - ubots Case Study
teamon
0
240
Fun with Elixir Macros
teamon
1
480
Elixir - Bydgoszcz Web Development Meetup
teamon
2
840
Sidekiq
teamon
1
160
Git - Monterail style
teamon
1
170
Rails Assets wroc_love.rb
teamon
1
720
Angular replacements for jQuery-based libraries
teamon
1
340
Angular replacements for jQuery-based libraries
teamon
2
320
Rails Assets LRUG
teamon
0
7.5k
Other Decks in Programming
See All in Programming
アルテニア コンサル/ITエンジニア向け 採用ピッチ資料
altenir
0
110
「手軽で便利」に潜む罠。 Popover API を WCAG 2.2の視点で安全に使うには
taitotnk
0
870
複雑なドメインに挑む.pdf
yukisakai1225
5
1.2k
概念モデル→論理モデルで気をつけていること
sunnyone
3
290
私の後悔をAWS DMSで解決した話
hiramax
4
210
Ruby Parser progress report 2025
yui_knk
1
450
今だからこそ入門する Server-Sent Events (SSE)
nearme_tech
PRO
3
240
為你自己學 Python - 冷知識篇
eddie
1
350
アプリの "かわいい" を支えるアニメーションツールRiveについて
uetyo
0
270
Namespace and Its Future
tagomoris
6
700
プロポーザル駆動学習 / Proposal-Driven Learning
mackey0225
2
1.3k
MCPでVibe Working。そして、結局はContext Eng(略)/ Working with Vibe on MCP And Context Eng
rkaga
5
2.3k
Featured
See All Featured
Typedesign – Prime Four
hannesfritz
42
2.8k
Principles of Awesome APIs and How to Build Them.
keavy
126
17k
Build your cross-platform service in a week with App Engine
jlugia
231
18k
Making the Leap to Tech Lead
cromwellryan
135
9.5k
Building an army of robots
kneath
306
46k
Documentation Writing (for coders)
carmenintech
74
5k
The MySQL Ecosystem @ GitHub 2015
samlambert
251
13k
GraphQLとの向き合い方2022年版
quramy
49
14k
A designer walks into a library…
pauljervisheath
207
24k
Six Lessons from altMBA
skipperchong
28
4k
Scaling GitHub
holman
463
140k
Practical Orchestrator
shlominoach
190
11k
Transcript
GenStage & Flow Poznań Elixir Meetup 25.05.2017
None
Hi! Tymon Tobolski • GitHub: teamon • Twitter: @iteamon •
Blog: teamon.eu • 8+ years with Ruby, 2+ years with Elixir • Currently Elixir+Dev+Ops @ Recruitee.com • Hex: tesla, mix_docker
None
The Job
Library defmodule Poz do def select do items = Enum.to_list
(1..100) :timer.sleep(:rand.uniform(100)) Progress.incr(:select, length(items)) items end def download(record) do :timer.sleep(:rand.uniform(100)) Progress.incr(:download) {:file, record} end
Library cont. def extract(file) do :timer.sleep(:rand.uniform(10)) Progress.incr(:extract) {:text, file} end
def index(texts) do :timer.sleep(:rand.uniform(1000)) Progress.incr(:index, length(texts)) :ok end end
Example blueprint defmodule Example do import Poz def run do
Progress.start_link([:select, :download, :extract, :index]) # work work work Progress.stop() end end
Example 1 - Enum select() |> Enum.map(&download/1) |> Enum.map(&extract/1) |>
index()
Example 1 - Enum
Example 2 - Stream select() |> Stream.concat([]) |> Stream.map(&download/1) |>
Stream.map(&extract/1) |> Enum.to_list() |> index()
Example 2 - Stream
Example 3 - Task.async select() |> Enum.map(fn e -> Task.async(Poz,
:download, [e]) end) |> Enum.map(&Task.await/1) |> Enum.map(fn e -> Task.async(Poz, :extract, [e]) end) |> Enum.map(&Task.await/1) |> index()
Example 3 - Task.async
Stages
GenStage explained
Example 4 - GenStage - SELECT defmodule Select do use
GenStage def init(_), do: {:producer, :ok} def handle_demand(demand, :ok) do items = Poz.select() {:noreply, items, :empty} end def handle_demand(_demand, :empty) do {:noreply, [], :empty} end end
Example 4 - GenStage - DOWNLOAD defmodule Download do use
GenStage def init(_), do: {:producer_consumer, :ok} def handle_events(items, _from, state) do files = Enum.map(items, &Poz.download/1) {:noreply, files, state} end end
Example 4 - GenStage - EXTRACT defmodule Extract do use
GenStage def init(_), do: {:producer_consumer, :ok} def handle_events(files, _from, state) do texts = Enum.map(files, &Poz.extract/1) {:noreply, files, texts} end end
Example 4 - GenStage - INDEX defmodule Index do use
GenStage def init(_), do: {:consumer, :ok} def handle_events(texts, _from, state) do Poz.index(texts) {:noreply, [], state} end end
Example 4 - GenStage {:ok, select} = GenStage.start_link(Select, :ok) {:ok,
download} = GenStage.start_link(Download, :ok) {:ok, extract} = GenStage.start_link(Extract, :ok) {:ok, index} = GenStage.start_link(Index, :ok) GenStage.sync_subscribe(download, to: select) GenStage.sync_subscribe(extract, to: download) GenStage.sync_subscribe(index, to: extract) :timer.sleep(:infinity)
Example 4 - GenStage (defaults)
Example 5 - GenStage (custom demand) {:ok, select} = GenStage.start_link(Select,
:ok) {:ok, download} = GenStage.start_link(Download, :ok) {:ok, extract} = GenStage.start_link(Extract, :ok) {:ok, index} = GenStage.start_link(Index, :ok) GenStage.sync_subscribe(download, to: select, max_demand: 20) GenStage.sync_subscribe(extract, to: download, max_demand: 20) GenStage.sync_subscribe(index, to: extract, max_demand: 100) :timer.sleep(:infinity)
Example 5 - GenStage (tuned demand)
Example 6 - GenStage (multiprocess) {:ok, select} = GenStage.start_link(Select, :ok)
{:ok, extract} = GenStage.start_link(Extract, :ok) {:ok, index} = GenStage.start_link(Index, :ok) for i <- (1..10) do {:ok, download} = GenStage.start_link(Download, i) GenStage.sync_subscribe(download, to: select, max_demand: 20) GenStage.sync_subscribe(extract, to: download, max_demand: 20) end GenStage.sync_subscribe(index, to: extract, max_demand: 100)
None
Example 7 - Flow select() |> Flow.from_enumerable(max_demand: 20) |> Flow.partition(max_demand:
20, stages: 5) |> Flow.map(&download/1) |> Flow.partition(max_demand: 20, stages: 2) |> Flow.map(&extract/1) |> Flow.partition(window: Flow.Window.count(50), stages: 1) |> Flow.reduce(fn -> [] end, fn item, list -> [item | list] end) |> Flow.emit(:state) |> Flow.partition(stages: 2) |> Flow.map(&index/1) |> Flow.run()
Example 7 - Flow
Real World
Real World - optimised
References • https:/ /github.com/teamon/poz • https:/ /elixir-lang.org/blog/2016/07/14/announcing-genstage/ • https:/ /hexdocs.pm/gen_stage/GenStage.html
• https:/ /hexdocs.pm/flow/Flow.html • http:/ /teamon.eu/2016/tuning-elixir-genstage-flow-pipeline- processing/ • http:/ /teamon.eu/2016/measuring-visualizing-genstage-flow- with-gnuplot/
Thanks! Questions?