# Scratch helpers ("basura")
```python
def query_for_message_ids(service, search_query):
    """Return the IDs of every message matching a Gmail search query.

    ``search_query`` supports the same syntax as the Gmail search box, for
    example ``'from:someuser@example.com rfc822msgid:<somemsgid@example.com> is:unread'``.

    ``service`` must expose ``messages().list(...)``; presumably the ``users()``
    resource of a Gmail API client (verify against the caller).

    ``messages.list`` is paginated, so this follows ``nextPageToken`` until the
    last page instead of returning only the first page of results.

    Returns:
        list[str]: message IDs in the order the API returned them, or an empty
        list when nothing matches.
    """
    msg_ids = []
    request_kwargs = {'userId': 'me', 'q': search_query}
    while True:
        result = service.messages().list(**request_kwargs).execute()
        # 'messages' is absent from the response when there are no matches.
        msg_ids.extend(m['id'] for m in (result.get('messages') or []))
        page_token = result.get('nextPageToken')
        if not page_token:
            return msg_ids
        request_kwargs['pageToken'] = page_token
def _get_attachment_data(service, messageId, attachmentId):
    """Fetch a single attachment of a message and return its ``data`` field.

    The value is returned undecoded, exactly as the API response carries it.
    """
    attachments = service.messages().attachments()
    request = attachments.get(userId='me', id=attachmentId, messageId=messageId)
    response = request.execute()
    return response['data']
def _get_attachment_from_part(service, messageId, part):
    """Return the (still encoded) content of one message part.

    Inline content in ``body['data']`` wins. Otherwise, when the body only
    carries an ``attachmentId``, the content is fetched with a separate API
    call. Returns ``None`` when the part has neither.
    """
    body = part.get('body')
    inline_data = body.get('data')
    attachment_id = body.get('attachmentId')
    if inline_data:
        return inline_data
    if not attachment_id:
        return None
    return _get_attachment_data(service, messageId, attachment_id)
def transformFile(**kwargs):
    """Build the destination object path for an Excel workbook stored in GCS.

    Expects Airflow task kwargs: ``dag_run.conf['bucket']`` and
    ``dag_run.conf['name']`` locate the workbook, and ``ds_nodash`` is the run
    date. The workbook is opened to derive:

    * ``pi_type``   -- the name of its second sheet;
    * ``prov_name`` -- the column-A header of its first sheet, taken from the
      third row (``header=2`` is zero-based).

    Returns:
        str: ``"<prov_name>/<pi_type>/<ds_nodash>/<name>"``.

    Raises:
        IndexError: if the workbook has fewer than two sheets.
    """
    import posixpath

    import pandas as pd

    source_bucket = kwargs['dag_run'].conf['bucket']
    source_object = kwargs['dag_run'].conf['name']
    completion_ds = kwargs['ds_nodash']
    # f-string so the actual bucket and object names are substituted.
    source_uri = f"gcs://{source_bucket}/{source_object}"

    # Open the workbook once and reuse the handle for both the sheet list and
    # the parse, rather than downloading it from GCS twice.
    with pd.ExcelFile(source_uri) as workbook:
        sheet, pi_type = workbook.sheet_names[0], workbook.sheet_names[1]
        frame = workbook.parse(sheet, usecols="A", header=2)

    # str(): a numeric header cell would otherwise make the path join fail.
    prov_name = str(frame.columns[0])
    # GCS object names are always '/'-separated, whatever the worker's OS.
    return posixpath.join(prov_name, pi_type, completion_ds, source_object)
```
## Hook
```python
from hooks.GmailDiscoveryServiceHook import GoogleCloudBaseHook
#from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
import json
import os
from typing import Optional
from googleapiclient.discovery import build
from google.oauth2 import Credentials
#from googleapiclient.errors import Error,HttpError
# NOTE(review): this module imports GoogleCloudBaseHook from
# hooks.GmailDiscoveryServiceHook, which is the module that defines this class,
# so the import is circular. Import the base from Airflow instead
# (airflow.contrib.hooks.gcp_api_base_hook in Airflow 1.10, or
# airflow.providers.google.common.hooks.base_google.GoogleBaseHook in 2.x).
# The module-level `from google.oauth2 import Credentials` also fails because
# Credentials lives in google.oauth2.credentials; get_conn imports it locally.
class GmailDiscoveryServiceHook(GoogleCloudBaseHook):
    """Airflow hook that builds a Gmail API client via the discovery service.

    Settings are read from the Airflow connection's ``extra`` JSON:

    * ``extra__gmail_platform__key_path`` -- path to an *authorized user* JSON
      file, loaded with ``Credentials.from_authorized_user_file``;
    * ``extra__gmail__scope`` -- OAuth scopes, either a JSON list or a text
      value such as ``('https://www.googleapis.com/auth/gmail.readonly',
      'https://www.googleapis.com/auth/gmail.modify',)``.

    NOTE(review): the two extra keys use different prefixes (``gmail_platform``
    vs ``gmail``). Confirm both match the connection definition.
    """

    client = None
    serviceName = "gmail"
    version = "v1"
    # Connection used when the caller does not name one.
    default_conn_id = "google_gmail"

    def __init__(self, conn_id=None, delegate_to=None):
        # Honour an explicitly passed connection id; fall back to the default.
        conn_id = conn_id or self.default_conn_id
        super().__init__(conn_id, delegate_to)
        self.conn_id = conn_id
        self.connection = self.get_connection(conn_id)
        self.extras = self.connection.extra_dejson
        self.KEY_FILE_LOC = self.extras.get("extra__gmail_platform__key_path")
        self.SCOPES = self._parse_scopes(self.extras.get("extra__gmail__scope"))

    @staticmethod
    def _parse_scopes(raw):
        """Normalize the configured scopes into a list of strings, or ``None``.

        Accepts a list/tuple (JSON extra), a Python-literal text such as
        ``"('a', 'b',)"``, or plain comma/whitespace-separated text. A plain
        string must not be passed through, because the OAuth client would
        treat each character as a separate scope.
        """
        import ast

        if raw is None or raw == "":
            # None lets the credentials fall back to the scopes in the key file.
            return None
        if isinstance(raw, str):
            try:
                parsed = ast.literal_eval(raw)
            except (ValueError, SyntaxError):
                return raw.replace(",", " ").split()
            raw = [parsed] if isinstance(parsed, str) else parsed
        return [str(scope) for scope in raw]

    def get_conn(self):
        """Return the Gmail API client, building and caching it on first use.

        The returned object is the top-level Gmail service. Call ``users()`` on
        it to reach the ``messages()`` API.
        """
        if not self.client:
            # Credentials lives in google.oauth2.credentials, not google.oauth2.
            from google.oauth2.credentials import Credentials

            credentials = Credentials.from_authorized_user_file(
                filename=self.KEY_FILE_LOC, scopes=self.SCOPES
            )
            self.client = build(self.serviceName, self.version, credentials=credentials)
        return self.client
```
## Operator
```python
from email.headerregistry import MessageIDHeader
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from hooks.GmailDiscoveryServiceHook import GmailDiscoveryServiceHook
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
import os
from os import environ
from datetime import timedelta
import sys
import string
import pandas as pd
from google_api import gmail
from googleapiclient.discovery import build
CSV__TYPE = 'text/csv'
# Full registered MIME type of .xlsx workbooks. The trailing ".sheet" is part
# of the type; without it, no Excel attachment would ever match.
XLSX__TYPE = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'

# Attachment MIME types that can be parsed into DataFrames, mapped to their tag.
_MIME_TO_DTYPE = {
    CSV__TYPE: 'csv',
    XLSX__TYPE: 'xlsx',
}


def mime_type_to_dtype(s):
    """Map an attachment MIME type to the short data-type tag used for parsing.

    Returns ``'csv'`` for ``text/csv`` and ``'xlsx'`` for Excel workbooks.

    Raises:
        AssertionError: for any other MIME type. This exception type is kept
            for compatibility with existing callers.
    """
    try:
        return _MIME_TO_DTYPE[s]
    except KeyError:
        raise AssertionError("mime type not accepted") from None
class ExtractAttachment(BaseOperator):
    """
    Extract and compare provider ID and pitype data from Gmail.

    Lists the inbox messages, downloads every CSV/XLSX attachment of each
    message and parses it into a pandas DataFrame.

    NOTE(review): the comparison step named above is not implemented here, and
    ``inbox_name`` is stored but never read.
    """

    @apply_defaults
    def __init__(self, inbox_name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # TODO(review): not used by any method yet. Confirm its intended role.
        self.inbox_name = inbox_name

    def __get_service_gmail(self):
        """Return the Gmail ``users()`` resource for the ``google_gmail`` connection."""
        hook = GmailDiscoveryServiceHook('google_gmail')
        return hook.get_conn().users()

    def __get_email_messages_ids(self, service, e_query='in:inbox'):
        """Return the IDs of all messages matching the Gmail search ``e_query``.

        ``e_query`` uses Gmail search syntax. ``in:inbox`` selects the inbox,
        whereas a bare ``INBOX`` would only match the word "inbox" in messages.
        Every result page is collected by following ``nextPageToken``.
        """
        msg_ids = []
        request_kwargs = {'userId': 'me', 'q': e_query}
        while True:
            result = service.messages().list(**request_kwargs).execute()
            # 'messages' is absent from the response when nothing matches.
            msg_ids.extend(m['id'] for m in (result.get('messages') or []))
            page_token = result.get('nextPageToken')
            if not page_token:
                return msg_ids
            request_kwargs['pageToken'] = page_token

    def __get_data_attachment(self, service, messageId, attachmentId):
        """Fetch one attachment and return its still-encoded ``data`` field."""
        att = service.messages().attachments().get(
            userId='me', id=attachmentId, messageId=messageId).execute()
        return att['data']

    def __get_attachment_from_part(self, service, messageId, part):
        """Return a part's encoded content, inline or fetched by attachment id.

        Returns ``None`` when the part carries neither.
        """
        body = part.get('body') or {}
        data = body.get('data')
        if data:
            return data
        attachment_id = body.get('attachmentId')
        if attachment_id:
            return self.__get_data_attachment(service, messageId, attachment_id)
        return None

    def _convert_attachment_data_to_dataframe(self, data, data_type):
        """Decode a base64url attachment payload and parse it into a DataFrame.

        Args:
            data: base64url-encoded attachment content (``str``).
            data_type: ``'csv'`` or ``'xlsx'``.

        Raises:
            ValueError: if ``data_type`` is neither ``'csv'`` nor ``'xlsx'``.
        """
        import base64
        from io import BytesIO

        # Pad defensively in case the payload arrives without '=' padding.
        padded = data + '=' * (-len(data) % 4)
        raw = base64.urlsafe_b64decode(padded.encode('UTF-8'))
        # Decoding yields bytes, and XLSX is a zip archive, so both readers get
        # a binary buffer. A text StringIO cannot wrap bytes.
        if data_type == 'csv':
            return pd.read_csv(BytesIO(raw))
        if data_type == 'xlsx':
            return pd.read_excel(BytesIO(raw))
        raise ValueError(f"unsupported attachment data type: {data_type!r}")

    def _flatten_nested_email_parts(self, parts):
        """Return the leaf parts of a MIME part tree, depth-first and in order.

        A container part (one with its own ``parts``) is replaced by its
        descendants, however deeply nested.
        """
        all_parts = []
        for p in parts:
            children = p.get('parts')
            if children:
                all_parts.extend(self._flatten_nested_email_parts(children))
            else:
                all_parts.append(p)
        return all_parts

    def __get_csv_or_xl_attachments_from_msg_id(self, service, messageId):
        """Return the CSV/XLSX attachments of message ``messageId`` as DataFrames.

        Returns:
            list[dict]: one ``{'emailsubject': str, 'filename': str,
            'data': pandas.DataFrame}`` per attachment. The list is empty when
            the message has no parts. ``emailsubject`` is ``''`` when there is
            no Subject header.
        """
        msg = service.messages().get(userId='me', id=messageId).execute()
        payload = msg.get('payload') or {}
        headers = payload.get('headers') or []
        subject = next((h['value'] for h in headers if h['name'] == 'Subject'), '')
        msg_parts = payload.get('parts')
        if not msg_parts:
            return []

        attachments = []
        for part in self._flatten_nested_email_parts(msg_parts):
            mime_type = part.get('mimeType')
            if mime_type not in (CSV__TYPE, XLSX__TYPE):
                continue
            data = self.__get_attachment_from_part(service, messageId, part)
            if not data:
                # No inline data and no attachmentId: nothing to parse.
                continue
            attachments.append({
                'emailsubject': subject,
                'filename': part.get('filename'),
                'data': self._convert_attachment_data_to_dataframe(
                    data, mime_type_to_dtype(mime_type)),
            })
        return attachments

    def execute(self, context=None):
        """Collect the CSV/XLSX attachments of every inbox message.

        Airflow passes the task ``context``; it is accepted but unused. Nothing
        is returned, so DataFrames are not pushed to XCom.
        NOTE(review): the parsed attachments are only counted and logged here.
        """
        service = self.__get_service_gmail()
        attachments = []
        for message_id in self.__get_email_messages_ids(service, e_query='in:inbox'):
            attachments.extend(
                self.__get_csv_or_xl_attachments_from_msg_id(service, message_id))
        self.log.info("Extracted %d CSV/XLSX attachment(s)", len(attachments))
```
Reference: [Data Engineering — How to Build a Gmail Data Pipeline on Apache Airflow](https://towardsdatascience.com/data-engineering-how-to-build-a-gmail-data-pipeline-on-apache-airflow-ce2cfd1f9282)