Lock in $30 Savings on PRO—Offer Ends Soon! ⏳
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
Pandas卒業?大規模データを様々なパッケージで高速処理してみる/pyconjp2022-hpc
Search
みずほリサーチ&テクノロジーズ株式会社 先端技術研究部
October 12, 2022
Programming
13
13k
Pandas卒業?大規模データを様々なパッケージで高速処理してみる/pyconjp2022-hpc
PyConJP2022発表資料
「Pandas卒業?大規模データを様々なパッケージで高速処理してみる」
みずほリサーチ&テクノロジーズ株式会社 先端技術研究部
October 12, 2022
Tweet
Share
More Decks by みずほリサーチ&テクノロジーズ株式会社 先端技術研究部
See All by みずほリサーチ&テクノロジーズ株式会社 先端技術研究部
NetworkXとGNNで学ぶグラフデータ分析入門〜複雑な関係性を解き明かすPythonの力〜
mhrtech
3
1.6k
AWS CDKでホストゾーン一式を管理しよう!/nw-jaws15
mhrtech
3
860
Fin-JAWS第38回reInvent2024_全金融系セッションをライトにまとめてみた
mhrtech
2
330
AWS CDKでデータリストアの運用、どのように設計する?~Aurora・EFSの実践事例を紹介~/aws-cdk-data-restore-aurora-efs
mhrtech
6
2.5k
Azure Verified Moduleを触って分かった注目ポイント/azure-verified-module-begin
mhrtech
1
2.4k
BLEA v3.0.0の新しいベストプラクティスを取り入れた効率的なAWS CDK開発/jawsug_cdk16
mhrtech
3
1.1k
あなたのアプリケーションをレガシーコードにしないための実践Pytest入門/pyconjp2024_pytest
mhrtech
7
5.5k
静的サイトのCI/CDでも侮るなかれ!Docs as Codeに沿ったセキュアな開発プロセスの実践/secure-docsascode-cicd-for-static-sites
mhrtech
14
4.2k
Kubernetes でワークフローを組むなら cdk8s-argoworkflow がよさそう!/ cdk8s-argoworkflow is great!
mhrtech
4
2.4k
Other Decks in Programming
See All in Programming
全員アーキテクトで挑む、 巨大で高密度なドメインの紐解き方
agatan
8
17k
Module Harmony
petamoriken
2
600
ソフトウェア設計の課題・原則・実践技法
masuda220
PRO
24
20k
大体よく分かるscala.collection.immutable.HashMap ~ Compressed Hash-Array Mapped Prefix-tree (CHAMP) ~
matsu_chara
1
200
ID管理機能開発の裏側 高速にSaaS連携を実現したチームのAI活用編
atzzcokek
0
160
sbt 2
xuwei_k
0
140
connect-python: convenient protobuf RPC for Python
anuraaga
0
340
GeistFabrik and AI-augmented software development
adewale
PRO
0
230
無秩序からの脱却 / Emergence from chaos
nrslib
2
11k
Evolving NEWT’s TypeScript Backend for the AI-Driven Era
xpromx
0
250
テストやOSS開発に役立つSetup PHP Action
matsuo_atsushi
0
130
堅牢なフロントエンドテスト基盤を構築するために行った取り組み
shogo4131
4
1.7k
Featured
See All Featured
JavaScript: Past, Present, and Future - NDC Porto 2020
reverentgeek
52
5.7k
[RailsConf 2023] Rails as a piece of cake
palkan
58
6.1k
Producing Creativity
orderedlist
PRO
348
40k
4 Signs Your Business is Dying
shpigford
186
22k
Agile that works and the tools we love
rasmusluckow
331
21k
How STYLIGHT went responsive
nonsquared
100
5.9k
It's Worth the Effort
3n
187
29k
Large-scale JavaScript Application Architecture
addyosmani
514
110k
Raft: Consensus for Rubyists
vanstee
140
7.2k
Building Flexible Design Systems
yeseniaperezcruz
329
39k
Understanding Cognitive Biases in Performance Measurement
bluesmoon
31
2.7k
Code Reviewing Like a Champion
maltzj
527
40k
Transcript
Pandas卒業? 大規模データを様々なパッケージで高速処理してみる 2022/10/14
自己紹介 藤根 成暢(Shigenobu Fujine) 岩手県出身 みずほリサーチ&テクノロジーズ株式会社(MHRT)に所属 現在、先端技術研究部にてデータ分析・クラウドなどの技術検証等を担当 過去の発表 (PyConJP2021) scikit-learnの新機能を紹介します
アジェンダ ✅自己紹介 本トークの背景と狙い パッケージ紹介 検証 検証環境 使用するデータ 検証① 基本統計量 検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
本トークの背景と狙い Pandasは超便利! 様々なデータの入出力、主要な加工・結合・集計処理をほぼサポート だけど、大きいデータにはスケールしにくい シングルスレッド ⇒ CPUコアが何十個あっても、基本1コアしか使用しない インメモリ処理 ⇒ データ規模が大きいと、ファイル読込すら失敗 並列処理・分散処理をサポートするパッケージに手を出してみるも… パーティションやワーカー等、内部アーキテクチャを理解・意識した実装が必要 APIがPandasと違うため、プログラムの移植が難しい or
出来ない それでも処理時間やMemoryErrorが改善されない 😢
本トークの背景と狙い 本日話すこと 以下4つのパッケージで代表的なデータ処理を実装・計測した結果 APIやパフォーマンスの違い、実装時のTipsを解説 公式サンプルコードだけでは見えない、現実的な課題とその打開案を紹介 本日話さないこと・検証しないこと GPU系パッケージ(cuDF、RAPIDSなど) Sparkの各種パラメータ説明
アジェンダ ✅自己紹介 ✅本トークの背景と狙い パッケージ紹介 検証 検証環境 使用するデータ 検証① 基本統計量 検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
パッケージ紹介(Pandas) 実用的なリアルデータを分析するためのパッケージ Index / カラムは、内部ではNumpyで実装 多様なフォーマットや分析要件に対応した豊富で高レベル なAPI群 read_* to_* CSV
XLS PARQUET HTML <> HDF5 JSON {} GBQ SQL ... CSV XLS PARQUET HTML <> HDF5 JSON {} GBQ SQL ... https://pandas.pydata.org/docs/getting_started/intro_tutorials/02_read_write.html
パッケージ紹介(Dask) タスクグラフによる並列処理 + 遅延評価による最適化 Pandas / Numpy / Scikit-learnを踏襲したAPI https://docs.dask.org/en/stable/10-minutes-to-dask.html
パッケージ紹介(Vaex) 高パフォーマンス : 行規模のデータを秒単位で処理 遅延評価・仮想カラム メモリ効率化(フィルタリングやカラム選択はゼロメモリ) https://vaex.io/ 109
パッケージ紹介(PySpark) クラスタコンピューティング用フレームワーク Spark Coreをベースに、様々な言語やタスクに対応 version3.2より、PySparkにPandasのAPIが統合 https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch01.html 1 import pyspark.pandas as
ps 2 df = pd.DataFrame(...)
(ご参考)GitHubスター数
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 検証 検証環境 使用するデータ 検証① 基本統計量 検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
検証環境 ハードウェア GoogleCloud VM(N2Dインスタンス)、8vCPU/64GB OS : Debian 10.13 Python 3.7.12
パッケージ 1 pandas==1.3.5 2 dask==2022.2.0 3 vaex==4.12.0 4 pyspark==3.3.0 5 6 # other 7 pyarrow 8 numexpr 9 bokeh 10 graphviz 11 tqdm
使用するデータセット TLC Trip Record Data NYCが公開するタクシー履歴データ 乗降車した時刻、場所、人数 運賃、支払方法、チップ額、空港税など 選定理由 オープンデータ、データ数が多い
複数のデータ型がある 数値、タイムスタンプ、文字列、カテゴリ値 使用するデータ範囲 2011/1~2022/6のYellow Taxi Trip Records 月単位(例. yellow_tripdata_2022-06.parquet) 計138ファイル、13.3億レコード https://unsplash.com/photos/x7DHDky2Jwc
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 検証 ✅検証環境 ✅使用するデータ 検証① 基本統計量 検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
検証① 基本統計量の算出 対象のファイルパス Pandas Dask Vaex PySpark 1 pattern = 'data/yellow_tripdata_*.parquet'
1 taxi = pd.DataFrame() 2 for path in Path().glob(pattern): 3 taxi = pd.concat([taxi, pd.read_parquet(path)], ignore_index=True) 4 stat = taxi.describe(include='all', datetime_is_numeric=True) 1 stat = (dd.read_parquet(pattern) 2 .describe(include='all', datetime_is_numeric=True) 3 .compute()) 1 stat = vaex.open(pattern).describe() 1 stat = ps.read_parquet(pattern).describe()
検証① 基本統計量の算出 データが大きいほど、Vaexが高速かつ省メモリ
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 検証 ✅検証環境 ✅使用するデータ ✅検証① 基本統計量 検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
検証② 補間、フィルター、型変換 欠値・外れ値の補正、正しい値や範囲のデータを抽出、適切なデータ型に変換 元データには欠値や外れ値が多数あり
検証② 補間、フィルター、型変換(Pandas) numexprパッケージ + eval で、複数カラムを並列処理して高速化 1 taxi = pd.DataFrame() 2
for path in Path().glob('data/yellow_tripdata_*.parquet'): 3 t = pd.read_parquet(path) 4 5 # 欠値と外れ値を置換 6 t.passenger_count = t.passenger_count.fillna(1).replace({0: 1}) 7 t.RatecodeID = t.RatecodeID.fillna(1) 8 ... 9 10 # フィルター用の一時カラムを作成 11 t.eval(""" 12 _duration = (tpep_dropoff_datetime - tpep_pickup_datetime).dt.seconds 13 _base_fare = 2.5 + (ceil(trip_distance / 0.2) + ceil(_duration / 60)) * 0.5 14 _base_total = fare_amount + extra + mta_tax + tip_amount + tolls_amount + improve 15 ... 16 """, inplace=True) 17 18 # フィルター、型変換 19 t = t.query(common.queries).astype(common.new_dtypes) 20 21 t.drop(columns=[c for c in t.columns if c.startswith('_')], inplace=True) 22 taxi = pd.concat([taxi, t], ignore_index=True)
検証② 補間、フィルター、型変換(Dask) ファイル名に含まれている年月を、パーティション毎に個別付与するのに苦労 delayedを使用して、パーティション毎にファイル名情報を追加(GH#6575) 主要なAPIはPandasとほぼ同じ fillna、replace、astype などは、複数カラムを一括実行する方が高速 1 taxi = []
2 for path in Path().glob(pattern): 3 year, month = map(int, path.stem.split('_')[-1].split('-')) 4 _min_pickup = datetime(year, month, 1) 5 _max_pickup = _min_pickup + timedelta(days=calendar.monthrange(year, month)[1], sec 6 t = delayed(pd.read_parquet)(path) 7 t = delayed(pd.DataFrame.assign)(t, _min_pickup=_min_pickup, _max_pickup=_max_picku 8 taxi.append(t) 9 taxi = dd.from_delayed(taxi) 1 taxi = taxi.fillna(dict( 2 passenger_count=1, RatecodeID=1, store_and_fwd_flag='N', 3 improvement_surcharge=0, congestion_surcharge=0, airport_fee=0)) 4 taxi = taxi.replace(dict(passenger_count={0: 1}, payment_type={0: 1}))
検証② 補間、フィルター、型変換(Dask) いざ実行すると、MemoryError! 全てのworkerが同時にファイルを読み込むため、メモリ使用量が急増 デフォルトでは、worker数=CPUコア数 compute(num_workers=N)でworker数を減らし、メモリ使用量を制御 worker数を1/2にすると、メモリ使用量は半減するが、処理時間は微増
検証② 補間、フィルター、型変換(Vaex) PandasのAPIとかなり異なる 日付・時刻計算 定数でカラム作成 値の置換はfunc.where numpy.ceil()などの外部関数を使う場合、UDFを登録する必要あり 除算は / ではなく numpy.divide()
1 min_pickup = np.datetime64(f'{year}-{month:02}-01T00:00:00') 1 taxi['_min_pickup'] = vaex.vconstant(min_pickup, len(taxi)) 1 taxi.payment_type = taxi.func.where(taxi.payment_type == 0, 1, taxi.payment_type) 1 @vaex.register_function() 2 def calc_base_fare(trip_distance, duration): 3 return 2.5 + (np.ceil(np.divide(trip_distance, 0.2)) + 4 np.ceil(np.divide(duration, 60))) * 0.5
検証② 補間、フィルター、型変換(PySpark) 時刻の加減計算がエラー! 全部SQLで書き直した SQL文を書いてps.sql()で実行するだけ、ファイル名追加も標準機能にあり 1 now = ps.DataFrame({'now': [np.datetime64('2022-10-14T13:50')]}) 2
now + np.timedelta64(30, 'm') # TypeError: Addition can not be applied to datetimes. 1 query = """ 2 SELECT 3 TO_DATE(CONCAT(REGEXP_EXTRACT(INPUT_FILE_NAME(), '20[1-2][0-9]-[0-1][0-9]', 0), '-0 4 ... 5 _file_date + (INTERVAL 1 MONTH) - (INTERVAL 1 SECOND) as _max_pickup, 6 ... 7 TINYINT(VendorID) as VendorID, 8 ... 9 FROM 10 parquet.`{pattern}` 11 WHERE 12 (VendorID in (1, 2)) 13 ...""" 14 taxi = ps.sql(query.format(pattern=pattern)))
検証② 補間、フィルター、型変換 処理時間はVaex、メモリ使用量はPySparkが優秀
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 検証 ✅検証環境 ✅使用するデータ ✅検証① 基本統計量 ✅検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
検証③ テーブル結合 5つの定義テーブルを履歴テーブルと内部結合し、各IDを文字列に変換 メモリ節約のため、各テーブルのデータ型はkey/valueともCategoricalDytpe型 結合イメージ
検証③ テーブル結合(Pandas) mergeよりも joinの方が高速(今回のデータでは約3倍) 全てのkeyをMultiIndexにして、あとは順番に joinするだけ mergeと joinで出力結果が異なる! keyにNaNが含まれている場合、joinでは一致しないが、mergeでは一致 CategoricalDtype型のカテゴリ値を数値に変換(NaN ⇒
-1)してから join 1 taxi = taxi.reset_index().set_index(['VendorID', 'RatecodeID', 'PULocationID', 2 'DOLocationID', 'payment_type']) 1 taxi = taxi.reset_index().set_index(target_columns) 2 3 for lookup in [common.vendors, common.ratecodes, common.pulocations, 4 common.dolocations, common.payment_types]: 5 right = pd.DataFrame(lookup, index=pd.Index(lookup.index.codes, 6 name=lookup.index.name)) 7 taxi = taxi.join(right, how='inner') 8 9 taxi = taxi.reset_index(drop=True).set_index('index')
検証③ テーブル結合(Dask) joinよりもmergeの方が高速 Daskのset_indexでは、内部でsort等が実行されるためオーバーヘッド大 定義テーブルは単一パーティションにした方が高速 1 vendors = dd.from_pandas(common.vendors, npartitions=1) 2
ratecodes = dd.from_pandas(common.ratecodes, npartitions=1) 3 pulocations = dd.from_pandas(common.pulocations, npartitions=1) 4 dolocations = dd.from_pandas(common.dolocations, npartitions=1) 5 payment_types = dd.from_pandas(common.payment_types, npartitions=1) 6 7 taxi = (taxi.merge(vendors, left_on='VendorID', right_index=True, how='inner') 8 .merge(ratecodes, left_on='RatecodeID', right_index=True, how='inner') 9 .merge(pulocations, left_on='PULocationID', right_index=True, how='inner' 10 .merge(dolocations, left_on='DOLocationID', right_index=True, how='inner' 11 .merge(payment_types, left_on='payment_type', right_index=True, how='inne
検証③ テーブル結合(Vaex) Vaexではカラム同士の結合のみ(だけどメソッド名は join) keyにCategoricalDtype型は使えないため、floatに変換してから join 1 vendors = vaex.from_pandas(common.vendors.astype({'VendorID': float}))
2 ratecodes = vaex.from_pandas(common.ratecodes.astype({'RatecodeID': float})) 3 pulocations = vaex.from_pandas(common.pulocations.astype({'PULocationID': float})) 4 dolocations = vaex.from_pandas(common.dolocations.astype({'DOLocationID': float})) 5 payment_types = vaex.from_pandas(common.payment_types.astype({'payment_type': float}) 6 7 taxi = (taxi.join(vendors, on='VendorID', how='inner') 8 .join(ratecodes, on='RatecodeID', how='inner') 9 .join(pulocations, on='PULocationID', how='inner') 10 .join(dolocations, on='DOLocationID', how='inner') 11 .join(payment_types, on='payment_type', how='inner'))
検証③ テーブル結合(PySpark) Indexにカテゴリ型(CategoricalIndex)を使用可能 カラムはカテゴリ型に未対応(今後対応予定)のため、文字列に変換 1 vendors = ps.DataFrame(common.vendors).astype(str) 2 ratecodes =
ps.DataFrame(common.ratecodes).astype(str) 3 payment_types = ps.DataFrame(common.payment_types).astype(str) 4 pulocations = ps.DataFrame(common.pulocations).astype(str) 5 dolocations = ps.DataFrame(common.dolocations).astype(str) 6 7 taxi = (taxi.merge(vendors, left_on='VendorID', right_index=True, how='inner') 8 .merge(ratecodes, left_on='RatecodeID', right_index=True, how='inner') 9 .merge(pulocations, left_on='PULocationID', right_index=True, how='inner' 10 .merge(dolocations, left_on='DOLocationID', right_index=True, how='inner' 11 .merge(payment_types, left_on='payment_type', right_index=True, how='inne 12 )
検証③ テーブル結合 処理時間、メモリ使用量ともにPySparkが優秀
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 検証 ✅検証環境 ✅使用するデータ ✅検証① 基本統計量 ✅検証② 補間、フィルター、型変換 ✅検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
検証④ リソース使用量の監視(Pandas) memory_profilerやline_profilerなどの追加パッケージが必要 メモリ使用量や処理時間を行単位で追跡可能 途中でGCが発生すると正確に計測できないことも… 1 Line # Mem usage Increment
Occurrences Line Contents 2 ============================================================= 3 4 117.6 MiB 117.6 MiB 1 @profile 4 5 def func(filename): 5 6 4597.1 MiB 4479.5 MiB 1 taxi = pd.read_parquet(filename) 6 7 2164.8 MiB -2432.3 MiB 1 stat = taxi.describe() 7 8 8 9 2062.1 MiB -102.7 MiB 1 del taxi 9
検証④ リソース使用量の監視(Dask) ProgressBar で進捗を出力、ResourceProfiler でCPU・メモリ使用量を可視化
検証④ リソース使用量の監視(Dask) Profiler でタスクの実行順序や経過時間などを出力 タスクの長時間化、メモリ使用量急増などの原因調査に有効
検証④ リソース使用量の監視(Vaex) vaex.progress.tree で進捗やCPU使用量を出力 widget rich
検証④ リソース使用量の監視(PySpark) ステージ毎にタスク進捗を出力 Web UIでタスク毎の処理内容、DAG、Planなどの詳細情報にアクセス可能
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 ✅検証 ✅検証環境 ✅使用するデータ ✅検証① 基本統計量 ✅検証② 補間、フィルター、型変換 ✅検証③ テーブル結合
✅検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
それでもPandasを使いたいあなたに evalで並列化 数式に近い形で記述でき、可読性 の面でも恩恵あり numexprインストールを忘れずに! Pandas統合の並列処理パッケージを活用 apply/mapの並列化に有効 1 df =
pd.DataFrame() 2 df['x'] = np.arange(1e8) 3 4 # Before 5 df['x'] += 1 6 df['y'] = df['x'] * 2 7 df['z'] = df['x'] / df['y'] 8 9 # After 10 df.eval(""" 11 x = x + 1 12 y = x * 2 13 z = x / y 14 """, inplace=True)
まとめ 様々なパッケージで大規模データの効率的な処理を比較検証 Pandas : 小~中規模( )のデータ件数なら、Pandasだけでも十分有用 Dask : Pandasと(ほぼ)同じAPIで大規模データを処理できるのが強み Vaex
: APIがPandasと異なるも、処理時間を最重視したい場合には有益 PySpark : Python/SQLなど複数言語を使用可能、pyspark.pandasの今後に期待 データや処理内容が変われば、各パッケージの優劣も変わる可能性あり 手元のデータで試してみる ユースケースに応じて複数のパッケージを使い分けられる状態が理想的 ~107
ご清聴ありがとうございました。
(免責事項) 当資料は情報提供のみを目的として作成されたものであり、商品の勧誘を目的としたも のではありません。本資料は、当社が信頼できると判断した各種データに基づき作成さ れておりますが、その正確性、確実性を保証するものではありません。また、本資料に記 載された内容は予告なしに変更されることもあります。
None