Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
AI・機械学習チームにおけるデータパイプライン構築
Search
Sponsored
·
Your Podcast. Everywhere. Effortlessly.
Share. Educate. Inspire. Entertain. You do you. We'll handle the rest.
→
nishiba
February 12, 2019
Technology
27k
8
Share
Embed
Copy iframe code
Copy JS code
Copy link
Start on current slide
AI・機械学習チームにおけるデータパイプライン構築
nishiba
February 12, 2019
More Decks by nishiba
See All by nishiba
ジョブ理論: 顧客の「進歩」から発想する イノベーション設計の実践
nishiba
1
680
gokartを作った話
nishiba
2
8.8k
m3 ai team
nishiba
38
52k
Graph Convolutional Networksを使った 推薦システム
nishiba
6
8.6k
機械学習関連の開発を 効率化した話
nishiba
7
5.7k
エムスリーの機械学習チームビルディングの考え方
nishiba
13
7.6k
Graphの推薦システムへの応用
nishiba
6
9.1k
エムスリーにおける 機械学習活用事例と開発の効率化
nishiba
3
6.9k
医療用語に注目した文書の類似度計算
nishiba
6
5.2k
Other Decks in Technology
See All in Technology
Deep Data Security 機能解説
oracle4engineer
PRO
2
110
FPGAの開発コンペでZephyrを使ってみた
iotengineer22
0
200
感情と身体を置き去りにしない、エンジニアの生きのこり方 ──いまから、ここから「自分の状態」を扱うという選択
saorimurooka
0
330
FPC(フレキシブル)基板にZephyr実装してみた。
iotengineer22
0
170
AIAU_UMEMOGU_ninomiya_slide
ninomiya_ii
0
260
Bucharest Tech Week 2026 - Guardians of the Cloud-Native Galaxy
edeandrea
PRO
0
140
Zenoh on Zephyr on LiteX
takasehideki
2
110
Kiro Ambassador を目指す話
k_adachi_01
0
130
【FinOps】データドリブンな意思決定を目指して
z63d
0
320
LayerX コーポレートエンジニアリング室におけるサプライチェーンセキュリティへの取り組み / Supply Chain Security at LayerX Corporate Engineering
yuyatakeyama
3
840
Flow 不死:AI 時代 DevOps 的不變本質
cheng_wei_chen
2
500
Agile and AI Redmine Japan 2026
hiranabe
4
480
Featured
See All Featured
Mobile First: as difficult as doing things right
swwweet
225
10k
Easily Structure & Communicate Ideas using Wireframe
afnizarnur
194
17k
How People are Using Generative and Agentic AI to Supercharge Their Products, Projects, Services and Value Streams Today
helenjbeal
1
220
What the history of the web can teach us about the future of AI
inesmontani
PRO
1
620
Exploring anti-patterns in Rails
aemeredith
3
420
Building an army of robots
kneath
306
46k
Organizational Design Perspectives: An Ontology of Organizational Design Elements
kimpetersen
PRO
1
750
Leo the Paperboy
mayatellez
7
1.9k
End of SEO as We Know It (SMX Advanced Version)
ipullrank
3
4.2k
AI Search: Implications for SEO and How to Move Forward - #ShenzhenSEOConference
aleyda
1
1.3k
Imperfection Machines: The Place of Print at Facebook
scottboms
270
14k
Designing for humans not robots
tammielis
254
26k
Transcript
AI・機械学習チームにおける データパイプライン構築 エムスリー株式会社 西場正浩 @m_nishiba Data Pipeline Casual Talk #1
自己紹介 • 西場正浩(m_nishiba) • エムスリー株式会社 • AI・機械学習チーム / 採用チーム リーダー
• 数理ファイナンス(Ph.D.) → 金融機関(クオンツ) → エムスリー(機械学習エンジニア) • 採用チームの活動 ◦ 新規プロダクトを生み出すエンジニア/プロダクト説明会
話したいこと • エムスリーにおけるMLのデータパイプラインの開発方法 • luigiを使うことのメリット(おそらくairflowでも同様) • luigiを拡張したgokartによりさらに効率的に。
機械学習系の開発で困ったこと • 2018年7月にAI・機械学習チーム立ち上げ。 • 最初にリリースされたシステムは課題だらけ。 • データ(学習済みモデル、前処理済みデータも含む)の 使いまわしが難しい。 • モデルの再現性が低い。
• ログ出力が適切でない。ログが読めない。 • アルゴリズムの切り替えが煩雑になる。 • class・taskの設計が良くない。 • 開発時に同じ処理を 何度も実行している。 • テストがしづらい。設計が悪い。
• Luigiを使う。 ◦ data pipelineの構築が簡単にできるようになる。 ◦ ログ出力がキレイになる。 ◦ Taskのインターフェイスが決まっているので設計がシンプルに。 •
Luigiを拡張したGoKartを開発中。 ◦ Taskの実装が楽に。 ◦ パラメータに応じてTaskのアウトプットを管理。 ◦ Task名とIDだけで再現できる。 ◦ slackへ通知。 問題の解決方法
gokart等、公開しています • luigiをラップし、自分たちのニーズを満たすようなライブラリを作る ◦ https://github.com/m3dev/gokart ◦ ☆ × 9 ◦
https://gokart.readthedocs.io/en/latest/ • 共通化できるタスク群をライブラリ化する ◦ https://github.com/m3dev/redshells ◦ ☆ × 19
Luigi: データパイプラインを構築する。 TaskA: - param - output: task_a.csv - run:
do_something TaskB: - param - requires:TaskA(param=self.param) - output: task_b.csv - run: do something TaskBはTaskBに依存する。 TaskAのパラメータ TaskAはtask_a.csvを出力する。 何かしらの処理を行なう。 inputとしてtask_a.csvを受け取る
Luigi: ログが読みやすい INFO: Informed scheduler that task TaskB_test_param_f20ef5457e has status
PENDING INFO: Informed scheduler that task TaskA_test_param_f20ef5457e has status PENDING INFO: Done scheduling tasks INFO: Running Worker with 1 processes INFO: [pid 20187] Worker Worker(...) running TaskA(param=test_param) INFO: [pid 20187] Worker Worker(...) done TaskA(param=test_param) INFO: Informed scheduler that task TaskA_test_param_f20ef5457e has status DONE INFO: [pid 20187] Worker Worker(...) running TaskB(param=test_param) INFO: [pid 20187] Worker Worker(...) done TaskB(param=test_param) INFO: Informed scheduler that task TaskB_test_param_f20ef5457e has status DONE INFO: Worker Worker(...) was stopped. Shutting down Keep-Alive thread INFO: ===== Luigi Execution Summary ===== Scheduled 2 tasks of which: * 2 ran successfully: - 1 TaskA(...) - 1 TaskB(...) This progress looks :) because there were no failed tasks or missing dependencies ===== Luigi Execution Summary =====
Luigi: Taskのインターフェイスが決まる class TaskB(gokart.TaskOnKart): param = luigi.Parameter() def requires(self): return
TaskA(param=self.param) def output(self): return self.make_target('task_b.pkl') def run(self): data = self.load() self.dump(data + '!!') • 左例はgokartを使っている。luigiでも同じ。 • 3つの関数を定義するだけ ◦ requires ◦ output ◦ run • 単一責任の原則が守られやすい。 • 他のメンバーが作ったタスクも再利用しやすい。
Luigi: データパイプライン • Taskの組合せによりデータパイプラインを構築 • データをBigQueryから取り出し、加工し、 S3等に保存する • 各タスクのinとoutはファイルとして保存される。 Task
BiqQuey API DB S3
gokart: Taskの実装が簡単に。 class TaskB(gokart.TaskOnKart): param = luigi.Parameter() def requires(self): return
TaskA(param=self.param) def output(self): return self.make_target('task_b.pkl') def run(self): data = self.load() self.dump(data + '!!') 拡張子から自動的にフォーマットを選択 load() だけでTaskAのoutputを読み込む dump() だけでoutput()に出力
gokart: 出力先 class TaskB(gokart.TaskOnKart): param = luigi.Parameter() def requires(self): return
TaskA(param=self.param) def output(self): return self.make_target('task_b.pkl') • 設定ファイルを読み込んで、 s3またはローカルに保存する。 • Taskのパラメータやrequiresのタスクに 応じてuniqueなファイル名に変換 • ex. task_b_3d5b7a50a230d4.pkl
gokart: モデルの保存 def output(self): return self.make_model_target( 'word2vec.zip', save_function=gensim.models.Word2Vec.save, load_function=gensim.models.Word2Vec.load) def
run(self): texts = self.load() # type: List[List[str]] shuffle(texts) model = gensim.models.Word2Vec( sentences=texts, **self.word2vec_kwargs) self.dump(model) • 複数のファイルを出力するモデルは zip にまとめて保存。 • saveやloadの関数を指定 保存はdumpを呼ぶだけ。
• TaskA(赤) ◦ パラメータが変更された ◦ 出力ファイルが削除された ◦ パラメータは同じだが更新された • 黄色のタスク群は再実行される。
gokart: 再実行の設定 TaskA BiqQuey API DB S3
gokart: Slackへの通知 • タスク同士の依存関係や実行パラメータ等が取得できる • ちゃんと仕込めばbigquery等の依存テーブル一覧を取得することも可能 ===== Event List ====
---- Success Tasks ---- TaskB:[9a4f24468013c7daeeac64] ==== Tree Info ==== └─-(COMPLETE) TaskB[9a4f24468013c7daeeac64](parameter={'param': 'Hello'}, output=['./resources/output_of_task_b_9a4f24468013c7daeeac64.pkl'], time=0.0013031s, task_log={}) └─-(COMPLETE) TaskA[060b9dac70db9e17da7d0](parameter={'param': 'called by TaskB'}, output=['./resources/output_of_task_a_060b9dac70db9e17da7d0.pkl'], time=0.00066s, task_log={}) • タスク名とIDが分かれば手元で簡単に再現ができる • ジョブが終わったらslackに上記の情報を通知
gokart: pipelineの構築 class TaskA(gokart.TaskOnKart): param = luigi.Parameter() class TaskB(gokart.TaskOnKart): input_task
= gokart.TaskInstanceParameter() class TaskC(gokart.TaskOnKart): def requires(self): return TaskB(input_task=TaskA(param='test')) requiresの中で複雑なパイプラインを構築で きる。 ex. word2vecからfasttextへの変更できる Taskのパラメータとして、 Taskのインスタンスを指定できる。
まとめ • 機械学習におけるデータパイプラインを luigiで管理している • luigiを自分たちのニーズに合わせて改造している (gokart) • タスクや結果の使い回しができるようになり、生産性が上がった( redshells)