```py
import sys
import time

from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# from pyspark.ml.stat import KolmogorovSmirnovTest

# FROM ALGORITHM ARGUMENT - sample_data_X.csv sample_data_Y.csv ${OUTPUT_PATH} spark/xgboost
df_1_path = sys.argv[1]    # sample_data_X.csv
df_2_path = sys.argv[2]    # sample_data_Y.csv
output_path = sys.argv[3]  # result.csv


def log_results(results_df, output_dir=output_path):
    """Persist a (metric, results) pandas DataFrame as CSV via Spark."""
    # time_df = pd.DataFrame(list(zip(["TIME"], [time.time() - start])), columns=['index', 'results'])
    # results_df = pd.concat([results_df, time_df])
    results_spark_df = spark.createDataFrame(results_df)
    results_spark_df.select("metric", "results").write.save(output_dir, format='csv', header=True, mode='overwrite')
    # results_df.to_csv(output_dir, index=False, header=["metric", "results"])
    print("FILE SAVED")


if __name__ == "__main__":
    start = time.time()
    spark = SparkSession \
        .builder \
        .appName('Helios_AIG_POC') \
        .getOrCreate()

    ### READ AS SPARK DATAFRAMES
    df_1 = spark.read.csv(df_1_path, header=True, inferSchema=True)
    df_2 = spark.read.csv(df_2_path, header=True, inferSchema=True)

    ### MERGE & GET FEATURE COLUMNS
    df_merge = df_1.join(df_2, how='inner', on=['order_id'])
    feature_col = df_1.drop("order_id", "date").columns
    print(df_merge.count(), len(df_merge.columns))

    ### VECTORIZE FEATURE COLUMNS AND LABEL FOR MODELLING IN SPARK
    assembler = VectorAssembler(outputCol="features").setInputCols(feature_col)
    df_merge = assembler.transform(df_merge)
    df_merge = df_merge.withColumn("label", df_merge.Y)
    print(df_merge.count(), len(df_merge.columns))

    ### SPLIT INTO MODEL AND OUT-OF-TIME (OOT) DATA
    model_data = df_merge.filter(df_merge.date < '2021-10-09')
    oot_data = df_merge.filter(df_merge.date >= '2021-10-09')
    print("Length MODEL: %d\nLength OOT: %d" % (int(model_data.count()), int(oot_data.count())))

    ### 80:20 TRAIN/TEST SPLIT
    train, test = model_data.randomSplit([0.8, 0.2], seed=42342)

    ### SIMPLE LOGISTIC REGRESSION WITH 5-FOLD CROSS-VALIDATION
    lr = LogisticRegression()
    # regParam grid: 0.1, 0.01, 0.001. elasticNetParam is fixed at 1.0 (pure L1/lasso);
    # the original grid [1**i for i in range(1, 2)] evaluates to just [1].
    paramGrid = ParamGridBuilder() \
        .addGrid(lr.regParam, [10**-i for i in range(1, 4)]) \
        .addGrid(lr.elasticNetParam, [1.0]) \
        .build()
    # NOTE: AUC is computed here from the hard 0/1 "prediction" column rather
    # than the "rawPrediction" scores, matching the expected output below.
    evaluator = BinaryClassificationEvaluator() \
        .setLabelCol("label") \
        .setRawPredictionCol("prediction") \
        .setMetricName("areaUnderROC")
    cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid,
                        evaluator=evaluator, numFolds=5)

    cvModel = cv.fit(train)
    test_predictions = cvModel.transform(test)
    oot_predictions = cvModel.transform(oot_data)

    results_dict = {
        'TEST DATA AUC': evaluator.evaluate(test_predictions),
        'OOT DATA AUC': evaluator.evaluate(oot_predictions),
        'TIME': time.time() - start,
    }
    # EXPECTED OUTPUT
    # TEST DATA AUC  0.6625
    # OOT DATA AUC   0.5119
    # TIME           1.6691

    # createDataFrame cannot infer a schema from a plain dict, so convert to
    # (metric, results) rows first.
    results_spark_df = spark.createDataFrame(list(results_dict.items()), ["metric", "results"])
    results_spark_df.select("metric", "results").write.save(output_path, format='csv', header=True, mode='overwrite')
    # results_df = pd.DataFrame.from_dict(results_dict, orient='index', columns=["results"])
    # log_results(results_df, output_dir=output_path)
```
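The script takes its three paths as positional arguments, as read from `sys.argv` above. A minimal sketch of the submit command, assuming the script is saved as `train_lr.py` (the filename, master, and deploy mode are illustrative, not from the original):

```sh
# Hypothetical invocation; adjust script name and cluster flags to your setup
spark-submit \
  --master yarn \
  --deploy-mode client \
  train_lr.py \
  sample_data_X.csv sample_data_Y.csv ${OUTPUT_PATH}
```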
Environment setup when Python 3.6 is provided by RHEL Software Collections. Note that `PYSPARK_PYTHON` and `PYSPARK_DRIVER_PYTHON` must each point at a single interpreter, not a colon-separated path list; the site-packages directory belongs on `PYTHONPATH`:

```sh
export PYTHONPATH=/root/.local/lib/python3.6/site-packages:$PYTHONPATH
export PYSPARK_PYTHON=/opt/rh/rh-python36/root/usr/bin/python3
export PYSPARK_DRIVER_PYTHON=/opt/rh/rh-python36/root/usr/bin/python3
```

Or, when using the system Python 3:

```sh
export PYTHONPATH=/root/.local/lib/python3.6/site-packages:$PYTHONPATH
export PYSPARK_PYTHON=/usr/bin/python3
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3
```
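A quick sanity check before submitting, in the shell where the exports above were run, is to confirm that the driver and executor variables resolve to the same interpreter:

```sh
# Both should print the same Python 3.6.x version
$PYSPARK_PYTHON --version
$PYSPARK_DRIVER_PYTHON --version
```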