# Cloud Dataflow ![](https://i.imgur.com/wtEXP4f.png) Cloud Dataflow es un servicio para ejecutar flujos de trabajos de big data en GCP. Ocupa el modelo de programación de Apache Beam. Abre Cloud Shell para trabajar. Habilitar APIs que se van a usar. ``` gcloud services enable dataflow.googleapis.com ``` Crea una carpeta en Cloud Storage con el nombre de tu proyecto e inicializa variables para ocupar más adelante. ``` export PROJECT_ID="ID de tu proyecto" gsutil mb gs://$PROJECT_ID export BUCKET=$PROJECT_ID export REGION="us-central1" ``` Vamos a usar como ejemplo un programa de Python que cuenta palabras palabras tomando por defecto como input un archivo en Cloud Storage del trabajo de Shakespeare. Este ejemplo lo encontramos en el siguiente [Link](https://beam.apache.org/get-started/wordcount-example/). Recordemos que Dataflow usa el modelo de Apache Beam, donde hay **PCollections**, que son los datos, **PTransformations** que son las acciones sobre los datos, se define un **Pipeline** que es el que se ve en la figura siguiente y el **Pipeline runner** será Dataflow. ![](https://i.imgur.com/08Z00gE.png) Encuentra en el código: donde se entrega el texto que es el input? donde quedará el output? cuál es el pipeline que se esta creando? ```python= # pipeline_options: --output output.txt import argparse import logging import re import apache_beam as beam from apache_beam.io import ReadFromText from apache_beam.io import WriteToText from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions # Funcion para parsear cada linea del texto en palabras. # Retorna iterador sobre las palabras class WordExtractingDoFn(beam.DoFn): def process(self, element): return re.findall(r'[\w\']+', element, re.UNICODE) # Funcion que define y ejecuta el pipeline def run(argv=None, save_main_session=True): parser = argparse.ArgumentParser() parser.add_argument( '--input', dest='input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Input file to process.') parser.add_argument( '--output', dest='output', required=True, help='Output file to write results to.') known_args, pipeline_args = parser.parse_known_args(argv) pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = save_main_session with beam.Pipeline(options=pipeline_options) as p: # Pone el archivo en un PCollection. lines = p | 'Read' >> ReadFromText(known_args.input) #Definición de PTransformations counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum)) # Formatea la counts en una PCollection de strings. def format_result(word, count): return '%s: %d' % (word, count) output = counts | 'Format' >> beam.MapTuple(format_result) # Escribe la salida output | 'Write' >> WriteToText(known_args.output) #Funcion main que ejecuta RUN y setea el nivel de logs if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) run() ``` Crea un archivo en tu Cloud Shell llamado wordcount.py donde pongas el código anterior. En el video asociado a este tuorial explicamos las partes del código. Primero ejecutaremos el código de manera local, lo que te permite probar y depurar el código. Para eso haremos un Virtual Environment de python e instalaremos el requerimiento que se necesita: ``` python3 -m pip install --user virtualenv python3 -m venv env source env/bin/activate pip install 'apache-beam[gcp]' ``` Antes de ejecutar debes ir a IAM > IAM y dar los permisos a Compute Engine default service account: Visualizador de objetos de Storage y Creator de objetos de Storage, Administrador de Dataflow y Trabajador de Dataflow. ![Screenshot 2024-09-25 at 17.06.12](https://hackmd.io/_uploads/BkyITyz0A.png) Ejecutamos: ``` mkdir temp python3 wordcount.py --output temp/output.txt ```` Vemos una parte del resultado con el comando cat. El resultado es una lista de "palabra: conteo" Si no vemos ningún problema lo podemos ejecutar sobre Dataflow lo ejecutamos con: ``` python wordcount.py --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://$BUCKET/counts \ --runner DataflowRunner \ --project $PROJECT_ID \ --region $REGION \ --temp_location gs://$BUCKET/tmp/ ``` Podemos ir a ver a la consola como se ejecuta el trabajo y el pipeline que se ha creado en el Job. En el video asociado a este tutorial explicamos las partes de la consola de Dataflow. Puedes revisar la salida del trabajo en Cloud Storage. **Pregunta:** Piense en los datos que usted maneja ¿Cuál de los análisis que usted ha realizado sobre datos se podrían expresar como un pipeline? (Cualquier ETL puede expresarse como un pipeline) **Desafío:** Haz algún cambio en las PTransformations o en el texto de entrada para que no sea el de Shakespeare y lanza el trabajo nuevamente. Como ayuda, en este [Link](https://beam.apache.org/releases/pydoc/2.6.0/apache_beam.io.html) puedes encontrar diferentes conectores a diferentes fuentes. [Acá](https://beam.apache.org/documentation/transforms/python/overview/) puedes encontrar el catálogo de distintas transformaciones. Ejemplos * Realizar el conteo, pero filtrar las palabras como "the", "a". * Ocupar un archivo en español (Por ejemplo [alguno de estos](https://www.blindworlds.com/publicacion/82460)) y filtrar las palabras "el" "la" "y" "de" en el conteo. Si no tiene tiempo para programarlo miren las sugerencias y evaluen si saben donde exactamente tienen que modificar el código para lograr el objetivo. **Elimine los buckets creados antes de finalizar el tutorial**