Upgrade to Pro — share decks privately, control downloads, hide ads and more …

ETLだけじゃない!AWS Glueで学ぶ分散処理入門編

ETLだけじゃない!AWS Glueで学ぶ分散処理入門編

Avatar for NRI Netcom

NRI Netcom PRO

August 05, 2025
Tweet

More Decks by NRI Netcom

Other Decks in Technology

Transcript

  1. ETLだけじゃない! AWS Glueで学ぶ分散処理 入門編 NRIネットコム TECH AND DESIGN STUDY #69

    2025年06月23日 NRIネットコム株式会社 クラウドテクニカルセンター 和田 将利
  2. 1 Copyright(C) NRI Netcom, Ltd. All rights reserved. ごあいさつ 00

    AWS Glueについて 01 AWS Glue ETL jobsとSpark 02 Glue ETL jobsをETL以外の用途で使ってみる 03 まとめ 04
  3. 3 Copyright(C) NRI Netcom, Ltd. All rights reserved. 本日の勉強会について ◼

    概要 ⚫ AWS Glueの主要な機能を紹介しつつ、サーバレスETL実行基盤であるGlue ETL jobsを Apache Sparkのアーキテクチャを交えて解説します。 ⚫ 最後に、Glue ETL jobsをETL以外の用途で利用した検証事例をご紹介します。 ◼ こんな方にオススメ ⚫ AWSを利用したデータ分析基盤構築をこれから始める方 ⚫ AWS Glue ETL jobsによるETL実装をすでに経験しているが、Glue ETL jobsについて新たな知見を得たい方 ⚫ データ分析基盤の検証等で利用するダミーデータの生成に興味がある方 ◼ アジェンダ ⚫ AWS Glueの全体像 ⚫ Glue ETL jobsとApache Spark ⚫ Glue ETL jobsをETL以外の用途で利用してみる 0章 ごあいさつ
  4. 4 Copyright(C) NRI Netcom, Ltd. All rights reserved. プロフィール(和田 将利)

    0章 ごあいさつ 【配属履歴】 ◼ 2020年06月 NRIネットコム株式会社入社 ◼ 2024年01月 株式会社デロイトトーマツ ウェブサービス入社 ◼ 2025年02月 NRIネットコム株式会社入社 ⚫ 以降、プリセールス、CCoE業務に従事 ⚫ 現在 クラウドテクニカルセンター 課長代理 【スキル、取得資格】 ◼ バックエンド、バッチ処理のパフォーマンスチューニング ◼ 上記スキルを軸とした提案活動、プロジェクト立ち上げ ◼ 情報処理技術者試験(応用情報技術者、ネットワークスペ シャリスト) ◼ AWS認定 ⚫ 2022 APN ALL AWS Certifications Engineers 【主な業務経歴】 ◼ EC ⚫ 大手動画配信サービス データ基盤構築、運用 ⚫ 大手動画配信サービス Webアプリケーション基盤運用 ⚫ 大手動画配信サービス 動画配信基盤運用 ◼ 公共 ⚫ 政令指定都市 ガバメントクラウド運用管理補助者 提案、プロジェクト マネージャ、テックリード ◼ エンターテイメント ⚫ 大手ゲーム会社 決済システム構築、運用、改善 ⚫ 大手ゲーム会社 社内向けシステム構築、運用、改善 ⚫ 大手ゲーム会社 スマートフォンゲーム バックエンド基盤構築、運用 ◼ その他 ⚫ AWS MSP認定監査対応
  5. 6 Copyright(C) NRI Netcom, Ltd. All rights reserved. AWS Glueとは

    ◼ 「サーバレスデータ統合サービス」 ⚫ AWS Glue とは - AWS Glue ◼ データ統合とは? ⚫ 各システムが生成するデータを抽出(Extract)し、変換(Transform)し、読み込む(Load)する。 ⚫ ETLと呼ばれる処理。 ◼ AWS Glueには主に以下の機能が含まれている。 ⚫ Data Catalog • Databases/Tables • Crawlers • Connections ⚫ Data Integration and ETL • ETL jobs • Glue Studio 1章 AWS Glueについて
  6. 7 Copyright(C) NRI Netcom, Ltd. All rights reserved. 一般的なデータ分析基盤の構成とGlueそれぞれの機能のおさらい 1章

    AWS Glueについて S3 (ログ保管用) Amazon Aurora EC2 (Web Server) User AWS Cloud B2Cシステム データ分析用システム S3 (データレイク) AWS Glue ETL jobs Crawler AWS Glue Data Catalog Amazon Redshift Amazon Athena Data Analyst メタデータ 抽出 データ 抽出 メタデータ 登録 データ 取り込み データ 変換 SQLで 分析
  7. 8 Copyright(C) NRI Netcom, Ltd. All rights reserved. メタデータ管理はなぜ必要なのか? ◼

    Glue Data Catalogにはデータセットの「メタデータ」が保存される。 ◼ データセットのメタデータ? ⚫ 先述の図の中のB2Cシステム側では、“system-a-s3-logs”バケットのapplication-logsディレクトリ以下に年/月/日 ディレクトリを作成して、毎日ログが出力される ⚫ ログはjson形式で、以下のデータが1レコードに保存される、とする。 {“Timestamp”:”yyyy-mm-dd HH:MM:SS.f”,”URL”:https://xxxx”,”Cookie”:”xxxx”,”ClientIP”:”xxx.xxx.xxx.xxx”} ◼ jsonファイルの状態では都合がよろしくない・・・ ⚫ 格納先バケットやスキーマ情報を、ETLジョブのコードにハードコーディングする必要がある。 • jsonファイルであることを事前に認識してないと利用できない ⚫ SQLでデータにアクセスできない。 ◼ Glue Data Catalogに保存すると、テーブルとして扱うことができるようになる。 ⚫ Glue ETL Jobsでデータを抽出/変換 ⚫ Athenaでクエリ実行 ◼ Glue crawlerはメタデータを自動的に抽出し、Data Catalogに登録する機能。 1章 AWS Glueについて
  8. 9 Copyright(C) NRI Netcom, Ltd. All rights reserved. Glue ETL

    jobsについて ◼ Apache Spark, またはRayによる分散処理が可能なサーバレス基盤 ◼ Sparkの場合は以下の方法で分散処理を実行 ⚫ コンパイル済みSparkアプリケーション(jar)を直接実行 ⚫ PythonシェルからPySparkを利用し、Sparkによる分散処理を実行 ◼ つまりGlue ETL jobsは「サーバレスSparkクラスタ」? ⚫ Glue Studio(Visual ETL)を利用すると、ほとんどSparkのことを意識せずにETLジョブを構築することができる。 • 実体はPython + PySparkのコードが生成されている。 ⚫ AWSから提供される”GlueContext”クラスがAWS環境でETLジョブを構築する際に便利。 • Extract, Loadの部分でAWSの他サービスとの連携がいい感じにできる。 ◼ Sparkについて少し知っておくと、より楽しくETLプログラミングができるようになると思います。 ◼ 次の章からGlue ETL jobsとSparkについて深堀をします。 1章 AWS Glueについて 好評でしたら Ray編やります
  9. 11 Copyright(C) NRI Netcom, Ltd. All rights reserved. Glue StudioでETL処理を作る

    Extract処理 ◼ S3上のJSONファイルを読み込み ◼ URLからproductid列を作成、timestampからパーティションキーを作成 ◼ S3にParquet形式で保存 2章 AWS Glue ETL jobsとSpark
  10. 12 Copyright(C) NRI Netcom, Ltd. All rights reserved. Glue StudioでETL処理を作る

    Transform処理 ◼ S3上のJSONファイルを読み込み ◼ URLからproductid列を作成、timestampからパーティションキーを作成 ◼ S3にParquet形式で保存 2章 AWS Glue ETL jobsとSpark
  11. 13 Copyright(C) NRI Netcom, Ltd. All rights reserved. Glue StudioでETL処理を作る

    Load処理 ◼ S3上のJSONファイルを読み込み ◼ URLからproductid列を作成、timestampからパーティションキーを作成 ◼ S3にParquet形式で保存 2章 AWS Glue ETL jobsとSpark
  12. 14 Copyright(C) NRI Netcom, Ltd. All rights reserved. Glue StudioでETL処理を作る

    生成されたPythonコードを眺めてみる def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: for alias, frame in mapping.items(): frame.toDF().createOrReplaceTempView(alias) result = spark.sql(query) return DynamicFrame.fromDF(result, glueContext, transformation_ctx) args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) # Script generated for node Amazon S3 AmazonS3_node1749015592433 = glueContext.create_dynamic_frame.from_options(format_options={"multiLine": "false"}, connection_type="s3", format="json", connection_options={"paths": ["s3://m4- wada-daaaaaaaatalake/data/accesslog/"], "recurse": True}, transformation_ctx="AmazonS3_node1749015592433") # Script generated for node SQL Query SqlQuery0 = ''' select timestamp, ipaddress,useragent,split(url,"/")[4] as productid, split(timestamp,"-")[0] as year,split(timestamp,"-")[1] as month from myDataSource ''' SQLQuery_node1749015652080 = sparkSqlQuery(glueContext, query = SqlQuery0, mapping = {"myDataSource":AmazonS3_node1749015592433}, transformation_ctx = "SQLQuery_node1749015652080") # Script generated for node Amazon S3 AmazonS3_node1749015918498 = glueContext.getSink(path="s3://m4-wada-daaaaaaaatalake/data/accesslog-partitioned/", connection_type="s3", updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["year", "month"], enableUpdateCatalog=True, transformation_ctx="AmazonS3_node1749015918498") AmazonS3_node1749015918498.setCatalogInfo(catalogDatabase="m4-wada-db",catalogTableName="accesslog-partitioned") AmazonS3_node1749015918498.setFormat("glueparquet", compression="snappy") AmazonS3_node1749015918498.writeFrame(SQLQuery_node1749015652080) job.commit() 2章 AWS Glue ETL jobsとSpark DynamicFrameをDataFrameに変換、 DataFrameでSpark SQLを実行し DynamicFrameに再度変換 S3上のデータをDynamicFrameとして Extract Transform処理 Load処理
  13. 15 Copyright(C) NRI Netcom, Ltd. All rights reserved. Spark DataFrameとGlue

    DynamicFrameについて ◼ どちらも分散処理基盤上でデータを扱う仕組み。 ⚫ 作成したDataFrame, DynamicFrameに対して処理を行うと、Sparkアプリケーションとして分散して処理が実行される。 ◼ GlueではPythonシェルが提供されるので、Pythonコードでデータ処理を書くことも可能だが、 分散処理は行われず、遅い。 ◼ 抽出したデータはDataFrame, DynamicFrameとして扱い、DataFrame, DynamicFrameの機能を利用して データの処理を行う。 ◼ データの処理? ⚫ Spark DataFrame • 集計処理、条件によるフィルタ • Spark SQL • この中で関数を利用した文字列処理を列単位で適用 • UDF(User Defined Function)を実装し、独自の処理をすることも可能 • MLibによる機械学習, 推論 ⚫ Glue DynamicFrame • 集計処理、条件によるフィルタ • 事前に定義した関数をすべての行に適用(map) • 行単位で処理を行う点がSpark SQLのUDFと異なる。 • unbox • 列内にJSON文字列で格納されたデータを列に変換 2章 AWS Glue ETL jobsとSpark
  14. 16 Copyright(C) NRI Netcom, Ltd. All rights reserved. Spark DataFrameとGlue

    DynamicFrame、どちらを利用するか? ◼ DynamicFrameはSpark DataFrameの抽象化レイヤー ◼ 大体同じことができる。 ◼ DynamicFrameを利用するメリット ⚫ スキーマ定義が不要 ⚫ 他AWSサービスへのLoad処理が強力 • Load時のGlue Data Catalogの登録/更新 • Spark DataFrameではファイルとして入出力が基本となる。 ◼ 相互変換可能なため、基本的にDynamicFrameを利用し、必要に応じてSpark DataFrameに変換する。 ◼ 余談 ⚫ “Spark” DataFrameと記載しているのは、”Pandas” DataFrameと混同しないため。 ⚫ GlueでPandas DataFrameの処理を行うことが可能だが、分散処理は行われない。 2章 AWS Glue ETL jobsとSpark
  15. 17 Copyright(C) NRI Netcom, Ltd. All rights reserved. Glue ETL

    jobsの構成 (Glue 5.0) ◼ Pythonシェルで情報を取得した結果から推測されるGlue ETL jobsの構成 ⚫ 最小DPUが2なのは、1つ目のDPUがDriver(分散処理管理)を担うため。 2章 AWS Glue ETL jobsとSpark Worker Node #2 Worker Node #1 Python Shell Container Spark Driver Container Spark Executor Container Worker Node #3 Spark Executor Container DPU DPU DPU G.1Xでは 4C 16G このWorkerは 実処理はしない AWS Cloud
  16. 18 Copyright(C) NRI Netcom, Ltd. All rights reserved. Spark UIを見てみる①

    ◼ Spark History Serverを構築すると、Sparkジョブの詳細を確認することができる。 ⚫ Spark 履歴サーバーの起動 - AWS Glue ◼ ETL job側でSpark UI有効化とログ保管先バケットの指定が必要。 2章 AWS Glue ETL jobsとSpark AWS Glue ETL jobs S3 (Jobログ保管) Spark History Server AWS Cloud
  17. 19 Copyright(C) NRI Netcom, Ltd. All rights reserved. Spark UIを見てみる②

    ◼ Job→Stage→Task(Executorで実行) ⚫ 1Executorあたり4タスク動いていることがわかる。 ⚫ G1.Xは4Coreのため、1Core = 1タスクで割り当てられている。 2章 AWS Glue ETL jobsとSpark
  18. 20 Copyright(C) NRI Netcom, Ltd. All rights reserved. 3. Glue

    ETL jobsをETL以外の用途で使ってみる
  19. 21 Copyright(C) NRI Netcom, Ltd. All rights reserved. ダミーデータが欲しい ◼

    データ分析系サービスの性能検証で大規模なデータがたまに必要になる。 ◼ ダミーデータを準備するのにGlueは利用できるか? ⚫ ファイル(csv, json等)からDataFrame/DynamicFrame作成は簡単。 ⚫ Pythonの二次元配列をDataFrame/DynamicFrameにするのは簡単。 ⚫ Pythonで1000万行規模のデータを生成しようとすると遅い。 ⚫ DataFrame/DynamicFrameは行の追加処理が遅い。 3章 Glue ETL jobsをETL以外の用途で使ってみる 何かしらの方法で分散処理を活かしてデータを生成することはできないだろうか?
  20. 22 Copyright(C) NRI Netcom, Ltd. All rights reserved. Glueでダミーデータ生成 全体の流れ

    ◼ 行の追加は遅いため、列を追加するアプローチを試す。 1. Pythonで生成したいレコード数分の辞書の配列を作成 ⚫ 中身は計算量なく生成が可能なダミーの値 2. 作成した配列をDataFrameに変換 3. DataFrameをDynamicFrameに変換し、map()で列のデータを生成 ⚫ mapに渡す関数でデータを生成する ⚫ 完全にランダムではなく、偏りがあるデータを用意してみる 4. S3にLoad 3章 Glue ETL jobsをETL以外の用途で使ってみる
  21. 23 Copyright(C) NRI Netcom, Ltd. All rights reserved. Glueでダミーデータ生成 大まかな処理

    def initDyf(): list = [[1]for i in range(numlogs)] dfaccesslog = spark.createDataFrame(list, schema="id int") dyfaccesslog = DynamicFrame.fromDF(dfaccesslog, glueContext, "dyfaccesslog") return dyfaccesslog dyfaccesslog = initDyf() mapped_dyfaccesslog = dyfaccesslog.map(f = generateData) s3output.writeFrame(mapped_dyfaccesslog) 3章 Glue ETL jobsをETL以外の用途で使ってみる 行数多いだけのダミーの DynamicFrame作成 レコード生成 S3出力
  22. 24 Copyright(C) NRI Netcom, Ltd. All rights reserved. Glueでダミーデータ生成 DynamicFrame.map()で実行する関数について

    # Timestamp Distribution probability_hours = [10,10,10,10,10,12,16,20,24,26,28,40,40,30,30,34,30,28,28,28,18,16,12] # Product Distribution productmaster = {} productmaster[“productid”] = [“10001”,”10002”,”10003”,”10004”,”10005”,”10006”,”10007”,”10008”,”10009”,”10010”] productmaster[“probability_access”] = [0.1,0.1,0.1,0.15,0.15,0.15,0.2,0.2,0.5,0.5] def generateData(rec): hours = random.choices(range(len(probability_hours)), k=1, weights=probability_hours)[0] date = startdate + pd.Timedelta(days=np.random.randint(0, numdays),seconds=hours*3600 + np.random.randint(0, 3600)) rec['timestamp'] = date.strftime('%Y-%m-%dT%H:%M:%S') productindex = random.choices(range(len(productmaster["productid"])), k=1, weights=productmaster['probability_access'])[0] product = productmaster["productid"][productindex] url = accesslogroot + str(product) rec['url'] = url # Other Information rec['IpAddress'] = fake.ipv4() rec['UserAgent'] = fake.user_agent() del rec['id'] return rec 3章 Glue ETL jobsをETL以外の用途で使ってみる 時刻ごとのアクセス数の偏り アクセスされる製品ページの偏り IP, UAはFakerで生成 ダミー列削除
  23. 25 Copyright(C) NRI Netcom, Ltd. All rights reserved. Glueでダミーデータ生成 実行結果

    ◼ 5DPUで10,000,000レコード生成 ⚫ およそ3.5分 ◼ 4 Executor * 4 Task(G1.Xが4Core) = 16ファイルに分割されて生成される 3章 Glue ETL jobsをETL以外の用途で使ってみる
  24. 26 Copyright(C) NRI Netcom, Ltd. All rights reserved. おまけ Glueでダミーデータ生成

    Spark DataFrame + UDF 版 def generateUrl(): productindex = random.choices(range(len(productmaster["productid"])), k=1, weights=productmaster['probability_access'])[0] product = productmaster["productid"][productindex] url = accesslogroot + str(product) return url def generateDate(): hours = random.choices(range(len(probability_hours)), k=1, weights=probability_hours)[0] date = startdate + pd.Timedelta(days=np.random.randint(0, numdays),seconds=hours*3600 + np.random.randint(0, 3600)) return date.strftime('%Y-%m-%dT%H:%M:%S’) (中略) spark.udf.register("generateUrl", generateUrl, StringType()) spark.udf.register("generateDate", generateDate, StringType()) spark.udf.register("generateIp", generateIp, StringType()) spark.udf.register("generateUa", generateUa, StringType()) dfa = initDf() dfa.registerTempTable('dummy') mapped_dfa = spark.sql("SELECT generateUrl() as url,generateDate() as timestamp, generateIp() as IpAddress, generateUa() as UserAgent FROM dummy") 3章 Glue ETL jobsをETL以外の用途で使ってみる DynamicFrame.map()では 行一括処理が可能だが UDFでは1列ごとに処理 SELECT内でUDF呼び出し
  25. 27 Copyright(C) NRI Netcom, Ltd. All rights reserved. おまけ DynamicFrame.map()

    と Spark DataFrame+UDF どっちが速い? ◼ 5DPUで10,000,000レコード生成 * 2 Stage = 合計4GBのデータ生成 ⚫ 1 Stage目はExecutor起動のオーバーヘッドが存在する ◼ DataFrame + UDFの方が5%強高速 3章 Glue ETL jobsをETL以外の用途で使ってみる DynamicFrame.map() DataFrame + UDF
  26. 29 Copyright(C) NRI Netcom, Ltd. All rights reserved. まとめ ◼

    AWS Glueはそれぞれの機能と目的を意識すると全体像をつかみやすい。 ⚫ Data Catalog • メタデータの管理 ⚫ Data Integration and ETL • 実データの処理 ◼ Glue ETL jobsではSparkまたはRayによる分散処理をサーバレスで実行。 ⚫ DataFrame, DynamicFrameとしてデータを扱うことでSparkによる分散処理が可能。 ⚫ Ray編? ◼ DataFrameとDynamicFrameはそれぞれ得手不得手あり。 「こちらだと組み込みの機能で実現できないな」となったら変換するくらいの気持ちで。 ⚫ スキーマが固定, 処理がやや高速で柔軟な処理が可能なDataFrame ⚫ スキーマが柔軟でAWSサービスとの入出力が得意なDynamicFrame ◼ Glueでダミーデータ生成できました。