

Streaming distributed execution across CPUs and GPUs

Some of the most demanding machine learning (ML) use cases we have encountered involve pipelines that span both CPU and GPU devices in distributed environments. These are common workloads, including:

* Batch inference, which involves a CPU-intensive preprocessing stage (e.g., video decoding or image resizing) before utilizing a GPU-intensive model to make predictions.
* Distributed training, where similar CPU-heavy transformations are required to prepare or augment the dataset prior to GPU training.

In this talk, we examine how Ray Data streaming works and how to use it in your own machine learning pipelines to address these common workloads, utilizing all of your compute resources (CPUs and GPUs) at scale.

Anyscale

June 22, 2023



Transcript

  1. Talk Overview • ML Inference and Training workloads • How it relates to Ray Data streaming (new feature in Ray 2.4!) • Examples • Backend overview
  2. About me • Technical lead for Ray / OSS at Anyscale • Previously: ◦ PhD in systems / ML at Berkeley ◦ Staff eng @ Databricks, storage infra @ Google
  3. ML Workloads • Where does data processing come in for ML workloads? ETL Pipeline -> Preprocessing -> Training / Inference
  4. ML Workloads • ETL Pipeline: ingesting latest data, joining data tables • Preprocessing: resizing images, decoding videos, data augmentation • Training / Inference: using PyTorch, TF, HuggingFace, LLMs, etc.
  5. ML Workloads • ETL Pipeline (CPU): ingesting latest data, joining data tables • Preprocessing (CPU): resizing images, decoding videos, data augmentation • Training / Inference (GPU): using PyTorch, TF, HuggingFace, LLMs, etc.
  6. ML Workloads • ETL Pipeline (CPU): ingesting latest data, joining data tables • Preprocessing (CPU): resizing images, decoding videos, data augmentation • Training / Inference (GPU): using PyTorch, TF, HuggingFace, LLMs, etc. • Preprocessing and training / inference are the usual scope of ML teams
  7. Ray Data overview: high performance distributed IO
     • Read from storage: ds = ray.data.read_parquet("s3://some/bucket"); ds = ray.data.read_csv("/tmp/some_file.csv")
     • Transform data: ds = ds.map_batches(batch_func); ds = ds.map(func)
     • Consume data: ds.iter_batches() -> Iterator; ds.write_parquet("s3://some/bucket")
     • Leverages Apache Arrow's high-performance single-threaded IO • Parallelized using Ray's high-throughput task execution • Scales to PiB-scale jobs in production (Amazon)
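     To make the API on this slide concrete, here is a small self-contained sketch of the read -> transform -> consume flow. It swaps the S3/CSV sources for an in-memory dataset built with ray.data.from_items plus a local output path, and batch_func is a placeholder of my own, not code from the talk.

```python
import ray.data

# Read from storage: the readers on the slide are parallelized as Ray tasks.
# ds = ray.data.read_parquet("s3://some/bucket")
# ds = ray.data.read_csv("/tmp/some_file.csv")
# For a self-contained example, build a small in-memory dataset instead:
ds = ray.data.from_items([{"value": i} for i in range(1000)])

# Transform data: map_batches operates on whole batches (dicts of NumPy
# arrays here), while map operates on individual rows.
def batch_func(batch):
    batch["value"] = batch["value"] * 2
    return batch

ds = ds.map_batches(batch_func, batch_format="numpy")

# Consume data: iterate over batches, or write back out to storage.
for batch in ds.iter_batches(batch_size=128):
    pass  # e.g., feed the batch to a model

ds.write_parquet("/tmp/ray_data_demo")
```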
  8. Bulk execution • Previous versions of Ray Data (<2.4) used a bulk execution strategy • What is bulk execution? ◦ Load all data into memory ◦ Apply transformations on in-memory data in bulk ◦ Out of memory? -> spill blocks to disk ◦ Similar to Spark's execution model (bulk synchronous parallel)
  9. Streaming (pipelined) execution • Default execution strategy for Ray Data in 2.4 • Same data transformations API • Instead of executing operations in bulk, build a pipeline of operators • Data blocks are streamed through operators, reducing memory use and avoiding spilling to disk
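     Since the contrast between slides 8 and 9 is the core idea of the talk, here is a plain-Python illustration (no Ray involved) of the difference: bulk execution materializes the entire intermediate result between stages, while pipelined execution streams items through both stages with only bounded intermediate state. The decode/infer functions are invented stand-ins.

```python
# Plain-Python illustration (not Ray) of bulk vs. pipelined execution
# for a two-stage pipeline.

def decode(x):   # stand-in for a CPU-heavy stage (e.g., video decoding)
    return x * 2

def infer(x):    # stand-in for a GPU-heavy stage (e.g., model inference)
    return x + 1

inputs = range(1_000_000)

# Bulk execution: stage 1 runs to completion and the whole intermediate
# dataset is materialized before stage 2 starts; if it doesn't fit in
# memory, a bulk engine spills it to disk.
decoded = [decode(x) for x in inputs]        # full intermediate result held at once
bulk_results = [infer(x) for x in decoded]

# Streaming (pipelined) execution: items flow through both stages, so only
# a bounded amount of intermediate data is alive at any moment.
def pipeline(xs):
    for x in xs:
        yield infer(decode(x))

for result in pipeline(inputs):              # consumed incrementally
    pass
```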
  10. Preprocessing can often be the bottleneck • Example: video decoding prior to inference / training • Too expensive to run on just GPU nodes: needs scaling out • Large intermediate data: uses lots of memory
  11. Ray Data streaming avoids the bottleneck • E.g., intermediate video frames streamed through memory • Decoding can be offloaded from GPU nodes onto CPU nodes • Intermediate frames kept purely in (cluster) memory
  12. Inference <> Training • Same streaming pipeline can easily be used for training too! [Diagram: Split[3] feeding Worker[0], Worker[1], Worker[2]]
  13. Performance benefits overview (bulk vs. streaming execution) • CPU-only pipelines (single-stage), bulk execution: memory optimal, good for inference, bad for training
  14. Performance benefits overview • CPU-only pipelines (single-stage): bulk execution is memory optimal, good for inference, bad for training; streaming execution is memory optimal, good for inference, good for training
  15. Performance benefits overview • CPU-only pipelines (single-stage): bulk execution is memory optimal, good for inference, bad for training; streaming execution is memory optimal, good for inference, good for training • Heterogeneous CPU+GPU pipelines (multi-stage), bulk execution: memory inefficient, slower for inference, bad for training
  16. Performance benefits overview • CPU-only pipelines (single-stage): bulk execution is memory optimal, good for inference, bad for training; streaming execution is memory optimal, good for inference, good for training • Heterogeneous CPU+GPU pipelines (multi-stage): bulk execution is memory inefficient, slower for inference, bad for training; streaming execution is memory optimal, good for inference, good for training
  17. Bulk physical execution -- single stage • Memory usage is optimal (no intermediate data) • Good for inference • Not good for distributed training (cannot consume results incrementally)
  18. Streaming physical execution [Diagram: Operator (Stage 1) consumes data partitions 1, 2, 3 and emits output 1, output 2]
  19. Streaming physical execution [Diagram: Operator (Stage 1) consumes data partitions 1, 2, 3 and emits output 1, output 2]
  20. Streaming physical execution [Diagram: Operator (Stage 1) consumes data partitions 1, 2, 3 and emits output 1, output 2, output 3]
  21. Streaming physical execution -- single stage • Memory usage is optimal (no intermediate data) • Good for inference • Good for distributed training
  22. Bulk physical execution -- multi stage • Memory usage is inefficient (disk spilling) • Slower for inference • Bad for distributed training
  23. Streaming physical execution -- multi stage • Memory usage is optimal (no intermediate data) • Good for inference • Good for distributed training
  24. Comparison to other systems • DataFrame systems (e.g., Spark) ◦ Ray Data streaming is more memory efficient ◦ Ray Data supports heterogeneous clusters ◦ Execution model is a better fit for distributed training • ML ingest libraries: TF Data / Torch Data / Petastorm ◦ Ray Data supports scaling preprocessing out to a cluster
  25. Running this on a Ray cluster: $ python workload.py
      2023-05-02 15:10:01,105 INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(decode_frames)] -> ActorPoolMapOperator[MapBatches(FrameAnnotator)] -> ActorPoolMapOperator[MapBatches(FrameClassifier)] -> TaskPoolMapOperator[Write]
  26. Running this on a Ray cluster: $ python workload.py
      2023-05-02 15:10:01,105 INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(decode_frames)] -> ActorPoolMapOperator[MapBatches(FrameAnnotator)] -> ActorPoolMapOperator[MapBatches(FrameClassifier)] -> TaskPoolMapOperator[Write]
      Running: 25.0/112.0 CPU, 2.0/2.0 GPU, 33.19 GiB/32.27 GiB object_store_memory: 28%|███▍ | 285/1000 [01:40<03:12, 3.71it/s]
  27. Running this on a Ray cluster: $ python workload.py
      2023-05-02 15:10:01,105 INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(decode_frames)] -> ActorPoolMapOperator[MapBatches(FrameAnnotator)] -> ActorPoolMapOperator[MapBatches(FrameClassifier)] -> TaskPoolMapOperator[Write]
      Running: 0.0/112.0 CPU, 0.0/2.0 GPU, 0.0 GiB/32.27 GiB object_store_memory: 100%|████████████| 1000/1000 [05:13<00:00, 3.85it/s]
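      For reference, here is a hedged sketch of the kind of workload.py that would produce the DAG shown in this log. The operator names (decode_frames, FrameAnnotator, FrameClassifier, Write) come from the log itself; the data source, paths, actor pool sizes, and function/class bodies are placeholder assumptions, not the talk's actual code, and the GPU stage obviously requires a cluster with GPUs.

```python
import ray.data
from ray.data import ActorPoolStrategy

def decode_frames(batch):
    # CPU-heavy video decoding (placeholder body); runs as a pool of Ray
    # tasks, which is why it shows up as TaskPoolMapOperator in the log.
    return batch

class FrameAnnotator:
    # Stateful CPU preprocessing (placeholder); runs as a pool of Ray actors
    # (ActorPoolMapOperator in the log).
    def __call__(self, batch):
        return batch

class FrameClassifier:
    # GPU model (placeholder); also an actor pool, with one GPU per actor.
    def __init__(self):
        pass  # e.g., load model weights onto the GPU here
    def __call__(self, batch):
        return batch

(
    ray.data.read_parquet("s3://bucket/videos")   # hypothetical input path
    .map_batches(decode_frames)                   # -> TaskPoolMapOperator
    .map_batches(FrameAnnotator,
                 compute=ActorPoolStrategy(min_size=1, max_size=8))
    .map_batches(FrameClassifier,
                 compute=ActorPoolStrategy(min_size=1, max_size=2),
                 num_gpus=1)                      # matches the 2 GPUs in the log
    .write_parquet("s3://bucket/predictions")     # -> TaskPoolMapOperator[Write]
)
```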
  28. How does Ray Data streaming work? • Each transformation is implemented as an operator • Use Ray tasks and actors for execution of operators ◦ default -> use Ray tasks ◦ actor pool -> use Ray actors • Intermediate data blocks are kept in the Ray object store • Memory usage of operators is limited (backpressure) to enable efficient streaming without spilling to disk
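      A runnable toy example of the task-vs-actor mapping described on this slide: a plain function is executed by a pool of Ray tasks (the default), while a callable class passed with compute=ActorPoolStrategy(...) is executed by a pool of Ray actors, whose __init__ runs once per actor and is the natural place for expensive setup such as loading a model. The add_one/Scale transforms are invented for illustration; the API usage reflects the Ray 2.4-era Ray Data interface.

```python
import ray.data
from ray.data import ActorPoolStrategy

# Stateless function -> executed by a pool of Ray *tasks* (the default).
def add_one(batch):
    batch["x"] = batch["x"] + 1
    return batch

# Stateful callable class -> executed by a pool of Ray *actors*; __init__
# runs once per actor (e.g., load model weights there).
class Scale:
    def __init__(self, factor=10):
        self.factor = factor          # stand-in for expensive setup
    def __call__(self, batch):
        batch["x"] = batch["x"] * self.factor
        return batch

ds = ray.data.from_items([{"x": i} for i in range(8)])
ds = ds.map_batches(add_one, batch_format="numpy")            # task pool
ds = ds.map_batches(Scale,
                    compute=ActorPoolStrategy(min_size=1, max_size=2),
                    batch_format="numpy")                     # actor pool
print(ds.take(3))
```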
  29. Advantages of using Ray core primitives • Heterogeneous cluster support • Fault tolerance out of the box ◦ Lineage-based reconstruction of task and actor operations → your ML job will survive failures during preprocessing • Resilient object store layer ◦ Spills to disk in case of unexpectedly high memory usage: a slowdown instead of a crash ◦ Can also do large-scale shuffles in a pinch • Easy to add data locality optimizations for both task and actor ops
  30. Training • Can use pipelines for training as well • Swap the map_batches(Model) call for streaming_split(K)
      Inference:
        ray.data.read_datasource(...) \
            .map_batches(preprocess) \
            .map_batches(Model, compute=ActorPoolStrategy(...)) \
            .write_datasource(...)
      Training:
        iters = ray.data.read_datasource(...) \
            .map_batches(preprocess) \
            .streaming_split(len(workers))
        for i, w in enumerate(workers):
            w.set_data_iterator.remote(iters[i])
        ## in worker
        for batch in it.iter_batches(batch_size=32):
            model.forward(batch)...
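      Below is a self-contained sketch that expands the training snippet above into something runnable on a laptop: a toy in-memory dataset stands in for read_datasource(...), and a trivial TrainWorker actor stands in for a real training worker. The streaming_split / set_data_iterator / iter_batches pattern follows the slide; everything else is an assumption for illustration.

```python
import ray
import ray.data

@ray.remote
class TrainWorker:
    def set_data_iterator(self, it):
        self.it = it

    def train(self):
        total = 0.0
        # Each worker consumes only its shard, streamed batch by batch.
        for batch in self.it.iter_batches(batch_size=32):
            total += float(batch["x"].sum())   # stand-in for model.forward(batch)
        return total

def preprocess(batch):
    batch["x"] = batch["x"] * 2
    return batch

workers = [TrainWorker.remote() for _ in range(3)]

ds = ray.data.from_items([{"x": i} for i in range(10_000)])
iters = ds.map_batches(preprocess, batch_format="numpy").streaming_split(
    len(workers), equal=True
)

for i, w in enumerate(workers):
    ray.get(w.set_data_iterator.remote(iters[i]))

# All splits must be consumed concurrently, so launch every worker's
# training loop before waiting on results.
print(ray.get([w.train.remote() for w in workers]))
```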
  31. Advantages of streaming for training • Example of accelerating an expensive read/preprocessing operation by adding CPU nodes to a cluster
  32. Summary • Ray Data streaming scales batch inference and training workloads • More efficient computation model than bulk processing • Simple API for composing streaming topologies
      Next steps: • Streaming inference is available in 2.4: docs.ray.io • Ray Train integration coming in 2.6 • We're hardening streaming to work robustly on clusters of 100+ nodes and 10M+ input files. Contact us!