Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
Pandas卒業?大規模データを様々なパッケージで高速処理してみる/pyconjp2022-hpc
Search
みずほリサーチ&テクノロジーズ株式会社 先端技術研究部
October 12, 2022
Programming
10
12k
Pandas卒業?大規模データを様々なパッケージで高速処理してみる/pyconjp2022-hpc
PyConJP2022発表資料
「Pandas卒業?大規模データを様々なパッケージで高速処理してみる」
みずほリサーチ&テクノロジーズ株式会社 先端技術研究部
October 12, 2022
Tweet
Share
More Decks by みずほリサーチ&テクノロジーズ株式会社 先端技術研究部
See All by みずほリサーチ&テクノロジーズ株式会社 先端技術研究部
AWS CDKでデータリストアの運用、どのように設計する?~Aurora・EFSの実践事例を紹介~/aws-cdk-data-restore-aurora-efs
mhrtech
6
1.5k
Azure Verified Moduleを触って分かった注目ポイント/azure-verified-module-begin
mhrtech
1
1.3k
BLEA v3.0.0の新しいベストプラクティスを取り入れた効率的なAWS CDK開発/jawsug_cdk16
mhrtech
3
710
あなたのアプリケーションをレガシーコードにしないための実践Pytest入門/pyconjp2024_pytest
mhrtech
7
3.3k
静的サイトのCI/CDでも侮るなかれ!Docs as Codeに沿ったセキュアな開発プロセスの実践/secure-docsascode-cicd-for-static-sites
mhrtech
14
3.1k
Kubernetes でワークフローを組むなら cdk8s-argoworkflow がよさそう!/ cdk8s-argoworkflow is great!
mhrtech
3
1.5k
IaCでセキュリティを強化しよう!~IAMが苦手な開発者でも簡単に権限を絞れる。そう、AWS CDKならね!~/secjaws32
mhrtech
5
2.9k
AWS Control Towerを2年弱運用して得たエッセンスと展望/securityjaws31
mhrtech
1
2.1k
そのリファレンス誰のため?ユーザ活用に向き合う/finjaws31
mhrtech
0
670
Other Decks in Programming
See All in Programming
ESLintプラグインを使用してCDKのセオリーを適用する
yamanashi_ren01
2
240
Stackless и stackful? Корутины и асинхронность в Go
lamodatech
0
1.3k
Итераторы в Go 1.23: зачем они нужны, как использовать, и насколько они быстрые?
lamodatech
0
1.4k
shadcn/uiを使ってReactでの開発を加速させよう!
lef237
0
300
DMMオンラインサロンアプリのSwift化
hayatan
0
190
PicoRubyと暮らす、シェアハウスハック
ryosk7
0
220
ecspresso, ecschedule, lambroll を PipeCDプラグインとして動かしてみた (プロトタイプ) / Running ecspresso, ecschedule, and lambroll as PipeCD Plugins (prototype)
tkikuc
2
1.9k
php-conference-japan-2024
tasuku43
0
430
責務を分離するための例外設計 - PHPカンファレンス 2024
kajitack
9
2.4k
PSR-15 はあなたのための ものではない? - phpcon2024
myamagishi
0
400
為你自己學 Python
eddie
0
520
HTML/CSS超絶浅い説明
yuki0329
0
190
Featured
See All Featured
Documentation Writing (for coders)
carmenintech
67
4.5k
Designing for humans not robots
tammielis
250
25k
Fontdeck: Realign not Redesign
paulrobertlloyd
82
5.3k
Dealing with People You Can't Stand - Big Design 2015
cassininazir
365
25k
A Modern Web Designer's Workflow
chriscoyier
693
190k
Art, The Web, and Tiny UX
lynnandtonic
298
20k
Learning to Love Humans: Emotional Interface Design
aarron
274
40k
Speed Design
sergeychernyshev
25
740
The World Runs on Bad Software
bkeepers
PRO
66
11k
The MySQL Ecosystem @ GitHub 2015
samlambert
250
12k
Code Reviewing Like a Champion
maltzj
521
39k
Visualization
eitanlees
146
15k
Transcript
Pandas卒業? 大規模データを様々なパッケージで高速処理してみる 2022/10/14
自己紹介 藤根 成暢(Shigenobu Fujine) 岩手県出身 みずほリサーチ&テクノロジーズ株式会社(MHRT)に所属 現在、先端技術研究部にてデータ分析・クラウドなどの技術検証等を担当 過去の発表 (PyConJP2021) scikit-learnの新機能を紹介します
アジェンダ ✅自己紹介 本トークの背景と狙い パッケージ紹介 検証 検証環境 使用するデータ 検証① 基本統計量 検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
本トークの背景と狙い Pandasは超便利! 様々なデータの入出力、主要な加工・結合・集計処理をほぼサポート だけど、大きいデータにはスケールしにくい シングルスレッド ⇒ CPUコアが何十個あっても、基本1コアしか使用しない インメモリ処理 ⇒ データ規模が大きいと、ファイル読込すら失敗 並列処理・分散処理をサポートするパッケージに手を出してみるも… パーティションやワーカー等、内部アーキテクチャを理解・意識した実装が必要 APIがPandasと違うため、プログラムの移植が難しい or
出来ない それでも処理時間やMemoryErrorが改善されない 😢
本トークの背景と狙い 本日話すこと 以下4つのパッケージで代表的なデータ処理を実装・計測した結果 APIやパフォーマンスの違い、実装時のTipsを解説 公式サンプルコードだけでは見えない、現実的な課題とその打開案を紹介 本日話さないこと・検証しないこと GPU系パッケージ(cuDF、RAPIDSなど) Sparkの各種パラメータ説明
アジェンダ ✅自己紹介 ✅本トークの背景と狙い パッケージ紹介 検証 検証環境 使用するデータ 検証① 基本統計量 検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
パッケージ紹介(Pandas) 実用的なリアルデータを分析するためのパッケージ Index / カラムは、内部ではNumpyで実装 多様なフォーマットや分析要件に対応した豊富で高レベル なAPI群 read_* to_* CSV
XLS PARQUET HTML <> HDF5 JSON {} GBQ SQL ... CSV XLS PARQUET HTML <> HDF5 JSON {} GBQ SQL ... https://pandas.pydata.org/docs/getting_started/intro_tutorials/02_read_write.html
パッケージ紹介(Dask) タスクグラフによる並列処理 + 遅延評価による最適化 Pandas / Numpy / Scikit-learnを踏襲したAPI https://docs.dask.org/en/stable/10-minutes-to-dask.html
パッケージ紹介(Vaex) 高パフォーマンス : 行規模のデータを秒単位で処理 遅延評価・仮想カラム メモリ効率化(フィルタリングやカラム選択はゼロメモリ) https://vaex.io/ 109
パッケージ紹介(PySpark) クラスタコンピューティング用フレームワーク Spark Coreをベースに、様々な言語やタスクに対応 version3.2より、PySparkにPandasのAPIが統合 https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch01.html 1 import pyspark.pandas as
ps 2 df = pd.DataFrame(...)
(ご参考)GitHubスター数
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 検証 検証環境 使用するデータ 検証① 基本統計量 検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
検証環境 ハードウェア GoogleCloud VM(N2Dインスタンス)、8vCPU/64GB OS : Debian 10.13 Python 3.7.12
パッケージ 1 pandas==1.3.5 2 dask==2022.2.0 3 vaex==4.12.0 4 pyspark==3.3.0 5 6 # other 7 pyarrow 8 numexpr 9 bokeh 10 graphviz 11 tqdm
使用するデータセット TLC Trip Record Data NYCが公開するタクシー履歴データ 乗降車した時刻、場所、人数 運賃、支払方法、チップ額、空港税など 選定理由 オープンデータ、データ数が多い
複数のデータ型がある 数値、タイムスタンプ、文字列、カテゴリ値 使用するデータ範囲 2011/1~2022/6のYellow Taxi Trip Records 月単位(例. yellow_tripdata_2022-06.parquet) 計138ファイル、13.3億レコード https://unsplash.com/photos/x7DHDky2Jwc
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 検証 ✅検証環境 ✅使用するデータ 検証① 基本統計量 検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
検証① 基本統計量の算出 対象のファイルパス Pandas Dask Vaex PySpark 1 pattern = 'data/yellow_tripdata_*.parquet'
1 taxi = pd.DataFrame() 2 for path in Path().glob(pattern): 3 taxi = pd.concat([taxi, pd.read_parquet(path)], ignore_index=True) 4 stat = taxi.describe(include='all', datetime_is_numeric=True) 1 stat = (dd.read_parquet(pattern) 2 .describe(include='all', datetime_is_numeric=True) 3 .compute()) 1 stat = vaex.open(pattern).describe() 1 stat = ps.read_parquet(pattern).describe()
検証① 基本統計量の算出 データが大きいほど、Vaexが高速かつ省メモリ
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 検証 ✅検証環境 ✅使用するデータ ✅検証① 基本統計量 検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
検証② 補間、フィルター、型変換 欠値・外れ値の補正、正しい値や範囲のデータを抽出、適切なデータ型に変換 元データには欠値や外れ値が多数あり
検証② 補間、フィルター、型変換(Pandas) numexprパッケージ + eval で、複数カラムを並列処理して高速化 1 taxi = pd.DataFrame() 2
for path in Path().glob('data/yellow_tripdata_*.parquet'): 3 t = pd.read_parquet(path) 4 5 # 欠値と外れ値を置換 6 t.passenger_count = t.passenger_count.fillna(1).replace({0: 1}) 7 t.RatecodeID = t.RatecodeID.fillna(1) 8 ... 9 10 # フィルター用の一時カラムを作成 11 t.eval(""" 12 _duration = (tpep_dropoff_datetime - tpep_pickup_datetime).dt.seconds 13 _base_fare = 2.5 + (ceil(trip_distance / 0.2) + ceil(_duration / 60)) * 0.5 14 _base_total = fare_amount + extra + mta_tax + tip_amount + tolls_amount + improve 15 ... 16 """, inplace=True) 17 18 # フィルター、型変換 19 t = t.query(common.queries).astype(common.new_dtypes) 20 21 t.drop(columns=[c for c in t.columns if c.startswith('_')], inplace=True) 22 taxi = pd.concat([taxi, t], ignore_index=True)
検証② 補間、フィルター、型変換(Dask) ファイル名に含まれている年月を、パーティション毎に個別付与するのに苦労 delayedを使用して、パーティション毎にファイル名情報を追加(GH#6575) 主要なAPIはPandasとほぼ同じ fillna、replace、astype などは、複数カラムを一括実行する方が高速 1 taxi = []
2 for path in Path().glob(pattern): 3 year, month = map(int, path.stem.split('_')[-1].split('-')) 4 _min_pickup = datetime(year, month, 1) 5 _max_pickup = _min_pickup + timedelta(days=calendar.monthrange(year, month)[1], sec 6 t = delayed(pd.read_parquet)(path) 7 t = delayed(pd.DataFrame.assign)(t, _min_pickup=_min_pickup, _max_pickup=_max_picku 8 taxi.append(t) 9 taxi = dd.from_delayed(taxi) 1 taxi = taxi.fillna(dict( 2 passenger_count=1, RatecodeID=1, store_and_fwd_flag='N', 3 improvement_surcharge=0, congestion_surcharge=0, airport_fee=0)) 4 taxi = taxi.replace(dict(passenger_count={0: 1}, payment_type={0: 1}))
検証② 補間、フィルター、型変換(Dask) いざ実行すると、MemoryError! 全てのworkerが同時にファイルを読み込むため、メモリ使用量が急増 デフォルトでは、worker数=CPUコア数 compute(num_workers=N)でworker数を減らし、メモリ使用量を制御 worker数を1/2にすると、メモリ使用量は半減するが、処理時間は微増
検証② 補間、フィルター、型変換(Vaex) PandasのAPIとかなり異なる 日付・時刻計算 定数でカラム作成 値の置換はfunc.where numpy.ceil()などの外部関数を使う場合、UDFを登録する必要あり 除算は / ではなく numpy.divide()
1 min_pickup = np.datetime64(f'{year}-{month:02}-01T00:00:00') 1 taxi['_min_pickup'] = vaex.vconstant(min_pickup, len(taxi)) 1 taxi.payment_type = taxi.func.where(taxi.payment_type == 0, 1, taxi.payment_type) 1 @vaex.register_function() 2 def calc_base_fare(trip_distance, duration): 3 return 2.5 + (np.ceil(np.divide(trip_distance, 0.2)) + 4 np.ceil(np.divide(duration, 60))) * 0.5
検証② 補間、フィルター、型変換(PySpark) 時刻の加減計算がエラー! 全部SQLで書き直した SQL文を書いてps.sql()で実行するだけ、ファイル名追加も標準機能にあり 1 now = ps.DataFrame({'now': [np.datetime64('2022-10-14T13:50')]}) 2
now + np.timedelta64(30, 'm') # TypeError: Addition can not be applied to datetimes. 1 query = """ 2 SELECT 3 TO_DATE(CONCAT(REGEXP_EXTRACT(INPUT_FILE_NAME(), '20[1-2][0-9]-[0-1][0-9]', 0), '-0 4 ... 5 _file_date + (INTERVAL 1 MONTH) - (INTERVAL 1 SECOND) as _max_pickup, 6 ... 7 TINYINT(VendorID) as VendorID, 8 ... 9 FROM 10 parquet.`{pattern}` 11 WHERE 12 (VendorID in (1, 2)) 13 ...""" 14 taxi = ps.sql(query.format(pattern=pattern)))
検証② 補間、フィルター、型変換 処理時間はVaex、メモリ使用量はPySparkが優秀
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 検証 ✅検証環境 ✅使用するデータ ✅検証① 基本統計量 ✅検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
検証③ テーブル結合 5つの定義テーブルを履歴テーブルと内部結合し、各IDを文字列に変換 メモリ節約のため、各テーブルのデータ型はkey/valueともCategoricalDytpe型 結合イメージ
検証③ テーブル結合(Pandas) mergeよりも joinの方が高速(今回のデータでは約3倍) 全てのkeyをMultiIndexにして、あとは順番に joinするだけ mergeと joinで出力結果が異なる! keyにNaNが含まれている場合、joinでは一致しないが、mergeでは一致 CategoricalDtype型のカテゴリ値を数値に変換(NaN ⇒
-1)してから join 1 taxi = taxi.reset_index().set_index(['VendorID', 'RatecodeID', 'PULocationID', 2 'DOLocationID', 'payment_type']) 1 taxi = taxi.reset_index().set_index(target_columns) 2 3 for lookup in [common.vendors, common.ratecodes, common.pulocations, 4 common.dolocations, common.payment_types]: 5 right = pd.DataFrame(lookup, index=pd.Index(lookup.index.codes, 6 name=lookup.index.name)) 7 taxi = taxi.join(right, how='inner') 8 9 taxi = taxi.reset_index(drop=True).set_index('index')
検証③ テーブル結合(Dask) joinよりもmergeの方が高速 Daskのset_indexでは、内部でsort等が実行されるためオーバーヘッド大 定義テーブルは単一パーティションにした方が高速 1 vendors = dd.from_pandas(common.vendors, npartitions=1) 2
ratecodes = dd.from_pandas(common.ratecodes, npartitions=1) 3 pulocations = dd.from_pandas(common.pulocations, npartitions=1) 4 dolocations = dd.from_pandas(common.dolocations, npartitions=1) 5 payment_types = dd.from_pandas(common.payment_types, npartitions=1) 6 7 taxi = (taxi.merge(vendors, left_on='VendorID', right_index=True, how='inner') 8 .merge(ratecodes, left_on='RatecodeID', right_index=True, how='inner') 9 .merge(pulocations, left_on='PULocationID', right_index=True, how='inner' 10 .merge(dolocations, left_on='DOLocationID', right_index=True, how='inner' 11 .merge(payment_types, left_on='payment_type', right_index=True, how='inne
検証③ テーブル結合(Vaex) Vaexではカラム同士の結合のみ(だけどメソッド名は join) keyにCategoricalDtype型は使えないため、floatに変換してから join 1 vendors = vaex.from_pandas(common.vendors.astype({'VendorID': float}))
2 ratecodes = vaex.from_pandas(common.ratecodes.astype({'RatecodeID': float})) 3 pulocations = vaex.from_pandas(common.pulocations.astype({'PULocationID': float})) 4 dolocations = vaex.from_pandas(common.dolocations.astype({'DOLocationID': float})) 5 payment_types = vaex.from_pandas(common.payment_types.astype({'payment_type': float}) 6 7 taxi = (taxi.join(vendors, on='VendorID', how='inner') 8 .join(ratecodes, on='RatecodeID', how='inner') 9 .join(pulocations, on='PULocationID', how='inner') 10 .join(dolocations, on='DOLocationID', how='inner') 11 .join(payment_types, on='payment_type', how='inner'))
検証③ テーブル結合(PySpark) Indexにカテゴリ型(CategoricalIndex)を使用可能 カラムはカテゴリ型に未対応(今後対応予定)のため、文字列に変換 1 vendors = ps.DataFrame(common.vendors).astype(str) 2 ratecodes =
ps.DataFrame(common.ratecodes).astype(str) 3 payment_types = ps.DataFrame(common.payment_types).astype(str) 4 pulocations = ps.DataFrame(common.pulocations).astype(str) 5 dolocations = ps.DataFrame(common.dolocations).astype(str) 6 7 taxi = (taxi.merge(vendors, left_on='VendorID', right_index=True, how='inner') 8 .merge(ratecodes, left_on='RatecodeID', right_index=True, how='inner') 9 .merge(pulocations, left_on='PULocationID', right_index=True, how='inner' 10 .merge(dolocations, left_on='DOLocationID', right_index=True, how='inner' 11 .merge(payment_types, left_on='payment_type', right_index=True, how='inne 12 )
検証③ テーブル結合 処理時間、メモリ使用量ともにPySparkが優秀
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 検証 ✅検証環境 ✅使用するデータ ✅検証① 基本統計量 ✅検証② 補間、フィルター、型変換 ✅検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
検証④ リソース使用量の監視(Pandas) memory_profilerやline_profilerなどの追加パッケージが必要 メモリ使用量や処理時間を行単位で追跡可能 途中でGCが発生すると正確に計測できないことも… 1 Line # Mem usage Increment
Occurrences Line Contents 2 ============================================================= 3 4 117.6 MiB 117.6 MiB 1 @profile 4 5 def func(filename): 5 6 4597.1 MiB 4479.5 MiB 1 taxi = pd.read_parquet(filename) 6 7 2164.8 MiB -2432.3 MiB 1 stat = taxi.describe() 7 8 8 9 2062.1 MiB -102.7 MiB 1 del taxi 9
検証④ リソース使用量の監視(Dask) ProgressBar で進捗を出力、ResourceProfiler でCPU・メモリ使用量を可視化
検証④ リソース使用量の監視(Dask) Profiler でタスクの実行順序や経過時間などを出力 タスクの長時間化、メモリ使用量急増などの原因調査に有効
検証④ リソース使用量の監視(Vaex) vaex.progress.tree で進捗やCPU使用量を出力 widget rich
検証④ リソース使用量の監視(PySpark) ステージ毎にタスク進捗を出力 Web UIでタスク毎の処理内容、DAG、Planなどの詳細情報にアクセス可能
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 ✅検証 ✅検証環境 ✅使用するデータ ✅検証① 基本統計量 ✅検証② 補間、フィルター、型変換 ✅検証③ テーブル結合
✅検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
それでもPandasを使いたいあなたに evalで並列化 数式に近い形で記述でき、可読性 の面でも恩恵あり numexprインストールを忘れずに! Pandas統合の並列処理パッケージを活用 apply/mapの並列化に有効 1 df =
pd.DataFrame() 2 df['x'] = np.arange(1e8) 3 4 # Before 5 df['x'] += 1 6 df['y'] = df['x'] * 2 7 df['z'] = df['x'] / df['y'] 8 9 # After 10 df.eval(""" 11 x = x + 1 12 y = x * 2 13 z = x / y 14 """, inplace=True)
まとめ 様々なパッケージで大規模データの効率的な処理を比較検証 Pandas : 小~中規模( )のデータ件数なら、Pandasだけでも十分有用 Dask : Pandasと(ほぼ)同じAPIで大規模データを処理できるのが強み Vaex
: APIがPandasと異なるも、処理時間を最重視したい場合には有益 PySpark : Python/SQLなど複数言語を使用可能、pyspark.pandasの今後に期待 データや処理内容が変われば、各パッケージの優劣も変わる可能性あり 手元のデータで試してみる ユースケースに応じて複数のパッケージを使い分けられる状態が理想的 ~107
ご清聴ありがとうございました。
(免責事項) 当資料は情報提供のみを目的として作成されたものであり、商品の勧誘を目的としたも のではありません。本資料は、当社が信頼できると判断した各種データに基づき作成さ れておりますが、その正確性、確実性を保証するものではありません。また、本資料に記 載された内容は予告なしに変更されることもあります。
None