--- tags: decompiler title: Ground truth study --- # ALL FETs: ![](https://i.imgur.com/OnAeN6P.png) ![](https://i.imgur.com/3fgAk3g.png) ![](https://i.imgur.com/E0sQlpw.png) # Problem overview R17~29. For python 3.9, we are going to tackle files with only a subset of instructions i.e., we show groundtruth of sample requiring only one of the following instructions changed. ![](https://i.imgur.com/E0sQlpw.png) Reference [here](https://hackmd.io/@aliahad97/HkDD5eVWq) for slightly more detail. ## Step 1. Take a file from Application A. One function is as follows: ```python=3 def _signature_get_partial(wrapped_sig, partial, extra_args=()): """Private helper to calculate how 'wrapped_sig' signature will look like after applying a 'functools.partial' object (or alike) on it. """ old_params = wrapped_sig.parameters partial_args = partial.args or () partial_keywords = partial.keywords or {} if extra_args: partial_args = extra_args + partial_args try: ba = wrapped_sig.bind_partial(*partial_args, **partial_keywords) except TypeError as ex: msg = 'partial object {!r} has incorrect arguments'.format(partial) raise ValueError(msg) from ex transform_to_kwonly = False for param_name, param in old_params.items(): FET_else = 0 try: arg_value = ba.arguments[param_name] FET_else = 1 except KeyError: pass#FET_null() if FET_else == 1: if param.kind is _POSITIONAL_ONLY: # If positional-only parameter is bound by partial, # it effectively disappears from the signature new_params.pop(param_name) continue if param.kind is _POSITIONAL_OR_KEYWORD: if param_name in partial_keywords: # This means that this parameter, and all parameters # after it should be keyword-only (and var-positional # should be removed). Here's why. Consider the following # function: # foo(a, b, *args, c): # pass # # "partial(foo, a='spam')" will have the following # signature: "(*, a='spam', b, c)". Because attempting # to call that partial with "(10, 20)" arguments will # raise a TypeError, saying that "a" argument received # multiple values. transform_to_kwonly = True # Set the new default value new_params[param_name] = param.replace(default=arg_value) else: # was passed as a positional argument new_params.pop(param.name) continue if param.kind is _KEYWORD_ONLY: # Set the new default value new_params[param_name] = param.replace(default=arg_value) if transform_to_kwonly: assert param.kind is not _POSITIONAL_ONLY if param.kind is _POSITIONAL_OR_KEYWORD: new_param = new_params[param_name].replace(kind=_KEYWORD_ONLY) new_params[param_name] = new_param new_params.move_to_end(param_name) elif param.kind in (_KEYWORD_ONLY, _VAR_KEYWORD): new_params.move_to_end(param_name) elif param.kind is _VAR_POSITIONAL: new_params.pop(param.name) return wrapped_sig.replace(parameters=new_params.values()) ``` ## Step 2. Recompile the file in both python 3.8 and 3.9. And then disassemble both of them using the commands as follows. ``` python -m compileall -b main.py # python 3+ pydisasm main.py > main.dis ``` By running the command for Python 3.8 and 3.9 above, you will have two files: 1. `main.dis` for python 3.8 2. `main.dis` for python 3.9 ## Step 3. Compare the two in a diff tool to see the different instructions. You should see specific lines with difference. In this example we see the following differences. 
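The individual differences found for this example are listed next. If you prefer not to use a GUI diff tool for Step 3, the comparison can also be scripted; below is a minimal sketch using only the standard library, assuming the two Step 2 disassemblies were saved under the hypothetical names `main_38.dis` and `main_39.dis` (the commands above write `main.dis` in both cases, so rename them per version).

```python
# Minimal sketch: print a unified diff of the two pydisasm outputs so that
# only the instructions that changed between Python 3.8 and 3.9 stand out.
# Assumes the Step 2 outputs were saved as main_38.dis and main_39.dis
# (hypothetical names).
import difflib
import sys

with open("main_38.dis") as f:
    dis_38 = f.readlines()
with open("main_39.dis") as f:
    dis_39 = f.readlines()

sys.stdout.writelines(
    difflib.unified_diff(dis_38, dis_39, fromfile="py3.8", tofile="py3.9")
)
```

Each hunk in this output should correspond to one of the diffs below.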
Diff 1 (line 17) (No rule): ![](https://i.imgur.com/dzKWqNr.png) Diff 2 (line 18) (R18): ![](https://i.imgur.com/R4Sged6.png) Diff 3 (line 20) (R20): ![](https://i.imgur.com/RfC9CA0.png) Diff 4 (line 29) (R18): ![](https://i.imgur.com/QAMQ0nT.png) Diff 5 (line 30) (R19): ![](https://i.imgur.com/dzWlnVh.png) Diff 6 (line 32) (R17): ![](https://i.imgur.com/PW5e0Wg.png) Diff 7 (line 38) (R17): ![](https://i.imgur.com/gMEQmeY.png) Diff 8 (line 39) (R17): ![](https://i.imgur.com/6eyq8y7.png) Diff 9 (line 60) (R17): ![](https://i.imgur.com/wKy9n6a.png) Diff 10 (line 65) (R17 and R24): ![](https://i.imgur.com/pBr4HOk.png) Diff 11 (line 67) (R17): ![](https://i.imgur.com/7MVA99f.png) Diff 12 (line 71) (R17): ![](https://i.imgur.com/KWqWxIx.png) Diff 13 (line 73) (R17): ![](https://i.imgur.com/JuwRXdN.png) This means that we have 13 errors. Though we will focus on only one at a time. From the above we can use the following rules looking at the diff: - R17 - R18 - R19 - R20 - R24 ## Step 4. Trim the code so it has only one error. We trim each line so it has only one error as follows. ```python=3 def _signature_get_partial(wrapped_sig, partial, extra_args=()): """Private helper to calculate how 'wrapped_sig' signature will look like after applying a 'functools.partial' object (or alike) on it. """ old_params = wrapped_sig.parameters partial_args = partial.args or () partial_keywords = partial.keywords or {} if extra_args: partial_args = extra_args + partial_args # try: # ba = wrapped_sig.bind_partial(*partial_args, **partial_keywords) # except TypeError as ex: # msg = 'partial object {!r} has incorrect arguments'.format(partial) # raise ValueError(msg) from ex transform_to_kwonly = False for param_name, param in old_params.items(): FET_else = 0 try: arg_value = ba.arguments[param_name] FET_else = 1 except KeyError: pass#FET_null() if FET_else == 1: if param.kind == _POSITIONAL_ONLY: # If positional-only parameter is bound by partial, # it effectively disappears from the signature new_params.pop(param_name) continue if param.kind == _POSITIONAL_OR_KEYWORD: if param_name == partial_keywords: # This means that this parameter, and all parameters # after it should be keyword-only (and var-positional # should be removed). Here's why. Consider the following # function: # foo(a, b, *args, c): # pass # # "partial(foo, a='spam')" will have the following # signature: "(*, a='spam', b, c)". Because attempting # to call that partial with "(10, 20)" arguments will # raise a TypeError, saying that "a" argument received # multiple values. transform_to_kwonly = True # Set the new default value new_params[param_name] = param.replace(default=arg_value) else: # was passed as a positional argument new_params.pop(param.name) continue if param.kind == _KEYWORD_ONLY: # Set the new default value new_params[param_name] = param.replace(default=arg_value) if transform_to_kwonly: # assert param.kind is not _POSITIONAL_ONLY if param.kind == _POSITIONAL_OR_KEYWORD: new_param = new_params[param_name].replace(kind=_KEYWORD_ONLY) new_params[param_name] = new_param new_params.move_to_end(param_name) elif param.kind == (_KEYWORD_ONLY, _VAR_KEYWORD): new_params.move_to_end(param_name) elif param.kind == _VAR_POSITIONAL: new_params.pop(param.name) return wrapped_sig.replace(parameters=new_params.values()) ``` Notice that now it will have only the following diff in code: ![](https://i.imgur.com/AHqYh1K.png) Notice that now we need only R16 for `RERAISE` and R18 for `JUMP_IFNOT_EXC_MATCH`. ## Step 5 - Saving. 
Now decompile the original file to remove any implicit errors. Following file is after removing implicit errors. ```python=3 def _signature_get_partial(wrapped_sig, partial, extra_args=()): """Private helper to calculate how 'wrapped_sig' signature will look like after applying a 'functools.partial' object (or alike) on it. """ old_params = wrapped_sig.parameters partial_args = partial.args or () partial_keywords = partial.keywords or {} if extra_args: partial_args = extra_args + partial_args else: partial_args = partial_args # try: # ba = wrapped_sig.bind_partial(*partial_args, **partial_keywords) # except TypeError as ex: # msg = 'partial object {!r} has incorrect arguments'.format(partial) # raise ValueError(msg) from ex transform_to_kwonly = False param_name, param = old_params.items() FET_else = 0 try: arg_value = ba.arguments[param_name] FET_else = 1 except KeyError: pass#FET_null() else: FET_null() if FET_else == 1: if param.kind == _POSITIONAL_ONLY: # If positional-only parameter is bound by partial, # it effectively disappears from the signature new_params.pop(param_name) new_params.pop(param.name) if param.kind == _POSITIONAL_OR_KEYWORD: if param_name == partial_keywords: # This means that this parameter, and all parameters # after it should be keyword-only (and var-positional # should be removed). Here's why. Consider the following # function: # foo(a, b, *args, c): # pass # # "partial(foo, a='spam')" will have the following # signature: "(*, a='spam', b, c)". Because attempting # to call that partial with "(10, 20)" arguments will # raise a TypeError, saying that "a" argument received # multiple values. transform_to_kwonly = True # Set the new default value new_params[param_name] = param.replace(default=arg_value) else: # was passed as a positional argument new_params.pop(param.name) new_params.pop(param.name) if param.kind == _KEYWORD_ONLY: # Set the new default value new_params[param_name] = param.replace(default=arg_value) new_params.pop(param.name) if transform_to_kwonly: # assert param.kind is not _POSITIONAL_ONLY if param.kind == _POSITIONAL_OR_KEYWORD: new_param = new_params[param_name].replace(kind=_KEYWORD_ONLY) new_params[param_name] = new_param new_params.move_to_end(param_name) new_params.pop(param.name) if param.kind == (_KEYWORD_ONLY, _VAR_KEYWORD): new_params.move_to_end(param_name) new_params.pop(param.name) if param.kind == _VAR_POSITIONAL: new_params.pop(param.name) new_params.pop(param.name) return wrapped_sig.replace(parameters=new_params.values()) ``` Notice that I removed loops and added extra instructions to avoid any implicit errors since py3.8 has alot of them. Next make sure the error is still there which I do by recompiling in the respective. I see the diff as follows. ![](https://i.imgur.com/JkAcxyU.png) I also made sure that no new errors were introduced. ## Step 6. Save the two files: 1. File before decompilation that you generated (`orignal.py`) - Has no implicit errors and only target instructions 2. File after decompilation (`final.py`) 3. Create a text file labeling decompiler version and rules use. In this case it will be as follows. ``` Uncompyle6 R16 R18 ``` ## Step 7. Repeat this process until you have for each rule in R17~29, atleast one file in each Application. Notes: - Try to make sure there are as little rules as possible used in the file - Make sure there are no implicit errors or explicit errors. We want to showcase only the rule used # Problem overview R1~R16. We want to find original file with error and apply PyFET to it. 
The resultant file will then be fed to a decompiler and the result is preserved. # Steps to carry out ## Step 1. Choose an application (**A_i**) from top 10 application. List [here](https://docs.google.com/spreadsheets/d/1huxO_R_Si5m4iQv63Pmqr2hRkxKIIKhP7z6RYZ6p4k4/edit#gid=0). ## Step 2. Choose an FET **F_i** (1~30). Refer the paper. ![](https://i.imgur.com/OnAeN6P.png) ![](https://i.imgur.com/3fgAk3g.png) ## Step 3. Choose the decompiler that uses the corresponding FET chosen **F_i**. ![](https://i.imgur.com/zYLShwL.png) ## Step 4. For each **A_i**, find function **f_i** in one of the files of **A_i** where you can use the FET **F_i**. ### Example for R1: Original file: ```python= def __init__( self, observation_space: gym.spaces.Space, action_space: gym.spaces.Space, config: AlgorithmConfigDict, *, model: Optional[TorchModelV2] = None, loss: Optional[ Callable[ [Policy, ModelV2, Type[TorchDistributionWrapper], SampleBatch], Union[TensorType, List[TensorType]], ] ] = None, action_distribution_class: Optional[Type[TorchDistributionWrapper]] = None, action_sampler_fn: Optional[ Callable[ [TensorType, List[TensorType]], Union[ Tuple[TensorType, TensorType, List[TensorType]], Tuple[TensorType, TensorType, TensorType, List[TensorType]], ], ] ] = None, action_distribution_fn: Optional[ Callable[ [Policy, ModelV2, TensorType, TensorType, TensorType], Tuple[TensorType, Type[TorchDistributionWrapper], List[TensorType]], ] ] = None, max_seq_len: int = 20, get_batch_divisibility_req: Optional[Callable[[Policy], int]] = None, ): self.framework = config["framework"] = "torch" super().__init__(observation_space, action_space, config) # Create multi-GPU model towers, if necessary. # - The central main model will be stored under self.model, residing # on self.device (normally, a CPU). # - Each GPU will have a copy of that model under # self.model_gpu_towers, matching the devices in self.devices. # - Parallelization is done by splitting the train batch and passing # it through the model copies in parallel, then averaging over the # resulting gradients, applying these averages on the main model and # updating all towers' weights from the main model. # - In case of just one device (1 (fake or real) GPU or 1 CPU), no # parallelization will be done. # If no Model is provided, build a default one here. if model is None: dist_class, logit_dim = ModelCatalog.get_action_dist( action_space, self.config["model"], framework=self.framework ) model = ModelCatalog.get_model_v2( obs_space=self.observation_space, action_space=self.action_space, num_outputs=logit_dim, model_config=self.config["model"], framework=self.framework, ) if action_distribution_class is None: action_distribution_class = dist_class # Get devices to build the graph on. worker_idx = self.config.get("worker_index", 0) if ( not config["_fake_gpus"] and ray._private.worker._mode() == ray._private.worker.LOCAL_MODE ): num_gpus = 0 elif worker_idx == 0: num_gpus = config["num_gpus"] else: num_gpus = config["num_gpus_per_worker"] gpu_ids = list(range(torch.cuda.device_count())) # Place on one or more CPU(s) when either: # - Fake GPU mode. # - num_gpus=0 (either set by user or we are in local_mode=True). # - No GPUs available. 
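        # NOTE (annotation for this study, not part of the original file): the
        # chained `or` condition just below (line 81 of the original file) is
        # the pattern that R1 targets. The FET fix applied later hoists it into
        # a temporary, e.g.
        #     FET_cond = config["_fake_gpus"] or num_gpus == 0 or not gpu_ids
        # and then branches on `FET_cond` instead of the inline expression.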
# tmp = if config["_fake_gpus"] or num_gpus == 0 or not gpu_ids: logger.info( "TorchPolicy (worker={}) running on {}.".format( worker_idx if worker_idx > 0 else "local", "{} fake-GPUs".format(num_gpus) if config["_fake_gpus"] else "CPU", ) ) self.device = torch.device("cpu") self.devices = [self.device for _ in range(int(math.ceil(num_gpus)) or 1)] self.model_gpu_towers = [ model if i == 0 else copy.deepcopy(model) for i in range(int(math.ceil(num_gpus)) or 1) ] # if hasattr(self, "target_model"): # self.target_models = { # m: self.target_model for m in self.model_gpu_towers # } self.model = model # Place on one or more actual GPU(s), when: # - num_gpus > 0 (set by user) AND # - local_mode=False AND # - actual GPUs available AND # - non-fake GPU mode. else: logger.info( "TorchPolicy (worker={}) running on {} GPU(s).".format( worker_idx if worker_idx > 0 else "local", num_gpus ) ) # We are a remote worker (WORKER_MODE=1): # GPUs should be assigned to us by ray. if ray._private.worker._mode() == ray._private.worker.WORKER_MODE: gpu_ids = ray.get_gpu_ids() if len(gpu_ids) < num_gpus: raise ValueError( "TorchPolicy was not able to find enough GPU IDs! Found " f"{gpu_ids}, but num_gpus={num_gpus}." ) self.devices = [ torch.device("cuda:{}".format(i)) for i, id_ in enumerate(gpu_ids) if i < num_gpus ] self.device = self.devices[0] ids = [id_ for i, id_ in enumerate(gpu_ids) if i < num_gpus] self.model_gpu_towers = [] for i, _ in enumerate(ids): model_copy = copy.deepcopy(model) self.model_gpu_towers.append(model_copy.to(self.devices[i])) # if hasattr(self, "target_model"): # self.target_models = { # m: copy.deepcopy(self.target_model).to(self.devices[i]) # for i, m in enumerate(self.model_gpu_towers) # } self.model = self.model_gpu_towers[0] # Lock used for locking some methods on the object-level. # This prevents possible race conditions when calling the model # first, then its value function (e.g. in a loss function), in # between of which another model call is made (e.g. to compute an # action). self._lock = threading.RLock() self._state_inputs = self.model.get_initial_state() self._is_recurrent = len(self._state_inputs) > 0 # Auto-update model's inference view requirements, if recurrent. self._update_model_view_requirements_from_init_state() # Combine view_requirements for Model and Policy. self.view_requirements.update(self.model.view_requirements) self.exploration = self._create_exploration() self.unwrapped_model = model # used to support DistributedDataParallel # To ensure backward compatibility: # Old way: If `loss` provided here, use as-is (as a function). if loss is not None: self._loss = loss # New way: Convert the overridden `self.loss` into a plain function, # so it can be called the same way as `loss` would be, ensuring # backward compatibility. elif self.loss.__func__.__qualname__ != "Policy.loss": self._loss = self.loss.__func__ # `loss` not provided nor overridden from Policy -> Set to None. else: self._loss = None self._optimizers = force_list(self.optimizer()) # Store, which params (by index within the model's list of # parameters) should be updated per optimizer. # Maps optimizer idx to set or param indices. 
self.multi_gpu_param_groups: List[Set[int]] = [] main_params = {p: i for i, p in enumerate(self.model.parameters())} for o in self._optimizers: param_indices = [] for pg_idx, pg in enumerate(o.param_groups): for p in pg["params"]: param_indices.append(main_params[p]) self.multi_gpu_param_groups.append(set(param_indices)) # Create n sample-batch buffers (num_multi_gpu_tower_stacks), each # one with m towers (num_gpus). num_buffers = self.config.get("num_multi_gpu_tower_stacks", 1) self._loaded_batches = [[] for _ in range(num_buffers)] self.dist_class = action_distribution_class self.action_sampler_fn = action_sampler_fn self.action_distribution_fn = action_distribution_fn # If set, means we are using distributed allreduce during learning. self.distributed_world_size = None self.max_seq_len = max_seq_len self.batch_divisibility_req = ( get_batch_divisibility_req(self) if callable(get_batch_divisibility_req) else (get_batch_divisibility_req or 1) ) ``` Line 81 `if config["_fake_gpus"] or num_gpus == 0 or not gpu_ids:` requires the fix for R1. Apply and see the output: ```python= # uncompyle6 version 3.7.4 # Python bytecode 3.7 (3394) # Decompiled from: Python 3.7.11 (default, Jun 29 2021, 20:31:06) # [GCC 8.3.0] # Embedded file name: 0_fix.py # Compiled at: 2022-07-19 15:00:43 # Size of source mod 2**32: 8739 bytes def __init__(self, observation_space: gym.spaces.Space, action_space: gym.spaces.Space, config: AlgorithmConfigDict, *, model: Optional[TorchModelV2]=None, loss: Optional[Callable[([Policy, ModelV2, Type[TorchDistributionWrapper], SampleBatch], Union[(TensorType, List[TensorType])])]]=None, action_distribution_class: Optional[Type[TorchDistributionWrapper]]=None, action_sampler_fn: Optional[Callable[([TensorType, List[TensorType]], Union[(Tuple[(TensorType, TensorType, List[TensorType])], Tuple[(TensorType, TensorType, TensorType, List[TensorType])])])]]=None, action_distribution_fn: Optional[Callable[([Policy, ModelV2, TensorType, TensorType, TensorType], Tuple[(TensorType, Type[TorchDistributionWrapper], List[TensorType])])]]=None, max_seq_len: int=20, get_batch_divisibility_req: Optional[Callable[([Policy], int)]]=None): self.framework = config['framework'] = 'torch' super().__init__(observation_space, action_space, config) if model is None: dist_class, logit_dim = ModelCatalog.get_action_dist(action_space, (self.config['model']), framework=(self.framework)) model = ModelCatalog.get_model_v2(obs_space=(self.observation_space), action_space=(self.action_space), num_outputs=logit_dim, model_config=(self.config['model']), framework=(self.framework)) if action_distribution_class is None: action_distribution_class = dist_class worker_idx = self.config.get('worker_index', 0) if not config['_fake_gpus']: if ray._private.worker._mode() == ray._private.worker.LOCAL_MODE: num_gpus = 0 else: if worker_idx == 0: num_gpus = config['num_gpus'] else: num_gpus = config['num_gpus_per_worker'] else: gpu_ids = list(range(torch.cuda.device_count())) FET_cond = config['_fake_gpus'] or num_gpus == 0 or not gpu_ids if FET_cond: logger.info('TorchPolicy (worker={}) running on {}.'.format(worker_idx if worker_idx > 0 else 'local', '{} fake-GPUs'.format(num_gpus) if config['_fake_gpus'] else 'CPU')) self.device = torch.device('cpu') self.devices = [self.device for _ in range(int(math.ceil(num_gpus)) or 1)] self.model_gpu_towers = [model if i == 0 else copy.deepcopy(model) for i in range(int(math.ceil(num_gpus)) or 1)] self.model = model else: logger.info('TorchPolicy (worker={}) running on {} 
GPU(s).'.format(worker_idx if worker_idx > 0 else 'local', num_gpus)) if ray._private.worker._mode() == ray._private.worker.WORKER_MODE: gpu_ids = ray.get_gpu_ids() if len(gpu_ids) < num_gpus: raise ValueError(f"TorchPolicy was not able to find enough GPU IDs! Found {gpu_ids}, but num_gpus={num_gpus}.") self.devices = [torch.device('cuda:{}'.format(i)) for i, id_ in enumerate(gpu_ids) if i < num_gpus] self.device = self.devices[0] ids = [id_ for i, id_ in enumerate(gpu_ids) if i < num_gpus] self.model_gpu_towers = [] for i, _ in enumerate(ids): model_copy = copy.deepcopy(model) self.model_gpu_towers.append(model_copy.to(self.devices[i])) self.model = self.model_gpu_towers[0] self._lock = threading.RLock() self._state_inputs = self.model.get_initial_state() self._is_recurrent = len(self._state_inputs) > 0 self._update_model_view_requirements_from_init_state() self.view_requirements.update(self.model.view_requirements) self.exploration = self._create_exploration() self.unwrapped_model = model if loss is not None: self._loss = loss else: if self.loss.__func__.__qualname__ != 'Policy.loss': self._loss = self.loss.__func__ else: self._loss = None self._optimizers = force_list(self.optimizer()) self.multi_gpu_param_groups = [] main_params = {p:i for i, p in enumerate(self.model.parameters())} for o in self._optimizers: param_indices = [] for pg_idx, pg in enumerate(o.param_groups): for p in pg['params']: param_indices.append(main_params[p]) self.multi_gpu_param_groups.append(set(param_indices)) num_buffers = self.config.get('num_multi_gpu_tower_stacks', 1) self._loaded_batches = [[] for _ in range(num_buffers)] self.dist_class = action_distribution_class self.action_sampler_fn = action_sampler_fn self.action_distribution_fn = action_distribution_fn self.distributed_world_size = None self.max_seq_len = max_seq_len self.batch_divisibility_req = get_batch_divisibility_req(self) if callable(get_batch_divisibility_req) else get_batch_divisibility_req or 1 ``` Line 25 has the fix in the decompiled output. ## Step 5. If found implicit errors. For instance in the decompiled output we can see an implicit error on these lines: ```python=62 if loss is not None: self._loss = loss else: if self.loss.__func__.__qualname__ != 'Policy.loss': self._loss = self.loss.__func__ else: self._loss = None ``` Remove it from the original code as follows. ```python= def __init__( self, observation_space: gym.spaces.Space, action_space: gym.spaces.Space, config: AlgorithmConfigDict, *, model: Optional[TorchModelV2] = None, loss: Optional[ Callable[ [Policy, ModelV2, Type[TorchDistributionWrapper], SampleBatch], Union[TensorType, List[TensorType]], ] ] = None, action_distribution_class: Optional[Type[TorchDistributionWrapper]] = None, action_sampler_fn: Optional[ Callable[ [TensorType, List[TensorType]], Union[ Tuple[TensorType, TensorType, List[TensorType]], Tuple[TensorType, TensorType, TensorType, List[TensorType]], ], ] ] = None, action_distribution_fn: Optional[ Callable[ [Policy, ModelV2, TensorType, TensorType, TensorType], Tuple[TensorType, Type[TorchDistributionWrapper], List[TensorType]], ] ] = None, max_seq_len: int = 20, get_batch_divisibility_req: Optional[Callable[[Policy], int]] = None, ): self.framework = config["framework"] = "torch" super().__init__(observation_space, action_space, config) # Create multi-GPU model towers, if necessary. # - The central main model will be stored under self.model, residing # on self.device (normally, a CPU). 
# - Each GPU will have a copy of that model under # self.model_gpu_towers, matching the devices in self.devices. # - Parallelization is done by splitting the train batch and passing # it through the model copies in parallel, then averaging over the # resulting gradients, applying these averages on the main model and # updating all towers' weights from the main model. # - In case of just one device (1 (fake or real) GPU or 1 CPU), no # parallelization will be done. # If no Model is provided, build a default one here. if model is None: dist_class, logit_dim = ModelCatalog.get_action_dist( action_space, self.config["model"], framework=self.framework ) model = ModelCatalog.get_model_v2( obs_space=self.observation_space, action_space=self.action_space, num_outputs=logit_dim, model_config=self.config["model"], framework=self.framework, ) if action_distribution_class is None: action_distribution_class = dist_class # Get devices to build the graph on. worker_idx = self.config.get("worker_index", 0) if ( not config["_fake_gpus"] and ray._private.worker._mode() == ray._private.worker.LOCAL_MODE ): num_gpus = 0 if worker_idx == 0: num_gpus = config["num_gpus"] else: num_gpus = config["num_gpus_per_worker"] gpu_ids = list(range(torch.cuda.device_count())) # Place on one or more CPU(s) when either: # - Fake GPU mode. # - num_gpus=0 (either set by user or we are in local_mode=True). # - No GPUs available. FET_cond = config["_fake_gpus"] or num_gpus == 0 or not gpu_ids if FET_cond: logger.info( "TorchPolicy (worker={}) running on {}.".format( worker_idx if worker_idx > 0 else "local", "{} fake-GPUs".format(num_gpus) if config["_fake_gpus"] else "CPU", ) ) self.device = torch.device("cpu") self.devices = [self.device for _ in range(int(math.ceil(num_gpus)) or 1)] self.model_gpu_towers = [ model if i == 0 else copy.deepcopy(model) for i in range(int(math.ceil(num_gpus)) or 1) ] # if hasattr(self, "target_model"): # self.target_models = { # m: self.target_model for m in self.model_gpu_towers # } self.model = model # Place on one or more actual GPU(s), when: # - num_gpus > 0 (set by user) AND # - local_mode=False AND # - actual GPUs available AND # - non-fake GPU mode. else: logger.info( "TorchPolicy (worker={}) running on {} GPU(s).".format( worker_idx if worker_idx > 0 else "local", num_gpus ) ) # We are a remote worker (WORKER_MODE=1): # GPUs should be assigned to us by ray. if ray._private.worker._mode() == ray._private.worker.WORKER_MODE: gpu_ids = ray.get_gpu_ids() if len(gpu_ids) < num_gpus: raise ValueError( "TorchPolicy was not able to find enough GPU IDs! Found " f"{gpu_ids}, but num_gpus={num_gpus}." ) self.devices = [ torch.device("cuda:{}".format(i)) for i, id_ in enumerate(gpu_ids) if i < num_gpus ] self.device = self.devices[0] ids = [id_ for i, id_ in enumerate(gpu_ids) if i < num_gpus] self.model_gpu_towers = [] for i, _ in enumerate(ids): model_copy = copy.deepcopy(model) self.model_gpu_towers.append(model_copy.to(self.devices[i])) # if hasattr(self, "target_model"): # self.target_models = { # m: copy.deepcopy(self.target_model).to(self.devices[i]) # for i, m in enumerate(self.model_gpu_towers) # } self.model = self.model_gpu_towers[0] # Lock used for locking some methods on the object-level. # This prevents possible race conditions when calling the model # first, then its value function (e.g. in a loss function), in # between of which another model call is made (e.g. to compute an # action). 
self._lock = threading.RLock() self._state_inputs = self.model.get_initial_state() self._is_recurrent = len(self._state_inputs) > 0 # Auto-update model's inference view requirements, if recurrent. self._update_model_view_requirements_from_init_state() # Combine view_requirements for Model and Policy. self.view_requirements.update(self.model.view_requirements) self.exploration = self._create_exploration() self.unwrapped_model = model # used to support DistributedDataParallel # To ensure backward compatibility: # Old way: If `loss` provided here, use as-is (as a function). if loss is not None: self._loss = loss self._optimizers = force_list(self.optimizer()) # Store, which params (by index within the model's list of # parameters) should be updated per optimizer. # Maps optimizer idx to set or param indices. self.multi_gpu_param_groups: List[Set[int]] = [] main_params = {p: i for i, p in enumerate(self.model.parameters())} for o in self._optimizers: param_indices = [] for pg_idx, pg in enumerate(o.param_groups): for p in pg["params"]: param_indices.append(main_params[p]) self.multi_gpu_param_groups.append(set(param_indices)) # Create n sample-batch buffers (num_multi_gpu_tower_stacks), each # one with m towers (num_gpus). num_buffers = self.config.get("num_multi_gpu_tower_stacks", 1) self._loaded_batches = [[] for _ in range(num_buffers)] self.dist_class = action_distribution_class self.action_sampler_fn = action_sampler_fn self.action_distribution_fn = action_distribution_fn # If set, means we are using distributed allreduce during learning. self.distributed_world_size = None self.max_seq_len = max_seq_len self.batch_divisibility_req = ( get_batch_divisibility_req(self) if callable(get_batch_divisibility_req) else (get_batch_divisibility_req or 1) ) ``` Notice I remove all `elif` and `if` from the program. 
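Before looking at the decompiler output below, the round trip can be checked mechanically. The following is a minimal sketch, assuming the cleaned-up source is saved as `0_fix.py` (the name appearing in the uncompyle6 headers in this document) and that `python3.7` and the `uncompyle6` command are on the PATH; it recompiles, decompiles, and diffs so that only the intended FET-related lines (plus uncompyle6's header comments) should show up.

```python
# Round-trip sanity check (sketch): compile 0_fix.py with the target Python,
# decompile the resulting .pyc with uncompyle6, and diff the decompiled text
# against the source to spot any remaining implicit errors.
import difflib
import subprocess

SRC = "0_fix.py"

# Writes 0_fix.pyc next to the source (legacy layout because of -b).
subprocess.run(["python3.7", "-m", "compileall", "-b", SRC], check=True)

decompiled = subprocess.run(
    ["uncompyle6", SRC.replace(".py", ".pyc")],
    check=True, capture_output=True, text=True,
).stdout

with open(SRC) as f:
    original = f.read()

for line in difflib.unified_diff(
    original.splitlines(), decompiled.splitlines(),
    fromfile=SRC, tofile="decompiled", lineterm="",
):
    print(line)
```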
The output is as follows with `uncompyle6`: ```python= # uncompyle6 version 3.7.4 # Python bytecode 3.7 (3394) # Decompiled from: Python 3.7.11 (default, Jun 29 2021, 20:31:06) # [GCC 8.3.0] # Embedded file name: 0_fix.py # Compiled at: 2022-08-08 20:19:43 # Size of source mod 2**32: 8330 bytes def __init__(self, observation_space: gym.spaces.Space, action_space: gym.spaces.Space, config: AlgorithmConfigDict, *, model: Optional[TorchModelV2]=None, loss: Optional[Callable[([Policy, ModelV2, Type[TorchDistributionWrapper], SampleBatch], Union[(TensorType, List[TensorType])])]]=None, action_distribution_class: Optional[Type[TorchDistributionWrapper]]=None, action_sampler_fn: Optional[Callable[([TensorType, List[TensorType]], Union[(Tuple[(TensorType, TensorType, List[TensorType])], Tuple[(TensorType, TensorType, TensorType, List[TensorType])])])]]=None, action_distribution_fn: Optional[Callable[([Policy, ModelV2, TensorType, TensorType, TensorType], Tuple[(TensorType, Type[TorchDistributionWrapper], List[TensorType])])]]=None, max_seq_len: int=20, get_batch_divisibility_req: Optional[Callable[([Policy], int)]]=None): self.framework = config['framework'] = 'torch' super().__init__(observation_space, action_space, config) if model is None: dist_class, logit_dim = ModelCatalog.get_action_dist(action_space, (self.config['model']), framework=(self.framework)) model = ModelCatalog.get_model_v2(obs_space=(self.observation_space), action_space=(self.action_space), num_outputs=logit_dim, model_config=(self.config['model']), framework=(self.framework)) if action_distribution_class is None: action_distribution_class = dist_class worker_idx = self.config.get('worker_index', 0) if not config['_fake_gpus']: if ray._private.worker._mode() == ray._private.worker.LOCAL_MODE: num_gpus = 0 elif worker_idx == 0: num_gpus = config['num_gpus'] else: num_gpus = config['num_gpus_per_worker'] gpu_ids = list(range(torch.cuda.device_count())) FET_cond = config['_fake_gpus'] or num_gpus == 0 or not gpu_ids if FET_cond: logger.info('TorchPolicy (worker={}) running on {}.'.format(worker_idx if worker_idx > 0 else 'local', '{} fake-GPUs'.format(num_gpus) if config['_fake_gpus'] else 'CPU')) self.device = torch.device('cpu') self.devices = [self.device for _ in range(int(math.ceil(num_gpus)) or 1)] self.model_gpu_towers = [model if i == 0 else copy.deepcopy(model) for i in range(int(math.ceil(num_gpus)) or 1)] self.model = model else: logger.info('TorchPolicy (worker={}) running on {} GPU(s).'.format(worker_idx if worker_idx > 0 else 'local', num_gpus)) if ray._private.worker._mode() == ray._private.worker.WORKER_MODE: gpu_ids = ray.get_gpu_ids() if len(gpu_ids) < num_gpus: raise ValueError(f"TorchPolicy was not able to find enough GPU IDs! 
Found {gpu_ids}, but num_gpus={num_gpus}.") self.devices = [torch.device('cuda:{}'.format(i)) for i, id_ in enumerate(gpu_ids) if i < num_gpus] self.device = self.devices[0] ids = [id_ for i, id_ in enumerate(gpu_ids) if i < num_gpus] self.model_gpu_towers = [] for i, _ in enumerate(ids): model_copy = copy.deepcopy(model) self.model_gpu_towers.append(model_copy.to(self.devices[i])) self.model = self.model_gpu_towers[0] self._lock = threading.RLock() self._state_inputs = self.model.get_initial_state() self._is_recurrent = len(self._state_inputs) > 0 self._update_model_view_requirements_from_init_state() self.view_requirements.update(self.model.view_requirements) self.exploration = self._create_exploration() self.unwrapped_model = model if loss is not None: self._loss = loss self._optimizers = force_list(self.optimizer()) self.multi_gpu_param_groups = [] main_params = {p:i for i, p in enumerate(self.model.parameters())} for o in self._optimizers: param_indices = [] for pg_idx, pg in enumerate(o.param_groups): for p in pg['params']: param_indices.append(main_params[p]) self.multi_gpu_param_groups.append(set(param_indices)) num_buffers = self.config.get('num_multi_gpu_tower_stacks', 1) self._loaded_batches = [[] for _ in range(num_buffers)] self.dist_class = action_distribution_class self.action_sampler_fn = action_sampler_fn self.action_distribution_fn = action_distribution_fn self.distributed_world_size = None self.max_seq_len = max_seq_len self.batch_divisibility_req = get_batch_divisibility_req(self) if callable(get_batch_divisibility_req) else get_batch_divisibility_req or 1 # okay decompiling 0_fix.py ``` Notice that there are no implicit errors and only consists of the fix in line 32. ## Step 6. Finally, do the follows. Take the decompiled out from Step 5. 
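The decompiled output taken from Step 5 is reproduced below, followed by the version with the fix removed. The bookkeeping at the end of this step (final file names plus the conf file) can also be scripted; here is a minimal sketch in which `ORIGINAL_NAME`, the rule label, the two intermediate file names, and the version strings are placeholders to replace with the actual values from your run.

```python
# Sketch of the Step 6 bookkeeping: copy the two artifacts to the naming
# convention used in this study and write the accompanying conf file.
# All names below are placeholders.
import shutil

ORIGINAL_NAME = "torch_policy"          # placeholder for <original_name>
RULE = "R1"
FIX_REMOVED = "0_fix_removed.py"        # file with the FET fix taken back out
DECOMPILED = "0_fix_decompiled.py"      # decompiled output that keeps the fix

shutil.copyfile(FIX_REMOVED, f"{ORIGINAL_NAME}.py")
shutil.copyfile(DECOMPILED, f"{ORIGINAL_NAME}_{RULE}_fix_and_decompiled.py")

with open(f"{ORIGINAL_NAME}_{RULE}_conf.txt", "w") as conf:
    conf.write("Decompiler: Uncompyle6 3.7.5\n")
    conf.write("Python version: 3.7.5\n")
```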
```python= # uncompyle6 version 3.7.4 # Python bytecode 3.7 (3394) # Decompiled from: Python 3.7.11 (default, Jun 29 2021, 20:31:06) # [GCC 8.3.0] # Embedded file name: 0_fix.py # Compiled at: 2022-08-08 20:19:43 # Size of source mod 2**32: 8330 bytes def __init__(self, observation_space: gym.spaces.Space, action_space: gym.spaces.Space, config: AlgorithmConfigDict, *, model: Optional[TorchModelV2]=None, loss: Optional[Callable[([Policy, ModelV2, Type[TorchDistributionWrapper], SampleBatch], Union[(TensorType, List[TensorType])])]]=None, action_distribution_class: Optional[Type[TorchDistributionWrapper]]=None, action_sampler_fn: Optional[Callable[([TensorType, List[TensorType]], Union[(Tuple[(TensorType, TensorType, List[TensorType])], Tuple[(TensorType, TensorType, TensorType, List[TensorType])])])]]=None, action_distribution_fn: Optional[Callable[([Policy, ModelV2, TensorType, TensorType, TensorType], Tuple[(TensorType, Type[TorchDistributionWrapper], List[TensorType])])]]=None, max_seq_len: int=20, get_batch_divisibility_req: Optional[Callable[([Policy], int)]]=None): self.framework = config['framework'] = 'torch' super().__init__(observation_space, action_space, config) if model is None: dist_class, logit_dim = ModelCatalog.get_action_dist(action_space, (self.config['model']), framework=(self.framework)) model = ModelCatalog.get_model_v2(obs_space=(self.observation_space), action_space=(self.action_space), num_outputs=logit_dim, model_config=(self.config['model']), framework=(self.framework)) if action_distribution_class is None: action_distribution_class = dist_class worker_idx = self.config.get('worker_index', 0) if not config['_fake_gpus']: if ray._private.worker._mode() == ray._private.worker.LOCAL_MODE: num_gpus = 0 elif worker_idx == 0: num_gpus = config['num_gpus'] else: num_gpus = config['num_gpus_per_worker'] gpu_ids = list(range(torch.cuda.device_count())) FET_cond = config['_fake_gpus'] or num_gpus == 0 or not gpu_ids if FET_cond: logger.info('TorchPolicy (worker={}) running on {}.'.format(worker_idx if worker_idx > 0 else 'local', '{} fake-GPUs'.format(num_gpus) if config['_fake_gpus'] else 'CPU')) self.device = torch.device('cpu') self.devices = [self.device for _ in range(int(math.ceil(num_gpus)) or 1)] self.model_gpu_towers = [model if i == 0 else copy.deepcopy(model) for i in range(int(math.ceil(num_gpus)) or 1)] self.model = model else: logger.info('TorchPolicy (worker={}) running on {} GPU(s).'.format(worker_idx if worker_idx > 0 else 'local', num_gpus)) if ray._private.worker._mode() == ray._private.worker.WORKER_MODE: gpu_ids = ray.get_gpu_ids() if len(gpu_ids) < num_gpus: raise ValueError(f"TorchPolicy was not able to find enough GPU IDs! 
Found {gpu_ids}, but num_gpus={num_gpus}.") self.devices = [torch.device('cuda:{}'.format(i)) for i, id_ in enumerate(gpu_ids) if i < num_gpus] self.device = self.devices[0] ids = [id_ for i, id_ in enumerate(gpu_ids) if i < num_gpus] self.model_gpu_towers = [] for i, _ in enumerate(ids): model_copy = copy.deepcopy(model) self.model_gpu_towers.append(model_copy.to(self.devices[i])) self.model = self.model_gpu_towers[0] self._lock = threading.RLock() self._state_inputs = self.model.get_initial_state() self._is_recurrent = len(self._state_inputs) > 0 self._update_model_view_requirements_from_init_state() self.view_requirements.update(self.model.view_requirements) self.exploration = self._create_exploration() self.unwrapped_model = model if loss is not None: self._loss = loss self._optimizers = force_list(self.optimizer()) self.multi_gpu_param_groups = [] main_params = {p:i for i, p in enumerate(self.model.parameters())} for o in self._optimizers: param_indices = [] for pg_idx, pg in enumerate(o.param_groups): for p in pg['params']: param_indices.append(main_params[p]) self.multi_gpu_param_groups.append(set(param_indices)) num_buffers = self.config.get('num_multi_gpu_tower_stacks', 1) self._loaded_batches = [[] for _ in range(num_buffers)] self.dist_class = action_distribution_class self.action_sampler_fn = action_sampler_fn self.action_distribution_fn = action_distribution_fn self.distributed_world_size = None self.max_seq_len = max_seq_len self.batch_divisibility_req = get_batch_divisibility_req(self) if callable(get_batch_divisibility_req) else get_batch_divisibility_req or 1 # okay decompiling 0_fix.py ``` Then remove only the fix where `FET_cond` is and preserve the remaining file as follows. ```python= def __init__( self, observation_space: gym.spaces.Space, action_space: gym.spaces.Space, config: AlgorithmConfigDict, *, model: Optional[TorchModelV2] = None, loss: Optional[ Callable[ [Policy, ModelV2, Type[TorchDistributionWrapper], SampleBatch], Union[TensorType, List[TensorType]], ] ] = None, action_distribution_class: Optional[Type[TorchDistributionWrapper]] = None, action_sampler_fn: Optional[ Callable[ [TensorType, List[TensorType]], Union[ Tuple[TensorType, TensorType, List[TensorType]], Tuple[TensorType, TensorType, TensorType, List[TensorType]], ], ] ] = None, action_distribution_fn: Optional[ Callable[ [Policy, ModelV2, TensorType, TensorType, TensorType], Tuple[TensorType, Type[TorchDistributionWrapper], List[TensorType]], ] ] = None, max_seq_len: int = 20, get_batch_divisibility_req: Optional[Callable[[Policy], int]] = None, ): self.framework = config["framework"] = "torch" super().__init__(observation_space, action_space, config) # Create multi-GPU model towers, if necessary. # - The central main model will be stored under self.model, residing # on self.device (normally, a CPU). # - Each GPU will have a copy of that model under # self.model_gpu_towers, matching the devices in self.devices. # - Parallelization is done by splitting the train batch and passing # it through the model copies in parallel, then averaging over the # resulting gradients, applying these averages on the main model and # updating all towers' weights from the main model. # - In case of just one device (1 (fake or real) GPU or 1 CPU), no # parallelization will be done. # If no Model is provided, build a default one here. 
if model is None: dist_class, logit_dim = ModelCatalog.get_action_dist( action_space, self.config["model"], framework=self.framework ) model = ModelCatalog.get_model_v2( obs_space=self.observation_space, action_space=self.action_space, num_outputs=logit_dim, model_config=self.config["model"], framework=self.framework, ) if action_distribution_class is None: action_distribution_class = dist_class # Get devices to build the graph on. worker_idx = self.config.get("worker_index", 0) if ( not config["_fake_gpus"] and ray._private.worker._mode() == ray._private.worker.LOCAL_MODE ): num_gpus = 0 if worker_idx == 0: num_gpus = config["num_gpus"] else: num_gpus = config["num_gpus_per_worker"] gpu_ids = list(range(torch.cuda.device_count())) # Place on one or more CPU(s) when either: # - Fake GPU mode. # - num_gpus=0 (either set by user or we are in local_mode=True). # - No GPUs available. if config["_fake_gpus"] or num_gpus == 0 or not gpu_ids: logger.info( "TorchPolicy (worker={}) running on {}.".format( worker_idx if worker_idx > 0 else "local", "{} fake-GPUs".format(num_gpus) if config["_fake_gpus"] else "CPU", ) ) self.device = torch.device("cpu") self.devices = [self.device for _ in range(int(math.ceil(num_gpus)) or 1)] self.model_gpu_towers = [ model if i == 0 else copy.deepcopy(model) for i in range(int(math.ceil(num_gpus)) or 1) ] # if hasattr(self, "target_model"): # self.target_models = { # m: self.target_model for m in self.model_gpu_towers # } self.model = model # Place on one or more actual GPU(s), when: # - num_gpus > 0 (set by user) AND # - local_mode=False AND # - actual GPUs available AND # - non-fake GPU mode. else: logger.info( "TorchPolicy (worker={}) running on {} GPU(s).".format( worker_idx if worker_idx > 0 else "local", num_gpus ) ) # We are a remote worker (WORKER_MODE=1): # GPUs should be assigned to us by ray. if ray._private.worker._mode() == ray._private.worker.WORKER_MODE: gpu_ids = ray.get_gpu_ids() if len(gpu_ids) < num_gpus: raise ValueError( "TorchPolicy was not able to find enough GPU IDs! Found " f"{gpu_ids}, but num_gpus={num_gpus}." ) self.devices = [ torch.device("cuda:{}".format(i)) for i, id_ in enumerate(gpu_ids) if i < num_gpus ] self.device = self.devices[0] ids = [id_ for i, id_ in enumerate(gpu_ids) if i < num_gpus] self.model_gpu_towers = [] for i, _ in enumerate(ids): model_copy = copy.deepcopy(model) self.model_gpu_towers.append(model_copy.to(self.devices[i])) # if hasattr(self, "target_model"): # self.target_models = { # m: copy.deepcopy(self.target_model).to(self.devices[i]) # for i, m in enumerate(self.model_gpu_towers) # } self.model = self.model_gpu_towers[0] # Lock used for locking some methods on the object-level. # This prevents possible race conditions when calling the model # first, then its value function (e.g. in a loss function), in # between of which another model call is made (e.g. to compute an # action). self._lock = threading.RLock() self._state_inputs = self.model.get_initial_state() self._is_recurrent = len(self._state_inputs) > 0 # Auto-update model's inference view requirements, if recurrent. self._update_model_view_requirements_from_init_state() # Combine view_requirements for Model and Policy. self.view_requirements.update(self.model.view_requirements) self.exploration = self._create_exploration() self.unwrapped_model = model # used to support DistributedDataParallel # To ensure backward compatibility: # Old way: If `loss` provided here, use as-is (as a function). 
        if loss is not None:
            self._loss = loss

        self._optimizers = force_list(self.optimizer())
        # Store, which params (by index within the model's list of
        # parameters) should be updated per optimizer.
        # Maps optimizer idx to set or param indices.
        self.multi_gpu_param_groups: List[Set[int]] = []
        main_params = {p: i for i, p in enumerate(self.model.parameters())}
        for o in self._optimizers:
            param_indices = []
            for pg_idx, pg in enumerate(o.param_groups):
                for p in pg["params"]:
                    param_indices.append(main_params[p])
            self.multi_gpu_param_groups.append(set(param_indices))

        # Create n sample-batch buffers (num_multi_gpu_tower_stacks), each
        # one with m towers (num_gpus).
        num_buffers = self.config.get("num_multi_gpu_tower_stacks", 1)
        self._loaded_batches = [[] for _ in range(num_buffers)]

        self.dist_class = action_distribution_class
        self.action_sampler_fn = action_sampler_fn
        self.action_distribution_fn = action_distribution_fn

        # If set, means we are using distributed allreduce during learning.
        self.distributed_world_size = None

        self.max_seq_len = max_seq_len
        self.batch_divisibility_req = (
            get_batch_divisibility_req(self)
            if callable(get_batch_divisibility_req)
            else (get_batch_divisibility_req or 1)
        )
```

Save the two files as `<original_name>.py` and `<original_name>_R1_fix_and_decompiled.py`, and create a text file (`<original_name>_R1_conf.txt`) with the following contents:

```
Decompiler: Uncompyle6 3.7.5
Python version: 3.7.5
```

## Step 7.

Repeat the process for R2~R16 on the remaining files of **A_i**, and do this for all 10 applications.

# Tips.

- Go rule by rule, i.e., start with R1, then R2, and so on.
- If you find that R1 fails with uncompyle6 (U6) on Python 3.7, preserve that environment and reuse it for the other applications.
- If you don't find an error, inject it yourself, i.e., instrument the failing code pattern into the function.
- Make sure there is only one error being fixed, i.e., the file contains no implicit errors and no new errors.
- If you find new errors, delete the code corresponding to the new error in the original file, but try to preserve as much content as possible.
- Good luck!

---

# R3