Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
Elixir GenStage & Flow
Search
Tymon Tobolski
May 26, 2017
Programming
2
1k
Elixir GenStage & Flow
Poznań Elixir Meetup, 25.05.2017
Tymon Tobolski
May 26, 2017
Tweet
Share
More Decks by Tymon Tobolski
See All by Tymon Tobolski
Only possible with Elixir - ubots Case Study
teamon
0
240
Fun with Elixir Macros
teamon
1
470
Elixir - Bydgoszcz Web Development Meetup
teamon
2
820
Sidekiq
teamon
1
150
Git - Monterail style
teamon
1
170
Rails Assets wroc_love.rb
teamon
1
720
Angular replacements for jQuery-based libraries
teamon
1
330
Angular replacements for jQuery-based libraries
teamon
2
310
Rails Assets LRUG
teamon
0
7.5k
Other Decks in Programming
See All in Programming
TypeScriptでDXを上げろ! Hono編
yusukebe
4
900
#QiitaBash TDDで(自分の)開発がどう変わったか
ryosukedtomita
1
280
코딩 에이전트 체크리스트: Claude Code ver.
nacyot
0
1k
構造化・自動化・ガードレール - Vibe Coding実践記 -
tonegawa07
0
160
PHPUnitの限界をPlaywrightで補完するテストアプローチ
yuzneri
0
360
Streamlitで実現できるようになったこと、実現してくれたこと
ayumu_yamaguchi
2
250
JetBrainsのAI機能の紹介 #jjug
yusuke
0
160
脱Riverpod?fqueryで考える、TanStack Queryライクなアーキテクチャの可能性
ostk0069
0
580
Prompt Engineeringの再定義「Context Engineering」とは
htsuruo
0
110
Google I/O Extended Incheon 2025 ~ What's new in Android development tools
pluu
1
210
AIのメモリー
watany
11
1.1k
CLI ツールを Go ライブラリ として再実装する理由 / Why reimplement a CLI tool as a Go library
ktr_0731
3
830
Featured
See All Featured
Understanding Cognitive Biases in Performance Measurement
bluesmoon
29
1.8k
Become a Pro
speakerdeck
PRO
29
5.5k
Building an army of robots
kneath
306
45k
How To Stay Up To Date on Web Technology
chriscoyier
790
250k
The Illustrated Children's Guide to Kubernetes
chrisshort
48
50k
Adopting Sorbet at Scale
ufuk
77
9.5k
Fantastic passwords and where to find them - at NoRuKo
philnash
51
3.4k
I Don’t Have Time: Getting Over the Fear to Launch Your Podcast
jcasabona
33
2.4k
Java REST API Framework Comparison - PWX 2021
mraible
31
8.7k
Product Roadmaps are Hard
iamctodd
PRO
54
11k
ピンチをチャンスに:未来をつくるプロダクトロードマップ #pmconf2020
aki_iinuma
126
53k
ReactJS: Keep Simple. Everything can be a component!
pedronauck
667
120k
Transcript
GenStage & Flow Poznań Elixir Meetup 25.05.2017
None
Hi! Tymon Tobolski • GitHub: teamon • Twitter: @iteamon •
Blog: teamon.eu • 8+ years with Ruby, 2+ years with Elixir • Currently Elixir+Dev+Ops @ Recruitee.com • Hex: tesla, mix_docker
None
The Job
Library defmodule Poz do def select do items = Enum.to_list
(1..100) :timer.sleep(:rand.uniform(100)) Progress.incr(:select, length(items)) items end def download(record) do :timer.sleep(:rand.uniform(100)) Progress.incr(:download) {:file, record} end
Library cont. def extract(file) do :timer.sleep(:rand.uniform(10)) Progress.incr(:extract) {:text, file} end
def index(texts) do :timer.sleep(:rand.uniform(1000)) Progress.incr(:index, length(texts)) :ok end end
Example blueprint defmodule Example do import Poz def run do
Progress.start_link([:select, :download, :extract, :index]) # work work work Progress.stop() end end
Example 1 - Enum select() |> Enum.map(&download/1) |> Enum.map(&extract/1) |>
index()
Example 1 - Enum
Example 2 - Stream select() |> Stream.concat([]) |> Stream.map(&download/1) |>
Stream.map(&extract/1) |> Enum.to_list() |> index()
Example 2 - Stream
Example 3 - Task.async select() |> Enum.map(fn e -> Task.async(Poz,
:download, [e]) end) |> Enum.map(&Task.await/1) |> Enum.map(fn e -> Task.async(Poz, :extract, [e]) end) |> Enum.map(&Task.await/1) |> index()
Example 3 - Task.async
Stages
GenStage explained
Example 4 - GenStage - SELECT defmodule Select do use
GenStage def init(_), do: {:producer, :ok} def handle_demand(demand, :ok) do items = Poz.select() {:noreply, items, :empty} end def handle_demand(_demand, :empty) do {:noreply, [], :empty} end end
Example 4 - GenStage - DOWNLOAD defmodule Download do use
GenStage def init(_), do: {:producer_consumer, :ok} def handle_events(items, _from, state) do files = Enum.map(items, &Poz.download/1) {:noreply, files, state} end end
Example 4 - GenStage - EXTRACT defmodule Extract do use
GenStage def init(_), do: {:producer_consumer, :ok} def handle_events(files, _from, state) do texts = Enum.map(files, &Poz.extract/1) {:noreply, files, texts} end end
Example 4 - GenStage - INDEX defmodule Index do use
GenStage def init(_), do: {:consumer, :ok} def handle_events(texts, _from, state) do Poz.index(texts) {:noreply, [], state} end end
Example 4 - GenStage {:ok, select} = GenStage.start_link(Select, :ok) {:ok,
download} = GenStage.start_link(Download, :ok) {:ok, extract} = GenStage.start_link(Extract, :ok) {:ok, index} = GenStage.start_link(Index, :ok) GenStage.sync_subscribe(download, to: select) GenStage.sync_subscribe(extract, to: download) GenStage.sync_subscribe(index, to: extract) :timer.sleep(:infinity)
Example 4 - GenStage (defaults)
Example 5 - GenStage (custom demand) {:ok, select} = GenStage.start_link(Select,
:ok) {:ok, download} = GenStage.start_link(Download, :ok) {:ok, extract} = GenStage.start_link(Extract, :ok) {:ok, index} = GenStage.start_link(Index, :ok) GenStage.sync_subscribe(download, to: select, max_demand: 20) GenStage.sync_subscribe(extract, to: download, max_demand: 20) GenStage.sync_subscribe(index, to: extract, max_demand: 100) :timer.sleep(:infinity)
Example 5 - GenStage (tuned demand)
Example 6 - GenStage (multiprocess) {:ok, select} = GenStage.start_link(Select, :ok)
{:ok, extract} = GenStage.start_link(Extract, :ok) {:ok, index} = GenStage.start_link(Index, :ok) for i <- (1..10) do {:ok, download} = GenStage.start_link(Download, i) GenStage.sync_subscribe(download, to: select, max_demand: 20) GenStage.sync_subscribe(extract, to: download, max_demand: 20) end GenStage.sync_subscribe(index, to: extract, max_demand: 100)
None
Example 7 - Flow select() |> Flow.from_enumerable(max_demand: 20) |> Flow.partition(max_demand:
20, stages: 5) |> Flow.map(&download/1) |> Flow.partition(max_demand: 20, stages: 2) |> Flow.map(&extract/1) |> Flow.partition(window: Flow.Window.count(50), stages: 1) |> Flow.reduce(fn -> [] end, fn item, list -> [item | list] end) |> Flow.emit(:state) |> Flow.partition(stages: 2) |> Flow.map(&index/1) |> Flow.run()
Example 7 - Flow
Real World
Real World - optimised
References • https:/ /github.com/teamon/poz • https:/ /elixir-lang.org/blog/2016/07/14/announcing-genstage/ • https:/ /hexdocs.pm/gen_stage/GenStage.html
• https:/ /hexdocs.pm/flow/Flow.html • http:/ /teamon.eu/2016/tuning-elixir-genstage-flow-pipeline- processing/ • http:/ /teamon.eu/2016/measuring-visualizing-genstage-flow- with-gnuplot/
Thanks! Questions?