# Scratch ("basura")

```python
def query_for_message_ids(service, search_query):
    """Return the ids of the messages matching a Gmail search query.

    ``search_query`` supports the same query format as the Gmail search box,
    for example
    ``"from:someuser@example.com rfc822msgid:<somemsgid@example.com> is:unread"``.

    Args:
        service: Object exposing ``messages().list(...)``.
        search_query: Search expression, passed to the API as ``q``.

    Returns:
        A list with the ``id`` of every returned message; empty when the
        response has no ``messages`` entry.
    """
    # NOTE(review): only a single list() response is read here; confirm
    # whether further result pages need to be followed.
    response = service.messages().list(userId='me', q=search_query).execute()
    return [message['id'] for message in response.get('messages') or []]


def _get_attachment_data(service, messageId, attachmentId):
    """Fetch one attachment and return the value of its ``data`` field."""
    attachment = service.messages().attachments().get(
        userId='me', id=attachmentId, messageId=messageId).execute()
    return attachment['data']


def _get_attachment_from_part(service, messageId, part):
    """Return the attachment data of one message part.

    Inline ``body.data`` is preferred; otherwise the attachment referenced by
    ``body.attachmentId`` is fetched. Returns ``None`` when the part carries
    neither.
    """
    body = part.get('body') or {}  # tolerate parts without a body
    data = body.get('data')
    if data:
        return data
    attachment_id = body.get('attachmentId')
    if attachment_id:
        return _get_attachment_data(service, messageId, attachment_id)
    return None


def transformFile(**kwargs):
    """Build the target object path for an Excel file stored in GCS.

    Reads ``bucket`` and ``name`` from ``kwargs['dag_run'].conf`` and the run
    date from ``kwargs['ds_nodash']``. The second sheet name is used as the
    "pi type" and the header found in column A (header row index 2) of the
    first sheet as the provider name.

    Returns:
        ``<provider name>/<pi type>/<ds_nodash>/<source object>`` joined with
        ``os.path.join``.

    Raises:
        IndexError: If the workbook has fewer than two sheets.
    """
    # Local imports keep this scratch snippet self-contained.
    import os

    import pandas as pd

    conf = kwargs['dag_run'].conf
    source_bucket = conf['bucket']
    source_object = conf['name']
    completion_ds = kwargs['ds_nodash']

    # The path must be an f-string; the plain literal used previously passed
    # the "{source_bucket}" placeholders through verbatim.
    source_uri = f"gcs://{source_bucket}/{source_object}"

    # Open the workbook once and reuse it for both the sheet names and the
    # header lookup instead of reading the remote file twice.
    with pd.ExcelFile(source_uri) as workbook:
        tabs = workbook.sheet_names
        pi_type = tabs[1]
        frame = workbook.parse(sheet_name=tabs[0], usecols="A", header=2)

    # str() because a non-text header cell would otherwise break the join.
    prov_name = str(frame.columns[0])
    return os.path.join(prov_name, pi_type, completion_ds, source_object)
```

## Hook

```python
# The base class is taken from the Google provider package. It used to be
# imported from ``hooks.GmailDiscoveryServiceHook``, which appears to be this
# very module (the operator imports the hook from that path) and never defines
# ``GoogleCloudBaseHook`` -- TODO confirm the module path.
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook as GoogleCloudBaseHook
import json
import os
from typing import Optional
from googleapiclient.discovery import build
# ``Credentials`` lives in ``google.oauth2.credentials``, not in ``google.oauth2``.
from google.oauth2.credentials import Credentials
#from googleapiclient.errors import Error,HttpError


class GmailDiscoveryServiceHook(GoogleCloudBaseHook):
    """Hook that builds a Gmail API client from an Airflow connection.

    The connection extras provide the path of the authorized-user file
    (``extra__gmail_platform__key_path``) and the OAuth scopes
    (``extra__gmail__scope``).
    """

    # Cached API client; built on the first get_conn() call.
    client = None
    serviceName = "gmail"
    version = "v1"

    def __init__(self, conn_id=None, delegate_to=None):
        """Load the connection extras.

        Args:
            conn_id: Airflow connection id; falls back to ``"google_gmail"``.
            delegate_to: Forwarded unchanged to the base hook.
        """
        super().__init__(conn_id, delegate_to)
        # Honour the requested connection instead of always reading the
        # hard-coded one. NOTE: despite its name, this attribute holds the
        # connection object returned by get_connection(), not the id.
        self.conn_id = self.get_connection(conn_id or "google_gmail")
        self.extras = self.conn_id.extra_dejson
        self.KEY_FILE_LOC = self.extras.get("extra__gmail_platform__key_path")
        # Enter the scopes in the Airflow connection as flat text; it should
        # be a tuple ('X', 'Y',), for example:
        #   'https://www.googleapis.com/auth/gmail.readonly',
        #   'https://www.googleapis.com/auth/gmail.modify',
        self.SCOPES = self.extras.get("extra__gmail__scope")

    def get_conn(self):
        """Return the Gmail API client, building and caching it on first use."""
        if not self.client:
            credentials = Credentials.from_authorized_user_file(
                filename=self.KEY_FILE_LOC, scopes=self.SCOPES)
            self.client = build(
                self.serviceName, self.version, http=None, credentials=credentials)
        # TODO: check whether self.client.users() should be returned instead.
        return self.client
```

## Operator

```python
import base64
from email.headerregistry import MessageIDHeader
from io import BytesIO
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from hooks.GmailDiscoveryServiceHook import GmailDiscoveryServiceHook
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
import os
from os import environ
from datetime import timedelta
import sys
import string
import pandas as pd
from google_api import gmail
from googleapiclient.discovery import build

CSV__TYPE = 'text/csv'
# Full MIME type of .xlsx workbooks. The previous value lacked the trailing
# ".sheet", so the exact-match filter below never accepted a real attachment.
XLSX__TYPE = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'

# Accepted attachment MIME types mapped to the short type names used by
# ExtractAttachment._convert_attachment_data_to_dataframe.
_DTYPE_BY_MIME_TYPE = {CSV__TYPE: 'csv', XLSX__TYPE: 'xlsx'}


def mime_type_to_dtype(s):
    """Map an accepted MIME type to ``'csv'`` or ``'xlsx'``.

    Raises:
        AssertionError: If ``s`` is neither ``CSV__TYPE`` nor ``XLSX__TYPE``.
    """
    try:
        return _DTYPE_BY_MIME_TYPE[s]
    except KeyError:
        raise AssertionError("mime type not accepted") from None


class ExtractAttachment(BaseOperator):
    """
    Extract and compare provider ID and pitype data from Gmail
    """

    @apply_defaults
    def __init__(self, inbox_name, *args, **kwargs):
        super(ExtractAttachment, self).__init__(*args, **kwargs)
        # NOTE(review): stored but not read anywhere in this snippet.
        self.inbox_name = inbox_name

    def __get_service_gmail(self):
        """Return the ``users()`` resource of the Gmail client.

        The connection id is hard-coded to ``'google_gmail'``.
        """
        hook = GmailDiscoveryServiceHook('google_gmail')
        service = hook.get_conn()
        return service.users()

    def __get_email_messages_ids(self, service, e_query='INBOX'):
        """Return the ids of the messages matching the query ``e_query``.

        NOTE(review): ``e_query`` is sent as the search string ``q``; confirm
        that the default ``'INBOX'`` selects the inbox as intended.
        """
        response = service.messages().list(userId='me', q=e_query).execute()
        return [message['id'] for message in response.get('messages') or []]

    def __get_data_attachment(self, service, messageId, attachtmentId):
        """Fetch one attachment and return the value of its ``data`` field."""
        attachment = service.messages().attachments().get(
            userId='me', id=attachtmentId, messageId=messageId).execute()
        return attachment['data']

    def __get_attachment_from_part(self, service, messageId, part):
        """Return the attachment data of one message part.

        Inline ``body.data`` is preferred; otherwise the attachment referenced
        by ``body.attachmentId`` is fetched. Returns ``None`` when the part
        carries neither.
        """
        body = part.get('body') or {}  # tolerate parts without a body
        data = body.get('data')
        if data:
            return data
        attachment_id = body.get('attachmentId')
        if attachment_id:
            # Must go through ``self``: the bare name is not defined at module
            # level and raised NameError.
            return self.__get_data_attachment(service, messageId, attachment_id)
        return None

    def _convert_attachment_data_to_dataframe(self, data, data_type):
        """Decode URL-safe base64 attachment data into a DataFrame.

        Args:
            data: Attachment payload as a URL-safe base64 string.
            data_type: ``'csv'`` or ``'xlsx'``.

        Raises:
            ValueError: If ``data_type`` is not one of the supported values.
        """
        raw = base64.urlsafe_b64decode(data.encode('UTF-8'))
        # b64decode returns bytes, so a binary buffer is required; StringIO
        # rejects bytes and Excel files are binary anyway.
        buffer = BytesIO(raw)
        if data_type == 'csv':
            return pd.read_csv(buffer)
        if data_type == 'xlsx':
            return pd.read_excel(buffer)
        raise ValueError(f"unsupported attachment data type: {data_type!r}")

    def _flatten_nested_email_parts(self, parts):
        """Return the leaf parts of a nested ``parts`` structure.

        A part that has a non-empty ``parts`` entry is replaced by its
        children, recursively, so nesting of any depth is flattened.
        """
        flat = []
        for part in parts:
            children = part.get('parts')
            if children:
                flat.extend(self._flatten_nested_email_parts(children))
            else:
                flat.append(part)
        return flat

    def __get_csv_or_xl_attachments_from_msg_id(self, service, messageId):
        """Return the CSV/XLSX attachments of the message ``messageId``.

        Returns:
            A list with one dict per attachment, with keys ``emailsubject``
            (``None`` when the message has no Subject header), ``filename``
            and ``data`` (the attachment as a ``pd.DataFrame``). The list is
            empty when the message has no parts.
        """
        msg = service.messages().get(userId='me', id=messageId).execute()
        payload = msg.get('payload') or {}
        msg_parts = payload.get('parts')
        if not msg_parts:
            return []

        headers = payload.get('headers') or []
        # next() with a default avoids an IndexError on messages that have no
        # Subject header.
        subject = next(
            (h['value'] for h in headers if h['name'] == 'Subject'), None)

        attachments = []
        for part in self._flatten_nested_email_parts(msg_parts):
            mime_type = part.get('mimeType')
            if mime_type not in _DTYPE_BY_MIME_TYPE:
                continue
            data = self.__get_attachment_from_part(service, messageId, part)
            if not data:
                # Neither inline data nor an attachment id: nothing to decode.
                continue
            frame = self._convert_attachment_data_to_dataframe(
                data, mime_type_to_dtype(mime_type))
            attachments.append({
                'emailsubject': subject,
                'filename': part.get('filename'),
                'data': frame,
            })
        return attachments

    def execute(self, context=None):
        """Extract the CSV/XLSX attachments of every matching message.

        ``context`` is accepted because Airflow calls ``execute(context)``;
        it defaults to ``None`` so a bare ``execute()`` call keeps working.
        """
        service = self.__get_service_gmail()
        message_ids = self.__get_email_messages_ids(service, e_query='INBOX')
        # The lookup works on one message id at a time, so iterate rather
        # than passing the whole list as a single id.
        for message_id in message_ids:
            attachments = self.__get_csv_or_xl_attachments_from_msg_id(
                service, message_id)
            for attachment in attachments:
                self.log.info(
                    "Extracted attachment %r from message %s (subject: %r)",
                    attachment['filename'], message_id,
                    attachment['emailsubject'])
        # As before, nothing is returned; presumably returning the DataFrames
        # would make Airflow try to push them to XCom -- confirm before
        # changing this.
```

Reference: https://towardsdatascience.com/data-engineering-how-to-build-a-gmail-data-pipeline-on-apache-airflow-ce2cfd1f9282