The prototype dask-Gadget reader above returns the flat numpy arrays expected by `_read_particle_selection()`, but if it could instead return dask objects, we could easily build up parallel workflows.
In the notebook here, I explored how to create a daskified-profile calculation under the assumption that our yt IO returns dask arrays.
The notebook demonstrates two approaches. In the first, I use intrinsic dask array functions to recreate a yt profile, using dask's implementation of `np.digitize` and then summing over the digitized bins. The performance isn't great, though, and it's not obvious how the approach could be extended to reproduce all of the functionality of `ds.create_profile()` in yt (multiple binning variables, returning multiple statistics at once, etc.).
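As a rough illustration of that first approach, here's a minimal sketch (not the notebook code itself: the array sizes, chunking, and bin edges are made up for the example) that builds a simple mean profile with dask's `digitize` and `bincount`:

```python
import numpy as np
import dask.array as da

# stand-in chunked arrays; in practice these would come from the dask-enabled yt IO
binning_field = da.random.random(1_000_000, chunks=100_000)
field_to_bin = da.random.random(1_000_000, chunks=100_000)

bins = np.linspace(0.0, 1.0, 33)  # 32 bins
bin_ind = da.digitize(binning_field, bins) - 1

# per-bin sums and counts, reduced lazily across the chunks
sums = da.bincount(bin_ind, weights=field_to_bin, minlength=len(bins) - 1)
counts = da.bincount(bin_ind, minlength=len(bins) - 1)
mean_profile = (sums / counts).compute()
```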
In the second approach, I instead work out how to apply the existing yt profile functions directly to the dask arrays. When you have a delayed dask array, you can loop over the chunks, apply a function to each chunk, and then reduce the result across the chunks. In pseudo-code, creating a profile looks like:
```python
field_to_bin, binning_field, weighting_field = get_dask_delayed()
ops = []
for fdata, bdata, wdata in zip(field_to_bin, binning_field, weighting_field):
    ops.append(bin_a_single_chunk(fdata, bdata, wdata))
binned_chunks = compute(*ops)
aggregated_statistic = aggregate_the_chunks(binned_chunks)
```
This is more or less what yt already does in calculating derived quantities and profiles – stats/values are calculated for each chunk and then aggregated across chunks. So in the notebook, I set out to use yt's existing by-chunk and aggregation functions directly with a dask array to calculate a profile.
To do this, I created a `bin_a_chunk` function using `yt.data_objects.profiles.ProfileFieldAccumulator` and the Cython-optimized `yt.utilities.lib.misc_utilities.new_bin_profile1d`. Cutting the code down to its most salient steps: for each chunk, we first use `np.digitize` to find the bin indices for just this chunk, initialize the arrays needed by the Cython function using the `ProfileFieldAccumulator`, and then call `new_bin_profile1d` to fill in the stats for this chunk.
```python
import numpy as np

from yt.data_objects.profiles import ProfileFieldAccumulator
from yt.utilities.lib.misc_utilities import new_bin_profile1d


def bin_a_chunk(field_to_bin, binning_field, bins, weighting_field=None):
    # field_to_bin : the current chunk of the field that we want to bin
    # binning_field : the current chunk of the field that we are binning BY
    # bins : the bin edges we want to use for binning
    # weighting_field : the field to weight by (optional)
    #
    # returns a ProfileFieldAccumulator holding the stats for this chunk

    # find the bin index of every element in this chunk
    bin_ind = np.digitize(binning_field, bins) - 1

    # << --- cut some bin/weighting prep --- >>
    # (fdata, n_fields, and binned_sizes come from the elided prep above)

    temp_storage = ProfileFieldAccumulator(n_fields, binned_sizes)

    # profile this chunk!
    new_bin_profile1d(
        bin_ind,
        weighting_field,
        fdata,
        temp_storage.weight_values,
        temp_storage.values,
        temp_storage.mvalues,
        temp_storage.qvalues,
        temp_storage.used,
    )

    # << --- cut some cleanup --- >>
    return temp_storage
```
So in order to use this, we just have to call `bin_a_chunk` for each chunk and then aggregate. In the notebook, I use a slightly modified version of `yt.data_objects.profiles.ProfileND._finalize_storage` to do that aggregation (the only modifications were to remove some of the MPI communication).
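Putting the pieces together, the driver loop looks something like the sketch below. The names here are placeholders rather than the notebook's actual code: `chunk_iterator` stands in for whatever yields the per-chunk dask arrays, and `finalize_storage` for the modified `_finalize_storage`-style aggregator.

```python
from dask import compute, delayed

# build a delayed binning task per chunk;
# chunk_iterator and finalize_storage are hypothetical stand-ins
ops = [
    delayed(bin_a_chunk)(fdata, bdata, bins, weighting_field=wdata)
    for fdata, bdata, wdata in chunk_iterator
]

# execute the per-chunk binning in parallel: one ProfileFieldAccumulator per chunk
accumulators = compute(*ops)

# reduce across chunks with the modified ProfileND._finalize_storage-style function
profile_stats = finalize_storage(accumulators)
```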
This approach can easily be extended to other parts of yt where we apply collections and reductions across chunks. By replacing the MPI communications with loops over delayed dask objects, we can re-use existing yt functions with fairly minimal refactoring! It is also worth stressing that these changes would not prevent running with MPI: with the dask-mpi package, executing this modified code under MPI is unchanged. It's just that all of the MPI communication is handed off to Dask instead of yt's internal MPI protocols.
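For reference, here's a minimal sketch of what that looks like with dask-mpi (it assumes the `dask_mpi` and `distributed` packages are installed; `profile_script.py` is just a hypothetical script containing the code above):

```python
# profile_script.py
from dask_mpi import initialize
from dask.distributed import Client

# turn the MPI ranks this script was launched with into a dask scheduler + workers
initialize()
client = Client()

# ... build the delayed per-chunk binning ops and compute them as above ...
```

which would be launched with something like `mpirun -np 8 python profile_script.py`.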