# Memoria
Documentación para mi memoria (❁´◡`❁)
## Glosario por agregar:
* tweets(?, logs, pipeline, NTP, stateful operations, stateless ", mapeo, records, joins, aggregate, request, memoria heap, JVM, JMX, watermarks, buffer, GC (garbage collector), template
## TODO:
* **cuando esté listo el doc, enviarle a federico meza la informacion**
- [x] Agregar Organización del documento en Cap1
- [x] Dice Figura X, Figura Y en algunas partes... revisar con detalle que no hayan typos de ese estilo.
- [x] En las figuras en el eje donde dice Tiempo, debes poner la unidad, Fig 17, 18, 19 , 23,
- [ ] Agregar discusión sobre los costos, no solo importa el rendimiento sino también cuanto vale $$$ lograrlo. Puedes agregarlo en el cap 4 dado que allí veo discusiones.
- [ ] arreglar indice
----
## Anotaciones importantes
* latencia: todos probados con 1000 bytes por segundo, excepto beam que fue con 500k bytes por segundo
* throughput máximo de cada framework y pipeline
* flink
* iot - 19_000 bytes/s
* twitter - 18_000 bytes/s
* log - 200_000 bytes/s
* spark
* 2_500 bytes/s
* twitter - 4000 bytes/s
* log - 5_000 bytes/s
* kafka streams
* iot - 72_000 bytes/s
* twitter - 5_000 bytes/s
* log - 2_000_000 bytes/s
* beam
* iot - 50m bytes/s
* twitter - 3.9m bytes/s
* log - 19.9m bytes/s
* Chequear que no haya más estado del arte actual, 2021, 2022
* trabajos futuros: ver la semantica (el evento at least once, exactly once, at most once; hacer algo al respecto)
* en general el pipeline de logs funciona mejor porque hay menos coincidencias: los ids que hacen match en el join se dan de forma menos frecuente, dejando tiempo de procesamiento libre, en cambio en los otros casos existen datos siendo procesados en todo momento, causando overflow?
* Para hacernos una idea, en Vandongen, para la **latencia** ocupó un flujo de 135 MB en 30 minutos, diciendo que envia 400 mensajes por segundo. 135MB en 30 minutos equivale a aproximadamente 80.000 bytes por segundo y 720.000 mensajes en total. Cada mensaje debe tener un tamaño de aproximadamente 200 bytes para lograr los 80k bytes por segundo.
* **al final spark demora más de 10s sin join y map**, por lo que se debe dejar así. Pareciera ser intrinseco del framework demorarse harto, esto hay que justificarlo
# Estudio general
* Due to Flink’s asynchronous checkpointing mecha-
nism, the processing pipeline is not blocked while checkpoint-
ing. Flink’s mechanism of keeping state reduces the load on
the garbage collector, as opposed to the mechanism used by
Spark which still heavily relies on JVM memory management,
as described in Section 4.4
* Sobre el calculo de latencia: hay tener un metodo fiable para calcular la latencia. Esto servirá para el calculo del throughput, **ya que si la latencia no se ve incrementada de forma constante y no supera los 10 segundos, entonces es un throughput adecuado**.
* https://siraj-deen.medium.com/garbage-collection-in-spark-why-it-matters-and-how-to-optimize-it-for-optimal-performance-d9a31ab5ff78
### Apache Flink
**¿Qué es?** Framework de procesamiento de datos en tiempo real open source tanto en streams y en batches. Ofrece tolerancia a fallas, escalabilidad e integraciones variadas. Con un gran conjunto de APIs, puede manejar muchas tareas de procesamiento de datos y se utiliza para analiticas en tiempo real, aplicaciones basadas en eventos y tareas en batches.
**¿Cómo funciona?** funcionade una forma paralelaizada y distribuida. Flink procesa los datos mediante un grafo dirigido de transformaciones (mapping, filtering, aggregation,etc). Este framework también divide los datos en particiones para aprovechar el paralelismo, donde cada partición puede ser procesada de manera independiente en diferentes nodos, lo que le permite escalar horizontalmente. Por otro lado, también permite el stateful processing, lo que significa que mantiene el estado de la información entre las etapas, además de controlar los eventos out-of-order mediante watermarks. Utiliza checkpoints, lo que le permite capturar el estado de los datos en intervalos regulares, siendo tolerante a fallas
**Desventajas:** Puede ser complejo sobre todo para usuarios nuevos en el procesamiento de datos de forma distribuida. Desarrollar y debugear requiere un nivel de expertiz. Flink puede ser muy demandante en cuanto a recursos (sobre todo memoria), requiriendo de una sólida infraestructura y mucho monitoreo. Por otro lado, tiene un soporte limitado de querys SQL y quizá tenga menos soporte e integración en comparación con otros frameworks más maduros
### Apache Spark Structured Streaming
**¿Qué es?** Es un componente del framework de apache spark diseñado para el procesamiento de datos en tiempo real. Se caracteriza por tratar los datos como tablas estructuradas y permite operaciones de tipo SQL en estas tablas. Estas caracteristicas hacen que sea fácil desarrollar y mantener aplicaciones con rasgos como tolerancia a fallas y gran consistencia. Se usa en muchos casos, desde transformaciones de datos simples hasta complejas aplicaciones basada en eventos
**¿Cómo funciona?** Funciona tratando los streams de datos como tablas estructuradas y aplicando operaciones en micro-batches a estos streams. Con micro batches se refiere a que convierte el stream continuo en pequeños y consistentens batches. A las tablas se les denomina dataframes o streaming datasets. Soporta querys de tipo SQL tales como aggregation, joins, filtering, etc y procesamiento stateful, lo que quiere decir que mantiene el estado a través del pipeline. Es escalable horizontalmente agregando nodos y soporta event-time processing permitiendo trabajar con datos fuera de orden mediante los timestamps.
**Desventajas:** Al trabajar con micro batches en vez de un flujo continuo de datos, existe una latencia inherente, por lo que, si bien permite trabajar en muchos casos, quizá no es la mejor opción para aplicaciones sensibles a la latencia. Otra desventaja es su intenso uso de memoria y recursos comparado con otros frameworks de procesamiento de streams, lo que puede afectar la tasa de transferencia y latencia. Si bien es escalable, agregar más nodos no siempre es la solución ya que no todas las cargas de trabajo escalan de forma lineal.
### Dataflow
**¿Qué es?** Es un servicio serverless de GCP diseñado para procesar y analizar grandes volumenes de datos de una manera distribuida y paralela. Está construido sobre Apache Beam. Dataflow te permite procesar tanto como batches y como streams y el código es el mismo, lo cual lo hace de fácil uso. Al ser serverless, no hay que preocuparse de la infraestructura ni escalar recursos. Escala de manera automática dependiendo del volumen de datos. Por último, tiene gran cantidad de conectores e integraciones con otros servicios de GCP
**¿Cómo funciona?** Se define y desarrolla el pipeline utilizando Apache Beam. Apache Beam soporta Go, Java y python con una API muy consistente. Se definen las transformaciones (aggregations, joinins, etc) que luego son pasadas a un grafo dirigido. Dataflow distribuye la carga en nodos de workers, los nodos son manejados por GCP de forma dinámica, además de intentar paralelizar para maximizar la eficiencia. GCP ofrece monitoreo y logging de manera integrada simplificando y facilitando el debugeo y la detección de problemas.
**Desventajas:** Una de las desventajas es entender el modelo de programación de apache beam, ya que la curva de aprendizaje puede requerir bastante tiempo. Al estar hosteado en GCP, al igual que cualquier otro proveedor de servicios en la nube, el pipeline puede ser dificil de migrar, amarrandote solo a GCP. Otra desventaja tiene que ver con el costo monetario de todas las funcionalidades del servicio tal como el escalado automático. Es importante conocer tus patrones de consumo y entender el modelo de cobros de GCP.
### Kafka Streams
**¿Qué es?** Diseñado para funcionar con apache kafka, este framework permite que se construyan aplicaciones de procesamiento de streams en tiempo real escalables y tolerante a fallas. Permite operaciones stateful como aggregation, filtering, etc. Puede ser utilizado para construir microservicios.
**¿Cómo funciona?** Una vez los datos son ingestados, kafka streams provee una API para definir la lógica de procesamiento, de esta forma los desarrolladores definen las operaciones de transformación. Soporta operaciones stateful, por lo que mantiene y actualiza los datos a medida que se procesan. Escala horizontalmente para manejar grandes volumenes de datos al ejecutar varias instancias de la aplicación, soportando particionamiento a través de las mismas asegurando paralelización y balanzamiento de cargas. Mediante checkpoints se puede recuperar de fallas recuperando datos perdidos
**Desventajas:** Es principalmente desarrollado en Java, por lo que puede ser una limitación utilizar otros lenguajes. Si bien soporta operaciones windowing, pueden no ser tan completas como otros frameworks. Por otro lado, depende de tópicos de kafka para almacenar datos, pero manejar la retención y el almacenamiento de los tópicos puede ser un desafío, al igual que el testeo, el debugeo y la escalabilidad. Kafka streams también es demandante a nivel de hardware, por lo que es importante utilizar buenos recursos de cpu y memoria.
# Google Cloud
* **Crear una VM:**
```
export ZONE=southamerica-west1-a
```
```
gcloud compute instances create NOMBRE-VM --zone=$ZONE --machine-type=e2-small
```
*revisar tipos de vm's disponibles en la zona*
Especificar la imagen del SO:
```
gcloud compute instances create NOMBRE-VM --image-family=rhel-8 --image-project=rhel-cloud --zone=$ZONE --machine-type=e2-small
```
Listar todas las imágenes públicas de SO disponibles:
```
gcloud compute images list
```
Imagen de VM ubuntu:
```
NAME: ubuntu-1804-bionic-v20220901
PROJECT: ubuntu-os-cloud
FAMILY: ubuntu-1804-lts
DEPRECATED:
STATUS: READY
```
* **Conectarse a una VM mediante SSH:**
```
gcloud compute ssh NOMBRE-VM --zone=$ZONE
```
* **Copiar desde un bucket a la máquina:**
```
sudo gsutil cp -r gs://dataflow_beam_test-1/beam-app /home/javieraibz_j/dataflow
```
* **Comandos de Linux**:
* Descargar un archivo
```
wget LINK
```
* Descomprimir un tgz
```
tar -xzf file.tgz
```
* Borrar un archivo
```
sudo rm -rf ARCHIVO/CARPETA
```
* Crear una carpeta
```
mkdir nombreCarpeta
```
* Ejecutar un FAT JAR
```
jar cmvf META-INF/MANIFEST.MF <new-jar-filename>.jar <files to include>
```
* Mover un archivo
```
mv NOMBREARCHIVO UBICACION
```
* **Shell script para correr kafka y crear sus tópicos automaticamente**
```
nano kafka.sh
```
```
#!/bin/sh
sudo bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
sudo bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic iotA --create --partitions 20 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic iotB --create --partitions 20 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic logA --create --partitions 20 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic logB --create --partitions 20 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic twitterA --create --partitions 20 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic twitterB --create --partitions 20 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic twitterOut --create --partitions 20 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic iotOut --create --partitions 20 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic logOut --create --partitions 20 --replication-factor 1
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
--add-config 'producer_byte_rate=1000' \
--entity-type clients --entity-name datasetsProducer
```
```
bash kafka.sh
```
## Instalar Oh My Zsh en la VM
* Instalar Zsh
`sudo apt install zsh`
* Hacer Zsh tu shell por defecto:
`sudo chsh -s $(which zsh) $(whoami)`
* Cerrar la terminal y volver a abrirla
* Seleccionar opción q para salir del menú de configuración
* Instalar Oh my Zsh
`sh -c "$(wget https://raw.github.com/ohmyzsh/ohmyzsh/master/tools/install.sh -O -)"`
## CREAR MÁQUINAS EN GCP: Imágen de la máquina para testear cada framework
**Cambiar service account y project id *Chequear service account en cuentas de servicio***
```
gcloud compute instances create frameworks-test \
--project=elegant-verbena-400000 \
--zone=southamerica-west1-a \
--machine-type=n2-custom-8-32768 \
--network-interface=network-tier=PREMIUM,stack-type=IPV4_ONLY,subnet=default \
--maintenance-policy=MIGRATE \
--provisioning-model=STANDARD \
--service-account=293941898296-compute@developer.gserviceaccount.com \
--scopes=https://www.googleapis.com/auth/devstorage.read_only,https://www.googleapis.com/auth/logging.write,https://www.googleapis.com/auth/monitoring.write,https://www.googleapis.com/auth/servicecontrol,https://www.googleapis.com/auth/service.management.readonly,https://www.googleapis.com/auth/trace.append \
--tags=http-server,https-server \
--create-disk=auto-delete=yes,boot=yes,device-name=instance-1,image=projects/ubuntu-os-cloud/global/images/ubuntu-2004-focal-v20230724,mode=rw,size=30,type=projects/elegant-verbena-400000/zones/southamerica-west1-a/diskTypes/pd-ssd \
--no-shielded-secure-boot \
--shielded-vtpm \
--shielded-integrity-monitoring \
--labels=goog-ec-src=vm_add-gcloud \
--reservation-affinity=any
```
* Imágen para kafka
```
gcloud compute instances create kafka-broker \
--project=elegant-verbena-400000 \
--zone=southamerica-west1-a \
--machine-type=e2-custom-4-16384 \
--network-interface=network-tier=PREMIUM,stack-type=IPV4_ONLY,subnet=default \
--maintenance-policy=MIGRATE \
--provisioning-model=STANDARD \
--service-account=293941898296-compute@developer.gserviceaccount.com \
--scopes=https://www.googleapis.com/auth/devstorage.read_only,https://www.googleapis.com/auth/logging.write,https://www.googleapis.com/auth/monitoring.write,https://www.googleapis.com/auth/servicecontrol,https://www.googleapis.com/auth/service.management.readonly,https://www.googleapis.com/auth/trace.append \
--tags=http-server,https-server \
--create-disk=auto-delete=yes,boot=yes,device-name=kafka-broker,image=projects/ubuntu-os-cloud/global/images/ubuntu-2004-focal-v20230724,mode=rw,size=30,type=projects/elegant-verbena-400000/zones/southamerica-west1-a/diskTypes/pd-ssd \
--no-shielded-secure-boot \
--shielded-vtpm \
--shielded-integrity-monitoring \
--labels=goog-ec-src=vm_add-gcloud \
--reservation-affinity=any
```
* Crear un .jar en IntelliJ para ejecutarlo en la VM
[https://www.jetbrains.com/help/idea/compiling-applications.html#package_into_jar](https://)
---
# Kafka
*NOTE: Your local environment must have Java 8+ installed.*
* **Descargar la (*)última (31-08-2022) versión:**
```
https://archive.apache.org/dist/kafka/3.2.1/kafka_2.12-3.2.1.tgz
```
* **Entrar a la carpeta de instalación de Kafka:**
```
cd /usr/local/kafka
```
* **Para iniciar Kafka como demonio (siempre corriendo):**
```
sudo bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
```
```
sudo bin/kafka-server-start.sh -daemon config/server.properties
```
--------
* **Listar todos los tópicos:**
```
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
```
* **Crear tópico:**
```
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic NOMBREDELTOPICO --create --partitions 1 --replication-factor 1
```
* **Eliminar un tópico:**
```
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic NOMBRETOPICO
```
* **Consumidor de eventos:**
```
bin/kafka-console-consumer.sh --topic NOMBREDELTOPICO --from-beginning --bootstrap-server localhost:9092
```
* **Productor de eventos:**
```
bin/kafka-console-producer.sh --topic NOMBRE-TOPICO --bootstrap-server localhost:9092
```
* **Productor con llave:**
```
bin/kafka-console-producer.sh --topic NOMBRE-TOPICO --property "parse.key=true" --property "key.separator=:" --bootstrap-server localhost:9092
```
---
### Configuración de server.properties
* **Para configurar la máquina y existan conexiones externas:**
```
nano config/server.properties
```
```
advertised.listeners=PLAINTEXT://IP_EXTERNA:PUERTO
```
* **Para que el Kafka Broker asigne sus propios Timestamps en modo LogAppendTime:**
```
nano config/server.properties
```
```
log.message.timestamp.type=LogAppendTime
```
* **Para definir una quota de bytes por segundo en un cliente:**
```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
--add-config 'producer_byte_rate=2097152' \
--entity-type clients --entity-name datasetsProducer
```
* **Para agregar o cambiar configuraciones del broker:**
```
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config TIPO-DE-CONFIGURACION
```
* **Sobre los timestamps:**
It depends on the timestamp type used, there are two types:
**CreateTime** - timestamp is assigned when producer record is created, so before sending. There can be retries, so there is no guarantee that ordering is preserved.
**LogAppendTime** - timestamp is assigned when record is appended to the log on the broker. In that case ordering per partition is preserved. Multiple messages might get the same timestamp assigned.
By default, CreateTime is used. To change this, set log.message.timestamp.type for broker or message.timestamp.type for particular topic.
---
# Flink (v1.15.2)
Un clúster típico de Flink consiste en un ***Flink master*** y uno o varios ***workers***. Se pueden correr aplicaciones (algoritmos en python, por ejemplo) dentro de un clúster de Flink.
* **Descargar la (*)última (31-08-2022) versión 1.15.2**
```
https://archive.apache.org/dist/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
```
* **Iniciar el cluster:**
```
./bin/start-cluster.sh
```
* **Ejecutar una aplicación en Java:**
```
./bin/flink run \--detached \./examples/APP.jar
```
```
./bin/flink run -c org.example.App /home/sandravillagra98/apache-flink/flink_app.jar //probar sin -c
```
* **Listar los jobs ejecutándose:**
```
./bin/flink list
```
* **terminar un job:**
```
./bin/flink stop \
--savepointPath /tmp/flink-savepoints \
$JOB_ID
```
* **Ver el output del job ejecutándose:**
```
tail log/flink-*-taskexecutor-*.out
```
### Editar archivo config/flink-conf.yaml
```
# se pueden cambiar del codigo
jobmanager.memory.process.size: 8g
taskmanager.memory.process.size: 20g
taskmanager.numberOfTaskSlots: 20 # ES CON 20 CTM
parallelism.default: 20
execution.checkpointing.interval: 10s
#se agregan
execution.buffer-timeout: 100
pipeline.object-reuse: true
```
---
# Kafka Streams (v3.2.1)
En Kafka Streams, cada *record* es un par clave-valor.
* **Para ejecutar Kafka-Streams:**
```
You can run Java applications that use the Kafka Streams library without any additional configuration or requirements.
# Start the application in class `com.example.MyStreamsApp`
# from the fat JAR named `path-to-app-fatjar.jar`.
$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp
el real
java -cp kafka-streams_trhoughput.jar org.example.App
```
---
# Spark (v3.3.0)
The Maven-based build is the build of reference for Apache Spark. Building Spark using Maven requires Maven 3.8.4 and Java 8. Spark requires Scala 2.12/2.13; support for Scala 2.11 was removed in Spark 3.0.0.
* **Aspectos importantes a tener en cuenta:**
* Borrar carpeta de checkpoints antes de correr en limpio
* Puede no lanzar output por tener overflow - cargas demasiada pesadas al inicio
* **Descargar la (*)última (31-08-2022) versión 3.3.0:**
```
https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
```
* **Ejecutar una aplicación en Spark:**
```
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
```
donde
--class: The entry point for your application (e.g. org.apache.spark.examples.SparkPi)
--master: The master URL for the cluster (e.g. spark://23.195.26.187:7077)
--deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client) †
--conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap “key=value” in quotes (as shown). Multiple configurations should be passed as separate arguments. (e.g. --conf <key>=<value> --conf <key2>=<value2>)
application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes.
application-arguments: Arguments passed to the main method of your main class, if any
por ejemplo:
```
# Run application locally on 8 cores
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[8] \
/path/to/examples.jar \
100
# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
```
* ~~**Para que el jar funcione:** borrar los .RSA, .SF, .DSA~~
* **CMD:**
```
sudo ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 \
--class org.example.App \
--master local[8] \
/home/ignacio_andres432/frameworks-jars/spark-app\(same\).jar \
10
// USANDO ZSH ES "local[8]"
# oneliner
sudo ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 --class org.example.App --master local[8] /home/ignacio_andres432/frameworks-jars/spark-app\(same\).jar 10
```
* **Spark conf/spark-defaults.conf**
```
spark.sql.shuffle.partitions 2
spark.sql.streaming.minBatchesToRetain 2
spark.driver.cores 2
spark.driver.memory 6g
spark.executor.instances 5
spark.executor.memory 17g
spark.executor.cores 4
spark.locality.wait 100ms
spark.default.parallelism 20
spark.driver.extraJavaOptions -XX:+UseG1GC
spark.executor.extraJavaOptions -XX:+UseG1GC
spark.executor.extraJavaOptions -XX:ConcGCThreads=2
spark.executor.extraJavaOptions -XX:ParallelGCThreads=4
spark.executor.extraJavaOptions -XX:InitiatingHeapOccupancyPercent=35
spark.sql.autoBroadcastJoinThreshold -1
```
---
# Dataflow
**~~IMPORTANTE: Google Cloud tiene un límite de 8 CPUs para las VM de tipo C2 para southamerica-west1 en su prueba gratuita, por lo que no se puede tener un worker de Dataflow y una VM a la vez.~~ Ahora no se puede tener CPU tipo C2, hay que probar lo que crea un job de dataflow**
Para efectos prácticos, por mientras se cambiará la región, pero en las pruebas finales, se deberá eliminar la VM de pruebas, por lo que es mejor dejar Dataflow para el final
* **Configuración Dataflow**
Instalar gcloud cli en la máquina de testeo para poder tener acceso al bucket y descargar el código: https://cloud.google.com/sdk/docs/install?hl=es-419#deb
Instalar maven en la máquina:
```
sudo apt install maven
```
Finalmente, ejecutar el comando para ejecutar el código **con SUDO** (*mvn -Pdataflow-runner*...)
* **Agregar dependencia a *pom.xml* de Apache Beam:**
```
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>TU VERSION DE APACHE BEAM</version>
<scope>runtime</scope>
</dependency>
```
* **Pasos para configurar el proyecto en Google Cloud y poder ejecutar Dataflow:**
* En el código de Apache Beam, crear el objeto PipelineOptions con los parámetros a utilizar, y pasárselo al Pipeline.
* Una vez listo el código, ir a Google Cloud > Dataflow y seguir la guía inicial para configurar permisos en la Cuenta de Servicio de Compute Engine de GCP (Agregar roles Administrador de almacenamiento, Propietario y Programador de Dataflow si es necesario) y crear el Bucket donde se escribirá la información del pipeline de Dataflow.
* Instalar GCP en la máquina de testeo de frameworks y tener una copia del repositorio.
* Finalmente, entrar a la máquina de testeo de frameworks haciendo click en su nombre > Editar y marcar la opción "Permitir el acceso total a todas las API de Cloud" como aparece en la siguiente imágen:

* Copiar el repositorio desde el bucket a la máquina donde se ejecutará el comando para iniciar el worker de Dataflow usando el siguiente comando:
`sudo gsutil cp -r gs://dataflow_beam_test-1/beam-app /home/javieraibz_j/dataflow`
* **Información adicional muy importante:**
```
https://beam.apache.org/documentation/runners/dataflow/
```
* **Comando para ejecutar en la consola de la nube:**
```
sudo mvn -Pdataflow-runner compile \
exec:java \
-Dexec.mainClass=org.example.App \
-Pdataflow-runner
```
* Comando para deployear job de dataflow
```
sudo mvn -Pdataflow-runner compile \
exec:java \
-Dexec.mainClass=org.example.App \
-Dexec.args="--project=elegant-verbena-400000 \
--gcpTempLocation=gs://beam_dataflow_bucket/temp/ \
--output=gs://beam_dataflow_bucket/results/output \
--runner=DataflowRunner \
--region=southamerica-west1" \
-Pdataflow-runner
```
## Pasos para configurar el entorno de testeo
- [ ] Crear máquinas virtuales
- [ ] En la máquina de Kafka:
- [ ] Instalar Kafka
- [ ] Instalar Java (https://www.digitalocean.com/community/tutorials/how-to-install-java-with-apt-on-ubuntu-22-04)
- [ ] Crear reglas de firewall para admitir tráfico en el puerto 9092
- [ ] Configurar *server.properties*
- [ ] Crear tópicos con 20 particiones (ejecutar script)
- [ ] En la máquina de testeo de frameworks:
- [ ] Instalar Java version 11 (el default JRE con Java 11 funciona para TODOS los frameworks))
## Ejemplos de inputs:
* **Para IoT:**
```
- 2707176363363894:2021-02-07 00:03:19,1612656199,63.3,17.4,0
- 2753085432003927:2021-02-07 00:03:38,1612656218,69.8,21,1
- 2707176363363894:2021-02-07 00:02:25,1612656145,63.3,25.4,2
```
* **Para Twitter:**
```
- 1:RT saludos!!432&%$(())#0
- 1:@saludos saludos!!432&%$(())#1
- 1:saludos!!!!432&%$(())#2
```
* **Para Logs:**
```
- 1:[22/Jan/2019:03:56:16 +0330] "GET /image/60844/productModel/200x200 HTTP/1.1" 400 5667 "https://www.zanbil.ir/m/filter/b113" "Mozilla/5.0 (Linux; Android 6.0; ALE-L21 Build/HuaweiALE-L21) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.158 Mobile Safari/537.36" "-"!!432&%$(())#0
- 1:[22/Jan/2019:03:56:16 +0330] "GET /image/60844/productModel/200x200 HTTP/1.1" 401 5667 "https://www.zanbil.ir/m/filter/b113" "Mozilla/5.0 (Linux; Android 6.0; ALE-L21 Build/HuaweiALE-L21) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.158 Mobile Safari/537.36" "-"!!432&%$(())#1
- 1:[22/Jan/2019:03:56:16 +0330] "GET /image/60844/productModel/200x200 HTTP/1.1" 200 5667 "https://www.zanbil.ir/m/filter/b113" "Mozilla/5.0 (Linux; Android 6.0; ALE-L21 Build/HuaweiALE-L21) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.158 Mobile Safari/537.36" "-"!!432&%$(())#2
```
## Diferencias entre at least once, exactly once, etc:
*
* **at least once (al menos uno):** se envia un ack y se espera la respuesta en un periodo de tiempo. Si no llega respuesta en ese periodo, se reenvia. Puede provocar duplicados
*
## Parámetros estándar:
* **Tópicos de Kafka:**
* iotA - iotB - iotOut
* logA - logB - logOut
* twitterA - twitterB - twitterOut
* **Tiempo de las ventanas:**
* 15 segundos
* **Path por defecto para el archivo de la ip**:
* /home/ubuntu/ip_folder
* **Path para Spark checkPointLocation**:
* /home/ubuntu/{*topicshortname*}_spark