A sketch of what we might want to do in the yt stuff for dask, async, etc.
Basically, we currently have index objects on every dataset. I think the obvious next steps for getting us to work with dask involve first identifying the operations that are important, which I would put as: derived fields (including spatial derived fields), generation of input to visualization routines, and high-level operations like max, mean, etc. These include dimensional reductions (projections).
If we assume that each field type is associated with an index, this would simplify the process: we could have 1:N mappings from index to field types, but 1:1 from field types to index objects. Alternatively, if we had non-overlapping index objects (or multiple discrete objects), we could allow multiple indexes for a single field type.
Kacper suggests that maybe we should be looking at Chunks somewhat differently. If we separate out into coarse/refined operations, we could move our selection operations into the chunks and have the index just yield the high-level chunks, or even just have the index yield all of the chunks.
(copying over some more notes from the slack discussion)
Possible organization for constructing a set of objects:
So the flow for trivially parallel operations would be: the selector object goes to the index, which coarsely indexes and emits chunks (and if there are not enough chunks it could potentially subchunk them, dunno); each chunk would then be a dask or async future, and it would only read from disk when IO occurred on it.
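As a very rough illustration of that flow (a hypothetical sketch, not existing yt API: coarse_chunks and read_and_select are stand-in names), the delayed-future version might look like:

```python
from dask import compute, delayed


def read_and_select(chunk, selector, fields):
    # stand-in for the per-chunk work: disk IO plus fine-grained selector
    # application, which in yt would live in the frontend's IO handler
    return chunk.read(fields, selector)  # hypothetical method


def parallel_read(index, selector, fields):
    # the index coarsely indexes the selector and emits high-level chunks;
    # each chunk becomes a delayed future whose IO happens only at compute time
    futures = [
        delayed(read_and_select)(chunk, selector, fields)
        for chunk in index.coarse_chunks(selector)  # hypothetical method
    ]
    return compute(*futures)
```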
Right now, for a grid frontend, it goes something like this: the chunks of grid objects (see yt/data_objects/grid_patch.py) are yielded and operated on.

For particle reading, there are a handful of specific things that need to be kept in mind:
For grids:
For both:
If we were to start afresh, here's what we could do:
This will break down very rapidly, but we can get some functionality back over time.
It's also possible that what we might want to do is skip the first item above and just do post-selection particles.
By the time the chunks get to the IO handler, it often is just something like this:
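The original code excerpt isn't reproduced in these notes; a rough paraphrase of the pattern in yt's particle IO handlers (the _read_positions and _read_field helpers below are stand-ins, and the details vary by frontend) is:

```python
def _read_particle_fields(self, chunks, ptf, selector):
    # reduce the chunks to the set of unique data files they touch; the chunks
    # themselves are not consulted again after this point
    data_files = set()
    for chunk in chunks:
        for obj in chunk.objs:
            data_files.update(obj.data_files)
    # iterate over the data files in a consistent, sorted order to cut down on
    # repeated opens and seeks
    for data_file in sorted(data_files, key=lambda df: (df.filename, df.start)):
        for ptype, field_list in sorted(ptf.items()):
            # read positions, apply the geometric selector to build a mask,
            # then read and yield each requested field for the selected particles
            pos = self._read_positions(data_file, ptype)  # stand-in helper
            mask = selector.select_points(pos[:, 0], pos[:, 1], pos[:, 2], 0.0)
            if mask is None:
                continue
            for field in field_list:
                data = self._read_field(data_file, ptype, field)  # stand-in helper
                yield (ptype, field), data[mask]
```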
and then the chunks are not seen again, only the data_files. But the data files do get sorted and whatnot to attempt to reduce IO overhead, because the chunks themselves are sized according to some heuristics.
So, a few things. The ultimate result of this function is nothing more than an iterable of key/value pairs associating selected particle types and fields with numpy arrays. Note that it will still iterate over the chunks, but there's no attempt to line up a chunk ID with the particles. The particles are read in a consistent order (because they are assumed to be sorted internally, and we sort by a consistent sorting method), but they are otherwise not keyed to the chunks, just to the data files.
The result here is yield (ptype, field), data.
The dxl yt-dask-experiment repo includes some initial chunking experiments with dask in the dask_chunking directory. That folder primarily experiments with using dask to do some manual IO on a gadget dataset. In this experiment, the gadget file reading is split into a series of dask-delayed operations that include file IO and selector application. The notebook here gives an overview of that attempt, so I won't go into details here, but will instead focus on an attempt to use dask for file IO natively within yt.
The focus of this attempt falls within BaseIOHandler._read_particle_selection(). In this function, we pre-allocate arrays for all the particle fields that we're reading and then read in the fields for each chunk using the self._read_particle_fields(chunks, ptf, selector) generator. In the case of gadget, _read_particle_fields follows the code samples in the above section on Particle IO. Also note that an open PR removes the preallocation of the particles (PR 2416).
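For orientation, a paraphrased sketch of that pre-dask pattern (not the exact yt source; _count_particles below is a stand-in for the size-counting step):

```python
from collections import defaultdict

import numpy as np


def _read_particle_selection(self, chunks, selector, fields):
    chunks = list(chunks)  # we iterate the chunks more than once below
    # group the requested fields by particle type: {ptype: [field, ...]}
    ptf = defaultdict(list)
    for ptype, fname in fields:
        ptf[ptype].append(fname)
    # count how many particles the selector keeps, preallocate one array per
    # (ptype, field), then fill the arrays as the generator yields data
    psize = self._count_particles(chunks, ptf, selector)  # stand-in helper
    rv, ind = {}, {}
    for ptype, fname in fields:
        rv[ptype, fname] = np.empty(psize[ptype], dtype="float64")
        ind[ptype, fname] = 0
    for (ptype, fname), vals in self._read_particle_fields(chunks, ptf, selector):
        i = ind[ptype, fname]
        rv[ptype, fname][i : i + vals.size] = vals
        ind[ptype, fname] += vals.size
    return rv
```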
One conceptually straightforward way to leverage dask is by building a dask.dataframe from delayed objects. Dask dataframes, unlike dask arrays, do not need to know the total size a priori, but we do need to specify a metadata dictionary to declare the column names and datatypes.
So to build a dask.dataframe, we can do the following (from within BaseIOHandler._read_particle_selection()):
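The original snippet isn't included in these notes; a sketch of what it could look like is below. Here ptf is the usual {ptype: [fields]} dict, ptf_meta is an assumed name for a per-particle-type {column: dtype} metadata dict, and I've assumed the selector is threaded through to the per-chunk reader as well:

```python
import dask.dataframe as dd
from dask import delayed

chunks = list(chunks)  # make sure the chunks can be iterated more than once

delayed_dfs = {}
for ptype, field_list in ptf.items():
    # one delayed call per chunk, each returning a plain pandas dataframe
    # holding this particle type's fields for that chunk
    delayed_chunks = [
        delayed(self._read_single_ptype)(
            chunk, {ptype: field_list}, selector, ptf_meta[ptype]
        )
        for chunk in chunks
    ]
    # stitch the per-chunk frames into a single dask dataframe; meta declares
    # the expected column names and dtypes up front
    delayed_dfs[ptype] = dd.from_delayed(delayed_chunks, meta=ptf_meta[ptype])
```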
In this snippet, we build up a dictionary of dask dataframes organized by particle type. First, let's focus on the actual application of the dask.delayed decorator:
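In the sketch above, that application is the per-chunk delayed call:

```python
# wrapping the bound method with dask.delayed makes each per-chunk read lazy:
# nothing touches disk yet, we only record the call and its arguments
delayed_chunks = [
    delayed(self._read_single_ptype)(
        chunk, {ptype: field_list}, selector, ptf_meta[ptype]
    )
    for chunk in chunks
]
```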
This decorator wraps a _read_single_ptype function, which takes a single chunk, a single particle type dictionary (with multiple fields), and a column-datatype metadata dictionary, and returns a normal pandas dataframe:
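A sketch of that function, under the same assumptions as above (the selector is passed through, and the typed-empty fallback covers chunks where nothing is selected):

```python
import pandas as pd


def _read_single_ptype(self, chunk, ptf, selector, meta_dict):
    # ptf here maps a single particle type to its list of fields
    data = {}
    for (ptype, field), vals in self._read_particle_fields([chunk], ptf, selector):
        data[field] = vals
    # store the fields as columns of a plain pandas dataframe for this chunk
    chunk_results = pd.DataFrame(data)
    if chunk_results.empty:
        # nothing selected in this chunk: return an empty frame whose columns
        # and dtypes still match the declared metadata
        chunk_results = pd.DataFrame(
            {col: pd.Series(dtype=dt) for col, dt in meta_dict.items()}
        )
    return chunk_results
```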
We can see this function just calls our normal self._read_particle_fields and pulls out the field values as usual. The fields are stored as columns of our single-chunk, single-particle-type pandas dataframe, chunk_results.
So for a single particle type, we build our delayed dataframe:
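That step is the dd.from_delayed call from the sketch above:

```python
# build this particle type's dask dataframe from its per-chunk delayed frames;
# no row count is needed, only the column/dtype metadata
delayed_dfs[ptype] = dd.from_delayed(delayed_chunks, meta=ptf_meta[ptype])
```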
The delayed_dfs object is a dictionary with a delayed dataframe for each particle type. The reason we're organizing by particle type is that different particle types may have a different number of records in a given chunk (otherwise we'd have to store a large number of null values). In order to return the expected in-memory dict that _read_particle_selection() should return, we can very easily pull all the records from across our chunks and particle types with:
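A sketch of that final step (still inside _read_particle_selection(), with the same assumed names):

```python
# trigger the actual reads and flatten the per-ptype dataframes back into the
# {(ptype, field): ndarray} dict that _read_particle_selection() returns
rv = {}
for ptype, df in delayed_dfs.items():
    pdf = df.compute()  # the chunk IO happens here
    for field in pdf.columns:
        rv[ptype, field] = pdf[field].values
# rv is the in-memory dict handed back to the caller
```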
So this is a fairly promising approach, particularly since the dataframes do not need to know the expected array size. And it does indeed work to read data from our particle frontends, with some modifications to the ParticleContainer class.
The notebook here shows the above approach in action, with notes on the required changes to the ParticleContainer class.