# Code Repo: Exploring Explainable Multilabel Neural Network as a Psychometric Tool for Short-Form Test Scoring and Diagnostics: A Study on College Major Preference Assessment
:::info
:bulb: Hi, you've reached an `anonymous author's <blocked for now, for a peer-review>` `Python` and `R` code for his dissertation experiments. Since the data is private to `iKoda Research (2018)`, the development team of the `College Major Preference Assessment (CMPA)`, the data actually used to train the neural networks cannot be provided here. However, the analysis code is available here for your reference.
:::
# The Organization of this Document
This document assumes that you have read or are reading Shun-Fu's dissertation. I show the code and settings I used to perform certain operations in [Chapter 3](#Operations-in-Chapter-3) and [Chapter 4](#Operations-in-Chapter-4). Since these operations require specific `Classes` and `Functions` defined in the files shown in [Overview of Code](#Overview-of-Code), I show the [source code file by file, ordered alphabetically](#Source-Code). It's advisable to use the navigation bar or go to [Overview of Code](#Overview-of-Code) every so often to find the relevant files instead of going through them one by one.
# Figures in Chapter 4
(Right-click and save the image files to view the figures in high resolution.)




# Overview of Code
The code uses [keras](https://keras.io/) for machine learning, wrapped in self-defined classes. The following table shows each file, what it does, and the classes defined in it. Files written in `Python` have the extension `.py`; files written in `R`, `.R`. This document reached HackMD's maximum character limit, so [settings.py](/gmIJL8GcS_atekP5l-1iYA), [strategies.py](/gmIJL8GcS_atekP5l-1iYA), [tuning.py](/gmIJL8GcS_atekP5l-1iYA) and [utils.py](/gmIJL8GcS_atekP5l-1iYA), marked with :arrow_forward:, are stored in a [separate document](/gmIJL8GcS_atekP5l-1iYA).
|File|Description|Classes Defined There|
|---|---|---|
|[analyses.py](#analyses.py)|Backend calculations for the simple-sum and multilabel MNN analyses with k-fold cross-validation.|`MultilabelKFold`, `PerturbedMultilabelKFold`
|[data.py](#data.py)|Classes to read the CMPA data and preprocess it.| `DataGenerator`, `FullData`
|[enums.py](#enums.py)|For enums: closed sets of category options that are convenient for programming. Not conceptually important.| `StrategyOptions`, `MajorNumberOptions`, `MetricsOptions`, `AdvancedMetricsOptions`, `StanadardizationModeOptions`, `ItemTypeOptions`, `EvaluatorTypeOptions`
|[evaluators.py](#evaluators.py)|Calculating performance metrics.| `EvaluatorFactory`, `Evaluator`, `KerasDefaultEvaluator`, `BinaryEvaluator`, `BinaryEvaluatorWithoutPunishment`
|[gene_definition.py](#gene_definition.py)|Generating gene definitions for hyperparameter tuning.|`GeneDefinitionGenerator`|
|[models.py](#models.py)|Wrapping keras classes under a more convenient model class.|`MultilabelModel`
|[perturbation.py](#perturbation.py)|To run the perturbation analysis; the real backend is defined in [analyses.py](#analyses.py).|None
|[perturbation_pratts.R](#perturbation_pratts.R)|Drawing the graphs in Chapter 4.|None
|[reports_mkfold.py](#reports_mkfold.py)|Generating reports of the results in human-readable tables.|None
|[settings.py](/gmIJL8GcS_atekP5l-1iYA):arrow_forward: |All the settings are here.|`Settings`, `TuningSettings`
|[strategies.py](/gmIJL8GcS_atekP5l-1iYA):arrow_forward: |Wrapping everything in here for a basic run with an MNN or baseline.| `RunStrategy`, `StrategyFactory`, `TestStrategy`, `MkFoldBaseRun`, `PerturbationRun`, `RawAggregate`
|[tuning.py](/gmIJL8GcS_atekP5l-1iYA):arrow_forward: |For genetic hyperparameter tuning.|`Genes`, `Species`, `DefaultNaturalSelection`
|[utils.py](/gmIJL8GcS_atekP5l-1iYA):arrow_forward: |Auxiliary functions and decorators, such as functions that deal with getting the names of the 50 majors in CMPA.|`Decorators`, `ItemCheck`, `Majors`, `ItemSelector`
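
To give a feel for how these pieces fit together, here is a minimal sketch (not part of the repository) of the flow that [strategies.py](/gmIJL8GcS_atekP5l-1iYA) wraps behind `StrategyFactory`, assuming the default `Settings` point to the (private) CMPA data:
```python
# minimal sketch of the data -> model -> k-fold flow that strategies.py wraps
from data import FullData
from models import MultilabelModel
from analyses import MultilabelKFold

full_data = FullData()                              # reads, cleans, and labels CMPA data (DataGenerator defaults)
model = MultilabelModel.from_full_data(full_data)   # builds the multilabel keras MNN sized to the data
results = MultilabelKFold(full_data, model).run()   # k-fold cross-validation; returns a pd.DataFrame of metrics
```
In practice you would go through `StrategyFactory.get_strategy(...)` instead, as shown in the Chapter 3 operations below.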
# Operations in Chapter 3
## Baseline (Simple-Sum Method)
The immediately relevant file is [strategies.py](/gmIJL8GcS_atekP5l-1iYA).
```python=
from enums import StrategyOptions
from strategies import StrategyFactory
baseline = StrategyFactory.get_strategy(StrategyOptions.BASELINE)  # "__raw_aggregates__": the simple-sum baseline
results = baseline.run()  # run() returns the results as a pd.DataFrame. That's it!
```
## Hyperparameter Tuning
### Step 1. Generating Gene Definitions
This generates the ranges of the hyperparameters (the number of layers, the number of nodes in each layer, and the min, max, mean, and standard deviation of the `alpha weights`). The parameter `br` stands for the `base rate` of a major, i.e., `the proportion of respondents choosing the major as their top 3 in CMPA`. Information about the base rates of the majors is provided in `Appendix A` of the dissertation. The most relevant files are [gene_definition.py](#gene_definition.py) and [tuning.py](/gmIJL8GcS_atekP5l-1iYA):arrow_forward:.
```python=
from gene_definition import GeneDefinitionGenerator  # defined in gene_definition.py

# for MNN targeting accuracy
def accuracy(br: float):
    min_ = .9
    max_ = 4.
    mean_ = (1 - br)/br/10 + .9
    sigma_ = mean_ / 2
    return [min_, max_, mean_, sigma_]

# for MNN targeting adjusted recall
def recall_gain(br: float):
    min_ = .1
    max_ = 9999.
    mean_ = (1 - br)*11 + 3
    sigma_ = mean_ / 2
    return [min_, max_, mean_, sigma_]

# for MNN targeting adjusted precision
def precision_gain(br: float):
    min_ = .1
    max_ = 9999.
    mean_ = br + .1
    sigma_ = mean_ / 2
    return [min_, max_, mean_, sigma_]

func_dict = {
    'accuracy': accuracy,
    'recall_gain': recall_gain,
    'precision_gain': precision_gain
}

# write the gene definitions into the tuning settings files
if __name__ == "__main__":
    gd = GeneDefinitionGenerator()
    gd.save_gene_definition_to_json(
        func=func_dict.get('accuracy'),
        tuning_settings_path="genetic_tuning_202201_accuracy_crazy_ver_settings.json"
    )
    gd.save_gene_definition_to_json(func=func_dict.get('recall_gain'), tuning_settings_path="genetic_tuning_202201_recall_gain_crazy_ver_settings.json")
    gd.save_gene_definition_to_json(func=func_dict.get('precision_gain'), tuning_settings_path="genetic_tuning_202201_precision_gain_crazy_ver_settings.json")
```
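As a quick sanity check (with a hypothetical base rate, not taken from the data), the `accuracy` gene definition for a major with `br = 0.06` works out as follows; `save_gene_definition_to_json()` then writes one such `[min, max, mean, sigma]` list per major under the `gene_definition` key of the settings JSON.
```python
# hypothetical example: gene definition produced by the accuracy() function for br = 0.06
br = 0.06
min_, max_ = .9, 4.
mean_ = (1 - br) / br / 10 + .9     # 0.94 / 0.6 + 0.9 ≈ 2.47
sigma_ = mean_ / 2                  # ≈ 1.23
print([min_, max_, mean_, sigma_])  # -> [0.9, 4.0, 2.466..., 1.233...]
```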
### Step 2. The Actual Hyperparameter Tuning
Once the appropriate Conda environment, as defined in the dissertation's `Appendix C`, is activated, the following commands are entered at the command line. The most relevant files are [tuning.py](/gmIJL8GcS_atekP5l-1iYA):arrow_forward: and [settings.py](/gmIJL8GcS_atekP5l-1iYA):arrow_forward: .
```bash
# For the MNN targeting accuracy:
python tuning.py genetic_tuning_202201_accuracy_crazy_ver.xlsx genetic_tuning_202201_accuracy_crazy_ver_settings.json
# For the MNN targeting adjusted recall:
python tuning.py genetic_tuning_202201_recall_gain_crazy_ver.xlsx genetic_tuning_202201_recall_gain_crazy_ver_settings.json
# For the MNN targeting adjusted precision:
python tuning.py genetic_tuning_202201_precision_gain_crazy_ver.xlsx genetic_tuning_202201_precision_gain_crazy_ver_settings.json
```
The code saves the hyperparameters of the top 30 MNNs for each of the respective targets (`accuracy`, `adjusted recall`, and `adjusted precision`) in the respective files `genetic_tuning_202201_accuracy_crazy_ver.xlsx`, `genetic_tuning_202201_recall_gain_crazy_ver.xlsx`, and `genetic_tuning_202201_precision_gain_crazy_ver.xlsx`. Each of these Excel files is essentially the 30-organism gene pool for its target.
Then, I manually created an Excel file named `best_genetic_crazy_ver_results_202201.xlsx`, collecting the top organism of each of the three gene pools: the first column holds the best-performing MNN for recall gain, the second the best for precision gain, and the third the best for accuracy.
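
Note that the `adjusted` (gain) metrics targeted here are defined in [evaluators.py](#evaluators.py) as the improvement of an observed metric over its chance ("coin") level,

$$\text{gain} = \frac{\text{observed} - \text{chance}}{1 - \text{chance}},$$

where chance-level precision is the proportion of true labels and chance-level recall is the model's yes-saying rate (see `BinaryEvaluator.gain()` and the `*_coin` properties).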
### Step 3. Printing Out the Report
The most relevant file is [reports_mkfold.py](#reports_mkfold.py).
```python
import pandas as pd
from tuning import DefaultNaturalSelection
from utils import Majors
from strategies import StrategyFactory
from enums import StrategyOptions
# deserializing gene pool excel files
ns = DefaultNaturalSelection.deserialize(
serialization_path="best_genetic_crazy_ver_results_202201.xlsx"
) # the manually created file containing 3 columns: The first is the best-performing MNN for recall-gain; the second is the best performing for precision-gain, and the third column is for the best accuracy.
ns.fit_all(forced=True) # run no matter what
best_recall_gain = ns.gene_pool[0]
best_precision_gain = ns.gene_pool[1]
best_accuracy = ns.gene_pool[2]
baseline = StrategyFactory.get_strategy(StrategyOptions.BASELINE).run()
# result tables
# Create a Pandas Excel writer using XlsxWriter as the engine.
writer = pd.ExcelWriter('best_genetic_crazy_ver_results_202201_reported.xlsx')
# Write each dataframe to a different worksheet.
best_recall_gain_df = best_recall_gain.performance
best_precision_gain_df = best_precision_gain.performance
best_accuracy_df = best_accuracy.performance
best_recall_gain_df.index = Majors.get_major_fullnames(best_recall_gain_df.index )
best_precision_gain_df.index = Majors.get_major_fullnames(best_precision_gain_df.index)
best_accuracy_df.index = Majors.get_major_fullnames(best_accuracy_df.index)
baseline.index = Majors.get_major_fullnames(baseline.index)
best_recall_gain_df.to_excel(writer, sheet_name="best_recall")
best_precision_gain_df.to_excel(writer, sheet_name="best_precision")
best_accuracy_df.to_excel(writer, sheet_name="best_accuracy")
baseline.to_excel(writer, sheet_name="baseline")
# Close the Pandas Excel writer and output the Excel file.
writer.save()
writer.close()
```
# Operations in Chapter 4
## Perturbation
```python=
# perturbation for genetically tuned results
from enums import StrategyOptions
from strategies import StrategyFactory
from tuning import DefaultNaturalSelection
from settings import TuningSettings, Settings
import sys
if __name__ == "__main__":
    ns = DefaultNaturalSelection.deserialize(
        serialization_path="best_genetic_crazy_ver_results_202201.xlsx"
    )
    # ns.fit_all(forced=True) # run no matter what
    print("running best_recall_gain")
    best_recall_gain = ns.gene_pool[0]
    best_recall_gain.str_op = StrategyOptions.PERTURBED_KFOLD_MNN
    best_recall_gain.run(forced=True)
    print("all done!")
    # best_precision_gain = ns.gene_pool[1]
    # best_accuracy = ns.gene_pool[2]
    # baseline = StrategyFactory.get_strategy(StrategyOptions.BASELINE).run()
```
## Drawing Plots
See [perturbation_pratts.R](#perturbation_pratts.R).
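For orientation: the script regresses each major's perturbation metric (e.g., `recall_gain`) on the inclusion indicators of the Likert-type items across perturbation iterations, and plots an importance index per item. With `WHAT = 2` it uses the unstandardized Pratt index, the product of the standardized regression weight and the zero-order correlation,

$$d_j = \beta_j\, r_j,$$

and with `WHAT = 1` it additionally divides by the model's $R^2$ (the `pratts/r2` step in `get_df_pratts()`); `WHAT = 3` and `WHAT = 4` plot the betas and correlations themselves.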
# Source Code
## [analyses.py](#Overview-of-Code)
```python=
from typing import List, final
from matplotlib.pyplot import cool
import tensorflow as tf
import numpy as np
import pandas as pd
from models import MultilabelModel
from data import DataGenerator, FullData
from enums import StanadardizationModeOptions
from utils import Decorators, Majors
from settings import Settings
class MultilabelKFold:
# refactoring: all model related stuff should be moved to a model
def __init__(self, full_data: FullData, model:MultilabelModel):
self.full_data = full_data
self.model = model
def check_gpu_status(self):
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))
print("GPU device name --", tf.config.list_physical_devices('GPU'))
#return tf.config.list_physical_devices('GPU')[0]
@Decorators.time_it("pd.DataFrame Division??!")
def divide_df_by_k_fold(self, test_results:pd.DataFrame, k_fold:int):
return test_results / k_fold
@Decorators.time_it("Analysis.py MKFold's get_evaluation_result_df()")
def get_evaluation_result_df(self, model_evaluation_result:dict):
return pd.DataFrame(model_evaluation_result)
@Decorators.time_it("Analysis.py MKFold's run()")
def run(self, k_fold=Settings.K_FOLD) -> pd.DataFrame:
# just checking if GPU is used
self.check_gpu_status()
X_y = self.full_data.shuffle().standardize_x(StanadardizationModeOptions.BY_COL)
# doing k_fold_validation
nrow, ncol = X_y.shape[0], X_y.shape[1]
one_fold = nrow // (k_fold + 1) # reserving one fold for test data
# validation = pd.Series(dtype=np.float32)
# evaluation = pd.Series(dtype=np.float32)
# note class_weight doesn't work as a fit_param for the multilabel case. So, we need to go into loss function in model.compile()
if k_fold > 1:
for i in range(k_fold):
print("Fold " + str(i + 1) + " of " + str(k_fold))
# split X, y
val_rows = list(range(one_fold * i, one_fold * (i + 1)))
train_rows = [i for i in range(one_fold*k_fold) if i not in val_rows]
test_rows = range(one_fold*k_fold, nrow)
train_and_val = X_y.train_and_val_split(train_rows, val_rows)
test = X_y.test_split(test_rows)
if i == 0:
# fitting
self.model.fit_model(**train_and_val)
test_results = self.get_evaluation_result_df(self.model.evaluate_model(**test))# pd.DataFrame(self.model.evaluate_model(**test))
else:
self.model.fit_model(**train_and_val)
test_results += self.get_evaluation_result_df(self.model.evaluate_model(**test))
test_results = self.divide_df_by_k_fold(test_results, k_fold) #unsupported operation None, int???
else:
print("K fold validation disabled. Using default validation split 7000:1000:1300")
train_rows = range(0, Settings.DEFAULT_TRAIN_SPLIT_END)
val_rows = range(Settings.DEFAULT_TRAIN_SPLIT_END, Settings.DEFAULT_VAL_SPLIT_END)
test_rows = range(Settings.DEFAULT_VAL_SPLIT_END, ncol)
train_and_val = X_y.train_and_val_split(train_rows, val_rows)
test = X_y.test_split(test_rows)
self.model.fit_model(**train_and_val)
test_results = pd.DataFrame(self.model.evaluate_model(**test))
return test_results
class PerturbedMultilabelKFold:
def __init__(
self,
full_data:FullData,
model:MultilabelModel,
subset=Settings.SUBSET_SIZE,
serialization_path=Settings.PERTURBATION_SERIALIZATION_PATH,
serialization_frequency=Settings.PERTURBATION_SERIALIZATION_FREQUENCY
):
self.full_data = full_data
self.model = model
self.subset = subset
self.serialization_path = serialization_path
self.serialization_frequency = serialization_frequency
# dynamically generated
self.mkfold = MultilabelKFold(full_data, model)
self.result_dict = {
"selected_cols": [],
"mkfold_results": []
}
@classmethod
def from_mkfold(cls, mkfold:MultilabelKFold, subset=Settings.SUBSET_SIZE, serialization_path=Settings.PERTURBATION_SERIALIZATION_PATH, serialization_frequency=Settings.PERTURBATION_SERIALIZATION_FREQUENCY):
full_data = mkfold.full_data
model = mkfold.model
return cls(full_data, model, subset, serialization_path, serialization_frequency)
def draw_random_subset(self, subset=Settings.SUBSET_SIZE) -> List[int]:
ncol = self.mkfold.full_data.X.shape[1]
selected_items = np.random.choice(range(ncol), subset, replace=False)
selected = [i in selected_items for i in range(ncol)]
return selected
@Decorators.time_it("perturbed multillabel k-fold single_run()")
def single_run(self, k_fold=Settings.K_FOLD):
selected_cols = self.draw_random_subset(self.subset)
subset_data = self.full_data.X.loc[:,selected_cols]
subset_full_data = FullData.from_data_generator(
data_generator=self.full_data,
X=subset_data,
y=self.full_data.y)
subset_model = MultilabelModel.from_full_data(
full_data=subset_full_data,
n_nodes=self.model.n_nodes,
metrics=self.model.metrics,
epochs=self.model.epochs,
early_stopping=self.model.early_stopping,
evaluator_type=self.model.evaluator_type,
evaluator_threshold=self.model.evaluator_threshold
)
subset_mkfold = MultilabelKFold(subset_full_data, subset_model)
results = subset_mkfold.run(k_fold)
return selected_cols, results
def cast_selected_cols(self, selected_cols:List[bool]) -> pd.Series:
item_names = self.full_data.get_likert_only().columns.to_series()
selected_dummies = pd.Series(selected_cols, index=item_names)
return selected_dummies.astype(np.float32)
def cast_mkfold_result(self, mkfold_result:pd.DataFrame) -> pd.Series:
# a single mkfold result
result_dict = {}
for col in mkfold_result.columns:
row_indexes = mkfold_result.loc[:, col].index
for row in row_indexes:
value = mkfold_result.loc[row, col]
result_dict[row +"_" +col] = value
return pd.Series(result_dict)
def cast_results(self) -> pd.DataFrame:
selected_cols = self.result_dict["selected_cols"]
mkfold_results = self.result_dict["mkfold_results"]
final_result = pd.DataFrame()
for idx, (cols, results) in enumerate(zip(selected_cols, mkfold_results)):
single_perturbation_result = self.cast_selected_cols(cols)
single_perturbation_result = single_perturbation_result.append(self.cast_mkfold_result(results))
final_result[idx] = single_perturbation_result
return final_result
def serialize(self) -> None:
try:
to_serialize = self.cast_results()
to_serialize.to_excel(self.serialization_path)
print(f"perturbation results of shape {to_serialize.shape} saved to" + self.serialization_path)
except Exception as e:
print("serialization failed")
print(e)
@Decorators.time_it()
def run(self, k_fold=Settings.K_FOLD, iter=Settings.PERTURBATION_ITER) -> pd.DataFrame:
for i in range(iter):
print("perturbation iteration id: " + str(i))
selected_cols, results = self.single_run(k_fold)
try:
self.result_dict["selected_cols"].append(selected_cols)
self.result_dict["mkfold_results"].append(results)
except:
print("something went wrong, but whatever...")
if i % self.serialization_frequency == 0:
self.serialize()
self.serialize()
return self.cast_results()
```
## [data.py](#Overview-of-Code)
```python=
import pandas as pd
import numpy as np
from enums import ItemTypeOptions, MajorNumberOptions, StanadardizationModeOptions
from settings import GLOBAL_CLEANED_DATA, GLOBAL_LIKERT_DATA, GLOBAL_MAJOR_IS_TOP_N_DF, GLOBAL_MAJOR_SCORES_DF, Settings, GLOBAL_RAW_DATA
from utils import Decorators, ItemCheck, ItemSelector, Majors
from typing import List, Set
# has content
# has the part it is supposed to fit in
# has methods to split itself
class DataGenerator:
def __init__(self,
path=Settings.DATAPATH,
threshold=Settings.CLEANTHRESHOLD,
scored_item_type=Settings.SCORED_ITEMTYPE,
top_n=Settings.TOP_N,
num_of_majors=Settings.NUMBER_OF_MAJORS,
class_weight_function=Settings.CLASSWEIGHT
) -> None:
self.path = path
self.threshold = threshold
self.scored_item_type = scored_item_type
self.top_n = top_n
self.num_of_majors = num_of_majors
self.class_weight_function=class_weight_function
@Decorators.time_it("DataGenerator.get raw data()")
def get_raw_data(self) -> pd.DataFrame:
global_raw_data = GLOBAL_RAW_DATA.get(self.path)
if global_raw_data is not None:
return global_raw_data
GLOBAL_RAW_DATA[self.path] = pd.read_csv(self.path, dtype={
'age': str,
'certificate': str,
'education': str,
'school': str,
'gender': str
})
return GLOBAL_RAW_DATA.get(self.path)
# here it might already be wrong.
@Decorators.time_it("DataGenerator.get_cleaned_data")
def get_cleaned_data(self) -> pd.DataFrame:
global_cleaned_data = GLOBAL_CLEANED_DATA.get( (self.path, self.threshold) )
if global_cleaned_data is not None:
return global_cleaned_data
major_all = self.get_raw_data()
threshold = self.threshold
likert_items = ItemSelector(major_all).get_selected_cols_by_item_type(scored_item_type=ItemTypeOptions.LIKRET)
# if missing data exceeds a proportion, the row is deleted
ncol = major_all[likert_items].shape[1]
passed = major_all[likert_items].isna().sum(axis=1) <= ncol*threshold
# also has to not miss any demographics
        # somehow missing demographic cases have been coded as the string "null_unknown" (not NA)
        passed = passed & ((major_all[['age', 'certificate', 'education', 'school', 'gender']] == 'null_unknown').sum(axis=1) == 0)  # parentheses needed: & binds tighter than ==
cleaned_df = major_all.loc[passed, :].fillna(0.).reset_index(drop=True)
GLOBAL_CLEANED_DATA[ (self.path, self.threshold)] = cleaned_df
return cleaned_df
@Decorators.time_it("DataGenerator.get_likert_only")
def get_likert_only(self) -> pd.DataFrame:
global_likert_data = GLOBAL_LIKERT_DATA.get( (self.path, self.threshold) )
if global_likert_data is not None:
return global_likert_data
cleaned_data = self.get_cleaned_data()
likert_cols = ItemSelector(cleaned_data).get_selected_cols_by_item_type(scored_item_type=ItemTypeOptions.LIKRET)
likert_data = cleaned_data.loc[:, likert_cols]
GLOBAL_LIKERT_DATA[(self.path, self.threshold)] = likert_data
return likert_data
@Decorators.time_it("DataGenerator.merge_major_scores")
def merge_major_scores(self, major_scores_df, use_max) -> pd.DataFrame:
for general_major, specific_majors_list in Majors.MAPPING.items():
if use_max:
major_scores_df[general_major] = major_scores_df[specific_majors_list].max(axis=1) # using max?
else:
major_scores_df[general_major] = major_scores_df[specific_majors_list].mean(axis=1)
major_scores_df = major_scores_df.drop(specific_majors_list ,axis=1)
return major_scores_df
@Decorators.time_it("DataGenerator.get_major_scores_df")
def get_major_scores_df(self) -> pd.DataFrame:
global_major_scores_df = GLOBAL_MAJOR_SCORES_DF.get((self.path, self.threshold, self.scored_item_type, self.num_of_majors))
if global_major_scores_df is not None:
return global_major_scores_df
d = self.get_cleaned_data()
# get a list of unique majors
all_majors = Majors.get_corresponding_majors(self.scored_item_type)
# getting a sum of scores for each major
major_scores_df = pd.DataFrame()
check_function = ItemCheck.get_scored_item_type_check(self.scored_item_type, per_major=True)
for major in all_majors:
# oops forgot to enforce forced choice
cols_for_the_major = [
col for col in d.columns if check_function(major, col)]
major_frame = d.loc[:,cols_for_the_major]
major_score = major_frame.sum(axis=1)
major_scores_df[major] = major_score
# explode generic majors to specific majors
# if self.num_of_majors == MajorNumberOptions.FIFTY:
if self.scored_item_type == ItemTypeOptions.LIKRET:
if major in Majors.MAPPING.keys():
major_scores_df.drop(major,axis=1, inplace=True)
for specific_major in Majors.MAPPING.get(major):
major_scores_df[specific_major] = major_score
if self.scored_item_type == ItemTypeOptions.FORCED_CHOICE:
if self.num_of_majors == MajorNumberOptions.THIRTY_THREE_USE_MAX:
return self.merge_major_scores(major_scores_df, use_max=True)
elif self.num_of_majors == MajorNumberOptions.THIRTY_THREE_USE_MEAN:
return self.merge_major_scores(major_scores_df, use_max=False)
else:
pass
# self.number_of_majors == MajorNumberOptions.FIFTY
GLOBAL_MAJOR_SCORES_DF[(self.path, self.threshold, self.scored_item_type, self.num_of_majors)] = major_scores_df
return major_scores_df
@Decorators.time_it("DataGenerator.get_major_is_top_n_df")
def get_major_is_top_n_df(self) -> pd.DataFrame:
global_major_is_top_n_df = GLOBAL_MAJOR_IS_TOP_N_DF.get(
(self.path, self.threshold, self.scored_item_type, self.num_of_majors, self.top_n)
)
if global_major_is_top_n_df is not None:
return global_major_is_top_n_df
top_n = self.top_n
major_scores_df = self.get_major_scores_df()
major_is_top_n_df = major_scores_df.apply(lambda x: x.nlargest(top_n, keep='all'), axis = 1).apply(lambda x: x > 0.).astype(np.float32)
GLOBAL_MAJOR_IS_TOP_N_DF[
(self.path, self.threshold, self.scored_item_type, self.num_of_majors, self.top_n)
] = major_is_top_n_df
return major_is_top_n_df
@Decorators.time_it("DataGenerator.get_weighted_major_is_top_n_df")
def get_weighted_major_is_top_n_df(self) -> pd.DataFrame:
major_is_top_n_df = self.get_major_is_top_n_df()
class_weight_function = self.class_weight_function
if class_weight_function is None:
return major_is_top_n_df
#e.g. weights = ((1-major_is_top_n_df.mean())**2*1.5+0.5)
weighted_major_is_top_n_df = class_weight_function(major_is_top_n_df)
return weighted_major_is_top_n_df
class FullData(DataGenerator):
'''
example use:
dg = DataGenerator(top_n=1)
X = dg.get_likert_only()
y = dg.get_major_is_top_n_df()
full_data = FullData(X, y)
FullData(likert, major_is_top_n_df)
'''
def __init__(self,
path=Settings.DATAPATH,
threshold=Settings.CLEANTHRESHOLD,
scored_item_type=Settings.SCORED_ITEMTYPE,
top_n=Settings.TOP_N,
num_of_majors=Settings.NUMBER_OF_MAJORS,
class_weight_function=Settings.CLASSWEIGHT,
X:pd.DataFrame=None,
y:pd.DataFrame=None,
# unweighted_y:pd.DataFrame=None
):
super().__init__(path=path, threshold=threshold, scored_item_type=scored_item_type, top_n=top_n, num_of_majors=num_of_majors, class_weight_function=class_weight_function)
if X is None:
X = super().get_likert_only()
if y is None:
y = super().get_weighted_major_is_top_n_df()
#if unweighted_y is None:
# unweighted_y = super().get_major_is_top_n_df()
self._X = X
self._y = y
self._shape = (X.shape[0], X.shape[1] + y.shape[1])
# self.unweighted_y = unweighted_y
@classmethod
@Decorators.time_it("FullData.from_data_generator()")
def from_data_generator(cls, data_generator:DataGenerator, X=None, y=None): # , unweighted_y=None):
path = data_generator.path
threshold = data_generator.threshold
scored_item_type = data_generator.scored_item_type
top_n = data_generator.top_n
num_of_majors = data_generator.num_of_majors
class_weight_function = data_generator.class_weight_function
return cls(path, threshold, scored_item_type, top_n, num_of_majors, class_weight_function, X, y) #, unweighted_y)
@property
def shape(self) -> tuple:
return self._shape
@property
def X(self) -> pd.DataFrame:
return self._X
@property
def y(self) -> pd.DataFrame:
return self._y
def update_shape(self):
self.shape = (self.X.shape[0], self.X.shape[1] + self.y.shape[1])
@shape.setter
def shape(self, new_shape):
self._shape = new_shape
@X.setter
def X(self, new_X):
self._X = new_X
self.update_shape()
@y.setter
def y(self, new_y):
self._y = new_y
self.update_shape()
@Decorators.time_it("FullData.shuffle()")
def shuffle(self) -> "FullData":
nrows = self.shape[0]
sampled_rows = pd.Series(range(nrows)).sample(frac=1.)
self.X = self.X.iloc[sampled_rows, :].reset_index(drop=True)
self.y = self.y.iloc[sampled_rows, :].reset_index(drop=True)
# self.prepared_data_with_labels = self.prepared_data_with_labels.sample(frac=1.0).reset_index(drop=True)
return self
@Decorators.time_it("FullData.standardize_x()")
def standardize_x(self, mode=Settings.STANDARDIZATION_MODE) -> "FullData":
train_data_full = self.X
if mode == StanadardizationModeOptions.BY_COL:
train_data_full -= train_data_full.mean(axis=0)
train_data_full /= train_data_full.std(axis=0)
elif mode == StanadardizationModeOptions.BY_ROW:
train_data_full -= train_data_full.mean(axis=1)
train_data_full /= train_data_full.std(axis=1)
elif mode == StanadardizationModeOptions.NONE:
pass
else:
train_data_full = (train_data_full - train_data_full.mean())/train_data_full.std()
self.X = train_data_full
return self
def train_and_val_split(self, train_rows, val_rows) -> dict:
return {
'train_data': self.X.loc[train_rows, :].astype(np.float32).to_numpy(),
'train_labels': self.y.loc[train_rows, :].astype(np.float32).to_numpy(),
'val_data':self.X.loc[val_rows, :].astype(np.float32).to_numpy(),
'val_labels':self.y.loc[val_rows, :].astype(np.float32).to_numpy()
}
# might cause problems -> data type is np.ndarray, so might not have iloc
def test_split(self, test_rows) -> dict:
unweighted_y = self.y.loc[test_rows, :] > 0.
# the answers need to be unweigthed labels!
return {
'test_data': self.X.loc[test_rows, :].astype(np.float32).to_numpy(),
'test_labels': unweighted_y.astype(np.float32).to_numpy()
}
```
## [enums.py](#Overview-of-Code)
```python=
import tensorflow as tf
class StrategyOptions:
BASIC_KFOLD_MNN = "__chapter3_default__"
GENETIC = "__hyperparametertuning__"
PERTURBED_KFOLD_MNN = "__chapter4_default__"
BASELINE = "__raw_aggregates__"
TEST = "__generate_random__"
class MajorNumberOptions:
THIRTY_THREE_USE_MAX = "__generic_use_max__"
THIRTY_THREE_USE_MEAN = "__generic_use_mean"
FIFTY = "__specific__"
class MetricsOptions:
ACCURACY = "accuracy"
CHANCE_ACCURACY = "accuracy_coin"
ACCURACY_GAIN = "accuracy_gain"
F1 = "f1"
CHANCE_F1 = "f1_coin"
F1_GAIN = "f1_gain"
PRECISION = "precision"
CHANCE_PRECISION = "precision_coin"
PRECISION_GAIN = "precision_gain"
RECALL = "recall"
CHANCE_RECALL = "recall_coin"
RECALL_GAIN = "recall_gain"
YES_SAYING_TENDENCY = "yes"
REAL_YES = "true"
ALL_OBSERVED = ["accuracy", "precision", "recall", "f1"]
ALL_GAIN = ["accuracy_gain", "precision_gain", "recall_gain", "f1_gain"]
ALL_CHANCE = ["accuracy_coin", "precision_coin", "recall_coin", "f1_coin"]
ALL_OBSERVED_AND_GAIN = ["accuracy", "precision", "recall", "f1", "accuracy_gain", "precision_gain", "recall_gain", "f1_gain"]
ALL = ["accuracy", "precision", "recall", "f1"] + ["accuracy_gain", "precision_gain", "recall_gain", "f1_gain"] + ["accuracy_coin", "precision_coin", "recall_coin", "f1_coin"]
class AdvancedMetricsOptions:
# used in get_included
YES_SAYING = ["val_yes", "test_yes"]
REAL_YES = ["val_true", "test_true"]
ACCURACY_OBSERVED = ["val_acc", "test_acc"]
ACCURACY_COIN = ["val_acc_coin", "test_acc_coin"]
ACCURACY_GAIN = ["val_acc_gain", "test_acc_gain"]
PRECISION_OBSERVED = ["val_precision", "test_precision"]
PRECISION_COIN = ["val_true", "test_true"]
PRECISION_GAIN = ["val_precision_gain", "test_precision_gain"]
RECALL_OBSERVED = ["val_recall", "test_recall"]
RECALL_COIN = ["val_yes", "test_yes"]
RECALL_GAIN = ["val_recall_gain", "test_recall_gain"]
F1_OBSERVED = ["val_f1", "test_f1"]
F1_COIN = ["val_f1_coin", "test_f1_coin"]
F1_GAIN = ["val_f1_gain", "test_f1_gain"]
VAL_YES_SAYING = ["val_yes"]
VAL_REAL_YES = ["val_true"]
VAL_ACCURACY_OBSERVED = ["val_acc"]
VAL_ACCURACY_COIN = ["val_acc_coin"]
VAL_ACCURACY_GAIN = ["val_acc_gain"]
VAL_PRECISION_OBSERVED = ["val_precision"]
VAL_PRECISION_COIN = ["val_true"]
VAL_PRECISION_GAIN = ["val_precision_gain"]
VAL_RECALL_OBSERVED = ["val_recall"]
VAL_RECALL_COIN = ["val_yes"]
VAL_RECALL_GAIN = ["val_recall_gain"]
VAL_F1_OBSERVED = ["val_f1"]
VAL_F1_COIN = ["val_f1_coin"]
VAL_F1_GAIN = ["val_f1_gain"]
TEST_YES_SAYING = ["test_yes"]
TEST_REAL_YES = ["test_true"]
TEST_ACCURACY_OBSERVED = ["test_acc"]
TEST_ACCURACY_COIN = ["test_acc_coin"]
TEST_ACCURACY_GAIN = ["test_acc_gain"]
TEST_PRECISION_OBSERVED = ["test_precision"]
TEST_PRECISION_COIN = ["test_true"]
TEST_PRECISION_GAIN = ["test_precision_gain"]
TEST_RECALL_OBSERVED = ["test_recall"]
TEST_RECALL_COIN = ["test_yes"]
TEST_RECALL_GAIN = ["test_recall_gain"]
TEST_F1_OBSERVED = ["test_f1"]
TEST_F1_COIN = ["test_f1_coin"]
TEST_F1_GAIN = ["test_f1_gain"]
class StanadardizationModeOptions:
BY_COL = "__by_col__"
BY_ROW = "__by_row__"
NONE = "__no_standardization__"
class ItemTypeOptions:
LIKRET = "__likert__"
FORCED_CHOICE = "__ipsative__"
class EvaluatorTypeOptions:
KERAS_DEFAULT = "__keras_default__"
BINARY = "__binary_evaluator__"
BINARY_NO_PUNISHMENT = "__binary_evaluator_without_punishment__"
```
## [evaluators.py](#Overview-of-Code)
```python=
from typing import List, Set
from settings import Settings
import pandas as pd
import numpy as np
from abc import ABC, abstractmethod
from enums import EvaluatorTypeOptions
from utils import Decorators
class EvaluatorFactory:
@classmethod
def get_evaluator(cls, evaluator_option=Settings.EVALUATOR, *args, **kwargs):
if evaluator_option == EvaluatorTypeOptions.BINARY:
return BinaryEvaluator(*args, **kwargs)
elif evaluator_option == EvaluatorTypeOptions.KERAS_DEFAULT:
return KerasDefaultEvaluator(*args, **kwargs)
elif evaluator_option == EvaluatorTypeOptions.BINARY_NO_PUNISHMENT:
return BinaryEvaluatorWithoutPunishment(*args, **kwargs)
else:
raise Exception("evaluator option must be one of [Binary, KerasDefault]")
class Evaluator(ABC):
@abstractmethod
def evaluate(self):
pass
class KerasDefaultEvaluator(Evaluator):
def __init__(self) -> None:
raise Exception("Keras Default hasn't been worked out yet.")
class BinaryEvaluator(Evaluator):
def __init__(self, predicted:pd.DataFrame, answers:pd.DataFrame, binary_threshold=Settings.BINARY_THRESHOLD, metrics=Settings.METRICS):
        self.binary_threshold = binary_threshold  # keep the threshold actually passed in, not the global default
self.predicted = (predicted > binary_threshold).astype(np.float32)
self.answers = answers
self.metrics = metrics
@Decorators.time_it("BinaryEvaluator.evaluate() -- getting all metric properties.")
def evaluate(self) -> dict:
return {m: self.__getattribute__(m) for m in self.metrics}
@property
def answers(self) -> pd.DataFrame:
return self._answers
@answers.setter
def answers(self, answers):
self._answers = answers
@property
def predicted(self) -> pd.DataFrame:
return self._predicted
@predicted.setter
def predicted(self, predicted):
self._predicted = predicted
@property
def yes(self):
if hasattr(self, "_yes"):
return self._yes
self._yes = self.predicted.mean()
return self._yes
@property
def true(self):
if hasattr(self, "_true"):
return self._true
self._true = self.answers.mean()
return self._true
@property
def precision_coin(self):
return self.true
@property
def recall_coin(self):
return self.yes
@property
def accuracy_coin(self):
if hasattr(self, "_accuracy_coin"):
return self._accuracy_coin
y = self.yes
t = self.true
acc_coin = y*t + (1-y)*(1-t)
self._accuracy_coin = acc_coin
return self._accuracy_coin
@property
def f1_coin(self):
if hasattr(self, "_f1_coin"):
return self._f1_coin
y = self.yes
t = self.true
self._f1_coin = 2*y*t/(y + t)
return self._f1_coin
@property
def accuracy(self):
if hasattr(self, "_accuracy"):
return self._accuracy
answers = self.answers.replace(0, -1)
predicted = self.predicted.replace(0, -1)
results = (answers * predicted).replace(-1, 0)
self._accuracy = results.mean().fillna(-1000.)
return self._accuracy
@property
def recall(self):
if hasattr(self, "_recall"):
return self._recall
answers = self.answers.replace(0, np.nan)
predicted = self.predicted
results = (answers * predicted).mean()
results[results<=0] = -1000. # punishing small values
self._recall = results.fillna(-1000.) # remove the fillna(0.) if you don't want to punish majors with recall nan
return self._recall
@property
def precision(self):
if hasattr(self, "_precision"):
return self._precision
answers = self.answers
predicted = self.predicted.replace(0, np.nan)
results = (answers * predicted).mean()
results[results<=0] = -1000.
self._precision = results.fillna(-1000.) # remove the fillna(0.) if you don't want to punish majors with precision nan
return self._precision
@property
def f1(self):
if hasattr(self, "_f1"):
return self._f1
p = self.precision
r = self.recall
self._f1 = (2/(1/p + 1/r)).fillna(-1000.) #remove the fillna(0.) if you don't want to punish majors with f1 nan
return self._f1
@property
def precision_gain(self):
if hasattr(self, "_precision_gain"):
return self._precision_gain
precision = self.precision
coin = self.precision_coin
self._precision_gain = self.gain(precision, coin)
return self._precision_gain
@property
def recall_gain(self):
if hasattr(self, "_recall_gain"):
return self._recall_gain
recall = self.recall
coin = self.recall_coin
self._recall_gain = self.gain(recall, coin)
return self._recall_gain
@property
def f1_gain(self):
if hasattr(self, "_f1_gain"):
return self._f1_gain
f1 = self.f1
coin = self.f1_coin
self._f1_gain = self.gain(f1, coin)
return self._f1_gain
@property
def accuracy_gain(self):
if hasattr(self, "_accuracy_gain"):
return self._accuracy_gain
acc = self.accuracy
coin = self.accuracy_coin
self._accuracy_gain = self.gain(acc, coin)
return self._accuracy_gain
def gain(self, metric, coin):
num = metric - coin # punishing anything below zero
denom = 1 - coin
results = num/denom
results[results <= 0] = -1000. #punishing negative values
return results
class BinaryEvaluatorWithoutPunishment(BinaryEvaluator):
# this class preserves the binary evaluator before 2022 May
@property
def recall(self):
if hasattr(self, "_recall"):
return self._recall
answers = self.answers.replace(0, np.nan)
predicted = self.predicted
results = (answers * predicted).mean()
self._recall = results
return self._recall
@property
def precision(self):
if hasattr(self, "_precision"):
return self._precision
answers = self.answers
predicted = self.predicted.replace(0, np.nan)
results = (answers * predicted).mean()
self._precision = results
return self._precision
    def gain(self, metric, coin):
        # unlike BinaryEvaluator.gain(), negative gains are kept (no punishment)
        num = metric - coin
        denom = 1 - coin
        return num/denom
```
## [gene_definition.py](#Overview-of-Code)
```python=
from settings import Settings, TuningSettings
from data import DataGenerator
import json
import pandas as pd
# baserate related gene definition generator
class GeneDefinitionGenerator:
def __init__(self,
path=Settings.DATAPATH,
threshold=Settings.CLEANTHRESHOLD,
scored_item_type=Settings.SCORED_ITEMTYPE,
num_of_majors:str=Settings.NUMBER_OF_MAJORS,
top_n=Settings.TOP_N
) -> None:
"""
example use case:
gd = GeneDefinitionGenerator(tuning_settings_path="genetic_tuning_202112_accuracy_crazy_ver_settings.json")
gd.save_gene_definition_to_json(func=gd.example_func_for_accuracy)
"""
self.path = path
self.threshold = threshold
self.scored_item_type = scored_item_type
self.num_of_majors = num_of_majors
self.top_n = top_n
# generated
self.data_generator = DataGenerator(path, threshold, scored_item_type, top_n, num_of_majors, class_weight_function=None)
self.major_is_top_n_df = self.data_generator.get_major_is_top_n_df()
# (min, max, mean, mu)
def generate(self, func):
# pass_in_a_function_that is takes baserate and returns min, max, mean, mu
df = self.major_is_top_n_df.mean().to_dict()
return { major:func(br) for major, br in df.items() }
def save_gene_definition_to_json(self, func, tuning_settings_path=TuningSettings.TUNING_SETTINGS_PATH):
gene_definition = TuningSettings.GENE_DEFINITION
gene_definition.update(self.generate(func))
with open(tuning_settings_path) as f:
settings = json.load(f)
settings['gene_definition'] = gene_definition
with open(tuning_settings_path, "w") as f:
json.dump(settings, f)
print(settings)
print("json file saved to ", tuning_settings_path)
def precision_gain_202112(br:float):
min_ = .1
max_ = 9999.
mean_ = br + .1
sigma_ = mean_ / 2
return [min_, max_, mean_, sigma_]
def precision_gain_202205(br:float):
min_ = .2
max_ = 4.
mean_ = (1 - br**2)**.5 + .1
sigma_ = mean_ / 3.5
return [min_, max_, mean_, sigma_]
def recall_gain_202201(br:float):
min_ = .1
max_ = 9999.
mean_ = (1 - br)*11 + 3
sigma_ = mean_ / 2
return [min_, max_, mean_, sigma_]
def recall_gain_202205(br:float):
min_ = .1
max_ = 9999.
mean_ = (1 - br)*5 + 2.5
sigma_ = mean_ / 2
return [min_, max_, mean_, sigma_]
def accuracy_202201(br:float):
min_ = .9
max_ = 4.
mean_ = (1 - br)/br/10 + .9
sigma_ = mean_ / 2
return [min_, max_, mean_, sigma_]
def accuracy_202205(br:float):
min_ = .9
max_ = 3.
mean_ = ((1 - br)**2)**.5 + .75
sigma_ = mean_ / 4
return [min_, max_, mean_, sigma_]
def f1_gain_202201(br:float):
min_ = .9
max_ = 4.
mean_ = (1 - br)/br/10 + .9
sigma_ = mean_ / 2
return [min_, max_, mean_, sigma_]
def f1_gain_202205(br:float):
min_ = .9
max_ = 4.
mean_ = (1 - br)/br/10 + .9
sigma_ = mean_ / 2
return [min_, max_, mean_, sigma_]
func_dict = {
'f1_gain': f1_gain_202205,
'accuracy': accuracy_202205,
'recall_gain': recall_gain_202205,
'precision_gain': precision_gain_202205
}
if __name__ == "__main__":
gd = GeneDefinitionGenerator()
gd.save_gene_definition_to_json(
func=func_dict.get('accuracy'),
tuning_settings_path="genetic_tuning_202205_accuracy_crazy_ver_settings.json"
)
gd.save_gene_definition_to_json(func=func_dict.get('f1_gain'), tuning_settings_path="genetic_tuning_202205_f1_gain_crazy_ver_settings.json")
gd.save_gene_definition_to_json(func=func_dict.get('recall_gain'), tuning_settings_path="genetic_tuning_202205_recall_gain_crazy_ver_settings.json")
gd.save_gene_definition_to_json(func=func_dict.get('precision_gain'), tuning_settings_path="genetic_tuning_202205_precision_gain_crazy_ver_settings.json")
```
## [models.py](#Overview-of-Code)
```python=
from data import DataGenerator, FullData
from evaluators import EvaluatorFactory
from settings import Settings
import pandas as pd
from tensorflow.keras import layers, Input
from tensorflow.keras.models import Model
import numpy as np
from utils import Decorators, Majors
class MultilabelModel():
def __init__(
self,
num_of_majors=Settings.NUMBER_OF_MAJORS,
n_nodes=Settings.N_NODES,
input_shape=Settings.INPUT_SHAPE,
metrics=Settings.METRICS,
epochs=Settings.EPOCHS,
early_stopping=Settings.EARLY_STOPPING,
evaluator_type=Settings.EVALUATOR,
evaluator_threshold=Settings.BINARY_THRESHOLD
):
self.num_of_majors = num_of_majors
self.n_nodes = n_nodes
self.input_shape = input_shape
self.metrics = metrics
self.epochs = epochs
self.early_stopping = early_stopping
self.evaluator_type = evaluator_type
self.evaluator_threshold = evaluator_threshold
# dynamically created
majors = Majors.get_corresponding_majors(num_of_majors)
self.model = self.get_keras_model(majors, n_nodes, input_shape)
self.majors = majors
@classmethod
def from_data_generator(cls,
data_generator: DataGenerator,
n_nodes=Settings.N_NODES,
metrics=Settings.METRICS,
epochs=Settings.EPOCHS,
early_stopping=Settings.EARLY_STOPPING,
evaluator_type=Settings.EVALUATOR,
evaluator_threshold=Settings.BINARY_THRESHOLD
):
num_of_majors = data_generator.num_of_majors
input_shape = (data_generator.get_likert_only().shape[1], )
print("model input shape=", input_shape)
return cls(
num_of_majors,
n_nodes,
input_shape,
metrics,
epochs,
early_stopping,
evaluator_type,
evaluator_threshold
)
@classmethod
def from_full_data(cls,
full_data: FullData,
n_nodes=Settings.N_NODES,
metrics=Settings.METRICS,
epochs=Settings.EPOCHS,
early_stopping=Settings.EARLY_STOPPING,
evaluator_type=Settings.EVALUATOR,
evaluator_threshold=Settings.BINARY_THRESHOLD
):
num_of_majors = full_data.num_of_majors
input_shape = (full_data.X.shape[1], )
return cls(
num_of_majors,
n_nodes,
input_shape,
metrics,
epochs,
early_stopping,
evaluator_type,
evaluator_threshold
)
@Decorators.time_it("MultilabelModel.fit_model() ~ sent to keras backend")
def fit_model(self, train_data, train_labels, val_data, val_labels):
# for debugging
print("parameters passed to the keras backend:")
print("shape of train data:", train_data.shape)
print("lengnth of labels ", len([train_labels[:,j] for j in range(train_labels.shape[1])]))
print("epochs", self.epochs)
print("shape of validation data", val_data.shape)
print("length of validation data", len([val_labels[:,k] for k in range(val_labels.shape[1])]))
history = self.model.fit(
train_data,
[train_labels[:,j] for j in range(train_labels.shape[1])],
batch_size=10000,
epochs=self.epochs,
validation_data = (val_data, [val_labels[:,k] for k in range(val_labels.shape[1])]),
callbacks = [self.early_stopping],
verbose=0
)
history_df = pd.DataFrame(history.history)
# for debugging
# print out every parameter
return history_df
# alias for evaluate
def evaluate_model(self, test_data, test_labels):
return self.evaluate(X=test_data, y=test_labels)
@Decorators.time_it("MultilabelModel.evaluate(), fitted model.predict() and stuff")
def evaluate(
self,
X:np.ndarray,
y:np.ndarray
):
# needs to make sure that the model has been fitted!
# soft scores
predicted = self.model.predict(X)
predicted_df = pd.concat([pd.DataFrame(col) for col in predicted], axis=1)
predicted_df.columns = self.majors
answers_df = pd.DataFrame(y, columns=self.majors)
evaluator = EvaluatorFactory.get_evaluator(
self.evaluator_type,
predicted=predicted_df,
answers=answers_df,
binary_threshold=self.evaluator_threshold)
return evaluator.evaluate()
# alias for evaluate
def get_keras_model(self, majors, n_nodes, input_shape):
n_nodes = [n for n in n_nodes if n > 0] # if n <= 0 then that layer doesn't exist
if not (0 < len(n_nodes) <= 3):
raise Exception("From experience, we need 1 to 3 layers")
if len(n_nodes) == 1:
print(f"1 layer, {n_nodes}")
input_layer = Input(input_shape)
dense_1 = layers.Dense(n_nodes[0], activation='relu')(input_layer)
norm_1 = layers.BatchNormalization()(dense_1)
prediction_layers = [layers.Dense(1, activation='sigmoid', name=major)(norm_1) for major in majors]
model = Model(input_layer, prediction_layers)
model.compile(
optimizer='rmsprop',
loss = ['binary_crossentropy' for l in range(len(majors))],
# metrics=[m[0] for m in metrics]
)
return model
elif len(n_nodes) == 2:
print(f"2 layers, {n_nodes}")
input_layer = Input(input_shape)
dense_1 = layers.Dense(n_nodes[0], activation='relu')(input_layer)
norm_1 = layers.BatchNormalization()(dense_1)
dense_2 = layers.Dense(n_nodes[1], activation='relu')(norm_1)
norm_2 = layers.BatchNormalization()(dense_2)
prediction_layers = [layers.Dense(1, activation='sigmoid', name=major)(norm_2) for major in majors]
model = Model(input_layer, prediction_layers)
model.compile(
optimizer='rmsprop',
loss = ['binary_crossentropy' for l in range(len(majors))],
# metrics=[m[0] for m in metrics],
)
return model
else:
print(f"3 layers, {n_nodes}")
input_layer = Input(input_shape)
dense_1 = layers.Dense(n_nodes[0], activation='relu')(input_layer)
norm_1 = layers.BatchNormalization()(dense_1)
dense_2 = layers.Dense(n_nodes[1], activation='relu')(norm_1)
norm_2 = layers.BatchNormalization()(dense_2)
dense_3 = layers.Dense(n_nodes[2], activation='relu')(norm_2)
norm_3 = layers.BatchNormalization()(dense_3)
prediction_layers = [layers.Dense(1, activation='sigmoid', name=major)(norm_3) for major in majors]
model = Model(input_layer, prediction_layers)
model.compile(
optimizer='rmsprop',
loss = ['binary_crossentropy' for l in range(len(majors))],
# metrics=[m[0] for m in metrics],
)
return model
```
## [perturbation.py](#Overview-of-Code)
```python=
# perturbation for genetically tuned results
from enums import StrategyOptions
from strategies import StrategyFactory
from tuning import DefaultNaturalSelection
from settings import TuningSettings, Settings
import sys
if __name__ == "__main__":
    ns = DefaultNaturalSelection.deserialize(
        serialization_path="best_genetic_crazy_ver_results_202201.xlsx"
    )
    # ns.fit_all(forced=True) # run no matter what
    print("running best_recall_gain")
    best_recall_gain = ns.gene_pool[0]
    best_recall_gain.str_op = StrategyOptions.PERTURBED_KFOLD_MNN
    best_recall_gain.run(forced=True)
    print("all done!")
    # best_precision_gain = ns.gene_pool[1]
    # best_accuracy = ns.gene_pool[2]
    # baseline = StrategyFactory.get_strategy(StrategyOptions.BASELINE).run()
```
## [perturbation_pratts.R](#Overview-of-Code)
```r=
library(psych)
library(tidyverse)
library(ggplot2)
library(dplyr)
library(glue)
library(reshape2)
library(RColorBrewer)
library(readxl)
library(jtools)
WHAT = 2# 1 std pratts, 2 pratts, 3 beta, or 4 correlation
PATH = "E:/career_ml/perturb/recall_gain_based_on_best_genetic_crazy_ver_results_202201.xlsx" #for recall_gain
#PATH = "E:/career_ml/perturb/old_perturb_before_genetic_tuning_50_specific_reformatted.xlsx" # for accuracy
BASE_RATE = "E:/career_ml/perturbation/20210919_results/top3_base_rates_specific.csv"
MODEL_INTERCEPT = F # to model intercept?
METRIC = "recall_gain" # out of val_/test_ acc/precision/recall/f1
CACHE = list()
get_plot <- function() {
if(!is.null(CACHE[[paste0("plot", WHAT, PATH, MODEL_INTERCEPT, METRIC)]]))
return(CACHE[[paste0("plot", WHAT, PATH, MODEL_INTERCEPT, METRIC)]])
format_prob <- function(x) {
if(length(x) > 1)
return(sapply(x, format_prob))
if(x > 1)
return("Error")
if(x < 0.005)
return(" ")
if(round(x, 2) == 1)
return ("1.00")
x <- as.character(round(x,2))
x <- sub("^0+.", ".", x)
if(!grepl(".\\d{2}", x))
x <- paste0(x, "0")
return(x)
}
if(is.null(CACHE[[paste0("raw", PATH)]])) CACHE[[paste0("raw", PATH)]] <- read_excel(PATH)
raw <- CACHE[[paste0("raw", PATH)]]
cat(paste0("entries: ", nrow(raw)))
# br <- read.csv(BASE_RATE)[,2]
# likert items
l <- raw[, grepl("l\\d\\w+?\\d", names(raw))]
f1 <- raw[, grepl(".+_f1$", names(raw))]
recall <- raw[, grepl(".+_recall$", names(raw))]
precision <- raw[, grepl(".+_precision$", names(raw))]
accuracy <- raw[, grepl(".+_accuracy$", names(raw))]  # anchored with $ so *_accuracy_gain columns are not picked up
recall_gain <- raw[, grepl(".+_recall_gain$", names(raw))]
precision_gain <- raw[, grepl(".+_precision_gain$", names(raw))]  # name fixed so METRIC == "precision_gain" resolves below
f1_gain <- raw[, grepl(".+_f1_gain$", names(raw))]
get_df_pratts <- function(what, model_intercept) {
# standardized pratts: 1
# unstandardized pratts: 2
# betas: 3
# correlation: 4
# model_intercept:
# true = with intercept in the multiple regression model
if(!(what %in% 1:4))
return("make sure what is 1 std pratts, 2 pratts, 3 beta, or 4 correlation")
df_pratts <- NULL
rsquares <- vector()
# metrics
tf = NULL
(if(METRIC == "recall")
tf = recall
else if(METRIC == "precision")
tf = precision
else if(METRIC == "accuracy")
tf = accuracy
else if(METRIC == "f1")
tf = f1
else if(METRIC == "precision_gain")
tf = precision_gain
else if(METRIC == "f1_gain")
tf = f1_gain
else
tf = recall_gain
)
for(i in 1:50){
d = cbind(l, tf[i])
predicted <- names(tf)[i]
predictors <- names(l)
formula = ""
if(model_intercept)
# adding the scale(predicted) is simply to prevent singularity
formula <- as.formula(glue("scale({predicted}) ~ {paste0(\"scale(\", predictors, \")\", collapse=\" + \")}"))
else
# no intercepts
formula <- as.formula(glue("scale({predicted}) ~ 0 + {paste0(\"scale(\", predictors, \")\", collapse=\" + \")}"))
result <- lm(formula, data = d)
r2 <- summary(result)$r.squared
rsquares <- append(rsquares,
ifelse(round(r2,2) == .00,
"<.01",
round(r2, 2))
)
# calculating pratt's importance index (unstandardized), add up to r^2
betas <- coef(result) #[, "Estimate"]
if(model_intercept)
betas <- betas[2:length(betas)] # dropping intercept <- if no intercept, delete this line.
rs <- cor(d)[, predicted]
rs <- rs[1:(length(rs) - 1)] # dropping the 1.000 for self-correlation
if(length(betas)!= length(rs)){
names(betas) <- names(rs)[1:length(betas)]
} else {
names(betas) <- names(rs)
}
if(what == 1) {
pratts <- betas*rs
pratts <- pratts/r2
} else if (what == 2) {
pratts <- betas*rs
} else if (what == 3) {
pratts <- betas
} else {
pratts <- rs
}
#t_values <- coef(result)[, "t value"]
if(!is.null(df_pratts))
df_pratts <- cbind(df_pratts, pratts)
else
df_pratts <- pratts
}
assign("rsquares", rsquares, envir = .GlobalEnv)
return(df_pratts)
}
if (!is.null(CACHE[[paste0("df_pratts", WHAT, PATH, MODEL_INTERCEPT, METRIC)]])) {
df_pratts <- CACHE[[paste0("df_pratts", WHAT, PATH, MODEL_INTERCEPT, METRIC)]]
} else {
df_pratts <- get_df_pratts(what=WHAT,model_intercept = MODEL_INTERCEPT)
# ok setting the names, in alphabetical
majors <- c(
"14-1 Accounting",
"01 African American Studies",
"16-1 Anthropology",
"16-2 Archaeologly",
"02 Architecture",
"03 Asian Studies",
"04 Biology",
"14-2 Business",
"15-1 Chemical Engineering",
"05 Chemistry",
"15-2 Civil Engineering",
"16-3 Classics",
"06 Computer Science",
"07 Communications",
"08 Criminology",
"09 Dance",
"13-1 Digital Art",
"14-3 Economics",
"10 Education",
"15-3 Electronic Engineering",
"11 English",
"12 Environmental Science",
"18-1 French",
"19 Gender Studies",
"20 Geology",
"21 Geography",
"18-2 German",
"17-1 Health Science",
"16-4 History",
"22 International Studies",
"18-3 Italian",
"17-2 Kinesiology",
"23 Linguistics",
"14-4 Marketing",
"15-4 Materials Science",
"24 Mathematics",
"15-5 Mechanical Engineering",
"25 Music",
"26 Neuroscience",
"17-3 Nursing",
"27 Performance Art",
"28 Philosophy",
"29 Physics",
"30 Politics",
"31 Psychology",
"16-5 Religion",
"32 Sociology",
"18-4 Spanish",
"33 Statistics",
"13-2 Visual Art")
items <- c("01 African American Studies",
"02 Architecture",
"03 Asian Studies",
"04 Biology",
"05 Chemistry",
"06 Computer Science",
"07 Communications",
"08 Criminology",
"09 Dance",
"10 Education",
"11 English",
"12 Environmental Science",
"13 General Arts",
"14 General Business",
"15 General Engineering",
"16 General History",
"17 General Health",
"18 General Language",
"19 Gender Studies",
"20 Geology",
"21 Geography",
"22 International Studies",
"23 Linguistics",
"24 Mathematics",
"25 Music",
"26 Neuroscience",
"27 Performance Art",
"28 Philosophy",
"29 Physics",
"30 Politics",
"31 Psychology",
"32 Sociology",
"33 Statistics"
)
col_names <- majors
col_order <- rank(col_names)
names(col_order) <- col_names
colnames(df_pratts) <- col_names #names(tf)
rownames(df_pratts) <- c(
paste0(items, " 1"),
paste0(items, " 2"),
paste0(items, " 3")
)
CACHE[[paste0("df_pratts", WHAT, PATH, MODEL_INTERCEPT, METRIC)]] <- df_pratts
}
heatmap(as.matrix(df_pratts),
col=colorRampPalette(brewer.pal(11, "PiYG"))(100),
Colv = NA, Rowv = NA
)
# let's forget about ggplot2 for now
if(is.null(CACHE[[paste0("df_pratts_majors_long", WHAT, PATH, MODEL_INTERCEPT, METRIC)]])) {
df_pratts_long <- as.data.frame(df_pratts)
df_pratts_long$likert <- rownames(df_pratts_long)
rownames(df_pratts_long) <- NULL
df_pratts_majors_long <- df_pratts_long
df_pratts_majors_long <- melt(df_pratts_majors_long, id.vars="likert")
df_pratts_long <- melt(df_pratts_long, id.vars="likert")
df_pratts_majors_long$colOrder <- as.numeric(col_order[as.character(df_pratts_majors_long$variable)])
CACHE[[paste0("df_pratts_majors_long", WHAT, PATH, MODEL_INTERCEPT, METRIC)]] <- df_pratts_majors_long
}
df_pratts_majors_long <- CACHE[[paste0("df_pratts_majors_long", WHAT, PATH, MODEL_INTERCEPT, METRIC)]]
what = ""
(if(WHAT==1) {
what = "Std. Pratt"}
else if(WHAT==2) {
what = "UnStd. Pratt" }
else if(WHAT==3) {
what = "Beta" }
else {
what = "Correlation" }
)
# https://www.royfrancis.com/a-guide-to-elegant-tiled-heatmaps-in-r/
plot <- ggplot(data=df_pratts_majors_long) +
geom_tile(aes(y=reorder(variable, colOrder), x=likert, fill=value), color="whitesmoke", size=.25) +
geom_text(aes(
y=variable,
x=likert,
label=
ifelse(round(value*100,0)==.0," ",
round(value*100,0))
), #round(value*100,0)),
color="#cccccc",
size=2.25,
) +
#geom_hline(yintercept=c(12.5), linetype="dashed", color = "#cccccc") +
#geom_hline(yintercept=c(14.5), linetype="dashed", color = "#cccccc") +
#geom_hline(yintercept=c(18.5), linetype="dashed", color = "#cccccc") +
#geom_hline(yintercept=c(23.5), linetype="dashed", color = "#cccccc") +
#geom_hline(yintercept=c(28.5), linetype="dashed", color = "#cccccc") +
#geom_hline(yintercept=c(31.5), linetype="dashed", color = "#cccccc") +
#geom_hline(yintercept=c(35.5), linetype="dashed", color = "#cccccc") +
#geom_vline(xintercept=c(36.5), linetype="solid", color = "#cceecc") +
#geom_vline(xintercept=c(54.5), linetype="solid", color = "#cceecc") +
labs(
title="",
x="Predictors (Inclusion Status of Likert-type Items)",
y="Major (Outcomes)",
fill=what) +
scale_fill_gradient2(
low="#eb593d",
mid="white",
midpoint=0,
space="Lab",
high="#49b675",
na.value = "white"
) +
scale_y_discrete(expand=c(0,0)) +
#scale_x_discret e(expand=c(0,0)) + #
theme_grey(base_size=12)+
coord_fixed()+ # maintaining aspect ratio
# set base size for all fonts
theme(
axis.text=element_text(face="bold"),
axis.text.x=element_text(angle = 90, vjust=.25, hjust=.95),
axis.ticks=element_line(size=.3),
plot.background=element_blank(),
panel.border=element_blank()
)
plot = plot +
geom_vline(xintercept=seq(3.5,99.5, 3), color="#aaaaaa") +
geom_rect(xmin=0.5, xmax=3.5, ymin=0.5, ymax=1.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=3.5, xmax=6.5, ymin=1.5, ymax=2.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=6.5, xmax=9.5, ymin=2.5, ymax=3.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=9.5, xmax=12.5, ymin=3.5, ymax=4.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=12.5, xmax=15.5, ymin=4.5, ymax=5.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=15.5, xmax=18.5, ymin=5.5, ymax=6.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=18.5, xmax=21.5, ymin=6.5, ymax=7.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=21.5, xmax=24.5, ymin=7.5, ymax=8.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=24.5, xmax=27.5, ymin=8.5, ymax=9.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=27.5, xmax=30.5, ymin=9.5, ymax=10.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=30.5, xmax=33.5, ymin=10.5, ymax=11.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=33.5, xmax=36.5, ymin=11.5, ymax=12.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=36.5, xmax=39.5, ymin=12.5, ymax=14.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=39.5, xmax=42.5, ymin=14.5, ymax=18.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=42.5, xmax=45.5, ymin=18.5, ymax=23.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=45.5, xmax=48.5, ymin=23.5, ymax=28.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=48.5, xmax=51.5, ymin=28.5, ymax=31.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=51.5, xmax=54.5, ymin=31.5, ymax=35.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=54.5, xmax=57.5, ymin=35.5, ymax=36.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=57.5, xmax=60.5, ymin=36.5, ymax=37.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=60.5, xmax=63.5, ymin=37.5, ymax=38.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=63.5, xmax=66.5, ymin=38.5, ymax=39.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=66.5, xmax=69.5, ymin=39.5, ymax=40.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=69.5, xmax=72.5, ymin=40.5, ymax=41.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=72.5, xmax=75.5, ymin=41.5, ymax=42.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=75.5, xmax=78.5, ymin=42.5, ymax=43.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=78.5, xmax=81.5, ymin=43.5, ymax=44.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=81.5, xmax=84.5, ymin=44.5, ymax=45.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=84.5, xmax=87.5, ymin=45.5, ymax=46.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=87.5, xmax=90.5, ymin=46.5, ymax=47.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=90.5, xmax=93.5, ymin=47.5, ymax=48.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=93.5, xmax=96.5, ymin=48.5, ymax=49.5, fill=NA, color="#cccccc", alpha=0.1, size=.02) +
geom_rect(xmin=96.5, xmax=99.5, ymin=49.5, ymax=50.5, fill=NA, color="#cccccc", alpha=0.1, size=.02)
CACHE[[paste0("plot", WHAT, PATH, MODEL_INTERCEPT, METRIC)]] <- plot
assign("CACHE", CACHE, envir = .GlobalEnv)
return(plot)
}
plot <- get_plot()
plot + geom_vline(xintercept=seq(3.5,99.5, 3), color="#cccccc") # adding vertical lines
# drawing subplots
df_long <- CACHE$`df_pratts_majors_long1E:/career_ml/perturb/recall_gain_based_on_best_genetic_crazy_ver_results_202201.xlsxFALSErecall_gain`
```
## [reports_mkfold.py](#Overview-of-Code)
```python=
import pandas as pd
from tuning import DefaultNaturalSelection
import sys
from utils import Majors
from strategies import StrategyFactory
from enums import StrategyOptions
ns = DefaultNaturalSelection.deserialize(
serialization_path="best_genetic_crazy_ver_results_202205.xlsx"
)
ns.fit_all(forced=True) # run no matter what
best_recall_gain = ns.gene_pool[0]
best_precision_gain = ns.gene_pool[1]
best_accuracy = ns.gene_pool[2]
baseline = StrategyFactory.get_strategy(StrategyOptions.BASELINE).run()
# result tables
# Create a Pandas Excel writer using XlsxWriter as the engine.
writer = pd.ExcelWriter('best_genetic_crazy_ver_results_202205_reported.xlsx')
# Write each dataframe to a different worksheet.
best_recall_gain_df = best_recall_gain.performance
best_precision_gain_df = best_precision_gain.performance
best_accuracy_df = best_accuracy.performance
best_recall_gain_df.index = Majors.get_major_fullnames(best_recall_gain_df.index )
best_precision_gain_df.index = Majors.get_major_fullnames(best_precision_gain_df.index)
best_accuracy_df.index = Majors.get_major_fullnames(best_accuracy_df.index)
baseline.index = Majors.get_major_fullnames(baseline.index)
best_recall_gain_df.to_excel(writer, sheet_name="best_recall")
best_precision_gain_df.to_excel(writer, sheet_name="best_precision")
best_accuracy_df.to_excel(writer, sheet_name="best_accuracy")
baseline.to_excel(writer, sheet_name="baseline")
# Close the Pandas Excel writer and output the Excel file.
writer.save()
writer.close()
```
###### tags: `dissertation` `replication` `python`