Cloud Dataproc

Dataproc en GCP nos permitirá tener el mismo ecosistema opensource de Hadoop y Spark. Esto permite correr los mismos códigos para Big Data que tengan hechos directamente en GCP, sin cambios, y con la ventaja de tener un cluster en cosa de minutos.

En este ejemplo, veremos lo sencillo que es crear un cluster en Dataproc, con un nodo maestro y dos workers.

Para ejemplificar el uso del cluster usaremos Spark SQL usando Jupyter Notebooks que ejecutará el trabajo sobre este cluster de Dataproc.

Los datos a analizar los pondremos primero en HDFS y luego en Cloud Storage, para desacoplar la computación del almacenamiento.

Finalmente, pasaremos el trabajo del notebook a un archivo python para que sea lanzado como un trabajo (job) al cluster.

Cluster de Cloud Dataproc

Buscaremos Dataproc en el menú de navegación, y le pondremos un pin para encontrarlo rápidamente la siguiente vez. Habilite la API en caso que no esté habilitada con anterioridad.

Haz click en CREATE CLUSTER.

Luego da la opción entre usar Compute Engine o GKE, elegiremos Compute Engine.

Le pondremos un nombre a nuestro cluster, por ejemplo "midataproccluster", seleccionamos región, el tipo de cluster dejaremos el Standard.

Luego pon CHANGE en la sección Versioning y selecciona 2.0 (Debian 10, Hadoop 3.2, Spark 3.1). Aquí es donde ajustas las versiones de los software que necesitas y si no están puedes proveer una imagen hecha por ti. Pon SELECT.

En la sección Components, haz click en Enable component gateway. Y en Optional Components selecciona Jupyter Notebooks.

Explora las configuraciones opcionales, pero deja los valores por defecto. Haz click en Equivalent Command Line para ver el gcloud command que crea el cluster que configuraste sin necesidad de tantos clicks. Puedes copiarlo, es útil el caso de que tengas que cerrar el trabajo antes de terminar el lab y luego quieras partir sin perder minutos en configuración.

Luego presiona CREATE para crear el cluster. Deberías ver luego de unos minutos el cluster en estado Running, como en la siguiente figura.

UPDATE Si no te deja crear el cluster con la configuración que tienes, intenta cambiando el tipo de máquinas del cluster. Esto se hace en "Configure nodes. Allí puedes elegir nodos de la Serie N1, tanto para el Manager como para los dos nodos Workers.

Usando software instalado en Dataproc (Jupyter)

Abre el cloud Shell y sube el notebook que está en el siguiente link que ocuparemos como ejemplo. Archivo spark-hdfs2.ipynb.

Este notebook tenemos que almacenarlo en el bucket que usa Cloud Dataproc, lo puedes encontrar entre los datos del cluster:

Una vez que tengas el archivo en tu Cloud Shell lo subes Cloud Storage con el siguiente commando, reemplaza $DP_STORAGE por el nombre del bucket de Dataproc:

gsutil -m cp spark-hfds.ipynb gs://$DP_STORAGE/notebooks/jupyter

Sabemos que el cluster tiene instalado Hadoop, Spark, entre otros, de hecho tenemos acceso a su interfaz gráfica al entrar a los detalles del cluster, en Web Interfaces > Component gateway. Haz click sobre el que dice Jupyter.

Te aparecerá algo como, dado que se el contenido de la carpeta notebooks/jupyter que está en Cloud Storage y tiene el notebook que acabas de subir:

Haz click en el notebook dentro de GCS para abrirlo.

Luego haz click en Cells > Run All. Esto ejecutará todas las celdas del notebook.
Explora como se van ejecutando cada etapa.

El dataset analizado en este ejemplo es de la competencia KDD de 1999, que era sobre detección de intrusos y contiene datos de conexiones TCP que están tageadas como normales o ataques, que pueden ser DoS, acceso no autorizado, escaneo de puertos, entre otros. Más información del dataset en el siguiente Link

En el video asociado a este tutorial explicaremos el contenido del notebook, el uso de HFDS y Spark SQL.

Descoplar Computación y Almacenamiento

Ahora modificaremos el notebook para que en vez de leer de HDFS (hdfs://) lo haga de Cloud Storage (gs://). Vamos primero a crear un bucket y almacenaremos ahí los datos, para eso usaremos Cloud Shell.

export PROJECT_ID=$(gcloud info --format='value(config.project)')
gsutil mb gs://$PROJECT_ID
wget https://archive.ics.uci.edu/ml/machine-learning-databases/kddcup99-mld/kddcup.data_10_percent.txt
gsutil cp kddcup.data_10_percent.txt gs://$PROJECT_ID/

De vuelta en Jupyter Notebook, haz una copia del archivo y nombralo como spark-cs (File > Make a Copy).

El notebook que estabas ocupando antes spark-hdfs cierralo (File > Close and Halt). Regresa al otro notebook.

Para borrar celdas en el notebook, selecciona la celda y haz click en cut que tiene el símbolo de las tijeras en el toolbar.

Los comentarios iniciales y celdas [1] [2] y [3]. Para que el notebook comience en Reading Data. Luego reemplaza el contenido de la celda [4] con este código donde debes poner el nombre del bucket donde están los datos.

from pyspark.sql import SparkSession, SQLContext, Row gcs_bucket='Nombre-De-Tu-Bucket' spark = SparkSession.builder.appName("kdd").getOrCreate() sc = spark.sparkContext data_file = "gs://"+gcs_bucket+"//kddcup.data_10_percent.txt" raw_rdd = sc.textFile(data_file).cache() raw_rdd.take(5)

Luego click en Cell > Run All, el resultado es el mismo anterior pero está desacoplado la computación del almacenamiento.

Usando Dataproc con Jobs en Python

Al cluster de Dataproc podemos enviar distintos trabajo que se ejecutan sobre el cluster creado. Transformaremos el código en el Jupyter notebook a Python y lo lanzaremos como un trabajo.

Copia el notebook llamado spark-cs a otro que se llame spark-python (File > Make a Copy).

Luego cierra el notebook spark-cs (File > Close and Halt).

En el notebook spark-python haz click en la primera celda y agrega otra celda arriba (Insert > Insert Cell Above)

Pega el siguiente código en esa celda para hacer los import necesarios y manejo de parámetros:

%%writefile spark_analysis.py import matplotlib matplotlib.use('agg') import argparse parser = argparse.ArgumentParser() parser.add_argument("--bucket", help="bucket for input and output") args = parser.parse_args() BUCKET = args.bucket

Ahora, al comienzo de cada una de las otras celdas del notebook agrega:

%%writefile -a spark_analysis.py

En la última celda, borra %matplotlib inline, ya que no graficaremos así.

Agrega una celda bajo la última celda del notebook (Insert > Insert Cell Below), y poen el siguiente código:

%%writefile -a spark_analysis.py ax[0].get_figure().savefig('report.png');

Agrega una celda más con el siguiente código, ojo que está dos veces el nombre del cluster, que se debe ajustar si es que no usaron el mismo:

%%writefile -a spark_analysis.py import google.cloud.storage as gcs bucket = gcs.Client().get_bucket(BUCKET) for blob in bucket.list_blobs(prefix='midataproccluster/'): blob.delete() bucket.blob('midataproccluster/report.png').upload_from_filename('report.png')

En una nueva celda al final del notebook pon el siguiente código, nuevamente ojo con el nombre del cluster en la última línea:

%%writefile -a spark_analysis.py connections_by_protocol.write.format("csv").mode("overwrite").save( "gs://{}/midataproccluster/connections_by_protocol".format(BUCKET))

En una nueva celda al final del documento agrega lo siguiente:

BUCKET_list = !gcloud info --format='value(config.project)' BUCKET=BUCKET_list[0] print('Writing to {}'.format(BUCKET)) !/opt/conda/miniconda3/bin/python spark_analysis.py --bucket=$BUCKET

En una nueva celda agrega

!gsutil ls gs://$BUCKET/midataproccluster/**

En una nueva celda agrega

!gsutil cp spark_analysis.py gs://$BUCKET/midataproccluster/spark_analysis.py

Haz click en Cell y Run All para correr todas las celdas del notebook.

Ahora vamos a correr esto como un trabajo en Cloud Dataproc, para eso vamos al Cloud Shell y copiamos el script the pyhton de Cloud Storage:

gsutil cp gs://$PROJECT_ID/midataproccluster/spark_analysis.py spark_analysis.py

Lanza el trabajo con el siguiente comando, este es el correcto, en el Video falta un doble guión:

gcloud dataproc jobs submit pyspark \
       --cluster midataproccluster \
       --region us-central1 \
       spark_analysis.py \
       -- --bucket=$PROJECT_ID

Puedes ver el trabajo ejecutándose en la consola en Jobs, haz click en el nombre y mira el progreso del trabajo:

Cuando haya terminado, puedes ver el resultado en el bucket de Cloud Storage, abriendo el archivo midataproccluster/report.png.

RECUERDA BORAR EL CLUSTER DE DATAPROC CREADO y el bucket con los datos en cloud Storage

gcloud dataproc clusters delete [DATAPROC_CLUSTER_NAME]