
Basics of Baseball Data Analysis with PySpark and Google Cloud / PySpark, Google Cloud and Baseball Data

#kwskrb #112 presentation slides

A prototype of the PyCon JP 2022 talk

Shinichi Nakagawa

September 28, 2022

Transcript

  1. Sports Data Analysis Basics for Python Users

    Beta version (or a preview)
    A short talk given ahead of the main presentation at PyCon JP 2022.

    Shinichi Nakagawa @shinyorke 2022/09/28 kawasaki.rb #112

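  2. Onboarding (what this deck and talk are about)
    • A talk about processing hundreds of GB of data or more comfortably with Python, Spark, and a public cloud (Google Cloud).

    • This deck was made as a sneak preview before my PyCon JP 2022 talk "Sports Data Analysis Basics for Python Users".

    • This is the current version; the content may change for the main talk.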

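  3. Who am I?
    (Who are you, anyway?)

    • Shinichi Nakagawa @shinyorke

    • Manager at a foreign-owned IT consulting firm

    • Works as an engineer handling cloud services (more precisely, the manager who leads a team of them)

    • Lately uses Google Cloud both at work and in private

    • Builds personal projects for both fun and profit 🍺

    #Python #GoogleCloud #Baseball
    #DataScience #SABRmetrics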

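  4. Today's starting lineup
    • Let's play with Major League big data ※covered on the day at PyCon

    • A comfortable serverless data platform built with Python and Google Cloud

    • Serverless data processing with PySpark + Dataproc

    • Looking back on this year's Major League season with data ※covered on the day at PyCon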

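  5. The data used this time: "MLB big data"
    • Major League Baseball records all kinds of data with a system called Statcast.
    ※Recorded with cameras and radar; some statistics are recorded manually.

    • For example, the raw material behind play-by-play calls like these all comes from Statcast's big data:

    • "Ohtani-san! Home run No. ○! Exit velocity 180 km/h, distance 130 m"

    • "Ohtani-san! A called strikeout on a 162 km/h fastball!!!"

    • Every move in a game, every pitch and every batted ball, is recorded.

    • A regular season (30 teams, 162 games each) produces roughly 700,000 to 800,000 pitches. Postseason and spring training data are also available.

    • Each record has 91 fields (!?); a regular season amounts to roughly 400 MB to 600 MB of data.

    • Anyone can browse and download it (in CSV format) at baseballsavant.mlb.com.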
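A quick back-of-the-envelope check of the figures above, taking the slide's ranges at face value and assuming the nominal schedule (every game involves two teams, so 30 × 162 / 2 games). These are derived estimates, not Statcast measurements.

```python
# Back-of-the-envelope scale check using only the figures quoted on the slide.
TEAMS = 30
GAMES_PER_TEAM = 162
PITCHES_RANGE = (700_000, 800_000)  # pitches per regular season
SIZE_MB_RANGE = (400, 600)          # CSV size per regular season, in MB


def season_games(teams: int = TEAMS, games_per_team: int = GAMES_PER_TEAM) -> int:
    """Nominal number of regular-season games (each game involves two teams)."""
    return teams * games_per_team // 2


def pitches_per_game(pitches: int, games: int) -> float:
    return pitches / games


def bytes_per_row(size_mb: float, rows: int) -> float:
    """Average CSV bytes per pitch row (1 MB taken as 10**6 bytes)."""
    return size_mb * 10**6 / rows


games = season_games()
print(games,
      round(pitches_per_game(PITCHES_RANGE[0], games)),
      round(pitches_per_game(PITCHES_RANGE[1], games)))  # 2430 288 329
print(round(bytes_per_row(SIZE_MB_RANGE[0], PITCHES_RANGE[1])),
      round(bytes_per_row(SIZE_MB_RANGE[1], PITCHES_RANGE[0])))  # 500 857
```

That works out to roughly 290 to 330 pitches per game and about 500 to 860 bytes per CSV row. A single season is therefore modest in size.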

  6. Me: "I want a system that lets me check every game, every day"

    Let's just build a platform that collects data daily -> saves it to BigQuery -> visualizes it!

  7. So I went ahead and built one.

  8. A Comfortable Serverless Data Platform Built with Python and Google Cloud (Baseball Edition)

  9. Architecture overview

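  10. Architecture walkthrough (key design choices)
    • To make daily data checks and daily data updates work smoothly, the system is built and operated using fully managed, serverless cloud services throughout.

    • What "fully managed, serverless cloud services" means here:

    • A few clicks in the console or a CLI command gets things up and running.

    • No infrastructure or server maintenance (the cloud provider does it, not you).

    • More concretely, you don't have to build your own K8s cluster or VMs (you still have to configure networking and similar settings).

    • They can be deployed and scaled from a CI/CD pipeline such as GitHub Actions.

    Billing is basically pay-per-use, so it's easy on the wallet too 👛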

  11. Use cases and the services used

  12. Dashboard app
    • The app itself is hosted on Cloud Run and reaches the backend (Cloud Functions) through API Gateway.
    • Firestore is the main DB, with Memorystore (Redis) in front of it as a cache.
    • Spark (PySpark) does not appear here.
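The slide puts Memorystore (Redis) in front of Firestore as a cache. A common way to wire that up is the cache-aside pattern: check the cache first, fall back to the database on a miss, then populate the cache. The sketch below is an assumption about how such a backend could look, not the author's code. A dict with expiry stands in for Redis, and a plain function stands in for the Firestore read, so it runs without any Google Cloud libraries.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TTLCache:
    """In-memory stand-in for a Redis-style cache with per-key expiry."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            # Expired: drop it so the next read goes to the database.
            del self._entries[key]
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        self._entries[key] = (self._clock() + self._ttl, value)


def get_with_cache(key: str, cache: TTLCache, load_from_db: Callable[[str], Any]) -> Any:
    """Cache-aside read: serve from the cache if present, else load and populate it."""
    cached = cache.get(key)
    if cached is not None:
        return cached
    value = load_from_db(key)  # in the real app this would be a Firestore read (hypothetical)
    cache.set(key, value)
    return value
```

The dashboard data only changes when the batch job runs, so a fairly long TTL is plausible here. The exact value is a tuning choice.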

  13. Data collection & saving to BigQuery
    • A crawler (Cloud Functions) periodically collects data from the source site (Baseball Savant).
    • Its output is saved as CSV in Google Cloud Storage (GCS). This is the source-of-truth data (data lake).
    • A PySpark script running on Dataproc Serverless summarizes the CSVs on GCS and saves the result to BigQuery.

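  14. Loading into Firestore (moving data into the database)
    • A PySpark script running on Dataproc Serverless converts the BigQuery data into the format the dashboard uses (JSON).
    • A Python script then loads that output (saved as JSON on GCS) into Firestore.
    • Both steps are currently run by hand (the reason and a workaround come later).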

  15. Serverless Data Processing with PySpark + Dataproc

    ※This is the main topic of the talk.

  16. Scope of this talk

  17. Spark and PySpark in 33.4 seconds (or at least you'll feel like you get it) 🐯

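  18. Spark and PySpark
    • A framework for processing large data comfortably in a distributed way.

    • Historically it is the successor to Hadoop (MapReduce), and it is used in plenty of real-world projects.

    • The library and API are implemented on the JVM (mainly in Scala), and there is a Python interface called PySpark.

    • It has a DataFrame API, a DataFrame-style interface familiar from pandas, and using it as your main tool makes Spark pleasant to work with.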

  19. How to use Spark (and PySpark)
    • Datasets

      • The underlying data is structured and managed as RDDs (Resilient Distributed Datasets).

      • The DataFrame API exists as a convenient interface on top of RDDs.

      • DataFrames can be used much like pandas.

    • Usage

      • Write a script and run it on a Spark environment.

      • Use it ad hoc from Jupyter Lab or a Zeppelin notebook.
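Distributed processing can be made a little more concrete with an example. A grouped average over data split across partitions is typically computed in two steps: aggregate each partition locally, then merge the partial results. The pure-Python toy below illustrates only that idea. It is not Spark's API or implementation, and the (pitch_type, release_speed) rows are made-up sample values.

```python
from typing import Dict, Iterable, List, Tuple

Row = Tuple[str, float]                   # (pitch_type, release_speed)
Partial = Dict[str, Tuple[float, int]]    # key -> (sum, count)


def split_into_partitions(rows: List[Row], n: int) -> List[List[Row]]:
    """Round-robin rows into n partitions (a stand-in for distributing data)."""
    return [rows[i::n] for i in range(n)]


def aggregate_partition(rows: Iterable[Row]) -> Partial:
    """Local step: per-key (sum, count) within one partition."""
    acc: Partial = {}
    for key, value in rows:
        total, count = acc.get(key, (0.0, 0))
        acc[key] = (total + value, count + 1)
    return acc


def merge_partials(partials: Iterable[Partial]) -> Dict[str, float]:
    """Global step: combine (sum, count) pairs, then divide once at the end."""
    merged: Partial = {}
    for partial in partials:
        for key, (total, count) in partial.items():
            m_total, m_count = merged.get(key, (0.0, 0))
            merged[key] = (m_total + total, m_count + count)
    return {key: total / count for key, (total, count) in merged.items()}


rows = [("FF", 95.0), ("SL", 85.0), ("FF", 97.0), ("CH", 87.0), ("SL", 83.0)]
partitions = split_into_partitions(rows, 2)
print(merge_partials(aggregate_partition(p) for p in partitions))
# {'FF': 96.0, 'SL': 84.0, 'CH': 87.0}
```

The key point is carrying (sum, count) instead of per-partition averages. An average of averages is wrong when partitions differ in size. With the DataFrame API you would simply write something like `sdf.groupBy('pitch_type').avg('release_speed')` and leave the planning to Spark.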

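  20. Where should the environment be built and run? 🤔
    | Environment / approach | Setup effort | Ease of operation | Notes |
    | Build and run everything yourself on-premises | Everything must be configured yourself | You have to look after everything yourself | The hardest pattern; tough work even for professional infrastructure engineers |
    | Build and run it yourself on VMs/K8s in the cloud | Everything must be configured yourself | You get some of the benefits of cloud services | Building a Spark environment yourself is fairly difficult |
    | Use a managed service offered by a cloud provider (※most recommended) | Click through the GUI, or run it via the CLI/API | Monitor resources such as CPU and do maintenance as needed | The easiest and smartest approach; AWS, Google Cloud, and others each offer one |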

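  21. Options for running Spark on Google Cloud
    | Environment / approach | Setup | Operation | Available features | Notes |
    | Build your own environment on GCE or GKE | Build it yourself, then install Spark | Operate everything yourself; you have to look after it | All features | Dataproc can do all of this anyway, so not recommended |
    | Dataproc | Build via the gcloud command, the API, or the console | Monitor and operate the GKE or GCE environment that Dataproc created | All features | The most standard setup |
    | Dataproc Serverless | Build via any of the above: gcloud command, API, or console | Only monitor while running; the environment is deleted automatically after processing | Batch processing only; notebooks can't be used | Best for periodic batch processing |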

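  22. Dataproc and Dataproc Serverless
    • Google Cloud has a managed service for Spark (and Hadoop) called Dataproc.

    • Until now it could only be run on the assumption that hosts/clusters exist on GCE or GKE (K8s), but recently a Serverless option arrived.

    • For batch workloads such as "once a day" or "every 30 minutes", Serverless works! However, notebook-style execution (Jupyter, etc.) is not supported.

    • Serverless is billed only for what you use, so it's easy on the wallet too 👛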

  23. The tasks I used PySpark for
    • Data collection & loading data into BigQuery

    • Loading data into the dashboard app's DB (Firestore)

  24. [Recap] Data collection & saving to BigQuery

  25. Data collection
    (not Spark)
    • Web scraping is not something you should do with Spark.

    • The task is implemented with requests-html and run on Cloud Functions.

    • A cron schedule in Cloud Scheduler runs it periodically and saves the result to GCS.
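The crawler itself isn't shown. A crawler like this usually needs a sanity check before uploading to GCS, because a download can come back empty or as an error page instead of CSV. Below is a minimal sketch that assumes the response body is already in memory as text. The function name and the required-column list are hypothetical; the column names are taken from the schema slide. The requests-html fetch and the GCS upload are deliberately left out.

```python
import csv
import io
from typing import Iterable, Sequence

# Hypothetical minimum set of columns; the names come from the Statcast schema slide.
REQUIRED_COLUMNS: Sequence[str] = ("pitch_type", "game_date", "release_speed")


def validate_statcast_csv(text: str, required: Iterable[str] = REQUIRED_COLUMNS) -> int:
    """Check the CSV header and return the number of data rows.

    Raises ValueError if any required column is missing, e.g. when the body
    is empty or is an HTML error page rather than CSV.
    """
    reader = csv.DictReader(io.StringIO(text))
    header = reader.fieldnames or []
    missing = [column for column in required if column not in header]
    if missing:
        raise ValueError(f"missing columns: {missing}")
    return sum(1 for _ in reader)
```

A zero row count is not necessarily an error, since it could legitimately happen on a day without games. For that reason, the sketch only rejects a malformed header.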

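  26. Loading the CSV data into BigQuery
    • A task whose scope and processing are a good fit for Dataproc.

    • Extract the files from their GCS paths, process them with Spark SQL, and write the result to BigQuery.

    • If you understand DataFrames and SQL, it is straightforward to implement and operate.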
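The slides don't show the actual summary query. Here is a hypothetical per-pitch-type summary in plain SQL, as a sketch of what the Spark SQL step might look like. To keep it runnable without a cluster, it is executed against Python's built-in sqlite3. The query uses only COUNT, AVG, ROUND, and GROUP BY, which Spark SQL also supports. Still, sqlite is only a local stand-in here, not what the pipeline runs.

```python
import sqlite3
from typing import Iterable, List, Tuple

# Hypothetical summary query; table and column names follow the Statcast schema slide.
SUMMARY_SQL = """
SELECT pitch_type,
       COUNT(*) AS pitches,
       ROUND(AVG(release_speed), 1) AS avg_release_speed
FROM statcast
GROUP BY pitch_type
ORDER BY pitch_type
"""


def run_summary_locally(rows: Iterable[Tuple[str, float]]) -> List[tuple]:
    """Run SUMMARY_SQL on an in-memory SQLite table (a local check of the SQL logic only)."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE statcast (pitch_type TEXT, release_speed REAL)")
        conn.executemany("INSERT INTO statcast VALUES (?, ?)", rows)
        return conn.execute(SUMMARY_SQL).fetchall()
    finally:
        conn.close()


print(run_summary_locally([("FF", 95.0), ("SL", 85.0), ("FF", 97.0)]))
# [('FF', 2, 96.0), ('SL', 1, 85.0)]
```

On Dataproc, the same string would run against the DataFrame read from GCS: register it with `sdf.createOrReplaceTempView('statcast')`, call `spark.sql(SUMMARY_SQL)`, and hand the result to a BigQuery save function.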

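  27. Let's use Dataproc
    • It works well to follow Google Cloud's documentation and samples, typing them out as you go.

    • https://cloud.google.com/dataproc

    • https://cloud.google.com/dataproc-serverless/docs

    • https://github.com/GoogleCloudDataproc/cloud-dataproc

    • For Serverless, you need to create a VPC subnet beforehand (with Private Google Access enabled).

    • The following pages show a few samples of doing this with PySpark.

    • The sample code uses type hints, but you don't need them.
    (I deliberately wrote them with type hints to make it easier to see which interface each object belongs to.)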

  28. A first implementation

    1. Create a session
    from pyspark.sql import SparkSession

    # Load the BigQuery connector and raise the number of fields shown in string/debug output
    spark: SparkSession = SparkSession \
        .builder \
        .appName('app_yakiu') \
        .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.25.2.jar') \
        .config('spark.sql.debug.maxToStringFields', 2000) \
        .getOrCreate()

    # spark-bigquery-connector setting: GCS bucket used as a temporary staging area at runtime
    spark.conf.set('temporaryGcsBucket', 'your-gcs-bucket-name')
    • Create a .py file

    • Create a SparkSession

    • Set the required JARs and settings
    When using BigQuery, specifying the connector JAR is required
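These samples hard-code placeholders such as the bucket name. The submit command on slide 32 also ships an environment.py alongside the main script via --py-files, but its contents are never shown. A hypothetical version might gather those settings in one place, as sketched below. The environment-variable names and defaults are assumptions for illustration only.

```python
import os
from dataclasses import dataclass
from typing import Mapping

# Connector JAR path as used on the slides.
DEFAULT_BIGQUERY_JAR = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.25.2.jar"


@dataclass(frozen=True)
class Settings:
    """Settings shared by the PySpark scripts (hypothetical layout)."""
    project: str
    gcs_bucket: str
    dataset: str
    bigquery_jar: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read settings from (hypothetical) environment variables, falling back to placeholders."""
    return Settings(
        project=env.get("GCP_PROJECT", "your-project"),
        gcs_bucket=env.get("GCS_BUCKET", "your-gcs-bucket-name"),
        dataset=env.get("BQ_DATASET", "dataset"),
        bigquery_jar=env.get("BIGQUERY_JAR", DEFAULT_BIGQUERY_JAR),
    )
```

A script would then call `settings = load_settings()` and use `settings.bigquery_jar` in `.config('spark.jars', ...)` and `settings.gcs_bucket` in `spark.conf.set('temporaryGcsBucket', ...)`.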

  29. A first implementation

    2. Define the schema
    from pyspark.sql.types import (
        StructType, StructField, DoubleType, DateType, StringType, LongType
    )

    # Schema definition (a bit long)
    STATCAST_SCHEMA: StructType = StructType(
        [
            StructField("pitch_type", StringType(), False),
            StructField("game_date", DateType(), False),
            StructField("release_speed", DoubleType(), False),
            StructField("release_pos_x", DoubleType(), False),
            StructField("release_pos_z", DoubleType(), False),
            # Omitted because it's too long (there are 91 fields in total)
            StructField("spin_axis", DoubleType(), False),
            StructField("delta_home_win_exp", DoubleType(), False),
            StructField("delta_run_exp", DoubleType(), False)
        ]
    )
    • For CSV, define a schema

    • Without it, things don't behave as intended

    • I wrote out the schema for all 91 fields. Painfully.
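Writing 91 StructFields by hand is tedious. One alternative is to keep the column names and types in a plain mapping and build a DDL-formatted schema string from it; Spark's DataFrameReader also accepts such a string in place of a StructType (for example "col0 INT, col1 DOUBLE"). This is sketched here as an option, not what the author did. Only the columns visible on the slide are listed, so the rest would need to be filled in. The sketch emits only names and types, not the nullable=False flags used above.

```python
from typing import Dict

# Columns visible on the slide; the remaining Statcast columns are omitted here too.
STATCAST_COLUMNS: Dict[str, str] = {
    "pitch_type": "STRING",
    "game_date": "DATE",
    "release_speed": "DOUBLE",
    "release_pos_x": "DOUBLE",
    "release_pos_z": "DOUBLE",
    "spin_axis": "DOUBLE",
    "delta_home_win_exp": "DOUBLE",
    "delta_run_exp": "DOUBLE",
}


def to_ddl(columns: Dict[str, str]) -> str:
    """Build a Spark DDL schema string such as 'pitch_type STRING, game_date DATE'."""
    if not columns:
        raise ValueError("at least one column is required")
    return ", ".join(f"{name} {spark_type}" for name, spark_type in columns.items())


print(to_ddl(STATCAST_COLUMNS))
```

Usage would look like `spark.read.load(path, format='csv', header='true', schema=to_ddl(STATCAST_COLUMNS))`.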

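  30. A first implementation

    3. Read the CSV
    from typing import Optional

    from pyspark.sql import DataFrame as SparkDataFrame
    from pyspark.sql.types import StructType
    from pyspark.sql.utils import AnalysisException

    from schema import STATCAST_SCHEMA  # the schema defined earlier

    # Call read on the Spark session created earlier, with CSV as the format
    def read_csv(date: str, filename: str, schema: Optional[StructType] = None) -> SparkDataFrame:
        try:
            return spark.read \
                .format('csv') \
                .options(header="true", inferSchema="true") \
                .load(f'gs://your-bucket-name/path/{date}/{filename}', schema=schema)
        except AnalysisException:
            # e.g. the path does not exist
            return None  # type: ignore

    sdf: SparkDataFrame = read_csv('2022-10-15', 'batter.csv', schema=STATCAST_SCHEMA)
    • Use the Spark session's read and specify CSV as the format

    • Pass the schema defined earlier (header="true" only treats the first line as column names; inferSchema matters only when no schema is given)

    • Specify the full GCS path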
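read_csv loads one day at a time. If you need several days, for example for a backfill, DataFrameReader.load also accepts a list of paths. Below is a small helper that builds that list. It assumes the gs://<bucket>/<prefix>/<YYYY-MM-DD>/<filename> layout used in read_csv; the bucket and prefix values are placeholders.

```python
from datetime import date, timedelta
from typing import List


def daily_paths(bucket: str, prefix: str, start: date, end: date, filename: str) -> List[str]:
    """Return one GCS path per day from start to end, inclusive."""
    if end < start:
        raise ValueError("end must not be before start")
    days = (end - start).days + 1
    return [
        f"gs://{bucket}/{prefix}/{(start + timedelta(days=offset)).isoformat()}/{filename}"
        for offset in range(days)
    ]


paths = daily_paths("your-bucket-name", "path", date(2022, 10, 14), date(2022, 10, 15), "batter.csv")
print(paths)
```

The list can then be passed as `spark.read.load(paths, format='csv', header='true', schema=STATCAST_SCHEMA)`. As with the single-day version, a missing day's path can make the load fail with AnalysisException, so days without data may need to be filtered out first.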

  31. A first implementation

    4. Save to BigQuery
    from pyspark.sql import DataFrame as SparkDataFrame

    # Function that saves a DataFrame to BigQuery
    def save_bigquery(sdf: SparkDataFrame, table_name: str) -> None:
        sdf.write \
            .mode('append') \
            .format('bigquery') \
            .option('project', 'your-project-name') \
            .option('table', f'dataset.{table_name}') \
            .option('temporaryGcsBucket', 'your-gcs-bucket-name') \
            .option('createDisposition', 'CREATE_NEVER') \
            .save()

    # Pass the Spark DataFrame and the BigQuery table name
    save_bigquery(sdf, 'batting_data')
    • Use the DataFrame's write method

    • Specify bigquery as the format

    • This example appends to an existing table (with CREATE_NEVER, the table must already exist)

  32. Running on Dataproc Serverless
    # Upload the script files to GCS
    gsutil -m cp -r app gs://your-gcs-bucket/script
    # `dataproc batches submit` runs the job on Dataproc Serverless (this example uses the beta command group)
    # Batch IDs must be unique, so change --batch for each run
    gcloud beta dataproc batches submit pyspark gs://your-gcs-bucket/script/app/csv2bq_daily.py \
        --project your-project \
        --region asia-northeast1 \
        --batch hello-world \
        --jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.25.2.jar \
        --subnet your-subnet \
        --py-files gs://your-gcs-bucket/script/app/environment.py,gs://your-gcs-bucket/script/app/query.py,gs://your-gcs-bucket/script/app/schema.py
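The command above uses a fixed `--batch hello-world`, so running it a second time would collide with the existing batch. Below is a minimal sketch for generating a fresh ID per run, assuming a timestamp suffix is acceptable. The check follows the rule described in the Dataproc batches API reference: 4 to 63 characters, using lowercase letters, digits, and hyphens.

```python
import re
from datetime import datetime

# Batch ID rule from the Dataproc batches API: 4-63 chars of a-z, 0-9 and '-'.
BATCH_ID_PATTERN = re.compile(r"[a-z0-9-]{4,63}")


def make_batch_id(prefix: str, now: datetime) -> str:
    """Append a timestamp so each submission gets a unique batch ID."""
    batch_id = f"{prefix}-{now:%Y%m%d-%H%M%S}"
    if not BATCH_ID_PATTERN.fullmatch(batch_id):
        raise ValueError(f"invalid batch id: {batch_id!r}")
    return batch_id


print(make_batch_id("csv2bq-daily", datetime(2022, 10, 15, 6, 30)))  # csv2bq-daily-20221015-063000
```

The result can be passed as the `--batch` value from whatever wrapper submits the job.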

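  33. Impressions
    • I managed to build what I wanted smartly, which was nice.

    • The PySpark code itself has few Dataproc-specific quirks, which is also nice.

    • Execution permissions (service account & IAM) worked fine with the defaults.

    • BigQuery and GCS were fully accessible from the start.

    • However, the default permissions may be too broad, so some tuning may be needed in production.

    • It may be tough unless you are already somewhat used to Spark and Google Cloud?
    If this is your first time touching Spark, you may need to brace yourself a little.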

  34. [Recap] Loading into Firestore (moving data into the database)

  36. Exporting files from BigQuery to GCS with Dataproc
    • Load the BigQuery data into a Spark DataFrame

    • Write the Spark DataFrame out as files

    Running it (gcloud CLI) works the same way as before, so I'll skip that part.

  37. A first implementation

    5. Read from BigQuery
    from pyspark.sql import SparkSession
    from pyspark.sql import DataFrame as SparkDataFrame
    from pyspark.sql.utils import AnalysisException

    GCS_BUCKET = 'your-gcs-bucket-name'  # placeholder

    # Same as before
    spark: SparkSession = SparkSession \
        .builder \
        .appName('your app') \
        .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.25.2.jar') \
        .config('spark.sql.debug.maxToStringFields', 2000) \
        .getOrCreate()
    spark.conf.set('temporaryGcsBucket', GCS_BUCKET)
    # Note: reading a view fails with an error unless view reading is enabled here
    spark.conf.set("viewsEnabled", "true")

    def read_bq() -> SparkDataFrame:
        """
        Read Dashboard data
        BigQuery View to SparkDataFrame
        """
        try:
            df: SparkDataFrame = spark.read.format('bigquery') \
                .option('project', 'your-project') \
                .option('table', 'your_dataset.view_baseball') \
                .load()
            return df
        except AnalysisException:
            return None  # type: ignore

    sdf: SparkDataFrame = read_bq()
    • As with saving, specify the BigQuery connector JAR

    • Specify bigquery as the format in spark.read

    • Reading from a BigQuery view requires an extra option (viewsEnabled)

  38. A first implementation

    6. Save to GCS
    from pyspark.sql import DataFrame as SparkDataFrame

    def save_json(sdf: SparkDataFrame) -> None:
        """
        Save as JSON dataset
        SparkDataFrame to GCS Bucket
        """
        sdf.write \
            .format("json") \
            .mode("overwrite") \
            .option("path", "gs://your-gcs-bucket/filepath/hoge") \
            .save()

    save_json(sdf)
    • Use the DataFrame's write method

    • Specify json as the format

    • Specify the final output path (Spark writes it as a directory of part files, one JSON object per line)
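Spark's JSON writer produces a directory of part-*.json files with one JSON object per line. The next step, the Python script that loads them into Firestore, has to read them back, but that script isn't shown in the talk. Below is a hypothetical sketch of the parsing part only. It assumes the output directory has already been copied locally (e.g. with gsutil cp -r) and that each record has a unique key field. The key name player_id is an assumption, and the Firestore write itself is left out.

```python
import json
from pathlib import Path
from typing import Dict, Iterable, Iterator


def iter_part_lines(output_dir: Path) -> Iterator[str]:
    """Yield lines from Spark's part-*.json files in a stable order (skips _SUCCESS etc.)."""
    for part in sorted(output_dir.glob("part-*.json")):
        with part.open(encoding="utf-8") as f:
            yield from f


def parse_json_lines(lines: Iterable[str]) -> Iterator[dict]:
    """Parse JSON Lines, skipping blank lines."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


def to_documents(records: Iterable[dict], key_field: str = "player_id") -> Dict[str, dict]:
    """Map each record to a string document ID; a later duplicate key overwrites an earlier one."""
    return {str(record[key_field]): record for record in records}
```

For example, `to_documents(parse_json_lines(iter_part_lines(Path("hoge"))))` would give a dict of document ID to document body, ready to hand to whatever writes to Firestore.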

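  39. PySpark and Dataproc Serverless
    • It makes the "use Spark only when you need it" use case possible.
    That is the biggest reason to use a serverless service.

    • With this application's data size (under 1 GB per year) the benefit is small, but for use cases like "quickly batch-processing a few GB of data per day" (preprocessing, cleansing, and so on), I think it is quite handy.

    • The code is lightweight in the sense that it runs only when there is processing to do, so it pairs really well with PySpark.

    • Note that automating the processing is a bit tricky because the available options are limited (which is why I run it by hand).
    For automation, Cloud Composer (Airflow) seems to be the best practice for now.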

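  40. Summary of today's talk
    • PySpark is great for doing data processing comfortably in Python.

    • PySpark can run in the cloud; today I introduced Dataproc.

    • Once you can use the cloud serverlessly, a lot of things get easier.
    That said, the services have built-in limitations, so understand them properly and use them as directed.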

  41. Done.
    Thank you for listening!