# 22 Sept 2022 ML Talk ## Ray Pytorch Boilerplate ``` python= import torch import torch.nn as nn import ray from ray import train from ray.air import session, Checkpoint from ray.train.torch import TorchTrainer from ray.air.config import ScalingConfig input_size = 1 layer_size = 15 output_size = 1 num_epochs = 3 # A simple neural network class NeuralNetwork(nn.Module): def __init__(self): super(NeuralNetwork, self).__init__() self.layer1 = nn.Linear(input_size, layer_size) self.relu = nn.ReLU() self.layer2 = nn.Linear(layer_size, output_size) def forward(self, input): return self.layer2(self.relu(self.layer1(input))) # Each worker has by default 1 CPU and optionally 1 GPU assigned def train_loop_per_worker(): dataset_shard = session.get_dataset_shard("train") model = NeuralNetwork() loss_fn = nn.MSELoss() optimizer = torch.optim.SGD(model.parameters(), lr=0.1) model = train.torch.prepare_model(model) for epoch in range(num_epochs): for batches in dataset_shard.iter_torch_batches( batch_size=32, dtypes=torch.float ): inputs, labels = torch.unsqueeze(batches["x"], 1), batches["y"] output = model(inputs) loss = loss_fn(output, labels) optimizer.zero_grad() loss.backward() optimizer.step() print(f"epoch: {epoch}, loss: {loss.item()}") session.report( {}, checkpoint=Checkpoint.from_dict( dict(epoch=epoch, model=model.state_dict()) ), ) # use Ray dataset train_dataset = ray.data.from_items( [{"x": x, "y": 2 * x + 1} for x in range(200)] ) # configurate compute resources scaling_config = ScalingConfig(num_workers=3) # User the Trainer class for your framework of choice trainer = TorchTrainer( train_loop_per_worker=train_loop_per_worker, scaling_config=scaling_config, datasets={"train": train_dataset}) result = trainer.fit() ``` ## Compute Resources Configuration ```python= @dataclass @PublicAPI(stability="beta") class ScalingConfig: """Configuration for scaling training. Args: trainer_resources: Resources to allocate for the trainer. If None is provided, will default to 1 CPU. num_workers: The number of workers (Ray actors) to launch. Each worker will reserve 1 CPU by default. The number of CPUs reserved by each worker can be overridden with the ``resources_per_worker`` argument. use_gpu: If True, training will be done on GPUs (1 per worker). Defaults to False. The number of GPUs reserved by each worker can be overridden with the ``resources_per_worker`` argument. resources_per_worker: If specified, the resources defined in this Dict will be reserved for each worker. The ``CPU`` and ``GPU`` keys (case-sensitive) can be defined to override the number of CPU/GPUs used by each worker. placement_strategy: The placement strategy to use for the placement group of the Ray actors. See :ref:`Placement Group Strategies <pgroup-strategy>` for the possible options. _max_cpu_fraction_per_node: (Experimental) The max fraction of CPUs per node that Train will use for scheduling training actors. The remaining CPUs can be used for dataset tasks. It is highly recommended that you set this to less than 1.0 (e.g., 0.8) when passing datasets to trainers, to avoid hangs / CPU starvation of dataset tasks. Warning: this feature is experimental and is not recommended for use with autoscaling (scale-up will not trigger properly). """ trainer_resources: Optional[Union[Dict, SampleRange]] = None num_workers: Optional[Union[int, SampleRange]] = None use_gpu: Union[bool, SampleRange] = False resources_per_worker: Optional[Union[Dict, SampleRange]] = None placement_strategy: Union[str, SampleRange] = "PACK" _max_cpu_fraction_per_node: Optional[Union[float, SampleRange]] = None def __post_init__(self): if self.resources_per_worker: if not self.use_gpu and self.num_gpus_per_worker > 0: raise ValueError( "`use_gpu` is False but `GPU` was found in " "`resources_per_worker`. Either set `use_gpu` to True or " "remove `GPU` from `resources_per_worker." ) if self.use_gpu and self.num_gpus_per_worker == 0: raise ValueError( "`use_gpu` is True but `GPU` is set to 0 in " "`resources_per_worker`. Either set `use_gpu` to False or " "request a positive number of `GPU` in " "`resources_per_worker." ) def __repr__(self): return _repr_dataclass(self) def __eq__(self, o: "ScalingConfig") -> bool: if not isinstance(o, type(self)): return False return self.as_placement_group_factory() == o.as_placement_group_factory() @property def _resources_per_worker_not_none(self): if self.resources_per_worker is None: if self.use_gpu: # Note that we don't request any CPUs, which avoids possible # scheduling contention. Generally nodes have many more CPUs than # GPUs, so not requesting a CPU does not lead to oversubscription. return {"GPU": 1} else: return {"CPU": 1} resources_per_worker = { k: v for k, v in self.resources_per_worker.items() if v != 0 } resources_per_worker.setdefault("GPU", int(self.use_gpu)) return resources_per_worker @property def _trainer_resources_not_none(self): if self.trainer_resources is None: return {"CPU": 1} return {k: v for k, v in self.trainer_resources.items() if v != 0} @property def total_resources(self): """Map of total resources required for the trainer.""" total_resource_map = defaultdict(float, self._trainer_resources_not_none) num_workers = self.num_workers or 0 for k, value in self._resources_per_worker_not_none.items(): total_resource_map[k] += value * num_workers return dict(total_resource_map) @property def num_cpus_per_worker(self): """The number of CPUs to set per worker.""" return self._resources_per_worker_not_none.get("CPU", 0) @property def num_gpus_per_worker(self): """The number of GPUs to set per worker.""" return self._resources_per_worker_not_none.get("GPU", 0) @property def additional_resources_per_worker(self): """Resources per worker, not including CPU or GPU resources.""" return { k: v for k, v in self._resources_per_worker_not_none.items() if k not in ["CPU", "GPU"] } [docs] def as_placement_group_factory(self) -> "PlacementGroupFactory": """Returns a PlacementGroupFactory to specify resources for Tune.""" from ray.tune.execution.placement_groups import PlacementGroupFactory trainer_resources = self._trainer_resources_not_none trainer_bundle = [trainer_resources] worker_resources = { "CPU": self.num_cpus_per_worker, "GPU": self.num_gpus_per_worker, } worker_resources_extra = ( {} if self.resources_per_worker is None else self.resources_per_worker ) worker_bundles = [ {**worker_resources, **worker_resources_extra} for _ in range(self.num_workers if self.num_workers else 0) ] bundles = trainer_bundle + worker_bundles if self._max_cpu_fraction_per_node is not None: kwargs = { "_max_cpu_fraction_per_node": self._max_cpu_fraction_per_node, } else: kwargs = {} return PlacementGroupFactory( bundles, strategy=self.placement_strategy, **kwargs ) [docs] @classmethod def from_placement_group_factory( cls, pgf: "PlacementGroupFactory" ) -> "ScalingConfig": """Create a ScalingConfig from a Tune's PlacementGroupFactory""" if pgf.head_bundle_is_empty: trainer_resources = {} worker_bundles = pgf.bundles else: trainer_resources = pgf.bundles[0] worker_bundles = pgf.bundles[1:] use_gpu = False placement_strategy = pgf.strategy resources_per_worker = None num_workers = None max_cpu_fraction_per_node = None if worker_bundles: first_bundle = worker_bundles[0] if not all(bundle == first_bundle for bundle in worker_bundles[1:]): raise ValueError( "All worker bundles (any other than the first one) " "must be equal to each other." ) use_gpu = bool(first_bundle.get("GPU")) num_workers = len(worker_bundles) resources_per_worker = first_bundle if "_max_cpu_fraction_per_node" in pgf._kwargs: max_cpu_fraction_per_node = pgf._kwargs["_max_cpu_fraction_per_node"] return ScalingConfig( trainer_resources=trainer_resources, num_workers=num_workers, use_gpu=use_gpu, resources_per_worker=resources_per_worker, placement_strategy=placement_strategy, _max_cpu_fraction_per_node=max_cpu_fraction_per_node, ) ``` ## Serve A Composition of Model ``` python= import requests from ray import serve from ray.serve.drivers import DAGDriver from ray.serve.dag import InputNode from ray.serve.http_adapters import json_request # 1. Define the models in our composition graph @serve.deployment class Adder: def __init__(self, increment: int): self.increment = increment def predict(self, inp: int): return self.increment + inp @serve.deployment def combine_average(*input_values) -> float: return {"result": sum(input_values) / len(input_values)} # 2: Define the model composition graph and call it. with InputNode() as input_node: adder_1 = Adder.bind(increment=1) adder_2 = Adder.bind(increment=2) dag = combine_average.bind( adder_1.predict.bind(input_node), adder_2.predict.bind(input_node) ) serve.run(DAGDriver.bind(dag, http_adapter=json_request)) # 3: Query the deployment and print the result. print(requests.post("http://localhost:8000/", json=100).json()) # {"result": 101.5} ```