Shinichi Nakagawa
September 28, 2022
The Basics of Baseball Data Analysis with PySpark and Google Cloud / PySpark, Google Cloud and Baseball Data
Slides presented at #kwskrb #112
A prototype of the PyCon JP 2022 talk
Transcript
The Basics of Sports Data Analysis for Python Users: beta version (or, the trailer)
A short preview ahead of the real talk at PyCon JP 2022.
Shinichi Nakagawa @shinyorke, 2022/09/28, kawasaki.rb #112
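Onboarding (where this deck and talk fit)
• The story here: let's use Python, Spark, and a public cloud (Google Cloud) to comfortably process and handle data of 100 GB and up.
• I made this to give a small preview before speaking at PyCon JP 2022 on "The Basics of Sports Data Analysis for Python Users".
• This is the version as of today; the content may change for the real talk.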
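Who am I?
• Shinichi Nakagawa @shinyorke
• Manager at a foreign-owned IT consulting company
• My job is engineering with cloud services (as the manager who leads that team).
• Lately I use Google Cloud both at work and privately.
• I do personal development projects for both fun and profit 🍺
#Python #GoogleCloud #Baseball #DataScience #SABRmetrics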
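Today's starting lineup
• Playing with Major League big data (saved for PyCon)
• A nice serverless data platform built with Python and Google Cloud
• Serverless data processing with PySpark + Dataproc
• Looking back on this year's Major League season through data (saved for PyCon)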
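The data this time: MLB big data
• Major League Baseball records all kinds of data with a system called Statcast. (Recorded by cameras and radar; some items are recorded statistically or by hand.)
• For example, play-by-play calls like the following all draw on the Statcast data:
• "Ohtani-san! Home run No. ○! Exit velocity 180 km/h, distance 130 m"
• "Ohtani-san! Called strike three on a 162 km/h fastball!!!"
• Every single movement on the field is captured: all pitch and batted-ball data is recorded.
• A regular season (30 teams, 162 games) comes to roughly 700,000 to 800,000 pitches. Postseason and spring training data exist as well.
• Each record consists of 91 fields (!?), and a regular season amounts to roughly 400 MB to 600 MB of data.
• Anyone can browse and download it (CSV format) at baseballsavant.mlb.com.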
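As a rough sanity check on the figures quoted above, the arithmetic below estimates how many bytes one pitch record and one field occupy in the CSV. The midpoint values are assumptions picked from the quoted ranges, not measured numbers.

```python
# Back-of-the-envelope check of the Statcast sizes quoted on the slide.
# The midpoints below are assumptions chosen from the quoted ranges.
PITCHES_PER_SEASON = 750_000          # midpoint of "700,000 to 800,000 pitches"
CSV_BYTES_PER_SEASON = 500 * 10**6    # midpoint of "400 MB to 600 MB"
COLUMNS = 91                          # fields per record, as stated on the slide

bytes_per_row = CSV_BYTES_PER_SEASON / PITCHES_PER_SEASON
bytes_per_field = bytes_per_row / COLUMNS

print(f"{bytes_per_row:.0f} bytes per pitch, {bytes_per_field:.1f} bytes per field")
```

Under these assumptions a row is well under 1 KB, which is consistent with a wide CSV of mostly short numeric fields.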
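Me: "I want the fun of following the games every single day."
Collect data daily -> save it to BigQuery -> visualize it. Let's just build that kind of platform!
So I went ahead and built a little something.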
A nice serverless data platform built with Python and Google Cloud (baseball edition)
Architecture overview
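Architecture notes (roughly, the points I cared about)
• To make daily data checks and daily data updates comfortable, the platform is built and operated entirely on fully managed, serverless cloud services.
• What "fully managed, serverless cloud services" means here:
• A few clicks in the console, or a few CLI commands, are enough to get something running.
• No infrastructure or server maintenance (the cloud provider does it, not you).
• More concretely, you don't have to stand up a K8s cluster or VMs yourself (network configuration is still required).
• Deployment and scaling can be wired into a CI/CD pipeline such as GitHub Actions.
• Billing is basically pay-for-what-you-use, so it is kind to your wallet 👛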
Use cases and the services used
Dashboard app
• The app itself is hosted on Cloud Run and reaches the backend (Cloud Functions) through API Gateway.
• Firestore is the main DB, with MemoryStore (Redis) in front as a cache.
• Spark (PySpark) does not appear in this part.
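Data collection & saving to BigQuery
• A crawler (Cloud Functions) runs periodically to collect data from the source site (Baseball Savant).
• The results are saved as CSV in Google Cloud Storage (GCS). This is the source-of-truth data (the data lake).
• A PySpark script running on Dataproc Serverless summarizes and tidies the CSV files on GCS and saves them to BigQuery.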
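Loading Firestore (moving data into the database)
• A PySpark script running on Dataproc Serverless converts the BigQuery data into the format the dashboard uses (JSON).
• A Python script then loads the results (saved on GCS as JSON) into Firestore.
• Both are run by hand for now (the reason and a workaround come later).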
Serverless data processing with PySpark + Dataproc
Note: this is the main topic of this talk.
Scope of this part
Spark and PySpark, understood in 33.4 seconds (or at least it feels that way) 🐯
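Spark and PySpark
• A framework for distributing and processing large data comfortably.
• Historically it is the successor to Hadoop, and there are plenty of production use cases.
• The library and API are implemented on the JVM (Scala/Java), but there is a Python interface called PySpark.
• It offers the DataFrame API, a DataFrame-style interface familiar from Pandas; building mainly on that makes it pleasant to use.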
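How to use Spark (and PySpark)
• Datasets
• The underlying data is organized and managed as RDDs (Resilient Distributed Datasets).
• The DataFrame API exists as an interface for handling RDDs comfortably.
• DataFrames can be used in a Pandas-like way.
• Ways to run it
• Write a script and run it on a Spark environment.
• Use it ad hoc from Jupyter Lab or a Zeppelin notebook.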
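Where should the environment be built and operated? 🤔
(Each row: environment and approach / setup effort / ease of operation / notes)
• Build and operate everything yourself on-premises / Everything has to be configured by you / Everything has to be looked after by you / The hardest pattern; tough work even for a full-time infrastructure engineer
• Build and operate it yourself on cloud VMs or K8s / Everything has to be configured by you / You get some of the benefits of the cloud service / Building a Spark environment yourself is fairly difficult
• Use a managed service offered by a cloud provider (the most recommended approach) / Click through the GUI, or run it comfortably via CLI or API / Monitor memory and CPU resources and do maintenance as the situation requires / The easiest and smartest way; AWS, Google Cloud and other vendors all offer such services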
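Options for running Spark on Google Cloud
(Each row: environment and approach / setup / operation / available features / notes)
• Build and operate an environment on GCE or GKE / Build it yourself, then install Spark / Operate everything yourself and look after it / All features / Not recommended, since in the end Dataproc can do the same thing
• Dataproc / Build with the gcloud command, the API, or the console / Monitor and operate the GKE or GCE environment that Dataproc created / All features / The most standard setup
• Dataproc Serverless / Build with the gcloud command, the API, or the console / Monitoring only while a job runs; the environment is deleted automatically after processing / Batch processing only; notebooks cannot be used / The best choice for periodic batch processing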
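Dataproc and Dataproc Serverless
• Google Cloud has a managed service for Spark (Hadoop) called Dataproc.
• Until now it could only be operated on the premise that hosts or a cluster exist, on GCE or GKE (K8s), but quite recently a Serverless option was born.
• For batch operation such as "once a day" or "every 30 minutes", Serverless works! Notebook-style execution (Jupyter and the like), however, is not supported.
• Serverless is billed only for what you use, so it is kind to your wallet 👛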
Tasks I did with PySpark
• Data collection & loading data into BigQuery
• Loading data into the dashboard app's DB (Firestore)
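[Recap] Data collection & saving to BigQuery
• A crawler (Cloud Functions) runs periodically to collect data from the source site (Baseball Savant).
• The results are saved as CSV in Google Cloud Storage (GCS). This is the source-of-truth data (the data lake).
• A PySpark script running on Dataproc Serverless summarizes and tidies the CSV files on GCS and saves them to BigQuery.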
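Data collection (not Spark)
• Web scraping is not something Spark should be doing.
• The task is implemented with requests-html and operated on Cloud Functions instead.
• A cron setting in Cloud Scheduler runs it periodically and the output is saved to GCS.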
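Loading the CSV data into BigQuery
• This is a suitable scope and kind of processing for a task run on Dataproc.
• Pick up the files from a GCS path, process them with Spark SQL, and send them to BigQuery.
• If you understand DataFrames and SQL, you can implement and operate this comfortably.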
Let's use Dataproc
• A good way to start is to type along with Google Cloud's documentation and samples:
• https://cloud.google.com/dataproc
• https://cloud.google.com/dataproc-serverless/docs
• https://github.com/GoogleCloudDataproc/cloud-dataproc
• For Serverless, a VPC subnet has to be created beforehand.
• The next pages show a few samples of doing this with PySpark.
• The sample code uses Type Hints, but they are not required. (I use them deliberately to make it clear which interface each object belongs to.)
Getting it implemented, 1: create a Session

    from pyspark.sql import SparkSession

    # Enable BigQuery access and raise the limit on fields shown in debug strings
    spark: SparkSession = SparkSession \
        .builder \
        .appName('app_yakiu') \
        .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.25.2.jar') \
        .config('spark.sql.debug.maxToStringFields', 2000) \
        .getOrCreate()

    # Setting needed for this Dataproc job: temporary GCS area used at run time
    spark.conf.set('temporaryGcsBucket', 'your-gcs-bucket-name')

• Create a suitable .py file.
• Create a SparkSession.
• Configure the required JAR. Specifying the JAR is mandatory when using BigQuery.
Getting it implemented, 2: create a Schema

    from pyspark.sql.types import StructType, StructField, DoubleType, DateType, StringType, LongType

    # Schema definition (a little long)
    STATCAST_SCHEMA: StructType = StructType(
        [
            StructField("pitch_type", StringType(), False),
            StructField("game_date", DateType(), False),
            StructField("release_speed", DoubleType(), False),
            StructField("release_pos_x", DoubleType(), False),
            StructField("release_pos_z", DoubleType(), False),
            # Too long to show in full, so omitted here (there are 91 fields)
            StructField("spin_axis", DoubleType(), False),
            StructField("delta_home_win_exp", DoubleType(), False),
            StructField("delta_run_exp", DoubleType(), False)
        ]
    )

• For CSV, define a Schema.
• Without one, the load does not behave as intended.
• I wrote the 91-field Schema out by hand, with tears.
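One way to cut down the hand-written boilerplate for a wide schema is to keep the columns as plain (name, type) pairs and generate a DDL-formatted schema string, which Spark's reader accepts in place of a `StructType`. The sketch below is not the author's code: `STATCAST_COLUMNS` and `to_ddl` are hypothetical names, only the columns visible on the slide are listed, and the `nullable=False` flags from the slide are not carried over.

```python
# Hypothetical helper: build a Spark DDL schema string from (name, type) pairs.
# Only columns visible on the slide are listed; the real file has 91 fields.
STATCAST_COLUMNS = [
    ("pitch_type", "STRING"),
    ("game_date", "DATE"),
    ("release_speed", "DOUBLE"),
    ("release_pos_x", "DOUBLE"),
    ("release_pos_z", "DOUBLE"),
]


def to_ddl(columns):
    """Join (name, type) pairs into a 'name TYPE, name TYPE, ...' string."""
    return ", ".join(f"{name} {dtype}" for name, dtype in columns)


STATCAST_DDL = to_ddl(STATCAST_COLUMNS)
print(STATCAST_DDL)
```

A string like this can be passed to `spark.read.schema(...)`, so the column list can live in a small data file rather than in 91 `StructField` lines.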
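Getting it implemented, 3: read the CSV

    from pyspark.sql import DataFrame as SparkDataFrame
    from pyspark.sql.types import StructType
    from pyspark.sql.utils import AnalysisException

    from schema import STATCAST_SCHEMA  # the schema definition from the previous step

    # Use the reader of the spark session created in step 1, with the CSV format
    def read_csv(date: str, filename: str, schema: StructType = None) -> SparkDataFrame:
        try:
            return spark.read \
                .format('csv') \
                .options(header="true", inferSchema="true") \
                .load(f'gs://your-bucket-name/path/{date}/{filename}', schema=schema)
        except AnalysisException:
            # Raised, for example, when the path does not exist
            return None  # type: ignore

    sdf: SparkDataFrame = read_csv('2022-10-15', 'batter.csv', schema=STATCAST_SCHEMA)

• Use read on the spark session and specify CSV as the format.
• Treat the first row as a header and pass the Schema defined earlier.
• Specify the full GCS path.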
Getting it implemented, 4: save to BigQuery

    from pyspark.sql import DataFrame as SparkDataFrame

    # Function that saves a DataFrame to BigQuery
    def save_bigquery(sdf: SparkDataFrame, table_name: str) -> None:
        sdf.write \
            .mode('append') \
            .format('bigquery') \
            .option('project', 'your project name') \
            .option('table', f'dataset.{table_name}') \
            .option('temporaryGcsBucket', 'your-gcs-bucket-name') \
            .option('createDisposition', 'CREATE_NEVER') \
            .save()

    # Pass the Spark DataFrame and the BigQuery table name
    save_bigquery(sdf, 'batting_data')

• Use the DataFrame's write.
• Specify bigquery as the format.
• The example appends to an existing table.
Running it with Dataproc Serverless

    # Upload the executable files to GCS
    gsutil -m cp -r app gs://your-gcs-bucket/script

    # `batches submit` runs the job as a serverless batch (invoked here via `gcloud beta`)
    gcloud beta dataproc batches submit \
        --project your-project \
        --region asia-northeast1 pyspark \
        --batch hello-world gs://your-gcs-bucket/script/app/csv2bq_daily.py \
        --jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.25.2.jar \
        --subnet your-subnet \
        --py-files gs://your-gcs-bucket/script/app/environment.py,gs://your-gcs-bucket/script/app/query.py,gs://your-gcs-bucket/script/app/schema.py
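When the submit command is launched from a script rather than typed by hand, it can help to assemble the argument list in Python so the comma-joined `--py-files` value and the other flags are built in one place. This is a minimal sketch, not the author's tooling: `build_batch_command` is a hypothetical helper, and it only reuses the flags and placeholder values shown in the command above.

```python
import shlex


def build_batch_command(project, region, batch_id, main_py, jars, subnet, py_files):
    """Return the gcloud argument list for a Dataproc Serverless PySpark batch."""
    return [
        "gcloud", "beta", "dataproc", "batches", "submit", "pyspark", main_py,
        "--project", project,
        "--region", region,
        "--batch", batch_id,
        "--jars", ",".join(jars),
        "--subnet", subnet,
        "--py-files", ",".join(py_files),
    ]


base = "gs://your-gcs-bucket/script/app"
cmd = build_batch_command(
    project="your-project",
    region="asia-northeast1",
    batch_id="hello-world",
    main_py=f"{base}/csv2bq_daily.py",
    jars=["gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.25.2.jar"],
    subnet="your-subnet",
    py_files=[f"{base}/environment.py", f"{base}/query.py", f"{base}/schema.py"],
)

# Print a copy-pasteable shell command; to actually submit it,
# pass the list to subprocess.run(cmd, check=True).
print(shlex.join(cmd))
```

Passing a list to `subprocess.run` avoids shell quoting problems, and `shlex.join` is only used here to display the command.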
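Impressions after trying it
• For now I'm glad I could build what I wanted in a smart way.
• The PySpark implementation itself needs very little Dataproc-specific dialect, which is nice.
• Execution permissions (Service Account & IAM) worked fine with the defaults.
• BigQuery and GCS are fully accessible from the start.
• That said, the permissions are arguably too broad, so real-world use probably needs tuning.
• It may be tough unless you are already somewhat used to Spark and Google Cloud. If this is your first time touching Spark, you might need to brace yourself a little.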
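[Recap] Loading Firestore (moving data into the database)
• A PySpark script running on Dataproc Serverless converts the BigQuery data into the format the dashboard uses (JSON).
• A Python script then loads the results (saved on GCS as JSON) into Firestore.
• Both are run by hand for now (the reason and a workaround come later).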
Writing files from BigQuery to GCS, for Dataproc
• Load the BigQuery data into a Spark DataFrame.
• Write the Spark DataFrame out as files.
The way to run it (gcloud CLI) is the same as before, so I'll skip it.
Getting it implemented, 5: read from BigQuery

    from pyspark.sql import SparkSession
    from pyspark.sql import DataFrame as SparkDataFrame
    from pyspark.sql.utils import AnalysisException

    GCS_BUCKET = 'your-gcs-bucket-name'  # placeholder; not defined on the slide

    # Same as before
    spark: SparkSession = SparkSession \
        .builder \
        .appName('your app') \
        .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.25.2.jar') \
        .config('spark.sql.debug.maxToStringFields', 2000) \
        .getOrCreate()
    spark.conf.set('temporaryGcsBucket', GCS_BUCKET)
    # Note: reading a View fails with an error unless this is enabled
    spark.conf.set("viewsEnabled", "true")

    def read_bq() -> SparkDataFrame:
        """
        Read Dashboard data BigQuery View to SparkDataFrame
        """
        try:
            df: SparkDataFrame = spark.read.format('bigquery') \
                .option('project', 'your project') \
                .option('table', 'your_project.view_baseball') \
                .load()
            return df
        except AnalysisException:
            return None  # type: ignore

    sdf: SparkDataFrame = read_bq()

• Specify the BigQuery JAR, the same as when saving.
• Specify BigQuery as the format on spark read.
• An extra option is required when reading from a BigQuery View.
Getting it implemented, 6: save to GCS

    from pyspark.sql import DataFrame as SparkDataFrame

    def save_json(sdf: SparkDataFrame) -> None:
        """
        Save as JSON dataset SparkDataFrame to GCS Bucket
        """
        sdf.write \
            .format("json") \
            .mode("overwrite") \
            .option("path", "gs://your-gcs-bucket/filepath/hoge") \
            .save()

    save_json(sdf)

• Use the DataFrame's write.
• Specify json as the format.
• Specify the final output path.
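The slides do not show the Python script that loads this JSON output into Firestore, so the following is only a sketch of its first half under stated assumptions. Spark's JSON writer produces a directory of `part-*` files with one JSON object per line; the sketch assumes those files have already been copied from GCS to a local directory, and it groups the records into fixed-size chunks that a Firestore client could then write batch by batch. `read_json_lines`, `chunked`, the chunk size, and the demo records are all hypothetical.

```python
import json
import tempfile
from itertools import islice
from pathlib import Path


def read_json_lines(directory):
    """Yield one dict per non-empty line from every part-*.json file."""
    for part in sorted(Path(directory).glob("part-*.json")):
        with part.open(encoding="utf-8") as fh:
            for line in fh:
                if line.strip():
                    yield json.loads(line)


def chunked(records, size):
    """Group an iterable into lists of at most `size` items."""
    it = iter(records)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


# Demo with a fake Spark output directory and made-up records.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "part-00000.json").write_text(
        '{"player": "A", "hr": 30}\n{"player": "B", "hr": 12}\n', encoding="utf-8"
    )
    Path(tmp, "part-00001.json").write_text(
        '{"player": "C", "hr": 5}\n', encoding="utf-8"
    )
    batches = list(chunked(read_json_lines(tmp), size=2))

print([len(b) for b in batches])  # -> [2, 1]
```

Each chunk would then be handed to whatever Firestore write call the loader uses; keeping chunks small bounds both memory use and the size of each write request.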
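PySpark and Dataproc Serverless
• It makes the "use Spark only when you need it" use case possible. That is the biggest reason to use a serverless service.
• At the data size of this application (less than 1 GB per day) you don't really see the benefit, but for use cases like "quickly batch-process data on the order of GB per day" it feels quite handy (preprocessing, cleansing, and so on).
• The code is lightweight, in the spirit of "run it only when there is something to process", so it pairs very well with PySpark.
• Automating the processing has some quirks, though: the available means are limited (which is why I run it by hand). Doing the automation with Cloud Composer (Airflow) looks like the best practice for now.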
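Summary of today's talk
• PySpark is a good way to process data comfortably in Python.
• PySpark can run in the cloud; today I introduced Dataproc.
• Once you can use the cloud serverlessly, many things get easier. There are limits imposed by the specification, though, so understand them properly and use the service as directed.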
Done. Thank you for listening ⚾