# Code Repo: Exploring Explainable Multilabel Neural Network as a Psychometric Tool for Short-Form Test Scoring and Diagnostics: A Study on College Major Preference Assessment

:::info
:bulb: Hi, you've reached an `anonymous author's <blocked for now, for a peer-review>` `Python` and `R` code for his dissertation experiments. Since the data is private to `iKoda Research (2018)`, the development team for the `College Major Preference Assessment (CMPA)`, the data actually used to train the neural networks cannot be provided here. However, the analysis code is ready for your reference.
:::

# The Organization of this Document

This document assumes that you have read or are reading Shun-Fu's dissertation. I show the code and settings I used to perform certain operations in [Chapter 3](#Operations-in-Chapter-3) and [Chapter 4](#Operations-in-Chapter-4). Since these operations require specific `Classes` and `Functions` defined in the files shown in [Overview of Code](#Overview-of-Code), I also show the [source code file by file, ordered alphabetically](#Source-Code). It's advisable to use the navigation bar or return to [Overview of Code](#Overview-of-Code) every so often to find the relevant files instead of going through them one by one.

# Figures in Chapter 4

(right click and save the files to see high-resolution figures)

![](https://i.imgur.com/jpqd3n6.png)
![](https://i.imgur.com/vdccga1.png)
![](https://i.imgur.com/wDD0sNf.jpg)
![](https://i.imgur.com/4iCwXl3.jpg)

# Overview of Code

The code uses [keras](https://keras.io/) for machine learning, wrapped in self-defined classes. The following table shows the files, the classes defined in them, and what they do. Files written in `Python` have the extension `.py`; files written in `R`, `.R`. This document reached hack.md's maximum number of characters, so [strategies.py](/gmIJL8GcS_atekP5l-1iYA), [tuning.py](/gmIJL8GcS_atekP5l-1iYA) and [utils.py](/gmIJL8GcS_atekP5l-1iYA), marked with :arrow_forward:, are stored in a [separate document](/gmIJL8GcS_atekP5l-1iYA).

|File|Description|Classes Defined There|
|---|---|---|
|[analyses.py](#analyses.py)|Backend calculations for simple-sum and multilabel MNN with k-fold cross validation.|`MultilabelKFold`, `PerturbedMultilabelKFold`|
|[data.py](#data.py)|Classes to easily read and preprocess the CMPA data.|`DataGenerator`, `FullData`|
|[enums.py](#enums.py)|Enums, i.e., closed sets of category options convenient for programming. Not important conceptually.|`StrategyOptions`, `MajorNumberOptions`, `MetricsOptions`, `AdvancedMetricsOptions`, `StanadardizationModeOptions`, `ItemTypeOptions`, `EvaluatorTypeOptions`|
|[evaluators.py](#evaluators.py)|Calculating performance metrics.|`EvaluatorFactory`, `Evaluator`, `KerasDefaultEvaluator`, `BinaryEvaluator`, `BinaryEvaluatorWithoutPunishment`|
|[gene_definition.py](#gene_definition.py)|Generating gene definitions for hyperparameter tuning.|`GeneDefinitionGenerator`|
|[models.py](#models.py)|Wrapping keras classes under a more convenient model class.|`MultilabelModel`|
|[perturbation.py](#perturbation.py)|Runs the perturbation. The real backend analysis is defined in [analyses.py](#analyses.py).|None|
|[perturbation_pratts.R](#perturbation_pratts.R)|Drawing the graphs in Chapter 4.|None|
|[reports_mkfold.py](#reports_mkfold.py)|Generating reports of the results in human-readable tables.|None|
|[settings.py](/gmIJL8GcS_atekP5l-1iYA) :arrow_forward:|All the settings are here.|`Settings`, `TuningSettings`|
|[strategies.py](/gmIJL8GcS_atekP5l-1iYA) :arrow_forward:|Wraps everything needed for a basic run with an MNN or the baseline.|`RunStrategy`, `StrategyFactory`, `TestStrategy`, `MkFoldBaseRun`, `PerturbationRun`, `RawAggregate`|
|[tuning.py](/gmIJL8GcS_atekP5l-1iYA) :arrow_forward:|For genetic hyperparameter tuning.|`Genes`, `Species`, `DefaultNaturalSelection`|
|[utils.py](/gmIJL8GcS_atekP5l-1iYA) :arrow_forward:|Auxiliary functions and decorators, such as functions that deal with getting the names of the 50 majors in CMPA.|`Decorators`, `ItemCheck`, `Majors`, `ItemSelector`|

# Operations in Chapter 3

## Baseline (Simple-Sum Method)

The immediately relevant file is [strategies.py](/gmIJL8GcS_atekP5l-1iYA).

```python=
from enums import StrategyOptions
from strategies import StrategyFactory

baseline = StrategyFactory.get_strategy(StrategyOptions.BASELINE)
results = baseline.run()  # the results come out as a pd.DataFrame object. That's it!
```

## Hyperparameter Tuning

### Step 1. Generating Gene Definitions

This step generates the hyperparameter ranges (the `# of nodes` in each layer, and the min, max, mean and standard deviation of the `alpha weights`). The parameter `br` stands for the `base rate` of a major, which is `the proportion of respondents choosing the major as their top 3 in CMPA`. Information about the base rates of the majors is provided in `Appendix A` of the dissertation. The most relevant files are [gene_definition.py](#gene_definition.py) and [tuning.py](/gmIJL8GcS_atekP5l-1iYA) :arrow_forward:.

```python=
from gene_definition import GeneDefinitionGenerator

# for MNN targeting accuracy
def accuracy(br: float):
    min_ = .9
    max_ = 4.
    mean_ = (1 - br)/br/10 + .9
    sigma_ = mean_ / 2
    return [min_, max_, mean_, sigma_]

# for MNN targeting adjusted recall
def recall_gain(br: float):
    min_ = .1
    max_ = 9999.
    mean_ = (1 - br)*11 + 3
    sigma_ = mean_ / 2
    return [min_, max_, mean_, sigma_]

# for MNN targeting adjusted precision
def precision_gain(br: float):
    min_ = .1
    max_ = 9999.
    mean_ = br + .1
    sigma_ = mean_ / 2
    return [min_, max_, mean_, sigma_]

func_dict = {
    'accuracy': accuracy,
    'recall_gain': recall_gain,
    'precision_gain': precision_gain
}

# writing the gene definitions into the tuning settings files
if __name__ == "__main__":
    gd = GeneDefinitionGenerator()
    gd.save_gene_definition_to_json(
        func=func_dict.get('accuracy'),
        tuning_settings_path="genetic_tuning_202201_accuracy_crazy_ver_settings.json"
    )
    gd.save_gene_definition_to_json(
        func=func_dict.get('recall_gain'),
        tuning_settings_path="genetic_tuning_202201_recall_gain_crazy_ver_settings.json"
    )
    gd.save_gene_definition_to_json(
        func=func_dict.get('precision_gain'),
        tuning_settings_path="genetic_tuning_202201_precision_gain_crazy_ver_settings.json"
    )
```
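For a concrete feel of these definitions, here is the `accuracy` rule applied to a hypothetical major; the base rate `br = 0.06` below is made up for illustration and is not a number from the CMPA data:

```python
# Illustrative only: a hypothetical major that 6% of respondents
# put in their top 3.
br = 0.06

min_, max_ = .9, 4.
mean_ = (1 - br) / br / 10 + .9  # (0.94 / 0.06) / 10 + 0.9 ≈ 2.47
sigma_ = mean_ / 2               # ≈ 1.23

print([min_, max_, mean_, sigma_])  # [0.9, 4.0, 2.466..., 1.233...]
```

Note that the rarer the major (the smaller `br`), the larger the mean `alpha weight` these rules propose for it.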
### Step 2. The Actual Hyperparameter Tuning

Once the appropriate Conda environment, as defined in the dissertation's `Appendix C`, is correctly activated, the following commands are run at the command line. The most relevant files are [tuning.py](/gmIJL8GcS_atekP5l-1iYA) :arrow_forward: and [settings.py](/gmIJL8GcS_atekP5l-1iYA) :arrow_forward:.

```bash
# For the MNN targeting accuracy:
python tuning.py genetic_tuning_202201_accuracy_crazy_ver.xlsx genetic_tuning_202201_accuracy_crazy_ver_settings.json

# For the MNN targeting adjusted recall:
python tuning.py genetic_tuning_202201_recall_gain_crazy_ver.xlsx genetic_tuning_202201_recall_gain_crazy_ver_settings.json

# For the MNN targeting adjusted precision:
python tuning.py genetic_tuning_202201_precision_gain_crazy_ver.xlsx genetic_tuning_202201_precision_gain_crazy_ver_settings.json
```

The code saves the resultant hyperparameters of the top 30 MNNs for the respective goals of `accuracy`, `adjusted recall` and `adjusted precision` in the respective files `genetic_tuning_202201_accuracy_crazy_ver.xlsx`, `genetic_tuning_202201_recall_gain_crazy_ver.xlsx`, and `genetic_tuning_202201_precision_gain_crazy_ver.xlsx`. These Excel files are basically the 30-organism gene pools for each target. Then, I manually created an Excel file named `best_genetic_crazy_ver_results_202201.xlsx`, where the top organism of each of the three gene pools is collected, one column per target (see the comment in Step 3).

### Step 3. Printing Out the Report

The most relevant file is [reports_mkfold.py](#reports_mkfold.py).

```python
import pandas as pd
from tuning import DefaultNaturalSelection
from utils import Majors
from strategies import StrategyFactory
from enums import StrategyOptions

# deserializing the gene pool excel file -- the manually created file
# containing 3 columns: the first is the best-performing MNN for
# recall-gain; the second is the best-performing for precision-gain;
# and the third column is for the best accuracy.
ns = DefaultNaturalSelection.deserialize(
    serialization_path="best_genetic_crazy_ver_results_202201.xlsx"
)
ns.fit_all(forced=True)  # run no matter what
best_recall_gain = ns.gene_pool[0]
best_precision_gain = ns.gene_pool[1]
best_accuracy = ns.gene_pool[2]
baseline = StrategyFactory.get_strategy(StrategyOptions.BASELINE).run()

# result tables
# Create a Pandas Excel writer using XlsxWriter as the engine.
writer = pd.ExcelWriter('best_genetic_crazy_ver_results_202201_reported.xlsx', engine='xlsxwriter')

# Write each dataframe to a different worksheet.
best_recall_gain_df = best_recall_gain.performance
best_precision_gain_df = best_precision_gain.performance
best_accuracy_df = best_accuracy.performance

best_recall_gain_df.index = Majors.get_major_fullnames(best_recall_gain_df.index)
best_precision_gain_df.index = Majors.get_major_fullnames(best_precision_gain_df.index)
best_accuracy_df.index = Majors.get_major_fullnames(best_accuracy_df.index)
baseline.index = Majors.get_major_fullnames(baseline.index)

best_recall_gain_df.to_excel(writer, sheet_name="best_recall")
best_precision_gain_df.to_excel(writer, sheet_name="best_precision")
best_accuracy_df.to_excel(writer, sheet_name="best_accuracy")
baseline.to_excel(writer, sheet_name="baseline")

# Close the Pandas Excel writer and output the Excel file.
writer.close()
```
# Operations in Chapter 4

## Perturbation

```python=
# perturbation for genetically tuned results
from enums import StrategyOptions
from strategies import StrategyFactory
from tuning import DefaultNaturalSelection
from settings import TuningSettings, Settings
import sys

if __name__ == "__main__":
    ns = DefaultNaturalSelection.deserialize(
        serialization_path="best_genetic_crazy_ver_results_202201.xlsx"
    )
    # ns.fit_all(forced=True)  # run no matter what
    print("running best_recall_gain")
    best_recall_gain = ns.gene_pool[0]
    best_recall_gain.str_op = StrategyOptions.PERTURBED_KFOLD_MNN
    best_recall_gain.run(forced=True)
    print("all done!")
    # best_precision_gain = ns.gene_pool[1]
    # best_accuracy = ns.gene_pool[2]
    # baseline = StrategyFactory.get_strategy(StrategyOptions.BASELINE).run()
```

## Drawing Plots

See [perturbation_pratts.R](#perturbation_pratts.R).

# Source Code

## [analyses.py](#Overview-of-Code)

```python=
from typing import List

import tensorflow as tf
import numpy as np
import pandas as pd

from models import MultilabelModel
from data import DataGenerator, FullData
from enums import StanadardizationModeOptions
from utils import Decorators, Majors
from settings import Settings


class MultilabelKFold:
    # refactoring note: all model-related stuff should be moved to a model
    def __init__(self, full_data: FullData, model: MultilabelModel):
        self.full_data = full_data
        self.model = model

    def check_gpu_status(self):
        print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))
        print("GPU device name --", tf.config.list_physical_devices('GPU'))
        # return tf.config.list_physical_devices('GPU')[0]

    @Decorators.time_it("pd.DataFrame Division??!")
    def divide_df_by_k_fold(self, test_results: pd.DataFrame, k_fold: int):
        return test_results / k_fold

    @Decorators.time_it("Analysis.py MKFold's get_evaluation_result_df()")
    def get_evaluation_result_df(self, model_evaluation_result: dict):
        return pd.DataFrame(model_evaluation_result)

    @Decorators.time_it("Analysis.py MKFold's run()")
    def run(self, k_fold=Settings.K_FOLD) -> pd.DataFrame:
        # just checking whether the GPU is used
        self.check_gpu_status()
        X_y = self.full_data.shuffle().standardize_x(StanadardizationModeOptions.BY_COL)
        # doing k-fold validation
        nrow, ncol = X_y.shape[0], X_y.shape[1]
        one_fold = nrow // (k_fold + 1)  # reserving one fold for test data
        # validation = pd.Series(dtype=np.float32)
        # evaluation = pd.Series(dtype=np.float32)
        # note: class_weight doesn't work as a fit_param for the multilabel case,
        # so we need to go into the loss function in model.compile()
        if k_fold > 1:
            for i in range(k_fold):
                print("Fold " + str(i + 1) + " of " + str(k_fold))
                # split X, y
                val_rows = list(range(one_fold * i, one_fold * (i + 1)))
                train_rows = [r for r in range(one_fold*k_fold) if r not in val_rows]
                test_rows = range(one_fold*k_fold, nrow)
                train_and_val = X_y.train_and_val_split(train_rows, val_rows)
                test = X_y.test_split(test_rows)
                if i == 0:
                    # fitting
                    self.model.fit_model(**train_and_val)
                    test_results = self.get_evaluation_result_df(self.model.evaluate_model(**test))
                else:
                    self.model.fit_model(**train_and_val)
                    test_results += self.get_evaluation_result_df(self.model.evaluate_model(**test))
            test_results = self.divide_df_by_k_fold(test_results, k_fold)
        else:
            print("K-fold validation disabled. Using default validation split 7000:1000:1300")
            train_rows = range(0, Settings.DEFAULT_TRAIN_SPLIT_END)
            val_rows = range(Settings.DEFAULT_TRAIN_SPLIT_END, Settings.DEFAULT_VAL_SPLIT_END)
            test_rows = range(Settings.DEFAULT_VAL_SPLIT_END, nrow)
            train_and_val = X_y.train_and_val_split(train_rows, val_rows)
            test = X_y.test_split(test_rows)
            self.model.fit_model(**train_and_val)
            test_results = pd.DataFrame(self.model.evaluate_model(**test))
        return test_results


class PerturbedMultilabelKFold:
    def __init__(
            self,
            full_data: FullData,
            model: MultilabelModel,
            subset=Settings.SUBSET_SIZE,
            serialization_path=Settings.PERTURBATION_SERIALIZATION_PATH,
            serialization_frequency=Settings.PERTURBATION_SERIALIZATION_FREQUENCY
    ):
        self.full_data = full_data
        self.model = model
        self.subset = subset
        self.serialization_path = serialization_path
        self.serialization_frequency = serialization_frequency
        # dynamically generated
        self.mkfold = MultilabelKFold(full_data, model)
        self.result_dict = {
            "selected_cols": [],
            "mkfold_results": []
        }

    @classmethod
    def from_mkfold(cls, mkfold: MultilabelKFold,
                    subset=Settings.SUBSET_SIZE,
                    serialization_path=Settings.PERTURBATION_SERIALIZATION_PATH,
                    serialization_frequency=Settings.PERTURBATION_SERIALIZATION_FREQUENCY):
        full_data = mkfold.full_data
        model = mkfold.model
        return cls(full_data, model, subset, serialization_path, serialization_frequency)

    def draw_random_subset(self, subset=Settings.SUBSET_SIZE) -> List[bool]:
        ncol = self.mkfold.full_data.X.shape[1]
        selected_items = np.random.choice(range(ncol), subset, replace=False)
        selected = [i in selected_items for i in range(ncol)]
        return selected

    @Decorators.time_it("perturbed multilabel k-fold single_run()")
    def single_run(self, k_fold=Settings.K_FOLD):
        selected_cols = self.draw_random_subset(self.subset)
        subset_data = self.full_data.X.loc[:, selected_cols]
        subset_full_data = FullData.from_data_generator(
            data_generator=self.full_data, X=subset_data, y=self.full_data.y)
        subset_model = MultilabelModel.from_full_data(
            full_data=subset_full_data,
            n_nodes=self.model.n_nodes,
            metrics=self.model.metrics,
            epochs=self.model.epochs,
            early_stopping=self.model.early_stopping,
            evaluator_type=self.model.evaluator_type,
            evaluator_threshold=self.model.evaluator_threshold
        )
        subset_mkfold = MultilabelKFold(subset_full_data, subset_model)
        results = subset_mkfold.run(k_fold)
        return selected_cols, results
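    # How the perturbation works: each iteration draws a random subset of
    # Settings.SUBSET_SIZE Likert items, retrains the MNN on just those
    # columns, and stores the inclusion dummies together with the k-fold
    # metrics; perturbation_pratts.R later regresses each metric on the
    # inclusion dummies to produce the item-importance plots in Chapter 4.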
    def cast_selected_cols(self, selected_cols: List[bool]) -> pd.Series:
        item_names = self.full_data.get_likert_only().columns.to_series()
        selected_dummies = pd.Series(selected_cols, index=item_names)
        return selected_dummies.astype(np.float32)

    def cast_mkfold_result(self, mkfold_result: pd.DataFrame) -> pd.Series:
        # flattens a single mkfold result into one labeled Series
        result_dict = {}
        for col in mkfold_result.columns:
            row_indexes = mkfold_result.loc[:, col].index
            for row in row_indexes:
                value = mkfold_result.loc[row, col]
                result_dict[row + "_" + col] = value
        return pd.Series(result_dict)

    def cast_results(self) -> pd.DataFrame:
        selected_cols = self.result_dict["selected_cols"]
        mkfold_results = self.result_dict["mkfold_results"]
        final_result = pd.DataFrame()
        for idx, (cols, results) in enumerate(zip(selected_cols, mkfold_results)):
            single_perturbation_result = self.cast_selected_cols(cols)
            single_perturbation_result = single_perturbation_result.append(self.cast_mkfold_result(results))
            final_result[idx] = single_perturbation_result
        return final_result

    def serialize(self) -> None:
        try:
            to_serialize = self.cast_results()
            to_serialize.to_excel(self.serialization_path)
            print(f"perturbation results of shape {to_serialize.shape} saved to " + self.serialization_path)
        except Exception as e:
            print("serialization failed")
            print(e)

    @Decorators.time_it()
    def run(self, k_fold=Settings.K_FOLD, iter=Settings.PERTURBATION_ITER) -> pd.DataFrame:
        for i in range(iter):
            print("perturbation iteration id: " + str(i))
            selected_cols, results = self.single_run(k_fold)
            try:
                self.result_dict["selected_cols"].append(selected_cols)
                self.result_dict["mkfold_results"].append(results)
            except Exception as e:
                print("appending results failed:", e)
            if i % self.serialization_frequency == 0:
                self.serialize()
        self.serialize()
        return self.cast_results()
```

## [data.py](#Overview-of-Code)

```python=
import pandas as pd
import numpy as np

from enums import ItemTypeOptions, MajorNumberOptions, StanadardizationModeOptions
from settings import GLOBAL_CLEANED_DATA, GLOBAL_LIKERT_DATA, GLOBAL_MAJOR_IS_TOP_N_DF, GLOBAL_MAJOR_SCORES_DF, Settings, GLOBAL_RAW_DATA
from utils import Decorators, ItemCheck, ItemSelector, Majors
from typing import List, Set


# has content
# has the part it is supposed to fit in
# has methods to split itself
class DataGenerator:
    def __init__(self,
                 path=Settings.DATAPATH,
                 threshold=Settings.CLEANTHRESHOLD,
                 scored_item_type=Settings.SCORED_ITEMTYPE,
                 top_n=Settings.TOP_N,
                 num_of_majors=Settings.NUMBER_OF_MAJORS,
                 class_weight_function=Settings.CLASSWEIGHT
                 ) -> None:
        self.path = path
        self.threshold = threshold
        self.scored_item_type = scored_item_type
        self.top_n = top_n
        self.num_of_majors = num_of_majors
        self.class_weight_function = class_weight_function

    @Decorators.time_it("DataGenerator.get_raw_data()")
    def get_raw_data(self) -> pd.DataFrame:
        global_raw_data = GLOBAL_RAW_DATA.get(self.path)
        if global_raw_data is not None:
            return global_raw_data
        GLOBAL_RAW_DATA[self.path] = pd.read_csv(self.path, dtype={
            'age': str,
            'certificate': str,
            'education': str,
            'school': str,
            'gender': str
        })
        return GLOBAL_RAW_DATA.get(self.path)  # here it might already be wrong.
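    # Caching pattern used throughout this class: the GLOBAL_* dicts
    # imported from settings.py act as module-level caches keyed by the
    # constructor arguments (path, threshold, ...), so repeated
    # DataGenerator calls with the same settings reuse the already-parsed
    # DataFrames instead of re-reading and re-cleaning the CSV.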
@Decorators.time_it("DataGenerator.get_cleaned_data") def get_cleaned_data(self) -> pd.DataFrame: global_cleaned_data = GLOBAL_CLEANED_DATA.get( (self.path, self.threshold) ) if global_cleaned_data is not None: return global_cleaned_data major_all = self.get_raw_data() threshold = self.threshold likert_items = ItemSelector(major_all).get_selected_cols_by_item_type(scored_item_type=ItemTypeOptions.LIKRET) # if missing data exceeds a proportion, the row is deleted ncol = major_all[likert_items].shape[1] passed = major_all[likert_items].isna().sum(axis=1) <= ncol*threshold # also has to not miss any demographics # somehow missing cases have been coded as "null_unknonw" (string, not NA) passed = passed & (major_all[['age', 'certificate', 'education', 'school', 'gender']] == 'null_unknown').sum(axis=1) == 0 cleaned_df = major_all.loc[passed, :].fillna(0.).reset_index(drop=True) GLOBAL_CLEANED_DATA[ (self.path, self.threshold)] = cleaned_df return cleaned_df @Decorators.time_it("DataGenerator.get_likert_only") def get_likert_only(self) -> pd.DataFrame: global_likert_data = GLOBAL_LIKERT_DATA.get( (self.path, self.threshold) ) if global_likert_data is not None: return global_likert_data cleaned_data = self.get_cleaned_data() likert_cols = ItemSelector(cleaned_data).get_selected_cols_by_item_type(scored_item_type=ItemTypeOptions.LIKRET) likert_data = cleaned_data.loc[:, likert_cols] GLOBAL_LIKERT_DATA[(self.path, self.threshold)] = likert_data return likert_data @Decorators.time_it("DataGenerator.merge_major_scores") def merge_major_scores(self, major_scores_df, use_max) -> pd.DataFrame: for general_major, specific_majors_list in Majors.MAPPING.items(): if use_max: major_scores_df[general_major] = major_scores_df[specific_majors_list].max(axis=1) # using max? 
            else:
                major_scores_df[general_major] = major_scores_df[specific_majors_list].mean(axis=1)
            major_scores_df = major_scores_df.drop(specific_majors_list, axis=1)
        return major_scores_df

    @Decorators.time_it("DataGenerator.get_major_scores_df")
    def get_major_scores_df(self) -> pd.DataFrame:
        global_major_scores_df = GLOBAL_MAJOR_SCORES_DF.get((self.path, self.threshold, self.scored_item_type, self.num_of_majors))
        if global_major_scores_df is not None:
            return global_major_scores_df
        d = self.get_cleaned_data()
        # get a list of unique majors
        all_majors = Majors.get_corresponding_majors(self.scored_item_type)
        # getting a sum of scores for each major
        major_scores_df = pd.DataFrame()
        check_function = ItemCheck.get_scored_item_type_check(self.scored_item_type, per_major=True)
        for major in all_majors:
            # oops, forgot to enforce forced choice
            cols_for_the_major = [col for col in d.columns if check_function(major, col)]
            major_frame = d.loc[:, cols_for_the_major]
            major_score = major_frame.sum(axis=1)
            major_scores_df[major] = major_score
            # explode generic majors to specific majors
            # if self.num_of_majors == MajorNumberOptions.FIFTY:
            if self.scored_item_type == ItemTypeOptions.LIKRET:
                if major in Majors.MAPPING.keys():
                    major_scores_df.drop(major, axis=1, inplace=True)
                    for specific_major in Majors.MAPPING.get(major):
                        major_scores_df[specific_major] = major_score
        if self.scored_item_type == ItemTypeOptions.FORCED_CHOICE:
            if self.num_of_majors == MajorNumberOptions.THIRTY_THREE_USE_MAX:
                return self.merge_major_scores(major_scores_df, use_max=True)
            elif self.num_of_majors == MajorNumberOptions.THIRTY_THREE_USE_MEAN:
                return self.merge_major_scores(major_scores_df, use_max=False)
            else:
                pass  # self.num_of_majors == MajorNumberOptions.FIFTY
        GLOBAL_MAJOR_SCORES_DF[(self.path, self.threshold, self.scored_item_type, self.num_of_majors)] = major_scores_df
        return major_scores_df

    @Decorators.time_it("DataGenerator.get_major_is_top_n_df")
    def get_major_is_top_n_df(self) -> pd.DataFrame:
        global_major_is_top_n_df = GLOBAL_MAJOR_IS_TOP_N_DF.get(
            (self.path, self.threshold, self.scored_item_type, self.num_of_majors, self.top_n)
        )
        if global_major_is_top_n_df is not None:
            return global_major_is_top_n_df
        top_n = self.top_n
        major_scores_df = self.get_major_scores_df()
        major_is_top_n_df = major_scores_df.apply(lambda x: x.nlargest(top_n, keep='all'), axis=1).apply(lambda x: x > 0.).astype(np.float32)
        GLOBAL_MAJOR_IS_TOP_N_DF[
            (self.path, self.threshold, self.scored_item_type, self.num_of_majors, self.top_n)
        ] = major_is_top_n_df
        return major_is_top_n_df

    @Decorators.time_it("DataGenerator.get_weighted_major_is_top_n_df")
    def get_weighted_major_is_top_n_df(self) -> pd.DataFrame:
        major_is_top_n_df = self.get_major_is_top_n_df()
        class_weight_function = self.class_weight_function
        if class_weight_function is None:
            return major_is_top_n_df
        # e.g. weights = ((1 - major_is_top_n_df.mean())**2 * 1.5 + 0.5)
        weighted_major_is_top_n_df = class_weight_function(major_is_top_n_df)
        return weighted_major_is_top_n_df


class FullData(DataGenerator):
    '''
    example use:
        dg = DataGenerator(top_n=1)
        X = dg.get_likert_only()
        y = dg.get_major_is_top_n_df()
        full_data = FullData(X=X, y=y)
    '''
    def __init__(self,
                 path=Settings.DATAPATH,
                 threshold=Settings.CLEANTHRESHOLD,
                 scored_item_type=Settings.SCORED_ITEMTYPE,
                 top_n=Settings.TOP_N,
                 num_of_majors=Settings.NUMBER_OF_MAJORS,
                 class_weight_function=Settings.CLASSWEIGHT,
                 X: pd.DataFrame = None,
                 y: pd.DataFrame = None,
                 # unweighted_y: pd.DataFrame = None
                 ):
        super().__init__(path=path, threshold=threshold, scored_item_type=scored_item_type,
                         top_n=top_n, num_of_majors=num_of_majors,
                         class_weight_function=class_weight_function)
        if X is None:
            X = super().get_likert_only()
        if y is None:
            y = super().get_weighted_major_is_top_n_df()
        # if unweighted_y is None:
        #     unweighted_y = super().get_major_is_top_n_df()
        self._X = X
        self._y = y
        self._shape = (X.shape[0], X.shape[1] + y.shape[1])
        # self.unweighted_y = unweighted_y

    @classmethod
    @Decorators.time_it("FullData.from_data_generator()")
    def from_data_generator(cls, data_generator: DataGenerator, X=None, y=None):  # , unweighted_y=None):
        path = data_generator.path
        threshold = data_generator.threshold
        scored_item_type = data_generator.scored_item_type
        top_n = data_generator.top_n
        num_of_majors = data_generator.num_of_majors
        class_weight_function = data_generator.class_weight_function
        return cls(path, threshold, scored_item_type, top_n, num_of_majors, class_weight_function, X, y)  # , unweighted_y)

    @property
    def shape(self) -> tuple:
        return self._shape

    @property
    def X(self) -> pd.DataFrame:
        return self._X

    @property
    def y(self) -> pd.DataFrame:
        return self._y

    def update_shape(self):
        self.shape = (self.X.shape[0], self.X.shape[1] + self.y.shape[1])

    @shape.setter
    def shape(self, new_shape):
        self._shape = new_shape

    @X.setter
    def X(self, new_X):
        self._X = new_X
        self.update_shape()

    @y.setter
    def y(self, new_y):
        self._y = new_y
        self.update_shape()

    @Decorators.time_it("FullData.shuffle()")
    def shuffle(self) -> "FullData":
        nrows = self.shape[0]
        sampled_rows = pd.Series(range(nrows)).sample(frac=1.)
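        # sample(frac=1.) yields a full random permutation of the row
        # indices; X and y below are reindexed with the same permutation,
        # so features and labels stay aligned row by row.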
        self.X = self.X.iloc[sampled_rows, :].reset_index(drop=True)
        self.y = self.y.iloc[sampled_rows, :].reset_index(drop=True)
        # self.prepared_data_with_labels = self.prepared_data_with_labels.sample(frac=1.0).reset_index(drop=True)
        return self

    @Decorators.time_it("FullData.standardize_x()")
    def standardize_x(self, mode=Settings.STANDARDIZATION_MODE) -> "FullData":
        train_data_full = self.X
        if mode == StanadardizationModeOptions.BY_COL:
            train_data_full -= train_data_full.mean(axis=0)
            train_data_full /= train_data_full.std(axis=0)
        elif mode == StanadardizationModeOptions.BY_ROW:
            train_data_full = train_data_full.sub(train_data_full.mean(axis=1), axis=0)
            train_data_full = train_data_full.div(train_data_full.std(axis=1), axis=0)
        elif mode == StanadardizationModeOptions.NONE:
            pass
        else:
            train_data_full = (train_data_full - train_data_full.mean())/train_data_full.std()
        self.X = train_data_full
        return self

    def train_and_val_split(self, train_rows, val_rows) -> dict:
        return {
            'train_data': self.X.loc[train_rows, :].astype(np.float32).to_numpy(),
            'train_labels': self.y.loc[train_rows, :].astype(np.float32).to_numpy(),
            'val_data': self.X.loc[val_rows, :].astype(np.float32).to_numpy(),
            'val_labels': self.y.loc[val_rows, :].astype(np.float32).to_numpy()
        }

    # might cause problems -> data type is np.ndarray, so might not have iloc
    def test_split(self, test_rows) -> dict:
        unweighted_y = self.y.loc[test_rows, :] > 0.  # the answers need to be unweighted labels!
        return {
            'test_data': self.X.loc[test_rows, :].astype(np.float32).to_numpy(),
            'test_labels': unweighted_y.astype(np.float32).to_numpy()
        }
```

## [enums.py](#Overview-of-Code)

```python=
import tensorflow as tf


class StrategyOptions:
    BASIC_KFOLD_MNN = "__chapter3_default__"
    GENETIC = "__hyperparametertuning__"
    PERTURBED_KFOLD_MNN = "__chapter4_default__"
    BASELINE = "__raw_aggregates__"
    TEST = "__generate_random__"


class MajorNumberOptions:
    THIRTY_THREE_USE_MAX = "__generic_use_max__"
    THIRTY_THREE_USE_MEAN = "__generic_use_mean"
    FIFTY = "__specific__"


class MetricsOptions:
    ACCURACY = "accuracy"
    CHANCE_ACCURACY = "accuracy_coin"
    ACCURACY_GAIN = "accuracy_gain"
    F1 = "f1"
    CHANCE_F1 = "f1_coin"
    F1_GAIN = "f1_gain"
    PRECISION = "precision"
    CHANCE_PRECISION = "precision_coin"
    PRECISION_GAIN = "precision_gain"
    RECALL = "recall"
    CHANCE_RECALL = "recall_coin"
    RECALL_GAIN = "recall_gain"
    YES_SAYING_TENDENCY = "yes"
    REAL_YES = "true"
    ALL_OBSERVED = ["accuracy", "precision", "recall", "f1"]
    ALL_GAIN = ["accuracy_gain", "precision_gain", "recall_gain", "f1_gain"]
    ALL_CHANCE = ["accuracy_coin", "precision_coin", "recall_coin", "f1_coin"]
    ALL_OBSERVED_AND_GAIN = ["accuracy", "precision", "recall", "f1", "accuracy_gain", "precision_gain", "recall_gain", "f1_gain"]
    ALL = ["accuracy", "precision", "recall", "f1"] + ["accuracy_gain", "precision_gain", "recall_gain", "f1_gain"] + ["accuracy_coin", "precision_coin", "recall_coin", "f1_coin"]


class AdvancedMetricsOptions:
    # used in get_included
    YES_SAYING = ["val_yes", "test_yes"]
    REAL_YES = ["val_true", "test_true"]
    ACCURACY_OBSERVED = ["val_acc", "test_acc"]
    ACCURACY_COIN = ["val_acc_coin", "test_acc_coin"]
    ACCURACY_GAIN = ["val_acc_gain", "test_acc_gain"]
    PRECISION_OBSERVED = ["val_precision", "test_precision"]
    PRECISION_COIN = ["val_true", "test_true"]
    PRECISION_GAIN = ["val_precision_gain", "test_precision_gain"]
    RECALL_OBSERVED = ["val_recall", "test_recall"]
    RECALL_COIN = ["val_yes", "test_yes"]
    RECALL_GAIN = ["val_recall_gain", "test_recall_gain"]
    F1_OBSERVED = ["val_f1", "test_f1"]
    F1_COIN = ["val_f1_coin", "test_f1_coin"]
    F1_GAIN = ["val_f1_gain", "test_f1_gain"]
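    # Note the deliberate aliasing above: chance-level precision is the
    # base rate ("true") and chance-level recall is the yes-saying rate
    # ("yes"), mirroring precision_coin and recall_coin in BinaryEvaluator.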
VAL_YES_SAYING = ["val_yes"] VAL_REAL_YES = ["val_true"] VAL_ACCURACY_OBSERVED = ["val_acc"] VAL_ACCURACY_COIN = ["val_acc_coin"] VAL_ACCURACY_GAIN = ["val_acc_gain"] VAL_PRECISION_OBSERVED = ["val_precision"] VAL_PRECISION_COIN = ["val_true"] VAL_PRECISION_GAIN = ["val_precision_gain"] VAL_RECALL_OBSERVED = ["val_recall"] VAL_RECALL_COIN = ["val_yes"] VAL_RECALL_GAIN = ["val_recall_gain"] VAL_F1_OBSERVED = ["val_f1"] VAL_F1_COIN = ["val_f1_coin"] VAL_F1_GAIN = ["val_f1_gain"] TEST_YES_SAYING = ["test_yes"] TEST_REAL_YES = ["test_true"] TEST_ACCURACY_OBSERVED = ["test_acc"] TEST_ACCURACY_COIN = ["test_acc_coin"] TEST_ACCURACY_GAIN = ["test_acc_gain"] TEST_PRECISION_OBSERVED = ["test_precision"] TEST_PRECISION_COIN = ["test_true"] TEST_PRECISION_GAIN = ["test_precision_gain"] TEST_RECALL_OBSERVED = ["test_recall"] TEST_RECALL_COIN = ["test_yes"] TEST_RECALL_GAIN = ["test_recall_gain"] TEST_F1_OBSERVED = ["test_f1"] TEST_F1_COIN = ["test_f1_coin"] TEST_F1_GAIN = ["test_f1_gain"] class StanadardizationModeOptions: BY_COL = "__by_col__" BY_ROW = "__by_row__" NONE = "__no_standardization__" class ItemTypeOptions: LIKRET = "__likert__" FORCED_CHOICE = "__ipsative__" class EvaluatorTypeOptions: KERAS_DEFAULT = "__keras_default__" BINARY = "__binary_evaluator__" BINARY_NO_PUNISHMENT = "__binary_evaluator_without_punishment__" ``` ## [evaluators.py](#Overview-of-Code) ```python= from typing import List, Set from settings import Settings import pandas as pd import numpy as np from abc import ABC, abstractmethod from enums import EvaluatorTypeOptions from utils import Decorators class EvaluatorFactory: @classmethod def get_evaluator(cls, evaluator_option=Settings.EVALUATOR, *args, **kwargs): if evaluator_option == EvaluatorTypeOptions.BINARY: return BinaryEvaluator(*args, **kwargs) elif evaluator_option == EvaluatorTypeOptions.KERAS_DEFAULT: return KerasDefaultEvaluator(*args, **kwargs) elif evaluator_option == EvaluatorTypeOptions.BINARY_NO_PUNISHMENT: return BinaryEvaluatorWithoutPunishment(*args, **kwargs) else: raise Exception("evaluator option must be one of [Binary, KerasDefault]") class Evaluator(ABC): @abstractmethod def evaluate(self): pass class KerasDefaultEvaluator(Evaluator): def __init__(self) -> None: raise Exception("Keras Default hasn't been worked out yet.") class BinaryEvaluator(Evaluator): def __init__(self, predicted:pd.DataFrame, answers:pd.DataFrame, binary_threshold=Settings.BINARY_THRESHOLD, metrics=Settings.METRICS): self.binary_threshold=Settings.BINARY_THRESHOLD self.predicted = (predicted > binary_threshold).astype(np.float32) self.answers = answers self.metrics = metrics @Decorators.time_it("BinaryEvaluator.evaluate() -- getting all metric properties.") def evaluate(self) -> dict: return {m: self.__getattribute__(m) for m in self.metrics} @property def answers(self) -> pd.DataFrame: return self._answers @answers.setter def answers(self, answers): self._answers = answers @property def predicted(self) -> pd.DataFrame: return self._predicted @predicted.setter def predicted(self, predicted): self._predicted = predicted @property def yes(self): if hasattr(self, "_yes"): return self._yes self._yes = self.predicted.mean() return self._yes @property def true(self): if hasattr(self, "_true"): return self._true self._true = self.answers.mean() return self._true @property def precision_coin(self): return self.true @property def recall_coin(self): return self.yes @property def accuracy_coin(self): if hasattr(self, "_accuracy_coin"): return self._accuracy_coin y = self.yes t = 
        acc_coin = y*t + (1-y)*(1-t)
        self._accuracy_coin = acc_coin
        return self._accuracy_coin

    @property
    def f1_coin(self):
        if hasattr(self, "_f1_coin"):
            return self._f1_coin
        y = self.yes
        t = self.true
        self._f1_coin = 2*y*t/(y + t)
        return self._f1_coin

    @property
    def accuracy(self):
        if hasattr(self, "_accuracy"):
            return self._accuracy
        answers = self.answers.replace(0, -1)
        predicted = self.predicted.replace(0, -1)
        results = (answers * predicted).replace(-1, 0)
        self._accuracy = results.mean().fillna(-1000.)
        return self._accuracy

    @property
    def recall(self):
        if hasattr(self, "_recall"):
            return self._recall
        answers = self.answers.replace(0, np.nan)
        predicted = self.predicted
        results = (answers * predicted).mean()
        results[results <= 0] = -1000.  # punishing small values
        self._recall = results.fillna(-1000.)  # remove the fillna if you don't want to punish majors with recall nan
        return self._recall

    @property
    def precision(self):
        if hasattr(self, "_precision"):
            return self._precision
        answers = self.answers
        predicted = self.predicted.replace(0, np.nan)
        results = (answers * predicted).mean()
        results[results <= 0] = -1000.
        self._precision = results.fillna(-1000.)  # remove the fillna if you don't want to punish majors with precision nan
        return self._precision

    @property
    def f1(self):
        if hasattr(self, "_f1"):
            return self._f1
        p = self.precision
        r = self.recall
        self._f1 = (2/(1/p + 1/r)).fillna(-1000.)  # remove the fillna if you don't want to punish majors with f1 nan
        return self._f1

    @property
    def precision_gain(self):
        if hasattr(self, "_precision_gain"):
            return self._precision_gain
        precision = self.precision
        coin = self.precision_coin
        self._precision_gain = self.gain(precision, coin)
        return self._precision_gain

    @property
    def recall_gain(self):
        if hasattr(self, "_recall_gain"):
            return self._recall_gain
        recall = self.recall
        coin = self.recall_coin
        self._recall_gain = self.gain(recall, coin)
        return self._recall_gain

    @property
    def f1_gain(self):
        if hasattr(self, "_f1_gain"):
            return self._f1_gain
        f1 = self.f1
        coin = self.f1_coin
        self._f1_gain = self.gain(f1, coin)
        return self._f1_gain

    @property
    def accuracy_gain(self):
        if hasattr(self, "_accuracy_gain"):
            return self._accuracy_gain
        acc = self.accuracy
        coin = self.accuracy_coin
        self._accuracy_gain = self.gain(acc, coin)
        return self._accuracy_gain

    def gain(self, metric, coin):
        num = metric - coin
        denom = 1 - coin
        results = num/denom
        results[results <= 0] = -1000.  # punishing negative values
        return results
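    # Worked example of gain() (illustrative numbers, not CMPA results):
    # if a major's observed recall is .40 and its chance-level recall
    # (the yes-saying rate) is .25, then
    # recall_gain = (.40 - .25) / (1 - .25) = .20,
    # i.e., the model recovers 20% of the headroom above chance.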


class BinaryEvaluatorWithoutPunishment(BinaryEvaluator):
    # this class preserves the binary evaluator from before 2022 May
    @property
    def recall(self):
        if hasattr(self, "_recall"):
            return self._recall
        answers = self.answers.replace(0, np.nan)
        predicted = self.predicted
        results = (answers * predicted).mean()
        self._recall = results
        return self._recall

    @property
    def precision(self):
        if hasattr(self, "_precision"):
            return self._precision
        answers = self.answers
        predicted = self.predicted.replace(0, np.nan)
        results = (answers * predicted).mean()
        self._precision = results
        return self._precision

    def gain(self, metric, coin):
        # no punishment of values below zero here
        num = metric - coin
        denom = 1 - coin
        return num/denom
```

## [gene_definition.py](#Overview-of-Code)

```python=
import json

import pandas as pd

from settings import Settings, TuningSettings
from data import DataGenerator


# base-rate-related gene definition generator
class GeneDefinitionGenerator:
    def __init__(self,
                 path=Settings.DATAPATH,
                 threshold=Settings.CLEANTHRESHOLD,
                 scored_item_type=Settings.SCORED_ITEMTYPE,
                 num_of_majors: str = Settings.NUMBER_OF_MAJORS,
                 top_n=Settings.TOP_N
                 ) -> None:
        """
        example use case:
        gd = GeneDefinitionGenerator()
        gd.save_gene_definition_to_json(
            func=accuracy_202201,
            tuning_settings_path="genetic_tuning_202112_accuracy_crazy_ver_settings.json"
        )
        """
        self.path = path
        self.threshold = threshold
        self.scored_item_type = scored_item_type
        self.num_of_majors = num_of_majors
        self.top_n = top_n
        # generated
        self.data_generator = DataGenerator(path, threshold, scored_item_type, top_n,
                                            num_of_majors, class_weight_function=None)
        self.major_is_top_n_df = self.data_generator.get_major_is_top_n_df()

    # (min, max, mean, sigma)
    def generate(self, func):
        # pass in a function that takes a base rate and returns [min, max, mean, sigma]
        df = self.major_is_top_n_df.mean().to_dict()
        return {major: func(br) for major, br in df.items()}

    def save_gene_definition_to_json(self, func, tuning_settings_path=TuningSettings.TUNING_SETTINGS_PATH):
        gene_definition = TuningSettings.GENE_DEFINITION
        gene_definition.update(self.generate(func))
        with open(tuning_settings_path) as f:
            settings = json.load(f)
        settings['gene_definition'] = gene_definition
        with open(tuning_settings_path, "w") as f:
            json.dump(settings, f)
        print(settings)
        print("json file saved to ", tuning_settings_path)


def precision_gain_202112(br: float):
    min_ = .1
    max_ = 9999.
    mean_ = br + .1
    sigma_ = mean_ / 2
    return [min_, max_, mean_, sigma_]


def precision_gain_202205(br: float):
    min_ = .2
    max_ = 4.
    mean_ = (1 - br**2)**.5 + .1
    sigma_ = mean_ / 3.5
    return [min_, max_, mean_, sigma_]


def recall_gain_202201(br: float):
    min_ = .1
    max_ = 9999.
    mean_ = (1 - br)*11 + 3
    sigma_ = mean_ / 2
    return [min_, max_, mean_, sigma_]


def recall_gain_202205(br: float):
    min_ = .1
    max_ = 9999.
    mean_ = (1 - br)*5 + 2.5
    sigma_ = mean_ / 2
    return [min_, max_, mean_, sigma_]


def accuracy_202201(br: float):
    min_ = .9
    max_ = 4.
    mean_ = (1 - br)/br/10 + .9
    sigma_ = mean_ / 2
    return [min_, max_, mean_, sigma_]


def accuracy_202205(br: float):
    min_ = .9
    max_ = 3.
    mean_ = ((1 - br)**2)**.5 + .75
    sigma_ = mean_ / 4
    return [min_, max_, mean_, sigma_]


def f1_gain_202201(br: float):
    min_ = .9
    max_ = 4.
    mean_ = (1 - br)/br/10 + .9
    sigma_ = mean_ / 2
    return [min_, max_, mean_, sigma_]


def f1_gain_202205(br: float):
    min_ = .9
    max_ = 4.
    mean_ = (1 - br)/br/10 + .9
    sigma_ = mean_ / 2
    return [min_, max_, mean_, sigma_]


func_dict = {
    'f1_gain': f1_gain_202205,
    'accuracy': accuracy_202205,
    'recall_gain': recall_gain_202205,
    'precision_gain': precision_gain_202205
}

if __name__ == "__main__":
    gd = GeneDefinitionGenerator()
    gd.save_gene_definition_to_json(
        func=func_dict.get('accuracy'),
        tuning_settings_path="genetic_tuning_202205_accuracy_crazy_ver_settings.json"
    )
    gd.save_gene_definition_to_json(
        func=func_dict.get('f1_gain'),
        tuning_settings_path="genetic_tuning_202205_f1_gain_crazy_ver_settings.json"
    )
    gd.save_gene_definition_to_json(
        func=func_dict.get('recall_gain'),
        tuning_settings_path="genetic_tuning_202205_recall_gain_crazy_ver_settings.json"
    )
    gd.save_gene_definition_to_json(
        func=func_dict.get('precision_gain'),
        tuning_settings_path="genetic_tuning_202205_precision_gain_crazy_ver_settings.json"
    )
```

## [models.py](#Overview-of-Code)

```python=
import numpy as np
import pandas as pd
from tensorflow.keras import layers, Input
from tensorflow.keras.models import Model

from data import DataGenerator, FullData
from evaluators import EvaluatorFactory
from settings import Settings
from utils import Decorators, Majors


class MultilabelModel():
    def __init__(
            self,
            num_of_majors=Settings.NUMBER_OF_MAJORS,
            n_nodes=Settings.N_NODES,
            input_shape=Settings.INPUT_SHAPE,
            metrics=Settings.METRICS,
            epochs=Settings.EPOCHS,
            early_stopping=Settings.EARLY_STOPPING,
            evaluator_type=Settings.EVALUATOR,
            evaluator_threshold=Settings.BINARY_THRESHOLD
    ):
        self.num_of_majors = num_of_majors
        self.n_nodes = n_nodes
        self.input_shape = input_shape
        self.metrics = metrics
        self.epochs = epochs
        self.early_stopping = early_stopping
        self.evaluator_type = evaluator_type
        self.evaluator_threshold = evaluator_threshold
        # dynamically created
        majors = Majors.get_corresponding_majors(num_of_majors)
        self.model = self.get_keras_model(majors, n_nodes, input_shape)
        self.majors = majors

    @classmethod
    def from_data_generator(cls, data_generator: DataGenerator,
                            n_nodes=Settings.N_NODES,
                            metrics=Settings.METRICS,
                            epochs=Settings.EPOCHS,
                            early_stopping=Settings.EARLY_STOPPING,
                            evaluator_type=Settings.EVALUATOR,
                            evaluator_threshold=Settings.BINARY_THRESHOLD
                            ):
        num_of_majors = data_generator.num_of_majors
        input_shape = (data_generator.get_likert_only().shape[1], )
        print("model input shape=", input_shape)
        return cls(
            num_of_majors, n_nodes, input_shape, metrics, epochs,
            early_stopping, evaluator_type, evaluator_threshold
        )

    @classmethod
    def from_full_data(cls, full_data: FullData,
                       n_nodes=Settings.N_NODES,
                       metrics=Settings.METRICS,
                       epochs=Settings.EPOCHS,
                       early_stopping=Settings.EARLY_STOPPING,
                       evaluator_type=Settings.EVALUATOR,
                       evaluator_threshold=Settings.BINARY_THRESHOLD
                       ):
        num_of_majors = full_data.num_of_majors
        input_shape = (full_data.X.shape[1], )
        return cls(
            num_of_majors, n_nodes, input_shape, metrics, epochs,
            early_stopping, evaluator_type, evaluator_threshold
        )

    @Decorators.time_it("MultilabelModel.fit_model() ~ sent to keras backend")
    def fit_model(self, train_data, train_labels, val_data, val_labels):
        # for debugging
        print("parameters passed to the keras backend:")
        print("shape of train data:", train_data.shape)
        print("length of labels:", len([train_labels[:, j] for j in range(train_labels.shape[1])]))
        print("epochs:", self.epochs)
        print("shape of validation data:", val_data.shape)
        print("length of validation labels:", len([val_labels[:, k] for k in range(val_labels.shape[1])]))
        history = self.model.fit(
            train_data,
            [train_labels[:, j] for j in range(train_labels.shape[1])],
            batch_size=10000,
            epochs=self.epochs,
            validation_data=(val_data, [val_labels[:, k] for k in range(val_labels.shape[1])]),
            callbacks=[self.early_stopping],
            verbose=0
        )
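        # Note: the MNN has one sigmoid output head per major (see
        # get_keras_model below), so keras expects the labels above as a
        # list with one column vector per head, rather than a single
        # (n_samples, n_majors) matrix.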
        history_df = pd.DataFrame(history.history)
        # for debugging: print out every parameter
        return history_df

    # alias for evaluate
    def evaluate_model(self, test_data, test_labels):
        return self.evaluate(X=test_data, y=test_labels)

    @Decorators.time_it("MultilabelModel.evaluate(), fitted model.predict() and stuff")
    def evaluate(self, X: np.ndarray, y: np.ndarray):
        # needs to make sure that the model has been fitted!
        # soft scores
        predicted = self.model.predict(X)
        predicted_df = pd.concat([pd.DataFrame(col) for col in predicted], axis=1)
        predicted_df.columns = self.majors
        answers_df = pd.DataFrame(y, columns=self.majors)
        evaluator = EvaluatorFactory.get_evaluator(
            self.evaluator_type,
            predicted=predicted_df,
            answers=answers_df,
            binary_threshold=self.evaluator_threshold)
        return evaluator.evaluate()

    def get_keras_model(self, majors, n_nodes, input_shape):
        n_nodes = [n for n in n_nodes if n > 0]  # if n <= 0 then that layer doesn't exist
        if not (0 < len(n_nodes) <= 3):
            raise Exception("From experience, we need 1 to 3 layers")
        print(f"{len(n_nodes)} layer(s), {n_nodes}")
        # a stack of Dense + BatchNormalization blocks, one per entry in n_nodes,
        # followed by one sigmoid prediction head per major
        input_layer = Input(input_shape)
        x = input_layer
        for n in n_nodes:
            x = layers.Dense(n, activation='relu')(x)
            x = layers.BatchNormalization()(x)
        prediction_layers = [layers.Dense(1, activation='sigmoid', name=major)(x) for major in majors]
        model = Model(input_layer, prediction_layers)
        model.compile(
            optimizer='rmsprop',
            loss=['binary_crossentropy' for _ in range(len(majors))],
            # metrics=[m[0] for m in metrics]
        )
        return model
```

## [perturbation.py](#Overview-of-Code)

```python=
# perturbation for genetically tuned results
from enums import StrategyOptions
from strategies import StrategyFactory
from tuning import DefaultNaturalSelection
from settings import TuningSettings, Settings
import sys

if __name__ == "__main__":
    ns = DefaultNaturalSelection.deserialize(
        serialization_path="best_genetic_crazy_ver_results_202201.xlsx"
    )
    # ns.fit_all(forced=True)  # run no matter what
    print("running best_recall_gain")
    best_recall_gain = ns.gene_pool[0]
    best_recall_gain.str_op = StrategyOptions.PERTURBED_KFOLD_MNN
    best_recall_gain.run(forced=True)
    print("all done!")
    # best_precision_gain = ns.gene_pool[1]
    # best_accuracy = ns.gene_pool[2]
    # baseline = StrategyFactory.get_strategy(StrategyOptions.BASELINE).run()
```

## [perturbation_pratts.R](#Overview-of-Code)

```r=
library(psych)
library(tidyverse)
library(ggplot2)
library(dplyr)
library(glue)
library(reshape2)
library(RColorBrewer)
library(readxl)
library(jtools)

WHAT = 2  # 1 std pratts, 2 pratts, 3 beta, or 4 correlation
PATH = "E:/career_ml/perturb/recall_gain_based_on_best_genetic_crazy_ver_results_202201.xlsx"  # for recall_gain
# PATH = "E:/career_ml/perturb/old_perturb_before_genetic_tuning_50_specific_reformatted.xlsx"  # for accuracy
BASE_RATE = "E:/career_ml/perturbation/20210919_results/top3_base_rates_specific.csv"
MODEL_INTERCEPT = F  # model the intercept?
METRIC = "recall_gain"  # out of val_/test_ acc/precision/recall/f1 (and their _gain versions)
CACHE = list()

get_plot <- function() {
  if(!is.null(CACHE[[paste0("plot", WHAT, PATH, MODEL_INTERCEPT, METRIC)]]))
    return(CACHE[[paste0("plot", WHAT, PATH, MODEL_INTERCEPT, METRIC)]])

  format_prob <- function(x) {
    if(length(x) > 1) return(sapply(x, format_prob))
    if(x > 1) return("Error")
    if(x < 0.005) return(" ")
    if(round(x, 2) == 1) return("1.00")
    x <- as.character(round(x, 2))
    x <- sub("^0+.", ".", x)
    if(!grepl(".\\d{2}", x)) x <- paste0(x, "0")
    return(x)
  }

  if(is.null(CACHE[[paste0("raw", PATH)]])) CACHE[[paste0("raw", PATH)]] <- read_excel(PATH)
  raw <- CACHE[[paste0("raw", PATH)]]
  cat(paste0("entries: ", nrow(raw)))
  # br <- read.csv(BASE_RATE)[,2]

  # likert items
  l <- raw[, grepl("l\\d\\w+?\\d", names(raw))]
  f1 <- raw[, grepl(".+_f1$", names(raw))]
  recall <- raw[, grepl(".+_recall$", names(raw))]
  precision <- raw[, grepl(".+_precision$", names(raw))]
  accuracy <- raw[, grepl(".+_accuracy", names(raw))]
  recall_gain <- raw[, grepl(".+_recall_gain$", names(raw))]
  precision_gain <- raw[, grepl(".+_precision_gain$", names(raw))]
  f1_gain <- raw[, grepl(".+_f1_gain$", names(raw))]

  get_df_pratts <- function(what, model_intercept) {
    # standardized pratts: 1
    # unstandardized pratts: 2
    # betas: 3
    # correlation: 4
    # model_intercept: TRUE = with intercept in the multiple regression model
    if(!(what %in% 1:4)) return("make sure what is 1 std pratts, 2 pratts, 3 beta, or 4 correlation")
    df_pratts <- NULL
    rsquares <- vector()

    # metrics
    tf = NULL
    (if(METRIC == "recall") tf = recall
     else if(METRIC == "precision") tf = precision
     else if(METRIC == "accuracy") tf = accuracy
     else if(METRIC == "f1") tf = f1
     else if(METRIC == "precision_gain") tf = precision_gain
     else if(METRIC == "f1_gain") tf = f1_gain
     else tf = recall_gain)

    for(i in 1:50){
      d = cbind(l, tf[i])
      predicted <- names(tf)[i]
      predictors <- names(l)
      formula = ""
      if(model_intercept)
        # adding the scale(predicted) is simply to prevent singularity
        formula <- as.formula(glue("scale({predicted}) ~ {paste0(\"scale(\", predictors, \")\", collapse=\" + \")}"))
      else
        # no intercept
        formula <- as.formula(glue("scale({predicted}) ~ 0 + {paste0(\"scale(\", predictors, \")\", collapse=\" + \")}"))
      result <- lm(formula, data = d)
      r2 <- summary(result)$r.squared
      rsquares <- append(rsquares, ifelse(round(r2, 2) == .00, "<.01", round(r2, 2)))
      # calculating Pratt's importance index (unstandardized); these add up to r^2
      betas <- coef(result)  # [, "Estimate"]
      if(model_intercept) betas <- betas[2:length(betas)]  # dropping intercept <- if no intercept, delete this line.
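      # Pratt's importance index for predictor j is beta_j * r_j (the
      # standardized regression weight times the zero-order correlation);
      # the unstandardized indices sum to R^2, so dividing by R^2
      # (WHAT == 1) rescales them to sum to 1 across predictors.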
      rs <- cor(d)[, predicted]
      rs <- rs[1:(length(rs) - 1)]  # dropping the 1.000 for self-correlation
      if(length(betas) != length(rs)){
        names(betas) <- names(rs)[1:length(betas)]
      } else {
        names(betas) <- names(rs)
      }
      if(what == 1) {
        pratts <- betas*rs
        pratts <- pratts/r2
      } else if (what == 2) {
        pratts <- betas*rs
      } else if (what == 3) {
        pratts <- betas
      } else {
        pratts <- rs
      }
      # t_values <- coef(result)[, "t value"]
      if(!is.null(df_pratts)) df_pratts <- cbind(df_pratts, pratts)
      else df_pratts <- pratts
    }
    assign("rsquares", rsquares, envir = .GlobalEnv)
    return(df_pratts)
  }

  if (!is.null(CACHE[[paste0("df_pratts", WHAT, PATH, MODEL_INTERCEPT, METRIC)]])) {
    df_pratts <- CACHE[[paste0("df_pratts", WHAT, PATH, MODEL_INTERCEPT, METRIC)]]
  } else {
    df_pratts <- get_df_pratts(what=WHAT, model_intercept = MODEL_INTERCEPT)
    # setting the names, in alphabetical order
    majors <- c(
      "14-1 Accounting", "01 African American Studies", "16-1 Anthropology", "16-2 Archaeology",
      "02 Architecture", "03 Asian Studies", "04 Biology", "14-2 Business",
      "15-1 Chemical Engineering", "05 Chemistry", "15-2 Civil Engineering", "16-3 Classics",
      "06 Computer Science", "07 Communications", "08 Criminology", "09 Dance",
      "13-1 Digital Art", "14-3 Economics", "10 Education", "15-3 Electronic Engineering",
      "11 English", "12 Environmental Science", "18-1 French", "19 Gender Studies",
      "20 Geology", "21 Geography", "18-2 German", "17-1 Health Science",
      "16-4 History", "22 International Studies", "18-3 Italian", "17-2 Kinesiology",
      "23 Linguistics", "14-4 Marketing", "15-4 Materials Science", "24 Mathematics",
      "15-5 Mechanical Engineering", "25 Music", "26 Neuroscience", "17-3 Nursing",
      "27 Performance Art", "28 Philosophy", "29 Physics", "30 Politics",
      "31 Psychology", "16-5 Religion", "32 Sociology", "18-4 Spanish",
      "33 Statistics", "13-2 Visual Art")
    items <- c("01 African American Studies", "02 Architecture", "03 Asian Studies", "04 Biology",
               "05 Chemistry", "06 Computer Science", "07 Communications", "08 Criminology",
               "09 Dance", "10 Education", "11 English", "12 Environmental Science",
               "13 General Arts", "14 General Business", "15 General Engineering", "16 General History",
               "17 General Health", "18 General Language", "19 Gender Studies", "20 Geology",
               "21 Geography", "22 International Studies", "23 Linguistics", "24 Mathematics",
               "25 Music", "26 Neuroscience", "27 Performance Art", "28 Philosophy",
               "29 Physics", "30 Politics", "31 Psychology", "32 Sociology",
               "33 Statistics")
    col_names <- majors
    col_order <- rank(col_names)
    names(col_order) <- col_names
    colnames(df_pratts) <- col_names  # names(tf)
    rownames(df_pratts) <- c(
      paste0(items, " 1"),
      paste0(items, " 2"),
      paste0(items, " 3")
    )
    CACHE[[paste0("df_pratts", WHAT, PATH, MODEL_INTERCEPT, METRIC)]] <- df_pratts
  }

  heatmap(as.matrix(df_pratts),
          col=colorRampPalette(brewer.pal(11, "PiYG"))(100),
          Colv = NA, Rowv = NA)

  # let's forget about ggplot2 for now
  if(is.null(CACHE[[paste0("df_pratts_majors_long", WHAT, PATH, MODEL_INTERCEPT, METRIC)]])) {
    df_pratts_long <- as.data.frame(df_pratts)
    df_pratts_long$likert <- rownames(df_pratts_long)
    rownames(df_pratts_long) <- NULL
    df_pratts_majors_long <- df_pratts_long
    df_pratts_majors_long <- melt(df_pratts_majors_long, id.vars="likert")
    df_pratts_long <- melt(df_pratts_long, id.vars="likert")
    df_pratts_majors_long$colOrder <- as.numeric(col_order[as.character(df_pratts_majors_long$variable)])
    CACHE[[paste0("df_pratts_majors_long", WHAT, PATH, MODEL_INTERCEPT, METRIC)]] <- df_pratts_majors_long
  }
CACHE[[paste0("df_pratts_majors_long", WHAT, PATH, MODEL_INTERCEPT, METRIC)]] what = "" (if(WHAT==1) { what = "Std. Pratt"} else if(WHAT==2) { what = "UnStd. Pratt" } else if(WHAT==3) { what = "Beta" } else { what = "Correlation" } ) # https://www.royfrancis.com/a-guide-to-elegant-tiled-heatmaps-in-r/ plot <- ggplot(data=df_pratts_majors_long) + geom_tile(aes(y=reorder(variable, colOrder), x=likert, fill=value), color="whitesmoke", size=.25) + geom_text(aes( y=variable, x=likert, label= ifelse(round(value*100,0)==.0," ", round(value*100,0)) ), #round(value*100,0)), color="#cccccc", size=2.25, ) + #geom_hline(yintercept=c(12.5), linetype="dashed", color = "#cccccc") + #geom_hline(yintercept=c(14.5), linetype="dashed", color = "#cccccc") + #geom_hline(yintercept=c(18.5), linetype="dashed", color = "#cccccc") + #geom_hline(yintercept=c(23.5), linetype="dashed", color = "#cccccc") + #geom_hline(yintercept=c(28.5), linetype="dashed", color = "#cccccc") + #geom_hline(yintercept=c(31.5), linetype="dashed", color = "#cccccc") + #geom_hline(yintercept=c(35.5), linetype="dashed", color = "#cccccc") + #geom_vline(xintercept=c(36.5), linetype="solid", color = "#cceecc") + #geom_vline(xintercept=c(54.5), linetype="solid", color = "#cceecc") + labs( title="", x="Predictors (Inclusion Status of Likert-type Items)", y="Major (Outcomes)", fill=what) + scale_fill_gradient2( low="#eb593d", mid="white", midpoint=0, space="Lab", high="#49b675", na.value = "white" ) + scale_y_discrete(expand=c(0,0)) + #scale_x_discret e(expand=c(0,0)) + # theme_grey(base_size=12)+ coord_fixed()+ # maintaining aspect ratio # set base size for all fonts theme( axis.text=element_text(face="bold"), axis.text.x=element_text(angle = 90, vjust=.25, hjust=.95), axis.ticks=element_line(size=.3), plot.background=element_blank(), panel.border=element_blank() ) plot = plot + geom_vline(xintercept=seq(3.5,99.5, 3), color="#aaaaaa") + geom_rect(xmin=0.5, xmax=3.5, ymin=0.5, ymax=1.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) + geom_rect(xmin=3.5, xmax=6.5, ymin=1.5, ymax=2.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) + geom_rect(xmin=6.5, xmax=9.5, ymin=2.5, ymax=3.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) + geom_rect(xmin=9.5, xmax=12.5, ymin=3.5, ymax=4.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) + geom_rect(xmin=12.5, xmax=15.5, ymin=4.5, ymax=5.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) + geom_rect(xmin=15.5, xmax=18.5, ymin=5.5, ymax=6.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) + geom_rect(xmin=18.5, xmax=21.5, ymin=6.5, ymax=7.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) + geom_rect(xmin=21.5, xmax=24.5, ymin=7.5, ymax=8.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) + geom_rect(xmin=24.5, xmax=27.5, ymin=8.5, ymax=9.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) + geom_rect(xmin=27.5, xmax=30.5, ymin=9.5, ymax=10.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) + geom_rect(xmin=30.5, xmax=33.5, ymin=10.5, ymax=11.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) + geom_rect(xmin=33.5, xmax=36.5, ymin=11.5, ymax=12.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) + geom_rect(xmin=36.5, xmax=39.5, ymin=12.5, ymax=14.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) + geom_rect(xmin=39.5, xmax=42.5, ymin=14.5, ymax=18.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) + geom_rect(xmin=42.5, xmax=45.5, ymin=18.5, ymax=23.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) + geom_rect(xmin=45.5, xmax=48.5, ymin=23.5, ymax=28.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) + 
    geom_rect(xmin=48.5, xmax=51.5, ymin=28.5, ymax=31.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
    geom_rect(xmin=51.5, xmax=54.5, ymin=31.5, ymax=35.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
    geom_rect(xmin=54.5, xmax=57.5, ymin=35.5, ymax=36.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
    geom_rect(xmin=57.5, xmax=60.5, ymin=36.5, ymax=37.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
    geom_rect(xmin=60.5, xmax=63.5, ymin=37.5, ymax=38.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
    geom_rect(xmin=63.5, xmax=66.5, ymin=38.5, ymax=39.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
    geom_rect(xmin=66.5, xmax=69.5, ymin=39.5, ymax=40.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
    geom_rect(xmin=69.5, xmax=72.5, ymin=40.5, ymax=41.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
    geom_rect(xmin=72.5, xmax=75.5, ymin=41.5, ymax=42.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
    geom_rect(xmin=75.5, xmax=78.5, ymin=42.5, ymax=43.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
    geom_rect(xmin=78.5, xmax=81.5, ymin=43.5, ymax=44.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
    geom_rect(xmin=81.5, xmax=84.5, ymin=44.5, ymax=45.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
    geom_rect(xmin=84.5, xmax=87.5, ymin=45.5, ymax=46.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
    geom_rect(xmin=87.5, xmax=90.5, ymin=46.5, ymax=47.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
    geom_rect(xmin=90.5, xmax=93.5, ymin=47.5, ymax=48.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
    geom_rect(xmin=93.5, xmax=96.5, ymin=48.5, ymax=49.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
    geom_rect(xmin=96.5, xmax=99.5, ymin=49.5, ymax=50.5, fill=NA, color="#cccccc", alpha=0.1, size=.02)

  CACHE[[paste0("plot", WHAT, PATH, MODEL_INTERCEPT, METRIC)]] <- plot
  assign("CACHE", CACHE, envir = .GlobalEnv)
  return(plot)
}

plot <- get_plot()
plot + geom_vline(xintercept=seq(3.5, 99.5, 3), color="#cccccc")  # adding vertical lines

# drawing subplots
df_long <- CACHE$`df_pratts_majors_long1E:/career_ml/perturb/recall_gain_based_on_best_genetic_crazy_ver_results_202201.xlsxFALSErecall_gain`
```

## [reports_mkfold.py](#Overview-of-Code)

```python=
import pandas as pd
from tuning import DefaultNaturalSelection
import sys
from utils import Majors
from strategies import StrategyFactory
from enums import StrategyOptions

ns = DefaultNaturalSelection.deserialize(
    serialization_path="best_genetic_crazy_ver_results_202205.xlsx"
)
ns.fit_all(forced=True)  # run no matter what
best_recall_gain = ns.gene_pool[0]
best_precision_gain = ns.gene_pool[1]
best_accuracy = ns.gene_pool[2]
baseline = StrategyFactory.get_strategy(StrategyOptions.BASELINE).run()

# result tables
# Create a Pandas Excel writer using XlsxWriter as the engine.
writer = pd.ExcelWriter('best_genetic_crazy_ver_results_202205_reported.xlsx', engine='xlsxwriter')

# Write each dataframe to a different worksheet.
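# (Each gene's .performance attribute holds its per-major metrics table
# from the k-fold run; the index is remapped below from major codes to the
# full major names used in the dissertation's tables.)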
best_recall_gain_df = best_recall_gain.performance
best_precision_gain_df = best_precision_gain.performance
best_accuracy_df = best_accuracy.performance

best_recall_gain_df.index = Majors.get_major_fullnames(best_recall_gain_df.index)
best_precision_gain_df.index = Majors.get_major_fullnames(best_precision_gain_df.index)
best_accuracy_df.index = Majors.get_major_fullnames(best_accuracy_df.index)
baseline.index = Majors.get_major_fullnames(baseline.index)

best_recall_gain_df.to_excel(writer, sheet_name="best_recall")
best_precision_gain_df.to_excel(writer, sheet_name="best_precision")
best_accuracy_df.to_excel(writer, sheet_name="best_accuracy")
baseline.to_excel(writer, sheet_name="baseline")

# Close the Pandas Excel writer and output the Excel file.
writer.close()
```

###### tags: `dissertation` `replication` `python`