# Streaming desde Twitter con Google Cloud Platform
Streaming es un concepto que engloba el traspaso de datos de forma continua de un punto a otro. En este tutorial, se implementará un sistema completo análisis y procesamiento de datos de *Twitter* usando los servicios de *Google Cloud Platform (GCP)*.

## Pasos
1. [Requerimientos](#1.-Requerimientos)
2. [Crear cuenta de desarrollador de Twitter](#2.-Crear-cuenta-de-desarrollador-de-Twitter)
3. [Proyecto en GCP](#3.-Proyecto-en-GCP)
4. [GCP Pub/sub service](#4.-GCP-Pub/sub-service)
5. [Extraer datos de Twitter](#5.-Extraer-datos-de-Twitter)
6. [Google Cloud Storage](#6.-Google-Cloud-Storage-service)
7. [GCP BigQuery service](#7.-GCP-BigQuery-service)
8. [DataFlow](#8.-Dataflow)
## 1. Requerimientos
<!-- - Cuenta [desarrollador en Twitter](https://developer.twitter.com/) -->
<!-- - Cuenta activa en [Google Cloud Platform (GCP)](https://cloud.google.com/). -->
- Instalar [Python > 3.8](https://www.python.org/downloads/) en su maquina local.
- Instalar paquetes de Python con [pip](https://pip.pypa.io/en/stable/):
En Windows cmd o macOS terminal:
```bash
pip3 install tweepy
pip3 install google-cloud
pip3 install python-dotenv
pip3 install google-cloud-pubsub
```
- Cree un archivo con nombre `.env` donde se guardaran los parámetros de configuración.
* Sugiero hacer esto en un ambiente virtual (virtualenv).
## 2. Crear cuenta de desarrollador de Twitter
- En la página de [desarrolladores de Twitter](https://developer.twitter.com/), haga click en **Sign up** y siga las instrucciones. En el Developer Portal debe Seguir las indicaciones de **Sign up for Free Account**.
- Revise mail y confirme su cuenta, debería aparecer en el [portal de desarrolladores](https://developer.twitter.com/en/portal/dashboard).
- Cree un proyecto y vaya a la pestaña de **llaves & tokens**. **UPDATE**: Un proyecto se crea automáticamente con la cuenta gratuita, por lo que acceda al mismo y continúe con el siguiente paso.
- Genere una **Bearer Token** y guardela en el archivo `.env`.
```
BEARER_TOKEN = "AAAAAAAAAAAAAAAAAAAAAJdAdAEAAAAAB6hmasdfCdyes5qdfqzYHRkDE4s%3DTNxn1vEd7QU45ra08COtMov1zS6sdfa23Lg8yU3FHg9OKFELWaJi1"
```
Este token servirá para autenticar la conexión con la API de Twitter desde un cliente.
## 3. Proyecto en GCP
Primero que todo, se debe crear un proyecto en *GCP*, en este se encapsularán los servicios y se asignaran roles a usuarios. En este caso, debemos incorporar los servicios: [Pub/Sub](https://cloud.google.com/pubsub), [Dataflow](https://cloud.google.com/dataflow), [BigQuery](https://cloud.google.com/bigquery), [BigTable](https://cloud.google.com/bigtable).
### Crear proyecto y cuenta de servicio en GCP
- Entre a la consola, podrá encontrar todos los servicios de GCP.
- Haga click en **Proyecto Nuevo** e ingrese los datos.
- Luego, se necesita [habilitar las APIs de los servicios](https://console.cloud.google.com/flows/enableapi?apiid=dataflow,compute_component,logging,storage_component,storage_api,bigquery,pubsub,cloudresourcemanager.googleapis.com,cloudscheduler.googleapis.com&_ga=2.208850066.1081723947.1654792051-1667480672.1652712244&_gac=1.124380536.1654500239.CjwKCAjwy_aUBhACEiwA2IHHQLtCU6BbEpzn1zUwEwjOGejMRqjjmsPe7N-8OuB1jO8eTd3yGGA6mBoCQwYQAvD_BwE).
- En el servicio de *IAM y administración*, haga click en la pestaña de **Cuentas de servicios** y otorgue permisos para el proyecto. Luego, haga click en la cuenta de servicio y en las opciones de la derecha, haga click en **Administrar llaves**. Por ultimo cree una nueva llave en formato JSON y se descarga automáticamente. La cuenta de servicio nos permitirá interactuar con GCP desde fuera de la plataforma.
- Nuevamente en el servicio de *IAM y administración*, haga click en la pestaña **IAM** haga click en **Agregar** y como principal, agregue su cuenta de servicio con los permisos de: editor en todos los servicios que usaremos.
Una vez habilitada la cuenta, se deben fijar las credenciales como variables de entorno para poder conectarse de forma remoto a GCP. Para ello se puede usar el comando:
```
SET GOOGLE_APPLICATION_CREDENTIALS="<path+to+key>.json"
```
O en el administrador de variables de entorno.
## 4. GCP Pub/sub service
- En la consola GCP, entre al servicio *Pub/sub*.
- Haga click en **Crear tema** e ingrese el ID del tema.
- Una vez creada, se puede configurar el tiempo de retención de mensajes.
Guarde el nombre del proyecto y del tema en el archivo `.env`, debería quedar asi:
```
BEARER_TOKEN = "AAAAAAAAAAAAAAAAAAAAAJdAdAEAAAAAB6hmasdfCdyes5qdfqzYHRkDE4s%3DTNxn1vEd7QU45ra08COtMov1zS6sdfa23Lg8yU3FHg9OKFELWaJi1"
PROJECT_ID = "amiable-catsh-311502"
TOPIC_ID = "twitter_topic"
```
## 5. Extraer datos de Twitter
La librería ```tweepy``` de Python, facilita las funciones para conectarse y extraer datos de la [API de twitter](https://developer.twitter.com/en/docs). Mientras, el módulo [google.cloud.pubsub](https://cloud.google.com/python/docs/reference/pubsub/latest) de la librería google facilita la conexión y permite publicar datos al tópico del servicio pub/sub.
```python
import json
import os
import tweepy
from time import sleep
from concurrent import futures
from google.cloud import pubsub_v1
from dotenv import load_dotenv
load_dotenv(".env") # dependiendo del sistema operativo se guarda como .env.txt
# Max messages to gcp pubsub
MAX_MESSAGES = 10
# Configure the batch to publish as soon as there are 10 messages
# or 1 KiB of data, or 1 second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
max_messages=MAX_MESSAGES, # default 100
max_bytes=1024, # default 1 MB
max_latency=10, # default 10 ms
)
# Resolve the publish future in a separate thread.
def callback(future: pubsub_v1.publisher.futures.Future) -> None:
message_id = future.result()
print(message_id)
def extract_data(max_results):
Bearer_token = os.environ["BEARER_TOKEN"]
project_id = os.environ["PROJECT_ID"]
topic_name = os.environ["TOPIC_ID"]
publish_client = pubsub_v1.PublisherClient()
topic_path = F'projects/{project_id}/topics/{topic_name}'
publish_futures = []
client = tweepy.Client(bearer_token=Bearer_token)
query = '#birthday -is:retweet'
tweets = list(client.search_recent_tweets(query=query, tweet_fields="public_metrics", max_results=max_results))
for each_tweet in tweets[0]:
tweets_json = {}
tweets_json["id"] = each_tweet.id
tweets_json["text"] = each_tweet.text
publish_future = publish_client.publish(topic_path, data=json.dumps(tweets_json).encode("utf-8"))
# Non-blocking. Allow the publisher client to batch multiple messages.
publish_future.add_done_callback(callback)
publish_futures.append(publish_future)
res = futures.wait(publish_futures , return_when=futures.ALL_COMPLETED)
return res
if __name__ == '__main__':
while True:
tweets_json_batch = extract_data(max_results=MAX_MESSAGES)
sleep(5)
```
## 6. Google Cloud Storage service
- En el servicio de *Google Cloud Storage* haga click en **Crear Bucket**. El nombre debe ser universalmente único.
- Cree dos carpetas dentro del bucket: `temp/` y `raw_data/`.
## 7. GCP BigQuery service
- En el panel explorador, seleccione las opciones de su proyecto y haga click en **Crear conjunto de datos** e ingrese un nombre, desde ahora denominado`dataset`.
- Luego en el panel explorador, haga click en el conjunto de datos y cree una tabla rellenando los datos, desde ahora denominada `table`. En la sección de esquema, haga click en **Editar como texto** e ingrese en formato JSON los atributos y tipos:
```
[
{
"mode": "NULLABLE",
"name": "id",
"type": "INT"
},
{
"mode": "NULLABLE",
"name": "text",
"type": "STRING"
}
]
```
## 8. Dataflow
### Pipeline para BigQuery
- Abra el servicio de *Dataflow* y haga click en **Crear trabajo desde plantilla**.
- En la sección de **plantilla de Dataflow**, seleccione la plantilla **Pub/sub to BigQuery**.
- El tema del servicio *Pub/sub* se ingresa siguiendo la siguiente estructura:
`projects/<PROJECT_ID/topics/<TOPIC_ID>`
- La tabla de BigQuery se ingresa siguiendo la siguiente estructura:
`<PROJECT_ID>:<nombre-del-conjunto>.<nombre-de-la-tabla>
- Los datos temporales del Pipeline se almacena en Google Cloud Storage por default. Se ingresa de la siguiente estructura:
- `gs://<BUCKET_NAME>/temp/`
### Pipeline para Google Cloud Storage
- Abra el servicio de *Dataflow* y haga click en **Crear trabajo desde plantilla**.
- En la sección de **plantilla de Dataflow**, seleccione la plantilla **Pub/sub to Text Files on Cloud Storage**.
- El tema del servicio *Pub/sub* se ingresa con la siguiente estructura:
`projects/<PROJECT_ID/topics/<TOPIC_ID>`
- El bucket de destino en GCS se ingresa con la siguiente estructura:
`gs://<BUCKET_NAME>/`
### Generación de plantillas propias
- A continuación, una breve introducción a los pasos necesarios para generar Beam pipelines que corran en *Dataflow* o cualquier otro Runner (*Spark*, *Flink*, *GoDirect*, etc).
- Los pipelines se definen usando un modelo de Beam que se puede ejecutar en distintos tipos de Runners. [Acá](https://cloud.google.com/dataflow/docs/concepts/beam-programming-model) una guía de GCP de Beam y [acá](https://beam.apache.org/documentation/basics/) la documentación original.
- Las plantillas usadas en este tutorial están en *java* y son de acceso libre, y se pueden encontrar en [este link](https://github.com/GoogleCloudPlatform/DataflowTemplates). Pero además, es permitido crear plantillas desde: [Python](https://cloud.google.com/dataflow/docs/quickstarts/create-pipeline-python) o [go](https://cloud.google.com/dataflow/docs/quickstarts/create-pipeline-go).
- También se puede encontrar [esta guía](https://cloud.google.com/dataflow/docs/guides/templates/creating-templates) para crear y aplicar los pipelines en Dataflow.
<!-- ## License
[MIT](https://choosealicense.com/licenses/mit/) -->