Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
MnesiaとFlowで高速スモールアナリティクス
Search
enpedasi
June 22, 2018
Programming
0
490
MnesiaとFlowで高速スモールアナリティクス
クラスタリングしたMnesiaのイベントをトリガーして、ストリーミング画像解析を行います。
enpedasi
June 22, 2018
Tweet
Share
More Decks by enpedasi
See All by enpedasi
Elixir Flowで膨大なImageリストを捌く
enpedasi
0
810
Elixir はじめての並列処理 (仮)
enpedasi
1
670
並列対決 Elixir × C# × Go オマケでScala nodejs
enpedasi
1
760
Other Decks in Programming
See All in Programming
202507_ADKで始めるエージェント開発の基本 〜デモを通じて紹介〜(奥田りさ)The Basics of Agent Development with ADK — A Demo-Focused Introduction
risatube
PRO
6
1.4k
DynamoDBは怖くない!〜テーブル設計の勘所とテスト戦略〜
hyamazaki
0
200
[DevinMeetupTokyo2025] コード書かせないDevinの使い方
takumiyoshikawa
2
280
変化を楽しむエンジニアリング ~ いままでとこれから ~
murajun1978
0
710
Flutterと Vibe Coding で個人開発!
hyshu
1
250
Understanding Kotlin Multiplatform
l2hyunwoo
0
260
技術的負債で信頼性が限界だったWordPress運用をShifterで完全復活させた話
rvirus0817
1
1.5k
コーディングは技術者(エンジニア)の嗜みでして / Learning the System Development Mindset from Rock Lady
mackey0225
2
430
QA x AIエコシステム段階構築作戦
osu
0
270
MCP連携で加速するAI駆動開発/mcp integration accelerates ai-driven-development
bpstudy
0
290
オホーツクでコミュニティを立ち上げた理由―地方出身プログラマの挑戦 / TechRAMEN 2025 Conference
lemonade_37
2
460
Introduction to Git & GitHub
latte72
0
110
Featured
See All Featured
Java REST API Framework Comparison - PWX 2021
mraible
33
8.8k
The Pragmatic Product Professional
lauravandoore
36
6.8k
The Illustrated Children's Guide to Kubernetes
chrisshort
48
50k
Code Review Best Practice
trishagee
69
19k
Intergalactic Javascript Robots from Outer Space
tanoku
272
27k
Refactoring Trust on Your Teams (GOTO; Chicago 2020)
rmw
34
3.1k
Learning to Love Humans: Emotional Interface Design
aarron
273
40k
Speed Design
sergeychernyshev
32
1.1k
The Power of CSS Pseudo Elements
geoffreycrofte
77
5.9k
How STYLIGHT went responsive
nonsquared
100
5.7k
BBQ
matthewcrist
89
9.8k
How To Stay Up To Date on Web Technology
chriscoyier
790
250k
Transcript
MnesiaとFlowで⾼速スモー ルアナリティクス Created by Enpedasi/twinbee ( @enpedasi ) 2018/6/22 powered
by Marp
⾃⼰紹介 フリーランスエンジニア ⾃社ERPによるSI案件を中⼼としたスモールビジネス ソリューションを⾏っています 有限会社デライトシステムズ 代表
クラスタリング ノードでの ストリーミング画像アナリティクス クラスタリングしたMnesiaに画像を流し込んでで ストリーム上で画像解析する実験的セッションです。 ストリームミングHubとしてのElixirの可能性を探り ます。
本⽇やること 1. Mnesiaでクラスタの1台に画像を定期送信 2. セントラルサーバーでMnesiaのトリガーを受け取 り、新画像を取得 3. Google Cloud Visionで画像のラベル解析を⾏い
4. WebSocketでブラウザに結果を送信
構成図
Mnesiaでトリガーを補⾜ Getting triggers on Mnesia
None
Mnesia Event Handling
Mnesiaドキュメントより System events and table events are the two event
categories that Mnesia generates in various situations. システムイベントとテーブルイベントの2種類ある。 A user process can subscribe on the events generated by Mnesia. The following two functions are provided: ユーザープロセスで購読できる。 ⇒ つまり 「サーバーを⽴てればいい」
Elixirでサーバーといえば GenServer モジュールに use GenServer を付けてコールバックを 埋めたら、サーバーが書けてしまうBehaivor (他⾔語でいうClass/Inerface/Traitに近い) やることは3点だけ 初期化
init で,トリガーを購読する hando_info でMnesiaからのメッセージを拾う ユーザー定義関数を handle_call で実装
handle_infoで、メッセージを受け取る defmodule MnesiaTrigger do use GenServer def init(table_name) do :mnesia.subscribe({:table,
table_name, :simple}) {:ok, []} end def handle_call({:events}, _from, state) do {:reply, state, []} # イベントリスト取り出してクリア end def handle_info({:mnesia_table_event, {:write, {_tbl, key, val}, from}} = msg, state) do {:noreply, [{key, from}] ++ state } end
None
Flow バックプレッシャーベースの遅延並列ストリーム TensorFlowでいうところのGraph / Session Scala AkkaでいうところのAkka Stream パイプラインを書くだけで、「Produer 〜
Producer- Consumer 〜 Consumer」のノード(Stage)を⾃動で並 列度を加味して⾃動でコーディネートを⾏う
MnesiaイベントをStream化 FlowにMnesiaのイベントを流したい。 Flowに流すには、遅延実⾏を⾏うStreamにする必要 がある。 「Streamでないものを、Streamにする関数」 Stream.resource を使⽤する。
MnesiaイベントをStream化 #2 Mnesiaからイベントを受け取って、ストリームに画 像を流す実装例 Stream.resource(start_fn, next_fn, halt_fn) 初期化fn / 継続fn
/ 停⽌fn を実装すればOK 本処理では以下を⾏っている -- サーバーのpid(プロセスid)のたらい回し -- 画像テーブルのkeyを受け取る -- 画像をストリームに流す
# Mnesiaイベントをストリーム化 defmodule Analytics.Stream do def subscribe(pid) do start_fn =
fn -> pid end next_fn = fn pid -> case GenServer.call(pid, {:out}) do {key, _from} -> #keyから画像を取得 img = :ets.lookup_element(:img_list, key, 3) {[img], pid} nil -> {:halt, pid} end end halt_fn = fn _next_url -> :ok end Stream.resource(start_fn, next_fn, halt_fn) end end
None
Flowを使ったアナリティクス実装 def flow_coord(pid) do Analytics.Stream.subscribe(pid) |> Flow.from_enumerable(max_demand: 1, stages: 4)
|> Flow.map( &{ CloudVision.analyze(&1 , from: :direct, features: [:label]) |> CloudVision.labels_desc_score, &1} ) |> Flow.map( fn {desc, img} -> socket_send(img, desc) end) end 1. Google Cloud Visionに画像を送信 2. 解析したラベルを受け取って、画像とくくる 3. ソケットでブラウザへ送信
Flowを⼊れることにより 遅延実⾏ 並列度の設定(stages: n) 集計処理(Flow.partiotion) 等が、シンプルに記述できる。
ただし Flowはバックプレッシャーベースの遅延ストリーム なので、Enum.to_list()などでマテリアライズさせな いと、実⾏はされない。 バックプレッシャーは平たく⾔うと、受注⽣産。 「消費があって、はじめて⽣産者に発注が⾏く。」 そこに、遅延実⾏の掛け合わせ。 ⇒ 催促しつづけるしかない︕
催促サーバーをGenServer実装
AskServer抜粋 defmodule AskServer do use GenServer def handle_info(:ask, stream) do
schedule_job(stream) {:noreply, stream} end defp schedule_job(stream) do stream |> Enum.to_list # ここでストリームを実態化 # 戻り値には興味がないので捨てる Process.send_after(self(), :ask, 5 * 1000) end end
Phoenix-Channels WebSocket / PubSubを擁する、秘密兵器。 サーバー側は数⾏程度で、準備ができる。
defmodule Fuk11Web.VisionChannel do use Phoenix.Channel def join("room:loby", msg, socket) do
{:ok, socket} end # 今回の実装では、"msg"トピックは未使⽤ def handle_in("msg", params, socket) do broadcast! socket, "msg", %{body: params["body"]} {:noreply, socket} end def handle_out("msg", payload, socket) do push socket, "msg", payload {:noreply, socket} end end
Socket発⽕例 Fuk11Web.Endpoint.broadcast "room:loby", "img", %{"body" => %{"image" => Base.encode64(image), "desc"
=> desc} } 余談 : %{ } はElixirのMap型。 => を : に置き換えれ ば、JavascriptのObjectとほぼ同じ。
JavaScriptでPhoenix-WebSocket 依存パッケージ Phoenixでレンダリングする場合 require("phoenix.js") SPA / node.js の場合 npm install
phoenix-channels import {Socket} from 'phoenix-channels'
JavaScriptからの呼び出し例(Vue.js) const channel = socket.channel('room:loby', {}) channel.join() .receive('ok', res =>
console.log(` > join ok ${channel.topi .receive('error', res => console.log(' > Error join ', res)) channel.on('img', img => { let baseImage = 'data:image/jpeg;base64,' + img.body.image const newRec = {img: baseImage, no: this.imageNo, desc: img.body.desc} this.visionImages.unshift(newRec) this.imageNo = this.imageNo + 1 })
おわりに 今回の事例を通して、以下を⾒てきました。 セッティングの簡潔さ 容易な並列処理の利⽤ インタラクティブな⼿段の豊富さ ノードをIoTデバイスと考えれば、⾼速なプロトタイ ピングに使えるだけでないく、性能・スケール⾯まで 視野にいれた柔軟な処理系であることが、おわかりい ただけたのではないでしょうか︖
ご清聴ありがとうございました