# API Design
:point_right: **Single-process and distributed RL at the same time**
### EnvPool: Batched environments
:point_right: Supports both sync and async envs.
:point_right: For popular envs, we can invest effort in optimizing them through a C++ implementation.
:point_right: For less popular envs, we just provide a Python wrapper that parallelizes them via threads/multiprocessing.
```python=3.8
class SyncEnv:
    def __init__(self, name_or_pybuilder, size, type):
        """
        Args:
            name_or_pybuilder: the name of the env, or a Python builder of the env
                such that an env can be created by calling it:
                `env = pybuilder()`
            size: how many envs in this pool
            type: how the env is parallelized, e.g.
                1. python for loop
                2. python thread
                3. python multiprocess
                4. c++ threadpool
        """
        pass

    def reset(self) -> TimeStep:
        # Needed by the Actor below for the sync path.
        pass

    def step(self, actions) -> TimeStep:
        pass


class AsyncEnv:
    def __init__(self, name_or_pybuilder, size, wait_num, type):
        """
        Args:
            name_or_pybuilder: the name of the env, or a Python builder of the env
                such that an env can be created by calling it:
                `env = pybuilder()`
            size: how many envs in this pool
            wait_num: how many envs to output at one call
            type: how the env is parallelized, e.g.
                1. python multiprocess
                2. c++ threadpool
        """
        pass

    def recv(self) -> Tuple[env_ids, TimeStep]:
        pass

    def send(self, env_ids, actions) -> None:
        pass
```
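A hedged usage sketch of the two interfaces follows. The env name, `size`, `wait_num`, and the `type` strings are placeholder values, and `policy` stands in for whatever produces actions (e.g. the inferencer below); this is only an illustration of the sync step loop versus the async send/recv loop.
```python=3.8
# Hypothetical usage of the two pools; all names and arguments are placeholders.

# Sync: one lock-step call returns timesteps for all `size` envs.
sync_env = SyncEnv("Pong-v5", size=16, type="c++ threadpool")
ts = sync_env.reset()
for _ in range(100):
    actions = policy(ts)            # `policy` is a stand-in for the inferencer
    ts = sync_env.step(actions)

# Async: act on whichever `wait_num` envs are ready, without waiting for all.
async_env = AsyncEnv("Pong-v5", size=16, wait_num=4, type="c++ threadpool")
for _ in range(100):
    env_ids, ts = async_env.recv()  # timesteps from the first ready envs
    async_env.send(env_ids, policy(ts))
```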
### Agent Design
:point_right: Flexible enough to fulfill the requirements of different algorithms.
:point_right: Optimization (`learner_step`) and interaction (`actor_step`) are decoupled.
Note: the Agent does not hold parameters; they are usually held by the learner.
Mainly follows Wukong's agent design, which I think satisfies our requirements. For more details, see https://git.garena.com/sail/gameai/wukong/-/blob/master/agents/base.py.
```python=3.8
AgentState    # could be an RNN state
LearnerState  # contains optimizer state or meta variables

class Agent:
    def __init__(self, cfg, obs_spec, action_spec, net_factory, policy_factory) -> None:
        pass

    def initial_state(self, batch_size) -> AgentState:
        pass

    def initial_params(self, rng_key) -> hk.Params:
        pass

    def actor_step(self, rng_key, params, timestep, state: AgentState
                   ) -> Tuple[AgentOut, AgentState]:
        pass

    def initial_learnerstate(self, params) -> LearnerState:
        pass

    def learner_step(self, params, learner_state, batch
                     ) -> Tuple[hk.Params, LearnerState, Logs]:
        logs, grads = self.compute_gradients(params, batch)
        params, opt_state, logs_app = self.apply_gradients(params, grads,
                                                           learner_state)
        updated_learner_state = self.update_learnerstate(params, opt_state,
                                                         learner_state)
        logs.update(logs_app)
        return params, updated_learner_state, logs
```
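To make the decoupling concrete, here is a minimal usage sketch. The constructor arguments, `timestep`, and `batch` are assumed to already exist; the point is that parameters live outside the Agent, and the acting and learning paths never touch each other's state.
```python=3.8
# Minimal usage sketch (hypothetical surrounding objects: cfg, obs_spec,
# action_spec, net_factory, policy_factory, timestep, batch).
import jax

agent = Agent(cfg, obs_spec, action_spec, net_factory, policy_factory)
rng_key = jax.random.PRNGKey(0)

params = agent.initial_params(rng_key)           # params live outside the Agent
agent_state = agent.initial_state(batch_size=1)
learner_state = agent.initial_learnerstate(params)

# Interaction path: pure function of (params, timestep, state); no optimizer.
agent_out, agent_state = agent.actor_step(rng_key, params, timestep, agent_state)

# Optimization path: consumes a batch, returns new params; no env involved.
params, learner_state, logs = agent.learner_step(params, learner_state, batch)
```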
### Actor Design
> linmin: I have an idea here about `run` and `step`: how about `step` takes tensors as input and outputs tensors, while `run` takes exactly the same number of inputs and outputs, but in the form of queues, either a gRPC queue or a local queue? For sync programs, `step` is used; for async programs, `run` is used.

:point_right: An Actor owns an env, queries an inferencer, and writes trajectories to the replay buffer; a sketch of the queue-based `run` variant suggested above follows the class.
```python=3.8
class Actor:
    def __init__(self, env_builder, inferencer, replay_buffer, rank=None):
        """
        Args:
            env_builder: can generate a local or remote env.
            inferencer: can be a local inferencer or a remote service.
            replay_buffer: can be a local buffer or a remote one, as long as
                it supports replay_buffer.add(data).
        """
        self._env = env_builder()
        self._inferencer = inferencer
        if isinstance(replay_buffer, reverb.Client):
            self._replay_buffer = create_reverb_buffer(replay_buffer)
        else:
            self._replay_buffer = replay_buffer
        self._is_sync = self._env.is_sync
        if self._is_sync:
            self._ts = self._env.reset()
            self._ids = get_ids_from_rank(rank)
        else:
            self._ts, self._ids = self._env.recv()

    def step(self) -> Optional[Logs]:
        """Called when the actor is local."""
        logs = None  # per-step logs can be collected here
        agent_out, agent_state = self._inferencer.inference(self._ids, self._ts)
        self._replay_buffer.add(self._ids, [self._ts, agent_out, agent_state])
        if self._is_sync:
            self._ts = self._env.step(agent_out.action)
        else:
            self._env.send(self._ids, agent_out.action)
            self._ts, self._ids = self._env.recv()
        return logs

    def run(self):
        """Called when the actor runs as a service."""
        logger = ...
        while True:
            self.step()
```
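As a follow-up to linmin's note above, below is a hedged sketch of the queue-based idea: `step` keeps a tensor-in/tensor-out signature, while `run` carries exactly the same inputs and outputs in queue form. The `TensorStepActor` class and the `get()`/`put()` queue interface are assumptions for illustration, not part of the design above.
```python=3.8
# Hypothetical sketch of linmin's suggestion: `step` is tensor-in/tensor-out,
# and `run` moves exactly the same data through queues (a local queue here;
# a gRPC-backed queue with the same get()/put() interface could be swapped in).
import queue


class TensorStepActor:
    def __init__(self, inferencer):
        self._inferencer = inferencer

    def step(self, env_ids, timestep):
        """Sync path: tensors in, tensors out."""
        agent_out, _ = self._inferencer.inference(env_ids, timestep)
        return agent_out.action

    def run(self, in_queue: queue.Queue, out_queue: queue.Queue):
        """Async path: the same inputs/outputs as `step`, but via queues."""
        while True:
            env_ids, timestep = in_queue.get()   # blocking read of next input
            actions = self.step(env_ids, timestep)
            out_queue.put((env_ids, actions))    # push the matching output
```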
### Learner
```python=3.8
class Learner:
    def __init__(self, cfg, agent, replay_buffer):
        self._cfg = cfg
        self._agent = agent
        self._replay_buffer = replay_buffer
        self._params = get_init_params(agent)

    def _make_dataset(self, data_cfg) -> Dataset:
        pass

    def get_params(self):
        return self._params

    def step(self, params, learner_state, batch
             ) -> Tuple[hk.Params, LearnerState, Logs]:
        pass

    def run(self):
        ds = self._make_dataset(self._cfg.data_cfg)
        learner_state = self._agent.initial_learnerstate(self._params)
        for i in range(self._cfg.max_iters):
            batch = next(ds)
            self._params, learner_state, logs = self.step(
                self._params, learner_state, batch)
```
### Inferencer
```python=3.8
class Inferencer:
    def __init__(self, cfg, agent, learner):
        self._cfg = cfg
        self._agent = agent
        self._learner = learner
        self._cnt = 0
        self._params = self._learner.get_params()

    def inference(self, env_ids, timestep) -> Tuple[AgentOut, AgentState]:
        # Periodically refresh the parameters from the learner.
        if self._cnt % self._cfg.sync_interval == 0:
            self._params = self._learner.get_params()
        self._cnt += 1
        return self._agent.actor_step(self._params, timestep, ...)
```
### Replay Buffer
```python=3.8
class ReplayBuffer:
def add(self, env_ids, timestep):
pass
def sample(self):
pass
def make_dataset(self) -> Dataset:
pass
```
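For reference, a minimal local implementation of this interface could look like the sketch below: an in-memory FIFO buffer with uniform sampling. The FIFO and uniform-sampling semantics are an assumption for illustration; the distributed setups later swap this for a shared-memory or Reverb-backed buffer.
```python=3.8
# Minimal sketch of a local, in-memory implementation of the interface above.
import collections
import random


class InMemoryReplayBuffer(ReplayBuffer):
    def __init__(self, capacity, batch_size):
        self._storage = collections.deque(maxlen=capacity)  # drops oldest items
        self._batch_size = batch_size

    def add(self, env_ids, timestep):
        # Store the whole (env_ids, data) record as one entry.
        self._storage.append((env_ids, timestep))

    def sample(self):
        # Uniformly sample a batch of stored entries.
        return random.sample(list(self._storage), self._batch_size)

    def make_dataset(self):
        # Infinite iterator of batches; matches `next(ds)` in Learner.run.
        while True:
            yield self.sample()
```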
# Usage
### Single-process RL
```python=3.8
agent = Agent()
replay_buffer = ReplayBuffer()
learner = Learner(learner_cfg, agent, replay_buffer)
inferencer = Inferencer(inferencer_cfg, agent, learner)
actor = Actor(env_builder, inferencer, replay_buffer)

params = learner.get_params()
learner_state = agent.initial_learnerstate(params)
for i in range(max_iters):
    for _ in range(env_steps):
        actor.step()
    batch = replay_buffer.sample()
    params, learner_state, logs = learner.step(params, learner_state, batch)
```
Depending on `env_builder`, this can have various implementations, for example (see the sketch below):
- `env_builder` == a single env
- `env_builder` == a process/thread-parallelized env
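A hedged illustration of the two cases, reusing `inferencer` and `replay_buffer` from the snippet above and assuming a gym-style env; `"CartPole-v1"`, the pool `size`, and the `type` string are placeholder choices.
```python=3.8
# Hypothetical builders; "CartPole-v1" and the `type` string are placeholders.
import gym

# 1. A builder that yields a single, in-process env.
single_env_builder = lambda: gym.make("CartPole-v1")

# 2. A builder that yields a thread-parallelized env pool (SyncEnv from above).
pooled_env_builder = lambda: SyncEnv("CartPole-v1", size=16, type="thread")

actor = Actor(pooled_env_builder, inferencer, replay_buffer)
```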
### Single-machine distributed RL: Heavy Env
:point_right: Inferencers are local to Actors
```python=3.8
agent = Agent()
replay_buffer = SharedMemReplayBuffer()
learner = lp.CourierNode(Learner, learner_cfg, agent, replay_buffer)
for _ in range(num_actors):
inferencer = Inferencer(inferencer_cfg, agent, learner)
actor = lp.CourierNode(Actor, env_builder, inferencer, replay_buffer)
```
### Single-machine distributed RL: Light Env
:point_right: Inferencers are stand-alone services
```python=3.8
agent = Agent()
replay_buffer = SharedMemReplayBuffer()
learner = lp.CourierNode(Learner, learner_cfg, agent, replay_buffer)
inferencer = lp.CourierNode(Inferencer, inferencer_cfg, agent, learner)
for _ in range(num_actors):
actor = lp.CourierNode(Actor, env_builder, inferencer, replay_buffer)
```
### Distributed RL
:point_right: ReplayBuffer should be a network service, e.g. reverb
```python=3.8
agent = Agent()
replay_buffer = lp.ReverbNode(ReverbReplayBuffer)
learner = lp.CourierNode(Learner, learner_cfg, agent, replay_buffer)
inferencer = lp.CourierNode(Inferencer, inferencer_cfg, agent, learner)
for _ in range(num_actors):
actor = lp.CourierNode(Actor, env_builder, inferencer, replay_buffer)
```
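The snippets above construct Launchpad nodes but do not show how they are grouped and launched. Below is a hedged sketch of that last step, reusing the names from the distributed snippet; the program name and node labels are arbitrary placeholders, and launch details depend on the chosen launch type.
```python=3.8
# Hedged sketch of assembling and launching the distributed variant with
# Launchpad; labels and the program name are placeholders.
import launchpad as lp


def make_program(num_actors):
    program = lp.Program('distributed_rl')

    replay_buffer = program.add_node(lp.ReverbNode(ReverbReplayBuffer),
                                     label='replay_buffer')
    learner = program.add_node(
        lp.CourierNode(Learner, learner_cfg, agent, replay_buffer),
        label='learner')
    inferencer = program.add_node(
        lp.CourierNode(Inferencer, inferencer_cfg, agent, learner),
        label='inferencer')
    for _ in range(num_actors):
        program.add_node(
            lp.CourierNode(Actor, env_builder, inferencer, replay_buffer),
            label='actor')
    return program


lp.launch(make_program(num_actors=8))
```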