Upgrade to PRO for Only $50/Year—Limited-Time Offer! 🔥
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
Elixir GenStage & Flow
Search
Tymon Tobolski
May 26, 2017
Programming
2
1.1k
Elixir GenStage & Flow
Poznań Elixir Meetup, 25.05.2017
Tymon Tobolski
May 26, 2017
Tweet
Share
More Decks by Tymon Tobolski
See All by Tymon Tobolski
Only possible with Elixir - ubots Case Study
teamon
0
260
Fun with Elixir Macros
teamon
1
500
Elixir - Bydgoszcz Web Development Meetup
teamon
2
900
Sidekiq
teamon
1
170
Git - Monterail style
teamon
1
180
Rails Assets wroc_love.rb
teamon
1
750
Angular replacements for jQuery-based libraries
teamon
1
360
Angular replacements for jQuery-based libraries
teamon
2
320
Rails Assets LRUG
teamon
0
7.5k
Other Decks in Programming
See All in Programming
分散DBって何者なんだ... Spannerから学ぶRDBとの違い
iwashi623
0
170
ID管理機能開発の裏側 高速にSaaS連携を実現したチームのAI活用編
atzzcokek
0
190
令和最新版Android Studioで化石デバイス向けアプリを作る
arkw
0
110
全員アーキテクトで挑む、 巨大で高密度なドメインの紐解き方
agatan
8
18k
CloudNative Days Winter 2025: 一週間で作る低レイヤコンテナランタイム
ternbusty
7
1.9k
Querying Design System デザインシステムの意思決定を支える構造検索
ikumatadokoro
1
1.2k
Integrating WordPress and Symfony
alexandresalome
0
120
AIコーディングエージェント(NotebookLM)
kondai24
0
120
モダンJSフレームワークのビルドプロセス 〜なぜReactは503行、Svelteは12行なのか〜
fuuki12
0
190
スタートアップを支える技術戦略と組織づくり
pospome
8
15k
TUIライブラリつくってみた / i-just-make-TUI-library
kazto
1
310
WebRTC、 綺麗に見るか滑らかに見るか
sublimer
1
140
Featured
See All Featured
How Fast Is Fast Enough? [PerfNow 2025]
tammyeverts
3
380
Why You Should Never Use an ORM
jnunemaker
PRO
60
9.6k
Leading Effective Engineering Teams in the AI Era
addyosmani
8
1.2k
Building Adaptive Systems
keathley
44
2.9k
Java REST API Framework Comparison - PWX 2021
mraible
34
9k
Intergalactic Javascript Robots from Outer Space
tanoku
273
27k
Documentation Writing (for coders)
carmenintech
76
5.2k
Let's Do A Bunch of Simple Stuff to Make Websites Faster
chriscoyier
508
140k
Understanding Cognitive Biases in Performance Measurement
bluesmoon
31
2.7k
Practical Tips for Bootstrapping Information Extraction Pipelines
honnibal
25
1.6k
Code Reviewing Like a Champion
maltzj
527
40k
I Don’t Have Time: Getting Over the Fear to Launch Your Podcast
jcasabona
34
2.5k
Transcript
GenStage & Flow Poznań Elixir Meetup 25.05.2017
None
Hi! Tymon Tobolski • GitHub: teamon • Twitter: @iteamon •
Blog: teamon.eu • 8+ years with Ruby, 2+ years with Elixir • Currently Elixir+Dev+Ops @ Recruitee.com • Hex: tesla, mix_docker
None
The Job
Library defmodule Poz do def select do items = Enum.to_list
(1..100) :timer.sleep(:rand.uniform(100)) Progress.incr(:select, length(items)) items end def download(record) do :timer.sleep(:rand.uniform(100)) Progress.incr(:download) {:file, record} end
Library cont. def extract(file) do :timer.sleep(:rand.uniform(10)) Progress.incr(:extract) {:text, file} end
def index(texts) do :timer.sleep(:rand.uniform(1000)) Progress.incr(:index, length(texts)) :ok end end
Example blueprint defmodule Example do import Poz def run do
Progress.start_link([:select, :download, :extract, :index]) # work work work Progress.stop() end end
Example 1 - Enum select() |> Enum.map(&download/1) |> Enum.map(&extract/1) |>
index()
Example 1 - Enum
Example 2 - Stream select() |> Stream.concat([]) |> Stream.map(&download/1) |>
Stream.map(&extract/1) |> Enum.to_list() |> index()
Example 2 - Stream
Example 3 - Task.async select() |> Enum.map(fn e -> Task.async(Poz,
:download, [e]) end) |> Enum.map(&Task.await/1) |> Enum.map(fn e -> Task.async(Poz, :extract, [e]) end) |> Enum.map(&Task.await/1) |> index()
Example 3 - Task.async
Stages
GenStage explained
Example 4 - GenStage - SELECT defmodule Select do use
GenStage def init(_), do: {:producer, :ok} def handle_demand(demand, :ok) do items = Poz.select() {:noreply, items, :empty} end def handle_demand(_demand, :empty) do {:noreply, [], :empty} end end
Example 4 - GenStage - DOWNLOAD defmodule Download do use
GenStage def init(_), do: {:producer_consumer, :ok} def handle_events(items, _from, state) do files = Enum.map(items, &Poz.download/1) {:noreply, files, state} end end
Example 4 - GenStage - EXTRACT defmodule Extract do use
GenStage def init(_), do: {:producer_consumer, :ok} def handle_events(files, _from, state) do texts = Enum.map(files, &Poz.extract/1) {:noreply, files, texts} end end
Example 4 - GenStage - INDEX defmodule Index do use
GenStage def init(_), do: {:consumer, :ok} def handle_events(texts, _from, state) do Poz.index(texts) {:noreply, [], state} end end
Example 4 - GenStage {:ok, select} = GenStage.start_link(Select, :ok) {:ok,
download} = GenStage.start_link(Download, :ok) {:ok, extract} = GenStage.start_link(Extract, :ok) {:ok, index} = GenStage.start_link(Index, :ok) GenStage.sync_subscribe(download, to: select) GenStage.sync_subscribe(extract, to: download) GenStage.sync_subscribe(index, to: extract) :timer.sleep(:infinity)
Example 4 - GenStage (defaults)
Example 5 - GenStage (custom demand) {:ok, select} = GenStage.start_link(Select,
:ok) {:ok, download} = GenStage.start_link(Download, :ok) {:ok, extract} = GenStage.start_link(Extract, :ok) {:ok, index} = GenStage.start_link(Index, :ok) GenStage.sync_subscribe(download, to: select, max_demand: 20) GenStage.sync_subscribe(extract, to: download, max_demand: 20) GenStage.sync_subscribe(index, to: extract, max_demand: 100) :timer.sleep(:infinity)
Example 5 - GenStage (tuned demand)
Example 6 - GenStage (multiprocess) {:ok, select} = GenStage.start_link(Select, :ok)
{:ok, extract} = GenStage.start_link(Extract, :ok) {:ok, index} = GenStage.start_link(Index, :ok) for i <- (1..10) do {:ok, download} = GenStage.start_link(Download, i) GenStage.sync_subscribe(download, to: select, max_demand: 20) GenStage.sync_subscribe(extract, to: download, max_demand: 20) end GenStage.sync_subscribe(index, to: extract, max_demand: 100)
None
Example 7 - Flow select() |> Flow.from_enumerable(max_demand: 20) |> Flow.partition(max_demand:
20, stages: 5) |> Flow.map(&download/1) |> Flow.partition(max_demand: 20, stages: 2) |> Flow.map(&extract/1) |> Flow.partition(window: Flow.Window.count(50), stages: 1) |> Flow.reduce(fn -> [] end, fn item, list -> [item | list] end) |> Flow.emit(:state) |> Flow.partition(stages: 2) |> Flow.map(&index/1) |> Flow.run()
Example 7 - Flow
Real World
Real World - optimised
References • https:/ /github.com/teamon/poz • https:/ /elixir-lang.org/blog/2016/07/14/announcing-genstage/ • https:/ /hexdocs.pm/gen_stage/GenStage.html
• https:/ /hexdocs.pm/flow/Flow.html • http:/ /teamon.eu/2016/tuning-elixir-genstage-flow-pipeline- processing/ • http:/ /teamon.eu/2016/measuring-visualizing-genstage-flow- with-gnuplot/
Thanks! Questions?