Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
MnesiaとFlowで高速スモールアナリティクス
Search
enpedasi
June 22, 2018
Programming
0
450
MnesiaとFlowで高速スモールアナリティクス
クラスタリングしたMnesiaのイベントをトリガーして、ストリーミング画像解析を行います。
enpedasi
June 22, 2018
Tweet
Share
More Decks by enpedasi
See All by enpedasi
Elixir Flowで膨大なImageリストを捌く
enpedasi
0
780
Elixir はじめての並列処理 (仮)
enpedasi
1
630
並列対決 Elixir × C# × Go オマケでScala nodejs
enpedasi
1
680
Other Decks in Programming
See All in Programming
Внедряем бюджетирование, или Как сделать хорошо?
lamodatech
0
950
AppRouterを用いた大規模サービス開発におけるディレクトリ構成の変遷と問題点
eiganken
1
450
「とりあえず動く」コードはよい、「読みやすい」コードはもっとよい / Code that 'just works' is good, but code that is 'readable' is even better.
mkmk884
6
1.4k
PSR-15 はあなたのための ものではない? - phpcon2024
myamagishi
0
410
Итераторы в Go 1.23: зачем они нужны, как использовать, и насколько они быстрые?
lamodatech
0
1.4k
PHPとAPI Platformで作る本格的なWeb APIアプリケーション(入門編) / phpcon 2024 Intro to API Platform
ttskch
0
390
return文におけるstd::moveについて
onihusube
1
1.4k
はてなにおけるfujiwara-wareの活用やecspressoのCI/CD構成 / Fujiwara Tech Conference 2025
cohalz
3
2.8k
QA環境で誰でも自由自在に現在時刻を操って検証できるようにした話
kalibora
1
140
traP の部内 ISUCON とそれを支えるポータル / PISCON Portal
ikura_hamu
0
190
Fibonacci Function Gallery - Part 2
philipschwarz
PRO
0
210
Androidアプリのモジュール分割における:x:commonを考える
okuzawats
1
280
Featured
See All Featured
CoffeeScript is Beautiful & I Never Want to Write Plain JavaScript Again
sstephenson
160
15k
The Cult of Friendly URLs
andyhume
78
6.1k
JavaScript: Past, Present, and Future - NDC Porto 2020
reverentgeek
47
5.1k
Why You Should Never Use an ORM
jnunemaker
PRO
54
9.1k
Creating an realtime collaboration tool: Agile Flush - .NET Oxford
marcduiker
26
1.9k
Reflections from 52 weeks, 52 projects
jeffersonlam
348
20k
Navigating Team Friction
lara
183
15k
Building Flexible Design Systems
yeseniaperezcruz
328
38k
Designing Dashboards & Data Visualisations in Web Apps
destraynor
231
52k
Speed Design
sergeychernyshev
25
740
BBQ
matthewcrist
85
9.4k
Bootstrapping a Software Product
garrettdimon
PRO
305
110k
Transcript
MnesiaとFlowで⾼速スモー ルアナリティクス Created by Enpedasi/twinbee ( @enpedasi ) 2018/6/22 powered
by Marp
⾃⼰紹介 フリーランスエンジニア ⾃社ERPによるSI案件を中⼼としたスモールビジネス ソリューションを⾏っています 有限会社デライトシステムズ 代表
クラスタリング ノードでの ストリーミング画像アナリティクス クラスタリングしたMnesiaに画像を流し込んでで ストリーム上で画像解析する実験的セッションです。 ストリームミングHubとしてのElixirの可能性を探り ます。
本⽇やること 1. Mnesiaでクラスタの1台に画像を定期送信 2. セントラルサーバーでMnesiaのトリガーを受け取 り、新画像を取得 3. Google Cloud Visionで画像のラベル解析を⾏い
4. WebSocketでブラウザに結果を送信
構成図
Mnesiaでトリガーを補⾜ Getting triggers on Mnesia
None
Mnesia Event Handling
Mnesiaドキュメントより System events and table events are the two event
categories that Mnesia generates in various situations. システムイベントとテーブルイベントの2種類ある。 A user process can subscribe on the events generated by Mnesia. The following two functions are provided: ユーザープロセスで購読できる。 ⇒ つまり 「サーバーを⽴てればいい」
Elixirでサーバーといえば GenServer モジュールに use GenServer を付けてコールバックを 埋めたら、サーバーが書けてしまうBehaivor (他⾔語でいうClass/Inerface/Traitに近い) やることは3点だけ 初期化
init で,トリガーを購読する hando_info でMnesiaからのメッセージを拾う ユーザー定義関数を handle_call で実装
handle_infoで、メッセージを受け取る defmodule MnesiaTrigger do use GenServer def init(table_name) do :mnesia.subscribe({:table,
table_name, :simple}) {:ok, []} end def handle_call({:events}, _from, state) do {:reply, state, []} # イベントリスト取り出してクリア end def handle_info({:mnesia_table_event, {:write, {_tbl, key, val}, from}} = msg, state) do {:noreply, [{key, from}] ++ state } end
None
Flow バックプレッシャーベースの遅延並列ストリーム TensorFlowでいうところのGraph / Session Scala AkkaでいうところのAkka Stream パイプラインを書くだけで、「Produer 〜
Producer- Consumer 〜 Consumer」のノード(Stage)を⾃動で並 列度を加味して⾃動でコーディネートを⾏う
MnesiaイベントをStream化 FlowにMnesiaのイベントを流したい。 Flowに流すには、遅延実⾏を⾏うStreamにする必要 がある。 「Streamでないものを、Streamにする関数」 Stream.resource を使⽤する。
MnesiaイベントをStream化 #2 Mnesiaからイベントを受け取って、ストリームに画 像を流す実装例 Stream.resource(start_fn, next_fn, halt_fn) 初期化fn / 継続fn
/ 停⽌fn を実装すればOK 本処理では以下を⾏っている -- サーバーのpid(プロセスid)のたらい回し -- 画像テーブルのkeyを受け取る -- 画像をストリームに流す
# Mnesiaイベントをストリーム化 defmodule Analytics.Stream do def subscribe(pid) do start_fn =
fn -> pid end next_fn = fn pid -> case GenServer.call(pid, {:out}) do {key, _from} -> #keyから画像を取得 img = :ets.lookup_element(:img_list, key, 3) {[img], pid} nil -> {:halt, pid} end end halt_fn = fn _next_url -> :ok end Stream.resource(start_fn, next_fn, halt_fn) end end
None
Flowを使ったアナリティクス実装 def flow_coord(pid) do Analytics.Stream.subscribe(pid) |> Flow.from_enumerable(max_demand: 1, stages: 4)
|> Flow.map( &{ CloudVision.analyze(&1 , from: :direct, features: [:label]) |> CloudVision.labels_desc_score, &1} ) |> Flow.map( fn {desc, img} -> socket_send(img, desc) end) end 1. Google Cloud Visionに画像を送信 2. 解析したラベルを受け取って、画像とくくる 3. ソケットでブラウザへ送信
Flowを⼊れることにより 遅延実⾏ 並列度の設定(stages: n) 集計処理(Flow.partiotion) 等が、シンプルに記述できる。
ただし Flowはバックプレッシャーベースの遅延ストリーム なので、Enum.to_list()などでマテリアライズさせな いと、実⾏はされない。 バックプレッシャーは平たく⾔うと、受注⽣産。 「消費があって、はじめて⽣産者に発注が⾏く。」 そこに、遅延実⾏の掛け合わせ。 ⇒ 催促しつづけるしかない︕
催促サーバーをGenServer実装
AskServer抜粋 defmodule AskServer do use GenServer def handle_info(:ask, stream) do
schedule_job(stream) {:noreply, stream} end defp schedule_job(stream) do stream |> Enum.to_list # ここでストリームを実態化 # 戻り値には興味がないので捨てる Process.send_after(self(), :ask, 5 * 1000) end end
Phoenix-Channels WebSocket / PubSubを擁する、秘密兵器。 サーバー側は数⾏程度で、準備ができる。
defmodule Fuk11Web.VisionChannel do use Phoenix.Channel def join("room:loby", msg, socket) do
{:ok, socket} end # 今回の実装では、"msg"トピックは未使⽤ def handle_in("msg", params, socket) do broadcast! socket, "msg", %{body: params["body"]} {:noreply, socket} end def handle_out("msg", payload, socket) do push socket, "msg", payload {:noreply, socket} end end
Socket発⽕例 Fuk11Web.Endpoint.broadcast "room:loby", "img", %{"body" => %{"image" => Base.encode64(image), "desc"
=> desc} } 余談 : %{ } はElixirのMap型。 => を : に置き換えれ ば、JavascriptのObjectとほぼ同じ。
JavaScriptでPhoenix-WebSocket 依存パッケージ Phoenixでレンダリングする場合 require("phoenix.js") SPA / node.js の場合 npm install
phoenix-channels import {Socket} from 'phoenix-channels'
JavaScriptからの呼び出し例(Vue.js) const channel = socket.channel('room:loby', {}) channel.join() .receive('ok', res =>
console.log(` > join ok ${channel.topi .receive('error', res => console.log(' > Error join ', res)) channel.on('img', img => { let baseImage = 'data:image/jpeg;base64,' + img.body.image const newRec = {img: baseImage, no: this.imageNo, desc: img.body.desc} this.visionImages.unshift(newRec) this.imageNo = this.imageNo + 1 })
おわりに 今回の事例を通して、以下を⾒てきました。 セッティングの簡潔さ 容易な並列処理の利⽤ インタラクティブな⼿段の豊富さ ノードをIoTデバイスと考えれば、⾼速なプロトタイ ピングに使えるだけでないく、性能・スケール⾯まで 視野にいれた柔軟な処理系であることが、おわかりい ただけたのではないでしょうか︖
ご清聴ありがとうございました