# Results from initial explorations in leveraging dask in yt
There are a number of possible ways for yt to leverage dask to simplify its existing parallel and lazy operations, but thus far we have focused on two related areas in which dask may prove particularly useful:

1. data IO
2. calculating derived quantities of yt Data Objects

Central to both of these areas is the `chunk` iterator that yt uses to index a dataset. The exact implementation depends on both the frontend and the datatype, but in general a single `chunk` is a range of data indices that may span datafiles or be a subset of a single datafile. Thus, data can be read in by separate processes in parallel, or sequentially on a single processor, by iterating and aggregating over chunks separately. In the case of IO, data can also be subselected by `selector` objects (e.g., slices, regions) before aggregation across chunks into yt arrays. In the case of data processing, some computations, such as finding statistical measures of a data field (max, min, weighted means), can also process chunks separately and return intermediate results that are then aggregated.

In terms of `dask`, one straightforward conceptual approach with fairly minimal refactoring of yt is to use the `dask.delayed()` decorator to construct graphs that execute across `chunks`. In pseudocode, this could look like:
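```python
import dask

# a minimal sketch: wrap a per-chunk function with dask.delayed to build a
# list of unevaluated tasks (nothing is read or computed yet)
delayed_chunks = [dask.delayed(chunk_processor)(chunk, *args) for chunk in chunks]
```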
where `chunk_processor` is some function that we want to apply to each chunk and `*args` are the arguments to that function. This list of `delayed_chunks` can be strung together with any number of subsequent operations to create a dask Task Graph. For example, if we want to find the minimum value of a field across chunks, we might construct a graph that first reads a chunk and then finds the min of each chunk:
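```python
import dask
import numpy as np

# read_chunk is a placeholder for whatever per-chunk read function we use
delayed_reads = [dask.delayed(read_chunk)(chunk, field) for chunk in chunks]
delayed_mins = [dask.delayed(np.min)(chunk_data) for chunk_data in delayed_reads]

# reduce the per-chunk minima to a single value and execute the graph
field_min = dask.delayed(min)(delayed_mins).compute()
```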
The final line is where the task graph actually executes; before that, `dask` is only constructing a representation of the tasks. When we call `dask.compute()`, `dask` will distribute the tasks to the `dask.distributed.Client` if one is active; otherwise, execution continues sequentially on a single processor.

In the following, we first dive into data IO in more detail and describe a prototype particle reader that uses delayed `dask.dataframe` objects to read fields from a Gadget dataset. We then discuss calculating derived quantities of yt Data Objects using dask.

## 1. data IO
yt reads particle and grid-based data by iterating across the `chunks` with frontend-specific IO functions. For gridded data, each frontend implements a `_read_fluid_selection` (e.g., `yt.frontends.amrvac.AMRVACIOHandler._read_fluid_selection`) that iterates over `chunks` and returns a flat dictionary with numpy arrays concatenated across each `chunk`. For particle data, frontends must implement a similar function, `_read_particle_fields`, that typically gets invoked within the `BaseIOHandler._read_particle_selection` function. In both cases, the read functions accept the `chunks` iterator, the fields to read, and a `selector` object:
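```python
# approximate signatures of the existing per-frontend read functions
def _read_fluid_selection(self, chunks, selector, fields, size):
    ...

def _read_particle_fields(self, chunks, ptf, selector):
    ...
```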
On reading a single chunk, the `selector` is applied so that we only return selected data from each chunk.

In order to construct a `dask.delayed()` Task Graph for IO, a number of changes are required. First, dask uses `pickle` to serialize the functions and arguments that get distributed to workers, so all arguments to the `_read` functions above must be pickleable. A recent PR (PR #2934) added `__getstate__` and `__setstate__` methods to the geometric Cython `selector` objects, but the `chunks` can be more challenging. We touch on this further in the following section on the Particle IO prototype.

Second, `_read_particle_fields` and `_read_fluid_selection` typically iterate over `chunks` internally, but it is more straightforward to construct a chunk-parallel dask process with a single-chunk read function. In the following Particle IO prototype, we avoid a large-scale refactor by calling `_read_particle_fields` repeatedly with a list containing a single chunk, i.e., `_read_particle_fields([chunk], ptf, selector)`, but separating the chunk iteration and concatenation into a single-chunk read function may improve maintainability.

Finally, one question on IO is what to return? The present read routines return a dictionary of numpy arrays. These arrays are typically pre-allocated, though the recent PR #2416 removes particle pre-allocation. In terms of dask, we could instead return a distributed `dask.array` or `dask.dataframe`, allowing subsequent functions to compute in parallel. The `dask.array` needs to know chunk sizes a priori, which presents some difficulty. A `dask.dataframe` does not need to know the chunk sizes, just the expected data type for each column. In the following prototype, we construct a Task Graph that builds a `dask.dataframe` from delayed chunk reads. In order to return the dictionary of flat numpy arrays expected from `_read_particle_selection`, the prototype reduces the distributed dataframes to standard numpy arrays, but in principle we could return the `dask.dataframe` objects, allowing subsequent calculations to leverage the distributed dataframes directly.

## a particle IO prototype
The dxl yt-dask-experiment repo includes some initial chunking experiments with dask in the `dask_chunking` directory. That folder primarily experiments with using dask to do some manual IO on a Gadget dataset. In this experiment, the Gadget file reading is split into a series of dask-delayed operations that include file IO and selector application. The notebook here gives an overview of that attempt, so I won't go into details here, but will instead focus on an attempt to use dask for file IO natively within yt.

The focus of this attempt falls within `BaseIOHandler._read_particle_selection()`. In this function, yt currently pre-allocates arrays for all the particle fields that we're reading and then reads in the fields for each chunk using the `self._read_particle_fields(chunks, ptf, selector)` generator.

One conceptually straightforward way to leverage dask is by building a `dask.dataframe` from delayed objects. Dask dataframes, unlike dask arrays, do not need to know the total size a priori, but we do need to specify a metadata dictionary to declare the column names and datatypes.

So to build a `dask.dataframe`, we can do the following (from within `BaseIOHandler._read_particle_selection()`):
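```python
import dask
import dask.dataframe as dd
import numpy as np

# sketch of the prototype (variable names and the float64 dtype are illustrative):
# build one delayed dataframe per particle type from delayed single-chunk reads
delayed_dfs = {}
for ptype, fields in ptf.items():
    # column -> dtype metadata that dask needs in order to build the dataframe lazily
    meta_dict = {field: np.float64 for field in fields}
    delayed_chunks = [
        # passing the selector through to the single-chunk read is an assumption of this sketch
        dask.delayed(self._read_single_ptype)(chunk, {ptype: fields}, selector, meta_dict)
        for chunk in chunks
    ]
    delayed_dfs[ptype] = dd.from_delayed(delayed_chunks, meta=meta_dict)
```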
In this snippet, we build up a dictionary of dask dataframes organized by particle type (`ptypes`). First, let's focus on the innermost loop in the snippet: the actual application of the `dask.delayed` decorator. This decorator wraps a `_read_single_ptype` function, which takes a single chunk, a single particle-type dictionary (with multiple fields), and a column-datatype metadata dictionary, and returns a normal pandas dataframe:
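```python
import pandas as pd

# sketch of the single-chunk, single-particle-type read (the selector pass-through
# is an assumption here so that the standard selection still gets applied)
def _read_single_ptype(self, chunk, ptf, selector, meta_dict):
    chunk_results = pd.DataFrame(columns=list(meta_dict), dtype="float64")
    # the usual yt read, called with a single-element chunk list
    for (ptype, field), vals in self._read_particle_fields([chunk], ptf, selector):
        chunk_results[field] = vals
    return chunk_results
```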
We can see this function just calls our normal `self._read_particle_fields` and pulls out the field values as usual, storing the fields of our single chunk and single particle type as columns of a pandas dataframe, `chunk_results`.

So, for a single particle type, we build our delayed dataframe with:
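```python
# collect the per-chunk delayed frames into a single (still lazy) dask dataframe
delayed_dfs[ptype] = dd.from_delayed(delayed_chunks, meta=meta_dict)
```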
The `delayed_dfs` object is a dictionary with a delayed dataframe for each particle type. The reason we're organizing by particle type is that different particle types may have a different number of records in a given chunk (otherwise we'd have to store a large number of null values). In order to return the expected in-memory dict that `_read_particle_selection()` should return, we can very easily pull all the records from across our chunks and particle types with:
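```python
# one way to materialize everything (a sketch): compute each dataframe and
# unpack its columns into the flat {(ptype, field): ndarray} dictionary
rv = {}
for ptype, df in delayed_dfs.items():
    computed = df.compute()  # this is where the delayed chunk reads execute
    for field in computed.columns:
        rv[(ptype, field)] = computed[field].to_numpy()
```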
So this is a fairly promising approach, particularly since the dataframes do not need to know the expected array size. And it does indeed work to read data from our particle frontends, with some modifications to the `ParticleContainer` class.

The notebook here shows the above approach in action, with notes on the required changes to the `ParticleContainer` class. To actually execute in parallel, the only requirement is to spin up a dask `Client`:
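```python
from dask.distributed import Client

client = Client(n_workers=4)  # e.g., a local cluster with 4 workers
```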
The `dask.delayed` and `dask.compute` calls will find and connect to the `Client`; if no client is present, the Task Graph is executed sequentially. Here's a snapshot of the Dask Dashboard's Task Stream during a parallel particle read:

## 2. derived quantities
Calculation of derived quantities in yt also uses the `chunk` iterator to return intermediate by-chunk results that are then aggregated. The base `DerivedQuantity` object's `__call__` function is where the iteration occurs:
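```python
from yt.utilities.parallel_tools.parallel_analysis_interface import parallel_objects

# simplified sketch of the existing pattern (not the verbatim yt source)
def __call__(self, *args, **kwargs):
    chunks = self.data_source.chunks([], chunking_style="io")
    storage = {}
    for sto, chunk in parallel_objects(chunks, -1, storage=storage):
        # each chunk returns intermediate values (e.g., partial sums and counts)
        sto.result = self.process_chunk(chunk, *args, **kwargs)
    # gather the per-chunk intermediates and reduce them to the final answer
    values = [storage[key] for key in sorted(storage)]
    return self.reduce_intermediate(values)
```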
In principle, if a dask dataframe were returned by the chunk IO, then we could completely remove the consideration of intermediate values and reduction and simply use pandas-like operations to return values. More specifically, we could use dataframe aggregation directly, which specifies a by-chunk operation, an operation to aggregate the by-chunk results, and a final reduction step. From the dask documentation, an example of manually computing a mean value for a dask dataframe, `df`, is:
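```python
import dask.dataframe as dd

# adapted from the dask documentation's custom-aggregation example
custom_mean = dd.Aggregation(
    "custom_mean",
    chunk=lambda s: (s.count(), s.sum()),                 # operate on each chunk
    agg=lambda count, total: (count.sum(), total.sum()),  # combine the chunk results
    finalize=lambda count, total: total / count,          # final reduction
)

df.groupby("g").agg(custom_mean)
```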
So if we return dask dataframes from the IO, we could replace the derived quantities with custom `Aggregation` operations. We are currently working on a proof-of-concept prototype demonstrating this in action.

## Working with unyt
From yt 4.0 forward, yt uses `unyt` to track and convert units – the base `YTArray` class is built directly on `unyt`. If we are using `dask` for IO, there are situations where it may be advantageous to hold off on computing the `dask` graph. For example, we may want to return a `dask` array to the user so that they can construct their own computations in parallel. This requires, however, some level of `dask` support in `unyt`.

In the notebook working with unyt and dask, we demonstrate a limited prototype of a `dask`-`unyt` array. In this notebook, we create a custom dask collection by subclassing the primary `dask.array` class and adding some `unyt` functionality. This custom class is handled automatically by the `dask` scheduler, so if we have a large dask array with a dask client running, we can create our new `dask`-`unyt` array and do operations like finding the minimum value across all the chunks.
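A rough sketch of the usage (the `unyt_from_dask` constructor and its module are placeholder names, not necessarily the prototype's actual API):

```python
import dask.array as da
from unyt_dask import unyt_from_dask  # placeholder import for the prototype class

# a large, chunked dask array with units attached
x = da.random.random((10000, 10000), chunks=(1000, 1000))
x_kg = unyt_from_dask(x, "kg")

# builds a per-chunk task graph and executes it on the client
x_min = x_kg.min().compute()
```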
We are returned a standard `unyt_array` that was calculated by processing each chunk of the array separately, as seen here in the Task Stream of the dask dashboard:
This notebook demonstrates a general and fairly straightforward way to build dask support into `unyt`, which can be used in conjunction with, for example, the prototype dask-enabled particle reader to return arrays with their dask functionality preserved.

## some final thoughts
In the above discussion, we've focused primarily on how to use dask within the yt chunking infrastructure. But it is also worth considering whether we can replace or simplify the chunking itself using dask. In the case of reading particle data, yt is primarily looping over datafiles, so it may be possible to read directly into a dask dataframe without the chunking architecture. This remains to be explored…