

Distributed computing library for big data ML applications

Yuki Saito
Machine Learning Engineer, Machine Learning Development Team, LINE Data Labs
Hongwei Zhang
Machine Learning Engineer, Machine Learning Development Team, LINE Data Labs
Wit Tatiyanupanwong
Machine Learning Engineer, Machine Learning Development Team, LINE Data Labs
https://linedevday.linecorp.com/2020/ja/sessions/9750
https://linedevday.linecorp.com/2020/en/sessions/9750

LINE DevDay 2020

November 26, 2020

Transcript

  1. ML Applications at LINE (as of September 2020)
     Classification: User Demographics, CTR Prediction, Ads Category, Manga Rating Prediction, Sticker Tag Prediction, etc.
     Recommendation: Sticker, Manga, Theme, Music, Official Account, Point, News, etc.
     Representation Learning: User Vector, News Text Vector, Sticker Image Vector, etc.
  2. Session Overview
     Part 1: ghee - Distributed Data Processor
     Part 2: ghee-models - Model Management
     Part 3: cumin - Data Management
  4. Agenda
     Part 1 ghee › motivation › introducing ghee › ghee implementation
     Part 2 ghee-models › introducing ghee-models › graph convolutional networks
     Part 3 cumin › motivation › introducing cumin › example
  5. Motivation: Current Workflow
     General-purpose Hadoop cluster (HDFS, Spark executors, 1600+ CPU nodes) and Machine Learning Kubernetes cluster (Ceph/NFS, GPU pods, 20+ GPU nodes, 40+ CPU nodes)
     1. Dataset Preparation  2. Copy Dataset  3. Model Development  4. Copy Prediction Output
  10. Motivation: Other Issues
      › Resource Management: manually acquire and release
      › GPU Utilization: CPU/IO overhead
      (Diagram: Dataset on the Hadoop cluster's HDFS is copied to the Kubernetes cluster's CephFS before reaching the GPU pod)
  11. Motivation: Wishlist
      › Fast and efficient data transfer: stream instead of copy
      › Ease of use: hide infrastructure details
      › Distributed training support: uniformly send training data
  12. Motivation: Existing Solutions
      Spark: Strength - ease of use, error handling; Weakness - data transfer speed across executors
      Dask: Strength - data transfer speed, flexibility (DAG); Weakness - learning curve
  13. Agenda
      Part 1 ghee › motivation › introducing ghee › ghee implementation
      Part 2 ghee-models › introducing ghee-models › graph convolutional networks
      Part 3 cumin › motivation › introducing cumin › example
  14. Introducing ghee: Overview
      › A Python library for running ML applications by specifying:
        › input and output specification
        › execution environment (CPU/memory/Docker image)
        › user-defined functions (preprocess, train/predict, postprocess)
      › Encapsulates data transfer and Kubernetes pod management
      › Allows ML engineers to focus on model development
  15. Introducing ghee: ghee.task.KubeMPITask
      ghee program execution on the Kubernetes cluster: Input Storage -> Preprocess (CPU pod) -> Train/Predict (GPU pod) -> Postprocess (CPU pod) -> Output Storage
  16. Introducing ghee: ghee.task.KubeMPITask
      1. Create Kubernetes Jobs
         › Docker image
         › Resources (CPU, GPU, memory)
         › Run user-defined functions
  17. Introducing ghee: ghee.task.KubeMPITask
      1. Create Kubernetes Jobs (Docker image; resources: CPU, GPU, memory; run user-defined functions)
      2. Stream input data
         › Supports HDFS, S3, Kafka
         › Random shuffle or round-robin distribution
  18. Introducing ghee: ghee.task.KubeMPITask
      1. Create Kubernetes Jobs (Docker image; resources: CPU, GPU, memory; run user-defined functions)
      2. Stream input data (supports HDFS, S3, Kafka; random shuffle or round-robin distribution)
      3. Process data
         › Preprocess > Train
         › Preprocess > Predict > Postprocess
  19. Introducing ghee: ghee.task.KubeMPITask
      1. Create Kubernetes Jobs
      2. Stream input data
      3. Process data
      4. Send data to the next stage
  20. Introducing ghee: ghee.task.KubeMPITask
      1. Create Kubernetes Jobs (Docker image; resources: CPU, GPU, memory; run user-defined functions)
      2. Stream input data (supports HDFS, S3, Kafka; random shuffle or round-robin distribution, sketched below)
      3. Process data (Preprocess > Train, or Preprocess > Predict > Postprocess)
      4. Send data to output storage
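A minimal sketch of the round-robin distribution mentioned in step 2, assuming a simple in-process record generator and a fixed list of worker pod names (both made up for illustration; ghee's real streaming layer is not shown in the slides):

```python
# Round-robin distribution sketch: records read from input storage are dealt
# out evenly across worker pods. Worker names and records are illustrative.
import itertools

def round_robin(records, workers):
    """Yield (worker, record) pairs, cycling through workers so each gets a fair share."""
    for worker, record in zip(itertools.cycle(workers), records):
        yield worker, record

records = ({"id": i} for i in range(7))          # stand-in for streamed input data
workers = ["preprocess-pod-0", "preprocess-pod-1", "preprocess-pod-2"]

for worker, record in round_robin(records, workers):
    print(worker, record["id"])
# preprocess-pod-0 0, preprocess-pod-1 1, preprocess-pod-2 2, preprocess-pod-0 3, ...
```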
  21. ghee sample code: Model Training (illustrative sketch below)
      A Task defined on the Client reads input from HDFS and runs preprocess (CPU pod) and train (GPU pod) on the Kubernetes cluster
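The sample code shown on this slide is not reproduced in the transcript. Below is a hypothetical reconstruction of the pattern it illustrates, based only on the earlier slides (input/output specification, execution environment, user-defined functions handed to ghee.task.KubeMPITask). Only the class name KubeMPITask comes from the talk; every argument, path, and method below is an assumption, not ghee's actual API:

```python
# Hypothetical sketch of a ghee training task. KubeMPITask is named in the
# slides; the constructor arguments and run() method are invented here purely
# to illustrate the "specify input, environment, and UDFs" pattern.
from ghee.task import KubeMPITask  # LINE-internal library

def preprocess(record: dict) -> dict:
    # Hypothetical user-defined preprocessing: turn a raw record into features.
    record["features"] = [float(x) for x in record["raw"].split(",")]
    return record

def train(batches) -> None:
    # Hypothetical user-defined training loop over streamed batches.
    for batch in batches:
        ...  # update model parameters

task = KubeMPITask(                      # invented signature
    input="hdfs:///user/ml/dataset",     # input specification (HDFS path, example only)
    preprocess=preprocess,               # runs on CPU pods
    train=train,                         # runs on GPU pods
    resources={"preprocess": {"cpu": 16}, "train": {"gpu": 1, "memory": "32Gi"}},
    image="registry.example.com/ml/trainer:latest",  # hypothetical Docker image
)
task.run()                               # invented method
```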
  22. ghee sample code: Model Inference
      A Task defined on the Client reads input from HDFS, runs preprocess (CPU pod), predict (GPU pod), and postprocess (CPU pod) on the Kubernetes cluster, and writes output to S3
  23. Agenda
      Part 1 ghee › motivation › introducing ghee › ghee implementation
      Part 2 ghee-models › introducing ghee-models › graph convolutional networks
      Part 3 cumin › motivation › introducing cumin › example
  24. Implementation: Data Transfer
      (Diagram: processes on CPU pods, launched by mpirun, push data; processes on GPU pods pull it; a Transfer Manager coordinates the sockets)
      › ZeroMQ: fast and stable; asyncio with the aiozmq library (push/pull pattern sketched below)
      › Transfer Manager: manages push/pull socket lifecycle
      › MPI: state synchronization, distributed training (e.g. Horovod)
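A minimal sketch of the ZeroMQ push/pull pattern described above, using blocking pyzmq sockets in two threads rather than asyncio/aiozmq, and a made-up record format; it illustrates the pattern only, not ghee's Transfer Manager:

```python
# ZeroMQ push/pull sketch: a "preprocess" producer pushes records, a
# "train/predict" consumer pulls them as they arrive.
import threading
import zmq

ADDR = "tcp://127.0.0.1:5557"  # hypothetical address; in ghee this would point at another pod

def producer(num_records: int) -> None:
    """Simulates a CPU-pod preprocess worker pushing records downstream."""
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.PUSH)
    sock.bind(ADDR)
    for i in range(num_records):
        sock.send_json({"record_id": i, "features": [i * 0.1, i * 0.2]})
    sock.send_json({"eof": True})  # simple end-of-stream marker for this sketch
    sock.close()

def consumer() -> None:
    """Simulates a GPU-pod worker pulling records as they arrive."""
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.PULL)
    sock.connect(ADDR)
    while True:
        msg = sock.recv_json()
        if msg.get("eof"):
            break
        print("received", msg["record_id"])
    sock.close()

t = threading.Thread(target=producer, args=(5,))
t.start()
consumer()
t.join()
```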
  25. Implementation: Remote Procedure Call
      › Ship code with cloudpickle and execute it on Kubernetes pods (sketched below)
      › Another proprietary library (named swimmy) handles pod lifecycle management and health checks
      › MPI runs on top of swimmy for distributed preprocessing and training
      › Example: a k8s Job with 3 pods, each running a swimmy agent and mpirun and receiving code serialized with cloudpickle
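A minimal sketch of the cloudpickle mechanism described above; the transport is just in-memory bytes and the user-defined function is a made-up example (in ghee the serialized code would be delivered to a swimmy agent on a pod):

```python
# Ship-code-with-cloudpickle sketch: serialize a function on the client side,
# deserialize and call it on the "pod" side.
import cloudpickle

def preprocess(record: dict) -> dict:
    """Hypothetical user-defined function to be executed remotely."""
    record["feature_sum"] = sum(record["features"])
    return record

# "Client" side: serialize the function (including its closure and globals).
payload: bytes = cloudpickle.dumps(preprocess)

# "Pod" side: deserialize and call it as if it were defined locally.
remote_fn = cloudpickle.loads(payload)
print(remote_fn({"features": [1.0, 2.0, 3.0]}))  # {'features': [...], 'feature_sum': 6.0}
```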
  26. Implementation: Simple Comparison to Dask
      › Setup: recommendation task
        › Data: MovieLens-20M
        › Model: Factorization Machine
        › Preprocess on 16 CPUs, train on 1 GPU
        › Simulate preprocessing cost with a parameter n
      › Lines of code: ghee 115 lines, Dask 236 lines
      (Bar chart: time in seconds, smaller is better, for ghee vs. Dask at several values of n)
  27. Session Overview
      Part 1: ghee - Distributed Data Processor
      Part 2: ghee-models - Model Management
      Part 3: cumin - Data Management
  28. Agenda
      Part 1 ghee › motivation › introducing ghee › ghee implementation
      Part 2 ghee-models › introducing ghee-models › graph convolutional networks
      Part 3 cumin › motivation › introducing cumin › example
  29. ghee-models: Goal
      › Provide a collection of ML models built on ghee
      › Provide a standard way to manage the ML lifecycle using MLflow (tracking calls sketched below)
      (Stack: ghee -> ghee-models -> ML applications, with MLflow alongside)
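A minimal sketch of the MLflow tracking calls that such a standard lifecycle could be built on; the experiment name, parameters, metrics, and config file are made-up examples, not ghee-models' actual logging conventions:

```python
# MLflow tracking sketch: record hyperparameters, metrics, and an artifact
# for one (hypothetical) ghee-models training run.
from pathlib import Path
import mlflow

mlflow.set_experiment("sticker-recommendation")  # hypothetical experiment name

with mlflow.start_run():
    # Hyperparameters of the run.
    mlflow.log_param("model", "graphsage")
    mlflow.log_param("embedding_dim", 64)
    mlflow.log_param("learning_rate", 1e-3)

    # ... train the model with ghee here ...

    # Metrics and artifacts produced by the run.
    mlflow.log_metric("f1", 0.62)
    Path("config.yaml").write_text("model: graphsage\n")  # e.g. the dataset/config definition used
    mlflow.log_artifact("config.yaml")
```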
  30. ghee-models: ghee program execution
      ghee.task.KubeMPITask on the Kubernetes cluster: Input Storage -> Preprocess (CPU pod) -> Train/Predict (GPU pod) -> Postprocess (CPU pod) -> Output Storage
  31. ghee-models: ghee-models program execution
      On top of the ghee pipeline (Input Storage -> Preprocess (CPU pod) -> Train/Predict (GPU pod) -> Postprocess (CPU pod) -> Output Storage via ghee.task.KubeMPITask), ghee-models adds a Driver with a Config loader that reads the Config, a Data checker (CPU pod) that inspects the Input Storage, and MLflow tracking
  32. ghee-models: Models
      › Natural language processing: BERT
      › Recommender: Graph Attention Networks (GAT), GraphSAGE, Multi-Interest Graph Convolutional Networks, RankNet, Neural Collaborative Filtering (NCF)
      › Classification: DNN for multi-class, DNN for multi-label
      › Image processing: EfficientNet B0-B7
  33. Agenda
      Part 1 ghee › motivation › introducing ghee › ghee implementation
      Part 2 ghee-models › introducing ghee-models › graph convolutional networks
      Part 3 cumin › motivation › introducing cumin › example
  34. Graph convolutional networks, use case: user/item dense vectors
      Goal: provide user/item dense vectors using graph convolutional networks, which can be used in a wide range of machine learning tasks
      (Example graph: a user node with attributes such as male/23, item nodes connected by dislike/click/buy edges, and tag nodes connected to items by "has" edges)
  35. Graph convolutional networks, use case: user/item dense vectors
      The learned dense vectors (e.g. [0.3, -0.1, 0.9]) feed downstream applications: a Recommender, Segments prediction (e.g. Age: 32, Income: 500M, Interests: anime, movie), and a Lookalike engine that finds users with similar vectors (e.g. [0.26, -0.12, 0.87], [0.25, 0.01, 0.7]); a similarity lookup is sketched below
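A minimal sketch of how the dense vectors can drive a lookalike lookup via cosine similarity; the vectors reuse the example values from the slide, while the candidate names and the simple ranking are illustrative, not the ghee-models lookalike engine:

```python
# Lookalike sketch: rank candidate users by cosine similarity of their dense
# vectors to a seed user's vector.
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity between two dense vectors."""
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

seed_user = np.array([0.3, -0.1, 0.9])
candidates = {
    "user_a": np.array([0.26, -0.12, 0.87]),
    "user_b": np.array([0.25, 0.01, 0.7]),
    "user_c": np.array([-0.4, 0.8, 0.1]),
}

# Higher similarity = more alike the seed user.
ranked = sorted(candidates.items(),
                key=lambda kv: cosine_similarity(seed_user, kv[1]),
                reverse=True)
for name, vec in ranked:
    print(name, round(cosine_similarity(seed_user, vec), 3))
```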
  36. Graph convolutional networks, use case: user/item dense vectors
      As of September 2020: 15+ services, 1.5B+ graph nodes, 860M+ users
  37. Graph convolutional networks, use case: user/item dense vectors
      Architecture: graph convolutional layers (GCN0, GCN1; a single layer is sketched below) followed by deep layers produce user vectors and item vectors, trained against a loss; BPR, Focal, and ArcFace losses are also provided; supports incremental learning
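A minimal numpy sketch of a single graph convolutional layer of the kind GCN0/GCN1 stand for, on a toy user-item graph with made-up weights; it shows the propagation rule only and is not ghee-models code:

```python
# One GCN layer: neighbour features are averaged through a normalized adjacency
# matrix and passed through a learned projection, followed by ReLU.
import numpy as np

def gcn_layer(adj: np.ndarray, features: np.ndarray, weights: np.ndarray) -> np.ndarray:
    """One GCN propagation step: ReLU(D^-1/2 (A + I) D^-1/2 X W)."""
    a_hat = adj + np.eye(adj.shape[0])                    # add self-loops
    deg = a_hat.sum(axis=1)                               # node degrees
    d_inv_sqrt = np.diag(1.0 / np.sqrt(deg))              # D^-1/2
    a_norm = d_inv_sqrt @ a_hat @ d_inv_sqrt              # symmetric normalization
    return np.maximum(a_norm @ features @ weights, 0.0)   # propagate + ReLU

# Toy user-item graph: nodes 0-1 are users, 2-3 are items.
adj = np.array([[0, 0, 1, 1],
                [0, 0, 0, 1],
                [1, 0, 0, 0],
                [1, 1, 0, 0]], dtype=float)
features = np.random.rand(4, 8)     # 8-dimensional input features per node
w0 = np.random.rand(8, 16)          # GCN0 projection
w1 = np.random.rand(16, 3)          # GCN1 projection -> 3-d dense vectors

h = gcn_layer(adj, features, w0)    # GCN0
embeddings = gcn_layer(adj, h, w1)  # GCN1: rows are user/item dense vectors
print(embeddings.shape)             # (4, 3)
```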
  38. Graph convolutional networks, use case: user/item dense vectors
      27 models for 15+ services with incremental learning: News, Sticker, AD, Game, OA, Music, Manga, Delima, Shopping, Timeline, Theme, ...
  39. Graph convolutional networks, use case: user/item dense vectors
      User segments prediction: dense vectors from each service (News, ..., Sticker) are fed into an MLP
      Evaluation:
        model       parameters  f1 score
        sparse DNN  463M        0.6239
        dense DNN   1M          0.6275
  40. Session Overview
      Part 1: ghee - Distributed Data Processor
      Part 2: ghee-models - Model Management
      Part 3: cumin - Data Management
  41. Agenda
      Part 1 ghee › motivation › introducing ghee › ghee implementation
      Part 2 ghee-models › introducing ghee-models › graph convolutional networks
      Part 3 cumin › motivation › introducing cumin › example
  42. Motivation: Data Configuration
      › Each recommendation task requires different source tables and columns
      › Each ML engineer has a different way of defining a dataset
  43. Motivation: Routine Processes
      › Routine data sanity checks such as Hive partition checks, data counts, and error handling are required when generating a dataset
      › These similar routines are implemented differently for each recommendation task
  44. Motivation: Parameter Management
      › ML engineers fine-tune not only model hyperparameters but also the filter conditions used when generating the training dataset
      › Model training parameters are managed by MLflow in experiments, but dataset generation is fragmented and not reproducible
  45. Agenda
      Part 1 ghee › motivation › introducing ghee › ghee implementation
      Part 2 ghee-models › introducing ghee-models › graph convolutional networks
      Part 3 cumin › motivation › introducing cumin › example
  46. Cumin: Table and Path Settings
      › Provides a standard format to define a dataset
      › Handles routine processes such as Hive partition and data count checks
      › Filter conditions are written explicitly
  47. Cumin: How to Use
      › Performs data sanity checks according to the data definition when the Spark session starts
      › The rest of the code is a typical Spark program
  48. Cumin: Other Functions
      › Auto-adjust output file size: takes the HDFS block size into account and increases the number of files to increase parallelism in ghee
      › Support YAML format: receives the data definition from upstream components
  49. Cumin: Implementation
      (Flow: Spark start -> validate data -> read -> data processing -> cumin post-process -> Spark stop)
      › Adds hooks to the standard Spark functions
      › Checks filter conditions and data size when Spark starts
      › Estimates the file size of the DataFrame and repartitions it to fit the HDFS block size when writing output (both routines sketched below)
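A sketch of the two routines described above using only public PySpark APIs: a sanity check when the Spark session starts, and output repartitioning so files roughly match the HDFS block size. The table name, partition, output path, and size-estimation heuristic are illustrative assumptions, not cumin's implementation:

```python
# Cumin-style routine sketch around a normal PySpark job.
from pyspark.sql import SparkSession

HDFS_BLOCK_SIZE = 128 * 1024 * 1024  # 128 MiB, a common default

spark = SparkSession.builder.appName("cumin-style-job").getOrCreate()

# 1. Validate the dataset definition up front (cumin hooks this into Spark start):
#    check that the expected partition exists and is not empty.
df = spark.table("db.click_log").where("dt = '2020-09-01'")
if df.count() == 0:
    raise ValueError("partition dt=2020-09-01 is missing or empty")

# ... typical Spark processing here ...
result = df.select("user_id", "item_id").where("clicked = 1")

# 2. Estimate the output size and repartition so each file is close to one
#    HDFS block, which also controls parallelism downstream in ghee.
sample = result.limit(1000).toPandas()
bytes_per_row = max(sample.memory_usage(deep=True).sum() / max(len(sample), 1), 1)
estimated_bytes = bytes_per_row * result.count()
num_files = max(int(estimated_bytes // HDFS_BLOCK_SIZE) + 1, 1)

result.repartition(num_files).write.mode("overwrite").parquet("hdfs:///user/ml/train")
```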
  50. Agenda
      Part 1 ghee › motivation › introducing ghee › ghee implementation
      Part 2 ghee-models › introducing ghee-models › graph convolutional networks
      Part 3 cumin › motivation › introducing cumin › example
  51. Example: cumin with ghee-models
      › Manage experiments from dataset generation to model training in a Jupyter notebook
      › Write filter conditions and partitions explicitly
  52. Example: cumin with ghee-models
      › Use the dataset generated by cumin
      › Hyperparameters are recorded in MLflow
      › Result: reproducible experiments from dataset generation to model training
  53. Session Summary
      › ghee: performs distributed processing and training with large data, transfers data between CPU and GPU nodes efficiently, encapsulates Kubernetes pod management
      › ghee-models: improves model reusability, uses MLflow to manage the ML lifecycle
      › cumin: provides dataset definition and validation, makes dataset generation reproducible
      (VOS: Amazon S3-compatible internal data storage)
  54. Future Work
      › ghee-models: add models, add loss functions
      › cumin: support more data sources
      (VOS: Amazon S3-compatible internal data storage)