# Code Repo: Exploring Explainable Multilabel Neural Network as a Psychometric Tool for Short-Form Test Scoring and Diagnostics: A Study on College Major Preference Assessment (Cont'd)
:::info
:bulb: This is a continuation of [Code Repo: Exploring Explainable Multilabel Neural Network as a Psychometric Tool for Short-Form Test Scoring and Diagnostics: A Study on College Major Preference Assessment](/Sjx7jw_8TlyyJ3_IeK8TyQ). The previous document reached HackMD's maximum character limit, so the remaining code is stored here.
:::
# Overview of Code
The code uses [keras](https://keras.io/) for machine learning, wrapped in self-defined classes. The following table lists the files, the classes defined in each, and what they do. Files written in `Python` have the extension `.py`; files written in `R`, `.R`. Files marked with the :arrow_backward: icon are in [the previous document](/Sjx7jw_8TlyyJ3_IeK8TyQ).
|File|Description|Classes Defined There|
|---|---|---|
|:arrow_backward: [analyses.py](/Sjx7jw_8TlyyJ3_IeK8TyQ)|Backend calculations for simple-sum and multilabel MNN with k-fold cross validation.|`MultilabelKFold`, `PerturbedMultilabelKFold`
|:arrow_backward: [data.py](/Sjx7jw_8TlyyJ3_IeK8TyQ)|Classes for reading and preprocessing the CMPA data.| `DataGenerator`, `FullData`
|:arrow_backward: [enums.py](/Sjx7jw_8TlyyJ3_IeK8TyQ)|Enums: closed sets of options that are convenient for programming. Not important conceptually.| `StrategyOptions`, `MajorNumberOptions`, `MetricsOptions`, `AdvancedMetricsOptions`, `StanadardizationModeOptions`, `ItemTypeOptions`, `EvaluatorTypeOptions`
|:arrow_backward: [evaluators.py](/Sjx7jw_8TlyyJ3_IeK8TyQ)|Calculating performance metrics.| `EvaluatorFactory`, `Evaluator`, `KerasDefaultEvaluator`, `BinaryEvaluator`, `BinaryEvaluatorWithoutPunishment`
|:arrow_backward: [gene_definition.py](/Sjx7jw_8TlyyJ3_IeK8TyQ)|Generating gene definitions for hyperparameter tuning.|`GeneDefinitionGenerator`|
|:arrow_backward: [models.py](/Sjx7jw_8TlyyJ3_IeK8TyQ)|Wrapping keras classes under a more convenient model class.|`MultilabelModel`
|:arrow_backward: [perturbation.py](/Sjx7jw_8TlyyJ3_IeK8TyQ)|Running the perturbation analysis. The real backend analysis is defined in [analyses.py](/Sjx7jw_8TlyyJ3_IeK8TyQ).|None
|:arrow_backward: [perturbation_pratts.R](/Sjx7jw_8TlyyJ3_IeK8TyQ)|Drawing the graphs in Chapter 4.|None
|:arrow_backward: [reports_mkfold.py](/Sjx7jw_8TlyyJ3_IeK8TyQ)|Generating reports of the results as human-readable tables.|None
|[settings.py](#settings.py)|All the settings are here.|`Settings`, `TuningSettings`
|[strategies.py](#strategies.py)|Wrapping everything in here for a basic run with an MNN or baseline.| `RunStrategy`, `StrategyFactory`, `TestStrategy`, `MkFoldBaseRun`, `PerturbationRun`, `RawAggregate`
|[tuning.py](#tuning.py)|For genetic hyperparameter tuning.|`Genes`, `Species`, `DefaultNaturalSelection`
|[utils.py](#utils.py)|Auxiliary functions and decorators, such as those for retrieving the names of the 50 majors in CMPA.|`Decorators`, `ItemCheck`, `Majors`, `ItemSelector`
# Source Code (Cont'd)
## [settings.py](#Overview-of-Code)
```python=
from enums import *
import tensorflow as tf
class Settings:
DATAPATH="data/major_all.csv"
CLEANTHRESHOLD=.0
BINARY_THRESHOLD=.5
SCORED_ITEMTYPE=ItemTypeOptions.FORCED_CHOICE
TOP_N=3
NUMBER_OF_MAJORS=MajorNumberOptions.FIFTY
CLASSWEIGHT=None
STANDARDIZATION_MODE=StanadardizationModeOptions.BY_COL
N_NODES=[72, 64]
SUBSET_SIZE=50
PERTURBATION_ITER=2000 #change to 5000
PERTURBATION_SERIALIZATION_PATH="perturb/perturbation_crazy_ver_model.xlsx"
PERTURBATION_SERIALIZATION_FREQUENCY=10
INPUT_SHAPE=(99, )
METRICS=MetricsOptions.ALL
K_FOLD=3 #!! change this!
# used when K_FOLD == 1
DEFAULT_TRAIN_SPLIT_END=7000
DEFAULT_VAL_SPLIT_END=8000
EVALUATOR = EvaluatorTypeOptions.BINARY
EPOCHS=10000
EARLY_STOPPING=tf.keras.callbacks.EarlyStopping(monitor='val_loss', min_delta=1e-4, patience=3, restore_best_weights=True)
ADVANCDE_METRICS=[
AdvancedMetricsOptions.TEST_ACCURACY_OBSERVED,
AdvancedMetricsOptions.TEST_ACCURACY_COIN,
AdvancedMetricsOptions.TEST_ACCURACY_GAIN,
AdvancedMetricsOptions.TEST_PRECISION_OBSERVED,
AdvancedMetricsOptions.TEST_PRECISION_COIN,
AdvancedMetricsOptions.TEST_PRECISION_GAIN,
AdvancedMetricsOptions.TEST_RECALL_OBSERVED,
AdvancedMetricsOptions.TEST_RECALL_COIN,
AdvancedMetricsOptions.TEST_RECALL_GAIN,
AdvancedMetricsOptions.TEST_F1_OBSERVED,
AdvancedMetricsOptions.TEST_F1_COIN,
AdvancedMetricsOptions.TEST_F1_GAIN
]
class TuningSettings(Settings):
MOMEMTUM_START = 1.6
MOMEMTUM_DECAY = .95
MOMEMTUM_MIN = .15
ORGANISM_NAMES = [
"koda", "amery", "andy", "ryan", "sirui", "yue", "hsin", "jerry", "jake", "shunfu",
"yoda", "oily", "psycho", "metrician", "magician", "mermer", "doraemon", "sailor",
"moon", "venus", "pikachu", "bulbasaur", "squirtle", "lucifer", "minjeong", "allpass",
"alpaca", "santa", "amarda", "statistician", "o", "canada", "covid", "karma",
        "university", "building", "koda", "avocado", "poseidon", "zeus",
"crank", "thunderbird", "crazy", "rich", "asian", "cherry", "fortune", "luck",
"girl", "scarfe", "main", "mall", "ball", "sour", "twinkle", "pronoun", "circle", "diversity",
"make", "america", "great", "again", "incel", "spongebob", "patrick",
"star", "rain", "parade", "daily", "testicle", "timely", "matcha", "mochi","mahou", "zombie", "muthen",
"R", "constipation", "diarrhea", "irt", "sem", "cfa", "growth", "tea", "tapioca",
        "lavaan", "bayesian", "frequentist", "chi", "papi", "bubble", "odysseus", "dear", "square",
        "electron", "epse", "karen", "yan", "shapka", "wu", "doctor", "committee", "liu",
"ji", "hu", "zou", "stone", "ubc", "save", "foods", "cucumber", "carrots", "ma", "park", "hsu", "chen",
"choi", "ng", "connie", "lo", "owen", "pogo", "koda", "maomao", "gueiguei",
"baibai", "mimi", "yellowhead", "niuniu", "panpan", "dinghao", "laifu",
"dongdong", "nannan", "hsihsi", "beibei"
]
STRATEGY= StrategyOptions.BASIC_KFOLD_MNN
#StrategyOptions.BASIC_KFOLD_MNN
# StrategyOptions.TEST
EVALUATOR_TYPE=EvaluatorTypeOptions.BINARY
INIT_ORGANISM_FITNESS = -9999.
CLASSWEIGHTGENE_MUTATION_RATIO = .15
GENE_POOL_SIZE = 30
# GENE_DEFINITION= {
# 'a': (-10,10,0,5),
# 'b': (-5,5,0,1.3),
# 'c': (-10,10,0,5),
# 'd': (-5,5,0,1.3),
# 'e': (-10,10,0,5),
# 'f': (-5,5,0,1.3),
# 'g': (-15,15,0,8),
# 'h': (32,250,80,30), # layers
# 'i': (-250,250,80,55),
# 'j': (-250,250,0,100),
# 'k': (.2,.97,.5,.2)
# }
# will be overridden by the gene definitions generated in gene_definition.py
GENE_DEFINITION = {
'accng':(0.0,9999,1.986076,0.993038),
'afres':(0.0,9999,3.2595,1.62975),
'antgy':(0.0,9999,3.0020046,1.5010023),
'arcgy':(0.0,9999,3.5289693,1.7644846),
'arcre':(0.0,9999,2.4149277,1.2074639),
'asies':(0.0,9999,6.2625,3.13125),
'biogy':(0.0,9999,4.4615097,2.2307549),
'busss':(0.0,9999,1.9664408,0.9832204),
'cheng':(0.0,9999,9.803774,4.901887),
'chery':(0.0,9999,3.3771653,1.6885827),
'civng':(0.0,9999,4.0148516,2.0074258),
'clacs':(0.0,9999,4.516092,2.258046),
'comce':(0.0,9999,3.040136,1.520068),
'comns':(0.0,9999,1.9987195,0.9993597),
'crigy':(0.0,9999,1.3379582,0.6689791),
'dance':(0.0,9999,2.708046,1.354023),
'digia':(0.0,9999,2.04678,1.02339),
'ecocs':(0.0,9999,2.776342,1.388171),
'eduon':(0.0,9999,1.6133786,0.8066893),
'eleng':(0.0,9999,10.063107,5.0315533),
'engsh':(0.0,9999,3.4101064,1.7050532),
'envce':(0.0,9999,2.5645502,1.2822751),
'frech':(0.0,9999,4.2349825,2.1174912),
'genes':(0.0,9999,2.7469668,1.3734834),
'geogy':(0.0,9999,5.2897673,2.6448836),
'geohy':(0.0,9999,4.369853,2.1849265),
'geran':(0.0,9999,5.764949,2.8824744),
'heace':(0.0,9999,1.6605157,0.83025783),
'hisry':(0.0,9999,2.9562092,1.4781046),
'intes':(0.0,9999,4.5440154,2.2720077),
'itaan':(0.0,9999,4.143299,2.0716496),
'kingy':(0.0,9999,1.7887006,0.8943503),
'lincs':(0.0,9999,3.0020046,1.5010023),
'marng':(0.0,9999,1.9823394,0.9911697),
'matce':(0.0,9999,8.2734375,4.1367188),
'matcs':(0.0,9999,3.2893672,1.6446836),
'mecng':(0.0,9999,3.7005935,1.8502967),
'music':(0.0,9999,2.432143,1.2160715),
'neuce':(0.0,9999,1.3913066,0.6956533),
'nurng':(0.0,9999,1.6472684,0.8236342),
'perrt':(0.0,9999,2.2658467,1.1329234),
'phihy':(0.0,9999,1.4063305,0.70316523),
'phycs':(0.0,9999,5.1513515,2.5756757),
'polcs':(0.0,9999,3.5661016,1.7830508),
'psygy':(0.0,9999,1.1917465,0.59587324),
'relon':(0.0,9999,5.2293577,2.6146789),
'socgy':(0.0,9999,2.2798245,1.1399122),
'spash':(0.0,9999,3.2019513,1.6009756),
'stacs':(0.0,9999,3.934727,1.9673635),
'visrt':(0.0,9999,2.3746877,1.1873438),
'layer1': (32,250,80,30), # layers
'layer2': (-250,250,80,55),
'layer3': (-250,250,-50,100),
'decision_threshold': (.5,.5,.5,0)
}
# dict{
# location_name: (min, max, mean, variance)
# }
GENE_POOL_DF_PATH = "genetic_tuning_202201_accuracy_crazy_ver.xlsx"
TUNING_SETTINGS_PATH = "genetic_tuning_202201_accuracy_crazy_ver_settings.json"
    CULL_RATIO = .33333 # fraction of organisms to keep each generation
BREED_RATIO = .50 # the percentage of replenishing behaviour that is based on breeding
NUM_GENERATIONS = 65
FITNESS_INDEX = MetricsOptions.ACCURACY
GLOBAL_RAW_DATA = {}
GLOBAL_CLEANED_DATA = {}
GLOBAL_LIKERT_DATA = {}
GLOBAL_MAJOR_SCORES_DF = {}
GLOBAL_MAJOR_IS_TOP_N_DF = {}
```
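Settings are plain class attributes, so experiment variants can be expressed by subclassing and overriding only what changes, which is exactly how `TuningSettings` extends `Settings` above. A minimal, self-contained sketch of the pattern (the class names here are illustrative stand-ins, not the real project classes):

```python
# Minimal sketch of the class-attribute settings pattern used above.
# BaseSettings/ExperimentSettings are illustrative stand-ins.
class BaseSettings:
    K_FOLD = 3
    EPOCHS = 10000

class ExperimentSettings(BaseSettings):
    # override only what changes; everything else is inherited
    K_FOLD = 10

print(ExperimentSettings.K_FOLD)   # 10 (overridden)
print(ExperimentSettings.EPOCHS)   # 10000 (inherited)
```

One caveat of this pattern: mutable or stateful attributes, such as the `EarlyStopping` callback instance in `Settings`, are shared by every consumer of the class.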
## [strategies.py](#Overview-of-Code)
```python=
# these are packed implementations for convenient running!
import random
from data import *
from enums import EvaluatorTypeOptions, ItemTypeOptions, MetricsOptions, StrategyOptions
from evaluators import Evaluator, EvaluatorFactory
from models import MultilabelModel
from analyses import MultilabelKFold, PerturbedMultilabelKFold
from abc import ABC, abstractmethod
from settings import Settings
from utils import Decorators, Majors
# these are convenient classes for running the strategies
# we'll have both a factory pattern and a strategy pattern
class RunStrategy(ABC):
@abstractmethod
    def run(self) -> pd.DataFrame:
pass
# maybe we'll need a strategy context factory?
class StrategyFactory:
@classmethod
def get_strategy(cls, strategy, *args, **kwargs) -> RunStrategy:
if strategy == StrategyOptions.BASIC_KFOLD_MNN:
return MkFoldBaseRun(*args, **kwargs)
elif strategy == StrategyOptions.BASELINE:
return RawAggregate(*args, **kwargs)
elif strategy == StrategyOptions.GENETIC:
raise Exception("Genetic tuning hasn't been rewritten yet.")
elif strategy == StrategyOptions.PERTURBED_KFOLD_MNN:
return PerturbationRun(*args, **kwargs)
elif strategy == StrategyOptions.TEST:
return TestStrategy(*args, **kwargs)
else:
            raise Exception("Strategy must be one of the options specified in StrategyOptions")
class StrategyContext(ABC):
@property
def strategy(self):
return self._strategy
@strategy.setter
def strategy(self, strategy: RunStrategy):
assert isinstance(strategy, RunStrategy)
self._strategy = strategy
@property
def settings(self) -> dict:
return self._settings
@settings.setter
def settings(self, settings: dict):
self._settings = settings
@abstractmethod
def run(self):
# some implementations of how to
# apply settings to context using a particular setting file
pass
class TestStrategy(RunStrategy):
def __init__(self, *arg, **kwargs) -> None:
pass
# @Decorators.time_it
def run(self, *args, **kwargs) -> pd.DataFrame:
metrics = Settings.METRICS
majors = Majors.get_corresponding_majors(ItemTypeOptions.FORCED_CHOICE)
shape = (len(majors), len(metrics))
return pd.DataFrame(
np.random.rand(*shape),
index=majors,
columns=metrics
)
class MkFoldBaseRun(RunStrategy):
# the default of my dissertation Chapter 3
def __init__(
self,
path=Settings.DATAPATH,
threshold=Settings.CLEANTHRESHOLD,
scored_item_type=Settings.SCORED_ITEMTYPE,
top_n=Settings.TOP_N,
num_of_majors=Settings.NUMBER_OF_MAJORS,
class_weight_function=Settings.CLASSWEIGHT,
n_nodes=Settings.N_NODES,
input_shape=Settings.INPUT_SHAPE,
metrics=Settings.METRICS,
k_fold=Settings.K_FOLD,
epochs=Settings.EPOCHS,
early_stopping=Settings.EARLY_STOPPING,
evaluator_type=Settings.EVALUATOR,
evaluator_threshold=Settings.BINARY_THRESHOLD
):
# data_generator = DataGenerator(path, threshold, scored_item_type, top_n, num_of_majors, class_weight_function)
full_data = FullData(path, threshold, scored_item_type, top_n, num_of_majors, class_weight_function)
# full_data = FullData(X=data_generator.get_likert_only(),y=data_generator.get_weighted_major_is_top_n_df())
model = MultilabelModel(num_of_majors, n_nodes, input_shape, metrics, epochs, early_stopping, evaluator_type, evaluator_threshold)
mkfold = MultilabelKFold(full_data, model)
# generated
self.__dict__.update({k:v for k, v in locals().items() if k not in ["self"]})
# is equivalent to self.top_n = top_n; self.input_shape = input_shape and so on...
@Decorators.time_it("MKFoldBaseRun strategy's run()")
def run(self) -> pd.DataFrame:
results = self.mkfold.run(self.k_fold)
return results
class PerturbationRun(MkFoldBaseRun):
    def __init__(
        self,
        path=Settings.DATAPATH,
        threshold=Settings.CLEANTHRESHOLD,
        scored_item_type=Settings.SCORED_ITEMTYPE,
        top_n=Settings.TOP_N,
        num_of_majors=Settings.NUMBER_OF_MAJORS,
        class_weight_function=Settings.CLASSWEIGHT,
        n_nodes=Settings.N_NODES,
        input_shape=Settings.INPUT_SHAPE,
        metrics=Settings.METRICS,
        k_fold=Settings.K_FOLD,
        epochs=Settings.EPOCHS,
        early_stopping=Settings.EARLY_STOPPING,
        evaluator_type=Settings.EVALUATOR,
        evaluator_threshold=Settings.BINARY_THRESHOLD,
        subset=Settings.SUBSET_SIZE,
        iter=Settings.PERTURBATION_ITER
    ):
super().__init__(path, threshold, scored_item_type, top_n, num_of_majors, class_weight_function, n_nodes, input_shape, metrics, k_fold, epochs, early_stopping, evaluator_type, evaluator_threshold)
self.subset = subset
self.iter = iter
self.perturbed_mkfold = PerturbedMultilabelKFold.from_mkfold(self.mkfold, subset)
print("self.model.n_nodes", self.model.n_nodes)
@Decorators.time_it()
def run(self):
return self.perturbed_mkfold.run(self.k_fold, self.iter)
class RawAggregate(RunStrategy):
def __init__(self,
path=Settings.DATAPATH,
threshold=Settings.CLEANTHRESHOLD,
top_n=Settings.TOP_N,
num_of_majors=Settings.NUMBER_OF_MAJORS,
metrics=Settings.METRICS,
*args, **kwargs
):
self.path = path
self.threshold = threshold
self.top_n = top_n
self.num_of_majors=num_of_majors
self.metrics = metrics
def get_evaluator(self) -> Evaluator:
predicted_generator = DataGenerator(
self.path,
self.threshold,
ItemTypeOptions.LIKRET,
self.top_n,
self.num_of_majors,
None
)
answers_generator = DataGenerator(
self.path,
self.threshold,
ItemTypeOptions.FORCED_CHOICE,
self.top_n,
self.num_of_majors,
None
)
predicted = predicted_generator.get_major_is_top_n_df()
answers = answers_generator.get_major_is_top_n_df()
        evaluator = EvaluatorFactory.get_evaluator(
            EvaluatorTypeOptions.BINARY,
            predicted=predicted,
            answers=answers,
            metrics=self.metrics
        )
        return evaluator
@Decorators.time_it("Raw aggregates run()")
def run(self) -> pd.DataFrame:
evaluator = self.get_evaluator()
results = evaluator.evaluate()
return pd.DataFrame(results)
if __name__ == "__main__":
perturb = StrategyFactory.get_strategy(StrategyOptions.PERTURBED_KFOLD_MNN)
perturb.run()
```
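The factory-plus-strategy structure above can be sketched in isolation as follows (hypothetical names; the real `StrategyFactory` dispatches on `StrategyOptions` and forwards all settings to the chosen strategy):

```python
from abc import ABC, abstractmethod

# Self-contained sketch of the factory/strategy pattern used in strategies.py.
# The strategy names and classes here are illustrative, not the project's own.
class Strategy(ABC):
    @abstractmethod
    def run(self):
        ...

class BaselineStrategy(Strategy):
    def run(self):
        return "baseline results"

class StrategySketchFactory:
    # a registry dict keeps the factory open for extension
    # without a growing if/elif chain
    _registry = {"baseline": BaselineStrategy}

    @classmethod
    def get_strategy(cls, name, *args, **kwargs) -> Strategy:
        try:
            return cls._registry[name](*args, **kwargs)
        except KeyError:
            raise ValueError(f"unknown strategy: {name!r}")

print(StrategySketchFactory.get_strategy("baseline").run())  # baseline results
```

Registering strategies in a dict rather than branching may be worth adopting if more strategies are added later.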
## [tuning.py](#Overview-of-Code)
```python=
from abc import ABC, abstractproperty
from typing import List, Tuple
import json
from data import DataGenerator
from enums import EvaluatorTypeOptions, MajorNumberOptions, StrategyOptions
from evaluators import Evaluator
from settings import Settings, TuningSettings
from strategies import RunStrategy, StrategyFactory
from utils import Decorators, Majors
import random
import pandas as pd
import sys
class Genes:
def __init__(self, gene_definition=TuningSettings.GENE_DEFINITION) -> None:
# dict{
# location_name: (min, max, mean, variance)
# }
self.gene_definition = gene_definition
# dynamically generate values first
self.generate_brand_new_genes()
@classmethod
def from_dict(cls, gene_dict, gene_definition=TuningSettings.GENE_DEFINITION) -> "Genes":
# a dict that has the same keys as gene_definition
        # the dict may contain more, unneeded, information than the gene definition
        gene_values = {k: gene_dict.get(k) for k in gene_definition.keys()}
        genes = cls(gene_definition)
        genes.values = gene_values
return genes
@property
def values(self) -> dict:
return self._values
@values.setter
def values(self, new_values:dict):
# dict {
# location_name: new_value
# }
for k, v in new_values.items():
m, M, _, _ = self.gene_definition.get(k)
if v < m:
v = m
if v > M:
v = M
new_values[k] = v
if hasattr(self, "_values"):
self._values.update(new_values)
else:
self._values = new_values
def generate_brand_new_genes(self):
new_values = {}
for k, (_, _, mean, variance) in self.gene_definition.items():
new_values[k] = random.gauss(mu=mean, sigma=variance)
self.values = new_values
def mutate_genes(self, genes_to_mutate:List[str], mutation_ratio=TuningSettings.CLASSWEIGHTGENE_MUTATION_RATIO):
current_values = self.values
new_values = {}
for k in genes_to_mutate:
_, _, mean, variance = self.gene_definition.get(k)
current_value = current_values.get(k)
new_value = current_value + random.gauss(mu=mean, sigma=variance*mutation_ratio)
new_values[k] = new_value
self.values = new_values
def mutate_all_genes(self, mutation_ratio=TuningSettings.CLASSWEIGHTGENE_MUTATION_RATIO):
all_genes = list(self.gene_definition.keys())
self.mutate_genes(all_genes, mutation_ratio)
# species are like strategy_contexts
class Species:
    def __init__(
        self,
        genes:Genes,
        str_op:str,
        fitness_index:str,
        organism_names:List[str]=TuningSettings.ORGANISM_NAMES,
        init_organism_fitness=TuningSettings.INIT_ORGANISM_FITNESS
    ) -> None:
self.genes = genes
self.str_op = str_op # strategy_options
self.fitness_index=fitness_index
self.organism_names=organism_names
self.init_organism_fitness = init_organism_fitness
def reincarnate(self):
# basically forget who you are
self.sur_name = ""
self.given_name = ""
self.performance = pd.DataFrame()
self.fitness = self.init_organism_fitness - random.random()/10
self.genes.generate_brand_new_genes()
@property
def performance(self) -> pd.DataFrame:
if not hasattr(self, "_performance"):
self._performance = pd.DataFrame()
return self._performance
@performance.setter
def performance(self, new_performance:pd.DataFrame):
self._performance = new_performance
@property
def fitness(self):
if not hasattr(self, "_fitness"):
self._fitness = self.init_organism_fitness - random.random()/10
return self._fitness
@fitness.setter
def fitness(self, new_fitness):
self._fitness = new_fitness
def __eq__(self, other:"Species") -> bool:
return self.fitness == other.fitness
def __gt__(self, other:"Species") -> bool:
return self.fitness > other.fitness
@property
def genes(self) -> Genes:
return self._genes
@genes.setter
def genes(self, genes):
self._genes = genes
@property
def strategy(self) -> RunStrategy:
return self._strategy
@strategy.setter
def strategy(self, strategy:RunStrategy):
assert isinstance(strategy, RunStrategy)
self._strategy = strategy
@Decorators.time_it("tuning.py Species.run()")
def run(self, forced=False) -> "Species":
# probably fitted
if not forced:
if self.fitness > self.init_organism_fitness:
print("fitted -- ", self)
return self
print("fitting organism ", self.name)
self.strategy = StrategyFactory.get_strategy(self.str_op, **self.settings)
results = self.strategy.run()
self.performance = results #.mean() #.to_dict() # a df reduced to a dict
fitted_index = self.performance[self.fitness_index]
# if it has a mean method
if hasattr(fitted_index, "mean") and callable(getattr(fitted_index, "mean")):
self.fitness = fitted_index.mean()
else:
self.fitness = fitted_index
print("fitted -- ", self)
print(fitted_index)
return self
@property
def name(self) -> str:
if not hasattr(self, "_name"):
self._name = self.given_name + " " + self.sur_name
return self._name
@property
def given_name(self) -> str:
if not hasattr(self, "_given_name"):
self._given_name = random.choice(self.organism_names)
return self._given_name
@property
def sur_name(self) -> str:
if not hasattr(self, "_sur_name"):
self._sur_name = random.choice(self.organism_names)
return self._sur_name
@given_name.setter
def given_name(self, given_name:str):
self._given_name = given_name
@sur_name.setter
def sur_name(self, sur_name:str):
self._sur_name = sur_name
def __repr__(self) -> str:
return f"<{self.name} {self.fitness_index}: {str(round(self.fitness,3))}, {self.genes.values.items()}>"
@property
def settings(self) -> dict:
specific_majors = Majors.get_corresponding_majors(MajorNumberOptions.FIFTY)
major_weights = {k:self.genes.values.get(k) for k in specific_majors}
layer1 = self.genes.values.get("layer1")
layer2 = self.genes.values.get("layer2")
layer3 = self.genes.values.get("layer3")
decision_threshold = self.genes.values.get("decision_threshold")
def class_weight_function(df:pd.DataFrame):
weights = pd.Series(major_weights)
return weights * df
n_nodes = [int(layer1), int(layer2), int(layer3)]
n_nodes.sort(reverse=True)
n_nodes = [n for n in n_nodes if n > 0]
binary_threshold = decision_threshold
return {
"class_weight_function": class_weight_function,
"n_nodes":n_nodes,
"evaluator_type": TuningSettings.EVALUATOR_TYPE,
"evaluator_threshold": binary_threshold
}
def read_gene_pool(gene_df:pd.DataFrame, settings:dict) -> List[Species]:
for col in range(gene_df.shape[1]):
data_dict = gene_df.iloc[:, col].to_dict()
data_dict["name_to_parse"] = gene_df.columns[col]
species = recreate_one_species(data_dict, settings)
yield species
def recreate_one_species(
data_dict:dict,
settings:dict
):
fitness_index = settings.get("fitness_index", TuningSettings.FITNESS_INDEX)
genes = read_genes(data_dict, settings)
performance = read_performance(data_dict, settings)
names = read_name(data_dict, settings)
species = Species(
genes,
settings.get("str_op", TuningSettings.STRATEGY),
fitness_index,
settings.get("organism_names", TuningSettings.ORGANISM_NAMES),
settings.get("init_organism_fitness", TuningSettings.INIT_ORGANISM_FITNESS)
)
species.sur_name = names.get("sur_name")
species.given_name = names.get("given_name")
species.performance = performance
species.fitness = species.performance.get(fitness_index)
return species
def read_genes(gene_dict:dict, settings:dict) -> Genes:
gene_definition = settings.get("gene_definition", TuningSettings.GENE_DEFINITION)
genes = Genes(gene_definition)
genes.values = {k:gene_dict.get(k) for k in gene_definition.keys()}
return genes
def read_performance(performance_dict:dict, settings:dict) -> dict:
metrics = settings.get("metrics", Settings.METRICS)
performance_keys = [k for k in metrics]
metrics = {k:performance_dict.get(k) for k in performance_keys}
return metrics
def read_name(data_dict:dict, settings:dict) -> dict:
name_to_parse = data_dict.get("name_to_parse")
organism_names = settings.get("organism_names", TuningSettings.ORGANISM_NAMES)
try:
given_name, sur_name = name_to_parse.split()
    except (AttributeError, ValueError):
given_name = random.choice(organism_names)
sur_name = random.choice(organism_names)
return {'given_name':given_name, 'sur_name':sur_name}
class DefaultNaturalSelection:
def __init__(
self,
str_op=TuningSettings.STRATEGY,
organism_names=TuningSettings.ORGANISM_NAMES,
init_organism_fitness=TuningSettings.INIT_ORGANISM_FITNESS,
gene_definition=TuningSettings.GENE_DEFINITION,
fitness_index=TuningSettings.FITNESS_INDEX,
gene_pool=[],
pool_size=TuningSettings.GENE_POOL_SIZE,
num_generations=TuningSettings.NUM_GENERATIONS,
mutation_ratio=TuningSettings.CLASSWEIGHTGENE_MUTATION_RATIO,
breed_ratio=TuningSettings.BREED_RATIO,
keep_ratio=TuningSettings.CULL_RATIO,
metrics=Settings.METRICS,
momemtum_start=TuningSettings.MOMEMTUM_START,
momemtum_decay=TuningSettings.MOMEMTUM_DECAY,
momemtum_min=TuningSettings.MOMEMTUM_MIN,
serialization_path=TuningSettings.GENE_POOL_DF_PATH,
tuning_settings_path=TuningSettings.TUNING_SETTINGS_PATH,
*args, **kwargs
) -> None:
self.str_op = str_op
self.organaism_names = organism_names
self.init_organism_fitness = init_organism_fitness
self.gene_definition = gene_definition
self.fitness_index = fitness_index
        self.gene_pool = gene_pool or [] # avoid reusing the mutable default list
self.metrics = metrics
self.pool_size=pool_size
self.num_generations=num_generations
self.mutation_ratio = mutation_ratio
self.breed_ratio = breed_ratio
self.keep_ratio = keep_ratio
self._momemtum = momemtum_start
self.momemtum_start = momemtum_start
self.momemtum_decay = momemtum_decay
self.momemtum_min = momemtum_min
self.serialization_path = serialization_path
self.tuning_settings_path = tuning_settings_path
@property
def momemtum(self):
# decayed everytime used
self._momemtum = self._momemtum * self.momemtum_decay
if self._momemtum < self.momemtum_min:
self._momemtum = self.momemtum_start
return self._momemtum
@momemtum.setter
def momemtum(self, new_momemtum):
self._momemtum = max(new_momemtum, self.momemtum_min)
def generate_one(self) -> Species:
genes = Genes(self.gene_definition)
species = Species(genes, self.str_op, self.fitness_index, self.organaism_names, self.init_organism_fitness)
return species
def breed(self, parent1:Species, parent2:Species, child:Species):
new_values = {}
for k in self.gene_definition.keys():
die = random.choice([0, 1])
if die == 0:
basis = parent1.genes.values.get(k)
else:
basis = parent2.genes.values.get(k)
new_values[k] = basis
child_genes = Genes(self.gene_definition)
child_genes.values = new_values
child_genes.mutate_all_genes(self.mutation_ratio*self.momemtum)
# child = Species(child_genes, self.str_op, self.fitness_index)
child_sur_name_options = [
parent1.sur_name,
parent2.sur_name,
parent1.sur_name + "-" + parent2.sur_name
]
child.sur_name = random.choice(child_sur_name_options)
# return child
def batch_reincarnate(self, to_replace:List[Species]):
#parent_candiates = []
#parent_candiates += self.gene_pool
for species in to_replace:
species.reincarnate()
#new_species = self.generate_one()
#parent_candiates.append(new_species)
#self.gene_pool.append(new_species)
def batch_breed(self, to_replace:List[Species], to_keep:List[Species], n:int):
parent_candidates = to_keep # + to_replace[n:]
for old_species in to_replace[:n]:
parent1 = random.choice(parent_candidates)
parent2 = random.choice(parent_candidates)
self.breed(parent1, parent2, child=old_species)
# self.gene_pool.append(child)
def initialize_pool(self):
print("initializing pool")
# _ = self.batch_generate(self.pool_size)
for _ in range(self.pool_size):
new_species = self.generate_one()
self.gene_pool.append(new_species)
random.shuffle(self.gene_pool)
def replenish_pool(self, to_replace:List[Species], to_keep:List[Species]):
room_left = len(to_replace)
if room_left <= 0:
return
num_to_breed = int(room_left * self.breed_ratio)
# num_to_generate = room_left - num_to_breed
# self.batch_reincarnate(to_replace, num_to_generate)
self.batch_breed(to_replace, to_keep, num_to_breed)
# as a safe check if pool size not full, generate til full
while len(self.gene_pool) < self.pool_size:
self.gene_pool.append(self.generate_one())
# remember to shuffle it
random.shuffle(self.gene_pool)
print("replenish pool() to keep", len(to_keep))
print("replenish pool() to replace", len(to_replace))
def sort_pool(self):
self.gene_pool.sort(reverse=True)
def fit_all(self, forced=False):
for i, species in enumerate(self.gene_pool):
print(f"species {i+1} of {len(self.gene_pool)}")
species.run(forced=forced)
self.sort_pool()
def cull(self) -> Tuple[List[Species], List[Species]]:
self.sort_pool()
keep_n = max(2, int(len(self.gene_pool) * self.keep_ratio))
to_keep = self.gene_pool[:keep_n] or []
to_replace = self.gene_pool[keep_n:] or []
# self.batch_reincarnate(to_replace)
print("cull(): to_keep length", len(to_keep))
print("cull(): to_replace length", len(to_replace))
return to_replace, to_keep
def generate_report(self):
self.sort_pool()
report = pd.DataFrame()
for species in self.gene_pool:
gene_settings = pd.Series(species.genes.values)
performance_results = species.performance
if hasattr(performance_results, "mean"):
performance_results = performance_results.mean()
else:
performance_results = pd.Series(performance_results)
            results = pd.concat([gene_settings, performance_results]) # Series.append was removed in pandas 2.0
report[species.name] = results
return report
@Decorators.time_it()
def run(self):
if not self.gene_pool:
self.initialize_pool()
to_keep = []
to_replace = self.gene_pool
else:
to_replace, to_keep = self.cull()
for i in range(self.num_generations):
print("generation: " + str(i+1) + " of "+ str(self.num_generations))
self.replenish_pool(to_replace, to_keep)
self.fit_all()
results = self.generate_report()
self.serialize(results)
print(f"generation {i+1} saved to {self.serialization_path}" )
to_replace, to_keep = self.cull()
self.batch_reincarnate(to_replace)
return results
def serialize(self, results:pd.DataFrame):
# for now; let's not serialize settings
try:
results.to_excel(self.serialization_path)
with open(self.tuning_settings_path, "w") as f:
# getting defaults from TuningSettings
# settings_dict = {k:v for k, v in vars(TuningSettings).items() if k not in ["__doc__", "__module__"]}
# overriding some with new
settings_dict = {k:v for k, v in self.__dict__.items() if k not in ["from_dict", "gene_pool"]}
json.dump(settings_dict, f)
except Exception as e:
print(e)
@classmethod
def deserialize(
cls,
serialization_path=TuningSettings.GENE_POOL_DF_PATH,
tuning_settings_path=TuningSettings.TUNING_SETTINGS_PATH,
*args, **kwargs
) -> "DefaultNaturalSelection":
# get settings
with open(tuning_settings_path) as f:
settings = json.load(f)
df = pd.read_excel(serialization_path, index_col=0)
gene_pool = list(read_gene_pool(df, settings))
settings['gene_pool'] = gene_pool
return cls(**settings)
if __name__ == "__main__":
if len(sys.argv) > 2: # check if user inputs serialization path / tuning settings
input_serialization_path = sys.argv[1]
input_tuning_settings_path = sys.argv[2]
else:
input_serialization_path = None
input_tuning_settings_path = None
try:
print("deserializing natural selection.")
ns = DefaultNaturalSelection.deserialize(
serialization_path=input_serialization_path or TuningSettings.GENE_POOL_DF_PATH,
tuning_settings_path=input_tuning_settings_path or TuningSettings.TUNING_SETTINGS_PATH
)
    except Exception:
        print("no need to deserialize")
ns = DefaultNaturalSelection(
serialization_path=input_serialization_path or TuningSettings.GENE_POOL_DF_PATH,
tuning_settings_path=input_tuning_settings_path or TuningSettings.TUNING_SETTINGS_PATH
)
ns.run()
print("all done!")
```
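Each entry in `GENE_DEFINITION` is a `(min, max, mean, variance)` tuple: new genes are drawn from a Gaussian at `(mean, variance)` and clamped into `[min, max]` by the `values` setter, and mutation adds another Gaussian draw scaled by the mutation ratio, as in `Genes.mutate_genes`. A simplified, self-contained sketch with one illustrative gene:

```python
import random

# Simplified sketch of gene sampling/mutation driven by a
# (min, max, mean, variance) definition, mirroring the Genes class above.
# Note: the fourth element is passed to random.gauss as sigma, so it is
# really a standard deviation despite being named "variance".
GENE_DEF = {"layer1": (32, 250, 80, 30)}  # illustrative gene only

def clamp(v, lo, hi):
    return min(max(v, lo), hi)

def new_gene(defn):
    lo, hi, mean, sd = defn
    return clamp(random.gauss(mean, sd), lo, hi)

def mutate(value, defn, ratio=0.15):
    # as in mutate_genes: the perturbation is centred on the gene's mean
    lo, hi, mean, sd = defn
    return clamp(value + random.gauss(mean, sd * ratio), lo, hi)

v = new_gene(GENE_DEF["layer1"])
m = mutate(v, GENE_DEF["layer1"])
```

Because the mutation draw is centred on the gene's mean rather than on zero, mutated values drift upward until clamped at the maximum; centring the draw on zero would make mutation symmetric around the current value.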
## [utils.py](#Overview-of-Code)
```python=
from datetime import datetime
import re
from enums import ItemTypeOptions, MajorNumberOptions
from settings import Settings
import pandas as pd
from typing import List
from functools import wraps
class Decorators:
@staticmethod
def time_it(print_name:str=""):
def time_it_second_layer(func):
@wraps(func)
def wrapped(*args, **kwargs):
t_start = datetime.now()
results = func(*args, **kwargs)
duration = datetime.now() - t_start
print(f" {print_name or func.__name__}, Execution time -- {duration.total_seconds()}")
return results
return wrapped
return time_it_second_layer
class ItemCheck:
@staticmethod
def get_scored_item_type_check(what, per_major=False):
def likert_selector(text):
pattern = re.compile(r'l[1-3][^\r\n]+')
# result = [re.match(pattern, text) is not None for text in seq]
# return result
return re.match(pattern, text) is not None
def ipsative_selector(text):
pattern = re.compile(r'f[4-7][^\r\n]+')
#result = [re.match(pattern, text) is not None for text in seq]
return re.match(pattern, text) is not None
        def is_likert_major(major, col):
            m = re.match(r"l[1-3]" + major + r"\d", col)
            return m is not None
        def is_ipsative_major(major, col):
            m = re.match(r"f[4-7]" + major + r"\d", col)
            return m is not None
if what == ItemTypeOptions.LIKRET:
if per_major:
return is_likert_major
else:
return likert_selector
elif what == ItemTypeOptions.FORCED_CHOICE:
if per_major:
return is_ipsative_major
else:
return ipsative_selector
else:
            raise Exception(
                "ItemType needs to be either ItemTypeOptions.LIKRET or ItemTypeOptions.FORCED_CHOICE"
            )
class Majors:
GENERICS = {
"g_art":"General Art",
"g_bns":"General Business",
        "g_eng":"General Engineering",
"g_hst":"General History",
"g_hth":"General Health",
"g_lng":"General Language",
"afres":"African American Studies",
"arcre":"Architecture",
"asies":"Asian Studies",
"biogy":"Biology",
"chery":"Chemistry",
"comce":"Computer Science",
"comns":"Communications",
"crigy":"Criminology",
"dance":"Dance",
"eduon":"Education",
"engsh":"English",
"envce":"Environmental Science",
"genes":"Gender Studies",
"geogy":"Geology",
"geohy":"Geography",
"intes":"International Studies",
"lincs":"Linguistics",
"matcs":"Mathematics",
"music":"Music",
"neuce":"Neuroscience",
"perrt":"Performance Art",
"phihy":"Philosophy",
"phycs":"Physics",
"polcs":"Politics",
"psygy":"Psychology",
"socgy":"Sociology",
"stacs":"Statistics"
}
SPECIFICS = {
"afres":"African American Studies",
"arcre":"Architecture",
"asies":"Asian Studies",
"biogy":"Biology",
"chery":"Chemistry",
"comce":"Computer Science",
"comns":"Communications",
"crigy":"Criminology",
"dance":"Dance",
"eduon":"Education",
"engsh":"English",
"envce":"Environmental Science",
"genes":"Gender Studies",
"geogy":"Geology",
"geohy":"Geography",
"intes":"International Studies",
"lincs":"Linguistics",
"matcs":"Mathematics",
"music":"Music",
"neuce":"Neuroscience",
"perrt":"Performance Art",
"phihy":"Philosophy",
"phycs":"Physics",
"polcs":"Politics",
"psygy":"Psychology",
"socgy":"Sociology",
"stacs":"Statistics",
"digia":"Digital Art",
"visrt":"Visual Art",
'accng':"Accounting",
'busss':"Business",
'ecocs':"Economics",
'marng':"Marketing",
'cheng':"Chemical Engineering",
        'civng':"Civil Engineering",
'eleng':"Electronic Engineering",
#'engng':"Engieering",
'matce':"Materials Science",
'mecng':"Mechanical Engineering",
'hisry':"History",
'arcgy':"Archeology",
'antgy':"Anthropology",
'clacs':"Classics",
'relon':"Religion",
'heace':"Health Science",
'kingy':"Kinesiology",
'nurng':"Nursing",
'frech':"French",
'geran':"German",
'itaan':"Italian",
'spash':"Spanish"
}
MAPPING = {
'g_art': ['digia', 'visrt'],
'g_bns': ['accng', 'busss', 'ecocs', 'marng'],
'g_eng': ['cheng', 'civng', 'eleng', 'matce', 'mecng'],
'g_hst': ['hisry', 'arcgy', 'antgy', 'clacs', 'relon'],
'g_hth': ['heace', 'kingy', 'nurng'],
'g_lng': ['frech', 'geran', 'itaan', 'spash']
}
@staticmethod
def get_corresponding_majors(what) -> List[str]:
if what in [ItemTypeOptions.LIKRET, MajorNumberOptions.THIRTY_THREE_USE_MEAN, MajorNumberOptions.THIRTY_THREE_USE_MAX]:
return list(Majors.GENERICS.keys())
elif what in [ItemTypeOptions.FORCED_CHOICE, MajorNumberOptions.FIFTY]:
return list(Majors.SPECIFICS.keys())
else:
raise Exception("Itemtype doesn't exist. Must be one of LIKERT or FORCED_CHOICE or one of the MajorNumberOptions")
@staticmethod
def get_major_fullnames(short_names:List[str]):
results = [Majors.SPECIFICS.get(sn, sn) for sn in short_names]
return [Majors.GENERICS.get(sn, sn) for sn in results]
class ItemSelector:
# in the future, should support subsetting
def __init__(self, data:pd.DataFrame):
self.data = data
def get_selected_cols_by_item_type(self, scored_item_type=Settings.SCORED_ITEMTYPE):
check_function = ItemCheck.get_scored_item_type_check(scored_item_type, per_major=False)
return [col for col in self.data.columns if check_function(col)]
def get_selected_df_by_item_type(self, scored_item_type=Settings.SCORED_ITEMTYPE) -> pd.DataFrame:
return self.data.loc[:, self.get_selected_cols_by_item_type(scored_item_type)]
```
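`ItemCheck`'s selectors encode the CMPA column-naming convention: Likert item columns start with `l1`–`l3` and forced-choice (ipsative) columns with `f4`–`f7`, followed by a major code and an item digit. A small self-contained illustration (the column names below are made-up examples following the same convention):

```python
import re

# Illustration of the column-name patterns matched by ItemCheck's selectors.
likert = re.compile(r'l[1-3][^\r\n]+')  # Likert items: l1..l3 prefix
forced = re.compile(r'f[4-7][^\r\n]+')  # forced-choice items: f4..f7 prefix

cols = ["l1psygy1", "f5psygy2", "id", "l2stacs3"]
print([c for c in cols if likert.match(c)])  # ['l1psygy1', 'l2stacs3']
print([c for c in cols if forced.match(c)])  # ['f5psygy2']
```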
###### tags: `dissertation` `python` `replication`