
Leveraging Dask in yt

Chris Havlin (U. Illinois DXL, chavlin@illinois.edu)
RHytHM Workshop, 2020-12-10

with: Matt Turk, Madicken Munk, Kacper Kowalik

[image: ytdask logo]


What's to come:

  • yt data chunking and parallelism
  • how Dask can help
  • some experiments:
    • a daskified particle reader
    • a unyt-dask array class

yt chunking

when you do:

ds = yt.load(*)
ad = ds.all_data()
ad['density'] 

yt iterates over the chunks, reading the field into memory one chunk at a time.

a chunk = a subset of the data, potentially spanning multiple data files.
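
In code, that chunk walk looks roughly like this (a minimal sketch using yt's public chunks() iterator; the dataset name is just an example, and the exact call signature can vary between yt versions):

import yt

ds = yt.load("IsolatedGalaxy/galaxy0030/galaxy0030")  # example dataset
ad = ds.all_data()

# walk the chunks one at a time; each iteration only reads its own subset
for chunk in ad.chunks([], "io"):
    print(chunk[("gas", "density")].shape)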


yt chunking

when you do:

ad.quantities.extrema('density')

yt will:

  • find the extrema of each chunk (the per-chunk reduction)
  • find the extrema of those per-chunk extrema (sketched below)
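
The same map-then-reduce pattern in plain numpy terms (a toy sketch; the random arrays stand in for yt's chunks):

import numpy as np

chunks = [np.random.random(1000) for _ in range(4)]  # stand-ins for yt chunks

# per-chunk extrema (the map step)
per_chunk = [(c.min(), c.max()) for c in chunks]

# extrema of the per-chunk extrema (the final reduction)
global_min = min(lo for lo, _ in per_chunk)
global_max = max(hi for _, hi in per_chunk)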

yt in parallel with MPI

after setting up MPI (see the docs), when you do:

import yt
yt.enable_parallelism()

ds = yt.load("RD0035/RedshiftOutput0035")
v, c = ds.find_max("density")

yt will:

  • divide chunks between MPI processes
  • collect the max value from each chunk
  • return the max of the reduced max values

yt in parallel with MPI

From Ryan Farber:

[images: parallel iteration examples]


Dask?

Can dask simplify yt's chunking and parallel processing?


Dask?

"Dask is a flexible library for parallel computing in Python."

[image: dask array]
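
For example (a minimal sketch): a dask array is a chunked collection of numpy arrays with a lazy task graph, and reductions only run on compute():

import dask.array as da

x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))  # 100 lazy chunks
result = x.mean(axis=0)  # builds a task graph; nothing is computed yet
result.compute()         # executes the graph, in parallel where possible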


Dask?

[image: dask distributed]
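
dask.distributed adds a Client/scheduler/workers layer; once a Client exists, the same lazy computations run on its workers (a minimal local-cluster sketch):

from dask.distributed import Client
import dask.array as da

client = Client(n_workers=4, threads_per_worker=2)  # local cluster

x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
x.sum().compute()  # the task graph now runs on the distributed workers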


Dask?

the dask.delayed() wrapper builds lazy task graphs for parallel workflows:

import dask

delayed_tasks = []
for chunk in dataset_chunks:
    # wrap the per-chunk operation in a lazy task
    result = dask.delayed(chunk_operator)(chunk)
    # downstream operations on a delayed result must be delayed too
    delayed_tasks.append(dask.delayed(cool_function)(result))

# execute the whole task graph, in parallel where possible
in_mem_results = dask.compute(*delayed_tasks)

Dask!

Can dask simplify yt's chunking and parallel processing?

Yes

Devs: codebase simplification
Users: simpler parallel installation and usage


A daskified particle reader

Not so risky "live" demo time!

a daskified Gadget reader with dask.delayed
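
The gist (a hypothetical sketch, not the actual notebook code; read_particle_file and the file names are made up here): wrap each per-file read in dask.delayed and assemble the pieces into one lazy dask dataframe.

import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd

@dask.delayed
def read_particle_file(fname):
    # a real reader would open one Gadget file and pull the selected
    # particle fields; here we just fabricate a small frame
    coords = np.random.random((100, 3))
    return pd.DataFrame(coords, columns=["x", "y", "z"])

files = ["snap_000.0.hdf5", "snap_000.1.hdf5"]  # hypothetical file list
delayed_frames = [read_particle_file(f) for f in files]
ddf = dd.from_delayed(delayed_frames)  # one lazy dataframe spanning all files

In the prototype, one such lazy dataframe is built per particle type (the delayed_dfs[ptype] in the snippet below).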


A daskified particle reader

In _read_particle_selection(), the final bit reads the values into memory:

rv = {}
for ptype in ptypes:
    for col in delayed_dfs[ptype].columns:
        # compute() pulls each delayed column into memory as a numpy array
        rv[(ptype, col)] = delayed_dfs[ptype][col].values.compute()

return rv

but what if we didn't?


an imagined future:

rather than returning an in-memory array, what if:

ds = yt.load(*)
ad = ds.all_data()
den = ad['density'] 

returned a dask array?

den.max().compute()

Dask handles the scheduling and the reductions across chunks!


an imagined future:

From Ryan Farber:

[images: parallel iteration examples]


an imagined future:

(pseudocode)

import dask.array as da
from dask.distributed import Client 

c = Client(n_workers=10, threads_per_worker=4)

# << yt imports, data load >>

ad = ds.all_data()         # or equivalent for a dataset series
time = ad['times']         # parallel dask array
delv = ad['delv']          # parallel dask array
coldMass = ad['massCold']  # parallel dask array
new_array = complex_function(time, delv, coldMass)
da.to_hdf5('myfile.hdf5', {'/new_array': new_array})

same syntax for parallel or serial!


but units

ds = yt.load(*)
ad = ds.all_data()
ad['density'] # a unyt_array 

arrays from _read_*_selection() are converted to unyt_array objects.
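
For reference, a unyt_array is a numpy ndarray subclass that carries units through operations, e.g.:

import unyt

den = unyt.unyt_array([1.0, 2.0, 3.0], "g/cm**3")
den.max()          # unyt_quantity(3., 'g/cm**3')
den.to("kg/m**3")  # unit conversion handled by unyt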


dask-unyt arrays

Creating a custom Dask collection:
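
The slide originally embedded code from the demo notebook; as a hedged stand-in, here is a minimal sketch of the idea (the class and names are hypothetical, and this thin wrapper is not a full dask collection): pair a plain dask array with a unyt Unit, and re-attach the units when the array is computed.

import dask.array as da
from unyt import Unit, unyt_array

class unyt_dask_array:
    # hypothetical sketch: a dask array plus a unit tag
    def __init__(self, darr, units):
        self._darr = darr
        self.units = Unit(units)

    def compute(self):
        # materialize the dask array, then attach the units
        return unyt_array(self._darr.compute(), self.units)

# usage: build a lazy array with units, compute into a unyt_array
x = unyt_dask_array(da.ones((1000, 1000), chunks=(100, 100)), "g/cm**3")
x.compute()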

        
      

dask-unyt arrays

but we want all the array reductions of dask.array (a combined sketch follows on the next slide)

        
      

dask-unyt arrays

but we also want the unit management of unyt_array:
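
Again standing in for the notebook screenshots, a hedged way to get both: forward the dask reductions to the wrapped array (keeping results lazy) and lean on unyt for conversion factors. This extends the sketch above and is illustrative only, not the demo code.

import dask.array as da
from unyt import Unit, unyt_array, unyt_quantity

class unyt_dask_array:
    def __init__(self, darr, units):
        self._darr = darr
        self.units = Unit(units)

    # dask-style reductions: stay lazy, keep the units
    def min(self, **kwargs):
        return unyt_dask_array(self._darr.min(**kwargs), self.units)

    def max(self, **kwargs):
        return unyt_dask_array(self._darr.max(**kwargs), self.units)

    def mean(self, **kwargs):
        return unyt_dask_array(self._darr.mean(**kwargs), self.units)

    # unyt-style unit management (multiplicative conversions only here)
    def to(self, units):
        factor = unyt_quantity(1.0, self.units).to(units).value
        return unyt_dask_array(self._darr * factor, units)

    def compute(self):
        return unyt_array(self._darr.compute(), self.units)

# usage: a lazy reduction plus a unit conversion, computed at the end
x = unyt_dask_array(da.random.random((10_000,), chunks=(1_000,)), "g/cm**3")
x.max().to("kg/m**3").compute()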

        
      

dask-unyt arrays

Slightly riskier live demo time!

full notebook


dask & yt: next steps

  • dask-unyt arrays: better automate dask-unyt array subclassing (np array protocols!)
  • extend the particle reader prototype to return dask-unyt arrays
  • chunking outside of particle IO
  • yt community input! (YTEP)

bye
