---
title: yt + Dask particle IO
tags: yt, dask
author: Chris Havlin
---
[**return to main post**](https://hackmd.io/@chavlin/r1E_o6lAv#experiments-in-daskifying-yt)
### 1. (particle) data IO
Full notebook available [here](https://nbviewer.jupyter.org/github/data-exp-lab/yt-dask-experiments/blob/master/dask_chunking/native_gadget_read.ipynb).
*yt* reads particle and grid-based data by iterating over the `chunks` with frontend-specific IO functions. For gridded data, each frontend implements a `_read_fluid_selection` (e.g., `yt.frontend.amrvac.AMRVACIOHandler._read_fluid_selection`) that iterates over `chunks` and returns a flat dictionary of numpy arrays concatenated across the `chunks`. For particle data, frontends must implement a similar function, `_read_particle_fields`, which typically gets invoked within the `BaseIOHandler._read_particle_selection` function. In both cases, the read functions accept the `chunks` iterator, the fields to read, and a `selector` object:
```python
def _read_particle_fields(self, chunks, ptf, selector):
def _read_fluid_selection(self, chunks, selector, fields, size):
```
On reading a single chunk, the `selector` is applied so that we only return selected data from each chunk.
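To make the chunk/selector interplay concrete, here is a rough, hypothetical sketch of what a frontend's `_read_particle_fields` generator might look like. The `chunk.objs` access and the `_read_positions` / `_read_field` helpers are illustrative placeholders, not any actual frontend's implementation:
```python
def _read_particle_fields(self, chunks, ptf, selector):
    # ptf maps particle type -> list of field names to read
    for chunk in chunks:
        for data_file in chunk.objs:  # illustrative: whatever sub-objects the chunk holds
            for ptype, field_list in ptf.items():
                # hypothetical helper: read the raw particle positions from disk
                x, y, z = self._read_positions(data_file, ptype)
                # apply the selector to get a mask of only the selected particles
                mask = selector.select_points(x, y, z, 0.0)
                if mask is None:
                    continue
                for field in field_list:
                    # hypothetical helper: read the raw field values from disk
                    data = self._read_field(data_file, ptype, field)
                    yield (ptype, field), data[mask]
```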
In order to construct a `dask.delayed()` Task Graph for IO, there are a number of changes required. First, dask uses `pickle` to serialize the functions and arguments that get distributed to workers, so all arguments to the `_read` functions above must be pickleable. A recent PR ([PR #2934](https://github.com/yt-project/yt/pull/2934)) added `__getstate__` and `__setstate__` methods to the geometric Cython `selector` objects, but the `chunks` can be more challenging. We touch on this further in the following section on the Particle IO prototype.
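As a quick sanity check of that pickling requirement (a sketch, assuming a loaded dataset `ds` and that the data object exposes its selector as `ad.selector`), the selector just needs to survive a round trip through `pickle`:
```python
import pickle

import yt

ds = yt.load_sample("snapshot_033")
ad = ds.all_data()

# with the __getstate__/__setstate__ methods added in PR #2934, the geometric
# selector can round-trip through pickle, which is what dask needs in order
# to ship it off to worker processes
restored_selector = pickle.loads(pickle.dumps(ad.selector))
```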
Second, `_read_particle_fields` and `_read_fluid_selection` typically iterate over `chunks` internally, but it is more straightforward to construct a chunk-parallel dask process from a single-chunk `read` function. In the following Particle IO prototype, we avoid a large-scale refactor by calling `_read_particle_fields` repeatedly with a single-element chunk list, i.e., `_read_particle_fields([chunk], ptf, selector)`, but pulling the chunk iteration and concatenation out of the frontends, so that each frontend only implements a single-chunk read function, may improve maintainability.
Finally, one question for IO is what to return. The present read routines return a dictionary of numpy arrays, which are typically pre-allocated (though the recent PR [#2416](https://github.com/yt-project/yt/pull/2416) removes particle pre-allocation). In terms of dask, we could instead return a distributed `dask.array` or `dask.dataframe`, allowing subsequent operations to compute in parallel. A `dask.array` needs to know the chunk sizes a priori, which presents some difficulty; a `dask.dataframe` does not, it only needs the expected data type for each column. So at present, the following prototype builds a Task Graph that constructs a `dask.dataframe` from delayed chunk reads. In order to return the dictionary of flat numpy arrays expected from `_read_particle_selection`, the prototype reduces the distributed dataframes to standard numpy arrays, but in principle we could return the `dask.dataframe` objects themselves, allowing subsequent calculations to work directly on the distributed data.
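A toy sketch of that difference, with a fake chunk read standing in for the actual frontend IO, shows what each collection needs up front:
```python
import numpy as np
import pandas as pd
import dask
import dask.array as da
import dask.dataframe as dd

@dask.delayed
def read_chunk_array():
    # stand-in for reading one chunk's worth of a particle field
    return np.random.random(100)

# a dask array built from delayed reads must declare each chunk's shape up front
arr = da.from_delayed(read_chunk_array(), shape=(100,), dtype="float64")

@dask.delayed
def read_chunk_df():
    # stand-in for reading one chunk into a pandas dataframe
    return pd.DataFrame({"Density": np.random.random(100)})

# a dask dataframe only needs the column names and dtypes (the meta)
df = dd.from_delayed([read_chunk_df()], meta={"Density": "float64"})
```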
The [dxl yt-dask-experiments repo](https://github.com/data-exp-lab/yt-dask-experiments) includes some initial chunking experiments with dask in the `dask_chunking` directory. That directory primarily experiments with using dask to do some manual IO on a Gadget dataset: the file reading is split into a series of dask-delayed operations that include file IO and selector application. The notebook [here](https://nbviewer.jupyter.org/github/data-exp-lab/yt-dask-experiments/blob/master/dask_chunking/native_gadget_read.ipynb) gives an overview of that attempt, so I won't go into details here, but will instead focus on an attempt to use dask for file IO natively within *yt*.
The focus of this attempt falls within `BaseIOHandler._read_particle_selection()`. In this function, *yt* currently pre-allocates arrays for all the particle fields that we're reading and then reads in the fields for each chunk using the `self._read_particle_fields(chunks, ptf, selector)` generator.
One conceptually straightforward way to leverage dask is by building a `dask.dataframe` from delayed objects. Dask dataframes, unlike dask arrays, do not need to know the total size a priori, but we do need to specify a metadata dictionary to declare the column names and datatypes.
So to build a `dask.dataframe`, we can do the following (from within `BaseIOHandler._read_particle_selection()`):
```python
ptypes = list(ptf.keys())
delayed_dfs = {}
for ptype in ptypes:
    # build a dataframe from delayed for each particle type
    this_ptf = {ptype: ptf[ptype]}
    delayed_chunks = [
        dask.delayed(self._read_single_ptype)(
            ch, this_ptf, selector, ptype_meta[ptype]
        )
        for ch in chunks
    ]
    delayed_dfs[ptype] = ddf.from_delayed(delayed_chunks, meta=ptype_meta[ptype])
```
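Here `ddf` is `dask.dataframe` (presumably imported as `import dask.dataframe as ddf`), and `ptype_meta` is the column-name/datatype metadata mentioned above. One plausible way to construct it, assuming all fields are float64 (the dtype here is an assumption for illustration):
```python
import numpy as np

# hypothetical construction of ptype_meta: for each particle type, an empty,
# correctly-typed column per requested field (usable both as the `meta` argument
# to ddf.from_delayed and as a pd.DataFrame constructor argument)
ptype_meta = {
    ptype: {field: np.empty(0, dtype="float64") for field in field_list}
    for ptype, field_list in ptf.items()
}
```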
In this snippet, we build up a dictionary of dask dataframes organized by particle type (`ptypes`). First, let's focus on the innermost part of the snippet, the actual application of the `dask.delayed` decorator:
```python
dask.delayed(self._read_single_ptype)(
    ch, this_ptf, selector, ptype_meta[ptype]
)
```
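For readers less familiar with dask, `dask.delayed(func)(*args)` does not call `func`; it records the call as a node in a Task Graph that only runs when the graph is computed. A minimal, self-contained illustration:
```python
import dask

def add(a, b):
    return a + b

# wrapping the function defers execution and returns a Delayed object
node = dask.delayed(add)(1, 2)

# nothing has run yet; compute() executes the graph (in parallel if a
# distributed Client is active, otherwise with dask's default local scheduler)
result = node.compute()  # 3
```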
This decorator wraps a `_read_single_ptype` function, which takes a single chunk, a single-particle-type `ptf` dictionary (with possibly multiple fields), the `selector`, and a column-datatype metadata dictionary, and returns a normal pandas dataframe:
```python
def _read_single_ptype(self, chunk, ptf, selector, meta_dict):
    # read a single chunk and single particle type into a pandas dataframe so that
    # we can use dask.dataframe.from_delayed! fields within a particle type should
    # have the same length?
    chunk_results = pd.DataFrame(meta_dict)
    # each particle type could be a different dataframe...
    for field_r, vals in self._read_particle_fields([chunk], ptf, selector):
        chunk_results[field_r[1]] = vals
    return chunk_results
```
We can see that this function just calls our normal `self._read_particle_fields` and pulls out the field values as usual, storing the fields for this single chunk and single particle type as columns of a pandas dataframe, `chunk_results`.
So for a single particle type, we build our delayed dataframe with:
```python
delayed_chunks = [
    dask.delayed(self._read_single_ptype)(
        ch, this_ptf, selector, ptype_meta[ptype]
    )
    for ch in chunks
]
delayed_dfs[ptype] = ddf.from_delayed(delayed_chunks, meta=ptype_meta[ptype])
```
The `delayed_dfs` object is a dictionary with a delayed dataframe for each particle type. The reason we're organizing by particle type is that different particle types may have a different number of records in a given chunk (if we combined them into a single dataframe, we'd have to store a large number of null values). In order to return the in-memory dict that `_read_particle_selection()` is expected to return, we can very easily pull all the records from across our chunks and particle types with:
```python
rv = {}
for ptype in ptypes:
    for col in delayed_dfs[ptype].columns:
        rv[(ptype, col)] = delayed_dfs[ptype][col].values.compute()
```
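A side note on this reduction: each `.compute()` call above triggers a separate graph execution. One could instead hand all of the columns to a single `dask.compute` call so everything is evaluated in one pass; a minimal sketch of that variation, reusing `ptypes` and `delayed_dfs` from above:
```python
import dask

# collect the (ptype, column) keys and the corresponding lazy dask arrays,
# then evaluate them all in a single pass over the Task Graph
keys, lazy_columns = [], []
for ptype in ptypes:
    for col in delayed_dfs[ptype].columns:
        keys.append((ptype, col))
        lazy_columns.append(delayed_dfs[ptype][col].values)

rv = dict(zip(keys, dask.compute(*lazy_columns)))
```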
So this is a fairly promising approach, particularly since the dataframes do not need to know the expected array size. And it does indeed work to read data from our particle frontends, with some modifications to the `ParticleContainer` class.
The notebook [here](https://nbviewer.jupyter.org/github/data-exp-lab/yt-dask-experiments/blob/master/dask_chunking/native_gadget_read.ipynb) shows the above approach in action, with notes on the required changes to the `ParticleContainer` class. To actually execute in parallel, the only requirement is to spin up a dask `Client`:
```python
from dask.distributed import Client
import yt
c = Client(threads_per_worker=3,n_workers=4)
ds = yt.load_sample("snapshot_033")
ad = ds.all_data()
d = ad[('PartType0', 'Density')]
```
The `dask.delayed` and `dask.compute` calls will find and connect to the `Client` automatically; if no client is present, the Task Graph will simply be executed sequentially. Here's a snapshot of the Dask Dashboard's Task Stream during a parallel particle read:
![a daskified *yt* particle read](https://i.imgur.com/rqmfifn.png)
[**return to main post**](https://hackmd.io/@chavlin/r1E_o6lAv#experiments-in-daskifying-yt)