Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
MnesiaとFlowで高速スモールアナリティクス
Search
enpedasi
June 22, 2018
Programming
0
450
MnesiaとFlowで高速スモールアナリティクス
クラスタリングしたMnesiaのイベントをトリガーして、ストリーミング画像解析を行います。
enpedasi
June 22, 2018
Tweet
Share
More Decks by enpedasi
See All by enpedasi
Elixir Flowで膨大なImageリストを捌く
enpedasi
0
790
Elixir はじめての並列処理 (仮)
enpedasi
1
640
並列対決 Elixir × C# × Go オマケでScala nodejs
enpedasi
1
690
Other Decks in Programming
See All in Programming
Amazon Bedrock Multi Agentsを試してきた
tm2
1
290
pylint custom ruleで始めるレビュー自動化
shogoujiie
0
120
Amazon Q Developer Proで効率化するAPI開発入門
seike460
PRO
0
110
『テスト書いた方が開発が早いじゃん』を解き明かす #phpcon_nagoya
o0h
PRO
2
290
Serverless Rust: Your Low-Risk Entry Point to Rust in Production (and the benefits are huge)
lmammino
1
110
メンテが命: PHPフレームワークのコンテナ化とアップグレード戦略
shunta27
0
120
WebDriver BiDiとは何なのか
yotahada3
1
140
AWS Organizations で実現する、 マルチ AWS アカウントのルートユーザー管理からの脱却
atpons
0
150
PHPのバージョンアップ時にも役立ったAST
matsuo_atsushi
0
120
Immutable ActiveRecord
megane42
0
140
Grafana Cloudとソラカメ
devoc
0
170
仕様変更に耐えるための"今の"DRY原則を考える / Rethinking the "Don't repeat yourself" for resilience to specification changes
mkmk884
3
570
Featured
See All Featured
YesSQL, Process and Tooling at Scale
rocio
172
14k
Art, The Web, and Tiny UX
lynnandtonic
298
20k
Building Adaptive Systems
keathley
40
2.4k
How to Think Like a Performance Engineer
csswizardry
22
1.3k
Designing Experiences People Love
moore
140
23k
Navigating Team Friction
lara
183
15k
The Invisible Side of Design
smashingmag
299
50k
Done Done
chrislema
182
16k
Stop Working from a Prison Cell
hatefulcrawdad
267
20k
Designing for Performance
lara
604
68k
Templates, Plugins, & Blocks: Oh My! Creating the theme that thinks of everything
marktimemedia
30
2.2k
How STYLIGHT went responsive
nonsquared
98
5.4k
Transcript
MnesiaとFlowで⾼速スモー ルアナリティクス Created by Enpedasi/twinbee ( @enpedasi ) 2018/6/22 powered
by Marp
⾃⼰紹介 フリーランスエンジニア ⾃社ERPによるSI案件を中⼼としたスモールビジネス ソリューションを⾏っています 有限会社デライトシステムズ 代表
クラスタリング ノードでの ストリーミング画像アナリティクス クラスタリングしたMnesiaに画像を流し込んでで ストリーム上で画像解析する実験的セッションです。 ストリームミングHubとしてのElixirの可能性を探り ます。
本⽇やること 1. Mnesiaでクラスタの1台に画像を定期送信 2. セントラルサーバーでMnesiaのトリガーを受け取 り、新画像を取得 3. Google Cloud Visionで画像のラベル解析を⾏い
4. WebSocketでブラウザに結果を送信
構成図
Mnesiaでトリガーを補⾜ Getting triggers on Mnesia
None
Mnesia Event Handling
Mnesiaドキュメントより System events and table events are the two event
categories that Mnesia generates in various situations. システムイベントとテーブルイベントの2種類ある。 A user process can subscribe on the events generated by Mnesia. The following two functions are provided: ユーザープロセスで購読できる。 ⇒ つまり 「サーバーを⽴てればいい」
Elixirでサーバーといえば GenServer モジュールに use GenServer を付けてコールバックを 埋めたら、サーバーが書けてしまうBehaivor (他⾔語でいうClass/Inerface/Traitに近い) やることは3点だけ 初期化
init で,トリガーを購読する hando_info でMnesiaからのメッセージを拾う ユーザー定義関数を handle_call で実装
handle_infoで、メッセージを受け取る defmodule MnesiaTrigger do use GenServer def init(table_name) do :mnesia.subscribe({:table,
table_name, :simple}) {:ok, []} end def handle_call({:events}, _from, state) do {:reply, state, []} # イベントリスト取り出してクリア end def handle_info({:mnesia_table_event, {:write, {_tbl, key, val}, from}} = msg, state) do {:noreply, [{key, from}] ++ state } end
None
Flow バックプレッシャーベースの遅延並列ストリーム TensorFlowでいうところのGraph / Session Scala AkkaでいうところのAkka Stream パイプラインを書くだけで、「Produer 〜
Producer- Consumer 〜 Consumer」のノード(Stage)を⾃動で並 列度を加味して⾃動でコーディネートを⾏う
MnesiaイベントをStream化 FlowにMnesiaのイベントを流したい。 Flowに流すには、遅延実⾏を⾏うStreamにする必要 がある。 「Streamでないものを、Streamにする関数」 Stream.resource を使⽤する。
MnesiaイベントをStream化 #2 Mnesiaからイベントを受け取って、ストリームに画 像を流す実装例 Stream.resource(start_fn, next_fn, halt_fn) 初期化fn / 継続fn
/ 停⽌fn を実装すればOK 本処理では以下を⾏っている -- サーバーのpid(プロセスid)のたらい回し -- 画像テーブルのkeyを受け取る -- 画像をストリームに流す
# Mnesiaイベントをストリーム化 defmodule Analytics.Stream do def subscribe(pid) do start_fn =
fn -> pid end next_fn = fn pid -> case GenServer.call(pid, {:out}) do {key, _from} -> #keyから画像を取得 img = :ets.lookup_element(:img_list, key, 3) {[img], pid} nil -> {:halt, pid} end end halt_fn = fn _next_url -> :ok end Stream.resource(start_fn, next_fn, halt_fn) end end
None
Flowを使ったアナリティクス実装 def flow_coord(pid) do Analytics.Stream.subscribe(pid) |> Flow.from_enumerable(max_demand: 1, stages: 4)
|> Flow.map( &{ CloudVision.analyze(&1 , from: :direct, features: [:label]) |> CloudVision.labels_desc_score, &1} ) |> Flow.map( fn {desc, img} -> socket_send(img, desc) end) end 1. Google Cloud Visionに画像を送信 2. 解析したラベルを受け取って、画像とくくる 3. ソケットでブラウザへ送信
Flowを⼊れることにより 遅延実⾏ 並列度の設定(stages: n) 集計処理(Flow.partiotion) 等が、シンプルに記述できる。
ただし Flowはバックプレッシャーベースの遅延ストリーム なので、Enum.to_list()などでマテリアライズさせな いと、実⾏はされない。 バックプレッシャーは平たく⾔うと、受注⽣産。 「消費があって、はじめて⽣産者に発注が⾏く。」 そこに、遅延実⾏の掛け合わせ。 ⇒ 催促しつづけるしかない︕
催促サーバーをGenServer実装
AskServer抜粋 defmodule AskServer do use GenServer def handle_info(:ask, stream) do
schedule_job(stream) {:noreply, stream} end defp schedule_job(stream) do stream |> Enum.to_list # ここでストリームを実態化 # 戻り値には興味がないので捨てる Process.send_after(self(), :ask, 5 * 1000) end end
Phoenix-Channels WebSocket / PubSubを擁する、秘密兵器。 サーバー側は数⾏程度で、準備ができる。
defmodule Fuk11Web.VisionChannel do use Phoenix.Channel def join("room:loby", msg, socket) do
{:ok, socket} end # 今回の実装では、"msg"トピックは未使⽤ def handle_in("msg", params, socket) do broadcast! socket, "msg", %{body: params["body"]} {:noreply, socket} end def handle_out("msg", payload, socket) do push socket, "msg", payload {:noreply, socket} end end
Socket発⽕例 Fuk11Web.Endpoint.broadcast "room:loby", "img", %{"body" => %{"image" => Base.encode64(image), "desc"
=> desc} } 余談 : %{ } はElixirのMap型。 => を : に置き換えれ ば、JavascriptのObjectとほぼ同じ。
JavaScriptでPhoenix-WebSocket 依存パッケージ Phoenixでレンダリングする場合 require("phoenix.js") SPA / node.js の場合 npm install
phoenix-channels import {Socket} from 'phoenix-channels'
JavaScriptからの呼び出し例(Vue.js) const channel = socket.channel('room:loby', {}) channel.join() .receive('ok', res =>
console.log(` > join ok ${channel.topi .receive('error', res => console.log(' > Error join ', res)) channel.on('img', img => { let baseImage = 'data:image/jpeg;base64,' + img.body.image const newRec = {img: baseImage, no: this.imageNo, desc: img.body.desc} this.visionImages.unshift(newRec) this.imageNo = this.imageNo + 1 })
おわりに 今回の事例を通して、以下を⾒てきました。 セッティングの簡潔さ 容易な並列処理の利⽤ インタラクティブな⼿段の豊富さ ノードをIoTデバイスと考えれば、⾼速なプロトタイ ピングに使えるだけでないく、性能・スケール⾯まで 視野にいれた柔軟な処理系であることが、おわかりい ただけたのではないでしょうか︖
ご清聴ありがとうございました