Optimizing Spark Jobs

Reema
June 09, 2017

Transcript

  1. 100TB data processed daily
     5TB HTML data crawled per day
     1.5B product URLs
     3000 sites
     6000 categories
     30B price points
  2. ◇ Alternative to Hadoop MapReduce
     ◇ In-memory processing engine
     ◇ Support for both batch and stream processing
     ◇ SQL, MLlib, GraphX
     ◇ Up to 100x faster in memory and 10x faster on disk
     ◇ Can run on a standalone cluster, Mesos, or YARN
     ◇ HDFS for data storage
  3. Why is it faster?
     ◇ Writes intermediate output to memory instead of disk
     ◇ Less disk I/O
     ◇ Lazy evaluation of tasks - optimizes the data processing workflow
     ◇ In-memory caching is helpful when multiple operations access the same dataset
  4. Hadoop's MapReduce
     [Diagram: each query reads its input from HDFS, runs its MR operations, and
     writes results back to HDFS - Queries 1-3 each pay repeated HDFS reads and
     writes for intermediate data.]
  5. Spark
     [Diagram: the same pipeline reads input from HDFS once, then passes
     intermediate results between operations in memory - Queries 1-3 reuse the
     in-memory data instead of re-reading HDFS.]
  6. Sample code

     val sc = new SparkContext()
     val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 3) // RDD[Int]

     rdd
       .filter(no => no % 2 == 0)          // transformation (lazy)
       .map(no => no + 1)                  // transformation (lazy)
       .saveAsTextFile("s3://bucket/path") // action - triggers the job
  7. RDD
     ◇ Collection of objects operated on in parallel
     ◇ Transformations and actions
     ◇ Immutable - every transformation returns a new RDD
     ◇ Fault tolerant - lost partitions are recomputed
     Eg: rdd.map(person => person.age).filter(_ > 18)

     Dataframe
     ◇ Distributed data organized into named columns
     ◇ Like a table in a relational database
     ◇ As of Spark 2.0, a Dataframe is nothing but a Dataset[Row]
     Eg: df.select("age").filter("age > 18")

     Datasets
     ◇ Available since 1.6
     ◇ All the benefits of an RDD along with Spark SQL's optimized execution engine
     ◇ Has a strongly typed API as well as an untyped one (the Dataframe)
     Eg: ds.map(person => person.age).filter(_ > 18)
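
     A minimal sketch contrasting the three APIs on the same data; the Person
     case class and sample values are illustrative, not from the deck:

     import org.apache.spark.sql.SparkSession

     case class Person(name: String, age: Int)

     val spark = SparkSession.builder().appName("api-comparison").getOrCreate()
     import spark.implicits._

     val people = Seq(Person("a", 17), Person("b", 30))

     // RDD: typed objects, but opaque to the SQL optimizer
     val rddAges = spark.sparkContext.parallelize(people).map(p => p.age).filter(_ > 18)

     // DataFrame: untyped rows, optimized by Catalyst
     val dfAges = people.toDF().select("age").filter("age > 18")

     // Dataset: typed like an RDD, optimized like a DataFrame
     val dsAges = people.toDS().map(_.age).filter(_ > 18)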
  8. Memory Configuration (JVM heap space)
     ◇ Reserved memory: 300 MB
     ◇ User memory: (1 - spark.memory.fraction) of the remaining heap
     ◇ Unified memory: spark.memory.fraction (0.6), split into:
       - Storage memory: spark.memory.storageFraction (0.5)
       - Execution memory: the remaining 0.5
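
     One way to set these fractions when building a session; the values shown
     are the defaults:

     import org.apache.spark.sql.SparkSession

     val spark = SparkSession.builder()
       .appName("memory-config")
       .config("spark.memory.fraction", "0.6")        // unified (storage + execution) share
       .config("spark.memory.storageFraction", "0.5") // storage share within unified memory
       .getOrCreate()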
  9. Sample Configurations
     Cluster: 10 nodes / 8 cores / 64GB per node

     Config 1: 10 executors, 8 cores per executor, 45GB executor memory
     Config 2: 20 executors, 4 cores per executor, 23GB executor memory
     Config 3: 50 executors, 1 core per executor, 9GB executor memory

     Goldilocks and the three bears?
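
     Config 2 expressed as configuration keys - a sketch; in practice these are
     usually passed as spark-submit flags (--num-executors, --executor-cores,
     --executor-memory):

     import org.apache.spark.SparkConf

     val conf = new SparkConf()
       .set("spark.executor.instances", "20") // 2 executors per node
       .set("spark.executor.cores", "4")
       .set("spark.executor.memory", "23g")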
  10. ◇ Large executors - more cores
      ◇ Preferable to have larger executors - less movement of data across the network
      ◇ The more cores per executor, the more tasks can run in parallel
      ◇ A single-core executor doesn't take advantage of running multiple tasks in the same JVM
  11. "Most Spark jobs run as-is, but there are certain pitfalls one should be careful of."
  12. Common Issues
      ◇ Executor OOM
      ◇ Driver OOM
      ◇ Straggler tasks
      ◇ Shuffle failures
      ◇ GC limit exceeded
      ◇ No space left on device
  13. Executors going OOM
      ◇ Increase executor memory + memory overhead
      ◇ Have smaller partitions
      ◇ Go beyond 2000 partitions - Spark then tracks shuffle output with a more
        compressed map status
      ◇ Avoid groupByKey while working with RDDs
        - Switch to reduceByKey if aggregating (map-side combine)
        - Apply filters before grouping rather than after
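
      A small sketch of the groupByKey vs reduceByKey difference on a pair RDD;
      the data is illustrative:

      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder().appName("agg").getOrCreate()
      val pairs = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

      // groupByKey ships every value across the network before aggregating
      val viaGroup = pairs.groupByKey().mapValues(_.sum)

      // reduceByKey combines values map-side first, shuffling far less data
      val viaReduce = pairs.reduceByKey(_ + _)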
  14. Driver going OOM
      ◇ Increase driver memory + overhead
      ◇ Avoid actions that bring all the data to the driver for large datasets (e.g. collect)
      ◇ Too many partitions
        - Every task sends a map status object back to the driver
        - The driver has to handle map output status requests from the reducers
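
      A sketch of the safer alternatives to collect(); the bucket path is
      illustrative:

      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder().appName("driver-safe").getOrCreate()
      val bigRdd = spark.sparkContext.parallelize(1 to 1000000)

      // Risky on large data: collect() pulls every record into the driver JVM
      // val all = bigRdd.collect()

      // Safer: inspect a bounded sample, or write out from the executors
      val sample = bigRdd.take(100)
      bigRdd.saveAsTextFile("s3://bucket/output")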
  15. Straggler Tasks
      ◇ Skewed data - a single task deals with more data than the rest
      ◇ Increase spark.locality.wait - a long-running task might have poor locality
      ◇ Increase the number of partitions
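
      A sketch of both knobs; the wait value (default 3s) and partition counts
      are illustrative:

      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder()
        .appName("stragglers")
        .config("spark.locality.wait", "10s") // wait longer for a local slot
        .getOrCreate()

      // More, smaller partitions spread skewed keys over more tasks
      val data = spark.sparkContext.parallelize(1 to 1000000, 8)
      val finer = data.repartition(64)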
  16. GC Limit Exceeded
      ◇ Too much time is being spent on GC
      ◇ Increase executor heap size (executor memory)
      ◇ Increase storage memory (spark.memory.storageFraction)
      ◇ Choose a different GC policy (CMS, ParallelOld GC, G1GC)
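
      A sketch of switching the executors to G1GC via extra JVM options:

      import org.apache.spark.SparkConf

      val conf = new SparkConf()
        .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")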
  17. Shuffle Failures
      ◇ Expensive operation - involves disk I/O, data serialization, and network I/O
      ◇ Avoid shuffles when possible
        - Broadcast the smaller dataset while joining: shuffling all of the data
          of the larger table can be more expensive than just putting a copy of
          the small table on all executors
      ◇ More cores per executor still achieves parallelism with fewer shuffles
        over the network, versus more executors with fewer cores each
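
      A minimal broadcast-join sketch using the DataFrame API; the table
      contents are illustrative:

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.broadcast

      val spark = SparkSession.builder().appName("bcast-join").getOrCreate()
      import spark.implicits._

      val orders = Seq((1, "book"), (2, "pen")).toDF("site_id", "product") // large in practice
      val sites  = Seq((1, "siteA"), (2, "siteB")).toDF("site_id", "name") // small lookup table

      // Broadcasting the small side avoids shuffling the large side
      val joined = orders.join(broadcast(sites), "site_id")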
  18. No space left on device
      ◇ Continuous eviction of data to disk, e.g.:
        UnsafeExternalSorter: Thread 75 spilling sort data of 141.0 MB to disk (90 times so far)
      ◇ Ensure that spark.memory.fraction isn't too low
      ◇ If you're running with a minimal number of nodes, increase them
      ◇ By default the spilled data is written to /tmp; you can add more disks by
        specifying them in spark.local.dir
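
      A sketch of spreading spill files across disks; the mount paths are
      illustrative, and on YARN the node manager's local dirs take precedence
      over this setting:

      import org.apache.spark.SparkConf

      val conf = new SparkConf()
        .set("spark.local.dir", "/mnt/disk1/spark,/mnt/disk2/spark")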
  19. Using Parquet as File Format
      ◇ Columnar data storage
      ◇ 75% data compression on average
      ◇ Push down filters (1.6 onwards) to reduce disk I/O - filter pushdown and
        column pruning scan only the required rows and columns
      ◇ Use DirectParquetOutputCommitter - avoids expensive renames
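
      A short sketch of writing and reading Parquet so that column pruning and
      filter pushdown apply; paths and values are illustrative:

      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder().appName("parquet").getOrCreate()
      import spark.implicits._

      val prices = Seq(("url1", 10.0), ("url2", 20.0)).toDF("url", "price")
      prices.write.parquet("s3://bucket/prices")

      // Only the `price` column is read, and only rows passing the filter are scanned
      val expensive = spark.read.parquet("s3://bucket/prices")
        .select("price").filter($"price" > 15)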
  20. Use Aggregate Functions
      ◇ Prefer reduceByKey, combineByKey, aggregateByKey over groupByKey
      ◇ Support for custom aggregate functions
      ◇ Map-side combines
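
      For example, a per-key average with aggregateByKey keeps a (sum, count)
      accumulator and combines map-side; the data is illustrative:

      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder().appName("aggregate").getOrCreate()
      val prices = spark.sparkContext.parallelize(
        Seq(("siteA", 10.0), ("siteA", 30.0), ("siteB", 5.0)))

      val avgPrice = prices
        .aggregateByKey((0.0, 0))(
          (acc, price) => (acc._1 + price, acc._2 + 1), // merge a value into the accumulator
          (a, b) => (a._1 + b._1, a._2 + b._2))         // merge accumulators across partitions
        .mapValues { case (sum, count) => sum / count }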
  21. Persist Data
      ◇ When multiple operations are performed on the same data, persist it
      ◇ Avoids recomputing it every time an action is performed
      ◇ Helpful during interactive analysis
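
      A sketch of persisting a dataset that two actions reuse; the input path
      is illustrative:

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.storage.StorageLevel

      val spark = SparkSession.builder().appName("persist").getOrCreate()
      val parsed = spark.sparkContext.textFile("s3://bucket/crawl")
        .map(_.toLowerCase)
        .persist(StorageLevel.MEMORY_AND_DISK)

      // Both actions reuse the cached data instead of re-reading and re-parsing it
      val total  = parsed.count()
      val sample = parsed.take(10)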
  22. To summarize
      ◇ Avoid unnecessary groupBys in RDDs - replace them with aggregate functions wherever possible
      ◇ Never collect/coalesce large data onto a single node - let the executors write out the data in a distributed manner
      ◇ Use the Parquet file format and Kryo serialization
      ◇ Switch to Datasets to take full advantage of Spark's SQL engine
      ◇ Default partitions may not work all the time - know when to increase or decrease them
      ◇ Cache data, especially in jobs where it is re-used across stages - avoids re-computation
      ◇ Prefer larger executors with more cores - achieves more parallelism
      ◇ Broadcast variables that are shared by all executors - also while performing joins with a smaller dataset
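
      A sketch of enabling the Kryo serialization the summary recommends; the
      PricePoint record type is illustrative:

      import org.apache.spark.SparkConf

      case class PricePoint(url: String, price: Double)

      val conf = new SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .registerKryoClasses(Array(classOf[PricePoint])) // avoids writing full class names per record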