# Streaming desde Twitter con Google Cloud Platform En este tutorial, se implementará un sistema completo de análisis y procesamiento de datos de *Twitter* usando los servicios de *Google Cloud Platform (GCP)*. ![Diagrama de arquitectura](https://i.imgur.com/9oxby26.png) ## Pasos 1. [Preparación de Ambiente de Trabajo](#1.-Preparación-de-Ambiente-de-Trabajo) 2. [Crear cuenta de desarrollador de Twitter](#2.-Crear-cuenta-de-desarrollador-de-Twitter) 3. [Proyecto en GCP](#3.-Proyecto-en-GCP) 3.1 [Habilitando Apis de los servicios a utilizar](#3.1.-Habilitando-Apis-de-los-servicios-a-utilizar) 3.2 [Creando Cuenta de Servicios para el proyecto](#3.2.-Creando-Cuenta-de-Servicios-para-el-proyecto) 4. [GCP Pub/sub service](#4.-GCP-Pub/sub-service) 5. [Extraer datos de Twitter](#5.-Extraer-datos-de-Twitter) 6. [Google Cloud Storage](#6.-Google-Cloud-Storage-service) 7. [GCP BigQuery service](#7.-GCP-BigQuery-service) 8. [DataFlow](#8.-Dataflow) 9. [Google Cloud Princing Calculator](#9.-Google-Cloud-Princing-Calculator) ## 1. Preparación de Ambiente de Trabajo - Partiremos Creando Nuestro Entorno de trabajo en nuestro equipo local , esto lo realizaremos sobre conda que es un sistema de gestión de entornos de código abierto. y nuestro ambiente de trabajo se ejecutara usando la version de python 3.7, Para esto ejecutarmemos el siguiente codigo: ``` conda create --name py37 python=3.7 ``` ![Comando Creacion Entorno de trabajo](https://i.imgur.com/oHq4spj.png) al ejecutar este comando nos preguntara si queremos que instale todas estas librerias para crear el ambiente ![Librerias a instalar](https://i.imgur.com/DTdYZb2.png) le Colocamos que si escribiendo **y** para posteriormente presionar la tecla enter ![Procedemos con librerias de nuestro entorno de trabajo](https://i.imgur.com/0RRMofc.png) con esto comenzara la instalación de nuestras librerias para levantar nuestro ambiente de trabajo ![Intalacion de paquetes](https://i.imgur.com/qarvtPe.png) para verificar que nuestro entorno de trabajo se creo correctamente ejecutamos el siguiente codigo: ``` conda env list ``` ![ambientes creados](https://i.imgur.com/cqLHtrR.png) como muestra la imagen en nuestro equipo poseemos dos ambientes uno llamado base y otro py37 como nosotros trabajaremos sobre **py37** activaremos ese ambiente con el siguiente comando: ``` conda activate py37 ``` ![Cambio de Ambiente](https://i.imgur.com/Q98Brk8.png) si verificamos la imagen podemos ver que pasamos de **(base)** a **py37** por lo que procederemos a instalar nuestras librerias la primera libreria que instalaremos sera la libreria **tweepy** la cual nos permitira conectarnos a la api de twitter y descargar los twits que le solicitemos esto lo realizaremos con el siguiente comando: ``` pip install git+https://github.com/tweepy/tweepy.git ``` ![install tweepy](https://i.imgur.com/vmf621U.png) comenzara la instalacion y una vez finalizada tendremos algo asi: ![install tweepy complete](https://i.imgur.com/7YmZC09.png) procederemos a continuacion a instalar **google-cloud** que nos ayudara a ocupar funciones de gcp para esto ejecutaremos el siguiente comando: ``` pip install google-cloud==0.28.0 ``` ![google-cloud](https://i.imgur.com/cLDxvcm.png) finalmente procederemos a instalar la libreria **python-dotenv** que nos permitira leer pares clave-valor de un archivo .env y para configurarlos como variables de entorno eso lo haremos con el siguiente comando: ``` pip install python-dotenv ``` ![install dobnet](https://i.imgur.com/2NCIWMa.png) una vez instaladas nuestras librerias creamos nuesta carpeta donde ejecutaremos finalmente el scripts de python que se conectara a twitter y descarga la información que le indiquemos en mi caso estara alojado en la siguiente ruta ![entorno trabajo](https://i.imgur.com/9737Ckz.png) finalmente ejecutaremos el comando ``` code . ``` desde la ruta de nuestro archivo para que nuestra carpeta aparezca en visual studio code y aprovecharemos de crear nuestro archivo .env donde almacenaremos nuestras variables de entorno ![Variables de entorno](https://i.imgur.com/hDRbSgM.png) con esto damos por cerrada esta seccion del tutorial donde creamos nuestro ambiente de trabajo ## 2. Crear cuenta de desarrollador de Twitter - En la página de [desarrolladores de Twitter](https://developer.twitter.com/) ![](https://i.imgur.com/a1lSK6x.jpg) - hacemos click en **Sign up** y siga las instrucciones (basta con la cuenta *Essential*). Completamos la informacion solicitada ![](https://i.imgur.com/954NIWY.png) - Revise mail deberia llevar un correo mas o menos asi: ![](https://i.imgur.com/1eGR1y0.png) - Confirmamos nuestra cuenta y aparecera el [portal de desarrolladores](https://developer.twitter.com/en/portal/dashboard). - Creamos el proyecto big-data-app-nisepulvedaa ![big-data-app-nisepulvedaa](https://i.imgur.com/l1KnyaR.png) le damos a get key y esto nos creara nuestras variables ![twitter-app](https://i.imgur.com/QH7SO5Z.png) - de todas nuestras key ocuparemos solola que dice **Bearer Token** y guardela en el archivo `.env`. ``` BEARER_TOKEN = "XXXXX" ``` Este token servirá para autenticar la conexión con la API de Twitter desde un cliente. ## 3. Proyecto en GCP Partiremos Creando nuestro proyecto en GCP donde almacenaremos todos los componentes que ocuparemos para la resolucion de este problema ingresamores a [GCP](https://console.cloud.google.com) al ingresar al portal levantaremos el mantenedor que nos permitira crear nuestro proyecto ![Creando Proyecto GCP](https://i.imgur.com/5Ky3Hg1.png) le damos a la opcion PROYECTO NUEVO y completamos los campos solicitados nuestro proyecto tendra el nombre de **examen-big-data-nisepulvedaa** ![Nuevo proyecto](https://i.imgur.com/7L8YcS1.png) al presionar en crear podemos ver que nuestro proyecto fue creado con exito: ![Proyecto creado](https://i.imgur.com/dOHs1jl.png) ## 3.1 Habilitando Apis de los servicios a utilizar como lo vimos inicialmente en nuesto diagrama los servicios que utilizaremos de gcp seran - [Pub/Sub](https://cloud.google.com/pubsub) - [Dataflow](https://cloud.google.com/dataflow) - [BigQuery](https://cloud.google.com/bigquery) - [BigTable](https://cloud.google.com/bigtable) para utilizarlos debemos activar las apis que nos daran acceso a sus funciones para eso ingresaremos al siguiente enlace: [habilitar las APIs de los servicios](https://console.cloud.google.com/flows/enableapi?apiid=dataflow,compute_component,logging,storage_component,storage_api,bigquery,pubsub,cloudresourcemanager.googleapis.com,cloudscheduler.googleapis.com&_ga=2.208850066.1081723947.1654792051-1667480672.1652712244&_gac=1.124380536.1654500239.CjwKCAjwy_aUBhACEiwA2IHHQLtCU6BbEpzn1zUwEwjOGejMRqjjmsPe7N-8OuB1jO8eTd3yGGA6mBoCQwYQAvD_BwE). al ingresar veremos lo siguiente: ![Apis google cloud](https://i.imgur.com/WA5XBCO.png) le damos a siguiente y nos aparecera el listado de apis que habilitaremos ![listado de apis a habilitar](https://i.imgur.com/SsL3WUa.png) **importante debemos asegurarnos que estamos en el proyecto que acabamos de crear si no no podremos utilizar los servicios desde nuestro proyecto** presionamos en habilitar y veremos lo siguiente: ![Habilitando Apis](https://i.imgur.com/1Wuk0Y7.png) Cuanda se hayan habilitado todas las APIS veremos lo siguiente: ![Finalizado](https://i.imgur.com/ebvyQqf.png) ## 3.2 Creando Cuenta de Servicios para el proyecto Ingresaremos al servicio de *IAM y administración* ![administracion](https://i.imgur.com/ktwRBSF.png) luego ingresamos a la opción Cuentas de servicios ![Cuenta de servicio](https://i.imgur.com/GOzV2aC.png) crearemos una nueva cuenta de servicio presionamos en la opción crear cuenta de servicios y nos aparecera los siguiente: nuestra cuenta de servicio se llamara **service-account-twitter-nisepulvedaa** ![](https://i.imgur.com/MD40hl6.png) le damos en crear y continuar ![](https://i.imgur.com/MNXU7yn.png) le damos en listo ![](https://i.imgur.com/bz5jfO4.png) como podemos ver en la imagen nuestra cuenta fue creada con exito posterior a esto crearemos nuestras llaves esto lo haremos de la siguiente manera: ![](https://i.imgur.com/RJqiCXo.png) se nos cargara la siguiente ventana ![](https://i.imgur.com/31b1jwJ.png) seleccionamos lo siguiente: ![](https://i.imgur.com/NCt7PTW.png) como podemos ver nuestra llave se creo correctamente. **importante al ejecutar este paso de forma por defecto se descargara en tu equipo un archivo con las credenciales para conectarese a gcp cloud** ![](https://i.imgur.com/LeUCBjH.png) **La cuenta de servicio nos permitirá interactuar con GCP desde fuera de la plataforma.** finalmente nos volvemos a dirigir a al servicio de *IAM y administración* y a nuestra cuenta de servicios le agregamos los permisos de editor ![](https://i.imgur.com/tlOwcLa.png) ## 4. GCP Pub/sub service - Dentro de Nuestro Proyecto, entramos al servicio *Pub/sub*. ![](https://i.imgur.com/OI98Hi3.png) - Hacemos click en **Crear tema** e ingrese el ID del tema. nuestro tema se llamara **examen-bigdata-nisepulvedaa** ![](https://i.imgur.com/L2ZmZzW.png) Una vez finalizada esta accion nos dirigiremos a nuestro archivo .env y agregaremos las siguientes variables: ``` BEARER_TOKEN = "XXXXX" PROJECT_ID = "examen-big-data-nisepulvedaa" TOPIC_ID = "examen-big-data-nisepulvedaa-topic" ``` ![](https://i.imgur.com/TbqEnW1.png) ## 5. Extraer datos de Twitter La librería ```tweepy``` de Python, facilita las funciones para conectarse y extraer datos de la [API de twitter](https://developer.twitter.com/en/docs). Mientras, el módulo [google.cloud.pubsub](https://cloud.google.com/python/docs/reference/pubsub/latest) de la librería google facilita la conexión y permite publicar datos al tópico del servicio pub/sub. esta funciona la modificaremos para obtener los twitter de la carrera de formula 1 de este domingo 30-10-2022 correspodiente al Gran Premio de Mexico que utiliza su hashtag #MexicoGP ```python import json import os import tweepy from time import sleep from concurrent import futures from google.cloud import pubsub_v1 from dotenv import load_dotenv load_dotenv(".env") # dependiendo del sistema operativo se guarda como .env.txt # Max messages to gcp pubsub MAX_MESSAGES = 100 # Configure the batch to publish as soon as there are 10 messages # or 1 KiB of data, or 1 second has passed. batch_settings = pubsub_v1.types.BatchSettings( max_messages=MAX_MESSAGES, # default 100 max_bytes=1024, # default 1 MB max_latency=10, # default 10 ms ) # Resolve the publish future in a separate thread. def callback(future: pubsub_v1.publisher.futures.Future) -> None: message_id = future.result() print(message_id) def extract_data(max_results): Bearer_token = os.environ["BEARER_TOKEN"] project_id = os.environ["PROJECT_ID"] topic_name = os.environ["TOPIC_ID"] publish_client = pubsub_v1.PublisherClient() topic_path = F'projects/{project_id}/topics/{topic_name}' publish_futures = [] client = tweepy.Client(bearer_token=Bearer_token) query = '#MexicoGP has:hashtags has:media lang:es' tweets = list(client.search_recent_tweets(query=query, tweet_fields="id,text,created_at,author_id,public_metrics", max_results=max_results)) #print(tweets) for each_tweet in tweets[0]: #print(each_tweet.public_metrics['retweet_count']) tweets_json = {'id':'{}'.format(each_tweet.id),'author_id':'{}'.format(each_tweet.author_id), 'text':'{}'.format(each_tweet.text),'created_at':'{}'.format(each_tweet.created_at),'retweet_count':'{}'.format(each_tweet.public_metrics['retweet_count'])} # = {'id':'{}'.format(each_tweet.id),'author_id':'{}'.format(each_tweet.author_id), 'text':'{}'.format(each_tweet.text),'created_at':'{}'.format(each_tweet.created_at)} #print(json.dumps(tweets_json)) #tweets_json["created_at"] = fecha #tweets_json["public_metrics.retweet_count"] = each_tweet.retweet_count publish_future = publish_client.publish(topic_path, data=json.dumps(tweets_json).encode("utf-8")) #print(publish_future) # Non-blocking. Allow the publisher client to batch multiple messages. publish_future.add_done_callback(callback) publish_futures.append(publish_future) #print(publish_futures) res = futures.wait(publish_futures , return_when=futures.ALL_COMPLETED) return res if __name__ == '__main__': while True: tweets_json_batch = extract_data(max_results=MAX_MESSAGES) sleep(5) ``` una vez configurado nuestro script de python lo ejecutamos de la siguiente manera: como puden ver en nuestra ruta de trabajo tenemos el archivo con las variables de entorno, el archivo con las conexiones a gcp cloud y el archivo python que nos permitira bajar la info **antes de ejecutar el script setiamos la variable de entorno con el siguiente comando:** este archivo es el que se descargo de forma automatica en el punto **3.2** ``` SET GOOGLE_APPLICATION_CREDENTIALS=examen-big-data-nisepulvedaa-636ecfe5a04f.json ``` ![](https://i.imgur.com/2HJB5up.png) teniendo esto listo ejecutamos el script con el siguiente comando ![](https://i.imgur.com/S7Ulxnh.png) si bien el codigo nos da un error se envia correctamente los datos a nuestro topico creado en gcp ![](https://i.imgur.com/AJLMHjE.png) ## 6. Google Cloud Storage service a continuacion nos dirigeremos a *Google Cloud Storage* para crear nuestro **Bucket** eso lo realizaremos de la siguiente manera: ![](https://i.imgur.com/ZVQkg5b.png) - una vez en *Google Cloud Storage* seleccionamos la opcion crear buckets ![](https://i.imgur.com/bSngPN2.png) - a nuestro bucket le asignaremos el nombre **examen-big-data-nisepulvedaa-bucket** ![](https://i.imgur.com/7jqYeGu.png) - eleguimos donde almaceneramos nuestra información ![](https://i.imgur.com/W0Dc8K8.png) - eleguimos de que forma almacenaremos nuestros datos ![](https://i.imgur.com/IaU1a0e.png) y las otras dos opciones restantes quedaran en formato standard por lo que creamos nuestro bucket ![](https://i.imgur.com/nfMb6WR.png) ![](https://i.imgur.com/ktm978S.png) una vez creado nuestro bucket crearemos dos carpetas dentro de el que seran `temp/` y `raw_data/`. para eso seleccionamos la opcion **CREAR CARPETA** ![](https://i.imgur.com/aw8jaDu.png) ![](https://i.imgur.com/HVliqTj.png) - una vez creadas estas dos carptas nuestro bucket se deberia ver de la siguiente forma: ![](https://i.imgur.com/X96qf40.png) ## 7. GCP BigQuery service a continuacion crearemos nuestras tablas de datos en big query para eso nos digiremos a la siguiente ruta ![](https://i.imgur.com/NWoAiA0.png) - dentro de big query partiremos creando nuestro conjunto de datos seleccinando lo siguiente ![](https://i.imgur.com/aAtdjBP.png) - completamos la información requerida y le damos a crear conjunto de datos ![](https://i.imgur.com/vg49ezL.png) - una vez creado nuestro conjunto de datos se vera de la siguiente manera ![](https://i.imgur.com/raIOcUj.png) a continuación creamos nuestra tabla: ![](https://i.imgur.com/dEFvBZi.png) - nuestra tabla poseera la siguiente estructura: y la llameremos **tabla_examen_big_data_nisepulvedaa_gp_mexico** ![](https://i.imgur.com/VquAyfv.png) ``` [ { "mode":"NULLABLE", "name":"id", "type": "STRING" }, { "mode":"NULLABLE", "name":"author_id", "type": "STRING" }, { "mode":"NULLABLE", "name":"text", "type": "STRING" }, { "mode":"NULLABLE", "name":"created_at", "type": "STRING" }, { "mode":"NULLABLE", "name":"retweet_count", "type": "STRING" } ] ``` una vez nuesta tabla creada correctamente veremos lo siguiente: ![](https://i.imgur.com/CPJgIwl.png) en este punto lo unico que nos faltaria seria ejecutar nuestro pipeline para nuestro runner ocuparemos *DataFlow* ## 8. Dataflow para dirigirnos a *DataFlow* lo haremos de la siguiente manera: ![](https://i.imgur.com/7NWcIwh.png) aca tenemos dos grandes tareas primero generar un *Pipeline para BigQuery* y otro para *Pipeline para Google Cloud Storage* desde el siguiente menu: ![](https://i.imgur.com/VLmP1dX.png) ### Pipeline para BigQuery a nuestro job lo llamaremos **examen_big_data_nisepulvedaa_dataflow_job_to_big_query** otra cosa importante aca es que la plantilla que ocuparemos sera la que dice **Pub/Sub topic to BigQuery** ![](https://i.imgur.com/mLm6hBU.png) a continuacion se nos solicitaran los siguientes parametros: - El tema del servicio *Pub/sub* se ingresa siguiendo la siguiente estructura: `projects/<PROJECT_ID/topics/<TOPIC_ID>` - La tabla de BigQuery se ingresa siguiendo la siguiente estructura: `<PROJECT_ID>:<nombre-del-conjunto>.<nombre-de-la-tabla> - Los datos temporales del Pipeline se almacena en Google Cloud Storage por default. Se ingresa de la siguiente estructura: - `gs://<BUCKET_NAME>/temp/` **Proyecto:** projects/examen-big-data-nisepulvedaa/topics/examen-big-data-nisepulvedaa-topic **BigQuery OUTPUT TABLE:** examen-big-data-nisepulvedaa:examen-big-data-nisepulvedaa.conjunto_examen_bita_nisepulvedaa_twitter.tabla_examen_big_data_nisepulvedaa_gp_mexico **Ubicacion Temporal:** gs://examen-big-data-nisepulvedaa-bucket/temp completada esta información le damos a la opcion Ejecutar Trabajo ![](https://i.imgur.com/YGXEOIA.png) veremos nuestro trabajo creado correctamente ![](https://i.imgur.com/zsqX5M5.png) ### Pipeline para Google Cloud Storage para realizar esta accion ingresaremos al siguiente menu: ![](https://i.imgur.com/3KuHZlT.png) a nuestro job lo llamaremos **examen_big_data_nisepulvedaa_dataflow_job_to_cloud_storage** otra cosa importante aca es que la plantilla que ocuparemos sera la que dice ** Pub/Sub to Text Files on Cloud Storage ** ![](https://i.imgur.com/Bs4DuBf.png) ![](https://i.imgur.com/amNdmcE.png) - Abra el servicio de *Dataflow* y haga click en **Crear trabajo desde plantilla**. - En la sección de **plantilla de Dataflow**, seleccione la plantilla **Pub/sub to GCS**. - El tema del servicio *Pub/sub* se ingresa con la siguiente estructura: `projects/<PROJECT_ID/topics/<TOPIC_ID>` - El bucket de destino en GCS se ingresa con la siguiente estructura: `gs://<BUCKET_NAME>/` finalmente nuestro Pipelines Creados los podemos ver de la siguiente manera: ![](https://i.imgur.com/wapZ5Qy.png) **dentro del transcurso de tiempo en que hacia este tutorial pasaba que tenia mis dos Pipelines corriendo pero no lograba ver la informacion ni en *BigQuery* ni en *CloudStorage* asique parti todo desde 0 ejecutando este paso a paso y me di cuenta de esto** ![](https://i.imgur.com/k2RjPUL.png) **la tabla que habia creado para almacenar mi informacioon tenia un formato erroneo por lo que al aplicar la correcion todo se soluciono.** <!-- ## License [MIT](https://choosealicense.com/licenses/mit/) --> Finalmente ejecutamos nuestro codigo python ahora obtiendo 100 twits por ejecución: ![](https://i.imgur.com/TpIL0J1.png) verificamos si se guardaron estos registros en big query ![](https://i.imgur.com/wXIXpmh.png) podemos verificar tambien que la informacion en bruto se ha guardado en *Google Cloud Storage* ![](https://i.imgur.com/iNmbz37.png) si revisamos uno de los archivos podemos ver que tenemos en nuestro contenido toda la informacion ![](https://i.imgur.com/xZkRaPY.png) para validar buscaremos el id **1587852337107828737** que aperce en nuestro archivo en **BigQuery** para asegurarnos de que nuestro flujo funciona correctamente ![](https://i.imgur.com/PLKeyXY.png) como podemos ver tenemos el registro guardado en BigQuery **por lo revisado el codigo de python lo que hace es que baja 10 o 100 twets pero cada vez que se ejecuta baja los mismos 100 por lo que movimos algunas cosas para traernos mas info agredandole parametros al script** ![](https://i.imgur.com/Zv55Boz.png) - finalmente buscamos los 10 twits mas retuitiados con la siguiente consulta: ![](https://i.imgur.com/HgZWadX.png) ## 9. Google Cloud Princing Calculator finalmente realizamos la estimación de los costos asociados a tener funcionando un sistema de esta magnitud con Google Cloud Pricing Calculator accedemos al siguiente link: https://cloud.google.com/google/calculadora al ingresar al enlace veremos lo siguiente: ![](https://i.imgur.com/1WW7FD6.png) * identificamos el servicio de Pub/Sub y realizamos la estimación ![](https://i.imgur.com/nwCHmzS.png) lo que nos da un resultado de: ![](https://i.imgur.com/L5VQKE2.png) * identificamos el servicio de Cloud Storage y realizamos la estimación ![](https://i.imgur.com/v8Xp4Tf.png) lo que nos da un resultado de: ![](https://i.imgur.com/gQTtKGF.png) * posterior a esto identificamos el servicio de Big Query y realizamos la estimación ![](https://i.imgur.com/NmOshbB.png) lo que nos da un resultado de: ![](https://i.imgur.com/kmkFGEo.png) * identificamos el servicio de DataFlow y realizamos la estimación: ![](https://i.imgur.com/K0GG74S.png) lo que nos da un resultado de: ![](https://i.imgur.com/PxS0coI.png)