
Accelerating fuzzy document deduplication to improve LLM training with RAPIDS and Dask

Training Large Language Models (LLMs) requires a vast amount of input data, and the higher the quality of that data, the better the model will be at producing useful natural language. NVIDIA NeMo Data Curator is a toolkit built with RAPIDS and Dask for extracting, cleaning, filtering and deduplicating training data for LLMs.

In this session, we will zoom in on one element of LLM pretraining and explore how we can scale out fuzzy deduplication of many terabytes of documents. We can run a distributed Jaccard similarity workload by deploying a RAPIDS-accelerated Dask cluster on Kubernetes to remove duplicate documents from our training set.

Jacob Tomlinson

December 08, 2023

Transcript

  1. Accelerating fuzzy document deduplication to improve LLM training with RAPIDS

    and Dask PyData Global - Dec 2023 Jacob Tomlinson Cloud Deployment NVIDIA RAPIDS
  2. 4 Datasets for LLM pretraining. The starting point for preparing custom pretraining datasets for

    many LLM practitioners is a list of URLs that point to data files or websites that contain content of interest for LLM pretraining, such as Common Crawl, Wikidumps and ArXiv. Source: https://developer.nvidia.com/blog/curating-trillion-token-datasets-introducing-nemo-data-curator/
  3. 5 Data Curation for LLM pretraining. A common LLM data-curation

    pipeline for datasets like Common Crawl. Source: https://developer.nvidia.com/blog/curating-trillion-token-datasets-introducing-nemo-data-curator/
  4. 7 Improved model downstream performance. Evaluating with RACE-High, PiQA, Winogrande,

    and HellaSwag. Results of dataset ablation tests for a 357M-parameter model trained on data generated from each stage of the processing pipeline within NeMo Data Curator. Source: https://developer.nvidia.com/blog/curating-trillion-token-datasets-introducing-nemo-data-curator/
  5. 10 Lightning-Fast End-to-End Performance. Reducing Data Science Processes from Hours

    to Seconds. 16 A100s provide more power than 100 CPU nodes; 20x more cost-effective than a similar CPU configuration; 70x faster performance than a similar CPU configuration. *CPU approximate to n1-highmem-8 (8 vCPUs, 52GB memory) on Google Cloud Platform. TCO calculations based on Cloud instance costs.
  6. 12

  7. General purpose Python library for parallelism Scales existing libraries, like

    Numpy, Pandas, and Scikit-Learn Flexible enough to build complex and custom systems Accessible for beginners, secure and trusted for institutions Jacob Tomlinson Core Developer Dask
  8. Dask accelerates the existing Python ecosystem Built alongside the

    current community import numpy as np x = np.ones((1000, 1000)) x + x.T - x.mean(axis=0) import pandas as pd df = pd.read_csv("file.csv") df.groupby("x").y.mean() from sklearn.linear_model import LogisticRegression lr = LogisticRegression() lr.fit(data, labels) Numpy Pandas Scikit-Learn
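    The slide shows the single-machine NumPy and pandas snippets; as a rough sketch (not code from the deck), the same operations expressed with Dask's mirrored collections might look like the following, where the chunk size and file name are illustrative assumptions:

        # Sketch: Dask equivalents of the NumPy/pandas examples above.
        # Chunk size and file name are illustrative assumptions.
        import dask.array as da
        import dask.dataframe as dd

        # NumPy-like blocked array
        x = da.ones((1000, 1000), chunks=(250, 250))
        (x + x.T - x.mean(axis=0)).compute()

        # pandas-like partitioned dataframe
        df = dd.read_csv("file.csv")
        df.groupby("x").y.mean().compute()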
  9. Distributed Task Graphs Constructing tasks in a DAG allows tasks

    to be executed by a selection of schedulers. The distributed scheduler allows a DAG to be shared by many workers running over many machines to spread out work.
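    As a hedged illustration of how such a task graph is constructed (not code from the deck), dask.delayed can build a small DAG that any of Dask's schedulers can then execute; the inc and add functions here are made up for the example:

        # Sketch: constructing a tiny DAG with dask.delayed.
        # inc and add are illustrative functions, not from the talk.
        from dask import delayed

        @delayed
        def inc(x):
            return x + 1

        @delayed
        def add(a, b):
            return a + b

        total = add(inc(1), inc(2))   # builds the graph, runs nothing yet
        print(total.compute())        # a scheduler (local or distributed) executes it -> 5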
  10. Dask’s distributed scheduler “For the first year of Dask’s life

    it was focused on single-machine parallelism. But inevitably, Dask was used on problems that didn’t fit on a single machine. This led us to develop a distributed-memory scheduler for Dask that supported the same API as the existing single-machine scheduler. For Dask users this was like magic. Suddenly their existing workloads on 50GB datasets could run comfortably on 5TB (and then 50TB a bit later).” Matthew Rocklin Dask Creator Source https://coiled.io/blog/history-dask/
  11. Monitoring our work # Connect a Dask client >>> from

    dask.distributed import Client >>> client = Client(cluster) # Do some computation >>> import dask.array as da >>> arr = da.random.random((10_000, 1_000, 1_000), chunks=(1000, 1000, 100)) >>> result = arr.mean().compute()
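    For the monitoring itself, the distributed client exposes a live dashboard URL; a one-line sketch, assuming the client object from the snippet above:

        # The distributed scheduler serves a web dashboard for watching the work above
        print(client.dashboard_link)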
  12. 21 Scaling RAPIDS in the Cloud Current Focus Areas •

    NVIDIA DGX Cloud • Kubernetes • Helm Charts • Operator • Kubeflow • Cloud ML Platforms • Amazon SageMaker Studio • Google Vertex AI • Cloud Compute • Amazon EC2, ECS, Fargate, EKS • Google Compute Engine, Dataproc, GKE • Cloud ML examples gallery Deployment documentation website RAPIDS Deployment Documentation: docs.rapids.ai/deployment/stable
  13. 23 Launch a Kubernetes Cluster # Launch a Kubernetes Cluster

    with GPUs $ gcloud container clusters create jtomlinson-rapids-demo \ --accelerator type=nvidia-tesla-a100,count=2 \ --machine-type a2-highgpu-2g \ --zone us-central1-c *Other cloud platforms have similar tools for launching GPU Kubernetes clusters easily
  14. 24 Install NVIDIA Drivers # Install the NVIDIA Drivers $

    kubectl apply -f https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/nvidia-driver-installer/cos/daemonset-preloaded-latest.yaml
  15. 25 Install the Dask operator # Install the Dask Operator

    $ helm install --repo https://helm.dask.org \ --create-namespace -n dask-operator \ --generate-name dask-kubernetes-operator
  16. 26 Installing the operator # Check that we can list

    daskcluster resources $ kubectl get daskclusters No resources found in default namespace. # Check that the operator pod is running $ kubectl get pods -A -l application=dask-kubernetes-operator NAMESPACE NAME READY STATUS RESTARTS AGE dask-operator dask-kubernetes-operator-775b8bbbd5-zdrf7 1/1 Running 0 74s # 🚀 done!
  17. 27 Get a Jupyter notebook on your Dask cluster #

    Create a cluster with Jupyter running alongside the scheduler $ dask kubernetes gen cluster \ --name rapids \ --image rapidsai/notebooks:23.10-cuda12.0-py3.10 \ --worker-command dask-cuda-worker \ --resources='{"limits": {"nvidia.com/gpu": "1"}}' \ --jupyter \ | kubectl apply -f -
  18. 28 Create RAPIDS Clusters within Notebooks With on-prem or

    cloud-managed Kubernetes # Install dask-kubernetes $ pip install dask-kubernetes # Launch a cluster >>> from dask_kubernetes.operator \ import KubeCluster >>> cluster = KubeCluster(name="rapids") # List the DaskCluster custom resource that was created for us under the hood $ kubectl get daskclusters NAME AGE rapids 6m3s
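    A small follow-on sketch (assuming the cluster object created above; the worker count is an illustrative choice) showing how the cluster can be scaled and connected to:

        # Sketch: scale the operator-managed cluster and attach a client to it.
        # The worker count is an illustrative assumption.
        from dask.distributed import Client

        cluster.scale(2)          # ask the Dask operator for two workers
        client = Client(cluster)  # subsequent computations run on the Kubernetes cluster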
  19. 29 Deploy RAPIDS Everywhere Platforms Coiled Databricks Google Colab Frameworks

    Kubernetes Kubeflow KServe HPC SLURM MPI Cloud NVIDIA DGX Cloud Base Command Platform (BCP) Amazon Web Services Elastic Compute Cloud (EC2) EC2 Cluster (via Dask) AWS Elastic Kubernetes Service (EKS) Elastic Container Service (ECS) SageMaker Microsoft Azure Azure Virtual Machine Azure Kubernetes Service Azure VM Cluster (via Dask) Azure Machine Learning Google Cloud Platform Compute Engine Instance Vertex AI Google Kubernetes Engine Dataproc IBM Cloud Virtual Server for VPC Currently documented deployments
  20. 31 Roughly group similar documents using Locality Sensitive Hashing Bucket

    documents using MinHash Computing similarity is an all-to-all operation, so we can do a first rough cut by calculating a locality sensitive hash for each document. Because the hash's structure has a direct relationship to the structure of the document, similar documents end up with the same hash. By running this on all documents we can group them into buckets, one for each hash, and then only deduplicate within each bucket, reducing the need for all-to-all comparisons. We could build this using pylsh.
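    As a rough from-scratch sketch of the bucketing idea (not the NeMo Data Curator or pylsh implementation; the shingle size, number of hashes and band width are illustrative assumptions):

        # Minimal sketch of MinHash-based LSH bucketing, written from scratch
        # for illustration only. Parameters are illustrative assumptions.
        import hashlib
        from collections import defaultdict

        def shingles(text, n=5):
            """Character n-grams (shingles) of a document."""
            return {text[i:i + n] for i in range(len(text) - n + 1)}

        def minhash_signature(doc, num_hashes=32):
            """One min-hash per seeded hash function."""
            sig = []
            for seed in range(num_hashes):
                sig.append(min(
                    int(hashlib.md5(f"{seed}:{s}".encode()).hexdigest(), 16)
                    for s in shingles(doc)
                ))
            return sig

        def lsh_buckets(docs, bands=8, rows=4):
            """Group documents whose signatures agree on any band of rows."""
            buckets = defaultdict(list)
            for doc_id, doc in docs.items():
                sig = minhash_signature(doc, num_hashes=bands * rows)
                for b in range(bands):
                    band = tuple(sig[b * rows:(b + 1) * rows])
                    buckets[(b, band)].append(doc_id)
            # Only buckets with more than one document need pairwise comparison
            return {k: v for k, v in buckets.items() if len(v) > 1}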
  21. 32 Source: Dask Demo Day Feb 23 - https://www.youtube.com/watch?v=R0Hdnhey0pc We

    can generate a dataframe containing pairs of documents within each bucket Generate pairs of documents
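    A minimal sketch of this pair-generation step, assuming the hypothetical lsh_buckets helper from the previous sketch and a docs mapping of document id to text:

        # Sketch: turn each bucket into a dataframe of candidate document pairs.
        from itertools import combinations
        import pandas as pd

        pairs = pd.DataFrame(
            [
                (doc_a, doc_b)
                for bucket in lsh_buckets(docs).values()
                for doc_a, doc_b in combinations(sorted(set(bucket)), 2)
            ],
            columns=["id_a", "id_b"],
        ).drop_duplicates()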
  22. 33 Source: Dask Demo Day Feb 23 - https://www.youtube.com/watch?v=R0Hdnhey0pc Load

    our document pairs and join them with the document text Load and join the data Dask loads this data lazily and handles swapping partitions in and out of memory as we need them. So we can open the entire dataset as a single dataframe with all comparisons regardless of how many TB the dataset is. We can then map over the partitions to calculate the Jaccard similarity for each row. Each partition can be loaded by a different node in the cluster allowing us to scale out to many GPUs on many nodes.
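    A hedged sketch of what the lazy load-and-join might look like with Dask dataframes (file paths and column names are assumptions; on GPUs the same pattern applies with dask_cudf):

        # Sketch: lazily load candidate pairs and join in the document text.
        # Paths and column names are illustrative assumptions.
        import dask.dataframe as dd

        pairs = dd.read_parquet("pairs/")        # columns: id_a, id_b
        docs = dd.read_parquet("documents/")     # columns: id, text

        pairs = pairs.merge(docs.rename(columns={"id": "id_a", "text": "text_a"}), on="id_a")
        pairs = pairs.merge(docs.rename(columns={"id": "id_b", "text": "text_b"}), on="id_b")

        # Each partition can be processed independently on a different worker/GPU,
        # e.g. scores = pairs.map_partitions(compute_jaccard)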
  23. 34 Source: Dask Demo Day Feb 23 - https://www.youtube.com/watch?v=R0Hdnhey0pc Explode

    the text into ngrams for comparison Generate ngrams For each comparison we want to make we need to explode the text of each document into ngrams.
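    A small sketch of the n-gram explosion with pandas (the toy data, n-gram size and column names are illustrative assumptions; with Dask the same code runs per partition):

        # Sketch: explode each document into word n-gram rows, one per (id, ngram).
        import pandas as pd

        def ngrams(text, n=3):
            words = text.split()
            return [" ".join(words[i:i + n]) for i in range(max(len(words) - n + 1, 1))]

        df = pd.DataFrame({"id": ["a", "b"],
                           "text": ["the cat sat on the mat", "the cat sat on a mat"]})
        df["ngram"] = df["text"].apply(ngrams)
        exploded = df[["id", "ngram"]].explode("ngram").drop_duplicates()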
  24. 35 Calculate the Jaccard similarity using join and groupby Compare

    ngrams Source: Dask Demo Day Feb 23 - https://www.youtube.com/watch?v=R0Hdnhey0pc We can then use simple Pandas operations to calculate the Jaccard similarity for each document pair, and discard any documents whose similarity exceeds a given threshold.
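    A hedged pandas sketch of the join-and-groupby Jaccard calculation, following on from the exploded n-grams above (the threshold and column names are illustrative; this is not the exact NeMo Data Curator code):

        # Sketch: Jaccard similarity for each candidate pair via join and groupby.
        counts = exploded.groupby("id").ngram.nunique().rename("n_ngrams")

        # Self-join on ngram to count shared n-grams per document pair (intersection)
        shared = (
            exploded.merge(exploded, on="ngram")
            .query("id_x < id_y")
            .groupby(["id_x", "id_y"])
            .ngram.nunique()
            .rename("intersection")
            .reset_index()
        )

        # |A u B| = |A| + |B| - |A n B|, so Jaccard = intersection / union
        shared = shared.merge(counts, left_on="id_x", right_index=True)
        shared = shared.merge(counts, left_on="id_y", right_index=True, suffixes=("_x", "_y"))
        shared["jaccard"] = shared["intersection"] / (
            shared["n_ngrams_x"] + shared["n_ngrams_y"] - shared["intersection"]
        )

        # Documents above a similarity threshold can then be dropped as duplicates
        duplicates = shared[shared["jaccard"] > 0.8]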
  25. 36 Scaling: using RAPIDS to accelerate Pandas and Dask to scale

    Pandas out. Using the popular 4.5-TB RedPajama dataset as an example, the initial CPU-based deduplication stage completed in 37 hours, using 20 high-end CPU nodes with 188 GB of RAM and 48 CPU cores per node. On four DGX A100 nodes (8x 80-GB GPUs each), deduplication now completes in 3 hours. That enables the pipeline to scale to multiple nodes and run much faster, making it possible to curate large-scale datasets for foundation models in hours instead of days. Source: https://developer.nvidia.com/blog/curating-trillion-token-datasets-introducing-nemo-data-curator/