# 22 Sept 2022 ML Talk
## Ray PyTorch Boilerplate
``` python=
import torch
import torch.nn as nn
import ray
from ray import train
from ray.air import session, Checkpoint
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig
input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3
# A simple neural network
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))
# Each worker is assigned 1 CPU and, optionally, 1 GPU by default
def train_loop_per_worker():
    dataset_shard = session.get_dataset_shard("train")
    model = NeuralNetwork()
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

    model = train.torch.prepare_model(model)

    for epoch in range(num_epochs):
        for batches in dataset_shard.iter_torch_batches(
            batch_size=32, dtypes=torch.float
        ):
            inputs, labels = torch.unsqueeze(batches["x"], 1), batches["y"]
            output = model(inputs)
            loss = loss_fn(output, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print(f"epoch: {epoch}, loss: {loss.item()}")

        # Report metrics and a checkpoint once per epoch
        session.report(
            {},
            checkpoint=Checkpoint.from_dict(
                dict(epoch=epoch, model=model.state_dict())
            ),
        )
# Use a Ray Dataset as the training data source
train_dataset = ray.data.from_items(
    [{"x": x, "y": 2 * x + 1} for x in range(200)]
)

# Configure compute resources
scaling_config = ScalingConfig(num_workers=3)

# Use the Trainer class for your framework of choice
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=scaling_config,
    datasets={"train": train_dataset},
)
result = trainer.fit()
```
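The `session.report(..., checkpoint=...)` call is what makes the trained weights visible on the driver. Below is a minimal sketch of reading them back from `result`, assuming the Ray AIR 2.x `Result`/`Checkpoint` API used above; the `"module."`-prefix handling is an assumption about how `prepare_model()` may wrap the model.
```python=
# Hedged sketch: inspect what trainer.fit() returned (Ray AIR 2.x Result API).
print(result.metrics)  # metrics dict from the last session.report() call

# Checkpoint.from_dict() above implies to_dict() gives the same keys back.
ckpt = result.checkpoint.to_dict()
print(ckpt["epoch"])

# Rebuild the model from the reported state dict. Caveat (assumption):
# prepare_model() may wrap the model in DistributedDataParallel, in which case
# the state-dict keys carry a "module." prefix that must be stripped.
restored = NeuralNetwork()
restored.load_state_dict(
    {k.replace("module.", "", 1): v for k, v in ckpt["model"].items()}
)
```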
## Compute Resources Configuration
```python=
# Excerpt from ray.air.config (Ray 2.x source). Ray-internal helpers such as
# PublicAPI, SampleRange, _repr_dataclass and PlacementGroupFactory are not
# imported here.
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Optional, Union


@dataclass
@PublicAPI(stability="beta")
class ScalingConfig:
    """Configuration for scaling training.

    Args:
        trainer_resources: Resources to allocate for the trainer. If None is provided,
            will default to 1 CPU.
        num_workers: The number of workers (Ray actors) to launch.
            Each worker will reserve 1 CPU by default. The number of CPUs
            reserved by each worker can be overridden with the
            ``resources_per_worker`` argument.
        use_gpu: If True, training will be done on GPUs (1 per worker).
            Defaults to False. The number of GPUs reserved by each
            worker can be overridden with the ``resources_per_worker``
            argument.
        resources_per_worker: If specified, the resources
            defined in this Dict will be reserved for each worker. The
            ``CPU`` and ``GPU`` keys (case-sensitive) can be defined to
            override the number of CPU/GPUs used by each worker.
        placement_strategy: The placement strategy to use for the
            placement group of the Ray actors. See :ref:`Placement Group
            Strategies <pgroup-strategy>` for the possible options.
        _max_cpu_fraction_per_node: (Experimental) The max fraction of CPUs per node
            that Train will use for scheduling training actors. The remaining CPUs
            can be used for dataset tasks. It is highly recommended that you set this
            to less than 1.0 (e.g., 0.8) when passing datasets to trainers, to avoid
            hangs / CPU starvation of dataset tasks. Warning: this feature is
            experimental and is not recommended for use with autoscaling (scale-up
            will not trigger properly).
    """

    trainer_resources: Optional[Union[Dict, SampleRange]] = None
    num_workers: Optional[Union[int, SampleRange]] = None
    use_gpu: Union[bool, SampleRange] = False
    resources_per_worker: Optional[Union[Dict, SampleRange]] = None
    placement_strategy: Union[str, SampleRange] = "PACK"
    _max_cpu_fraction_per_node: Optional[Union[float, SampleRange]] = None

    def __post_init__(self):
        if self.resources_per_worker:
            if not self.use_gpu and self.num_gpus_per_worker > 0:
                raise ValueError(
                    "`use_gpu` is False but `GPU` was found in "
                    "`resources_per_worker`. Either set `use_gpu` to True or "
                    "remove `GPU` from `resources_per_worker`."
                )

            if self.use_gpu and self.num_gpus_per_worker == 0:
                raise ValueError(
                    "`use_gpu` is True but `GPU` is set to 0 in "
                    "`resources_per_worker`. Either set `use_gpu` to False or "
                    "request a positive number of `GPU` in "
                    "`resources_per_worker`."
                )

    def __repr__(self):
        return _repr_dataclass(self)

    def __eq__(self, o: "ScalingConfig") -> bool:
        if not isinstance(o, type(self)):
            return False
        return self.as_placement_group_factory() == o.as_placement_group_factory()

    @property
    def _resources_per_worker_not_none(self):
        if self.resources_per_worker is None:
            if self.use_gpu:
                # Note that we don't request any CPUs, which avoids possible
                # scheduling contention. Generally nodes have many more CPUs than
                # GPUs, so not requesting a CPU does not lead to oversubscription.
                return {"GPU": 1}
            else:
                return {"CPU": 1}
        resources_per_worker = {
            k: v for k, v in self.resources_per_worker.items() if v != 0
        }
        resources_per_worker.setdefault("GPU", int(self.use_gpu))
        return resources_per_worker

    @property
    def _trainer_resources_not_none(self):
        if self.trainer_resources is None:
            return {"CPU": 1}
        return {k: v for k, v in self.trainer_resources.items() if v != 0}

    @property
    def total_resources(self):
        """Map of total resources required for the trainer."""
        total_resource_map = defaultdict(float, self._trainer_resources_not_none)
        num_workers = self.num_workers or 0
        for k, value in self._resources_per_worker_not_none.items():
            total_resource_map[k] += value * num_workers
        return dict(total_resource_map)

    @property
    def num_cpus_per_worker(self):
        """The number of CPUs to set per worker."""
        return self._resources_per_worker_not_none.get("CPU", 0)

    @property
    def num_gpus_per_worker(self):
        """The number of GPUs to set per worker."""
        return self._resources_per_worker_not_none.get("GPU", 0)

    @property
    def additional_resources_per_worker(self):
        """Resources per worker, not including CPU or GPU resources."""
        return {
            k: v
            for k, v in self._resources_per_worker_not_none.items()
            if k not in ["CPU", "GPU"]
        }

    def as_placement_group_factory(self) -> "PlacementGroupFactory":
        """Returns a PlacementGroupFactory to specify resources for Tune."""
        from ray.tune.execution.placement_groups import PlacementGroupFactory

        trainer_resources = self._trainer_resources_not_none
        trainer_bundle = [trainer_resources]
        worker_resources = {
            "CPU": self.num_cpus_per_worker,
            "GPU": self.num_gpus_per_worker,
        }
        worker_resources_extra = (
            {} if self.resources_per_worker is None else self.resources_per_worker
        )
        worker_bundles = [
            {**worker_resources, **worker_resources_extra}
            for _ in range(self.num_workers if self.num_workers else 0)
        ]
        bundles = trainer_bundle + worker_bundles
        if self._max_cpu_fraction_per_node is not None:
            kwargs = {
                "_max_cpu_fraction_per_node": self._max_cpu_fraction_per_node,
            }
        else:
            kwargs = {}
        return PlacementGroupFactory(
            bundles, strategy=self.placement_strategy, **kwargs
        )

    @classmethod
    def from_placement_group_factory(
        cls, pgf: "PlacementGroupFactory"
    ) -> "ScalingConfig":
        """Create a ScalingConfig from a Tune ``PlacementGroupFactory``."""
        if pgf.head_bundle_is_empty:
            trainer_resources = {}
            worker_bundles = pgf.bundles
        else:
            trainer_resources = pgf.bundles[0]
            worker_bundles = pgf.bundles[1:]

        use_gpu = False
        placement_strategy = pgf.strategy
        resources_per_worker = None
        num_workers = None
        max_cpu_fraction_per_node = None

        if worker_bundles:
            first_bundle = worker_bundles[0]
            if not all(bundle == first_bundle for bundle in worker_bundles[1:]):
                raise ValueError(
                    "All worker bundles (any other than the first one) "
                    "must be equal to each other."
                )
            use_gpu = bool(first_bundle.get("GPU"))
            num_workers = len(worker_bundles)
            resources_per_worker = first_bundle

        if "_max_cpu_fraction_per_node" in pgf._kwargs:
            max_cpu_fraction_per_node = pgf._kwargs["_max_cpu_fraction_per_node"]

        return ScalingConfig(
            trainer_resources=trainer_resources,
            num_workers=num_workers,
            use_gpu=use_gpu,
            resources_per_worker=resources_per_worker,
            placement_strategy=placement_strategy,
            _max_cpu_fraction_per_node=max_cpu_fraction_per_node,
        )
```
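To make the knobs in the docstring concrete, here is a hedged usage sketch; the worker count, `SPREAD` strategy, and 0.8 fraction are illustrative values, and the derived numbers follow from the properties in the excerpt above.
```python=
from ray.air.config import ScalingConfig

# Illustrative values only: 4 GPU workers, each with 2 CPUs and 1 GPU.
scaling_config = ScalingConfig(
    num_workers=4,
    use_gpu=True,
    resources_per_worker={"CPU": 2, "GPU": 1},
    placement_strategy="SPREAD",
    _max_cpu_fraction_per_node=0.8,  # leave CPUs free for Dataset tasks
)

# Derived from the properties above: the trainer defaults to {"CPU": 1},
# plus 4 workers x {"CPU": 2, "GPU": 1} -> 9 CPUs and 4 GPUs in total.
print(scaling_config.total_resources)
print(scaling_config.num_gpus_per_worker)  # 1

# Ultimately the config becomes a Tune placement group request.
pgf = scaling_config.as_placement_group_factory()
```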
## Serve a Composition of Models
``` python=
import requests
from ray import serve
from ray.serve.drivers import DAGDriver
from ray.serve.dag import InputNode
from ray.serve.http_adapters import json_request
# 1. Define the models in our composition graph.
@serve.deployment
class Adder:
    def __init__(self, increment: int):
        self.increment = increment

    def predict(self, inp: int):
        return self.increment + inp


@serve.deployment
def combine_average(*input_values) -> float:
    return {"result": sum(input_values) / len(input_values)}


# 2. Define the model composition graph and call it.
with InputNode() as input_node:
    adder_1 = Adder.bind(increment=1)
    adder_2 = Adder.bind(increment=2)
    dag = combine_average.bind(
        adder_1.predict.bind(input_node), adder_2.predict.bind(input_node)
    )

serve.run(DAGDriver.bind(dag, http_adapter=json_request))

# 3. Query the deployment and print the result.
print(requests.post("http://localhost:8000/", json=100).json())
# {"result": 101.5}
```
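To see the composition arithmetic, the same deployment can be queried with a different input; the values below follow directly from the `increment`s of 1 and 2 defined above.
```python=
# json=10 -> Adder(+1) returns 11, Adder(+2) returns 12, average is 11.5
print(requests.post("http://localhost:8000/", json=10).json())
# {"result": 11.5}
```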