Rechunking

Ryan Abernathey

June 04, 2020

Transcript

  1. The Problem: Rechunking

    (Figure: FROM / TO chunk layouts)

    • Source data is often collected contiguously in 1+ dimensions (e.g. satellite image) and chunked in another (e.g. time)
    • But then we want to analyze it along the time axis
    • This requires rechunking
    • dask.array.rechunk often fails:
        • Huge graphs
        • Workers running out of memory
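  3. 2D Case

    (Figure: source chunks $C^0_t \times C^0_x$ rechunked to target chunks $C^1_t \times C^1_x$)

    • Array shape conserved: $N_t = \text{const}$, $N_x = \text{const}$
    • Number of chunks $n_t$, $n_x$: $N_t = n_t C_t$, $N_x = n_x C_x$
    • Chunk size: $S_c = C_t C_x$
    • Chunk-size-preserving operations: $C^0_t C^0_x = C^1_t C^1_x$
    • Full-shuffle rechunk: $C^0_x = N_x$, $C^1_t = N_t$, $C^1_x = C^0_t N_x / N_t$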
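    The full-shuffle relations can be checked numerically. This sketch assumes a hypothetical array of $N_t = 1000$ time steps by $N_x = 10000$ points, with source chunks of $C^0_t = 10$ time steps. `full_shuffle_chunks` is an illustrative helper and does not come from any library. The sketch also counts overlapping chunk pairs, because in a full shuffle every source chunk (all of x) intersects every target chunk (all of t).

```python
def full_shuffle_chunks(N_t, N_x, C0_t):
    """Hypothetical helper: source chunks span all of x, target chunks span
    all of t, and the chunk size S_c = C_t * C_x is preserved."""
    source = (C0_t, N_x)                 # C0_x = N_x
    # C1_x = C0_t * N_x / N_t must be an integer to preserve S_c exactly
    if (C0_t * N_x) % N_t:
        raise ValueError("chunk size cannot be preserved exactly")
    target = (N_t, C0_t * N_x // N_t)    # C1_t = N_t
    return source, target


source, target = full_shuffle_chunks(N_t=1000, N_x=10000, C0_t=10)
print(source, target)                                 # (10, 10000) (1000, 100)
print(source[0] * source[1], target[0] * target[1])   # 100000 100000

# Every source chunk overlaps every target chunk in a full shuffle:
n_source = 1000 // source[0]    # number of source chunks along t
n_target = 10000 // target[1]   # number of target chunks along x
print(n_source * n_target)      # 10000 overlapping pieces
```

    That product of chunk counts suggests why a naive task graph for this operation grows so quickly.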
  4. Algorithms: Push

    (Figure: Source → Target)

    • Read each source chunk only once
    • Write source data to many target chunks
    • Problem: need to synchronize writes
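    Here is a minimal in-memory sketch of the push strategy. It assumes 2-D arrays stored as Python dicts of NumPy chunks keyed by chunk index, standing in for a real chunked store. The names `split` and `push_rechunk` are hypothetical. One thread task reads each source chunk once and writes its pieces into every overlapping target chunk. Each write is a read-modify-write of a whole target chunk, so a per-target `threading.Lock` supplies the synchronization the slide mentions.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def split(array, chunks):
    """Store a 2-D array as {chunk index: ndarray}, a toy chunked format."""
    ct, cx = chunks
    return {(i // ct, j // cx): array[i:i + ct, j:j + cx].copy()
            for i in range(0, array.shape[0], ct)
            for j in range(0, array.shape[1], cx)}


def push_rechunk(source_store, shape, source_chunks, target_chunks, workers=4):
    st, sx = source_chunks
    tt, tx = target_chunks
    target_store = {}
    # One lock per target chunk: several source chunks write into the same target.
    locks = {(i // tt, j // tx): threading.Lock()
             for i in range(0, shape[0], tt) for j in range(0, shape[1], tx)}

    def push_one(key):
        a, b = key
        chunk = source_store[key]                  # each source chunk read once
        ca, cb = a * st, b * sx                    # chunk origin in global coords
        ca1, cb1 = ca + chunk.shape[0], cb + chunk.shape[1]
        # Visit every target chunk this source chunk overlaps.
        for ti in range(ca // tt, (ca1 - 1) // tt + 1):
            for tj in range(cb // tx, (cb1 - 1) // tx + 1):
                i0, j0 = ti * tt, tj * tx
                i1, j1 = min(i0 + tt, shape[0]), min(j0 + tx, shape[1])
                r0, r1 = max(ca, i0), min(ca1, i1)
                c0, c1 = max(cb, j0), min(cb1, j1)
                with locks[(ti, tj)]:
                    # Read-modify-write of the whole target chunk; without the
                    # lock, concurrent partial writes could overwrite each other.
                    old = target_store.get((ti, tj))
                    buf = (np.zeros((i1 - i0, j1 - j0), chunk.dtype)
                           if old is None else old.copy())
                    buf[r0 - i0:r1 - i0, c0 - j0:c1 - j0] = \
                        chunk[r0 - ca:r1 - ca, c0 - cb:c1 - cb]
                    target_store[(ti, tj)] = buf

    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(push_one, list(source_store)))   # list() re-raises worker errors
    return target_store


arr = np.arange(48).reshape(6, 8)
target = push_rechunk(split(arr, (2, 8)), arr.shape, (2, 8), (6, 2))
```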
  5. Algorithms: Pull

    (Figure: Source → Target)

    • Write each target chunk only once
    • Read each source chunk many times
    • Problem: can blow out memory, and the serial loop over source chunks may be very slow
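    The pull strategy can be sketched with the same toy dict-of-chunks storage. The names are hypothetical, and the storage is assumed to support only whole-chunk reads. Each target chunk is assembled in memory and written once. Every overlapping source chunk, however, is read in full for each target that touches it, and the `reads` counter makes this visible.

```python
from collections import Counter

import numpy as np


def split(array, chunks):
    """Store a 2-D array as {chunk index: ndarray}, a toy chunked format."""
    ct, cx = chunks
    return {(i // ct, j // cx): array[i:i + ct, j:j + cx].copy()
            for i in range(0, array.shape[0], ct)
            for j in range(0, array.shape[1], cx)}


def pull_rechunk(source_store, shape, source_chunks, target_chunks):
    st, sx = source_chunks
    tt, tx = target_chunks
    dtype = next(iter(source_store.values())).dtype
    reads = Counter()          # how many times each source chunk is read
    target_store = {}
    for i0 in range(0, shape[0], tt):
        for j0 in range(0, shape[1], tx):
            i1, j1 = min(i0 + tt, shape[0]), min(j0 + tx, shape[1])
            buf = np.empty((i1 - i0, j1 - j0), dtype)   # whole target chunk in memory
            # Loop over every source chunk that overlaps this target chunk.
            for a in range(i0 // st, (i1 - 1) // st + 1):
                for b in range(j0 // sx, (j1 - 1) // sx + 1):
                    chunk = source_store[(a, b)]        # whole-chunk read
                    reads[(a, b)] += 1
                    ca, cb = a * st, b * sx             # chunk origin in global coords
                    r0, r1 = max(i0, ca), min(i1, ca + chunk.shape[0])
                    c0, c1 = max(j0, cb), min(j1, cb + chunk.shape[1])
                    buf[r0 - i0:r1 - i0, c0 - j0:c1 - j0] = \
                        chunk[r0 - ca:r1 - ca, c0 - cb:c1 - cb]
            target_store[(i0 // tt, j0 // tx)] = buf    # single write per target
    return target_store, reads


arr = np.arange(48).reshape(6, 8)
target, reads = pull_rechunk(split(arr, (2, 8)), arr.shape, (2, 8), (6, 2))
print(dict(reads))   # {(0, 0): 4, (1, 0): 4, (2, 0): 4}
```

    With three source chunks and four target chunks, every source chunk is read four times. The buffer for a single target chunk is also held in memory while it is assembled.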
  6. Consolidating Reads

    (Figure: Source → Intermediate → Target, annotated "single read")

    • Fewer read tasks
    • Fewer intermediate chunks

    Source chunks can be consolidated up to the size of the target chunks.
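    Both benefits can be counted directly. This sketch assumes a hypothetical $(N_t, N_x) = (1000, 10000)$ array with source chunks of one time step and target chunks holding full time series 10 points wide. It takes the intermediate chunk to be the per-dimension minimum of the read and write chunks, which is the rule given on the following slides. The consolidated read chunk of 10 time steps is chosen by hand.

```python
import math


def n_chunks(shape, chunks):
    """Number of chunks needed to tile `shape` with blocks of size `chunks`."""
    return math.prod((n + c - 1) // c for n, c in zip(shape, chunks))


def intermediate(read_chunks, write_chunks):
    """Intermediate chunk = per-dimension minimum of read and write chunks."""
    return tuple(min(r, w) for r, w in zip(read_chunks, write_chunks))


shape = (1000, 10000)       # (N_t, N_x), hypothetical sizes
source = (1, 10000)         # one image (time step) per source chunk
target = (1000, 10)         # full time series for 10 points per target chunk

# No consolidation: each source chunk is its own read task.
print(n_chunks(shape, source), n_chunks(shape, intermediate(source, target)))
# 1000 1000000

# Consolidate 10 source chunks per read (still <= target chunk along t).
read = (10, 10000)
print(n_chunks(shape, read), n_chunks(shape, intermediate(read, target)))
# 100 100000
```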
  7. Push-Pull-Consolidated

    if consolidate_reads:
        read_chunks = consolidate_chunks(
            source_chunks, mem_limit, constraint=target_chunks)
    else:
        read_chunks = source_chunks

    if consolidate_writes:
        write_chunks = consolidate_chunks(target_chunks, mem_limit)
    else:
        write_chunks = target_chunks

    # per dimension; computed after both read_chunks and write_chunks exist
    intermediate_chunks = [min(r, w) for r, w in zip(read_chunks, write_chunks)]
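    To run the planning logic above end to end, it needs a concrete `consolidate_chunks`. The version below is a hypothetical stand-in, and its signature adds `shape` and `itemsize`, which the slide does not show. It greedily grows each dimension by whole multiples of the original chunk, starting from the first axis. It stops when the chunk would exceed `mem_limit` bytes, the optional constraint, or the array extent. The array sizes and memory limit in the example are also assumptions.

```python
import math


def consolidate_chunks(chunks, shape, itemsize, mem_limit, constraint=None):
    """HYPOTHETICAL helper (signature differs from the slide): enlarge each
    dimension by a whole multiple of the original chunk, greedily from the
    first axis, keeping the chunk under `mem_limit` bytes and no larger than
    `constraint` (or the array) along any dimension."""
    limits = shape if constraint is None else tuple(map(min, shape, constraint))
    new = list(chunks)
    for axis, (c, lim) in enumerate(zip(chunks, limits)):
        others = math.prod(new) // new[axis]            # elements in the other dims
        max_len = mem_limit // (itemsize * others)      # longest length memory allows
        new[axis] = c * max(1, min(lim, max_len) // c)  # never shrink below c
    return tuple(new)


def plan_rechunk(shape, itemsize, source_chunks, target_chunks, mem_limit,
                 consolidate_reads=True, consolidate_writes=True):
    if consolidate_reads:
        read_chunks = consolidate_chunks(source_chunks, shape, itemsize,
                                         mem_limit, constraint=target_chunks)
    else:
        read_chunks = tuple(source_chunks)
    if consolidate_writes:
        write_chunks = consolidate_chunks(target_chunks, shape, itemsize, mem_limit)
    else:
        write_chunks = tuple(target_chunks)
    # Computed last, once both read_chunks and write_chunks exist.
    intermediate_chunks = tuple(map(min, read_chunks, write_chunks))
    return read_chunks, intermediate_chunks, write_chunks


print(plan_rechunk((1000, 10000), 8, (1, 10000), (1000, 10), mem_limit=8_000_000))
# ((100, 10000), (100, 1000), (1000, 1000))
```

    The read and write chunks are always whole multiples of the source and target chunks. As a result, each read covers complete source chunks and each write covers complete target chunks.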
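  11. Source → Intermediate → Target (figure annotated "single read" and "single write")

    # for each dimension of the array:
    for source_chunk, target_chunk, read_chunk, write_chunk, int_chunk in zip(
            source_chunks, target_chunks, read_chunks, write_chunks, intermediate_chunks):
        assert read_chunk >= source_chunk
        assert write_chunk >= target_chunk
        assert int_chunk == min(read_chunk, write_chunk)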
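    The invariants above can be exercised end to end with a toy two-stage copy. The sketch assumes a hypothetical `ChunkStore` that wraps a NumPy array, counts whole-chunk reads and writes, and accepts only chunk-aligned writes. The chunk sizes are chosen, as an assumption, so that the read and write chunks are whole multiples of the intermediate chunks. Stage 1 copies source to intermediate in `read_chunks`, and stage 2 copies intermediate to target in `write_chunks`.

```python
import itertools
from collections import Counter

import numpy as np


def regions(shape, chunks):
    """Yield a tuple of slices for every block when `shape` is tiled by `chunks`."""
    for start in itertools.product(*(range(0, n, c) for n, c in zip(shape, chunks))):
        yield tuple(slice(s, min(s + c, n)) for s, c, n in zip(start, chunks, shape))


class ChunkStore:
    """Toy chunked array (hypothetical) that counts whole-chunk reads and writes."""

    def __init__(self, shape, chunks, dtype):
        self.shape, self.chunks = shape, chunks
        self.data = np.zeros(shape, dtype)
        self.reads, self.writes = Counter(), Counter()

    def _touched(self, region):
        # Indices of every chunk that `region` overlaps.
        return itertools.product(*(range(s.start // c, (s.stop - 1) // c + 1)
                                   for s, c in zip(region, self.chunks)))

    def read(self, region):
        self.reads.update(self._touched(region))
        return self.data[region].copy()

    def write(self, region, value):
        # Toy rule: writes must cover whole chunks (no partial-chunk updates).
        assert all(s.start % c == 0 and (s.stop % c == 0 or s.stop == n)
                   for s, c, n in zip(region, self.chunks, self.shape))
        self.writes.update(self._touched(region))
        self.data[region] = value


def two_stage_rechunk(source, target_chunks, read_chunks, write_chunks):
    int_chunks = tuple(map(min, read_chunks, write_chunks))
    for s, t, r, w, i in zip(source.chunks, target_chunks, read_chunks,
                             write_chunks, int_chunks):
        assert r >= s and w >= t and i == min(r, w)    # the slide's invariants
    inter = ChunkStore(source.shape, int_chunks, source.data.dtype)
    target = ChunkStore(source.shape, target_chunks, source.data.dtype)
    for region in regions(source.shape, read_chunks):   # stage 1: single read
        inter.write(region, source.read(region))
    for region in regions(source.shape, write_chunks):  # stage 2: single write
        target.write(region, inter.read(region))
    return inter, target


arr = np.arange(144).reshape(12, 12)
source = ChunkStore(arr.shape, (1, 12), arr.dtype)
source.data[...] = arr
inter, target = two_stage_rechunk(source, (12, 1), read_chunks=(4, 12),
                                  write_chunks=(12, 3))
```

    With these sizes, each source chunk is read once, each intermediate chunk is written and read once, and each target chunk is written once. Suppose a read chunk were larger than, but not a multiple of, the write chunk along some dimension. Then stage 1 writes would no longer line up with the intermediate chunks, and the toy store's alignment check would fail. This toy does not handle that case.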