```py
import sys
import time
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
#from pyspark.ml.stat import KolmogorovSmirnovTest
# CLI ARGUMENTS: sample_data_X.csv sample_data_Y.csv ${OUTPUT_PATH} spark/xgboost
df_1_path = sys.argv[1]    # sample_data_X.csv
df_2_path = sys.argv[2]    # sample_data_Y.csv
output_path = sys.argv[3]  # output location for the result CSV (Spark writes a directory)
def log_results(results_dict, output_dir=output_path):
    """Write a {metric: value} dict as a two-column CSV (metric, results)."""
    results_spark_df = spark.createDataFrame(
        [(metric, float(value)) for metric, value in results_dict.items()],
        schema=["metric", "results"],
    )
    results_spark_df.write.csv(output_dir, header=True, mode='overwrite')
    print("FILE SAVED")
if __name__ == "__main__":
    start = time.time()
    spark = SparkSession \
        .builder \
        .appName('Helios_AIG_POC') \
        .getOrCreate()

    ### READ AS SPARK DATAFRAMES
    df_1 = spark.read.csv(df_1_path, header=True, inferSchema=True)
    df_2 = spark.read.csv(df_2_path, header=True, inferSchema=True)

    ### MERGE & GET FEATURE COLUMNS (features come from the first file)
    df_merge = df_1.join(df_2, how='inner', on=['order_id'])
    feature_col = df_1.drop("order_id", "date").columns
    print(df_merge.count(), len(df_merge.columns))

    ### VECTORIZE FEATURE COLUMNS AND SET THE LABEL FOR MODELLING IN SPARK
    assembler = VectorAssembler(inputCols=feature_col, outputCol="features")
    df_merge = assembler.transform(df_merge)
    df_merge = df_merge.withColumn("label", df_merge.Y)  # Y comes from the second file
    print(df_merge.count(), len(df_merge.columns))

    ### SPLIT INTO MODEL AND OUT-OF-TIME (OOT) DATA
    model_data = df_merge.filter(df_merge.date < '2021-10-09')
    oot_data = df_merge.filter(df_merge.date >= '2021-10-09')
    print("Length MODEL: %d\nLength OOT: %d" % (model_data.count(), oot_data.count()))

    ### 80:20 TRAIN/TEST SPLIT
    train, test = model_data.randomSplit([0.8, 0.2], seed=42342)
    ### RUN A SIMPLE LOGISTIC REGRESSION MODEL WITH A CROSS-VALIDATED GRID SEARCH
    lr = LogisticRegression()
    paramGrid = ParamGridBuilder() \
        .addGrid(lr.regParam, [10**-i for i in range(1, 4)]) \
        .addGrid(lr.elasticNetParam, [1.0]) \
        .build()
    # Note: elasticNetParam=1.0 is a pure L1 (lasso) penalty; the original
    # 1**i grid expression always evaluated to 1 anyway.
    # AUC must be computed on the raw scores ("rawPrediction"), not on the
    # thresholded "prediction" column, or the metric degenerates.
    evaluator = BinaryClassificationEvaluator() \
        .setLabelCol("label") \
        .setRawPredictionCol("rawPrediction") \
        .setMetricName("areaUnderROC")
    cv = CrossValidator(estimator=lr,
                        estimatorParamMaps=paramGrid,
                        evaluator=evaluator,
                        numFolds=5)
    cvModel = cv.fit(train)
    test_predictions = cvModel.transform(test)
    oot_predictions = cvModel.transform(oot_data)
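    # Added for visibility (not part of the original flow): log the mean
    # cross-validated AUC per grid point. CrossValidatorModel.avgMetrics is
    # standard pyspark.ml API and aligns index-wise with paramGrid.
    for params, auc in zip(paramGrid, cvModel.avgMetrics):
        print({p.name: v for p, v in params.items()}, "->", auc)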
    results_dict = {
        'TEST DATA AUC': evaluator.evaluate(test_predictions),
        'OOT DATA AUC': evaluator.evaluate(oot_predictions),
        'TIME': time.time() - start,
    }
    """
    EXPECTED OUTPUT
    TEST DATA AUC 0.6625
    OOT DATA AUC 0.5119
    TIME 1.6691
    """
    log_results(results_dict, output_path)
```
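How the job is launched is not shown above; the following is a minimal sketch of a `spark-submit` invocation, where the script name `train_lr.py` and the `--master` setting are placeholder assumptions and only the three positional arguments come from the argument comment in the script:

```sh
# Hypothetical invocation: train_lr.py and --master yarn are assumptions;
# the three positional arguments match sys.argv[1..3] in the script above.
spark-submit --master yarn train_lr.py \
  sample_data_X.csv \
  sample_data_Y.csv \
  "${OUTPUT_PATH}"
```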
```sh
# PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON must each point to a single
# interpreter; extra module directories belong on PYTHONPATH instead.
export PYTHONPATH=/root/.local/lib/python3.6/site-packages:$PYTHONPATH
export PYSPARK_PYTHON=/opt/rh/rh-python36/root/usr/bin/python3
export PYSPARK_DRIVER_PYTHON=/opt/rh/rh-python36/root/usr/bin/python3
```
```sh
# Same setup for a host using the system Python rather than the SCL build:
export PYTHONPATH=/root/.local/lib/python3.6/site-packages:$PYTHONPATH
export PYSPARK_PYTHON=/usr/bin/python3
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3
```
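As a quick sanity check (a suggested step, not part of the original setup), confirm that the interpreter PySpark will use actually resolves and can import `pyspark`:

```sh
# Should print the interpreter path and the installed PySpark version.
echo "$PYSPARK_PYTHON"
"$PYSPARK_PYTHON" -c "import pyspark; print(pyspark.__version__)"
```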