---
tags: decompiler
title: Ground truth study
---
# ALL FETs:



# Problem overview R17~29.
For Python 3.9, we are going to tackle files that use only a subset of instructions, i.e., we show the ground truth of samples that require only one of the target instructions to be changed.

Reference [here](https://hackmd.io/@aliahad97/HkDD5eVWq) for slightly more detail.
## Step 1.
Take a file from Application A.
One function is as follows:
```python=3
def _signature_get_partial(wrapped_sig, partial, extra_args=()):
"""Private helper to calculate how 'wrapped_sig' signature will
look like after applying a 'functools.partial' object (or alike)
on it.
"""
old_params = wrapped_sig.parameters
partial_args = partial.args or ()
partial_keywords = partial.keywords or {}
if extra_args:
partial_args = extra_args + partial_args
try:
ba = wrapped_sig.bind_partial(*partial_args, **partial_keywords)
except TypeError as ex:
msg = 'partial object {!r} has incorrect arguments'.format(partial)
raise ValueError(msg) from ex
transform_to_kwonly = False
for param_name, param in old_params.items():
FET_else = 0
try:
arg_value = ba.arguments[param_name]
FET_else = 1
except KeyError:
pass#FET_null()
if FET_else == 1:
if param.kind is _POSITIONAL_ONLY:
# If positional-only parameter is bound by partial,
# it effectively disappears from the signature
new_params.pop(param_name)
continue
if param.kind is _POSITIONAL_OR_KEYWORD:
if param_name in partial_keywords:
# This means that this parameter, and all parameters
# after it should be keyword-only (and var-positional
# should be removed). Here's why. Consider the following
# function:
# foo(a, b, *args, c):
# pass
#
# "partial(foo, a='spam')" will have the following
# signature: "(*, a='spam', b, c)". Because attempting
# to call that partial with "(10, 20)" arguments will
# raise a TypeError, saying that "a" argument received
# multiple values.
transform_to_kwonly = True
# Set the new default value
new_params[param_name] = param.replace(default=arg_value)
else:
# was passed as a positional argument
new_params.pop(param.name)
continue
if param.kind is _KEYWORD_ONLY:
# Set the new default value
new_params[param_name] = param.replace(default=arg_value)
if transform_to_kwonly:
assert param.kind is not _POSITIONAL_ONLY
if param.kind is _POSITIONAL_OR_KEYWORD:
new_param = new_params[param_name].replace(kind=_KEYWORD_ONLY)
new_params[param_name] = new_param
new_params.move_to_end(param_name)
elif param.kind in (_KEYWORD_ONLY, _VAR_KEYWORD):
new_params.move_to_end(param_name)
elif param.kind is _VAR_POSITIONAL:
new_params.pop(param.name)
return wrapped_sig.replace(parameters=new_params.values())
```
## Step 2.
Compile the file with both Python 3.8 and Python 3.9, then disassemble each compiled file using the following commands.
```
python -m compileall -b main.py # python 3+
pydisasm main.pyc > main.dis
```
Running the commands above once under Python 3.8 and once under Python 3.9 gives you two files:
1. `main.dis` generated from the Python 3.8 bytecode
2. `main.dis` generated from the Python 3.9 bytecode
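If you prefer to script this step, here is a minimal sketch. It assumes both interpreters are available on the PATH as `python3.8` and `python3.9` and that `pydisasm` (from the `xdis` package) is installed; the distinct output names are a convenience, not part of the workflow.
```python
import subprocess

# Compile with each interpreter, then disassemble the resulting main.pyc.
# pydisasm reads the bytecode version from the .pyc header, so the same
# console script can disassemble both files.
for ver in ("3.8", "3.9"):
    subprocess.run([f"python{ver}", "-m", "compileall", "-b", "main.py"],
                   check=True)                       # writes main.pyc
    with open(f"main_{ver}.dis", "w") as out:
        subprocess.run(["pydisasm", "main.pyc"], stdout=out, check=True)
```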
## Step 3.
Compare the two in a diff tool to see which instructions differ.
You should see specific lines with differences; in this example we see the following.
- Diff 1 (line 17) - no rule
- Diff 2 (line 18) - R18
- Diff 3 (line 20) - R20
- Diff 4 (line 29) - R18
- Diff 5 (line 30) - R19
- Diff 6 (line 32) - R17
- Diff 7 (line 38) - R17
- Diff 8 (line 39) - R17
- Diff 9 (line 60) - R17
- Diff 10 (line 65) - R17 and R24
- Diff 11 (line 67) - R17
- Diff 12 (line 71) - R17
- Diff 13 (line 73) - R17

This means that we have 13 errors, though we will focus on only one at a time. Looking at the diff, the following rules are used:
- R17
- R18
- R19
- R20
- R24
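The comparison itself can also be scripted; a minimal sketch, assuming the two disassemblies were saved under distinct names as in the Step 2 sketch:
```python
import difflib

# Print a unified diff of the two disassemblies; each hunk corresponds to
# one of the instruction-level differences listed above.
with open("main_3.8.dis") as f38, open("main_3.9.dis") as f39:
    diff = difflib.unified_diff(f38.readlines(), f39.readlines(),
                                fromfile="main.dis (3.8)",
                                tofile="main.dis (3.9)")
print("".join(diff))
```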
## Step 4.
Trim the code so it has only one error, i.e., rewrite each offending line so that only one error remains. The trimmed code is as follows.
```python=3
def _signature_get_partial(wrapped_sig, partial, extra_args=()):
"""Private helper to calculate how 'wrapped_sig' signature will
look like after applying a 'functools.partial' object (or alike)
on it.
"""
old_params = wrapped_sig.parameters
partial_args = partial.args or ()
partial_keywords = partial.keywords or {}
if extra_args:
partial_args = extra_args + partial_args
# try:
# ba = wrapped_sig.bind_partial(*partial_args, **partial_keywords)
# except TypeError as ex:
# msg = 'partial object {!r} has incorrect arguments'.format(partial)
# raise ValueError(msg) from ex
transform_to_kwonly = False
for param_name, param in old_params.items():
FET_else = 0
try:
arg_value = ba.arguments[param_name]
FET_else = 1
except KeyError:
pass#FET_null()
if FET_else == 1:
if param.kind == _POSITIONAL_ONLY:
# If positional-only parameter is bound by partial,
# it effectively disappears from the signature
new_params.pop(param_name)
continue
if param.kind == _POSITIONAL_OR_KEYWORD:
if param_name == partial_keywords:
# This means that this parameter, and all parameters
# after it should be keyword-only (and var-positional
# should be removed). Here's why. Consider the following
# function:
# foo(a, b, *args, c):
# pass
#
# "partial(foo, a='spam')" will have the following
# signature: "(*, a='spam', b, c)". Because attempting
# to call that partial with "(10, 20)" arguments will
# raise a TypeError, saying that "a" argument received
# multiple values.
transform_to_kwonly = True
# Set the new default value
new_params[param_name] = param.replace(default=arg_value)
else:
# was passed as a positional argument
new_params.pop(param.name)
continue
if param.kind == _KEYWORD_ONLY:
# Set the new default value
new_params[param_name] = param.replace(default=arg_value)
if transform_to_kwonly:
# assert param.kind is not _POSITIONAL_ONLY
if param.kind == _POSITIONAL_OR_KEYWORD:
new_param = new_params[param_name].replace(kind=_KEYWORD_ONLY)
new_params[param_name] = new_param
new_params.move_to_end(param_name)
elif param.kind == (_KEYWORD_ONLY, _VAR_KEYWORD):
new_params.move_to_end(param_name)
elif param.kind == _VAR_POSITIONAL:
new_params.pop(param.name)
return wrapped_sig.replace(parameters=new_params.values())
```
Notice that the trimmed code now produces only the following diff:

Now we need only R16 for `RERAISE` and R18 for `JUMP_IF_NOT_EXC_MATCH`.
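For reference, here is a tiny stand-alone probe (not taken from the application) whose 3.8 vs. 3.9 disassembly differs in exactly these exception-handling instructions: Python 3.9 emits `JUMP_IF_NOT_EXC_MATCH` and `RERAISE` where 3.8 uses a `COMPARE_OP` (exception match) jump and `END_FINALLY`.
```python
import dis

def probe(d, key):
    # The handler below is what produces the differing instructions.
    try:
        return d[key]
    except KeyError:
        return None

dis.dis(probe)   # run under python3.8 and python3.9 and diff the output
```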
## Step 5 - Saving.
Now decompile the original file to expose any implicit errors, then remove them from the source. The following file is the result after removing the implicit errors.
```python=3
def _signature_get_partial(wrapped_sig, partial, extra_args=()):
"""Private helper to calculate how 'wrapped_sig' signature will
look like after applying a 'functools.partial' object (or alike)
on it.
"""
old_params = wrapped_sig.parameters
partial_args = partial.args or ()
partial_keywords = partial.keywords or {}
if extra_args:
partial_args = extra_args + partial_args
else:
partial_args = partial_args
# try:
# ba = wrapped_sig.bind_partial(*partial_args, **partial_keywords)
# except TypeError as ex:
# msg = 'partial object {!r} has incorrect arguments'.format(partial)
# raise ValueError(msg) from ex
transform_to_kwonly = False
param_name, param = old_params.items()
FET_else = 0
try:
arg_value = ba.arguments[param_name]
FET_else = 1
except KeyError:
pass#FET_null()
else:
FET_null()
if FET_else == 1:
if param.kind == _POSITIONAL_ONLY:
# If positional-only parameter is bound by partial,
# it effectively disappears from the signature
new_params.pop(param_name)
new_params.pop(param.name)
if param.kind == _POSITIONAL_OR_KEYWORD:
if param_name == partial_keywords:
# This means that this parameter, and all parameters
# after it should be keyword-only (and var-positional
# should be removed). Here's why. Consider the following
# function:
# foo(a, b, *args, c):
# pass
#
# "partial(foo, a='spam')" will have the following
# signature: "(*, a='spam', b, c)". Because attempting
# to call that partial with "(10, 20)" arguments will
# raise a TypeError, saying that "a" argument received
# multiple values.
transform_to_kwonly = True
# Set the new default value
new_params[param_name] = param.replace(default=arg_value)
else:
# was passed as a positional argument
new_params.pop(param.name)
new_params.pop(param.name)
if param.kind == _KEYWORD_ONLY:
# Set the new default value
new_params[param_name] = param.replace(default=arg_value)
new_params.pop(param.name)
if transform_to_kwonly:
# assert param.kind is not _POSITIONAL_ONLY
if param.kind == _POSITIONAL_OR_KEYWORD:
new_param = new_params[param_name].replace(kind=_KEYWORD_ONLY)
new_params[param_name] = new_param
new_params.move_to_end(param_name)
new_params.pop(param.name)
if param.kind == (_KEYWORD_ONLY, _VAR_KEYWORD):
new_params.move_to_end(param_name)
new_params.pop(param.name)
if param.kind == _VAR_POSITIONAL:
new_params.pop(param.name)
new_params.pop(param.name)
return wrapped_sig.replace(parameters=new_params.values())
```
Notice that I removed the loops and added extra instructions to avoid any implicit errors, since Python 3.8 has a lot of them.
Next, make sure the error is still there, which I do by recompiling with the respective Python versions. I see the diff as follows.

I also made sure that no new errors were introduced.
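The decompilation in this step can also be scripted. A minimal sketch, assuming the `uncompyle6` console script named in Step 6 is installed and supports the bytecode version being decompiled:
```python
import subprocess

# Decompile the freshly compiled main.pyc and save the result for the
# side-by-side comparison with the trimmed source.
with open("main_decompiled.py", "w") as out:
    subprocess.run(["uncompyle6", "main.pyc"], stdout=out, check=True)
```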
## Step 6.
Save the following:
1. The file before decompilation that you generated (`original.py`) - has no implicit errors and contains only the target instructions
2. The file after decompilation (`final.py`)
3. A text file labeling the decompiler version and the rules used. In this case it is as follows.
```
Uncompyle6
R16
R18
```
## Step 7.
Repeat this process until, for each rule in R17~29, you have at least one file in each application.
Notes:
- Try to use as few rules as possible in each file
- Make sure there are no implicit or explicit errors; we want to showcase only the rule used
# Problem overview R1~R16.
We want to find an original file with an error and apply PyFET to it. The resulting file is then fed to a decompiler and the result is preserved.
# Steps to carry out
## Step 1.
Choose an application (**A_i**) from the top 10 applications. The list is [here](https://docs.google.com/spreadsheets/d/1huxO_R_Si5m4iQv63Pmqr2hRkxKIIKhP7z6RYZ6p4k4/edit#gid=0).
## Step 2.
Choose an FET **F_i** (1~30). Refer to the paper.


## Step 3.
Choose the decompiler that corresponds to the chosen FET **F_i**.

## Step 4.
For each **A_i**, find a function **f_i** in one of the files of **A_i** where you can apply the FET **F_i**.
### Example for R1:
Original file:
```python=
def __init__(
self,
observation_space: gym.spaces.Space,
action_space: gym.spaces.Space,
config: AlgorithmConfigDict,
*,
model: Optional[TorchModelV2] = None,
loss: Optional[
Callable[
[Policy, ModelV2, Type[TorchDistributionWrapper], SampleBatch],
Union[TensorType, List[TensorType]],
]
] = None,
action_distribution_class: Optional[Type[TorchDistributionWrapper]] = None,
action_sampler_fn: Optional[
Callable[
[TensorType, List[TensorType]],
Union[
Tuple[TensorType, TensorType, List[TensorType]],
Tuple[TensorType, TensorType, TensorType, List[TensorType]],
],
]
] = None,
action_distribution_fn: Optional[
Callable[
[Policy, ModelV2, TensorType, TensorType, TensorType],
Tuple[TensorType, Type[TorchDistributionWrapper], List[TensorType]],
]
] = None,
max_seq_len: int = 20,
get_batch_divisibility_req: Optional[Callable[[Policy], int]] = None,
):
self.framework = config["framework"] = "torch"
super().__init__(observation_space, action_space, config)
# Create multi-GPU model towers, if necessary.
# - The central main model will be stored under self.model, residing
# on self.device (normally, a CPU).
# - Each GPU will have a copy of that model under
# self.model_gpu_towers, matching the devices in self.devices.
# - Parallelization is done by splitting the train batch and passing
# it through the model copies in parallel, then averaging over the
# resulting gradients, applying these averages on the main model and
# updating all towers' weights from the main model.
# - In case of just one device (1 (fake or real) GPU or 1 CPU), no
# parallelization will be done.
# If no Model is provided, build a default one here.
if model is None:
dist_class, logit_dim = ModelCatalog.get_action_dist(
action_space, self.config["model"], framework=self.framework
)
model = ModelCatalog.get_model_v2(
obs_space=self.observation_space,
action_space=self.action_space,
num_outputs=logit_dim,
model_config=self.config["model"],
framework=self.framework,
)
if action_distribution_class is None:
action_distribution_class = dist_class
# Get devices to build the graph on.
worker_idx = self.config.get("worker_index", 0)
if (
not config["_fake_gpus"]
and ray._private.worker._mode() == ray._private.worker.LOCAL_MODE
):
num_gpus = 0
elif worker_idx == 0:
num_gpus = config["num_gpus"]
else:
num_gpus = config["num_gpus_per_worker"]
gpu_ids = list(range(torch.cuda.device_count()))
# Place on one or more CPU(s) when either:
# - Fake GPU mode.
# - num_gpus=0 (either set by user or we are in local_mode=True).
# - No GPUs available.
# tmp =
if config["_fake_gpus"] or num_gpus == 0 or not gpu_ids:
logger.info(
"TorchPolicy (worker={}) running on {}.".format(
worker_idx if worker_idx > 0 else "local",
"{} fake-GPUs".format(num_gpus) if config["_fake_gpus"] else "CPU",
)
)
self.device = torch.device("cpu")
self.devices = [self.device for _ in range(int(math.ceil(num_gpus)) or 1)]
self.model_gpu_towers = [
model if i == 0 else copy.deepcopy(model)
for i in range(int(math.ceil(num_gpus)) or 1)
]
# if hasattr(self, "target_model"):
# self.target_models = {
# m: self.target_model for m in self.model_gpu_towers
# }
self.model = model
# Place on one or more actual GPU(s), when:
# - num_gpus > 0 (set by user) AND
# - local_mode=False AND
# - actual GPUs available AND
# - non-fake GPU mode.
else:
logger.info(
"TorchPolicy (worker={}) running on {} GPU(s).".format(
worker_idx if worker_idx > 0 else "local", num_gpus
)
)
# We are a remote worker (WORKER_MODE=1):
# GPUs should be assigned to us by ray.
if ray._private.worker._mode() == ray._private.worker.WORKER_MODE:
gpu_ids = ray.get_gpu_ids()
if len(gpu_ids) < num_gpus:
raise ValueError(
"TorchPolicy was not able to find enough GPU IDs! Found "
f"{gpu_ids}, but num_gpus={num_gpus}."
)
self.devices = [
torch.device("cuda:{}".format(i))
for i, id_ in enumerate(gpu_ids)
if i < num_gpus
]
self.device = self.devices[0]
ids = [id_ for i, id_ in enumerate(gpu_ids) if i < num_gpus]
self.model_gpu_towers = []
for i, _ in enumerate(ids):
model_copy = copy.deepcopy(model)
self.model_gpu_towers.append(model_copy.to(self.devices[i]))
# if hasattr(self, "target_model"):
# self.target_models = {
# m: copy.deepcopy(self.target_model).to(self.devices[i])
# for i, m in enumerate(self.model_gpu_towers)
# }
self.model = self.model_gpu_towers[0]
# Lock used for locking some methods on the object-level.
# This prevents possible race conditions when calling the model
# first, then its value function (e.g. in a loss function), in
# between of which another model call is made (e.g. to compute an
# action).
self._lock = threading.RLock()
self._state_inputs = self.model.get_initial_state()
self._is_recurrent = len(self._state_inputs) > 0
# Auto-update model's inference view requirements, if recurrent.
self._update_model_view_requirements_from_init_state()
# Combine view_requirements for Model and Policy.
self.view_requirements.update(self.model.view_requirements)
self.exploration = self._create_exploration()
self.unwrapped_model = model # used to support DistributedDataParallel
# To ensure backward compatibility:
# Old way: If `loss` provided here, use as-is (as a function).
if loss is not None:
self._loss = loss
# New way: Convert the overridden `self.loss` into a plain function,
# so it can be called the same way as `loss` would be, ensuring
# backward compatibility.
elif self.loss.__func__.__qualname__ != "Policy.loss":
self._loss = self.loss.__func__
# `loss` not provided nor overridden from Policy -> Set to None.
else:
self._loss = None
self._optimizers = force_list(self.optimizer())
# Store, which params (by index within the model's list of
# parameters) should be updated per optimizer.
# Maps optimizer idx to set or param indices.
self.multi_gpu_param_groups: List[Set[int]] = []
main_params = {p: i for i, p in enumerate(self.model.parameters())}
for o in self._optimizers:
param_indices = []
for pg_idx, pg in enumerate(o.param_groups):
for p in pg["params"]:
param_indices.append(main_params[p])
self.multi_gpu_param_groups.append(set(param_indices))
# Create n sample-batch buffers (num_multi_gpu_tower_stacks), each
# one with m towers (num_gpus).
num_buffers = self.config.get("num_multi_gpu_tower_stacks", 1)
self._loaded_batches = [[] for _ in range(num_buffers)]
self.dist_class = action_distribution_class
self.action_sampler_fn = action_sampler_fn
self.action_distribution_fn = action_distribution_fn
# If set, means we are using distributed allreduce during learning.
self.distributed_world_size = None
self.max_seq_len = max_seq_len
self.batch_divisibility_req = (
get_batch_divisibility_req(self)
if callable(get_batch_divisibility_req)
else (get_batch_divisibility_req or 1)
)
```
Line 81, `if config["_fake_gpus"] or num_gpus == 0 or not gpu_ids:`, requires the fix for R1. Apply it and see the output:
```python=
# uncompyle6 version 3.7.4
# Python bytecode 3.7 (3394)
# Decompiled from: Python 3.7.11 (default, Jun 29 2021, 20:31:06)
# [GCC 8.3.0]
# Embedded file name: 0_fix.py
# Compiled at: 2022-07-19 15:00:43
# Size of source mod 2**32: 8739 bytes
def __init__(self, observation_space: gym.spaces.Space, action_space: gym.spaces.Space, config: AlgorithmConfigDict, *, model: Optional[TorchModelV2]=None, loss: Optional[Callable[([Policy, ModelV2, Type[TorchDistributionWrapper], SampleBatch], Union[(TensorType, List[TensorType])])]]=None, action_distribution_class: Optional[Type[TorchDistributionWrapper]]=None, action_sampler_fn: Optional[Callable[([TensorType, List[TensorType]], Union[(Tuple[(TensorType, TensorType, List[TensorType])], Tuple[(TensorType, TensorType, TensorType, List[TensorType])])])]]=None, action_distribution_fn: Optional[Callable[([Policy, ModelV2, TensorType, TensorType, TensorType], Tuple[(TensorType, Type[TorchDistributionWrapper], List[TensorType])])]]=None, max_seq_len: int=20, get_batch_divisibility_req: Optional[Callable[([Policy], int)]]=None):
self.framework = config['framework'] = 'torch'
super().__init__(observation_space, action_space, config)
if model is None:
dist_class, logit_dim = ModelCatalog.get_action_dist(action_space,
(self.config['model']), framework=(self.framework))
model = ModelCatalog.get_model_v2(obs_space=(self.observation_space),
action_space=(self.action_space),
num_outputs=logit_dim,
model_config=(self.config['model']),
framework=(self.framework))
if action_distribution_class is None:
action_distribution_class = dist_class
worker_idx = self.config.get('worker_index', 0)
if not config['_fake_gpus']:
if ray._private.worker._mode() == ray._private.worker.LOCAL_MODE:
num_gpus = 0
else:
if worker_idx == 0:
num_gpus = config['num_gpus']
else:
num_gpus = config['num_gpus_per_worker']
else:
gpu_ids = list(range(torch.cuda.device_count()))
FET_cond = config['_fake_gpus'] or num_gpus == 0 or not gpu_ids
if FET_cond:
logger.info('TorchPolicy (worker={}) running on {}.'.format(worker_idx if worker_idx > 0 else 'local', '{} fake-GPUs'.format(num_gpus) if config['_fake_gpus'] else 'CPU'))
self.device = torch.device('cpu')
self.devices = [self.device for _ in range(int(math.ceil(num_gpus)) or 1)]
self.model_gpu_towers = [model if i == 0 else copy.deepcopy(model) for i in range(int(math.ceil(num_gpus)) or 1)]
self.model = model
else:
logger.info('TorchPolicy (worker={}) running on {} GPU(s).'.format(worker_idx if worker_idx > 0 else 'local', num_gpus))
if ray._private.worker._mode() == ray._private.worker.WORKER_MODE:
gpu_ids = ray.get_gpu_ids()
if len(gpu_ids) < num_gpus:
raise ValueError(f"TorchPolicy was not able to find enough GPU IDs! Found {gpu_ids}, but num_gpus={num_gpus}.")
self.devices = [torch.device('cuda:{}'.format(i)) for i, id_ in enumerate(gpu_ids) if i < num_gpus]
self.device = self.devices[0]
ids = [id_ for i, id_ in enumerate(gpu_ids) if i < num_gpus]
self.model_gpu_towers = []
for i, _ in enumerate(ids):
model_copy = copy.deepcopy(model)
self.model_gpu_towers.append(model_copy.to(self.devices[i]))
self.model = self.model_gpu_towers[0]
self._lock = threading.RLock()
self._state_inputs = self.model.get_initial_state()
self._is_recurrent = len(self._state_inputs) > 0
self._update_model_view_requirements_from_init_state()
self.view_requirements.update(self.model.view_requirements)
self.exploration = self._create_exploration()
self.unwrapped_model = model
if loss is not None:
self._loss = loss
else:
if self.loss.__func__.__qualname__ != 'Policy.loss':
self._loss = self.loss.__func__
else:
self._loss = None
self._optimizers = force_list(self.optimizer())
self.multi_gpu_param_groups = []
main_params = {p:i for i, p in enumerate(self.model.parameters())}
for o in self._optimizers:
param_indices = []
for pg_idx, pg in enumerate(o.param_groups):
for p in pg['params']:
param_indices.append(main_params[p])
self.multi_gpu_param_groups.append(set(param_indices))
num_buffers = self.config.get('num_multi_gpu_tower_stacks', 1)
self._loaded_batches = [[] for _ in range(num_buffers)]
self.dist_class = action_distribution_class
self.action_sampler_fn = action_sampler_fn
self.action_distribution_fn = action_distribution_fn
self.distributed_world_size = None
self.max_seq_len = max_seq_len
self.batch_divisibility_req = get_batch_divisibility_req(self) if callable(get_batch_divisibility_req) else get_batch_divisibility_req or 1
```
The fix appears on line 25 of the decompiled output.
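For clarity, here is the R1 rewrite from this example in isolation (a self-contained sketch; the wrapper functions and their return values are placeholders, not code from the application):
```python
def placement_before(config, num_gpus, gpu_ids):
    # original form: the three-clause `or` condition sits directly in the `if`
    if config["_fake_gpus"] or num_gpus == 0 or not gpu_ids:
        return "cpu"
    return "gpu"

def placement_after(config, num_gpus, gpu_ids):
    # R1-fixed form: the condition is hoisted into FET_cond first, which is
    # exactly what survives in the decompiled output above
    FET_cond = config["_fake_gpus"] or num_gpus == 0 or not gpu_ids
    if FET_cond:
        return "cpu"
    return "gpu"

# behaviour is unchanged
assert placement_before({"_fake_gpus": False}, 2, [0]) == \
       placement_after({"_fake_gpus": False}, 2, [0])
```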
## Step 5.
Check whether any implicit errors were introduced. For instance, in the decompiled output we can see an implicit error in these lines:
```python=62
if loss is not None:
self._loss = loss
else:
if self.loss.__func__.__qualname__ != 'Policy.loss':
self._loss = self.loss.__func__
else:
self._loss = None
```
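The mismatch here appears to be purely structural: the decompiler renders the original `elif` chain as a nested `else: if`, which preserves behaviour but no longer matches the source text, and that drift is what counts as an implicit error. A minimal illustration (not taken from the file above):
```python
def pick_loss_original(loss, overridden):
    if loss is not None:
        return "explicit"
    elif overridden:            # source uses `elif`
        return "overridden"
    else:
        return None

def pick_loss_decompiled(loss, overridden):
    if loss is not None:
        return "explicit"
    else:                       # decompiler emits a nested `else: if`
        if overridden:
            return "overridden"
        else:
            return None
```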
Remove this implicit error from the original code as follows.
```python=
def __init__(
self,
observation_space: gym.spaces.Space,
action_space: gym.spaces.Space,
config: AlgorithmConfigDict,
*,
model: Optional[TorchModelV2] = None,
loss: Optional[
Callable[
[Policy, ModelV2, Type[TorchDistributionWrapper], SampleBatch],
Union[TensorType, List[TensorType]],
]
] = None,
action_distribution_class: Optional[Type[TorchDistributionWrapper]] = None,
action_sampler_fn: Optional[
Callable[
[TensorType, List[TensorType]],
Union[
Tuple[TensorType, TensorType, List[TensorType]],
Tuple[TensorType, TensorType, TensorType, List[TensorType]],
],
]
] = None,
action_distribution_fn: Optional[
Callable[
[Policy, ModelV2, TensorType, TensorType, TensorType],
Tuple[TensorType, Type[TorchDistributionWrapper], List[TensorType]],
]
] = None,
max_seq_len: int = 20,
get_batch_divisibility_req: Optional[Callable[[Policy], int]] = None,
):
self.framework = config["framework"] = "torch"
super().__init__(observation_space, action_space, config)
# Create multi-GPU model towers, if necessary.
# - The central main model will be stored under self.model, residing
# on self.device (normally, a CPU).
# - Each GPU will have a copy of that model under
# self.model_gpu_towers, matching the devices in self.devices.
# - Parallelization is done by splitting the train batch and passing
# it through the model copies in parallel, then averaging over the
# resulting gradients, applying these averages on the main model and
# updating all towers' weights from the main model.
# - In case of just one device (1 (fake or real) GPU or 1 CPU), no
# parallelization will be done.
# If no Model is provided, build a default one here.
if model is None:
dist_class, logit_dim = ModelCatalog.get_action_dist(
action_space, self.config["model"], framework=self.framework
)
model = ModelCatalog.get_model_v2(
obs_space=self.observation_space,
action_space=self.action_space,
num_outputs=logit_dim,
model_config=self.config["model"],
framework=self.framework,
)
if action_distribution_class is None:
action_distribution_class = dist_class
# Get devices to build the graph on.
worker_idx = self.config.get("worker_index", 0)
if (
not config["_fake_gpus"]
and ray._private.worker._mode() == ray._private.worker.LOCAL_MODE
):
num_gpus = 0
if worker_idx == 0:
num_gpus = config["num_gpus"]
else:
num_gpus = config["num_gpus_per_worker"]
gpu_ids = list(range(torch.cuda.device_count()))
# Place on one or more CPU(s) when either:
# - Fake GPU mode.
# - num_gpus=0 (either set by user or we are in local_mode=True).
# - No GPUs available.
FET_cond = config["_fake_gpus"] or num_gpus == 0 or not gpu_ids
if FET_cond:
logger.info(
"TorchPolicy (worker={}) running on {}.".format(
worker_idx if worker_idx > 0 else "local",
"{} fake-GPUs".format(num_gpus) if config["_fake_gpus"] else "CPU",
)
)
self.device = torch.device("cpu")
self.devices = [self.device for _ in range(int(math.ceil(num_gpus)) or 1)]
self.model_gpu_towers = [
model if i == 0 else copy.deepcopy(model)
for i in range(int(math.ceil(num_gpus)) or 1)
]
# if hasattr(self, "target_model"):
# self.target_models = {
# m: self.target_model for m in self.model_gpu_towers
# }
self.model = model
# Place on one or more actual GPU(s), when:
# - num_gpus > 0 (set by user) AND
# - local_mode=False AND
# - actual GPUs available AND
# - non-fake GPU mode.
else:
logger.info(
"TorchPolicy (worker={}) running on {} GPU(s).".format(
worker_idx if worker_idx > 0 else "local", num_gpus
)
)
# We are a remote worker (WORKER_MODE=1):
# GPUs should be assigned to us by ray.
if ray._private.worker._mode() == ray._private.worker.WORKER_MODE:
gpu_ids = ray.get_gpu_ids()
if len(gpu_ids) < num_gpus:
raise ValueError(
"TorchPolicy was not able to find enough GPU IDs! Found "
f"{gpu_ids}, but num_gpus={num_gpus}."
)
self.devices = [
torch.device("cuda:{}".format(i))
for i, id_ in enumerate(gpu_ids)
if i < num_gpus
]
self.device = self.devices[0]
ids = [id_ for i, id_ in enumerate(gpu_ids) if i < num_gpus]
self.model_gpu_towers = []
for i, _ in enumerate(ids):
model_copy = copy.deepcopy(model)
self.model_gpu_towers.append(model_copy.to(self.devices[i]))
# if hasattr(self, "target_model"):
# self.target_models = {
# m: copy.deepcopy(self.target_model).to(self.devices[i])
# for i, m in enumerate(self.model_gpu_towers)
# }
self.model = self.model_gpu_towers[0]
# Lock used for locking some methods on the object-level.
# This prevents possible race conditions when calling the model
# first, then its value function (e.g. in a loss function), in
# between of which another model call is made (e.g. to compute an
# action).
self._lock = threading.RLock()
self._state_inputs = self.model.get_initial_state()
self._is_recurrent = len(self._state_inputs) > 0
# Auto-update model's inference view requirements, if recurrent.
self._update_model_view_requirements_from_init_state()
# Combine view_requirements for Model and Policy.
self.view_requirements.update(self.model.view_requirements)
self.exploration = self._create_exploration()
self.unwrapped_model = model # used to support DistributedDataParallel
# To ensure backward compatibility:
# Old way: If `loss` provided here, use as-is (as a function).
if loss is not None:
self._loss = loss
self._optimizers = force_list(self.optimizer())
# Store, which params (by index within the model's list of
# parameters) should be updated per optimizer.
# Maps optimizer idx to set or param indices.
self.multi_gpu_param_groups: List[Set[int]] = []
main_params = {p: i for i, p in enumerate(self.model.parameters())}
for o in self._optimizers:
param_indices = []
for pg_idx, pg in enumerate(o.param_groups):
for p in pg["params"]:
param_indices.append(main_params[p])
self.multi_gpu_param_groups.append(set(param_indices))
# Create n sample-batch buffers (num_multi_gpu_tower_stacks), each
# one with m towers (num_gpus).
num_buffers = self.config.get("num_multi_gpu_tower_stacks", 1)
self._loaded_batches = [[] for _ in range(num_buffers)]
self.dist_class = action_distribution_class
self.action_sampler_fn = action_sampler_fn
self.action_distribution_fn = action_distribution_fn
# If set, means we are using distributed allreduce during learning.
self.distributed_world_size = None
self.max_seq_len = max_seq_len
self.batch_divisibility_req = (
get_batch_divisibility_req(self)
if callable(get_batch_divisibility_req)
else (get_batch_divisibility_req or 1)
)
```
Notice that I removed the `elif` branches (and the associated `else`) from the program to avoid the implicit errors. The output from `uncompyle6` is as follows:
```python=
# uncompyle6 version 3.7.4
# Python bytecode 3.7 (3394)
# Decompiled from: Python 3.7.11 (default, Jun 29 2021, 20:31:06)
# [GCC 8.3.0]
# Embedded file name: 0_fix.py
# Compiled at: 2022-08-08 20:19:43
# Size of source mod 2**32: 8330 bytes
def __init__(self, observation_space: gym.spaces.Space, action_space: gym.spaces.Space, config: AlgorithmConfigDict, *, model: Optional[TorchModelV2]=None, loss: Optional[Callable[([Policy, ModelV2, Type[TorchDistributionWrapper], SampleBatch], Union[(TensorType, List[TensorType])])]]=None, action_distribution_class: Optional[Type[TorchDistributionWrapper]]=None, action_sampler_fn: Optional[Callable[([TensorType, List[TensorType]], Union[(Tuple[(TensorType, TensorType, List[TensorType])], Tuple[(TensorType, TensorType, TensorType, List[TensorType])])])]]=None, action_distribution_fn: Optional[Callable[([Policy, ModelV2, TensorType, TensorType, TensorType], Tuple[(TensorType, Type[TorchDistributionWrapper], List[TensorType])])]]=None, max_seq_len: int=20, get_batch_divisibility_req: Optional[Callable[([Policy], int)]]=None):
self.framework = config['framework'] = 'torch'
super().__init__(observation_space, action_space, config)
if model is None:
dist_class, logit_dim = ModelCatalog.get_action_dist(action_space,
(self.config['model']), framework=(self.framework))
model = ModelCatalog.get_model_v2(obs_space=(self.observation_space),
action_space=(self.action_space),
num_outputs=logit_dim,
model_config=(self.config['model']),
framework=(self.framework))
if action_distribution_class is None:
action_distribution_class = dist_class
worker_idx = self.config.get('worker_index', 0)
if not config['_fake_gpus']:
if ray._private.worker._mode() == ray._private.worker.LOCAL_MODE:
num_gpus = 0
elif worker_idx == 0:
num_gpus = config['num_gpus']
else:
num_gpus = config['num_gpus_per_worker']
gpu_ids = list(range(torch.cuda.device_count()))
FET_cond = config['_fake_gpus'] or num_gpus == 0 or not gpu_ids
if FET_cond:
logger.info('TorchPolicy (worker={}) running on {}.'.format(worker_idx if worker_idx > 0 else 'local', '{} fake-GPUs'.format(num_gpus) if config['_fake_gpus'] else 'CPU'))
self.device = torch.device('cpu')
self.devices = [self.device for _ in range(int(math.ceil(num_gpus)) or 1)]
self.model_gpu_towers = [model if i == 0 else copy.deepcopy(model) for i in range(int(math.ceil(num_gpus)) or 1)]
self.model = model
else:
logger.info('TorchPolicy (worker={}) running on {} GPU(s).'.format(worker_idx if worker_idx > 0 else 'local', num_gpus))
if ray._private.worker._mode() == ray._private.worker.WORKER_MODE:
gpu_ids = ray.get_gpu_ids()
if len(gpu_ids) < num_gpus:
raise ValueError(f"TorchPolicy was not able to find enough GPU IDs! Found {gpu_ids}, but num_gpus={num_gpus}.")
self.devices = [torch.device('cuda:{}'.format(i)) for i, id_ in enumerate(gpu_ids) if i < num_gpus]
self.device = self.devices[0]
ids = [id_ for i, id_ in enumerate(gpu_ids) if i < num_gpus]
self.model_gpu_towers = []
for i, _ in enumerate(ids):
model_copy = copy.deepcopy(model)
self.model_gpu_towers.append(model_copy.to(self.devices[i]))
self.model = self.model_gpu_towers[0]
self._lock = threading.RLock()
self._state_inputs = self.model.get_initial_state()
self._is_recurrent = len(self._state_inputs) > 0
self._update_model_view_requirements_from_init_state()
self.view_requirements.update(self.model.view_requirements)
self.exploration = self._create_exploration()
self.unwrapped_model = model
if loss is not None:
self._loss = loss
self._optimizers = force_list(self.optimizer())
self.multi_gpu_param_groups = []
main_params = {p:i for i, p in enumerate(self.model.parameters())}
for o in self._optimizers:
param_indices = []
for pg_idx, pg in enumerate(o.param_groups):
for p in pg['params']:
param_indices.append(main_params[p])
self.multi_gpu_param_groups.append(set(param_indices))
num_buffers = self.config.get('num_multi_gpu_tower_stacks', 1)
self._loaded_batches = [[] for _ in range(num_buffers)]
self.dist_class = action_distribution_class
self.action_sampler_fn = action_sampler_fn
self.action_distribution_fn = action_distribution_fn
self.distributed_world_size = None
self.max_seq_len = max_seq_len
self.batch_divisibility_req = get_batch_divisibility_req(self) if callable(get_batch_divisibility_req) else get_batch_divisibility_req or 1
# okay decompiling 0_fix.py
```
Notice that there are no implicit errors and the output contains only the fix (line 32).
## Step 6.
Finally, do the following.
Take the decompiled output from Step 5.
```python=
# uncompyle6 version 3.7.4
# Python bytecode 3.7 (3394)
# Decompiled from: Python 3.7.11 (default, Jun 29 2021, 20:31:06)
# [GCC 8.3.0]
# Embedded file name: 0_fix.py
# Compiled at: 2022-08-08 20:19:43
# Size of source mod 2**32: 8330 bytes
def __init__(self, observation_space: gym.spaces.Space, action_space: gym.spaces.Space, config: AlgorithmConfigDict, *, model: Optional[TorchModelV2]=None, loss: Optional[Callable[([Policy, ModelV2, Type[TorchDistributionWrapper], SampleBatch], Union[(TensorType, List[TensorType])])]]=None, action_distribution_class: Optional[Type[TorchDistributionWrapper]]=None, action_sampler_fn: Optional[Callable[([TensorType, List[TensorType]], Union[(Tuple[(TensorType, TensorType, List[TensorType])], Tuple[(TensorType, TensorType, TensorType, List[TensorType])])])]]=None, action_distribution_fn: Optional[Callable[([Policy, ModelV2, TensorType, TensorType, TensorType], Tuple[(TensorType, Type[TorchDistributionWrapper], List[TensorType])])]]=None, max_seq_len: int=20, get_batch_divisibility_req: Optional[Callable[([Policy], int)]]=None):
self.framework = config['framework'] = 'torch'
super().__init__(observation_space, action_space, config)
if model is None:
dist_class, logit_dim = ModelCatalog.get_action_dist(action_space,
(self.config['model']), framework=(self.framework))
model = ModelCatalog.get_model_v2(obs_space=(self.observation_space),
action_space=(self.action_space),
num_outputs=logit_dim,
model_config=(self.config['model']),
framework=(self.framework))
if action_distribution_class is None:
action_distribution_class = dist_class
worker_idx = self.config.get('worker_index', 0)
if not config['_fake_gpus']:
if ray._private.worker._mode() == ray._private.worker.LOCAL_MODE:
num_gpus = 0
elif worker_idx == 0:
num_gpus = config['num_gpus']
else:
num_gpus = config['num_gpus_per_worker']
gpu_ids = list(range(torch.cuda.device_count()))
FET_cond = config['_fake_gpus'] or num_gpus == 0 or not gpu_ids
if FET_cond:
logger.info('TorchPolicy (worker={}) running on {}.'.format(worker_idx if worker_idx > 0 else 'local', '{} fake-GPUs'.format(num_gpus) if config['_fake_gpus'] else 'CPU'))
self.device = torch.device('cpu')
self.devices = [self.device for _ in range(int(math.ceil(num_gpus)) or 1)]
self.model_gpu_towers = [model if i == 0 else copy.deepcopy(model) for i in range(int(math.ceil(num_gpus)) or 1)]
self.model = model
else:
logger.info('TorchPolicy (worker={}) running on {} GPU(s).'.format(worker_idx if worker_idx > 0 else 'local', num_gpus))
if ray._private.worker._mode() == ray._private.worker.WORKER_MODE:
gpu_ids = ray.get_gpu_ids()
if len(gpu_ids) < num_gpus:
raise ValueError(f"TorchPolicy was not able to find enough GPU IDs! Found {gpu_ids}, but num_gpus={num_gpus}.")
self.devices = [torch.device('cuda:{}'.format(i)) for i, id_ in enumerate(gpu_ids) if i < num_gpus]
self.device = self.devices[0]
ids = [id_ for i, id_ in enumerate(gpu_ids) if i < num_gpus]
self.model_gpu_towers = []
for i, _ in enumerate(ids):
model_copy = copy.deepcopy(model)
self.model_gpu_towers.append(model_copy.to(self.devices[i]))
self.model = self.model_gpu_towers[0]
self._lock = threading.RLock()
self._state_inputs = self.model.get_initial_state()
self._is_recurrent = len(self._state_inputs) > 0
self._update_model_view_requirements_from_init_state()
self.view_requirements.update(self.model.view_requirements)
self.exploration = self._create_exploration()
self.unwrapped_model = model
if loss is not None:
self._loss = loss
self._optimizers = force_list(self.optimizer())
self.multi_gpu_param_groups = []
main_params = {p:i for i, p in enumerate(self.model.parameters())}
for o in self._optimizers:
param_indices = []
for pg_idx, pg in enumerate(o.param_groups):
for p in pg['params']:
param_indices.append(main_params[p])
self.multi_gpu_param_groups.append(set(param_indices))
num_buffers = self.config.get('num_multi_gpu_tower_stacks', 1)
self._loaded_batches = [[] for _ in range(num_buffers)]
self.dist_class = action_distribution_class
self.action_sampler_fn = action_sampler_fn
self.action_distribution_fn = action_distribution_fn
self.distributed_world_size = None
self.max_seq_len = max_seq_len
self.batch_divisibility_req = get_batch_divisibility_req(self) if callable(get_batch_divisibility_req) else get_batch_divisibility_req or 1
# okay decompiling 0_fix.py
```
Then undo only the fix (the `FET_cond` rewrite) and preserve the rest of the file, as follows.
```python=
def __init__(
self,
observation_space: gym.spaces.Space,
action_space: gym.spaces.Space,
config: AlgorithmConfigDict,
*,
model: Optional[TorchModelV2] = None,
loss: Optional[
Callable[
[Policy, ModelV2, Type[TorchDistributionWrapper], SampleBatch],
Union[TensorType, List[TensorType]],
]
] = None,
action_distribution_class: Optional[Type[TorchDistributionWrapper]] = None,
action_sampler_fn: Optional[
Callable[
[TensorType, List[TensorType]],
Union[
Tuple[TensorType, TensorType, List[TensorType]],
Tuple[TensorType, TensorType, TensorType, List[TensorType]],
],
]
] = None,
action_distribution_fn: Optional[
Callable[
[Policy, ModelV2, TensorType, TensorType, TensorType],
Tuple[TensorType, Type[TorchDistributionWrapper], List[TensorType]],
]
] = None,
max_seq_len: int = 20,
get_batch_divisibility_req: Optional[Callable[[Policy], int]] = None,
):
self.framework = config["framework"] = "torch"
super().__init__(observation_space, action_space, config)
# Create multi-GPU model towers, if necessary.
# - The central main model will be stored under self.model, residing
# on self.device (normally, a CPU).
# - Each GPU will have a copy of that model under
# self.model_gpu_towers, matching the devices in self.devices.
# - Parallelization is done by splitting the train batch and passing
# it through the model copies in parallel, then averaging over the
# resulting gradients, applying these averages on the main model and
# updating all towers' weights from the main model.
# - In case of just one device (1 (fake or real) GPU or 1 CPU), no
# parallelization will be done.
# If no Model is provided, build a default one here.
if model is None:
dist_class, logit_dim = ModelCatalog.get_action_dist(
action_space, self.config["model"], framework=self.framework
)
model = ModelCatalog.get_model_v2(
obs_space=self.observation_space,
action_space=self.action_space,
num_outputs=logit_dim,
model_config=self.config["model"],
framework=self.framework,
)
if action_distribution_class is None:
action_distribution_class = dist_class
# Get devices to build the graph on.
worker_idx = self.config.get("worker_index", 0)
if (
not config["_fake_gpus"]
and ray._private.worker._mode() == ray._private.worker.LOCAL_MODE
):
num_gpus = 0
if worker_idx == 0:
num_gpus = config["num_gpus"]
else:
num_gpus = config["num_gpus_per_worker"]
gpu_ids = list(range(torch.cuda.device_count()))
# Place on one or more CPU(s) when either:
# - Fake GPU mode.
# - num_gpus=0 (either set by user or we are in local_mode=True).
# - No GPUs available.
if config["_fake_gpus"] or num_gpus == 0 or not gpu_ids:
logger.info(
"TorchPolicy (worker={}) running on {}.".format(
worker_idx if worker_idx > 0 else "local",
"{} fake-GPUs".format(num_gpus) if config["_fake_gpus"] else "CPU",
)
)
self.device = torch.device("cpu")
self.devices = [self.device for _ in range(int(math.ceil(num_gpus)) or 1)]
self.model_gpu_towers = [
model if i == 0 else copy.deepcopy(model)
for i in range(int(math.ceil(num_gpus)) or 1)
]
# if hasattr(self, "target_model"):
# self.target_models = {
# m: self.target_model for m in self.model_gpu_towers
# }
self.model = model
# Place on one or more actual GPU(s), when:
# - num_gpus > 0 (set by user) AND
# - local_mode=False AND
# - actual GPUs available AND
# - non-fake GPU mode.
else:
logger.info(
"TorchPolicy (worker={}) running on {} GPU(s).".format(
worker_idx if worker_idx > 0 else "local", num_gpus
)
)
# We are a remote worker (WORKER_MODE=1):
# GPUs should be assigned to us by ray.
if ray._private.worker._mode() == ray._private.worker.WORKER_MODE:
gpu_ids = ray.get_gpu_ids()
if len(gpu_ids) < num_gpus:
raise ValueError(
"TorchPolicy was not able to find enough GPU IDs! Found "
f"{gpu_ids}, but num_gpus={num_gpus}."
)
self.devices = [
torch.device("cuda:{}".format(i))
for i, id_ in enumerate(gpu_ids)
if i < num_gpus
]
self.device = self.devices[0]
ids = [id_ for i, id_ in enumerate(gpu_ids) if i < num_gpus]
self.model_gpu_towers = []
for i, _ in enumerate(ids):
model_copy = copy.deepcopy(model)
self.model_gpu_towers.append(model_copy.to(self.devices[i]))
# if hasattr(self, "target_model"):
# self.target_models = {
# m: copy.deepcopy(self.target_model).to(self.devices[i])
# for i, m in enumerate(self.model_gpu_towers)
# }
self.model = self.model_gpu_towers[0]
# Lock used for locking some methods on the object-level.
# This prevents possible race conditions when calling the model
# first, then its value function (e.g. in a loss function), in
# between of which another model call is made (e.g. to compute an
# action).
self._lock = threading.RLock()
self._state_inputs = self.model.get_initial_state()
self._is_recurrent = len(self._state_inputs) > 0
# Auto-update model's inference view requirements, if recurrent.
self._update_model_view_requirements_from_init_state()
# Combine view_requirements for Model and Policy.
self.view_requirements.update(self.model.view_requirements)
self.exploration = self._create_exploration()
self.unwrapped_model = model # used to support DistributedDataParallel
# To ensure backward compatibility:
# Old way: If `loss` provided here, use as-is (as a function).
if loss is not None:
self._loss = loss
self._optimizers = force_list(self.optimizer())
# Store, which params (by index within the model's list of
# parameters) should be updated per optimizer.
# Maps optimizer idx to set or param indices.
self.multi_gpu_param_groups: List[Set[int]] = []
main_params = {p: i for i, p in enumerate(self.model.parameters())}
for o in self._optimizers:
param_indices = []
for pg_idx, pg in enumerate(o.param_groups):
for p in pg["params"]:
param_indices.append(main_params[p])
self.multi_gpu_param_groups.append(set(param_indices))
# Create n sample-batch buffers (num_multi_gpu_tower_stacks), each
# one with m towers (num_gpus).
num_buffers = self.config.get("num_multi_gpu_tower_stacks", 1)
self._loaded_batches = [[] for _ in range(num_buffers)]
self.dist_class = action_distribution_class
self.action_sampler_fn = action_sampler_fn
self.action_distribution_fn = action_distribution_fn
# If set, means we are using distributed allreduce during learning.
self.distributed_world_size = None
self.max_seq_len = max_seq_len
self.batch_divisibility_req = (
get_batch_divisibility_req(self)
if callable(get_batch_divisibility_req)
else (get_batch_divisibility_req or 1)
)
```
Save both of them as
`<original_name>.py` and `<original_name>_R1_fix_and_decompiled.py`
and create a text file (`<original_name>_R1_conf.txt`) with the following contents:
```
Decompiler: Uncompyle6 3.7.5
Python version: 3.7.5
```
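A hypothetical bookkeeping helper for this step; the function name, directory layout, and default values are my own, not part of the prescribed workflow:
```python
from pathlib import Path

def save_artifacts(out_dir, name, original_src, decompiled_src,
                   decompiler="Uncompyle6 3.7.5", python_version="3.7.5"):
    """Write the original file, the decompiled file, and the conf file."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    (out / f"{name}.py").write_text(original_src)
    (out / f"{name}_R1_fix_and_decompiled.py").write_text(decompiled_src)
    (out / f"{name}_R1_conf.txt").write_text(
        f"Decompiler: {decompiler}\nPython version: {python_version}\n")
```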
## Step 7.
Repeat the process for R2~R16 on the remaining files of **A_i**, and do this for all 10 applications.
# Tips.
- Go rule by rule, i.e., start with R1, then R2, and so on.
- If you find that R1 fails in U6 with Python 3.7, preserve this environment and use it for the other applications.
- If you don't find an error, inject it yourself, i.e., instrument the failure-triggering code in the function (see the sketch after this list).
- Make sure there is only one error that we fix, i.e., there are no implicit errors or any new errors.
- If you find new errors, just delete the code corresponding to the new error in the original file, but try to preserve as much content as possible.
- Good luck!
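As a sketch of the injection tip above (assuming, as in the R1 example, that the trigger is a multi-clause boolean condition guarding an `if`; the function itself is hypothetical):
```python
def candidate(config, values):
    total = sum(values)
    # injected R1 trigger: a compound `or` condition inside the `if`
    if config.get("fake") or total == 0 or not values:
        return 0
    return total
```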
---
# R3