# Extending DGL Graph Sampling Pipeline
Recall that the signature of DGL's `DataLoader` is as follows:
```python=
class DataLoader:
    def __init__(self, graph, indices, graph_sampler, **kwargs):
        pass
```
The most common usage is
* `graph` being a `DGLGraph` object.
* `indices` being a single Tensor for homogeneous graphs, or a dictionary of Tensors for heterogeneous graphs.
* `graph_sampler` being one of the samplers defined in the `dgl.dataloading` namespace (see the sketch after this list).
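For instance, a typical instantiation looks like the following (a minimal sketch; `g`, `train_nids`, `user_nids`, and `item_nids` are placeholder variables):
```python=
sampler = dgl.dataloading.NeighborSampler([15, 10, 5])
# Homogeneous graph: `indices` is a single Tensor of seed node IDs.
dataloader = dgl.dataloading.DataLoader(g, train_nids, sampler, batch_size=1024)
# Heterogeneous graph: `indices` is a dict keyed by node type.
hetero_dataloader = dgl.dataloading.DataLoader(
    g, {'user': user_nids, 'item': item_nids}, sampler, batch_size=1024)
```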
However, `DataLoader` also supports `graph` objects that are not `DGLGraph`s, as long as `graph` follows the conventions in the sections below.
:::info
**Note (won't go into the formal documentation):** The same goes for the `indices` argument: it doesn't have to be a Tensor or a dict of Tensors. This probably doesn't matter for cuGraph or distributed graphs, so I'll table its documentation, although one of our interns (Peiqi) has already utilized this feature.
:::
## Extending to non-`DGLGraph` objects
To use the DGL `DataLoader` and certain features, including the various neighbor samplers, negative samplers, and feature prefetching, one needs to implement the appropriate methods with the signatures listed below.
### List of methods required by DGL samplers
The method signatures, including the types and semantics of the arguments and return values, are almost the same as those in `DGLGraph`. DGLGraph-specific arguments are not included.
:::info
<font color=blue> Blue fonts </font> represent notes or issues that should be addressed; they will not go into the formal documentation.
:::
#### `NeighborSampler`
| Method/Attribute Name | Signature | Notes |
|:------------:|:----------:|:---------:|
|`sample_neighbors()`| `sample_neighbors(seed_nodes, fanout, edge_dir='in', prob=None, replace=False, exclude_edges=None, output_device=None) -> DGLGraph` | The arguments and return value have the same meaning as [here](https://docs.dgl.ai/generated/dgl.sampling.sample_neighbors.html#dgl.sampling.sample_neighbors). Note that the returned DGLGraph should also have `dgl.EID` edge data storing the IDs of the sampled edges, as described in DGL's documentation. |
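For illustration, a custom graph class satisfying this contract could look like the following (a minimal sketch; `_sample_from_my_backend` is a hypothetical helper that returns source IDs, destination IDs, and edge IDs, not part of DGL):
```python=
class CustomGraph(object):
    def sample_neighbors(self, seed_nodes, fanout, edge_dir='in', prob=None,
                         replace=False, exclude_edges=None, output_device=None):
        # Ask your own storage backend for up to `fanout` in-edges per seed node.
        src, dst, eids = self._sample_from_my_backend(seed_nodes, fanout)
        subg = dgl.graph((src, dst), num_nodes=self.num_nodes(),
                         device=output_device)
        # DGL expects the IDs of the sampled edges in the `dgl.EID` edge data.
        subg.edata[dgl.EID] = eids
        return subg
```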
#### `as_edge_prediction_sampler` (`negative_sampler` is None, for edge classification)
| Method/Attribute Name | Signature | Notes |
|:------------:|:----------:|:---------:|
|`to_canonical_etype()` | `to_canonical_etype(etype)` | Converts an edge type (string) to a canonical edge type (a triplet of strings). <font color=blue> This is because users may feed in either a dictionary like `{'user-likes-item': ...}` or a dictionary like `{('user', 'user-likes-item', 'item'): ...}`, and DGL has to support both cases. It would be better to avoid having people implement this, but I'm not sure how to achieve that. </font> |
| `edge_subgraph()` | `edge_subgraph(seed_edges, relabel_nodes=False, output_device=None) -> DGLGraph` | Only `relabel_nodes=False` is used; you don't have to support `relabel_nodes=True`. <font color=blue> Unnecessary if we refactor edge prediction so that the DataLoader returns triplets instead of a subgraph. </font> |
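For example, `to_canonical_etype()` can be backed by a precomputed mapping (a minimal sketch; `self._etype2canonical` is a hypothetical dict built at graph construction time):
```python=
class CustomGraph(object):
    def to_canonical_etype(self, etype):
        # Already canonical: a (src_type, edge_type, dst_type) triplet.
        if isinstance(etype, tuple):
            return etype
        # Otherwise look the string up; a KeyError signals an unknown edge type.
        return self._etype2canonical[etype]
```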
#### `as_edge_prediction_sampler` (`negative_sampler` is not None, for link prediction)
| Method/Attribute Name | Signature | Notes |
|:------------:|:----------:|:---------:|
|`to_canonical_etype()` | `to_canonical_etype(etype)` | Converts an edge type (string) to a canonical edge type (a triplet of strings). <font color=blue> This is because users may feed in either a dictionary like `{'user-likes-item': ...}` or a dictionary like `{('user', 'user-likes-item', 'item'): ...}`, and DGL has to support both cases. It would be better to avoid having people implement this, but I'm not sure how to achieve that. </font> |
| `edge_subgraph()` | `edge_subgraph(seed_edges, relabel_nodes=False, output_device=None) -> DGLGraph` | Only `relabel_nodes=False` is used; you don't have to support `relabel_nodes=True`. <font color=blue> If we refactor edge prediction so that the DataLoader returns triplets instead of a subgraph, this interface becomes unnecessary. </font> |
| `num_nodes()` | `num_nodes(ntype=None) -> int` | Number of nodes of a given type. <font color=blue> Used in negative graph construction. Unnecessary if we refactor link prediction so that the DataLoader returns pairs instead of a subgraph. </font> |
| `ntypes` | `list[str]` | The list of node types. <font color=blue> Used in negative graph construction together with `num_nodes()`. </font> |
| `canonical_etypes` | `list[tuple[str, str, str]]` | The list of canonical edge types. <font color=blue> Used to ensure that when the graph has more than one edge type, the (possibly user-defined) negative sampler returns a dictionary. </font> |
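The metadata attributes and `num_nodes()` can be backed by plain Python data (a minimal sketch; `_num_nodes_per_type` is a hypothetical internal field):
```python=
class CustomGraph(object):
    def __init__(self, num_nodes_per_type, canonical_etypes):
        # e.g. {'user': 1000, 'item': 500}
        self._num_nodes_per_type = num_nodes_per_type
        self.ntypes = list(num_nodes_per_type.keys())
        # e.g. [('user', 'user-likes-item', 'item')]
        self.canonical_etypes = canonical_etypes
    def num_nodes(self, ntype=None):
        if ntype is None:
            return sum(self._num_nodes_per_type.values())
        return self._num_nodes_per_type[ntype]
```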
#### Negative Sampler `dgl.dataloading.negative_sampler.Uniform`
| Method/Attribute Name | Signature | Notes |
|:------------:|:----------:|:---------:|
| `num_nodes()` | `num_nodes(ntype=None) -> int` | Number of nodes of a given type. |
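For reference, wiring the built-in uniform negative sampler into an edge prediction sampler looks like this; per the table above, the custom graph then only needs `num_nodes()` for the negative sampling step:
```python=
sampler = dgl.dataloading.NeighborSampler([15, 10, 5])
# Draw 5 uniformly-random negative destination nodes per positive edge.
sampler = dgl.dataloading.as_edge_prediction_sampler(
    sampler, negative_sampler=dgl.dataloading.negative_sampler.Uniform(5))
```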
### Feature prefetching support
Samplers in DGL have several arguments prefixed with `prefetch_`. For instance, `prefetch_node_feats` and `prefetch_labels` in `NeighborSampler` specify the feature data that should be set on the MFGs returned from the `DataLoader` (node features on the first MFG's source nodes, labels on the last MFG's destination nodes), as in the GraphSAGE example:
```python=
sampler = dgl.dataloading.NeighborSampler(
    [15, 10, 5], prefetch_node_feats=['feat'], prefetch_labels=['label'])
train_dataloader = dgl.dataloading.DataLoader(
    graph, train_idx, sampler, device=device, batch_size=1024, shuffle=True,
    drop_last=False, num_workers=0)
# ...
for it, (input_nodes, output_nodes, blocks) in enumerate(train_dataloader):
    x = blocks[0].srcdata['feat']
    y = blocks[-1].dstdata['label']
    y_hat = model(blocks, x)
    loss = F.cross_entropy(y_hat, y)
    opt.zero_grad()
    loss.backward()
    opt.step()
```
Note that in this case, `blocks[0].srcdata['feat']` and `blocks[-1].dstdata['label']` already contain the node features and labels fetched from the original graph:
```python=
assert torch.equal(
    blocks[0].srcdata['feat'],
    graph.ndata['feat'][blocks[0].srcdata[dgl.NID]])
assert torch.equal(
    blocks[-1].dstdata['label'],
    graph.ndata['label'][blocks[-1].dstdata[dgl.NID]])
```
If you wish to utilize feature prefetching with your custom graph object, you will need to implement the methods `get_node_storage()` and `get_edge_storage()` for node and edge data respectively.
```python=
class CustomGraph(object):
    def get_node_storage(self, key, ntype) -> dgl.storages.FeatureStorage:
        pass
    def get_edge_storage(self, key, canonical_etype) -> dgl.storages.FeatureStorage:
        pass
```
Your `get_node_storage(key, ntype)` should return an object inheriting from `dgl.storages.FeatureStorage` that represents the node data storage for node type `ntype` and key `key`. Your subclass should implement the `fetch` method, which can return either of the following:
* The node data with the IDs given in `indices`, as a Tensor on the given `device`.
* Any object that has a `wait()` method returning such a Tensor. In this case, the DGL DataLoader will prefetch features asynchronously and will not block until the next batch is requested. Note that feature prefetching will still be synchronous if `use_prefetch_thread` is set to False.
`get_edge_storage` is exactly the same as `get_node_storage`, except that it should return the edge data storage for the given canonical edge type and key.
#### Example: numpy `memmap` arrays
The following example uses `numpy.memmap` arrays, where the data is stored on disk. First, you need to implement a `MemmapStorage` class with a `fetch()` method that retrieves the data at the given indices as a Tensor.
```python=
import torch
import dgl

class MemmapStorage(dgl.storages.FeatureStorage):
    def __init__(self, memmap_array):
        self.memmap_array = memmap_array
    def fetch(self, indices, device, pin_memory=False, **kwargs):
        # memmap arrays are indexed with numpy arrays, so move indices to CPU first.
        indices_np = indices.cpu().numpy()
        return torch.from_numpy(self.memmap_array[indices_np]).to(device)
```
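As a quick sanity check, you can wrap an on-disk array and fetch a few rows (the file name, shape, and dtype below are made up for illustration):
```python=
import numpy
import torch

arr = numpy.memmap('feat.npy', dtype='float32', mode='r', shape=(10000, 128))
storage = MemmapStorage(arr)
feats = storage.fetch(torch.tensor([0, 42, 7]), device='cpu')  # a (3, 128) Tensor
```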
Then you will need to implement the `get_node_storage` method of your custom graph class. Assume that you already have a two-level dictionary for node data, with the node types as outer keys and feature names as inner keys:
```python=
graph.node_data = {
    'node_type1': {
        'x': numpy.memmap(...)},
    'node_type2': {
        'y': numpy.memmap(...)}}
```
You can implement `get_node_storage` like this:
```python=
class CustomGraph(object):
    def get_node_storage(self, key, ntype):
        return MemmapStorage(self.node_data[ntype][key])
```
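`get_edge_storage` mirrors this, assuming a hypothetical two-level `graph.edge_data` dictionary keyed by canonical edge type:
```python=
class CustomGraph(object):
    def get_edge_storage(self, key, canonical_etype):
        # `edge_data` is keyed by (src_type, edge_type, dst_type) triplets.
        return MemmapStorage(self.edge_data[canonical_etype][key])
```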
#### Example: numpy `memmap` arrays but asynchronous
Since disk IO is usually slow, one would like to have asynchronous feature prefetching. DGL provides a `dgl.storages.ThreadedFuture` class that wraps a synchronous but time-consuming operation into a Python-thread-powered asynchronous operation with a `wait()` method.
```python=
class MemmapStorage(dgl.storages.FeatureStorage):
    def __init__(self, memmap_array):
        self.memmap_array = memmap_array
    def _fetch(self, indices, device, pin_memory=False, **kwargs):
        indices_np = indices.cpu().numpy()
        return torch.from_numpy(self.memmap_array[indices_np]).to(device)
    def fetch(self, indices, device, pin_memory=False, **kwargs):
        # Wrap the blocking _fetch in a background thread; the returned object
        # exposes wait(), which joins the thread and returns the Tensor.
        return dgl.storages.ThreadedFuture(
            target=self._fetch, args=(indices, device, pin_memory))
```
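The `wait()` contract can also be exercised directly (a quick sketch reusing the hypothetical `arr` memmap from the earlier example):
```python=
storage = MemmapStorage(arr)
future = storage.fetch(torch.tensor([0, 42, 7]), device='cpu')
# wait() blocks until the background thread finishes, then returns the Tensor.
feats = future.wait()
```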