Leveraging Dask for Machine Learning on Large-Scale Data

Sinhrks
October 20, 2018

@PyData.Tokyo One Day Conference 2018/10/20


Transcript

  1. (Incomplete) list of OSS projects that use Dask

    • (TFLearn) Deep learning library featuring a higher-level API for TensorFlow.
    • Airflow (Distributed Scheduler): A platform to author, schedule and monitor workflows.
    • Image Processing SciKit.
    • N-D labeled arrays and datasets in Python.
    • Executes end-to-end data science and analytics pipelines entirely on GPUs.
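  2. Dask data structures (API)

    • Dask Array: base class NumPy ndarray; default scheduler threading
    • Dask DataFrame: base class pandas DataFrame; default scheduler threading
    • Dask Bag: base classes PyToolz, list, set, dict; default scheduler multiprocessing (Python objects cannot release the GIL)
    • Dask Delayed: arbitrary functions; default scheduler threading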
  3. Dask Array

    ```python
    import numpy as np
    x = np.ones((10, 10))  # create a 10x10 NumPy ndarray
    x
    # array([[1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
    #        [1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
    #        [1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
    #        [1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
    #        [1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
    #        [1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
    #        [1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
    #        [1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
    #        [1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
    #        [1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]])

    import dask.array as da
    dx = da.ones((10, 10), chunks=(5, 5))  # create a 10x10 Dask Array, internally split into four 5x5 chunks
    dx
    # dask.array<ones, shape=(10, 10), dtype=float64, chunksize=(5, 5)>
    ```
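To make the chunk layout concrete, here is a small pure-Python sketch of how a shape and a chunk size translate into per-axis block sizes and block slices. `split_chunks` and `block_slices` are hypothetical helpers written for illustration; they mimic the layout a Dask Array reports in its `.chunks` attribute, but they are not Dask's own code.

```python
from itertools import product


def split_chunks(shape, chunksize):
    """Per-axis block sizes, e.g. ((5, 5), (5, 5)) for shape (10, 10) and chunks (5, 5).

    Hypothetical helper: the last block on an axis is smaller when the
    axis length is not a multiple of the chunk size.
    """
    result = []
    for length, size in zip(shape, chunksize):
        full, rest = divmod(length, size)
        result.append((size,) * full + ((rest,) if rest else ()))
    return tuple(result)


def block_slices(chunks):
    """Yield (block index, tuple of slices) for every block in row-major order."""
    per_axis = []
    for blocks in chunks:
        slices, start = [], 0
        for b in blocks:
            slices.append(slice(start, start + b))
            start += b
        per_axis.append(slices)
    for idx in product(*(range(len(blocks)) for blocks in chunks)):
        yield idx, tuple(per_axis[axis][i] for axis, i in enumerate(idx))


chunks = split_chunks((10, 10), (5, 5))
print(chunks)  # ((5, 5), (5, 5))
for idx, region in block_slices(chunks):
    print(idx, region)  # four blocks, (0, 0) through (1, 1)

# Uneven case: the trailing block on an axis is smaller
print(split_chunks((30, 20, 1, 15), (3, 7, 1, 2)))
# ((3, 3, 3, 3, 3, 3, 3, 3, 3, 3), (7, 7, 6), (1,), (2, 2, 2, 2, 2, 2, 2, 1))
```

Each of the four `(i, j)` blocks is what a single leaf task in the Dask graph produces.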
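  4. Dask Array

    ```python
    import dask.array as da
    dx = da.ones((10, 10), chunks=(5, 5))  # create a 10x10 Dask Array, internally split into four 5x5 chunks
    dx
    # dask.array<ones, shape=(10, 10), dtype=float64, chunksize=(5, 5)>
    dx.visualize()  # display the internal computation graph
    ```

    Each node (leaf) of the graph corresponds to one chunk.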
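The graph drawn by `visualize()` is, underneath, a mapping from keys to tasks. The sketch below models that idea in plain Python under simplifying assumptions: keys such as `("ones", i, j)` name the four chunks, a task is a tuple `(function, *arguments)`, and `execute` is a hypothetical recursive evaluator that is far simpler than Dask's real schedulers.

```python
def ones_block(shape):
    """Build one chunk of ones as a list of rows."""
    rows, cols = shape
    return [[1.0] * cols for _ in range(rows)]


# One task per chunk: these four keys are the leaves of the graph.
graph = {("ones", i, j): (ones_block, (5, 5)) for i in range(2) for j in range(2)}
print(len(graph))  # 4: one leaf per 5x5 chunk


def total(*blocks):
    """Sum every element of the given chunks."""
    return sum(sum(row) for block in blocks for row in block)


# A node that depends on all four leaves
graph["total"] = (total, ("ones", 0, 0), ("ones", 0, 1), ("ones", 1, 0), ("ones", 1, 1))


def is_key(graph, obj):
    """True if obj names another entry of the graph (unhashable objects never do)."""
    try:
        return obj in graph
    except TypeError:
        return False


def execute(graph, key, cache=None):
    """Evaluate `key`, computing its dependencies first and each task at most once."""
    cache = {} if cache is None else cache
    if key not in cache:
        task = graph[key]
        if isinstance(task, tuple) and task and callable(task[0]):
            func, *args = task
            args = [execute(graph, a, cache) if is_key(graph, a) else a for a in args]
            cache[key] = func(*args)
        else:
            cache[key] = task  # a literal value
    return cache[key]


print(execute(graph, "total"))  # 100.0
```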
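  5. Dask Array

    ```python
    import dask.array as da
    dx = da.ones((10, 10), chunks=(5, 5))  # as on the previous slide

    dy = dx.sum(axis=0)  # sum the values along axis 0
    dy
    # dask.array<sum-aggregate, shape=(10,), dtype=float64, chunksize=(5,)>
    dy.visualize()  # per-chunk "sum" tasks feed an aggregation step
    dy.compute()
    # array([10., 10., 10., 10., 10., 10., 10., 10., 10., 10.])
    ```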
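The `sum-aggregate` graph follows a two-step pattern: each chunk is reduced on its own, then the partial results are combined. Here is a plain-Python sketch of that pattern for the 10x10 array of ones stored as 5x5 chunks; the helper names are hypothetical, and Dask's own reductions are more general.

```python
def block_column_sums(block):
    """Per-chunk step: sum one chunk along axis 0 (one partial value per column)."""
    return [sum(column) for column in zip(*block)]


def sum_axis0(blocks, n_block_rows, n_block_cols):
    """Sum a blocked 2-D array along axis 0.

    blocks[(i, j)] is the chunk in block-row i, block-column j. Every chunk is
    reduced independently (so this step could run in parallel); the partial
    results of each block column are then added together (the aggregate step).
    """
    result = []
    for j in range(n_block_cols):
        partials = [block_column_sums(blocks[(i, j)]) for i in range(n_block_rows)]
        result.extend(sum(values) for values in zip(*partials))
    return result


# The 10x10 array of ones as a 2x2 grid of 5x5 chunks
blocks = {(i, j): [[1.0] * 5 for _ in range(5)] for i in range(2) for j in range(2)}
print(sum_axis0(blocks, 2, 2))
# [10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0]
```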
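  6. Dask Array

    • A Dask Array can have an arbitrary shape and chunk size.
    • When an operation has to combine data across chunks, it is better to make the chunk sizes match.

    ```python
    import dask.array as da
    da.ones((30, 20, 1, 15), chunks=(3, 7, 1, 2))
    # dask.array<ones, shape=(30, 20, 1, 15), dtype=float64, chunksize=(3, 7, 1, 2)>
    ```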
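Why matching chunk sizes help: to combine two arrays element-wise, both have to be cut at every boundary of either chunking. The hypothetical helper below (an illustration, not Dask's rechunking code) computes those common block sizes along one axis, showing how mismatched chunks fragment the work into more, smaller pieces.

```python
from itertools import accumulate


def common_chunks(a, b):
    """Block sizes along one axis whose boundaries include those of both a and b.

    Hypothetical helper: to combine two arrays element-wise, both must be cut
    at the union of their chunk boundaries.
    """
    if sum(a) != sum(b):
        raise ValueError("axis lengths differ")
    cuts = sorted(set(accumulate(a)) | set(accumulate(b)))
    return tuple(end - start for start, end in zip([0] + cuts[:-1], cuts))


print(common_chunks((7, 7, 6), (7, 7, 6)))     # (7, 7, 6): already aligned, nothing to split
print(common_chunks((7, 7, 6), (5, 5, 5, 5)))  # (5, 2, 3, 4, 1, 5): six smaller pieces
```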
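  7. Dask DataFrame

    Create a pandas DataFrame (df), then convert it into a Dask DataFrame:

    ```python
    import dask.dataframe as dd
    ddf = dd.from_pandas(df, 2)  # npartitions=2
    ddf
    ```

    The resulting Dask DataFrame consists of 2 partitions, delimited by 3 divisions (the index values that bound the partitions).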
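Divisions let Dask route an index lookup to a single partition instead of scanning all of them. A minimal sketch, assuming Dask DataFrame's convention that n partitions have n + 1 sorted divisions, partition i covers [divisions[i], divisions[i + 1]), and the last partition also includes its upper bound. `partition_for` and the division values are hypothetical.

```python
from bisect import bisect_right


def partition_for(divisions, key):
    """Index of the partition that may contain `key`, or None if it is out of range."""
    if key < divisions[0] or key > divisions[-1]:
        return None
    i = bisect_right(divisions, key) - 1
    return min(i, len(divisions) - 2)  # the last division belongs to the last partition


# Hypothetical divisions for an index 0..9 split into two partitions
divisions = (0, 5, 9)
print([partition_for(divisions, k) for k in (0, 4, 5, 9)])  # [0, 0, 1, 1]
```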
  8. Linear Algebra

    Function: Description
    • linalg.cholesky: Returns the Cholesky decomposition, A = LL* or A = U*U, of a Hermitian positive-definite matrix A.
    • linalg.inv: Compute the inverse of a matrix with LU decomposition and forward / backward substitutions.
    • linalg.lstsq: Return the least-squares solution to a linear matrix equation using QR decomposition.
    • linalg.lu: Compute the LU decomposition of a matrix.
    • linalg.norm: Matrix or vector norm.
    • linalg.qr: Compute the QR factorization of a matrix.
    • linalg.solve: Solve the equation a x = b for x.
    • linalg.solve_triangular: Solve the equation a x = b for x, assuming a is a triangular matrix.
    • linalg.svd: Compute the singular value decomposition of a matrix.
    • linalg.svd_compressed: Randomly compressed rank-k thin Singular Value Decomposition.
    • linalg.sfqr: Direct Short-and-Fat QR.
    • linalg.tsqr: Direct Tall-and-Skinny QR algorithm.
  9. Blocked LU Decomposition

    • Diagonal block
    • Row direction (i < j)
    • Column direction (i > j)

    * LU: function that computes an LU decomposition
    * Solve: function that solves a linear equation
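For reference, writing A = LU in blocks, with A_ij, L_ij, U_ij the (i, j) blocks and assuming no pivoting (`da.linalg.lu` additionally returns a permutation matrix, which this sketch leaves out), the three cases above follow the standard block recurrences. Here Solve(M, B) denotes the X that satisfies M X = B:

```latex
\begin{aligned}
&\text{Diagonal block:} && L_{ii} U_{ii} = A_{ii} - \sum_{k<i} L_{ik} U_{ki}
  && \therefore\; L_{ii},\, U_{ii} = \mathrm{LU}\Big(A_{ii} - \sum_{k<i} L_{ik} U_{ki}\Big) \\
&\text{Row direction } (i<j): && L_{ii} U_{ij} = A_{ij} - \sum_{k<i} L_{ik} U_{kj}
  && \therefore\; U_{ij} = \mathrm{Solve}\Big(L_{ii},\; A_{ij} - \sum_{k<i} L_{ik} U_{kj}\Big) \\
&\text{Column direction } (i>j): && L_{ij} U_{jj} = A_{ij} - \sum_{k<j} L_{ik} U_{kj}
  && \therefore\; L_{ij} = \mathrm{Solve}\Big(U_{jj}^{\top},\; \big(A_{ij} - \sum_{k<j} L_{ik} U_{kj}\big)^{\top}\Big)^{\top}
\end{aligned}
```

Each block depends only on blocks with smaller indices, which is what lets Dask express the factorization as a task graph over the chunks.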
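  10. Blocked LU Decomposition

    ```python
    import dask.array as da
    from dask import compute

    arr = da.random.random((9, 9), chunks=(3, 3))
    arr
    # dask.array<random_sample, shape=(9, 9), dtype=float64, chunksize=(3, 3)>

    t, l, u = da.linalg.lu(arr)  # permutation, lower and upper factors (still lazy)
    t, l, u = compute(t, l, u)   # merge the three computation graphs and execute them together
    ```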
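Why pass t, l and u to a single `compute` call? The three outputs of `da.linalg.lu` share most of their intermediate tasks, and `dask.compute` merges their graphs so shared work runs once. The toy below (hypothetical graph, keys and functions; `factorize` is only a stand-in for the expensive shared step) counts how often a shared task runs under separate versus merged evaluation.

```python
calls = {"factorize": 0}


def factorize(x):
    """Stand-in for an expensive step shared by several outputs."""
    calls["factorize"] += 1
    return x * 2


graph = {
    "a": 21,
    "lu": (factorize, "a"),  # intermediate shared by all three outputs
    "p": (lambda f: f + 1, "lu"),
    "l": (lambda f: f + 2, "lu"),
    "u": (lambda f: f + 3, "lu"),
}


def get(graph, keys):
    """Evaluate all `keys` against ONE shared cache, like a single compute() call."""
    cache = {}

    def run(key):
        if key not in cache:
            task = graph[key]
            if isinstance(task, tuple):
                func, *args = task
                cache[key] = func(*[run(a) if a in graph else a for a in args])
            else:
                cache[key] = task
        return cache[key]

    return tuple(run(k) for k in keys)


# Three separate evaluations: the shared step runs three times.
separate = [get(graph, [k])[0] for k in ("p", "l", "u")]
print(separate, calls["factorize"])  # [43, 44, 45] 3

# One merged evaluation: the shared step runs once.
calls["factorize"] = 0
merged = get(graph, ["p", "l", "u"])
print(merged, calls["factorize"])  # (43, 44, 45) 1
```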
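  11. Overview of processing with Dask

    • Processing types: sequential processing; parallel processing (within a node); out-of-core processing (within a node); distributed processing (across nodes)
    • Workloads: matrix computation, tabular data processing, machine learning
    • Libraries: NumPy, pandas, scikit-learn (sequential processing); Dask (parallel and out-of-core processing within a node); Dask Distributed (distributed processing across nodes); Dask ML (machine learning)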
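  12. Parallel processing in scikit-learn

    • The "n_jobs" argument sets the number of parallel jobs
    • Uses joblib internally
      • joblib is developed mainly by scikit-learn committers
      • Parallelism within a single node (threading, multiprocessing)
      • Out-of-core processing and distributed processing across nodes are not possible

    ```python
    from sklearn.model_selection import GridSearchCV

    # pipe (an estimator or Pipeline) and param_grid are assumed to be defined earlier
    grid = GridSearchCV(pipe, cv=3, n_jobs=12, param_grid=param_grid)
    ```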
  13. Overview of processing with Dask
  14. Dask ML

    • Provides subpackages for machine learning
      • scikit-learn-compatible estimators that support Dask: Preprocessing, Model Selection (cross validation, hyperparameter search), GLM, Clustering
      • Wrappers that make scikit-learn parallel and out-of-core: Incremental, ParallelPostFit
      • Support for packages other than scikit-learn: XGBoost, TensorFlow
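  15. Dask ML

    • Dask data structures can be used with scikit-learn-compatible estimators
      • scikit-learn: a NumPy ndarray or a Dask Array goes in; a Dask Array is converted to a NumPy ndarray through the array interface before computation, and the result is a NumPy ndarray
      • Dask ML: a Dask Array goes in and computation runs while keeping the Dask data structures, so the result is a Dask Array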
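  16. k-means (Dask ML)

    • Sampling: determine the initial centroid values from the dataset
    • Compute the distance from each centroid and assign each record to a cluster (executed per chunk)
    • Aggregate the records of each cluster and update the centroids (executed per chunk, then aggregated: Sum, Sum, Count, …)
    • By default, centroids are initialized with k-means|| from Scalable K-Means++ (Bahmani et al.)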
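The per-chunk/aggregate structure of one k-means iteration can be sketched in plain Python: each chunk independently assigns its points and returns per-cluster coordinate sums and counts (the Sum/Count partials), and the aggregation step turns the totals into new centroids. The helper names and toy data are hypothetical, and the sketch uses a naive fixed initialization rather than k-means||.

```python
def nearest(point, centroids):
    """Index of the closest centroid (squared Euclidean distance)."""
    return min(range(len(centroids)),
               key=lambda c: sum((p - q) ** 2 for p, q in zip(point, centroids[c])))


def chunk_stats(chunk, centroids):
    """Per-chunk step: assign points, return per-cluster coordinate sums and counts."""
    k, dim = len(centroids), len(centroids[0])
    sums = [[0.0] * dim for _ in range(k)]
    counts = [0] * k
    for point in chunk:
        c = nearest(point, centroids)
        counts[c] += 1
        for d in range(dim):
            sums[c][d] += point[d]
    return sums, counts


def kmeans_step(chunks, centroids):
    """One Lloyd iteration: chunk_stats on every chunk, then aggregate and average."""
    k, dim = len(centroids), len(centroids[0])
    total_sums = [[0.0] * dim for _ in range(k)]
    total_counts = [0] * k
    for sums, counts in (chunk_stats(chunk, centroids) for chunk in chunks):
        for c in range(k):
            total_counts[c] += counts[c]
            for d in range(dim):
                total_sums[c][d] += sums[c][d]
    # A cluster that received no points keeps its previous centroid
    return [[s / total_counts[c] for s in total_sums[c]] if total_counts[c] else list(centroids[c])
            for c in range(k)]


chunks = [
    [(0.0, 0.0), (0.0, 1.0), (10.0, 10.0)],
    [(1.0, 0.0), (10.0, 11.0), (11.0, 10.0)],
]
centroids = [(0.0, 0.0), (10.0, 10.0)]  # naive initialization (assumption)
for _ in range(5):
    centroids = kmeans_step(chunks, centroids)
print(centroids)  # approximately [[0.333, 0.333], [10.333, 10.333]]
```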
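  17. Incremental

    • Wrap the estimator with Incremental
    • For classification problems, pass the classes via the classes argument
    • To train over multiple epochs, call Incremental.partial_fit repeatedly

    ```python
    from sklearn.linear_model import SGDClassifier
    from dask_ml.wrappers import Incremental

    # X_train, y_train, X_test, y_test (Dask Arrays) and classes are assumed to be defined earlier.
    # loss='log' is the name used at the time; scikit-learn >= 1.1 calls it 'log_loss'.
    clf = SGDClassifier(loss='log', penalty='l2', tol=1e-3)
    inc = Incremental(clf, scoring='accuracy')
    inc.fit(X_train, y_train, classes=classes)

    # Train for several more epochs
    for _ in range(10):
        inc.partial_fit(X_train, y_train, classes=classes)
        print('Score:', inc.score(X_test, y_test))
    ```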
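Incremental's job is to feed the data to the wrapped estimator's `partial_fit` one block at a time. The sketch below reproduces that loop in plain Python with a hypothetical `OnlinePerceptron` standing in for SGDClassifier; the class, the `incremental_fit` helper and the toy blocks are illustrative assumptions, not dask-ml's implementation.

```python
class OnlinePerceptron:
    """Hypothetical minimal estimator exposing a scikit-learn-style partial_fit."""

    def __init__(self, n_features, lr=0.1):
        self.w = [0.0] * n_features
        self.b = 0.0
        self.lr = lr

    def predict_one(self, x):
        return 1 if sum(wi * xi for wi, xi in zip(self.w, x)) + self.b > 0 else 0

    def partial_fit(self, X, y):
        # One pass over this block only; other blocks are not touched here
        for x, target in zip(X, y):
            error = target - self.predict_one(x)
            if error:
                self.w = [wi + self.lr * error * xi for wi, xi in zip(self.w, x)]
                self.b += self.lr * error
        return self

    def score(self, X, y):
        return sum(self.predict_one(x) == t for x, t in zip(X, y)) / len(y)


def incremental_fit(model, blocks, epochs):
    """Call partial_fit block by block, repeating over the data for several epochs."""
    for _ in range(epochs):
        for X_block, y_block in blocks:
            model.partial_fit(X_block, y_block)
    return model


# Two hypothetical blocks of a separable problem (label 1 when x0 + x1 > 1)
blocks = [
    ([(0.0, 0.0), (1.0, 1.0), (0.2, 0.1)], [0, 1, 0]),
    ([(0.9, 0.8), (0.1, 0.3), (2.0, 0.5)], [1, 0, 1]),
]
model = incremental_fit(OnlinePerceptron(n_features=2), blocks, epochs=10)
X_all = [x for X, _ in blocks for x in X]
y_all = [t for _, y in blocks for t in y]
print(model.score(X_all, y_all))  # 1.0
```

Only one block needs to be in memory per `partial_fit` call, which is what makes this pattern out-of-core.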
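  18. ParallelPostFit

    • Wrap the estimator with ParallelPostFit
    • fit does nothing extra (it is identical to plain scikit-learn)
    • predict runs in parallel on larger data

    ```python
    from sklearn.linear_model import LogisticRegressionCV
    from dask_ml.wrappers import ParallelPostFit

    # X_train, y_train (in memory) and X_large (a large Dask Array) are assumed to be defined earlier
    clf = ParallelPostFit(LogisticRegressionCV(cv=3))
    clf.fit(X_train, y_train)      # ordinary scikit-learn fit
    y_pred = clf.predict(X_large)  # block-wise parallel predict; lazy for Dask inputs
    y_pred
    ```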
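The idea behind ParallelPostFit, fitting once on data that fits in memory and then predicting chunk by chunk in parallel, can be sketched with the standard library. `NearestMeanClassifier`, `parallel_predict` and the data are hypothetical, and a thread pool stands in for Dask's scheduler.

```python
from concurrent.futures import ThreadPoolExecutor


class NearestMeanClassifier:
    """Hypothetical tiny estimator: predicts the class whose training mean is closest."""

    def fit(self, X, y):
        self.means_ = {}
        for label in set(y):
            xs = [x for x, t in zip(X, y) if t == label]
            self.means_[label] = sum(xs) / len(xs)
        return self

    def predict(self, X):
        return [min(self.means_, key=lambda c: abs(x - self.means_[c])) for x in X]


def parallel_predict(model, X, chunksize, max_workers=4):
    """Split X into chunks, predict each chunk concurrently, concatenate in order."""
    chunks = [X[i:i + chunksize] for i in range(0, len(X), chunksize)]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        parts = list(pool.map(model.predict, chunks))  # map preserves chunk order
    return [label for part in parts for label in part]


model = NearestMeanClassifier().fit([0.0, 1.0, 9.0, 10.0], [0, 0, 1, 1])  # small training set
X_large = [float(i % 11) for i in range(1000)]  # hypothetical "large" input
y_pred = parallel_predict(model, X_large, chunksize=100)
print(len(y_pred), y_pred == model.predict(X_large))  # 1000 True
```

Because prediction is independent per row, splitting into chunks changes only where the work runs, not the result.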
  19. Overview of processing with Dask
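  20. Dask Distributed

    • Computations run by the scheduler can be distributed across multiple nodes
    • Low latency: per-task overhead is about 1 ms
    • Data sharing between workers: data is transferred directly from worker to worker
    • Complex scheduling: arbitrary computation graphs can be executed
    • Locality: data transfer between workers is avoided as much as possible

    Components: Distributed Client, Distributed Scheduler, Distributed Workers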
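The locality point can be illustrated with a simple placement rule: run a task on the worker that already holds the most bytes of its inputs, so that the least data has to move between workers. This is a hypothetical, simplified sketch in the spirit of that principle; `choose_worker`, `transfer_bytes` and the data are illustrative, and the real scheduler's heuristics are more involved.

```python
def choose_worker(task_deps, where, nbytes, workers):
    """Pick the worker that already holds the most bytes of the task's inputs.

    where[key] is the set of workers holding `key`; nbytes[key] is its size.
    Ties go to the earliest worker in `workers`.
    """
    def local_bytes(worker):
        return sum(nbytes[d] for d in task_deps if worker in where.get(d, ()))
    return max(workers, key=local_bytes)


def transfer_bytes(task_deps, where, nbytes, worker):
    """Bytes that must be copied from other workers to run the task on `worker`."""
    return sum(nbytes[d] for d in task_deps if worker not in where.get(d, ()))


where = {"x-0": {"worker-a"}, "x-1": {"worker-b"}, "x-2": {"worker-b"}}
nbytes = {"x-0": 800, "x-1": 100, "x-2": 100}
deps = ["x-0", "x-1", "x-2"]
best = choose_worker(deps, where, nbytes, ["worker-a", "worker-b"])
print(best, transfer_bytes(deps, where, nbytes, best))  # worker-a 200
```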
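  21. Distributed joblib

    • Pluggable API
      • A with block changes the default backend of joblib.Parallel
    • Caveats
      • Use the joblib bundled with scikit-learn (sklearn.externals.joblib)
      • Some cases cannot be distributed
        • code that explicitly specifies threading / multiprocessing as the backend

    ```python
    import distributed.joblib  # registers the 'dask.distributed' backend (API at the time of the talk)
    from sklearn.externals.joblib import parallel_backend

    # grid (e.g. a GridSearchCV) and digits are assumed to be defined earlier
    with parallel_backend('dask.distributed', scheduler_host='scheduler-addr:8786'):
        grid.fit(digits.data, digits.target)

    # sklearn.externals.joblib and distributed.joblib were later removed; the newer equivalent is:
    #   import joblib
    #   from dask.distributed import Client
    #   client = Client('scheduler-addr:8786')
    #   with joblib.parallel_backend('dask'):
    #       grid.fit(digits.data, digits.target)
    ```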
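  22. Summary

    • What Dask is
      • Provides data structures compatible with NumPy and pandas
    • Using Dask for machine learning
      • Dask ML: Dask data structures can be used for machine learning
      • Distributed: scikit-learn processing can be distributed across nodes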