Elixir GenStage & Flow
Tymon Tobolski
May 26, 2017
Poznań Elixir Meetup, 25.05.2017
Transcript
GenStage & Flow
Poznań Elixir Meetup, 25.05.2017
Hi!
Tymon Tobolski
• GitHub: teamon
• Twitter: @iteamon
• Blog: teamon.eu
• 8+ years with Ruby, 2+ years with Elixir
• Currently Elixir+Dev+Ops @ Recruitee.com
• Hex: tesla, mix_docker
The Job
Library

    defmodule Poz do
      # Each function simulates one pipeline step with a random delay
      # and reports its progress.

      # Returns the 100 records to process.
      def select do
        items = Enum.to_list(1..100)
        :timer.sleep(:rand.uniform(100))
        Progress.incr(:select, length(items))
        items
      end

      # Turns a single record into a file.
      def download(record) do
        :timer.sleep(:rand.uniform(100))
        Progress.incr(:download)
        {:file, record}
      end

      # Turns a single file into text.
      def extract(file) do
        :timer.sleep(:rand.uniform(10))
        Progress.incr(:extract)
        {:text, file}
      end

      # Indexes a whole batch of texts at once.
      def index(texts) do
        :timer.sleep(:rand.uniform(1000))
        Progress.incr(:index, length(texts))
        :ok
      end
    end
Example blueprint

    defmodule Example do
      import Poz

      def run do
        Progress.start_link([:select, :download, :extract, :index])
        # work work work
        Progress.stop()
      end
    end
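The blueprint calls Progress.start_link/1, Progress.incr/1, Progress.incr/2 and Progress.stop/0, but the deck does not show that module. A minimal sketch, assuming it only needs to count processed items per step; the Agent-based implementation and the counts/0 helper are assumptions made for illustration, not the author's code:

```elixir
defmodule Progress do
  # Hypothetical stand-in: only the function names and arities are taken
  # from the slides; the Agent-backed counters are an assumption.
  use Agent

  # Starts a named Agent holding one counter per step, all at zero.
  def start_link(scopes) do
    Agent.start_link(fn -> Map.new(scopes, &{&1, 0}) end, name: __MODULE__)
  end

  # Adds n (default 1) to the counter of the given step.
  def incr(scope, n \\ 1) do
    Agent.update(__MODULE__, &Map.update(&1, scope, n, fn count -> count + n end))
  end

  # Assumed helper (not in the deck): returns the current counters.
  def counts, do: Agent.get(__MODULE__, & &1)

  def stop, do: Agent.stop(__MODULE__)
end
```

With something like this in place, the Poz functions can be called from any process, because the Agent is registered under the module name.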
Example 1 - Enum

    select()
    |> Enum.map(&download/1)
    |> Enum.map(&extract/1)
    |> index()
Example 1 - Enum
Example 2 - Stream

    select()
    |> Stream.concat([])
    |> Stream.map(&download/1)
    |> Stream.map(&extract/1)
    |> Enum.to_list()
    |> index()
Example 2 - Stream
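The difference between the Enum and Stream variants is the order in which the steps run, not the degree of parallelism: both execute in a single process. A small sketch using only the standard library; the OrderDemo module and its step/2 helper are hypothetical stand-ins for the Poz functions:

```elixir
defmodule OrderDemo do
  # Records every call in the process dictionary so the order can be inspected.
  defp step(tag, x) do
    Process.put(:trace, [{tag, x} | Process.get(:trace, [])])
    x
  end

  # Runs fun with an empty trace and returns the recorded calls in call order.
  defp trace(fun) do
    Process.put(:trace, [])
    fun.()
    Enum.reverse(Process.get(:trace))
  end

  # Eager: every element is "downloaded" before any element is "extracted".
  def eager do
    trace(fn ->
      [1, 2]
      |> Enum.map(&step(:download, &1))
      |> Enum.map(&step(:extract, &1))
    end)
  end

  # Lazy: each element passes through both steps before the next one starts.
  def lazy do
    trace(fn ->
      [1, 2]
      |> Stream.map(&step(:download, &1))
      |> Stream.map(&step(:extract, &1))
      |> Enum.to_list()
    end)
  end
end

OrderDemo.eager()
# => [download: 1, download: 2, extract: 1, extract: 2]
OrderDemo.lazy()
# => [download: 1, extract: 1, download: 2, extract: 2]
```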
Example 3 - Task.async

    select()
    |> Enum.map(fn e -> Task.async(Poz, :download, [e]) end)
    |> Enum.map(&Task.await/1)
    |> Enum.map(fn e -> Task.async(Poz, :extract, [e]) end)
    |> Enum.map(&Task.await/1)
    |> index()
Example 3 - Task.async
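The Task.async version starts one task per record at once and waits for each with the default Task.await timeout of 5 seconds. As a possible variation that is not part of the deck, Task.async_stream from the standard library caps the number of tasks running at the same time. A minimal sketch; the Bounded module and its download/1 are hypothetical stand-ins for Poz so that the block runs on its own:

```elixir
defmodule Bounded do
  # Stand-in for Poz.download/1 (assumption: any slow, independent job).
  def download(record) do
    Process.sleep(10)
    {:file, record}
  end

  # Runs download/1 for every record with at most max_concurrency tasks
  # in flight; results come back in input order by default.
  def run(records, max_concurrency \\ 10) do
    records
    |> Task.async_stream(__MODULE__, :download, [],
      max_concurrency: max_concurrency,
      timeout: 5_000
    )
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

Bounded.run(1..3)
# => [file: 1, file: 2, file: 3]
```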
Stages
GenStage explained
Example 4 - GenStage - SELECT

    defmodule Select do
      use GenStage

      def init(_), do: {:producer, :ok}

      # First demand: emit all selected items, then switch to :empty.
      def handle_demand(_demand, :ok) do
        items = Poz.select()
        {:noreply, items, :empty}
      end

      # Nothing left to emit.
      def handle_demand(_demand, :empty) do
        {:noreply, [], :empty}
      end
    end
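The Select producer hands over every item on the first request, whatever the requested amount, and leaves the surplus to GenStage's internal buffer. A producer that keeps the pending items in its own state and emits only as many as were asked for could look like the sketch below. It is an illustration under assumptions rather than the author's code: the DemandAwareSelect name is hypothetical, and Mix.install with the "~> 1.0" requirement is used only so the script can fetch gen_stage by itself.

```elixir
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule DemandAwareSelect do
  use GenStage

  def start_link(items), do: GenStage.start_link(__MODULE__, items)

  # The state is the list of items that have not been emitted yet.
  def init(items), do: {:producer, items}

  # Emit at most `demand` items and keep the rest for the next request.
  def handle_demand(demand, pending) when demand > 0 do
    {events, rest} = Enum.split(pending, demand)
    {:noreply, events, rest}
  end
end

{:ok, producer} = DemandAwareSelect.start_link(Enum.to_list(1..100))

# GenStage.stream/1 subscribes the calling process as a consumer.
first_batch =
  [{producer, max_demand: 10}]
  |> GenStage.stream()
  |> Enum.take(10)
```

Here the consumer asks for 10 events, so the producer emits exactly the first 10 items and holds on to the remaining 90.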
Example 4 - GenStage - DOWNLOAD

    defmodule Download do
      use GenStage

      def init(_), do: {:producer_consumer, :ok}

      def handle_events(items, _from, state) do
        files = Enum.map(items, &Poz.download/1)
        {:noreply, files, state}
      end
    end
Example 4 - GenStage - EXTRACT

    defmodule Extract do
      use GenStage

      def init(_), do: {:producer_consumer, :ok}

      def handle_events(files, _from, state) do
        texts = Enum.map(files, &Poz.extract/1)
        # Emit the extracted texts downstream and keep the state unchanged.
        {:noreply, texts, state}
      end
    end
Example 4 - GenStage - INDEX

    defmodule Index do
      use GenStage

      def init(_), do: {:consumer, :ok}

      def handle_events(texts, _from, state) do
        Poz.index(texts)
        {:noreply, [], state}
      end
    end
Example 4 - GenStage

    {:ok, select} = GenStage.start_link(Select, :ok)
    {:ok, download} = GenStage.start_link(Download, :ok)
    {:ok, extract} = GenStage.start_link(Extract, :ok)
    {:ok, index} = GenStage.start_link(Index, :ok)

    GenStage.sync_subscribe(download, to: select)
    GenStage.sync_subscribe(extract, to: download)
    GenStage.sync_subscribe(index, to: extract)

    :timer.sleep(:infinity)
Example 4 - GenStage (defaults)
Example 5 - GenStage (custom demand)

    {:ok, select} = GenStage.start_link(Select, :ok)
    {:ok, download} = GenStage.start_link(Download, :ok)
    {:ok, extract} = GenStage.start_link(Extract, :ok)
    {:ok, index} = GenStage.start_link(Index, :ok)

    GenStage.sync_subscribe(download, to: select, max_demand: 20)
    GenStage.sync_subscribe(extract, to: download, max_demand: 20)
    GenStage.sync_subscribe(index, to: extract, max_demand: 100)

    :timer.sleep(:infinity)
Example 5 - GenStage (tuned demand)
Example 6 - GenStage (multiprocess)

    {:ok, select} = GenStage.start_link(Select, :ok)
    {:ok, extract} = GenStage.start_link(Extract, :ok)
    {:ok, index} = GenStage.start_link(Index, :ok)

    # Ten Download stages share the demand between Select and Extract.
    for i <- 1..10 do
      {:ok, download} = GenStage.start_link(Download, i)
      GenStage.sync_subscribe(download, to: select, max_demand: 20)
      GenStage.sync_subscribe(extract, to: download, max_demand: 20)
    end

    GenStage.sync_subscribe(index, to: extract, max_demand: 100)
Example 7 - Flow

    select()
    |> Flow.from_enumerable(max_demand: 20)
    |> Flow.partition(max_demand: 20, stages: 5)
    |> Flow.map(&download/1)
    |> Flow.partition(max_demand: 20, stages: 2)
    |> Flow.map(&extract/1)
    |> Flow.partition(window: Flow.Window.count(50), stages: 1)
    |> Flow.reduce(fn -> [] end, fn item, list -> [item | list] end)
    |> Flow.emit(:state)
    |> Flow.partition(stages: 2)
    |> Flow.map(&index/1)
    |> Flow.run()
Example 7 - Flow
Real World
Real World - optimised
References
• https://github.com/teamon/poz
• https://elixir-lang.org/blog/2016/07/14/announcing-genstage/
• https://hexdocs.pm/gen_stage/GenStage.html
• https://hexdocs.pm/flow/Flow.html
• http://teamon.eu/2016/tuning-elixir-genstage-flow-pipeline-processing/
• http://teamon.eu/2016/measuring-visualizing-genstage-flow-with-gnuplot/
Thanks! Questions?