Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
Elixir GenStage & Flow
Search
Tymon Tobolski
May 26, 2017
Programming
2
900
Elixir GenStage & Flow
Poznań Elixir Meetup, 25.05.2017
Tymon Tobolski
May 26, 2017
Tweet
Share
More Decks by Tymon Tobolski
See All by Tymon Tobolski
Only possible with Elixir - ubots Case Study
teamon
0
200
Fun with Elixir Macros
teamon
1
410
Elixir - Bydgoszcz Web Development Meetup
teamon
2
690
Sidekiq
teamon
1
140
Git - Monterail style
teamon
1
140
Rails Assets wroc_love.rb
teamon
1
670
Angular replacements for jQuery-based libraries
teamon
1
290
Angular replacements for jQuery-based libraries
teamon
2
290
Rails Assets LRUG
teamon
0
7.3k
Other Decks in Programming
See All in Programming
色々なIaCツールを実際に触って比較してみる
iriikeita
0
270
Vue SFCのtemplateでTypeScriptの型を活用しよう
tsukkee
3
1.5k
Snowflake x dbtで作るセキュアでアジャイルなデータ基盤
tsoshiro
2
430
GCCのプラグインを作る / I Made a GCC Plugin
shouth
1
150
JaSST 24 九州:ワークショップ(は除く)実践!マインドマップを活用したソフトウェアテスト+活用事例
satohiroyuki
0
260
僕がつくった48個のWebサービス達
yusukebe
18
17k
Vaporモードを大規模サービスに最速導入して学びを共有する
kazukishimamoto
4
4.3k
From Subtype Polymorphism To Typeclass-based Ad hoc Polymorphism- An Example
philipschwarz
PRO
0
170
Webの技術スタックで マルチプラットフォームアプリ開発を可能にするElixirDesktopの紹介
thehaigo
2
920
Kaigi on Rails 2024 - Rails APIモードのためのシンプルで効果的なCSRF対策 / kaigionrails-2024-csrf
corocn
5
3.4k
[PyCon Korea 2024 Keynote] 커뮤니티와 파이썬, 그리고 우리
beomi
0
110
Amazon Neptuneで始めてみるグラフDB-OpenSearchによるグラフの全文検索-
satoshi256kbyte
4
330
Featured
See All Featured
Build your cross-platform service in a week with App Engine
jlugia
229
18k
We Have a Design System, Now What?
morganepeng
50
7.2k
Keith and Marios Guide to Fast Websites
keithpitt
408
22k
Rails Girls Zürich Keynote
gr2m
93
13k
Adopting Sorbet at Scale
ufuk
73
9k
Documentation Writing (for coders)
carmenintech
65
4.4k
Responsive Adventures: Dirty Tricks From The Dark Corners of Front-End
smashingmag
250
21k
Design and Strategy: How to Deal with People Who Don’t "Get" Design
morganepeng
126
18k
CSS Pre-Processors: Stylus, Less & Sass
bermonpainter
355
29k
The Language of Interfaces
destraynor
154
24k
Large-scale JavaScript Application Architecture
addyosmani
510
110k
Teambox: Starting and Learning
jrom
132
8.7k
Transcript
GenStage & Flow Poznań Elixir Meetup 25.05.2017
None
Hi! Tymon Tobolski • GitHub: teamon • Twitter: @iteamon •
Blog: teamon.eu • 8+ years with Ruby, 2+ years with Elixir • Currently Elixir+Dev+Ops @ Recruitee.com • Hex: tesla, mix_docker
None
The Job
Library defmodule Poz do def select do items = Enum.to_list
(1..100) :timer.sleep(:rand.uniform(100)) Progress.incr(:select, length(items)) items end def download(record) do :timer.sleep(:rand.uniform(100)) Progress.incr(:download) {:file, record} end
Library cont. def extract(file) do :timer.sleep(:rand.uniform(10)) Progress.incr(:extract) {:text, file} end
def index(texts) do :timer.sleep(:rand.uniform(1000)) Progress.incr(:index, length(texts)) :ok end end
Example blueprint defmodule Example do import Poz def run do
Progress.start_link([:select, :download, :extract, :index]) # work work work Progress.stop() end end
Example 1 - Enum select() |> Enum.map(&download/1) |> Enum.map(&extract/1) |>
index()
Example 1 - Enum
Example 2 - Stream select() |> Stream.concat([]) |> Stream.map(&download/1) |>
Stream.map(&extract/1) |> Enum.to_list() |> index()
Example 2 - Stream
Example 3 - Task.async select() |> Enum.map(fn e -> Task.async(Poz,
:download, [e]) end) |> Enum.map(&Task.await/1) |> Enum.map(fn e -> Task.async(Poz, :extract, [e]) end) |> Enum.map(&Task.await/1) |> index()
Example 3 - Task.async
Stages
GenStage explained
Example 4 - GenStage - SELECT defmodule Select do use
GenStage def init(_), do: {:producer, :ok} def handle_demand(demand, :ok) do items = Poz.select() {:noreply, items, :empty} end def handle_demand(_demand, :empty) do {:noreply, [], :empty} end end
Example 4 - GenStage - DOWNLOAD defmodule Download do use
GenStage def init(_), do: {:producer_consumer, :ok} def handle_events(items, _from, state) do files = Enum.map(items, &Poz.download/1) {:noreply, files, state} end end
Example 4 - GenStage - EXTRACT defmodule Extract do use
GenStage def init(_), do: {:producer_consumer, :ok} def handle_events(files, _from, state) do texts = Enum.map(files, &Poz.extract/1) {:noreply, files, texts} end end
Example 4 - GenStage - INDEX defmodule Index do use
GenStage def init(_), do: {:consumer, :ok} def handle_events(texts, _from, state) do Poz.index(texts) {:noreply, [], state} end end
Example 4 - GenStage {:ok, select} = GenStage.start_link(Select, :ok) {:ok,
download} = GenStage.start_link(Download, :ok) {:ok, extract} = GenStage.start_link(Extract, :ok) {:ok, index} = GenStage.start_link(Index, :ok) GenStage.sync_subscribe(download, to: select) GenStage.sync_subscribe(extract, to: download) GenStage.sync_subscribe(index, to: extract) :timer.sleep(:infinity)
Example 4 - GenStage (defaults)
Example 5 - GenStage (custom demand) {:ok, select} = GenStage.start_link(Select,
:ok) {:ok, download} = GenStage.start_link(Download, :ok) {:ok, extract} = GenStage.start_link(Extract, :ok) {:ok, index} = GenStage.start_link(Index, :ok) GenStage.sync_subscribe(download, to: select, max_demand: 20) GenStage.sync_subscribe(extract, to: download, max_demand: 20) GenStage.sync_subscribe(index, to: extract, max_demand: 100) :timer.sleep(:infinity)
Example 5 - GenStage (tuned demand)
Example 6 - GenStage (multiprocess) {:ok, select} = GenStage.start_link(Select, :ok)
{:ok, extract} = GenStage.start_link(Extract, :ok) {:ok, index} = GenStage.start_link(Index, :ok) for i <- (1..10) do {:ok, download} = GenStage.start_link(Download, i) GenStage.sync_subscribe(download, to: select, max_demand: 20) GenStage.sync_subscribe(extract, to: download, max_demand: 20) end GenStage.sync_subscribe(index, to: extract, max_demand: 100)
None
Example 7 - Flow select() |> Flow.from_enumerable(max_demand: 20) |> Flow.partition(max_demand:
20, stages: 5) |> Flow.map(&download/1) |> Flow.partition(max_demand: 20, stages: 2) |> Flow.map(&extract/1) |> Flow.partition(window: Flow.Window.count(50), stages: 1) |> Flow.reduce(fn -> [] end, fn item, list -> [item | list] end) |> Flow.emit(:state) |> Flow.partition(stages: 2) |> Flow.map(&index/1) |> Flow.run()
Example 7 - Flow
Real World
Real World - optimised
References • https:/ /github.com/teamon/poz • https:/ /elixir-lang.org/blog/2016/07/14/announcing-genstage/ • https:/ /hexdocs.pm/gen_stage/GenStage.html
• https:/ /hexdocs.pm/flow/Flow.html • http:/ /teamon.eu/2016/tuning-elixir-genstage-flow-pipeline- processing/ • http:/ /teamon.eu/2016/measuring-visualizing-genstage-flow- with-gnuplot/
Thanks! Questions?