Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
Pandas卒業?大規模データを様々なパッケージで高速処理してみる/pyconjp2022-hpc
Search
みずほリサーチ&テクノロジーズ株式会社 先端技術研究部
October 12, 2022
Programming
10
12k
Pandas卒業?大規模データを様々なパッケージで高速処理してみる/pyconjp2022-hpc
PyConJP2022発表資料
「Pandas卒業?大規模データを様々なパッケージで高速処理してみる」
みずほリサーチ&テクノロジーズ株式会社 先端技術研究部
October 12, 2022
Tweet
Share
More Decks by みずほリサーチ&テクノロジーズ株式会社 先端技術研究部
See All by みずほリサーチ&テクノロジーズ株式会社 先端技術研究部
AWS CDKでデータリストアの運用、どのように設計する?~Aurora・EFSの実践事例を紹介~/aws-cdk-data-restore-aurora-efs
mhrtech
6
1.6k
Azure Verified Moduleを触って分かった注目ポイント/azure-verified-module-begin
mhrtech
1
1.3k
BLEA v3.0.0の新しいベストプラクティスを取り入れた効率的なAWS CDK開発/jawsug_cdk16
mhrtech
3
710
あなたのアプリケーションをレガシーコードにしないための実践Pytest入門/pyconjp2024_pytest
mhrtech
7
3.3k
静的サイトのCI/CDでも侮るなかれ!Docs as Codeに沿ったセキュアな開発プロセスの実践/secure-docsascode-cicd-for-static-sites
mhrtech
14
3.1k
Kubernetes でワークフローを組むなら cdk8s-argoworkflow がよさそう!/ cdk8s-argoworkflow is great!
mhrtech
3
1.5k
IaCでセキュリティを強化しよう!~IAMが苦手な開発者でも簡単に権限を絞れる。そう、AWS CDKならね!~/secjaws32
mhrtech
5
2.9k
AWS Control Towerを2年弱運用して得たエッセンスと展望/securityjaws31
mhrtech
1
2.1k
そのリファレンス誰のため?ユーザ活用に向き合う/finjaws31
mhrtech
0
670
Other Decks in Programming
See All in Programming
各クラウドサービスにおける.NETの対応と見解
ymd65536
0
250
2025.01.17_Sansan × DMM.swift
riofujimon
2
600
ドメインイベント増えすぎ問題
h0r15h0
2
580
ecspresso, ecschedule, lambroll を PipeCDプラグインとして動かしてみた (プロトタイプ) / Running ecspresso, ecschedule, and lambroll as PipeCD Plugins (prototype)
tkikuc
2
2k
php-conference-japan-2024
tasuku43
0
440
PicoRubyと暮らす、シェアハウスハック
ryosk7
0
240
Androidアプリのモジュール分割における:x:commonを考える
okuzawats
1
290
SRE、開発、QAが協業して挑んだリリースプロセス改革@SRE Kaigi 2025
nealle
1
140
ChatGPT とつくる PHP で OS 実装
memory1994
PRO
3
190
“あなた” の開発を支援する AI エージェント Bedrock Engineer / introducing-bedrock-engineer
gawa
7
750
functionalなアプローチで動的要素を排除する
ryopeko
1
500
AWSのLambdaで PHPを動かす選択肢
rinchoku
2
390
Featured
See All Featured
The Pragmatic Product Professional
lauravandoore
32
6.4k
The Cult of Friendly URLs
andyhume
78
6.1k
Save Time (by Creating Custom Rails Generators)
garrettdimon
PRO
29
970
Building an army of robots
kneath
302
45k
XXLCSS - How to scale CSS and keep your sanity
sugarenia
248
1.3M
Speed Design
sergeychernyshev
25
740
KATA
mclloyd
29
14k
Statistics for Hackers
jakevdp
797
220k
Distributed Sagas: A Protocol for Coordinating Microservices
caitiem20
330
21k
Become a Pro
speakerdeck
PRO
26
5.1k
Optimising Largest Contentful Paint
csswizardry
33
3k
Designing for humans not robots
tammielis
250
25k
Transcript
Pandas卒業? 大規模データを様々なパッケージで高速処理してみる 2022/10/14
自己紹介 藤根 成暢(Shigenobu Fujine) 岩手県出身 みずほリサーチ&テクノロジーズ株式会社(MHRT)に所属 現在、先端技術研究部にてデータ分析・クラウドなどの技術検証等を担当 過去の発表 (PyConJP2021) scikit-learnの新機能を紹介します
アジェンダ ✅自己紹介 本トークの背景と狙い パッケージ紹介 検証 検証環境 使用するデータ 検証① 基本統計量 検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
本トークの背景と狙い Pandasは超便利! 様々なデータの入出力、主要な加工・結合・集計処理をほぼサポート だけど、大きいデータにはスケールしにくい シングルスレッド ⇒ CPUコアが何十個あっても、基本1コアしか使用しない インメモリ処理 ⇒ データ規模が大きいと、ファイル読込すら失敗 並列処理・分散処理をサポートするパッケージに手を出してみるも… パーティションやワーカー等、内部アーキテクチャを理解・意識した実装が必要 APIがPandasと違うため、プログラムの移植が難しい or
出来ない それでも処理時間やMemoryErrorが改善されない 😢
本トークの背景と狙い 本日話すこと 以下4つのパッケージで代表的なデータ処理を実装・計測した結果 APIやパフォーマンスの違い、実装時のTipsを解説 公式サンプルコードだけでは見えない、現実的な課題とその打開案を紹介 本日話さないこと・検証しないこと GPU系パッケージ(cuDF、RAPIDSなど) Sparkの各種パラメータ説明
アジェンダ ✅自己紹介 ✅本トークの背景と狙い パッケージ紹介 検証 検証環境 使用するデータ 検証① 基本統計量 検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
パッケージ紹介(Pandas) 実用的なリアルデータを分析するためのパッケージ Index / カラムは、内部ではNumpyで実装 多様なフォーマットや分析要件に対応した豊富で高レベル なAPI群 read_* to_* CSV
XLS PARQUET HTML <> HDF5 JSON {} GBQ SQL ... CSV XLS PARQUET HTML <> HDF5 JSON {} GBQ SQL ... https://pandas.pydata.org/docs/getting_started/intro_tutorials/02_read_write.html
パッケージ紹介(Dask) タスクグラフによる並列処理 + 遅延評価による最適化 Pandas / Numpy / Scikit-learnを踏襲したAPI https://docs.dask.org/en/stable/10-minutes-to-dask.html
パッケージ紹介(Vaex) 高パフォーマンス : 行規模のデータを秒単位で処理 遅延評価・仮想カラム メモリ効率化(フィルタリングやカラム選択はゼロメモリ) https://vaex.io/ 109
パッケージ紹介(PySpark) クラスタコンピューティング用フレームワーク Spark Coreをベースに、様々な言語やタスクに対応 version3.2より、PySparkにPandasのAPIが統合 https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch01.html 1 import pyspark.pandas as
ps 2 df = pd.DataFrame(...)
(ご参考)GitHubスター数
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 検証 検証環境 使用するデータ 検証① 基本統計量 検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
検証環境 ハードウェア GoogleCloud VM(N2Dインスタンス)、8vCPU/64GB OS : Debian 10.13 Python 3.7.12
パッケージ 1 pandas==1.3.5 2 dask==2022.2.0 3 vaex==4.12.0 4 pyspark==3.3.0 5 6 # other 7 pyarrow 8 numexpr 9 bokeh 10 graphviz 11 tqdm
使用するデータセット TLC Trip Record Data NYCが公開するタクシー履歴データ 乗降車した時刻、場所、人数 運賃、支払方法、チップ額、空港税など 選定理由 オープンデータ、データ数が多い
複数のデータ型がある 数値、タイムスタンプ、文字列、カテゴリ値 使用するデータ範囲 2011/1~2022/6のYellow Taxi Trip Records 月単位(例. yellow_tripdata_2022-06.parquet) 計138ファイル、13.3億レコード https://unsplash.com/photos/x7DHDky2Jwc
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 検証 ✅検証環境 ✅使用するデータ 検証① 基本統計量 検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
検証① 基本統計量の算出 対象のファイルパス Pandas Dask Vaex PySpark 1 pattern = 'data/yellow_tripdata_*.parquet'
1 taxi = pd.DataFrame() 2 for path in Path().glob(pattern): 3 taxi = pd.concat([taxi, pd.read_parquet(path)], ignore_index=True) 4 stat = taxi.describe(include='all', datetime_is_numeric=True) 1 stat = (dd.read_parquet(pattern) 2 .describe(include='all', datetime_is_numeric=True) 3 .compute()) 1 stat = vaex.open(pattern).describe() 1 stat = ps.read_parquet(pattern).describe()
検証① 基本統計量の算出 データが大きいほど、Vaexが高速かつ省メモリ
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 検証 ✅検証環境 ✅使用するデータ ✅検証① 基本統計量 検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
検証② 補間、フィルター、型変換 欠値・外れ値の補正、正しい値や範囲のデータを抽出、適切なデータ型に変換 元データには欠値や外れ値が多数あり
検証② 補間、フィルター、型変換(Pandas) numexprパッケージ + eval で、複数カラムを並列処理して高速化 1 taxi = pd.DataFrame() 2
for path in Path().glob('data/yellow_tripdata_*.parquet'): 3 t = pd.read_parquet(path) 4 5 # 欠値と外れ値を置換 6 t.passenger_count = t.passenger_count.fillna(1).replace({0: 1}) 7 t.RatecodeID = t.RatecodeID.fillna(1) 8 ... 9 10 # フィルター用の一時カラムを作成 11 t.eval(""" 12 _duration = (tpep_dropoff_datetime - tpep_pickup_datetime).dt.seconds 13 _base_fare = 2.5 + (ceil(trip_distance / 0.2) + ceil(_duration / 60)) * 0.5 14 _base_total = fare_amount + extra + mta_tax + tip_amount + tolls_amount + improve 15 ... 16 """, inplace=True) 17 18 # フィルター、型変換 19 t = t.query(common.queries).astype(common.new_dtypes) 20 21 t.drop(columns=[c for c in t.columns if c.startswith('_')], inplace=True) 22 taxi = pd.concat([taxi, t], ignore_index=True)
検証② 補間、フィルター、型変換(Dask) ファイル名に含まれている年月を、パーティション毎に個別付与するのに苦労 delayedを使用して、パーティション毎にファイル名情報を追加(GH#6575) 主要なAPIはPandasとほぼ同じ fillna、replace、astype などは、複数カラムを一括実行する方が高速 1 taxi = []
2 for path in Path().glob(pattern): 3 year, month = map(int, path.stem.split('_')[-1].split('-')) 4 _min_pickup = datetime(year, month, 1) 5 _max_pickup = _min_pickup + timedelta(days=calendar.monthrange(year, month)[1], sec 6 t = delayed(pd.read_parquet)(path) 7 t = delayed(pd.DataFrame.assign)(t, _min_pickup=_min_pickup, _max_pickup=_max_picku 8 taxi.append(t) 9 taxi = dd.from_delayed(taxi) 1 taxi = taxi.fillna(dict( 2 passenger_count=1, RatecodeID=1, store_and_fwd_flag='N', 3 improvement_surcharge=0, congestion_surcharge=0, airport_fee=0)) 4 taxi = taxi.replace(dict(passenger_count={0: 1}, payment_type={0: 1}))
検証② 補間、フィルター、型変換(Dask) いざ実行すると、MemoryError! 全てのworkerが同時にファイルを読み込むため、メモリ使用量が急増 デフォルトでは、worker数=CPUコア数 compute(num_workers=N)でworker数を減らし、メモリ使用量を制御 worker数を1/2にすると、メモリ使用量は半減するが、処理時間は微増
検証② 補間、フィルター、型変換(Vaex) PandasのAPIとかなり異なる 日付・時刻計算 定数でカラム作成 値の置換はfunc.where numpy.ceil()などの外部関数を使う場合、UDFを登録する必要あり 除算は / ではなく numpy.divide()
1 min_pickup = np.datetime64(f'{year}-{month:02}-01T00:00:00') 1 taxi['_min_pickup'] = vaex.vconstant(min_pickup, len(taxi)) 1 taxi.payment_type = taxi.func.where(taxi.payment_type == 0, 1, taxi.payment_type) 1 @vaex.register_function() 2 def calc_base_fare(trip_distance, duration): 3 return 2.5 + (np.ceil(np.divide(trip_distance, 0.2)) + 4 np.ceil(np.divide(duration, 60))) * 0.5
検証② 補間、フィルター、型変換(PySpark) 時刻の加減計算がエラー! 全部SQLで書き直した SQL文を書いてps.sql()で実行するだけ、ファイル名追加も標準機能にあり 1 now = ps.DataFrame({'now': [np.datetime64('2022-10-14T13:50')]}) 2
now + np.timedelta64(30, 'm') # TypeError: Addition can not be applied to datetimes. 1 query = """ 2 SELECT 3 TO_DATE(CONCAT(REGEXP_EXTRACT(INPUT_FILE_NAME(), '20[1-2][0-9]-[0-1][0-9]', 0), '-0 4 ... 5 _file_date + (INTERVAL 1 MONTH) - (INTERVAL 1 SECOND) as _max_pickup, 6 ... 7 TINYINT(VendorID) as VendorID, 8 ... 9 FROM 10 parquet.`{pattern}` 11 WHERE 12 (VendorID in (1, 2)) 13 ...""" 14 taxi = ps.sql(query.format(pattern=pattern)))
検証② 補間、フィルター、型変換 処理時間はVaex、メモリ使用量はPySparkが優秀
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 検証 ✅検証環境 ✅使用するデータ ✅検証① 基本統計量 ✅検証② 補間、フィルター、型変換 検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
検証③ テーブル結合 5つの定義テーブルを履歴テーブルと内部結合し、各IDを文字列に変換 メモリ節約のため、各テーブルのデータ型はkey/valueともCategoricalDytpe型 結合イメージ
検証③ テーブル結合(Pandas) mergeよりも joinの方が高速(今回のデータでは約3倍) 全てのkeyをMultiIndexにして、あとは順番に joinするだけ mergeと joinで出力結果が異なる! keyにNaNが含まれている場合、joinでは一致しないが、mergeでは一致 CategoricalDtype型のカテゴリ値を数値に変換(NaN ⇒
-1)してから join 1 taxi = taxi.reset_index().set_index(['VendorID', 'RatecodeID', 'PULocationID', 2 'DOLocationID', 'payment_type']) 1 taxi = taxi.reset_index().set_index(target_columns) 2 3 for lookup in [common.vendors, common.ratecodes, common.pulocations, 4 common.dolocations, common.payment_types]: 5 right = pd.DataFrame(lookup, index=pd.Index(lookup.index.codes, 6 name=lookup.index.name)) 7 taxi = taxi.join(right, how='inner') 8 9 taxi = taxi.reset_index(drop=True).set_index('index')
検証③ テーブル結合(Dask) joinよりもmergeの方が高速 Daskのset_indexでは、内部でsort等が実行されるためオーバーヘッド大 定義テーブルは単一パーティションにした方が高速 1 vendors = dd.from_pandas(common.vendors, npartitions=1) 2
ratecodes = dd.from_pandas(common.ratecodes, npartitions=1) 3 pulocations = dd.from_pandas(common.pulocations, npartitions=1) 4 dolocations = dd.from_pandas(common.dolocations, npartitions=1) 5 payment_types = dd.from_pandas(common.payment_types, npartitions=1) 6 7 taxi = (taxi.merge(vendors, left_on='VendorID', right_index=True, how='inner') 8 .merge(ratecodes, left_on='RatecodeID', right_index=True, how='inner') 9 .merge(pulocations, left_on='PULocationID', right_index=True, how='inner' 10 .merge(dolocations, left_on='DOLocationID', right_index=True, how='inner' 11 .merge(payment_types, left_on='payment_type', right_index=True, how='inne
検証③ テーブル結合(Vaex) Vaexではカラム同士の結合のみ(だけどメソッド名は join) keyにCategoricalDtype型は使えないため、floatに変換してから join 1 vendors = vaex.from_pandas(common.vendors.astype({'VendorID': float}))
2 ratecodes = vaex.from_pandas(common.ratecodes.astype({'RatecodeID': float})) 3 pulocations = vaex.from_pandas(common.pulocations.astype({'PULocationID': float})) 4 dolocations = vaex.from_pandas(common.dolocations.astype({'DOLocationID': float})) 5 payment_types = vaex.from_pandas(common.payment_types.astype({'payment_type': float}) 6 7 taxi = (taxi.join(vendors, on='VendorID', how='inner') 8 .join(ratecodes, on='RatecodeID', how='inner') 9 .join(pulocations, on='PULocationID', how='inner') 10 .join(dolocations, on='DOLocationID', how='inner') 11 .join(payment_types, on='payment_type', how='inner'))
検証③ テーブル結合(PySpark) Indexにカテゴリ型(CategoricalIndex)を使用可能 カラムはカテゴリ型に未対応(今後対応予定)のため、文字列に変換 1 vendors = ps.DataFrame(common.vendors).astype(str) 2 ratecodes =
ps.DataFrame(common.ratecodes).astype(str) 3 payment_types = ps.DataFrame(common.payment_types).astype(str) 4 pulocations = ps.DataFrame(common.pulocations).astype(str) 5 dolocations = ps.DataFrame(common.dolocations).astype(str) 6 7 taxi = (taxi.merge(vendors, left_on='VendorID', right_index=True, how='inner') 8 .merge(ratecodes, left_on='RatecodeID', right_index=True, how='inner') 9 .merge(pulocations, left_on='PULocationID', right_index=True, how='inner' 10 .merge(dolocations, left_on='DOLocationID', right_index=True, how='inner' 11 .merge(payment_types, left_on='payment_type', right_index=True, how='inne 12 )
検証③ テーブル結合 処理時間、メモリ使用量ともにPySparkが優秀
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 検証 ✅検証環境 ✅使用するデータ ✅検証① 基本統計量 ✅検証② 補間、フィルター、型変換 ✅検証③ テーブル結合
検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
検証④ リソース使用量の監視(Pandas) memory_profilerやline_profilerなどの追加パッケージが必要 メモリ使用量や処理時間を行単位で追跡可能 途中でGCが発生すると正確に計測できないことも… 1 Line # Mem usage Increment
Occurrences Line Contents 2 ============================================================= 3 4 117.6 MiB 117.6 MiB 1 @profile 4 5 def func(filename): 5 6 4597.1 MiB 4479.5 MiB 1 taxi = pd.read_parquet(filename) 6 7 2164.8 MiB -2432.3 MiB 1 stat = taxi.describe() 7 8 8 9 2062.1 MiB -102.7 MiB 1 del taxi 9
検証④ リソース使用量の監視(Dask) ProgressBar で進捗を出力、ResourceProfiler でCPU・メモリ使用量を可視化
検証④ リソース使用量の監視(Dask) Profiler でタスクの実行順序や経過時間などを出力 タスクの長時間化、メモリ使用量急増などの原因調査に有効
検証④ リソース使用量の監視(Vaex) vaex.progress.tree で進捗やCPU使用量を出力 widget rich
検証④ リソース使用量の監視(PySpark) ステージ毎にタスク進捗を出力 Web UIでタスク毎の処理内容、DAG、Planなどの詳細情報にアクセス可能
アジェンダ ✅自己紹介 ✅本トークの背景と狙い ✅パッケージ紹介 ✅検証 ✅検証環境 ✅使用するデータ ✅検証① 基本統計量 ✅検証② 補間、フィルター、型変換 ✅検証③ テーブル結合
✅検証④ リソース使用量の監視 それでもPandasを使いたいあなたに まとめ
それでもPandasを使いたいあなたに evalで並列化 数式に近い形で記述でき、可読性 の面でも恩恵あり numexprインストールを忘れずに! Pandas統合の並列処理パッケージを活用 apply/mapの並列化に有効 1 df =
pd.DataFrame() 2 df['x'] = np.arange(1e8) 3 4 # Before 5 df['x'] += 1 6 df['y'] = df['x'] * 2 7 df['z'] = df['x'] / df['y'] 8 9 # After 10 df.eval(""" 11 x = x + 1 12 y = x * 2 13 z = x / y 14 """, inplace=True)
まとめ 様々なパッケージで大規模データの効率的な処理を比較検証 Pandas : 小~中規模( )のデータ件数なら、Pandasだけでも十分有用 Dask : Pandasと(ほぼ)同じAPIで大規模データを処理できるのが強み Vaex
: APIがPandasと異なるも、処理時間を最重視したい場合には有益 PySpark : Python/SQLなど複数言語を使用可能、pyspark.pandasの今後に期待 データや処理内容が変われば、各パッケージの優劣も変わる可能性あり 手元のデータで試してみる ユースケースに応じて複数のパッケージを使い分けられる状態が理想的 ~107
ご清聴ありがとうございました。
(免責事項) 当資料は情報提供のみを目的として作成されたものであり、商品の勧誘を目的としたも のではありません。本資料は、当社が信頼できると判断した各種データに基づき作成さ れておりますが、その正確性、確実性を保証するものではありません。また、本資料に記 載された内容は予告なしに変更されることもあります。
None