Chris Havlin (U. Illinois DXL, chavlin@illinois.edu)
RHytHM Workshop, 2020-12-10
with: Matt Turk, Madicken Munk, Kacper Kowalik
when you do:
ds = yt.load(*)
ad = ds.all_data()
ad['density']
yt iterates through chunks, reads the field into memory.
chunk = a range of data, potentially across datafiles.
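The chunk-iteration above can be sketched as follows (illustrative only, not yt's actual internals; `read_field` and the chunk layout here are hypothetical):

```python
import numpy as np

# illustrative sketch (not yt internals): each "chunk" covers a range of
# the data, potentially spanning multiple data files on disk
chunks = [{"density": np.array([1.0, 2.0])},
          {"density": np.array([3.0, 4.0])}]

def read_field(chunks, field):
    # iterate through chunks, read each piece, then assemble in memory
    return np.concatenate([c[field] for c in chunks])

full = read_field(chunks, "density")  # in-memory array over all chunks
```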
when you do:
ad.quantities.extrema('density')
yt will: iterate through chunks and apply the reduction across them
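The chunked reduction can be sketched like this (hypothetical, not yt's implementation):

```python
import numpy as np

# per-chunk partial extrema, then a final reduction across chunks
chunks = [np.array([1.0, 5.0]), np.array([0.5, 3.0])]
partial = [(c.min(), c.max()) for c in chunks]   # per-chunk results
extrema = (min(p[0] for p in partial),
           max(p[1] for p in partial))           # global (min, max)
```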
after setting up MPI (see the docs), when you do:
import yt
yt.enable_parallelism()
ds = yt.load("RD0035/RedshiftOutput0035")
v, c = ds.find_max("density")
yt will: divide the chunks between MPI processes

From Ryan Farber:
Can dask simplify yt's chunking and parallel processing?
"Dask is a flexible library for parallel computing in Python."
dask.delayed()
operator for parallel workflows:
delayed_tasks = []
for chunk in dataset_chunks:
    result = dask.delayed(chunk_operator)(chunk)
    delayed_tasks.append(cool_function(result))
in_mem_results = dask.compute(*delayed_tasks)
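A runnable version of the same pattern, with stand-in functions (`chunk_operator` and `cool_function` are placeholders here; `cool_function` is also wrapped in `dask.delayed` so the whole graph stays lazy):

```python
import dask

chunks = [[1, 2], [3, 4], [5, 6]]   # stand-in for dataset_chunks

def chunk_operator(chunk):          # placeholder per-chunk operation
    return sum(chunk)

def cool_function(x):               # placeholder follow-on step
    return x * 10

delayed_tasks = []
for chunk in chunks:
    result = dask.delayed(chunk_operator)(chunk)
    delayed_tasks.append(dask.delayed(cool_function)(result))

# nothing has executed yet; compute() runs the whole task graph,
# in parallel if a scheduler/cluster is configured
in_mem_results = dask.compute(*delayed_tasks)  # (30, 70, 110)
```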
Can dask simplify yt's chunking and parallel processing?
Yes
Devs: codebase simplification
Users: simpler parallel installation, usage
Not so risky "live" demo time!
In _read_particle_selection(), the final bit reads the values into memory:
rv = {}
for ptype in ptypes:
    for col in delayed_dfs[ptype].columns:
        rv[(ptype, col)] = delayed_dfs[ptype][col].values.compute()
return rv
but what if… we didn't?
rather than returning an in-memory array, what if:
ds = yt.load(*)
ad = ds.all_data()
den = ad['density']
returned a dask array?
den.max().compute()
Dask handles the scheduling, reductions across chunks!
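For example (illustrative; `den` here is built from a plain numpy array rather than a yt dataset):

```python
import dask.array as da
import numpy as np

# a chunked dask array standing in for a density field
den = da.from_array(np.arange(8.0), chunks=4)

# max() builds a task graph of per-chunk maxima plus a final
# reduction across chunks; compute() executes it
result = den.max().compute()  # 7.0
```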
From Ryan Farber:
(pseudocode)
import dask.array as da
from dask.distributed import Client
c = Client(n_workers=10, threads_per_worker=4)
<< yt imports, data load >>
ad = ds.all_data() # or equivalent for dataseries
time = ad['times'] # parallel dask array
delv = ad['delv'] # parallel dask array
coldMass = ad['massCold'] # parallel dask array
new_array = complex_function(time, delv, coldMass)
da.to_hdf5('myfile.hdf5', {'/new_array': new_array})
same syntax for parallel or serial!
ds = yt.load(*)
ad = ds.all_data()
ad['density'] # a unyt_array

arrays from _read_*_selection() are converted to unyt_array objects.
Creating a custom Dask collection:
we want all the array reductions of dask.array, but also the unit management of unyt_array.