---
title: yt + Dask profiles
tags: yt, dask
author: Chris Havlin
---
[**return to main post**](https://hackmd.io/@chavlin/r1E_o6lAv#experiments-in-daskifying-yt)
### 2. profile calculation: operations on chunked data
The prototype dask-Gadget reader above returns the flat numpy arrays expected by `_read_particle_selection()`, but if it instead returned dask objects, we could easily build up parallel workflows.
In the notebook [here](https://github.com/data-exp-lab/yt-dask-experiments/blob/master/notebooks/yt_profiles_with_dask.ipynb), I explored how to create a daskified-profile calculation under the assumption that our yt IO returns dask arrays.
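For concreteness, one way an IO layer could hand back dask arrays is to wrap the per-chunk reads in `dask.delayed` and stitch the pieces together with `dask.array.from_delayed`. Here's a minimal sketch, with a hypothetical `read_chunk` and `chunk_sizes` standing in for the frontend's chunked particle IO:
```python
import numpy as np
import dask.array as da
from dask import delayed

# hypothetical stand-ins for the frontend's chunked particle IO
chunk_sizes = [1000, 1000, 500]

def read_chunk(i):
    # stand-in for reading one chunk of a particle field from disk
    return np.random.random(chunk_sizes[i])

# wrap each chunk read in a delayed call and assemble a single dask array
delayed_chunks = [delayed(read_chunk)(i) for i in range(len(chunk_sizes))]
density = da.concatenate(
    [da.from_delayed(chunk, shape=(n,), dtype="float64")
     for chunk, n in zip(delayed_chunks, chunk_sizes)]
)
```
Nothing is read until the array is actually computed, and anything built on top of it inherits the chunked, parallel structure.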
The notebook demonstrates two approaches. In the first, I recreate a yt profile with intrinsic dask array functions, using dask's implementation of `np.digitize` and then summing over the digitized bins. The performance isn't great, though, and it's not obvious how the approach could be extended to reproduce all of the functionality of `ds.create_profile()` in `yt` (multiple binning fields, returning multiple statistics at once, etc.).
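To give a flavor of that first approach, here's a rough sketch of a weighted 1D profile built purely from dask array operations (the fields and bins below are random stand-ins for what yt would provide, and the sketch assumes every value falls inside the bin edges):
```python
import numpy as np
import dask.array as da

# random stand-ins for yt-provided dask arrays
temperature = da.random.uniform(1e4, 1e6, size=100_000, chunks=25_000)
density = da.random.uniform(1e-28, 1e-24, size=100_000, chunks=25_000)
cell_mass = da.random.uniform(1e30, 1e33, size=100_000, chunks=25_000)
bins = np.linspace(1e4, 1e6, 33)  # bin edges for the binning field

# assign each element to a bin (assumes all values fall within the edges)
bin_inds = da.digitize(temperature, bins) - 1
nbins = len(bins) - 1

# weighted mean of density in each temperature bin
weight_sum = da.bincount(bin_inds, weights=cell_mass, minlength=nbins)
weighted_sum = da.bincount(bin_inds, weights=cell_mass * density, minlength=nbins)
mean_density = (weighted_sum / weight_sum).compute()
```
Everything before the final `.compute()` stays lazy, so dask handles the chunked execution.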
In the second approach, I instead work out how to apply the existing `yt` profile functions directly to the dask arrays. When you have a dask array built from delayed chunks, you can loop over those chunks, apply a function to each one, and then reduce the results across chunks. In pseudo-code, creating a profile looks like:
```python
field_to_bin, binning_field, weighting_field = get_dask_delayed()

ops = []
for fdata, bdata, wdata in zip(field_to_bin, binning_field, weighting_field):
    ops.append(bin_a_single_chunk(fdata, bdata, wdata))

binned_chunks = compute(*ops)
aggregated_statistic = aggregate_the_chunks(binned_chunks)
```
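A minimal runnable version of that pattern, using `dask.delayed` directly, with `np.histogram` standing in for yt's per-chunk binning (and the weight field dropped for brevity), could look like:
```python
import numpy as np
from dask import delayed, compute

# random stand-ins for the chunked (field_to_bin, binning_field) pairs
chunks = [(np.random.random(1000), np.random.random(1000)) for _ in range(4)]
bins = np.linspace(0.0, 1.0, 11)

@delayed
def bin_a_single_chunk(field_to_bin, binning_field):
    # per-chunk statistic: sum of field_to_bin within each bin of binning_field
    chunk_sum, _ = np.histogram(binning_field, bins=bins, weights=field_to_bin)
    return chunk_sum

ops = [bin_a_single_chunk(fdata, bdata) for fdata, bdata in chunks]
binned_chunks = compute(*ops)  # executes the per-chunk binning (in parallel)
aggregated_statistic = np.sum(binned_chunks, axis=0)  # reduce across chunks
```
The chunk-level work is what gets distributed; the aggregation happens once the per-chunk results come back.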
This is more or less what *yt* already does in calculating derived quantities and profiles -- stats/values are calculated for each chunk and then aggregated across chunks. So in the notebook, I set out to use *yt*'s existing by-chunk and aggregation functions directly with a dask array to calculate a profile.
To do this, I created a `bin_a_chunk` function using `yt.data_objects.profiles.ProfileFieldAccumulator` and the Cython-optimized `yt.utilities.lib.misc_utilities.new_bin_profile1d`. Cutting the code down to its most salient steps: for each chunk, we first use `np.digitize` to find the bin indices for just that chunk, initialize the arrays needed by the Cython function with a `ProfileFieldAccumulator`, and then call `new_bin_profile1d` to fill in the stats for that chunk.
```python
import numpy as np

from yt.data_objects.profiles import ProfileFieldAccumulator
from yt.utilities.lib.misc_utilities import new_bin_profile1d


def bin_a_chunk(field_to_bin, binning_field, bins, weighting_field=None):
    # field_to_bin : the current chunk of the field that we want to bin
    # binning_field : the current chunk of the field that we are binning BY
    # bins : the bin edges we want to use for binning
    # weighting_field : the current chunk of the field to weight by
    #
    # returns a ProfileFieldAccumulator for this chunk
    bin_ind = np.digitize(binning_field, bins) - 1
    # << --- cut some bin/weighting prep (sets up fdata, n_fields, binned_sizes used below) --- >>
    temp_storage = ProfileFieldAccumulator(n_fields, binned_sizes)
    # profile this chunk!
    new_bin_profile1d(
        bin_ind,
        weighting_field,
        fdata,
        temp_storage.weight_values,
        temp_storage.values,
        temp_storage.mvalues,
        temp_storage.qvalues,
        temp_storage.used,
    )
    # << --- cut some cleanup --- >>
    return temp_storage
```
So in order to use this, we just have to call `bin_a_chunk` for each chunk and then aggregate. In the notebook, I use a slightly modified version of `yt.data_objects.profiles.ProfileND._finalize_storage` to do that aggregation (the only modifications were to remove some of the MPI communication).
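In rough form that looks like the sketch below, where `chunked_fields` is a stand-in for the zipped per-chunk arrays and the final sums are a simplified placeholder for the `_finalize_storage` aggregation:
```python
from dask import delayed, compute

# `chunked_fields` and `bins` are assumed to come from earlier setup
ops = [
    delayed(bin_a_chunk)(fdata, bdata, bins, weighting_field=wdata)
    for fdata, bdata, wdata in chunked_fields
]
accumulators = compute(*ops)  # one ProfileFieldAccumulator per chunk

# simplified stand-in for the modified ProfileND._finalize_storage:
# sum the per-chunk accumulator arrays across chunks
total_weight = sum(acc.weight_values for acc in accumulators)
total_values = sum(acc.values for acc in accumulators)
```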
This approach can easily be extended to other parts of *yt* where we collect results from each chunk and reduce them across chunks. By replacing the MPI communications with loops over delayed dask objects, we can re-use existing *yt* functions with fairly minimal refactoring! It is also worth stressing that these changes would not disrupt **using** MPI -- with the dask-mpi package, this modified code can still be executed with MPI unchanged. It's just that all of the communication is handed off to Dask instead of *yt*'s internal MPI protocols.
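As a sketch (assuming `dask-mpi` is installed), running the daskified profile code under MPI just means launching the script with `mpirun` and letting dask-mpi turn the MPI ranks into a scheduler, a client, and workers:
```python
# profile_script.py -- launch with, e.g.:  mpirun -np 4 python profile_script.py
from dask_mpi import initialize
from dask.distributed import Client

initialize()       # rank 0 runs the scheduler, rank 1 runs this script, the rest are workers
client = Client()  # connect to the scheduler started by initialize()

# ... the same delayed/compute profile code as above runs here, unchanged ...
```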
[**return to main post**](https://hackmd.io/@chavlin/r1E_o6lAv#experiments-in-daskifying-yt)