# Streaming desde Twitter con Google Cloud Platform
En este tutorial, se implementará un sistema completo de análisis y procesamiento de datos de *Twitter* usando los servicios de *Google Cloud Platform (GCP)*.

## Pasos
1. [Preparación de Ambiente de Trabajo](#1.-Preparación-de-Ambiente-de-Trabajo)
2. [Crear cuenta de desarrollador de Twitter](#2.-Crear-cuenta-de-desarrollador-de-Twitter)
3. [Proyecto en GCP](#3.-Proyecto-en-GCP)
3.1 [Habilitando Apis de los servicios a utilizar](#3.1.-Habilitando-Apis-de-los-servicios-a-utilizar)
3.2 [Creando Cuenta de Servicios para el proyecto](#3.2.-Creando-Cuenta-de-Servicios-para-el-proyecto)
4. [GCP Pub/sub service](#4.-GCP-Pub/sub-service)
5. [Extraer datos de Twitter](#5.-Extraer-datos-de-Twitter)
6. [Google Cloud Storage](#6.-Google-Cloud-Storage-service)
7. [GCP BigQuery service](#7.-GCP-BigQuery-service)
8. [DataFlow](#8.-Dataflow)
9. [Google Cloud Princing Calculator](#9.-Google-Cloud-Princing-Calculator)
## 1. Preparación de Ambiente de Trabajo
- Partiremos Creando Nuestro Entorno de trabajo en nuestro equipo local , esto lo realizaremos sobre conda que es un sistema de gestión de entornos de código abierto. y nuestro ambiente de trabajo se ejecutara usando la version de python 3.7, Para esto ejecutarmemos el siguiente codigo:
```
conda create --name py37 python=3.7
```

al ejecutar este comando nos preguntara si queremos que instale todas estas librerias para crear el ambiente

le Colocamos que si escribiendo **y** para posteriormente presionar la tecla enter

con esto comenzara la instalación de nuestras librerias para levantar nuestro ambiente de trabajo

para verificar que nuestro entorno de trabajo se creo correctamente ejecutamos el siguiente codigo:
```
conda env list
```

como muestra la imagen en nuestro equipo poseemos dos ambientes uno llamado base y otro py37 como nosotros trabajaremos sobre **py37** activaremos ese ambiente con el siguiente comando:
```
conda activate py37
```

si verificamos la imagen podemos ver que pasamos de **(base)** a **py37** por lo que procederemos a instalar nuestras librerias
la primera libreria que instalaremos sera la libreria **tweepy** la cual nos permitira conectarnos a la api de twitter y descargar los twits que le solicitemos
esto lo realizaremos con el siguiente comando:
```
pip install git+https://github.com/tweepy/tweepy.git
```

comenzara la instalacion y una vez finalizada tendremos algo asi:

procederemos a continuacion a instalar **google-cloud** que nos ayudara a ocupar funciones de gcp
para esto ejecutaremos el siguiente comando:
```
pip install google-cloud==0.28.0
```

finalmente procederemos a instalar la libreria **python-dotenv** que nos permitira leer pares clave-valor de un archivo .env y para configurarlos como variables de entorno
eso lo haremos con el siguiente comando:
```
pip install python-dotenv
```

una vez instaladas nuestras librerias creamos nuesta carpeta donde ejecutaremos finalmente el scripts de python que se conectara a twitter y descarga la información que le indiquemos en mi caso estara alojado en la siguiente ruta

finalmente ejecutaremos el comando
```
code .
```
desde la ruta de nuestro archivo para que nuestra carpeta aparezca en visual studio code y aprovecharemos de crear
nuestro archivo .env donde almacenaremos nuestras variables de entorno

con esto damos por cerrada esta seccion del tutorial donde creamos nuestro ambiente de trabajo
## 2. Crear cuenta de desarrollador de Twitter
- En la página de [desarrolladores de Twitter](https://developer.twitter.com/)

- hacemos click en **Sign up** y siga las instrucciones (basta con la cuenta *Essential*).
Completamos la informacion solicitada

- Revise mail deberia llevar un correo mas o menos asi:

- Confirmamos nuestra cuenta y aparecera el [portal de desarrolladores](https://developer.twitter.com/en/portal/dashboard).
- Creamos el proyecto big-data-app-nisepulvedaa

le damos a get key y esto nos creara nuestras variables

- de todas nuestras key ocuparemos solola que dice **Bearer Token** y guardela en el archivo `.env`.
```
BEARER_TOKEN = "XXXXX"
```
Este token servirá para autenticar la conexión con la API de Twitter desde un cliente.
## 3. Proyecto en GCP
Partiremos Creando nuestro proyecto en GCP donde almacenaremos todos los componentes que ocuparemos para la resolucion de este problema ingresamores a [GCP](https://console.cloud.google.com)
al ingresar al portal levantaremos el mantenedor que nos permitira crear nuestro proyecto

le damos a la opcion PROYECTO NUEVO y completamos los campos solicitados nuestro proyecto tendra el nombre de **examen-big-data-nisepulvedaa**

al presionar en crear podemos ver que nuestro proyecto fue creado con exito:

## 3.1 Habilitando Apis de los servicios a utilizar
como lo vimos inicialmente en nuesto diagrama los servicios que utilizaremos de gcp seran
- [Pub/Sub](https://cloud.google.com/pubsub)
- [Dataflow](https://cloud.google.com/dataflow)
- [BigQuery](https://cloud.google.com/bigquery)
- [BigTable](https://cloud.google.com/bigtable)
para utilizarlos debemos activar las apis que nos daran acceso a sus funciones para eso ingresaremos al siguiente enlace:
[habilitar las APIs de los servicios](https://console.cloud.google.com/flows/enableapi?apiid=dataflow,compute_component,logging,storage_component,storage_api,bigquery,pubsub,cloudresourcemanager.googleapis.com,cloudscheduler.googleapis.com&_ga=2.208850066.1081723947.1654792051-1667480672.1652712244&_gac=1.124380536.1654500239.CjwKCAjwy_aUBhACEiwA2IHHQLtCU6BbEpzn1zUwEwjOGejMRqjjmsPe7N-8OuB1jO8eTd3yGGA6mBoCQwYQAvD_BwE).
al ingresar veremos lo siguiente:

le damos a siguiente y nos aparecera el listado de apis que habilitaremos

**importante debemos asegurarnos que estamos en el proyecto que acabamos de crear si no no podremos utilizar los servicios desde nuestro proyecto**
presionamos en habilitar y veremos lo siguiente:

Cuanda se hayan habilitado todas las APIS veremos lo siguiente:

## 3.2 Creando Cuenta de Servicios para el proyecto
Ingresaremos al servicio de *IAM y administración*

luego ingresamos a la opción Cuentas de servicios

crearemos una nueva cuenta de servicio presionamos en la opción crear cuenta de servicios y nos aparecera los siguiente:
nuestra cuenta de servicio se llamara **service-account-twitter-nisepulvedaa**

le damos en crear y continuar

le damos en listo

como podemos ver en la imagen nuestra cuenta fue creada con exito
posterior a esto crearemos nuestras llaves esto lo haremos de la siguiente manera:

se nos cargara la siguiente ventana

seleccionamos lo siguiente:

como podemos ver nuestra llave se creo correctamente.
**importante al ejecutar este paso de forma por defecto se descargara en tu equipo un archivo con las credenciales para conectarese a gcp cloud**

**La cuenta de servicio nos permitirá interactuar con GCP desde fuera de la plataforma.**
finalmente nos volvemos a dirigir a al servicio de *IAM y administración* y a nuestra cuenta de servicios le agregamos los permisos de editor

## 4. GCP Pub/sub service
- Dentro de Nuestro Proyecto, entramos al servicio *Pub/sub*.

- Hacemos click en **Crear tema** e ingrese el ID del tema.
nuestro tema se llamara **examen-bigdata-nisepulvedaa**

Una vez finalizada esta accion nos dirigiremos a nuestro archivo .env y agregaremos las siguientes variables:
```
BEARER_TOKEN = "XXXXX"
PROJECT_ID = "examen-big-data-nisepulvedaa"
TOPIC_ID = "examen-big-data-nisepulvedaa-topic"
```

## 5. Extraer datos de Twitter
La librería ```tweepy``` de Python, facilita las funciones para conectarse y extraer datos de la [API de twitter](https://developer.twitter.com/en/docs). Mientras, el módulo [google.cloud.pubsub](https://cloud.google.com/python/docs/reference/pubsub/latest) de la librería google facilita la conexión y permite publicar datos al tópico del servicio pub/sub.
esta funciona la modificaremos para obtener los twitter de la carrera de formula 1 de este domingo 30-10-2022 correspodiente al Gran Premio de Mexico que utiliza su hashtag #MexicoGP
```python
import json
import os
import tweepy
from time import sleep
from concurrent import futures
from google.cloud import pubsub_v1
from dotenv import load_dotenv
load_dotenv(".env") # dependiendo del sistema operativo se guarda como .env.txt
# Max messages to gcp pubsub
MAX_MESSAGES = 100
# Configure the batch to publish as soon as there are 10 messages
# or 1 KiB of data, or 1 second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
max_messages=MAX_MESSAGES, # default 100
max_bytes=1024, # default 1 MB
max_latency=10, # default 10 ms
)
# Resolve the publish future in a separate thread.
def callback(future: pubsub_v1.publisher.futures.Future) -> None:
message_id = future.result()
print(message_id)
def extract_data(max_results):
Bearer_token = os.environ["BEARER_TOKEN"]
project_id = os.environ["PROJECT_ID"]
topic_name = os.environ["TOPIC_ID"]
publish_client = pubsub_v1.PublisherClient()
topic_path = F'projects/{project_id}/topics/{topic_name}'
publish_futures = []
client = tweepy.Client(bearer_token=Bearer_token)
query = '#MexicoGP has:hashtags has:media lang:es'
tweets = list(client.search_recent_tweets(query=query, tweet_fields="id,text,created_at,author_id,public_metrics", max_results=max_results))
#print(tweets)
for each_tweet in tweets[0]:
#print(each_tweet.public_metrics['retweet_count'])
tweets_json = {'id':'{}'.format(each_tweet.id),'author_id':'{}'.format(each_tweet.author_id), 'text':'{}'.format(each_tweet.text),'created_at':'{}'.format(each_tweet.created_at),'retweet_count':'{}'.format(each_tweet.public_metrics['retweet_count'])}
# = {'id':'{}'.format(each_tweet.id),'author_id':'{}'.format(each_tweet.author_id), 'text':'{}'.format(each_tweet.text),'created_at':'{}'.format(each_tweet.created_at)}
#print(json.dumps(tweets_json))
#tweets_json["created_at"] = fecha
#tweets_json["public_metrics.retweet_count"] = each_tweet.retweet_count
publish_future = publish_client.publish(topic_path, data=json.dumps(tweets_json).encode("utf-8"))
#print(publish_future)
# Non-blocking. Allow the publisher client to batch multiple messages.
publish_future.add_done_callback(callback)
publish_futures.append(publish_future)
#print(publish_futures)
res = futures.wait(publish_futures , return_when=futures.ALL_COMPLETED)
return res
if __name__ == '__main__':
while True:
tweets_json_batch = extract_data(max_results=MAX_MESSAGES)
sleep(5)
```
una vez configurado nuestro script de python lo ejecutamos de la siguiente manera:
como puden ver en nuestra ruta de trabajo tenemos el archivo con las variables de entorno, el archivo con las conexiones a gcp cloud y el archivo python que nos permitira bajar la info
**antes de ejecutar el script setiamos la variable de entorno
con el siguiente comando:**
este archivo es el que se descargo de forma automatica en el punto **3.2**
```
SET GOOGLE_APPLICATION_CREDENTIALS=examen-big-data-nisepulvedaa-636ecfe5a04f.json
```

teniendo esto listo ejecutamos el script con el siguiente comando

si bien el codigo nos da un error se envia correctamente los datos a nuestro topico creado en gcp

## 6. Google Cloud Storage service
a continuacion nos dirigeremos a *Google Cloud Storage* para crear nuestro **Bucket** eso lo realizaremos de la siguiente manera:

- una vez en *Google Cloud Storage* seleccionamos la opcion crear buckets

- a nuestro bucket le asignaremos el nombre **examen-big-data-nisepulvedaa-bucket**

- eleguimos donde almaceneramos nuestra información

- eleguimos de que forma almacenaremos nuestros datos

y las otras dos opciones restantes quedaran en formato standard por lo que creamos nuestro bucket


una vez creado nuestro bucket crearemos dos carpetas dentro de el que seran `temp/` y `raw_data/`. para eso seleccionamos la opcion **CREAR CARPETA**


- una vez creadas estas dos carptas nuestro bucket se deberia ver de la siguiente forma:

## 7. GCP BigQuery service
a continuacion crearemos nuestras tablas de datos en big query para eso nos digiremos a la siguiente ruta

- dentro de big query partiremos creando nuestro conjunto de datos seleccinando lo siguiente

- completamos la información requerida y le damos a crear conjunto de datos

- una vez creado nuestro conjunto de datos se vera de la siguiente manera

a continuación creamos nuestra tabla:

- nuestra tabla poseera la siguiente estructura:
y la llameremos **tabla_examen_big_data_nisepulvedaa_gp_mexico**

```
[
{
"mode":"NULLABLE",
"name":"id",
"type": "STRING"
},
{
"mode":"NULLABLE",
"name":"author_id",
"type": "STRING"
},
{
"mode":"NULLABLE",
"name":"text",
"type": "STRING"
},
{
"mode":"NULLABLE",
"name":"created_at",
"type": "STRING"
},
{
"mode":"NULLABLE",
"name":"retweet_count",
"type": "STRING"
}
]
```
una vez nuesta tabla creada correctamente veremos lo siguiente:

en este punto lo unico que nos faltaria seria ejecutar nuestro pipeline para nuestro runner ocuparemos *DataFlow*
## 8. Dataflow
para dirigirnos a *DataFlow* lo haremos de la siguiente manera:

aca tenemos dos grandes tareas primero generar un *Pipeline para BigQuery* y otro para *Pipeline para Google Cloud Storage* desde el siguiente menu:

### Pipeline para BigQuery
a nuestro job lo llamaremos **examen_big_data_nisepulvedaa_dataflow_job_to_big_query**
otra cosa importante aca es que la plantilla que ocuparemos sera la que dice **Pub/Sub topic to BigQuery**

a continuacion se nos solicitaran los siguientes parametros:
- El tema del servicio *Pub/sub* se ingresa siguiendo la siguiente estructura:
`projects/<PROJECT_ID/topics/<TOPIC_ID>`
- La tabla de BigQuery se ingresa siguiendo la siguiente estructura:
`<PROJECT_ID>:<nombre-del-conjunto>.<nombre-de-la-tabla>
- Los datos temporales del Pipeline se almacena en Google Cloud Storage por default. Se ingresa de la siguiente estructura:
- `gs://<BUCKET_NAME>/temp/`
**Proyecto:** projects/examen-big-data-nisepulvedaa/topics/examen-big-data-nisepulvedaa-topic
**BigQuery OUTPUT TABLE:** examen-big-data-nisepulvedaa:examen-big-data-nisepulvedaa.conjunto_examen_bita_nisepulvedaa_twitter.tabla_examen_big_data_nisepulvedaa_gp_mexico
**Ubicacion Temporal:** gs://examen-big-data-nisepulvedaa-bucket/temp
completada esta información le damos a la opcion Ejecutar Trabajo

veremos nuestro trabajo creado correctamente

### Pipeline para Google Cloud Storage
para realizar esta accion ingresaremos al siguiente menu:

a nuestro job lo llamaremos **examen_big_data_nisepulvedaa_dataflow_job_to_cloud_storage**
otra cosa importante aca es que la plantilla que ocuparemos sera la que dice ** Pub/Sub to Text Files on Cloud Storage **


- Abra el servicio de *Dataflow* y haga click en **Crear trabajo desde plantilla**.
- En la sección de **plantilla de Dataflow**, seleccione la plantilla **Pub/sub to GCS**.
- El tema del servicio *Pub/sub* se ingresa con la siguiente estructura:
`projects/<PROJECT_ID/topics/<TOPIC_ID>`
- El bucket de destino en GCS se ingresa con la siguiente estructura:
`gs://<BUCKET_NAME>/`
finalmente nuestro Pipelines Creados los podemos ver de la siguiente manera:

**dentro del transcurso de tiempo en que hacia este tutorial pasaba que tenia mis dos Pipelines corriendo pero no lograba ver la informacion ni en *BigQuery* ni en *CloudStorage* asique parti todo desde 0 ejecutando este paso a paso y me di cuenta de esto**

**la tabla que habia creado para almacenar mi informacioon tenia un formato erroneo por lo que al aplicar la correcion todo se soluciono.**
<!-- ## License
[MIT](https://choosealicense.com/licenses/mit/) -->
Finalmente ejecutamos nuestro codigo python ahora obtiendo 100 twits por ejecución:

verificamos si se guardaron estos registros en big query

podemos verificar tambien que la informacion en bruto se ha guardado en *Google Cloud Storage*

si revisamos uno de los archivos podemos ver que tenemos en nuestro contenido toda la informacion

para validar buscaremos el id **1587852337107828737** que aperce en nuestro archivo en **BigQuery** para asegurarnos de que nuestro flujo funciona correctamente

como podemos ver tenemos el registro guardado en BigQuery
**por lo revisado el codigo de python lo que hace es que baja 10 o 100 twets pero cada vez que se ejecuta baja los mismos 100 por lo que movimos algunas cosas para traernos mas info agredandole parametros al script**

- finalmente buscamos los 10 twits mas retuitiados con la siguiente consulta:

## 9. Google Cloud Princing Calculator
finalmente realizamos la estimación de los costos asociados a tener funcionando un sistema de esta magnitud con Google Cloud Pricing Calculator
accedemos al siguiente link:
https://cloud.google.com/google/calculadora al ingresar al enlace veremos lo siguiente:

* identificamos el servicio de Pub/Sub y realizamos la estimación

lo que nos da un resultado de:

* identificamos el servicio de Cloud Storage y realizamos la estimación

lo que nos da un resultado de:

* posterior a esto identificamos el servicio de Big Query y realizamos la estimación

lo que nos da un resultado de:

* identificamos el servicio de DataFlow y realizamos la estimación:

lo que nos da un resultado de:
