## Leveraging Dask in *yt*
**Chris Havlin (U. Illinois DXL, chavlin@illinois.edu)**
**RHytHM Workshop, 2020-12-10**
with: Matt Turk, Madicken Munk, Kacper Kowalik

---
### What's to come:
* *yt* data chunking and parallelism
* how Dask can help
* some experiments:
    * a daskified particle reader
    * a unyt-dask array class
---
### *yt* chunking
when you do:
```python
ds = yt.load(*)
ad = ds.all_data()
ad['density']
```
*yt* iterates through `chunks` and reads the field into memory.
A `chunk` is a range of data, potentially spanning multiple data files.
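Roughly, the pattern looks like this (a sketch only; `fake_random_ds` is used just to make it runnable, and this is not the literal read internals):
```python
import numpy as np
from yt.testing import fake_random_ds   # small in-memory dataset for illustration

ds = fake_random_ds(16)                  # stand-in for yt.load(...)
ad = ds.all_data()

# read each chunk's values into memory, then stitch them together
field = ("gas", "density")
chunk_values = [chunk[field] for chunk in ad.chunks([field], "io")]
density = np.concatenate(chunk_values)
```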
---
### *yt* chunking
when you do:
```python
ad.quantities.extrema('density')
```
*yt* will:
* find the extrema of each chunk (the `reduction`)
* find the extrema of those per-chunk extrema (sketched below)
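
A rough numpy-only sketch of that two-stage reduction (random stand-in data, not yt's actual code):
```python
import numpy as np

# pretend each array is one chunk's density values
chunks = [np.random.rand(1000) for _ in range(4)]

# stage 1: reduce each chunk to its extrema
chunk_extrema = [(c.min(), c.max()) for c in chunks]

# stage 2: reduce the per-chunk extrema to the global extrema
mins, maxs = zip(*chunk_extrema)
extrema = (min(mins), max(maxs))
```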
---
### *yt* in parallel with MPI
after setting up MPI ([see the docs](https://yt-project.org/doc/analyzing/parallel_computation.html)) when you do:
```python
import yt
yt.enable_parallelism()
ds = yt.load("RD0035/RedshiftOutput0035")
v, c = ds.find_max("density")
```
*yt* will:
* divide `chunks` between MPI processes
* collect the max value from each chunk
* return the max of those per-chunk max values (sketched below with mpi4py)
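
The underlying pattern, sketched directly with mpi4py (yt's `enable_parallelism()` hides this machinery; the chunk handling below is illustrative only):
```python
# conceptual sketch -- run with e.g.: mpirun -np 4 python find_max_sketch.py
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rng = np.random.default_rng(42)                      # same "data" on every rank

# pretend these are the dataset's chunks; each rank takes a strided subset
all_chunks = [rng.random(1000) for _ in range(16)]
my_chunks = all_chunks[comm.rank::comm.size]

local_max = max(c.max() for c in my_chunks)          # per-rank reduction
global_max = comm.allreduce(local_max, op=MPI.MAX)   # max of the per-rank maxima
```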
---
### *yt* in parallel with MPI
From Ryan Farber:


---
### Dask?
Can dask simplify yt's chunking and parallel processing?
---
### Dask?
"Dask is a flexible library for parallel computing in Python."

---
### Dask?

---
### Dask?
`dask.delayed` builds lazy task graphs for parallel workflows:
```python
import dask

delayed_tasks = []
for chunk in dataset_chunks:
    # nothing executes yet; each call just adds a task to the graph
    result = dask.delayed(chunk_operator)(chunk)
    delayed_tasks.append(dask.delayed(cool_function)(result))

# evaluate the whole graph, in parallel where possible
in_mem_results = dask.compute(*delayed_tasks)
```
---
### Dask!
Can dask simplify yt's chunking and parallel processing?
Yes
**Devs**: codebase simplification
**Users**: simpler parallel installation and usage
---
### A daskified particle reader
Not so risky "live" demo time!
[a daskified Gadget reader with dask.delayed](https://nbviewer.jupyter.org/github/data-exp-lab/yt-dask-experiments/blob/master/dask_chunking/native_gadget_read.ipynb)
---
### A daskified particle reader
In `_read_particle_selection()`, the final bit reads the values into memory:
```python
rv = {}
for ptype in ptypes:
    for col in delayed_dfs[ptype].columns:
        rv[(ptype, col)] = delayed_dfs[ptype][col].values.compute()
return rv
```
but what if... we didn't?
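
A minimal, hypothetical sketch of skipping that final `.compute()` (the fake file read, field names, and `meta` below are made-up stand-ins, not the actual Gadget reader code):
```python
import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd

def _read_file(fname):
    # stand-in for reading one data file's particle fields
    return pd.DataFrame({"Masses": np.random.rand(100)})

files = ["snap.0.hdf5", "snap.1.hdf5"]   # hypothetical file list
meta = pd.DataFrame({"Masses": pd.Series(dtype="float64")})
delayed_dfs = {"PartType0": dd.from_delayed(
    [dask.delayed(_read_file)(f) for f in files], meta=meta)}

# hand back dask arrays instead of computing them into memory
rv = {}
for ptype, ddf in delayed_dfs.items():
    for col in ddf.columns:
        rv[(ptype, col)] = ddf[col].values   # still lazy -- no .compute()
```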
---
### an imagined future:
rather than returning an in-memory array, what if:
```python
ds = yt.load(*)
ad = ds.all_data()
den = ad['density']
```
returned a dask array?
```python
den.max().compute()
```
Dask handles the scheduling and the reductions across chunks!
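
A toy version with a plain dask array, just to show the chunked reduction (random stand-in data, not a real field):
```python
import dask.array as da
import numpy as np

# stand-in for per-chunk density reads: three "chunks" concatenated lazily
den = da.concatenate(
    [da.from_array(np.random.rand(1000), chunks=1000) for _ in range(3)]
)

# dask computes each chunk's max, then takes the max of those maxima
print(den.max().compute())
```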
---
### an imagined future:
From Ryan Farber:


---
### an imagined future:
(pseudocode)
```python
import dask.array as da
from dask.distributed import Client
c = Client(n_workers=10, threads_per_worker=4)

# << yt imports, data load >>

ad = ds.all_data()           # or equivalent for a dataset series
time = ad['times']           # parallel dask array
delv = ad['delv']            # parallel dask array
coldMass = ad['massCold']    # parallel dask array

new_array = complex_function(time, delv, coldMass)
da.to_hdf5('myfile.hdf5', {'/new_array': new_array})
```
same syntax for parallel or serial!
---
### ... but units ...
```python
ds = yt.load(*)
ad = ds.all_data()
ad['density'] # a unyt_array
```
arrays from `_read_*_selection()` are converted to `unyt_array` objects.
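
For reference, that wrapping amounts to something like this in plain unyt (values and units here are made up):
```python
import numpy as np
import unyt

raw = np.random.rand(100)                   # values as read from disk
density = unyt.unyt_array(raw, "g/cm**3")   # what ad['density'] hands back
print(density.to("kg/m**3").max())          # unit conversions come along for free
```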
---
### dask-unyt arrays
Creating a custom Dask collection:
```plantuml
@startuml
skinparam monochrome reverse
DaskMethodsMixin <|-- dask_array
DaskMethodsMixin <|-- unyt_dask_array
@enduml
```
---
### dask-unyt arrays
but we want all the array-reductions of `dask_array`:
```plantuml
@startuml
skinparam monochrome reverse
DaskMethodsMixin <|-- dask_array
dask_array <|-- unyt_dask_array
@enduml
```
---
### dask-unyt arrays
but we also want the unit management of `unyt_array`:
```plantuml
@startuml
class unyt_dask_array {
    ._unyt_array
}
skinparam monochrome reverse
DaskMethodsMixin <|-- dask_array
dask_array <|-- unyt_dask_array
@enduml
```
---
### dask-unyt arrays
Slightly riskier live demo time!
[full notebook](https://nbviewer.jupyter.org/github/data-exp-lab/yt-dask-experiments/blob/master/unyt_dask/unyt_from_dask.ipynb)
---
### dask & yt: next steps
* dask-unyt arrays: better automate dask-unyt array subclassing (np array protocols!)
* extend the particle reader prototype to return dask-unyt arrays
* chunking outside of particle IO
* yt community input! (YTEP...)

{"metaMigratedAt":"2023-06-15T16:50:12.228Z","metaMigratedFrom":"YAML","title":"Levaraging Dask in yt","breaks":true,"slideOptions":"{\"theme\":\"Blood\",\"transition\":\"fade\",\"spotlight\":{\"enabled\":false},\"progress\":true,\"center\":false,\"slideNumber\":true}","contributors":"[{\"id\":\"4152b526-03cd-421d-8c14-f6b324bec337\",\"add\":10367,\"del\":5180}]","description":"Chris Havlin (U. Illinois DXL, chavlin@illinois.edu)RHytHM Workshop, 2020-12-10"}