# Streaming desde Twitter con Google Cloud Platform Streaming es un concepto que engloba el traspaso de datos de forma continua de un punto a otro. En este tutorial, se implementará un sistema completo análisis y procesamiento de datos de *Twitter* usando los servicios de *Google Cloud Platform (GCP)*. ![Diagrama de arquitectura](https://i.imgur.com/9oxby26.png) ## Pasos 1. [Requerimientos](#1.-Requerimientos) 2. [Crear cuenta de desarrollador de Twitter](#2.-Crear-cuenta-de-desarrollador-de-Twitter) 3. [Proyecto en GCP](#3.-Proyecto-en-GCP) 4. [GCP Pub/sub service](#4.-GCP-Pub/sub-service) 5. [Extraer datos de Twitter](#5.-Extraer-datos-de-Twitter) 6. [Google Cloud Storage](#6.-Google-Cloud-Storage-service) 7. [GCP BigQuery service](#7.-GCP-BigQuery-service) 8. [DataFlow](#8.-Dataflow) ## 1. Requerimientos <!-- - Cuenta [desarrollador en Twitter](https://developer.twitter.com/) --> <!-- - Cuenta activa en [Google Cloud Platform (GCP)](https://cloud.google.com/). --> - Instalar [Python > 3.8](https://www.python.org/downloads/) en su maquina local. - Instalar paquetes de Python con [pip](https://pip.pypa.io/en/stable/): En Windows cmd o macOS terminal: ```bash pip3 install tweepy pip3 install google-cloud pip3 install python-dotenv pip3 install google-cloud-pubsub ``` - Cree un archivo con nombre `.env` donde se guardaran los parámetros de configuración. * Sugiero hacer esto en un ambiente virtual (virtualenv). ## 2. Crear cuenta de desarrollador de Twitter - En la página de [desarrolladores de Twitter](https://developer.twitter.com/), haga click en **Sign up** y siga las instrucciones. En el Developer Portal debe Seguir las indicaciones de **Sign up for Free Account**. - Revise mail y confirme su cuenta, debería aparecer en el [portal de desarrolladores](https://developer.twitter.com/en/portal/dashboard). - Cree un proyecto y vaya a la pestaña de **llaves & tokens**. **UPDATE**: Un proyecto se crea automáticamente con la cuenta gratuita, por lo que acceda al mismo y continúe con el siguiente paso. - Genere una **Bearer Token** y guardela en el archivo `.env`. ``` BEARER_TOKEN = "AAAAAAAAAAAAAAAAAAAAAJdAdAEAAAAAB6hmasdfCdyes5qdfqzYHRkDE4s%3DTNxn1vEd7QU45ra08COtMov1zS6sdfa23Lg8yU3FHg9OKFELWaJi1" ``` Este token servirá para autenticar la conexión con la API de Twitter desde un cliente. ## 3. Proyecto en GCP Primero que todo, se debe crear un proyecto en *GCP*, en este se encapsularán los servicios y se asignaran roles a usuarios. En este caso, debemos incorporar los servicios: [Pub/Sub](https://cloud.google.com/pubsub), [Dataflow](https://cloud.google.com/dataflow), [BigQuery](https://cloud.google.com/bigquery), [BigTable](https://cloud.google.com/bigtable). ### Crear proyecto y cuenta de servicio en GCP - Entre a la consola, podrá encontrar todos los servicios de GCP. - Haga click en **Proyecto Nuevo** e ingrese los datos. - Luego, se necesita [habilitar las APIs de los servicios](https://console.cloud.google.com/flows/enableapi?apiid=dataflow,compute_component,logging,storage_component,storage_api,bigquery,pubsub,cloudresourcemanager.googleapis.com,cloudscheduler.googleapis.com&_ga=2.208850066.1081723947.1654792051-1667480672.1652712244&_gac=1.124380536.1654500239.CjwKCAjwy_aUBhACEiwA2IHHQLtCU6BbEpzn1zUwEwjOGejMRqjjmsPe7N-8OuB1jO8eTd3yGGA6mBoCQwYQAvD_BwE). - En el servicio de *IAM y administración*, haga click en la pestaña de **Cuentas de servicios** y otorgue permisos para el proyecto. Luego, haga click en la cuenta de servicio y en las opciones de la derecha, haga click en **Administrar llaves**. Por ultimo cree una nueva llave en formato JSON y se descarga automáticamente. La cuenta de servicio nos permitirá interactuar con GCP desde fuera de la plataforma. - Nuevamente en el servicio de *IAM y administración*, haga click en la pestaña **IAM** haga click en **Agregar** y como principal, agregue su cuenta de servicio con los permisos de: editor en todos los servicios que usaremos. Una vez habilitada la cuenta, se deben fijar las credenciales como variables de entorno para poder conectarse de forma remoto a GCP. Para ello se puede usar el comando: ``` SET GOOGLE_APPLICATION_CREDENTIALS="<path+to+key>.json" ``` O en el administrador de variables de entorno. ## 4. GCP Pub/sub service - En la consola GCP, entre al servicio *Pub/sub*. - Haga click en **Crear tema** e ingrese el ID del tema. - Una vez creada, se puede configurar el tiempo de retención de mensajes. Guarde el nombre del proyecto y del tema en el archivo `.env`, debería quedar asi: ``` BEARER_TOKEN = "AAAAAAAAAAAAAAAAAAAAAJdAdAEAAAAAB6hmasdfCdyes5qdfqzYHRkDE4s%3DTNxn1vEd7QU45ra08COtMov1zS6sdfa23Lg8yU3FHg9OKFELWaJi1" PROJECT_ID = "amiable-catsh-311502" TOPIC_ID = "twitter_topic" ``` ## 5. Extraer datos de Twitter La librería ```tweepy``` de Python, facilita las funciones para conectarse y extraer datos de la [API de twitter](https://developer.twitter.com/en/docs). Mientras, el módulo [google.cloud.pubsub](https://cloud.google.com/python/docs/reference/pubsub/latest) de la librería google facilita la conexión y permite publicar datos al tópico del servicio pub/sub. ```python import json import os import tweepy from time import sleep from concurrent import futures from google.cloud import pubsub_v1 from dotenv import load_dotenv load_dotenv(".env") # dependiendo del sistema operativo se guarda como .env.txt # Max messages to gcp pubsub MAX_MESSAGES = 10 # Configure the batch to publish as soon as there are 10 messages # or 1 KiB of data, or 1 second has passed. batch_settings = pubsub_v1.types.BatchSettings( max_messages=MAX_MESSAGES, # default 100 max_bytes=1024, # default 1 MB max_latency=10, # default 10 ms ) # Resolve the publish future in a separate thread. def callback(future: pubsub_v1.publisher.futures.Future) -> None: message_id = future.result() print(message_id) def extract_data(max_results): Bearer_token = os.environ["BEARER_TOKEN"] project_id = os.environ["PROJECT_ID"] topic_name = os.environ["TOPIC_ID"] publish_client = pubsub_v1.PublisherClient() topic_path = F'projects/{project_id}/topics/{topic_name}' publish_futures = [] client = tweepy.Client(bearer_token=Bearer_token) query = '#birthday -is:retweet' tweets = list(client.search_recent_tweets(query=query, tweet_fields="public_metrics", max_results=max_results)) for each_tweet in tweets[0]: tweets_json = {} tweets_json["id"] = each_tweet.id tweets_json["text"] = each_tweet.text publish_future = publish_client.publish(topic_path, data=json.dumps(tweets_json).encode("utf-8")) # Non-blocking. Allow the publisher client to batch multiple messages. publish_future.add_done_callback(callback) publish_futures.append(publish_future) res = futures.wait(publish_futures , return_when=futures.ALL_COMPLETED) return res if __name__ == '__main__': while True: tweets_json_batch = extract_data(max_results=MAX_MESSAGES) sleep(5) ``` ## 6. Google Cloud Storage service - En el servicio de *Google Cloud Storage* haga click en **Crear Bucket**. El nombre debe ser universalmente único. - Cree dos carpetas dentro del bucket: `temp/` y `raw_data/`. ## 7. GCP BigQuery service - En el panel explorador, seleccione las opciones de su proyecto y haga click en **Crear conjunto de datos** e ingrese un nombre, desde ahora denominado`dataset`. - Luego en el panel explorador, haga click en el conjunto de datos y cree una tabla rellenando los datos, desde ahora denominada `table`. En la sección de esquema, haga click en **Editar como texto** e ingrese en formato JSON los atributos y tipos: ``` [ { "mode": "NULLABLE", "name": "id", "type": "INT" }, { "mode": "NULLABLE", "name": "text", "type": "STRING" } ] ``` ## 8. Dataflow ### Pipeline para BigQuery - Abra el servicio de *Dataflow* y haga click en **Crear trabajo desde plantilla**. - En la sección de **plantilla de Dataflow**, seleccione la plantilla **Pub/sub to BigQuery**. - El tema del servicio *Pub/sub* se ingresa siguiendo la siguiente estructura: `projects/<PROJECT_ID/topics/<TOPIC_ID>` - La tabla de BigQuery se ingresa siguiendo la siguiente estructura: `<PROJECT_ID>:<nombre-del-conjunto>.<nombre-de-la-tabla> - Los datos temporales del Pipeline se almacena en Google Cloud Storage por default. Se ingresa de la siguiente estructura: - `gs://<BUCKET_NAME>/temp/` ### Pipeline para Google Cloud Storage - Abra el servicio de *Dataflow* y haga click en **Crear trabajo desde plantilla**. - En la sección de **plantilla de Dataflow**, seleccione la plantilla **Pub/sub to Text Files on Cloud Storage**. - El tema del servicio *Pub/sub* se ingresa con la siguiente estructura: `projects/<PROJECT_ID/topics/<TOPIC_ID>` - El bucket de destino en GCS se ingresa con la siguiente estructura: `gs://<BUCKET_NAME>/` ### Generación de plantillas propias - A continuación, una breve introducción a los pasos necesarios para generar Beam pipelines que corran en *Dataflow* o cualquier otro Runner (*Spark*, *Flink*, *GoDirect*, etc). - Los pipelines se definen usando un modelo de Beam que se puede ejecutar en distintos tipos de Runners. [Acá](https://cloud.google.com/dataflow/docs/concepts/beam-programming-model) una guía de GCP de Beam y [acá](https://beam.apache.org/documentation/basics/) la documentación original. - Las plantillas usadas en este tutorial están en *java* y son de acceso libre, y se pueden encontrar en [este link](https://github.com/GoogleCloudPlatform/DataflowTemplates). Pero además, es permitido crear plantillas desde: [Python](https://cloud.google.com/dataflow/docs/quickstarts/create-pipeline-python) o [go](https://cloud.google.com/dataflow/docs/quickstarts/create-pipeline-go). - También se puede encontrar [esta guía](https://cloud.google.com/dataflow/docs/guides/templates/creating-templates) para crear y aplicar los pipelines en Dataflow. <!-- ## License [MIT](https://choosealicense.com/licenses/mit/) -->