--- title: yt + Dask profiles tags: yt, dask author: Chris Havlin --- [**return to main post**](https://hackmd.io/@chavlin/r1E_o6lAv#experiments-in-daskifying-yt) ### 2. profile calculation: operations on chunked data The prototype dask-Gadget reader above returns the flat numpy arrays expected by `_read_particle_selection()`, but if instead we could return dask objects, we could easily build up parallel workflows. In the notebook [here](https://github.com/data-exp-lab/yt-dask-experiments/blob/master/notebooks/yt_profiles_with_dask.ipynb), I explored how to create a daskified-profile calculation under the assumption that our yt IO returns dask arrays. The notebook demonstrates two approaches. In the first, I use intrinsic dask array functions to recreate a yt profile using dask's implementation of `np.digitize` and then summing over the digitized bins. The performance isn't great, though, and it's not obvious how the approach could be extended to reproduce all of the functionality of `ds.create_profile()` in `yt` (multiple binning variables, returning multiple statistics at once, etc.). In the second approach, I instead work out how to apply the existing `yt` profile functions directly to the dask arrays. When you have a delayed dask array, you can loop over a chunk, apply a function to each chunk and then reduce the result across the chunks. In pseudo-code, creating a profile looks like: ```python field_to_bin,binning_field, weighting_field = get_dask_delayed() ops = [] for fdata,bdata,wdata in zip(field_to_bin,binning_field, weighting_field ): ops.append(bin_a_single_chunk(fdata,bdata,wdata)) binned_chunks = compute(*ops) aggregated_statistic = aggregate_the_chunks(binned_chunks) ``` This is more or less what *yt* already does in calculating derived quantities and profiles -- stats/values are calculated for each chunk and then aggregated across chunks. So in the notebook, I set out to use *yt*'s existing by-chunk and aggregation functions directly with a dask array to calculate a profile. To do this, I created a `bin_a_chunk` function using `yt.data_objects.profiles.ProfileFieldAccumulator` and the C-optimized `yt.utilities.lib.misc_utilities.new_binprofile1d`. Copying and cutting the code down to its most salient steps, for each chunk we first use `np.digitize` to find the bin indices only for this chunk, initialize the arrays we need for the cython function using the `ProfileFieldAccumulator` and then call `new_binprofile1d` to fill in the stats for this chunk. ```python from yt.data_objects.profiles import ProfileFieldAccumulator from yt.utilities.lib.misc_utilities import new_bin_profile1d def bin_a_chunk(field_to_bin, binning_field, bins, weighting_field = None): # binning_field : the current chunk of the field that we are binning BY # field_to_bin : the current chunk of the field that we want to bin # bins : the bins we want to use for binning # weighting_field : the field to weight by # # returns ProfileFieldAccumulator for this chunk bin_ind = np.digitize(binning_field, bins) - 1 << --- cut some bin/weighting prep --- >> temp_storage = ProfileFieldAccumulator(n_fields, binned_sizes) # profile this chunk! new_bin_profile1d( bin_ind, weighting_field, fdata, temp_storage.weight_values, temp_storage.values, temp_storage.mvalues, temp_storage.qvalues, temp_storage.used, ) << --- cut some cleanup --- >> return temp_storage ``` So in order to use this, we just have to call `bin_a_chunk` for each chunk and then aggregate. In the notebook, I use a slightly modified version of `yt.data_objects.profiles.ProfileND._finalize_storage` to do that aggregation (the only modifications were to remove some of the MPI communication). This approach can be easily extended to other parts of *yt* where we are applying collections and reductions across chunks. By replacing the MPI communications with loops over delayed dask objects we can re-use existing *yt* functions with fairly minimal refactoring! It is also worth stressing that these changes would not disrupt **using** MPI -- with the dask-mpi package, executing this modified code with MPI is unchanged. It's just that all of the MPI communication is handed off to Dask instead of *yt*'s internal MPI protocols. [**return to main post**](https://hackmd.io/@chavlin/r1E_o6lAv#experiments-in-daskifying-yt)