---
title: Initial exploration of yt + Dask
tags: yt, dask
---

# Results from initial explorations in leveraging dask in *yt*

There are a number of possible ways for *yt* to leverage [dask](https://dask.org/) to simplify existing parallel and lazy operations, but thus far we have focused on two related areas in which dask may prove particularly useful:

1. data IO: reading data from disk
2. processing data after reading

Central to both of these areas is the `chunk` iterator that *yt* uses to index a dataset. The exact implementation depends on both the frontend and the datatype, but in general a single `chunk` is a range of data indices that may span datafiles or a subset of a single datafile. Thus, data can be read in by separate processes in parallel, or sequentially on a single processor, by iterating and aggregating over chunks separately. In the case of IO, data can also be subselected by `selector` objects (e.g., slices, regions) before aggregation across chunks into *yt* arrays. And in the case of data processing, some computations, such as finding statistical measures of a data field (max, min, weighted means), can process chunks separately and return intermediate results that are then aggregated.

In terms of `dask`, one conceptually straightforward approach requiring fairly minimal refactoring of *yt* is to use the `dask.delayed()` decorator to construct graphs that execute across `chunks`. In pseudocode, this could look like:

```python
delayed_chunks = [dask.delayed(chunk_processor)(ch, *args) for ch in chunks]
```

where `chunk_processor` is some function that we want to apply to each chunk and `*args` are the arguments to that function. This list of `delayed_chunks` can be strung together with any number of subsequent operations to create a dask [Task Graph](https://docs.dask.org/en/latest/graphs.html). For example, if we want to find the minimum value of a field across chunks, we might construct a graph that first reads each chunk and then finds its min:

```python
delayed_chunks = [dask.delayed(read_one_chunk)(ch, field) for ch in chunks]
delayed_mins = [dask.delayed(find_chunk_min)(dc) for dc in delayed_chunks]
min_val = min(dask.compute(*delayed_mins))
```

The final line is where the task graph actually executes; before that, `dask` only constructs a representation of the tasks. When we call `dask.compute()`, `dask` will distribute the tasks to the `dask.distributed.Client` if one is active; otherwise, execution proceeds sequentially on a single processor.
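To make the pattern concrete, here is a minimal runnable sketch of that same graph, with random numpy arrays standing in for *yt* chunks (none of the *yt* machinery is involved, and `read_one_chunk` here is just a pass-through placeholder):

```python
import dask
import numpy as np

# random arrays stand in for yt chunks; in yt, reading a chunk would
# apply a selector and return the selected field values
chunks = [np.random.random(1000) for _ in range(8)]

def read_one_chunk(ch):
    return ch  # placeholder for an actual on-disk chunk read

def find_chunk_min(data):
    return data.min()

delayed_chunks = [dask.delayed(read_one_chunk)(ch) for ch in chunks]
delayed_mins = [dask.delayed(find_chunk_min)(dc) for dc in delayed_chunks]

# nothing has executed yet; compute() triggers the graph, in parallel
# if a dask.distributed.Client is active
min_val = min(dask.compute(*delayed_mins))
```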
In the following, we first dive into data IO in more detail and describe a prototype particle reader that uses delayed `dask.dataframe`s to read fields from a Gadget dataset. We then discuss calculating [derived quantities of *yt* Data Objects](https://yt-project.org/doc/analyzing/objects.html#available-derived-quantities) using dask.

## 1. data IO

*yt* reads particle and grid-based data by iterating across the `chunks` with frontend-specific IO functions. For gridded data, each frontend implements a `_read_fluid_selection` (e.g., `yt.frontend.amrvac.AMRVACIOHandler._read_fluid_selection`) that iterates over `chunks` and returns a flat dictionary with numpy arrays concatenated across each `chunk`. For particle data, frontends must implement a similar function, `_read_particle_fields`, which typically gets invoked within the `BaseIOHandler._read_particle_selection` function. In both cases, the read functions accept the `chunks` iterator, the fields to read, and a `selector` object:

```python
def _read_particle_fields(self, chunks, ptf, selector):
def _read_fluid_selection(self, chunks, selector, fields, size):
```

On reading a single chunk, the `selector` is applied so that we only return selected data from each chunk.

In order to construct a `dask.delayed()` Task Graph for IO, a number of changes are required. First, dask uses `pickle` to serialize the functions and arguments that get distributed to workers, so all arguments to the `_read` functions above must be pickleable. A recent PR ([PR #2934](https://github.com/yt-project/yt/pull/2934)) added `__getstate__` and `__setstate__` methods to the geometric Cython `selector` objects, but the `chunks` can be more challenging; we touch on this further in the section on the particle IO prototype below. Second, `_read_particle_fields` and `_read_fluid_selection` typically iterate over `chunks` internally, but it is more straightforward to construct a chunk-parallel dask process from a single-chunk read function. In the particle IO prototype below, we avoid a large-scale refactor by calling `_read_particle_fields` repeatedly with a single-chunk list, i.e., `_read_particle_fields([chunk], ptf, selector)`, but separating the chunk iteration and concatenation from a single-chunk read function may improve maintainability.

Finally, one open question for IO is what to return. The present read routines return a dictionary of numpy arrays. These arrays are typically pre-allocated, though the recent PR [#2416](https://github.com/yt-project/yt/pull/2416) removes particle pre-allocation. In terms of dask, we could instead return a distributed `dask.array` or `dask.dataframe`, allowing subsequent functions to compute in parallel. A `dask.array` needs to know chunk sizes a priori, which presents some difficulty; a `dask.dataframe` does not need to know the chunk sizes, just the expected data type for each column. In the following prototype, we construct a Task Graph that builds a `dask.dataframe` from delayed chunk reads. In order to return the dictionary of flat numpy arrays expected from `_read_particle_selection`, the prototype reduces the distributed dataframes to standard numpy arrays, but in principle we could return the `dask.dataframe`s themselves, allowing subsequent calculations to use the distributed dataframes directly.

### a particle IO prototype

The [dxl yt-dask-experiments repo](https://github.com/data-exp-lab/yt-dask-experiments) includes some initial chunking experiments with dask in the `dask_chunking` directory. That directory primarily experiments with using dask to do some manual IO on a Gadget dataset: the file reading is split into a series of dask-delayed operations that include file IO and selector application. The notebook [here](https://nbviewer.jupyter.org/github/data-exp-lab/yt-dask-experiments/blob/master/dask_chunking/gadget_dask_delayed.ipynb) gives an overview of that attempt, so I won't go into details here, but will instead focus on an attempt to use dask for file IO natively within *yt*.

The focus of this attempt falls within `BaseIOHandler._read_particle_selection()`. In this function, *yt* currently pre-allocates arrays for all the particle fields being read and then reads in the fields for each chunk using the `self._read_particle_fields(chunks, ptf, selector)` generator.
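Before diving into the prototype itself, here is a minimal standalone sketch of the pattern it relies on: building a `dask.dataframe` from delayed per-chunk reads, declaring only column names and dtypes up front. The `read_chunk` function and field names below are purely illustrative; note that the chunk lengths are unknown until each read actually happens:

```python
import dask
import dask.dataframe as ddf
import numpy as np
import pandas as pd

def read_chunk(seed):
    # stand-in for a single-chunk read; the number of selected
    # particles is not known until the read actually happens
    rng = np.random.default_rng(seed)
    n = int(rng.integers(10, 100))
    return pd.DataFrame({"Density": rng.random(n), "Mass": rng.random(n)})

# the metadata declares only column names and dtypes, not sizes
meta = pd.DataFrame({"Density": pd.Series(dtype="f8"),
                     "Mass": pd.Series(dtype="f8")})

delayed_chunks = [dask.delayed(read_chunk)(seed) for seed in range(4)]
df = ddf.from_delayed(delayed_chunks, meta=meta)  # one partition per chunk

# the reads execute only when values are actually requested
densities = df["Density"].values.compute()
```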
One conceptually straightforward way to leverage dask here is to build a `dask.dataframe` from delayed objects. Dask dataframes, unlike dask arrays, do not need to know the total size a priori, but we do need to specify a metadata dictionary to declare the column names and datatypes. So to build a `dask.dataframe`, we can do the following (from within `BaseIOHandler._read_particle_selection()`):

```python
ptypes = list(ptf.keys())
delayed_dfs = {}
for ptype in ptypes:
    # build a dataframe from delayed for each particle type
    this_ptf = {ptype: ptf[ptype]}
    delayed_chunks = [
        dask.delayed(self._read_single_ptype)(
            ch, this_ptf, selector, ptype_meta[ptype]
        )
        for ch in chunks
    ]
    delayed_dfs[ptype] = ddf.from_delayed(delayed_chunks, meta=ptype_meta[ptype])
```

In this snippet, we build up a dictionary of dask dataframes organized by particle type (`ptypes`). First, let's focus on the innermost loop in the snippet, the actual application of the `dask.delayed` decorator:

```python
dask.delayed(self._read_single_ptype)(
    ch, this_ptf, selector, ptype_meta[ptype]
)
```

This decorator wraps a `_read_single_ptype` function, which takes a single chunk, a single particle type dictionary (with multiple fields), and a column-datatype metadata dictionary, and returns a normal pandas dataframe:

```python
def _read_single_ptype(self, chunk, ptf, selector, meta_dict):
    # read a single chunk and single particle type into a pandas dataframe
    # so that we can use dask.dataframe.from_delayed! fields within a
    # particle type should have the same length?
    chunk_results = pd.DataFrame(meta_dict)
    # each particle type could be a different dataframe...
    for field_r, vals in self._read_particle_fields([chunk], ptf, selector):
        chunk_results[field_r[1]] = vals
    return chunk_results
```

We can see this function just calls our normal `self._read_particle_fields` and pulls out the field values as usual, storing the fields for a single chunk and single particle type as columns of the pandas dataframe `chunk_results`. So, for a single particle type, we build our delayed dataframe with:

```python
delayed_chunks = [
    dask.delayed(self._read_single_ptype)(
        ch, this_ptf, selector, ptype_meta[ptype]
    )
    for ch in chunks
]
delayed_dfs[ptype] = ddf.from_delayed(delayed_chunks, meta=ptype_meta[ptype])
```

The `delayed_dfs` object is a dictionary with a delayed dataframe for each particle type. We organize by particle type because different particle types may have a different number of records in a given chunk (otherwise we would have to store a large number of null values). In order to return the in-memory dict expected from `_read_particle_selection()`, we can very easily pull all the records from across our chunks and particle types with:

```python
rv = {}
for ptype in ptypes:
    for col in delayed_dfs[ptype].columns:
        rv[(ptype, col)] = delayed_dfs[ptype][col].values.compute()
```
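One possible refinement worth noting (not part of the prototype): each `.compute()` call above executes its own task graph, so the underlying chunks are re-read once per column. Handing all the columns to a single `dask.compute` call would let dask share each chunk read across columns, along the lines of:

```python
# sketch only: reuses ptypes and delayed_dfs from the snippets above
keys, vals = [], []
for ptype in ptypes:
    for col in delayed_dfs[ptype].columns:
        keys.append((ptype, col))
        vals.append(delayed_dfs[ptype][col].values)
# a single compute shares common tasks (the chunk reads) across columns
rv = dict(zip(keys, dask.compute(*vals)))
```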
So this is a fairly promising approach, particularly since the dataframes do not need to know the expected array size. And it does indeed work to read data from our particle frontends, with some modifications to the `ParticleContainer` class. The notebook [here](https://nbviewer.jupyter.org/github/data-exp-lab/yt-dask-experiments/blob/master/dask_chunking/native_gadget_read.ipynb) shows the above approach in action, with notes on the required changes to the `ParticleContainer` class.

To actually execute in parallel, the only requirement is to spin up a dask `Client`:

```python
from dask.distributed import Client
import yt

c = Client(threads_per_worker=3, n_workers=4)
ds = yt.load_sample("snapshot_033")
ad = ds.all_data()
d = ad[('PartType0', 'Density')]
```

`dask.delayed` and `dask.compute` will find and connect to the `Client`; if no client is present, the Task Graph will be executed sequentially. Here's a snapshot of the Dask Dashboard's Task Stream during a parallel particle read:

![a daskified *yt* particle read](https://i.imgur.com/rqmfifn.png)

## 2. derived quantities

Calculation of derived quantities in *yt* also uses the `chunk` iterator to return intermediate by-chunk results that are then aggregated. The base `DerivedQuantity` object's `__call__` function is where the iteration occurs:

```python
def __call__(self, *args, **kwargs):
    """Calculate results for the derived quantity"""
    # create the index if it doesn't exist yet
    self.data_source.ds.index
    self.count_values(*args, **kwargs)
    chunks = self.data_source.chunks([], chunking_style="io")
    storage = {}
    for sto, ds in parallel_objects(chunks, -1, storage=storage):
        sto.result = self.process_chunk(ds, *args, **kwargs)
    # Now storage will have everything, and will be done via pickling, so
    # the units will be preserved. (Credit to Nathan for this
    # idea/implementation.)
    values = [[] for i in range(self.num_vals)]
    for key in sorted(storage):
        for i in range(self.num_vals):
            values[i].append(storage[key][i])
    # These will be YTArrays
    values = [self.data_source.ds.arr(values[i]) for i in range(self.num_vals)]
    values = self.reduce_intermediate(values)
    return values
```

In principle, if a dask dataframe were returned by the chunk IO, then we could dispense with the handling of intermediate values and reduction entirely and simply use pandas-like operations to return values. More specifically, we could use [dataframe aggregation](https://docs.dask.org/en/latest/dataframe-groupby.html#aggregate) directly, which specifies an operation to apply within each chunk, an operation to aggregate the per-chunk results, and an optional finalization step. From the dask documentation, an example of manually computing a mean value for a dask dataframe, `df`, is:

```python
custom_mean = dd.Aggregation(
    'custom_mean',
    lambda s: (s.count(), s.sum()),
    lambda count, sum: (count.sum(), sum.sum()),
    lambda count, sum: sum / count,
)
df.groupby('g').agg(custom_mean)
```

So if we return dask dataframes from the IO, we could replace the derived quantities with custom `Aggregation` operations. We are currently working on a proof-of-concept prototype demonstrating this in action.
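To illustrate how a derived quantity could collapse into plain dataframe operations, here is a toy sketch; the in-memory frame below stands in for a dataframe returned by chunked IO, and the field names are illustrative. A mass-weighted average becomes a couple of lazy column operations, with dask handling the per-chunk intermediates and the final reduction internally:

```python
import dask.dataframe as ddf
import numpy as np
import pandas as pd

# stand-in for a distributed dataframe returned by the chunked IO
pdf = pd.DataFrame({"Density": np.random.random(1000),
                    "Mass": np.random.random(1000)})
df = ddf.from_pandas(pdf, npartitions=4)

# a mass-weighted average density; nothing executes until .compute()
weighted_mean = ((df["Density"] * df["Mass"]).sum() / df["Mass"].sum()).compute()
```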
## Working with unyt

From version 4.0 forward, *yt* uses `unyt` to track and convert units; the base `YTArray` class is actually a subclass of `unyt`'s array class. If we are using `dask` for IO, there are situations where it may be advantageous to hold off computing the `dask` graph. For example, we may want to return a `dask` array to the user so that they can construct their own computations in parallel. This requires, however, some level of `dask` support in `unyt`. In the notebook [working with unyt and dask](https://github.com/data-exp-lab/yt-dask-experiments/blob/master/unyt_dask/unyt_from_dask.ipynb), we demonstrate a limited prototype of a `dask`-`unyt` array. In this notebook, we create a custom dask collection by subclassing the primary `dask.array` class and adding some `unyt` functionality. This custom class is handled automatically by the `dask` scheduler, so if we have a dask client running and create our new `dask`-`unyt` array, e.g.:

```python
import dask.array as da
from dask.distributed import Client

client = Client(threads_per_worker=2, n_workers=2)
x = da.random.random((10000, 10000), chunks=(1000, 1000))
x_da = unyt_from_dask(x, unyt.m)
```

then operations like finding the minimum value across all the chunks:

```
x_da.min()
```

return a standard `unyt_array`:

```
unyt_array(3.03822589e-09, 'm')
```

calculated by processing each chunk of the array separately, as seen here in the Task Stream of the dask dashboard:

![unyt_dask_stream](https://i.imgur.com/U1l2CcQ.png)

This notebook demonstrates a general and fairly straightforward way to build dask support into `unyt`, which can be used in conjunction with, for example, the prototype dask-enabled particle reader to return arrays with dask functionality preserved.

## some final thoughts

In the above discussion, we've focused primarily on how to use dask within the *yt* chunking infrastructure. But it is also worth considering whether we can replace or simplify the chunking itself using dask. In the case of reading particle data, *yt* is primarily looping over datafiles, so it may be possible to read directly into a dask dataframe without the chunking architecture. This remains to be explored...
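As a purely speculative sketch of that direction, it would be the same `from_delayed` pattern used earlier, but with the per-chunk reader replaced by a per-datafile reader; `read_one_datafile`, `datafiles`, and `field_meta` below are all hypothetical names, not existing *yt* machinery:

```python
# sketch: one dataframe partition per datafile, bypassing yt's chunk
# objects entirely. read_one_datafile is a hypothetical per-file reader.
delayed_files = [dask.delayed(read_one_datafile)(fname) for fname in datafiles]
df = ddf.from_delayed(delayed_files, meta=field_meta)  # npartitions == len(datafiles)
```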
