# Cloud Dataflow

Cloud Dataflow es un servicio para ejecutar flujos de trabajos de big data en GCP. Ocupa el modelo de programación de Apache Beam.
Abre Cloud Shell para trabajar.
Habilitar APIs que se van a usar.
```
gcloud services enable dataflow.googleapis.com
```
Crea una carpeta en Cloud Storage con el nombre de tu proyecto e inicializa variables para ocupar más adelante.
```
export PROJECT_ID="ID de tu proyecto"
gsutil mb gs://$PROJECT_ID
export BUCKET=$PROJECT_ID
export REGION="us-central1"
```
Vamos a usar como ejemplo un programa de Python que cuenta palabras palabras tomando por defecto como input un archivo en Cloud Storage del trabajo de Shakespeare.
Este ejemplo lo encontramos en el siguiente [Link](https://beam.apache.org/get-started/wordcount-example/).
Recordemos que Dataflow usa el modelo de Apache Beam, donde hay **PCollections**, que son los datos, **PTransformations** que son las acciones sobre los datos, se define un **Pipeline** que es el que se ve en la figura siguiente y el **Pipeline runner** será Dataflow.

Encuentra en el código: donde se entrega el texto que es el input? donde quedará el output? cuál es el pipeline que se esta creando?
```python=
# pipeline_options: --output output.txt
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
# Funcion para parsear cada linea del texto en palabras.
# Retorna iterador sobre las palabras
class WordExtractingDoFn(beam.DoFn):
def process(self, element):
return re.findall(r'[\w\']+', element, re.UNICODE)
# Funcion que define y ejecuta el pipeline
def run(argv=None, save_main_session=True):
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='Input file to process.')
parser.add_argument(
'--output',
dest='output',
required=True,
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as p:
# Pone el archivo en un PCollection.
lines = p | 'Read' >> ReadFromText(known_args.input)
#Definición de PTransformations
counts = (
lines
| 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
| 'PairWithOne' >> beam.Map(lambda x: (x, 1))
| 'GroupAndSum' >> beam.CombinePerKey(sum))
# Formatea la counts en una PCollection de strings.
def format_result(word, count):
return '%s: %d' % (word, count)
output = counts | 'Format' >> beam.MapTuple(format_result)
# Escribe la salida
output | 'Write' >> WriteToText(known_args.output)
#Funcion main que ejecuta RUN y setea el nivel de logs
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
```
Crea un archivo en tu Cloud Shell llamado wordcount.py donde pongas el código anterior. En el video asociado a este tuorial explicamos las partes del código.
Primero ejecutaremos el código de manera local, lo que te permite probar y depurar el código. Para eso haremos un Virtual Environment de python e instalaremos el requerimiento que se necesita:
```
python3 -m pip install --user virtualenv
python3 -m venv env
source env/bin/activate
pip install 'apache-beam[gcp]'
```
Antes de ejecutar debes ir a IAM > IAM y dar los permisos a Compute Engine default service account: Visualizador de objetos de Storage y Creator de objetos de Storage, Administrador de Dataflow y Trabajador de Dataflow.

Ejecutamos:
```
mkdir temp
python3 wordcount.py --output temp/output.txt
````
Vemos una parte del resultado con el comando cat. El resultado es una lista de "palabra: conteo"
Si no vemos ningún problema lo podemos ejecutar sobre Dataflow lo ejecutamos con:
```
python wordcount.py --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://$BUCKET/counts \
--runner DataflowRunner \
--project $PROJECT_ID \
--region $REGION \
--temp_location gs://$BUCKET/tmp/
```
Podemos ir a ver a la consola como se ejecuta el trabajo y el pipeline que se ha creado en el Job. En el video asociado a este tutorial explicamos las partes de la consola de Dataflow.
Puedes revisar la salida del trabajo en Cloud Storage.
**Pregunta:**
Piense en los datos que usted maneja ¿Cuál de los análisis que usted ha realizado sobre datos se podrían expresar como un pipeline? (Cualquier ETL puede expresarse como un pipeline)
**Desafío:**
Haz algún cambio en las PTransformations o en el texto de entrada para que no sea el de Shakespeare y lanza el trabajo nuevamente.
Como ayuda, en este [Link](https://beam.apache.org/releases/pydoc/2.6.0/apache_beam.io.html) puedes encontrar diferentes conectores a diferentes fuentes. [Acá](https://beam.apache.org/documentation/transforms/python/overview/) puedes encontrar el catálogo de distintas transformaciones.
Ejemplos
* Realizar el conteo, pero filtrar las palabras como "the", "a".
* Ocupar un archivo en español (Por ejemplo [alguno de estos](https://www.blindworlds.com/publicacion/82460)) y filtrar las palabras "el" "la" "y" "de" en el conteo.
Si no tiene tiempo para programarlo miren las sugerencias y evaluen si saben donde exactamente tienen que modificar el código para lograr el objetivo.
**Elimine los buckets creados antes de finalizar el tutorial**