# Code Repo: Exploring Explainable Multilabel Neural Network as a Psychometric Tool for Short-Form Test Scoring and Diagnostics: A Study on College Major Preference Assessment (Cont'd)

:::info
:bulb: This is a continuation of [Code Repo: Exploring Explainable Multilabel Neural Network as a Psychometric Tool for Short-Form Test Scoring and Diagnostics: A Study on College Major Preference Assessment](/Sjx7jw_8TlyyJ3_IeK8TyQ). The previous document reached hack.md's maximum character limit, so the remaining code is stored here.
:::

# Overview of Code

The code uses [keras](https://keras.io/) for machine learning, wrapped in self-defined classes. The table below lists the files, what they do, and the classes they define. Files written in `Python` have the extension `.py`; files written in `R`, `.R`. Files marked with the :arrow_backward: icon are in [the previous document](/Sjx7jw_8TlyyJ3_IeK8TyQ).

|File|Description|Classes Defined There|
|---|---|---|
|:arrow_backward: [analyses.py](/Sjx7jw_8TlyyJ3_IeK8TyQ)|Backend calculations for simple-sum and multilabel MNN with k-fold cross validation.|`MultilabelKFold`, `PerturbedMultilabelKFold`|
|:arrow_backward: [data.py](/Sjx7jw_8TlyyJ3_IeK8TyQ)|Classes for reading and preprocessing the CMPA data easily.|`DataGenerator`, `FullData`|
|:arrow_backward: [enums.py](/Sjx7jw_8TlyyJ3_IeK8TyQ)|For enums: closed-category options convenient for programming. Not important conceptually.|`StrategyOptions`, `MajorNumberOptions`, `MetricsOptions`, `AdvancedMetricsOptions`, `StanadardizationModeOptions`, `ItemTypeOptions`, `EvaluatorTypeOptions`|
|:arrow_backward: [evaluatators.py](/Sjx7jw_8TlyyJ3_IeK8TyQ)|Calculating performance metrics.|`EvaluatorFactory`, `Evaluator`, `KerasDefaultEvaluator`, `BinaryEvaluator`, `BinaryEvaluatorWithoutPunishment`|
|:arrow_backward: [gene_definition.py](/Sjx7jw_8TlyyJ3_IeK8TyQ)|Generating gene definitions for hyperparameter tuning.|`GeneDefinitionGenerator`|
|:arrow_backward: [models.py](/Sjx7jw_8TlyyJ3_IeK8TyQ)|Wrapping keras classes under a more convenient model class.|`MultilabelModel`|
|:arrow_backward: [perturbation.py](/Sjx7jw_8TlyyJ3_IeK8TyQ)|Runs the perturbation; the actual backend analysis is defined in [analyses.py](/Sjx7jw_8TlyyJ3_IeK8TyQ).|None|
|:arrow_backward: [perturbation_pratts.R](/Sjx7jw_8TlyyJ3_IeK8TyQ)|Drawing the graphs in Chapter 4.|None|
|:arrow_backward: [reports_mkfold.py](/Sjx7jw_8TlyyJ3_IeK8TyQ)|Generating reports of the results in human-readable tables.|None|
|[settings.py](#settings.py)|All the settings are here.|`Settings`, `TuningSettings`|
|[strategies.py](#strategies.py)|Wraps everything needed for a basic run with an MNN or baseline.|`RunStrategy`, `StrategyFactory`, `TestStrategy`, `MkFoldBaseRun`, `PerturbationRun`, `RawAggregate`|
|[tuning.py](#tuning.py)|For genetic hyperparameter tuning.|`Genes`, `Species`, `DefaultNaturalSelection`|
|[utils.py](#utils.py)|Auxiliary functions and decorators, such as functions that deal with getting the names of the 50 majors in CMPA.|`Decorators`, `ItemCheck`, `Majors`, `ItemSelector`|

# Source Code (Cont'd)

## [settings.py](#Overview-of-Code)

```python=
from enums import *
import tensorflow as tf


class Settings:
    DATAPATH = "data/major_all.csv"
    CLEANTHRESHOLD = .0
    BINARY_THRESHOLD = .5
    SCORED_ITEMTYPE = ItemTypeOptions.FORCED_CHOICE
    TOP_N = 3
    NUMBER_OF_MAJORS = MajorNumberOptions.FIFTY
    CLASSWEIGHT = None
    STANDARDIZATION_MODE = StanadardizationModeOptions.BY_COL
    N_NODES = [72, 64]
    SUBSET_SIZE = 50
    PERTURBATION_ITER = 2000  # change to 5000
    PERTURBATION_SERIALIZATION_PATH = "perturb/perturbation_crazy_ver_model.xlsx"
    PERTURBATION_SERIALIZATION_FREQUENCY = 10
    INPUT_SHAPE = (99, )
    METRICS = MetricsOptions.ALL
    K_FOLD = 3  #!! change this!
    # used when K_FOLD == 1
    DEFAULT_TRAIN_SPLIT_END = 7000
    DEFAULT_VAL_SPLIT_END = 8000
    EVALUATOR = EvaluatorTypeOptions.BINARY
    EPOCHS = 10000
    EARLY_STOPPING = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss', min_delta=1e-4, patience=3, restore_best_weights=True
    )
    ADVANCDE_METRICS = [
        AdvancedMetricsOptions.TEST_ACCURACY_OBSERVED,
        AdvancedMetricsOptions.TEST_ACCURACY_COIN,
        AdvancedMetricsOptions.TEST_ACCURACY_GAIN,
        AdvancedMetricsOptions.TEST_PRECISION_OBSERVED,
        AdvancedMetricsOptions.TEST_PRECISION_COIN,
        AdvancedMetricsOptions.TEST_PRECISION_GAIN,
        AdvancedMetricsOptions.TEST_RECALL_OBSERVED,
        AdvancedMetricsOptions.TEST_RECALL_COIN,
        AdvancedMetricsOptions.TEST_RECALL_GAIN,
        AdvancedMetricsOptions.TEST_F1_OBSERVED,
        AdvancedMetricsOptions.TEST_F1_COIN,
        AdvancedMetricsOptions.TEST_F1_GAIN
    ]


class TuningSettings(Settings):
    MOMEMTUM_START = 1.6
    MOMEMTUM_DECAY = .95
    MOMEMTUM_MIN = .15
    ORGANISM_NAMES = [
        "koda", "amery", "andy", "ryan", "sirui", "yue", "hsin", "jerry",
        "jake", "shunfu", "yoda", "oily", "psycho", "metrician", "magician",
        "mermer", "doraemon", "sailor", "moon", "venus", "pikachu",
        "bulbasaur", "squirtle", "lucifer", "minjeong", "allpass", "alpaca",
        "santa", "amarda", "statistician", "o", "canada", "covid", "karma",
        "university", "building", "koda",  # (fixed a missing comma here)
        "avocado", "poseidon", "zeus", "crank", "thunderbird", "crazy",
        "rich", "asian", "cherry", "fortune", "luck", "girl", "scarfe",
        "main", "mall", "ball", "sour", "twinkle", "pronoun", "circle",
        "diversity", "make", "america", "great", "again", "incel",
        "spongebob", "patrick", "star", "rain", "parade", "daily",
        "testicle", "timely", "matcha", "mochi", "mahou", "zombie",
        "muthen", "R", "constipation", "diarrhea", "irt",
        "sem", "cfa", "growth", "tea", "tapioca", "lavaan", "bayesian",
        "frequentist", "chi", "papi", "bubble", "odysseus", "dear",
        "square", "electron",  # (fixed a missing comma here)
        "epse", "karen", "yan", "shapka", "wu", "doctor", "committee",
        "liu", "ji", "hu", "zou", "stone", "ubc", "save", "foods",
        "cucumber", "carrots", "ma", "park", "hsu", "chen", "choi", "ng",
        "connie", "lo", "owen", "pogo", "koda", "maomao", "gueiguei",
        "baibai", "mimi", "yellowhead", "niuniu", "panpan", "dinghao",
        "laifu", "dongdong", "nannan", "hsihsi", "beibei"
    ]
    STRATEGY = StrategyOptions.BASIC_KFOLD_MNN  # or StrategyOptions.TEST
    EVALUATOR_TYPE = EvaluatorTypeOptions.BINARY
    INIT_ORGANISM_FITNESS = -9999.
    CLASSWEIGHTGENE_MUTATION_RATIO = .15
    GENE_POOL_SIZE = 30
    # GENE_DEFINITION = {
    #     'a': (-10,10,0,5),
    #     'b': (-5,5,0,1.3),
    #     'c': (-10,10,0,5),
    #     'd': (-5,5,0,1.3),
    #     'e': (-10,10,0,5),
    #     'f': (-5,5,0,1.3),
    #     'g': (-15,15,0,8),
    #     'h': (32,250,80,30), # layers
    #     'i': (-250,250,80,55),
    #     'j': (-250,250,0,100),
    #     'k': (.2,.97,.5,.2)
    # }
    # will be overridden by the gene definitions generated in gene_definition.py
    GENE_DEFINITION = {
        'accng':(0.0,9999,1.986076,0.993038),
        'afres':(0.0,9999,3.2595,1.62975),
        'antgy':(0.0,9999,3.0020046,1.5010023),
        'arcgy':(0.0,9999,3.5289693,1.7644846),
        'arcre':(0.0,9999,2.4149277,1.2074639),
        'asies':(0.0,9999,6.2625,3.13125),
        'biogy':(0.0,9999,4.4615097,2.2307549),
        'busss':(0.0,9999,1.9664408,0.9832204),
        'cheng':(0.0,9999,9.803774,4.901887),
        'chery':(0.0,9999,3.3771653,1.6885827),
        'civng':(0.0,9999,4.0148516,2.0074258),
        'clacs':(0.0,9999,4.516092,2.258046),
        'comce':(0.0,9999,3.040136,1.520068),
        'comns':(0.0,9999,1.9987195,0.9993597),
        'crigy':(0.0,9999,1.3379582,0.6689791),
        'dance':(0.0,9999,2.708046,1.354023),
        'digia':(0.0,9999,2.04678,1.02339),
        'ecocs':(0.0,9999,2.776342,1.388171),
        'eduon':(0.0,9999,1.6133786,0.8066893),
        'eleng':(0.0,9999,10.063107,5.0315533),
        'engsh':(0.0,9999,3.4101064,1.7050532),
        'envce':(0.0,9999,2.5645502,1.2822751),
        'frech':(0.0,9999,4.2349825,2.1174912),
        'genes':(0.0,9999,2.7469668,1.3734834),
        'geogy':(0.0,9999,5.2897673,2.6448836),
        'geohy':(0.0,9999,4.369853,2.1849265),
        'geran':(0.0,9999,5.764949,2.8824744),
        'heace':(0.0,9999,1.6605157,0.83025783),
        'hisry':(0.0,9999,2.9562092,1.4781046),
        'intes':(0.0,9999,4.5440154,2.2720077),
        'itaan':(0.0,9999,4.143299,2.0716496),
        'kingy':(0.0,9999,1.7887006,0.8943503),
        'lincs':(0.0,9999,3.0020046,1.5010023),
        'marng':(0.0,9999,1.9823394,0.9911697),
        'matce':(0.0,9999,8.2734375,4.1367188),
        'matcs':(0.0,9999,3.2893672,1.6446836),
        'mecng':(0.0,9999,3.7005935,1.8502967),
        'music':(0.0,9999,2.432143,1.2160715),
        'neuce':(0.0,9999,1.3913066,0.6956533),
        'nurng':(0.0,9999,1.6472684,0.8236342),
        'perrt':(0.0,9999,2.2658467,1.1329234),
        'phihy':(0.0,9999,1.4063305,0.70316523),
        'phycs':(0.0,9999,5.1513515,2.5756757),
        'polcs':(0.0,9999,3.5661016,1.7830508),
        'psygy':(0.0,9999,1.1917465,0.59587324),
        'relon':(0.0,9999,5.2293577,2.6146789),
        'socgy':(0.0,9999,2.2798245,1.1399122),
        'spash':(0.0,9999,3.2019513,1.6009756),
        'stacs':(0.0,9999,3.934727,1.9673635),
        'visrt':(0.0,9999,2.3746877,1.1873438),
        'layer1': (32,250,80,30),  # layers
        'layer2': (-250,250,80,55),
        'layer3': (-250,250,-50,100),
        'decision_threshold': (.5,.5,.5,0)
    }
    # dict{
    #     location_name: (min, max, mean, variance)
    # }
    GENE_POOL_DF_PATH = "genetic_tuning_202201_accuracy_crazy_ver.xlsx"
    TUNING_SETTINGS_PATH = "genetic_tuning_202201_accuracy_crazy_ver_settings.json"
    CULL_RATIO = .33333  # how much of the organisms to keep
    BREED_RATIO = .50  # the percentage of replenishing behaviour that is based on breeding
    NUM_GENERATIONS = 65
    FITNESS_INDEX = MetricsOptions.ACCURACY

    GLOBAL_RAW_DATA = {}
    GLOBAL_CLEANED_DATA = {}
    GLOBAL_LIKERT_DATA = {}
    GLOBAL_MAJOR_SCORES_DF = {}
    GLOBAL_MAJOR_IS_TOP_N_DF = {}
```

## [strategies.py](#Overview-of-Code)

```python=
# these are packed implementations for convenient running!
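# Example (illustrative, not part of the original file): every strategy below
# is obtained from StrategyFactory by a StrategyOptions member and then run,
# mirroring the __main__ block at the bottom of this file:
#     strategy = StrategyFactory.get_strategy(StrategyOptions.BASIC_KFOLD_MNN)
#     results = strategy.run()  # a pd.DataFrame of performance metrics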
import random

import numpy as np   # np and pd may also arrive via data's star import;
import pandas as pd  # imported explicitly here to be safe

from data import *
from enums import EvaluatorTypeOptions, ItemTypeOptions, MetricsOptions, StrategyOptions
from evaluators import Evaluator, EvaluatorFactory
from models import MultilabelModel
from analyses import MultilabelKFold, PerturbedMultilabelKFold
from abc import ABC, abstractmethod
from settings import Settings
from utils import Decorators, Majors  # Majors is needed by TestStrategy

# these are convenient classes for running the classes
# we'll have both a factory pattern and a strategy pattern


class RunStrategy(ABC):
    @abstractmethod
    def run(self) -> pd.DataFrame:
        pass


# maybe we'll need a strategy context factory?
class StrategyFactory:
    @classmethod
    def get_strategy(cls, strategy, *args, **kwargs) -> RunStrategy:
        if strategy == StrategyOptions.BASIC_KFOLD_MNN:
            return MkFoldBaseRun(*args, **kwargs)
        elif strategy == StrategyOptions.BASELINE:
            return RawAggregate(*args, **kwargs)
        elif strategy == StrategyOptions.GENETIC:
            raise Exception("Genetic tuning hasn't been rewritten yet.")
        elif strategy == StrategyOptions.PERTURBED_KFOLD_MNN:
            return PerturbationRun(*args, **kwargs)
        elif strategy == StrategyOptions.TEST:
            return TestStrategy(*args, **kwargs)
        else:
            raise Exception("Strategy must be one of the options specified in StrategyOptions")


class StrategyContext(ABC):
    @property
    def strategy(self):
        return self._strategy

    @strategy.setter
    def strategy(self, strategy: RunStrategy):
        assert isinstance(strategy, RunStrategy)
        self._strategy = strategy

    @property
    def settings(self) -> dict:
        return self._settings

    @settings.setter
    def settings(self, settings: dict):
        self._settings = settings

    @abstractmethod
    def run(self):
        # some implementation of how to apply settings
        # to the context using a particular settings file
        pass


class TestStrategy(RunStrategy):
    def __init__(self, *args, **kwargs) -> None:
        pass

    # @Decorators.time_it
    def run(self, *args, **kwargs) -> pd.DataFrame:
        metrics = Settings.METRICS
        majors = Majors.get_corresponding_majors(ItemTypeOptions.FORCED_CHOICE)
        shape = (len(majors), len(metrics))
        return pd.DataFrame(
            np.random.rand(*shape),
            index=majors,
            columns=metrics
        )


class MkFoldBaseRun(RunStrategy):
    # the default of my dissertation Chapter 3
    def __init__(
        self,
        path=Settings.DATAPATH,
        threshold=Settings.CLEANTHRESHOLD,
        scored_item_type=Settings.SCORED_ITEMTYPE,
        top_n=Settings.TOP_N,
        num_of_majors=Settings.NUMBER_OF_MAJORS,
        class_weight_function=Settings.CLASSWEIGHT,
        n_nodes=Settings.N_NODES,
        input_shape=Settings.INPUT_SHAPE,
        metrics=Settings.METRICS,
        k_fold=Settings.K_FOLD,
        epochs=Settings.EPOCHS,
        early_stopping=Settings.EARLY_STOPPING,
        evaluator_type=Settings.EVALUATOR,
        evaluator_threshold=Settings.BINARY_THRESHOLD
    ):
        # data_generator = DataGenerator(path, threshold, scored_item_type, top_n, num_of_majors, class_weight_function)
        full_data = FullData(path, threshold, scored_item_type, top_n, num_of_majors, class_weight_function)
        # full_data = FullData(X=data_generator.get_likert_only(), y=data_generator.get_weighted_major_is_top_n_df())
        model = MultilabelModel(num_of_majors, n_nodes, input_shape, metrics, epochs, early_stopping, evaluator_type, evaluator_threshold)
        mkfold = MultilabelKFold(full_data, model)  # generated
        self.__dict__.update({k: v for k, v in locals().items() if k not in ["self"]})
        # equivalent to self.top_n = top_n; self.input_shape = input_shape; and so on...
    @Decorators.time_it("MKFoldBaseRun strategy's run()")
    def run(self) -> pd.DataFrame:
        results = self.mkfold.run(self.k_fold)
        return results


class PerturbationRun(MkFoldBaseRun):
    def __init__(
        self,
        path=Settings.DATAPATH,
        threshold=Settings.CLEANTHRESHOLD,
        scored_item_type=Settings.SCORED_ITEMTYPE,
        top_n=Settings.TOP_N,
        num_of_majors=Settings.NUMBER_OF_MAJORS,
        class_weight_function=Settings.CLASSWEIGHT,
        n_nodes=Settings.N_NODES,
        input_shape=Settings.INPUT_SHAPE,
        metrics=Settings.METRICS,
        k_fold=Settings.K_FOLD,
        epochs=Settings.EPOCHS,
        early_stopping=Settings.EARLY_STOPPING,
        evaluator_type=Settings.EVALUATOR,
        evaluator_threshold=Settings.BINARY_THRESHOLD,
        subset=Settings.SUBSET_SIZE,
        iter=Settings.PERTURBATION_ITER
    ):
        super().__init__(path, threshold, scored_item_type, top_n,
                         num_of_majors, class_weight_function, n_nodes,
                         input_shape, metrics, k_fold, epochs, early_stopping,
                         evaluator_type, evaluator_threshold)
        self.subset = subset
        self.iter = iter
        self.perturbed_mkfold = PerturbedMultilabelKFold.from_mkfold(self.mkfold, subset)
        print("self.model.n_nodes", self.model.n_nodes)

    @Decorators.time_it()
    def run(self):
        return self.perturbed_mkfold.run(self.k_fold, self.iter)


class RawAggregate(RunStrategy):
    def __init__(
        self,
        path=Settings.DATAPATH,
        threshold=Settings.CLEANTHRESHOLD,
        top_n=Settings.TOP_N,
        num_of_majors=Settings.NUMBER_OF_MAJORS,
        metrics=Settings.METRICS,
        *args, **kwargs
    ):
        self.path = path
        self.threshold = threshold
        self.top_n = top_n
        self.num_of_majors = num_of_majors
        self.metrics = metrics

    def get_evaluator(self) -> Evaluator:
        predicted_generator = DataGenerator(
            self.path, self.threshold, ItemTypeOptions.LIKRET,
            self.top_n, self.num_of_majors, None
        )
        answers_generator = DataGenerator(
            self.path, self.threshold, ItemTypeOptions.FORCED_CHOICE,
            self.top_n, self.num_of_majors, None
        )
        predicted = predicted_generator.get_major_is_top_n_df()
        answers = answers_generator.get_major_is_top_n_df()
        evaluator = EvaluatorFactory.get_evaluator(
            EvaluatorTypeOptions.BINARY,
            predicted=predicted,
            answers=answers,
            metrics=self.metrics
        )
        return evaluator

    @Decorators.time_it("Raw aggregates run()")
    def run(self) -> pd.DataFrame:
        evaluator = self.get_evaluator()
        results = evaluator.evaluate()
        return pd.DataFrame(results)


if __name__ == "__main__":
    perturb = StrategyFactory.get_strategy(StrategyOptions.PERTURBED_KFOLD_MNN)
    perturb.run()
```

## [tuning.py](#Overview-of-Code)

```python=
from abc import ABC, abstractproperty
from typing import List, Tuple
import json
# (two unused numpy imports were removed here; `numarray` no longer exists in numpy)
from data import DataGenerator
from enums import EvaluatorTypeOptions, MajorNumberOptions, StrategyOptions
from evaluators import Evaluator
from settings import Settings, TuningSettings
from strategies import RunStrategy, StrategyFactory
from utils import Decorators, Majors
import random
import pandas as pd
import sys


class Genes:
    def __init__(self, gene_definition=TuningSettings.GENE_DEFINITION) -> None:
        # dict{
        #     location_name: (min, max, mean, variance)
        # }
        self.gene_definition = gene_definition
        # dynamically generate values first
        self.generate_brand_new_genes()

    @classmethod
    def from_dict(cls, gene_dict, gene_definition=TuningSettings.GENE_DEFINITION) -> "Genes":
        # a dict that has the same keys as gene_definition;
        # the dict may contain more, unneeded, information than the gene definition
        gene_values = {k: gene_dict.get(k) for k in gene_definition.keys()}
        genes = cls(gene_definition)
        genes.values = gene_values  # (fixed: was `gene_dict.values = gene_values`)
        return genes

    @property
    def values(self) -> dict:
        return self._values

    @values.setter
    def values(self, new_values: dict):
        # dict {
        #     location_name: new_value
        # }
        # clamp each value into its [min, max] range from the gene definition
        for k, v in new_values.items():
            m, M, _, _ = self.gene_definition.get(k)
            if v < m:
                v = m
            if v > M:
                v = M
            new_values[k] = v
        if hasattr(self, "_values"):
            self._values.update(new_values)
        else:
            self._values = new_values

    def generate_brand_new_genes(self):
        new_values = {}
        for k, (_, _, mean, variance) in self.gene_definition.items():
            new_values[k] = random.gauss(mu=mean, sigma=variance)
        self.values = new_values

    def mutate_genes(self, genes_to_mutate: List[str],
                     mutation_ratio=TuningSettings.CLASSWEIGHTGENE_MUTATION_RATIO):
        current_values = self.values
        new_values = {}
        for k in genes_to_mutate:
            _, _, mean, variance = self.gene_definition.get(k)
            current_value = current_values.get(k)
            new_value = current_value + random.gauss(mu=mean, sigma=variance * mutation_ratio)
            new_values[k] = new_value
        self.values = new_values

    def mutate_all_genes(self, mutation_ratio=TuningSettings.CLASSWEIGHTGENE_MUTATION_RATIO):
        all_genes = list(self.gene_definition.keys())
        self.mutate_genes(all_genes, mutation_ratio)


# species are like strategy_contexts
class Species:
    def __init__(self, genes: Genes, str_op: str, fitness_index: str,
                 organism_names: List[str] = TuningSettings.ORGANISM_NAMES,
                 init_organism_fitness=TuningSettings.INIT_ORGANISM_FITNESS) -> None:
        self.genes = genes
        self.str_op = str_op  # strategy_options
        self.fitness_index = fitness_index
        self.organism_names = organism_names
        self.init_organism_fitness = init_organism_fitness

    def reincarnate(self):
        # basically forget who you are
        self.sur_name = ""
        self.given_name = ""
        self.performance = pd.DataFrame()
        self.fitness = self.init_organism_fitness - random.random() / 10
        self.genes.generate_brand_new_genes()

    @property
    def performance(self) -> pd.DataFrame:
        if not hasattr(self, "_performance"):
            self._performance = pd.DataFrame()
        return self._performance

    @performance.setter
    def performance(self, new_performance: pd.DataFrame):
        self._performance = new_performance

    @property
    def fitness(self):
        if not hasattr(self, "_fitness"):
            self._fitness = self.init_organism_fitness - random.random() / 10
        return self._fitness

    @fitness.setter
    def fitness(self, new_fitness):
        self._fitness = new_fitness

    def __eq__(self, other: "Species") -> bool:
        return self.fitness == other.fitness

    def __gt__(self, other: "Species") -> bool:
        return self.fitness > other.fitness

    @property
    def genes(self) -> Genes:
        return self._genes
    @genes.setter
    def genes(self, genes):
        self._genes = genes

    @property
    def strategy(self) -> RunStrategy:
        return self._strategy

    @strategy.setter
    def strategy(self, strategy: RunStrategy):
        assert isinstance(strategy, RunStrategy)
        self._strategy = strategy

    @Decorators.time_it("tuning.py Species.run()")
    def run(self, forced=False) -> "Species":
        # probably fitted already
        if not forced:
            if self.fitness > self.init_organism_fitness:
                print("fitted -- ", self)
                return self
        print("fitting organism ", self.name)
        self.strategy = StrategyFactory.get_strategy(self.str_op, **self.settings)
        results = self.strategy.run()
        self.performance = results  # .mean() #.to_dict() # a df reduced to a dict
        fitted_index = self.performance[self.fitness_index]
        # if it has a mean method
        if hasattr(fitted_index, "mean") and callable(getattr(fitted_index, "mean")):
            self.fitness = fitted_index.mean()
        else:
            self.fitness = fitted_index
        print("fitted -- ", self)
        print(fitted_index)
        return self

    @property
    def name(self) -> str:
        if not hasattr(self, "_name"):
            self._name = self.given_name + " " + self.sur_name
        return self._name

    @property
    def given_name(self) -> str:
        if not hasattr(self, "_given_name"):
            self._given_name = random.choice(self.organism_names)
        return self._given_name

    @given_name.setter
    def given_name(self, given_name: str):
        self._given_name = given_name

    @property
    def sur_name(self) -> str:
        if not hasattr(self, "_sur_name"):
            self._sur_name = random.choice(self.organism_names)
        return self._sur_name

    @sur_name.setter
    def sur_name(self, sur_name: str):
        self._sur_name = sur_name

    def __repr__(self) -> str:
        return f"<{self.name} {self.fitness_index}: {str(round(self.fitness, 3))}, {self.genes.values.items()}>"

    @property
    def settings(self) -> dict:
        specific_majors = Majors.get_corresponding_majors(MajorNumberOptions.FIFTY)
        major_weights = {k: self.genes.values.get(k) for k in specific_majors}
        layer1 = self.genes.values.get("layer1")
        layer2 = self.genes.values.get("layer2")
        layer3 = self.genes.values.get("layer3")
        decision_threshold = self.genes.values.get("decision_threshold")

        def class_weight_function(df: pd.DataFrame):
            weights = pd.Series(major_weights)
            return weights * df

        n_nodes = [int(layer1), int(layer2), int(layer3)]
        n_nodes.sort(reverse=True)
        n_nodes = [n for n in n_nodes if n > 0]  # a non-positive layer size drops the layer
        binary_threshold = decision_threshold
        return {
            "class_weight_function": class_weight_function,
            "n_nodes": n_nodes,
            "evaluator_type": TuningSettings.EVALUATOR_TYPE,
            "evaluator_threshold": binary_threshold
        }


def read_gene_pool(gene_df: pd.DataFrame, settings: dict) -> List[Species]:
    for col in range(gene_df.shape[1]):
        data_dict = gene_df.iloc[:, col].to_dict()
        data_dict["name_to_parse"] = gene_df.columns[col]
        species = recreate_one_species(data_dict, settings)
        yield species


def recreate_one_species(data_dict: dict, settings: dict):
    fitness_index = settings.get("fitness_index", TuningSettings.FITNESS_INDEX)
    genes = read_genes(data_dict, settings)
    performance = read_performance(data_dict, settings)
    names = read_name(data_dict, settings)
    species = Species(
        genes,
        settings.get("str_op", TuningSettings.STRATEGY),
        fitness_index,
        settings.get("organism_names", TuningSettings.ORGANISM_NAMES),
        settings.get("init_organism_fitness", TuningSettings.INIT_ORGANISM_FITNESS)
    )
    species.sur_name = names.get("sur_name")
    species.given_name = names.get("given_name")
    species.performance = performance
    species.fitness = species.performance.get(fitness_index)
    return species


def read_genes(gene_dict: dict, settings: dict) -> Genes:
    gene_definition = settings.get("gene_definition", TuningSettings.GENE_DEFINITION)
    genes = Genes(gene_definition)
    genes.values = {k: gene_dict.get(k) for k in gene_definition.keys()}
    return genes


def read_performance(performance_dict: dict, settings: dict) -> dict:
    metrics = settings.get("metrics", Settings.METRICS)
    performance_keys = [k for k in metrics]
    metrics = {k: performance_dict.get(k) for k in performance_keys}
    return metrics


def read_name(data_dict: dict, settings: dict) -> dict:
    name_to_parse = data_dict.get("name_to_parse")
    organism_names = settings.get("organism_names", TuningSettings.ORGANISM_NAMES)
    try:
        given_name, sur_name = name_to_parse.split()
    except Exception:
        given_name = random.choice(organism_names)
        sur_name = random.choice(organism_names)
    return {'given_name': given_name, 'sur_name': sur_name}


class DefaultNaturalSelection:
    def __init__(
        self,
        str_op=TuningSettings.STRATEGY,
        organism_names=TuningSettings.ORGANISM_NAMES,
        init_organism_fitness=TuningSettings.INIT_ORGANISM_FITNESS,
        gene_definition=TuningSettings.GENE_DEFINITION,
        fitness_index=TuningSettings.FITNESS_INDEX,
        gene_pool=[],
        pool_size=TuningSettings.GENE_POOL_SIZE,
        num_generations=TuningSettings.NUM_GENERATIONS,
        mutation_ratio=TuningSettings.CLASSWEIGHTGENE_MUTATION_RATIO,
        breed_ratio=TuningSettings.BREED_RATIO,
        keep_ratio=TuningSettings.CULL_RATIO,
        metrics=Settings.METRICS,
        momemtum_start=TuningSettings.MOMEMTUM_START,
        momemtum_decay=TuningSettings.MOMEMTUM_DECAY,
        momemtum_min=TuningSettings.MOMEMTUM_MIN,
        serialization_path=TuningSettings.GENE_POOL_DF_PATH,
        tuning_settings_path=TuningSettings.TUNING_SETTINGS_PATH,
        *args, **kwargs
    ) -> None:
        self.str_op = str_op
        self.organism_names = organism_names  # (fixed typo: was `organaism_names`)
        self.init_organism_fitness = init_organism_fitness
        self.gene_definition = gene_definition
        self.fitness_index = fitness_index
        self.gene_pool = gene_pool
        self.metrics = metrics
        self.pool_size = pool_size
        self.num_generations = num_generations
        self.mutation_ratio = mutation_ratio
        self.breed_ratio = breed_ratio
        self.keep_ratio = keep_ratio
        self._momemtum = momemtum_start
        self.momemtum_start = momemtum_start
        self.momemtum_decay = momemtum_decay
        self.momemtum_min = momemtum_min
        self.serialization_path = serialization_path
        self.tuning_settings_path = tuning_settings_path

    @property
    def momemtum(self):
        # decays every time it is used
        self._momemtum = self._momemtum * self.momemtum_decay
        if self._momemtum < self.momemtum_min:
            self._momemtum = self.momemtum_start
        return self._momemtum

    @momemtum.setter
    def momemtum(self, new_momemtum):
        self._momemtum = max(new_momemtum, self.momemtum_min)

    def generate_one(self) -> Species:
        genes = Genes(self.gene_definition)
        species = Species(genes, self.str_op, self.fitness_index,
                          self.organism_names, self.init_organism_fitness)
        return species

    def breed(self, parent1: Species, parent2: Species, child: Species):
        # uniform crossover: each gene comes from one of the two parents at random
        new_values = {}
        for k in self.gene_definition.keys():
            die = random.choice([0, 1])
            if die == 0:
                basis = parent1.genes.values.get(k)
            else:
                basis = parent2.genes.values.get(k)
            new_values[k] = basis
        child_genes = Genes(self.gene_definition)
        child_genes.values = new_values
        child_genes.mutate_all_genes(self.mutation_ratio * self.momemtum)
        child.genes = child_genes  # (fixed: the bred genes were never assigned to the child)
        # child = Species(child_genes, self.str_op, self.fitness_index)
        child_sur_name_options = [
            parent1.sur_name,
            parent2.sur_name,
            parent1.sur_name + "-" + parent2.sur_name
        ]
        child.sur_name = random.choice(child_sur_name_options)
        # return child

    def batch_reincarnate(self, to_replace: List[Species]):
        # parent_candiates = []
        # parent_candiates += self.gene_pool
        for species in to_replace:
            species.reincarnate()
            # new_species = self.generate_one()
            # parent_candiates.append(new_species)
            # self.gene_pool.append(new_species)

    def batch_breed(self, to_replace: List[Species], to_keep: List[Species], n: int):
        parent_candidates = to_keep  # + to_replace[n:]
        for old_species in to_replace[:n]:
            parent1 = random.choice(parent_candidates)
            parent2 = random.choice(parent_candidates)
            self.breed(parent1, parent2, child=old_species)
            # self.gene_pool.append(child)

    def initialize_pool(self):
        print("initializing pool")
        # _ = self.batch_generate(self.pool_size)
        for _ in range(self.pool_size):
            new_species = self.generate_one()
            self.gene_pool.append(new_species)
        random.shuffle(self.gene_pool)

    def replenish_pool(self, to_replace: List[Species], to_keep: List[Species]):
        room_left = len(to_replace)
        if room_left <= 0:
            return
        num_to_breed = int(room_left * self.breed_ratio)
        # num_to_generate = room_left - num_to_breed
        # self.batch_reincarnate(to_replace, num_to_generate)
        self.batch_breed(to_replace, to_keep, num_to_breed)
        # as a safety check: if the pool is not full, generate until it is
        while len(self.gene_pool) < self.pool_size:
            self.gene_pool.append(self.generate_one())
        # remember to shuffle it
        random.shuffle(self.gene_pool)
        print("replenish_pool() to keep", len(to_keep))
        print("replenish_pool() to replace", len(to_replace))

    def sort_pool(self):
        self.gene_pool.sort(reverse=True)

    def fit_all(self, forced=False):
        for i, species in enumerate(self.gene_pool):
            print(f"species {i+1} of {len(self.gene_pool)}")
            species.run(forced=forced)
        self.sort_pool()

    def cull(self) -> Tuple[List[Species], List[Species]]:
        self.sort_pool()
        keep_n = max(2, int(len(self.gene_pool) * self.keep_ratio))
        to_keep = self.gene_pool[:keep_n] or []
        to_replace = self.gene_pool[keep_n:] or []
        # self.batch_reincarnate(to_replace)
        print("cull(): to_keep length", len(to_keep))
        print("cull(): to_replace length", len(to_replace))
        return to_replace, to_keep

    def generate_report(self):
        self.sort_pool()
        report = pd.DataFrame()
        for species in self.gene_pool:
            gene_settings = pd.Series(species.genes.values)
            performance_results = species.performance
            if hasattr(performance_results, "mean"):
                performance_results = performance_results.mean()
            else:
                performance_results = pd.Series(performance_results)
            results = gene_settings.append(performance_results)
            report[species.name] = results
        return report

    @Decorators.time_it()
    def run(self):
        if not self.gene_pool:
            self.initialize_pool()
            to_keep = []
            to_replace = self.gene_pool
        else:
            to_replace, to_keep = self.cull()
        for i in range(self.num_generations):
            print("generation: " + str(i+1) + " of " + str(self.num_generations))
            self.replenish_pool(to_replace, to_keep)
            self.fit_all()
            results = self.generate_report()
            self.serialize(results)
            print(f"generation {i+1} saved to {self.serialization_path}")
            to_replace, to_keep = self.cull()
            self.batch_reincarnate(to_replace)
        return results

    def serialize(self, results: pd.DataFrame):
        # for now, let's not serialize settings
        try:
            results.to_excel(self.serialization_path)
            with open(self.tuning_settings_path, "w") as f:
                # getting defaults from TuningSettings
                # settings_dict = {k: v for k, v in vars(TuningSettings).items() if k not in ["__doc__", "__module__"]}
                # overriding some with new
                settings_dict = {k: v for k, v in self.__dict__.items() if k not in ["from_dict", "gene_pool"]}
                json.dump(settings_dict, f)
        except Exception as e:
            print(e)

    @classmethod
    def deserialize(
        cls,
        serialization_path=TuningSettings.GENE_POOL_DF_PATH,
        tuning_settings_path=TuningSettings.TUNING_SETTINGS_PATH,
        *args, **kwargs
    ) -> "DefaultNaturalSelection":
        # get settings
        with open(tuning_settings_path) as f:
            settings = json.load(f)
        df = pd.read_excel(serialization_path, index_col=0)
        gene_pool = list(read_gene_pool(df, settings))
        settings['gene_pool'] = gene_pool
        return cls(**settings)


if __name__ == "__main__":
    # check if the user supplied a serialization path / tuning settings path
    if len(sys.argv) > 2:
        input_serialization_path = sys.argv[1]
        input_tuning_settings_path = sys.argv[2]
    else:
        input_serialization_path = None
        input_tuning_settings_path = None
    try:
        print("deserializing natural selection.")
        ns = DefaultNaturalSelection.deserialize(
            serialization_path=input_serialization_path or TuningSettings.GENE_POOL_DF_PATH,
            tuning_settings_path=input_tuning_settings_path or TuningSettings.TUNING_SETTINGS_PATH
        )
    except Exception:
        print("no need to deserialize")
        ns = DefaultNaturalSelection(
            serialization_path=input_serialization_path or TuningSettings.GENE_POOL_DF_PATH,
            tuning_settings_path=input_tuning_settings_path or TuningSettings.TUNING_SETTINGS_PATH
        )
    ns.run()
    print("all done!")
```

## [utils.py](#Overview-of-Code)

```python=
from datetime import datetime
import re
from enums import ItemTypeOptions, MajorNumberOptions
from settings import Settings
import pandas as pd
from typing import List
from functools import wraps


class Decorators:
    @staticmethod
    def time_it(print_name: str = ""):
        # decorator factory: wraps a function and prints its execution time
        def time_it_second_layer(func):
            @wraps(func)
            def wrapped(*args, **kwargs):
                t_start = datetime.now()
                results = func(*args, **kwargs)
                duration = datetime.now() - t_start
                print(f" {print_name or func.__name__}, Execution time -- {duration.total_seconds()}")
                return results
            return wrapped
        return time_it_second_layer


class ItemCheck:
    @staticmethod
    def get_scored_item_type_check(what, per_major=False):
        def likert_selector(text):
            pattern = re.compile(r'l[1-3][^\r\n]+')
            return re.match(pattern, text) is not None

        def ipsative_selector(text):
            pattern = re.compile(r'f[4-7][^\r\n]+')
            return re.match(pattern, text) is not None

        def is_likert_major(major, col):
            m = re.match(r"l[1-3]" + major + r"\d", col)
            return m is not None

        def is_ipsative_major(major, col):
            m = re.match(r"f[4-7]" + major + r"\d", col)
            return m is not None

        if what == ItemTypeOptions.LIKRET:
            if per_major:
                return is_likert_major
            else:
                return likert_selector
        elif what == ItemTypeOptions.FORCED_CHOICE:
            if per_major:
                return is_ipsative_major
            else:
                return ipsative_selector
        else:
            raise Exception(
                "ItemType needs to be either ItemTypeOptions.LIKRET or ItemTypeOptions.FORCED_CHOICE"
            )


class Majors:
    GENERICS = {
        "g_art":"General Art",
        "g_bns":"General Business",
        "g_eng":"General Engineering",
        "g_hst":"General History",
        "g_hth":"General Health",
        "g_lng":"General Language",
        "afres":"African American Studies",
        "arcre":"Architecture",
        "asies":"Asian Studies",
        "biogy":"Biology",
        "chery":"Chemistry",
        "comce":"Computer Science",
        "comns":"Communications",
        "crigy":"Criminology",
        "dance":"Dance",
        "eduon":"Education",
        "engsh":"English",
        "envce":"Environmental Science",
        "genes":"Gender Studies",
        "geogy":"Geology",
        "geohy":"Geography",
        "intes":"International Studies",
        "lincs":"Linguistics",
        "matcs":"Mathematics",
        "music":"Music",
        "neuce":"Neuroscience",
        "perrt":"Performance Art",
        "phihy":"Philosophy",
        "phycs":"Physics",
        "polcs":"Politics",
        "psygy":"Psychology",
        "socgy":"Sociology",
        "stacs":"Statistics"
    }
    SPECIFICS = {
        "afres":"African American Studies",
        "arcre":"Architecture",
        "asies":"Asian Studies",
        "biogy":"Biology",
        "chery":"Chemistry",
        "comce":"Computer Science",
        "comns":"Communications",
        "crigy":"Criminology",
        "dance":"Dance",
        "eduon":"Education",
        "engsh":"English",
        "envce":"Environmental Science",
        "genes":"Gender Studies",
        "geogy":"Geology",
        "geohy":"Geography",
        "intes":"International Studies",
        "lincs":"Linguistics",
        "matcs":"Mathematics",
        "music":"Music",
        "neuce":"Neuroscience",
        "perrt":"Performance Art",
        "phihy":"Philosophy",
        "phycs":"Physics",
        "polcs":"Politics",
        "psygy":"Psychology",
        "socgy":"Sociology",
        "stacs":"Statistics",
        "digia":"Digital Art",
        "visrt":"Visual Art",
        'accng':"Accounting",
        'busss':"Business",
        'ecocs':"Economics",
        'marng':"Marketing",
        'cheng':"Chemical Engineering",
        'civng':"Civil Engineering",
        'eleng':"Electronic Engineering",
        # 'engng':"Engineering",
        'matce':"Materials Science",
        'mecng':"Mechanical Engineering",
        'hisry':"History",
        'arcgy':"Archeology",
        'antgy':"Anthropology",
        'clacs':"Classics",
        'relon':"Religion",
        'heace':"Health Science",
        'kingy':"Kinesiology",
        'nurng':"Nursing",
        'frech':"French",
        'geran':"German",
        'itaan':"Italian",
        'spash':"Spanish"
    }
    MAPPING = {
        'g_art': ['digia', 'visrt'],
        'g_bns': ['accng', 'busss', 'ecocs', 'marng'],
        'g_eng': ['cheng', 'civng', 'eleng', 'matce', 'mecng'],
        'g_hst': ['hisry', 'arcgy', 'antgy', 'clacs', 'relon'],
        'g_hth': ['heace', 'kingy', 'nurng'],
        'g_lng': ['frech', 'geran', 'itaan', 'spash']
    }

    @staticmethod
    def get_corresponding_majors(what) -> List[str]:
        if what in [ItemTypeOptions.LIKRET,
                    MajorNumberOptions.THIRTY_THREE_USE_MEAN,
                    MajorNumberOptions.THIRTY_THREE_USE_MAX]:
            return list(Majors.GENERICS.keys())
        elif what in [ItemTypeOptions.FORCED_CHOICE, MajorNumberOptions.FIFTY]:
            return list(Majors.SPECIFICS.keys())
        else:
            raise Exception(
                "Item type doesn't exist. Must be ItemTypeOptions.LIKRET, "
                "ItemTypeOptions.FORCED_CHOICE, or one of the MajorNumberOptions"
            )

    @staticmethod
    def get_major_fullnames(short_names: List[str]):
        # look each short name up in SPECIFICS first, then GENERICS;
        # unknown short names pass through unchanged
        results = [Majors.SPECIFICS.get(sn, sn) for sn in short_names]
        return [Majors.GENERICS.get(sn, sn) for sn in results]


class ItemSelector:
    # in the future, this should support subsetting
    def __init__(self, data: pd.DataFrame):
        self.data = data

    def get_selected_cols_by_item_type(self, scored_item_type=Settings.SCORED_ITEMTYPE):
        check_function = ItemCheck.get_scored_item_type_check(scored_item_type, per_major=False)
        return [col for col in self.data.columns if check_function(col)]

    def get_selected_df_by_item_type(self, scored_item_type=Settings.SCORED_ITEMTYPE) -> pd.DataFrame:
        return self.data.loc[:, self.get_selected_cols_by_item_type(scored_item_type)]
```

###### tags: `dissertation` `python` `replication`
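The gene machinery in `tuning.py` is easiest to see in isolation. The sketch below re-implements, outside the repo's classes, its two core moves: drawing a fresh gene value from its `(min, max, mean, variance)` tuple and clamping a mutated value back into range. The helper names `clamp`, `new_gene`, and `mutate_gene` are illustrative, not from the repo; the two sample gene tuples are copied from `TuningSettings.GENE_DEFINITION`.

```python
import random

# Each gene is defined by a (min, max, mean, variance) tuple,
# as in TuningSettings.GENE_DEFINITION.
GENE_DEFINITION = {
    "psygy": (0.0, 9999, 1.1917465, 0.59587324),  # a class-weight gene
    "layer1": (32, 250, 80, 30),                  # a layer-size gene
}

def clamp(value, lo, hi):
    # the setter of Genes.values keeps every value inside [min, max]
    return max(lo, min(hi, value))

def new_gene(name):
    # generate_brand_new_genes(): draw from a gaussian, then clamp
    lo, hi, mean, variance = GENE_DEFINITION[name]
    return clamp(random.gauss(mu=mean, sigma=variance), lo, hi)

def mutate_gene(name, current, mutation_ratio=0.15):
    # mutate_genes(): perturb the current value with a scaled gaussian, then clamp
    # (faithful to the repo, which centers the perturbation on the gene's mean)
    lo, hi, mean, variance = GENE_DEFINITION[name]
    return clamp(current + random.gauss(mu=mean, sigma=variance * mutation_ratio), lo, hi)

value = new_gene("layer1")
mutated = mutate_gene("layer1", value)
print(32 <= value <= 250, 32 <= mutated <= 250)  # prints: True True
```

Because every write goes through the clamp, a gene can wander under gaussian mutation pressure without ever leaving the range its definition allows, which is what keeps the tuner's layer sizes and class weights plausible across generations.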