Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
Pythonで大量データ処理!PySparkを用いたデータ処理と分析のきほん
Search
chie8842
September 07, 2017
Technology
32
54k
Pythonで大量データ処理! PySparkを用いたデータ処理と分析のきほん
PyConJP2017の資料 Python Spark PySpark PyConJP 2017 Apache Spark
chie8842
September 07, 2017
Tweet
Share
More Decks by chie8842
See All by chie8842
MongoDB Atlas Search のご紹介
chie8842
2
1.3k
MongoDB Atlas Vectorsearchではじめる生成AIアプリ開発
chie8842
3
1.4k
AWS GlueとAWS Lake Formationではじめるデータマネジメント
chie8842
0
940
Distributed Processing in Python
chie8842
2
640
クックパッドにおける推薦(と検索)の取り組み
chie8842
21
7.9k
Understanding distributed processing in Python
chie8842
2
1.9k
Performance Tuning Tips of TensorFlow Inference
chie8842
1
720
クックパッドにおけるCloud AutoML事例
chie8842
9
7.8k
Cookpad_Internship_MLOps_Lecture_2018
chie8842
35
16k
Other Decks in Technology
See All in Technology
リンクアンドモチベーション ソフトウェアエンジニア向け紹介資料 / Introduction to Link and Motivation for Software Engineers
lmi
4
300k
個人でもIAM Identity Centerを使おう!(アクセス管理編)
ryder472
3
210
BLADE: An Attempt to Automate Penetration Testing Using Autonomous AI Agents
bbrbbq
0
310
Terraform CI/CD パイプラインにおける AWS CodeCommit の代替手段
hiyanger
1
240
初心者向けAWS Securityの勉強会mini Security-JAWSを9ヶ月ぐらい実施してきての近況
cmusudakeisuke
0
120
OTelCol_TailSampling_and_SpanMetrics
gumamon
1
160
100 名超が参加した日経グループ横断の競技型 AWS 学習イベント「Nikkei Group AWS GameDay」の紹介/mediajaws202411
nikkei_engineer_recruiting
1
170
社内で最大の技術的負債のリファクタリングに取り組んだお話し
kidooonn
1
550
隣接領域をBeyondするFinatextのエンジニア組織設計 / beyond-engineering-areas
stajima
1
280
RubyのWebアプリケーションを50倍速くする方法 / How to Make a Ruby Web Application 50 Times Faster
hogelog
3
940
SRE×AIOpsを始めよう!GuardDutyによるお手軽脅威検出
amixedcolor
0
110
組織成長を加速させるオンボーディングの取り組み
sudoakiy
2
150
Featured
See All Featured
What’s in a name? Adding method to the madness
productmarketing
PRO
22
3.1k
Visualizing Your Data: Incorporating Mongo into Loggly Infrastructure
mongodb
42
9.2k
Making the Leap to Tech Lead
cromwellryan
133
8.9k
Cheating the UX When There Is Nothing More to Optimize - PixelPioneers
stephaniewalter
280
13k
個人開発の失敗を避けるイケてる考え方 / tips for indie hackers
panda_program
93
16k
Docker and Python
trallard
40
3.1k
Faster Mobile Websites
deanohume
305
30k
jQuery: Nuts, Bolts and Bling
dougneiner
61
7.5k
Code Review Best Practice
trishagee
64
17k
It's Worth the Effort
3n
183
27k
BBQ
matthewcrist
85
9.3k
A Philosophy of Restraint
colly
203
16k
Transcript
1ZUIPOͰେྔσʔλॲཧʂ 1Z4QBSLΛ༻͍ͨσʔλॲཧͱੳͷ͖΄Μ 1Z$PO+1 $IJF)BZBTIJEB
ࣗݾհ $IJF)BZBTIJEB 5XJUUFS!DIJF 3FUUZ*OD 4PGUXBSF&OHJOFFS
1ZUIPOػցֶश)BEPPQ4QBSL4DBMB%#ج൫WJN মϐΞϊςχεεϊϘ
ࠓ͓ͳ͢͠Δ͜ͱ • "QBDIF4QBSLͷհ • 1Z4QBSLͷΞʔΩςΫνϟ • 4QBSLͷػցֶशϥΠϒϥϦʹ͍ͭͯ • 3FUUZʹ͓͚Δ4QBSLࣄྫʹ͍ͭͯ
ಥવͰ͕͢ʂ Pythonでデータ分析 してる人!
1Z%BUBͷϥΠϒϥϦ܈ ͱ͍͑ɺ ͱ͍͏͘Β͍ɺ1ZUIPOσʔλॲཧੳͷͨΊͷ ϥΠϒϥϦ͕ͦΖͬͯ·͢Ͷʂ ଞʹͨ͘͞Μʂ ศརJ
• ͷαʔόͰॲཧ͖͠Εͳ͍େنσʔλΛѻ͍͍ͨ • σʔλྔ͕εέʔϧͯ͠ಈ͘Έ͕΄͍͠ • େنσʔλʹର͢ΔػցֶशΛߦ͍͍ͨ ͱ͍ͬͨ߹ʹɺ1Z%BUBϥΠϒϥϦ܈ͰରԠ ͖͠Εͳ͍߹͕͋Δɻ
ͦΜͳͱ͖ʹ͑Δͷ͕ɺ Ͱɾɾɾ
• 044ͷฒྻࢄॲཧϑϨʔϜϫʔΫ – ॲཧதͷো࣌ͷϦΧόϦɺλεΫׂɾεέδϡʔϦϯάΛ 4QBSL͕͏·ͬͯ͘͘ΕΔ – αʔόͷεέʔϧΞτʹΑͬͯεϧʔϓοτ͕ઢܗʹ͍͔ۙͨͪͰ্͢Δ • Ϧιʔεར༻࠷దԽͷ͕͞Ε͓ͯΓɺॲཧ͕ߴ –
ΦϯϝϞϦϕʔεͷॲཧ – +7.ͷΦʔόϔουΛվળ͢Δ1SPKFDU5VOHTUFO – Ωϟογϡ – ԆධՁ • 1ZUIPOΛؚΉෳͷݴޠ͔Β͑Δ – ࠷৽ͷ4QBSLͩͱɺରԠόʔδϣϯ1ZUIPO ʹରԠ • ػցֶशετϦʔϜॲཧɺॲཧͷྲྀΕ͕ݟ͑Δ6*ͳͲɺ ๛ͳػೳ͕͋Δ • ίϛϡχςΟͷ׆ಈ͕׆ൃ "QBDIF4QBSLͱʂ
4QBSLͷ๛ͳػೳ 4QBSL$PSF 4QBSL42- 4QBSL4USFBNJOH ʢετϦʔϜॲཧʣ .MMJC ػցֶश (SBQI9 άϥϑॲཧ
SQL CSV {json} S3 BigQuery parquet Data Sources ༷ʑͳσʔλ ιʔεʹରԠ ෳݴޠΛαϙʔτ "1*͕ॆ࣮
Ϧονͳ6* DAG Visualiza?on: 処理の流れが見える Event Timeline: 各タスクの所要時間が見える Summary Metrics: 処理時間やデータ量などの
メトリクス情報が見える • λεΫͷਐߦঢ়گϝτϦΫε͕ݟ͑Δ6*͕͋Γɺσόοά͕Γ͍͢
͍ɺͰࢄॲཧͬͯ ΊΜͲͦ͘͞͏ɻ ࠷ॳͷҰา͕౿Έग़ͤͳ͍ɻ ͬͯࢥ͏ਓଟ͍ͱࢥ͍·͢ɻ 大丈夫、Sparkはサーバ1台でも動きます!
࣮ࡍʹͬͯΈΑ͏ʂ $ wget hIps://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz $ tar xzvf spark-2.2.0-bin-hadoop2.7.tgz $ cd
spark-2.2.0-bin-hadoop2.7 $ bin/pyspark 4QBSL͔ΒɺQJQͰΠϯετʔϧͰ͖ΔΑ͏ʹͳΓ·ͨ͠ʂ ·ͨɺ%PDLFSΛ͏ͷศརͰ͢ʂ μϯϩʔυɾΠϯετʔϧͯ͠ ΠϯλϥΫςΟϒγΣϧΛ্ཱͪ͛Δ·Ͱ ͨͬͨίϚϯυʂ
• )BEPPQΫϥελ্Ͱಈ࡞͢Δ • "NB[PO&.3(PPHMF%BUB1SPDͳͲͷ ϚωʔδυαʔϏεΛ͏ͱָ ຊ֨తʹࢄॲཧΛߦ͏ͱ͖ͷಈ࡞ڥ HDFS YARN MapReduce MesosやSpark
Standaloneもある S3など他に様々な データソースにも対応する 並列分散処理 フレームワーク リソース管理、 ジョブスケジュール 分散ファイル システム Basic Stack SparkはMapReduceの 後継と言われる
4QBSLͷͭͷϓϩάϥϛϯάϞσϧ 3%% • ίϨΫγϣϯૢ࡞ͷΑ͏ʹॲཧΛ هड़͢Δ • ඇߏԽσʔλʹର͢Δॊೈͳ ॲཧ͕ߦ͑Δ • ߦࢦͷॲཧ
%BUBGSBNF • 42-ϥΠΫʹॲཧΛهड़͢Δ • εΩʔϚΛར༻ͨ͠ߏԽσʔλॲ ཧ • ྻࢦͷॲཧ %BUBGSBNFɺ4QBSL͔Βొͨ͠ϋΠϨϕϧ"1*Ͱɺ ɾΦϓςΟϚΠβʹΑΔ࠷దԽͷԸܙΛड͚Δ ɾίʔυͷՄಡੑ্͕͕Δ ͱ͍ͬͨಛ͕͋Δ
3%%ͱ%BUB'SBNF Spark Core Spark SQL Spark Streaming (ストリーム処理) Mllib (機械学習)
GraphX (グラフ処理) SQL CSV {json} S3 BigQuery parquet Data Sources • 3%%4QBSL$PSFɺ%BUB'SBNF4QBSL42-ʹؚ·ΕΔػೳ • 4QBSLͷػೳঃʑʹ%BUB'SBNFϕʔεʹஔ͖Θ͍ͬͯΔ 4USVDUVSFE4USFBNJOH4QBSL.-(SBQI'SBNFT RDD DataFrame
%BUB'SBNFͷΦϓςΟϚΠβʹΑΔ࠷దԽ • ޮతͳॲཧͷॱ൪ʹೖΕସ࣮͑ͯߦͯ͘͠ΕΔ • σʔλιʔεʹΑͬͯɺϑΟϧλॲཧΛσʔλɾιʔεଆͰߦ͍ɺ ඞཁͳσʔλͷΈΛಡΈࠐΉΑ͏ʹ͢Δ hIps://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html
3%%Ͱ8PSE$PVOUͯ͠ΈΑ͏ʂ Jupyter Notebookからも かんたんに使える! 加工して 集計して 並び替え
%BUB'SBNFͰूܭॲཧΛͬͯΈΑ͏ʂ フィルターして グループごとに 集めて カウントする SQLライクにかける!
1Z4QBSLͷΞʔΩςΫνϟ • 3%% ϫʔΧʔϊʔυͰͷॲཧ1ZUIPOϓϩηεͰߦΘΕΔ • %BUB'SBNF ϫʔΧʔϊʔυͰͷॲཧ+7.্ͰߦΘΕΔ ͨͩ͠6%'1ZUIPOϓϩηεͰ࣮ߦ͞ΕΔ Master Worker
Spark Context Java SparkContext Executor Task Task Python Python socket Py4J pipe Master Worker Spark Context Java SparkContext Executor Task Task socket Py4J
1Z4QBSLͷΞʔΩςΫνϟ • 3%% ϫʔΧʔϊʔυͰͷॲཧ1ZUIPOϓϩηεͰߦΘΕΔ • %BUB'SBNF ϫʔΧʔϊʔυͰͷॲཧ+7.্ͰߦΘΕΔ ͨͩ͠6%'1ZUIPOϓϩηεͰ࣮ߦ͞ΕΔ Master Worker
Spark Context Java SparkContext Executor Task Task Python Python socket Py4J pipe Master Worker Spark Context Java SparkContext Executor Task Task socket Py4J ύϑΥʔϚϯε্ͷ • *UFSBUPS୯ҐͰͷTFSJBMJ[BUJPOͱQZUIPOϓϩηεͷύΠϓ ͕ൃੜ • QJDLMJOHͱ+7.ͷೋॏͷTFSJBMJ[BUJPOʹΑΔίετ૿ • 1ZUIPO8PSLFSͷىಈ • 1ZUIPO8PSLFSͷϝϞϦ+7.ͷ੍ޚର֎
ύϑΥʔϚϯεൺֱ 出典:DataBricks社のブログ hIps://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data- science.html 3%%1ZUIPO͍ʂ
1Z4QBSLͷ͍͍ͱ͜Ζʂ • 1Z%BUBϥΠϒϥϦͱΈ߹Θͤͯ͏͜ͱ͕Ͱ͖Δ – 3%%ͷதͰ/VNQZ4DJQZΛ͏ – ूܭ݁ՌΛ1BOEBT%BUB'SBNFʹมͯ͠.BUQMPUMJCͰ ՄࢹԽ͢Δ • .-ϥΠϒϥϦʹ͍ͭͯɺ΄΅શͯར༻Ͱ͖Δ
1Z4QBSLΤίγεςϜͷਐԽ • 1Z4QBSLͰɺQBOEBT%BUB'SBNFͱ4QBSL%BUB'SBNFͷ ૬ޓม͕Ͱ͖Δɻ – QBOEBTͷEG͔ΒTQBSLͷEG • TQBSLDSFBUF%BUB'SBNF QE@EG
– TQBSLͷEG͔ΒQBOEBTͷEG • TQBSL@EGUP1BOEBT ͕ʂʂ্هͷมڪΖ͍͘͠ɻɻɻ ʢಛʹલऀʣ • ͜ΕΛղফ͘͢"QBDIF"SSPXͷ։ൃ͕ਐΜͰ͍Δ
"QBDIF"SSPX • σʔλϑΥʔϚοτͷ༷ͱͦΕΛར༻͢ΔͨΊͷϥΠϒϥϦ • ҟͳΔݴޠɾϓϩμΫτؒͰͷσʔλ࿈ܞίετΛԼ͛Δ • ։ൃதͷ4QBSLͰ TQBSLDPOGTFU lTQBSLTRMFYFDVUJPOBSSPXFOBCMFz lUSVFz
ͱ͢Δ͜ͱͰɺ 4QBSL%BUB'SBNFΛUP1BOEBT ͢Δࡍʹɺ"SSPXͷϑΥʔϚοτ͕ ͑ΔΑ͏ʹͳΔ༧ఆ • ଞʹ6%'ͷ7FDUPSJ[BUJPOͳͲܭը͞Ε͍ͯΔ 出典:hIps://arrow.apache.org/
4QBSL.BDIJOF-FBSOJOH • 4QBSLʹɺ3%%ϕʔεͷ.MMJCͱ%BUB'SBNFϕʔεͷ .-ͱ͍͏ͭͷػցֶशϥΠϒϥϦ͕͋Δɻ – ঃʑʹ.-ʹد͍ͤͯΔ • TDJLJUMFBSOͷӨڹΛڧ͘ड͚͍ͯΔ
1Z4QBSLͰ͑Δදతͳػցֶशख๏Ұཡ • $MBTTJpDBUJPO 3FHSFTTJPO – 47.TɺϩδεςΟοΫճؼɺܾఆɺφΠʔϒϕΠζɺϥϯμϜϑΥϨε τɺઢܗճؼɺ(#5ɺϚϧνύʔηϓτϩϯɺJTPUPOJDճؼɺ"'5ੜଘճؼ • $MVTUFSJOH –
,NFBOTɺ-%"ɺ(.. • ڠௐϑΟϧλϦϯάɺύλʔϯϚΠχϯά – "-4ɺ/.'ɺ'1(SPXUI • ࣍ݩݮ – 47%ɺ1$" • 'FBUVSF&YUSBDUBOE5SBOTGPSN – 5'*%'ɺ8PSE7FDɺ4UBOEBSE4DBMFSɺ/PSNBMJ[FSɺ/HSBNɺ 0OF)PU&ODPEFSɺ4USJOH*OEFYFSɺ-BCFMFE1PJOUɺ%$5 • #BTJDTUBUJTUJDT – ΧʔωϧີਪఆɺΧΠೋݕఆɺίϧϞΰϩϑεϛϊϑݕఆ • &WBMVBUPS 5VOJOH (FOFSBUPS – "6$ɺ$SPTT7BMJEBUPSɺ1BSBN(SJE#VJMEFSɺ֤छσʔλ(FOFSBUPS FUDʜ
4QBSLͰϨίϝϯσʔγϣϯΫοΩϯάʂ デモ
ʢ͍ͭͰʹհʣ"QBDIF;FQQFMJO • +VQZUFS/PUFCPPLͱಉ͡Α͏ʹ͑ΔՄࢹԽπʔϧ • ༷ʑͳ࣮ߦΤϯδϯΛαϙʔτ͢Δ
ੳݱͰͷ1Z4QBSLͷ͔͍ͭͲ͜Ζ • ,1*ϨϙʔςΟϯάͷΑ͏ͳੳͷݱͰɺΘ͟Θ͟ େྔσʔλΛѻΘͣͱαϯϓϦϯάͯ͠ͷαʔό ্ͰॲཧΛߦ͏΄͏͕ྑ͍߹ଟ͍ɻ – ͨ͘͞ΜͷσʔλͰΫϥελϦϯάͳͲͷֶशΛߦ͏͜ͱͰ ͔͑ͬͯաֶशʹͳΔ͜ͱ͋Δɻ • Ϩίϝϯσʔγϣϯɺҟৗݕɺࠂ৴࠷దԽɺ
େنσʔλʹରͯ͠ػցֶशΛߦ͏ඞཁ͕͋Δ໘Ͱ ΘΕΔɻ ͍ॴͷݟఆΊ͍ͩ͡ɻ
3FUUZʹ͓͚Δ1Z4QBSLࣄྫ
3FUUZͷ݄ؒສ66Λࢧ͑Δੳج൫ ReIyαʔϏεج൫ ReIyੳج൫ʢAWSʣ ReIyੳج൫ʢGCPʣ Kinesis S3 EMR (Spark) S3 EC2
EC2 RDS(MySQL) 分析者 プランナ ここでSparkを 使っている
&5-ͷ֓ཁ • 3FUUZͷΞΫηεϩά – ʹे(#ʢH[KTPOঢ়ଶʣͷϩά – SFRVFTU63*VTFSBHFOUͳͲΛੳ͍͢͠ܗʹܗ – ੳ༻ͷTFTTJPO*%Λ&5-ͷաఔͰ༩ •
TFDPOEBSZTPSUͱNBQQBSUJUJPOΛͬͨ
1Z4QBSLΛ࠾༻ͨ͠ཧ༝ • ϚωʔδυɾαʔϏεʢ&.3ʣΛར༻Ͱ͖ΔͨΊɺ ڥߏங͕ෆཁ – EBTL$FMFSZީิͱͯ͋͠Δ͕ɺڥߏங͕ඞཁͱͳΔɻ ·ͨ͜ΕΒσʔλྔͷεέʔϧʹ͑ΒΕͳ͍Մೳੑ͕͋Δ • ࣾʹ1ZUIPO͍͕ଟ͍ͨΊɺϝϯςφϯεΛߟྀͯ͠ 4DBMBͰͳ͘1ZUIPOΛ࠾༻ͨ͠
• )JWF1SFTUPʢ)BEPPQΤίγεςϜʣͱൺɺඇߏ σʔλʹର͢Δॊೈͳදݱ͕Γ͍͢ – ੳཁ݅ͷͨΊɺΞϓϦέʔγϣϯଆͰৼΒΕΔTFTTJPO*%ͱ ผͰTFTTJPO*%૬ͷͷΛ༩ͯ͠΄͍͠ͱͷཁ͕͋Γɺ ͜ΕΛຬͨͨ͢ΊʹɺෳࡶͳίϨΫγϣϯॲཧΛߦ͏ඞཁ͕ ͋ͬͨ ちなみに、想定通り、ここの処理だけRDDなので遅い。 が、許容できる範囲だった。
1Z4QBSL ʴ&.3 ͷύϑΥʔϚϯε্ͷ5JQTΛ͍͔ͭ͘ • ͳΔ͘%BUB'SBNFΛ͏ • ͳΔ͘ॲཧ͢ΔσʔλྔΛݮΒ͢ –
KPJOখ͍͞σʔλಉ࢜Λઌʹ • ετϨʔδͷར༻ – ӬଓԽϑΝΠϧ4 – தؒϑΝΠϧ)%'4 • σʔλϑΥʔϚοτ1BSRVFUΛར༻͢Δͱߴ • σʔλͷ4LFXʹҙ – ҰͭͷύʔςΟγϣϯ͚ͩॲཧ͢Δσʔλྔ͕ଟ͘ͳͬͯɺͦ ͜ͷॲཧͪʹͳΔ
1Z4QBSL ʴ&.3 ͷύϑΥʔϚϯε্ͷ5JQTΛ͍͔ͭ͘ • ϝϞϦνϡʔχϯάͷίπ – 1ZUIPOͰ4QBSLΛ͏߹ɺ4DBMBͱൺͯΦϑώʔϓʢ+7.֎ ͷϝϞϦʣΛར༻͢ΔɻͦͷͨΊɺZBSOʹΑͬͯίϯςφ͕LJMM
͞ΕΔΤϥʔ͕ى͖͍͢ɻTQBSLQZUIPOXPSLFSNFNPSZ TQBSLZBSOFYFDVUPSNFNPSZ0WFSIFBEύϥϝʔλͰνϡʔχϯ άΛߦ͏ • "QBDIF;FQQFMJO – &.3Ͱɺ+VQZUFS/PUFCPPLΛΠϯετʔϧ͠ͳͯ͘ɺ "QBDIF;FQQFMJOͱ͍͏/PUFCPPL͕͑Δɻ
·ͱΊ • 4QBSLɺେྔσʔλΛߴεϧʔϓοτͰॲཧ͢Δ͜ͱ ͷͰ͖ΔศརͳϓϩμΫτ • ػցֶशετϦʔϜͳͲ๛ͳػೳ͕͋Δ • &.3%BUB1SPDͳͲͷϚωʔδυαʔϏε͕ศར • 1Z4QBSLΛར༻͢Δ߹Φʔόϔου͕͋Δ
• ͍Ͳ͜ΖΛ͖ͪΜͱཧղ͢Δ͜ͱ͕͍ͩ͡ • 3FUUZͰσʔλΤϯδχΞϦϯάʹ1Z4QBSLΛ࠾༻ͯ͠ ͍Δ みなさんも、PySparkで大量データ処理やってみよう!
͝ਗ਼ௌ͋Γ͕ͱ͏͍͟͝·ͨ͠J
ิ 1ZUIPOͷଞͷฒྻࢄॲཧܥͱͷൺֱ 1ZUIPOͷฒྻࢄॲཧܥʢDFMFSZ EBTLʣͱ4QBSLͷେ͖ ͳҧ͍ͱͯ͠ɺΫϥελنͷΧόʔྖҬ͕͋͛ΒΕΔɻ ·ͨɺ4QBSLͷ߹ɺΫϥυΛ͏͜ͱͰڥߏஙΛߦ Θͳͯ͘ྑ͍ͱݴ͏ϝϦοτ͕͋Δɻ サーバ台数の目安 Celery dask
(py)spark 1台 4〜5台 数千台
ิ 4QBSLϢʔβͷ֤ݴޠར༻ऀͷׂ߹ 出典:Spark Survery 2016 hIps://databricks.com/blog/2016/09/27/spark-survey-2016-released.html
ิ ηΧϯμϦιʔτͱNBQQBSUJUJPOͷઆ໌ [(0,4), (2,4), (3,1), (2,2), (0,1), (1,1),(1,8), (3,6)] [(0,1),
(0,4), (2,2), (2,4)] [(0,1), (1,1), (1,8), (3,1), (3,6)] もとの配列 パーティションキー とそれ以外の値で ソートする [(0,1), (0,5), (2,2), (2,5)] [(0,1), (1,1), (1,9), (3,1), (3,7)] secondary sort mappar??on par??onごとに一度 処理を実行する