# PIPELINE Work
# **Gmail Pipeline Customer**
<hr style="border:2px solid gray">
[Official Documentation](https://cloud.google.com/composer/docs/composer-2/how-to)
[Composer-Airflow Repos](https://github.com/GoogleCloudPlatform/composer-airflow/)
| **Documentation** | **Client** |
| :-------------------------: | :--------: |
| Google Center of Excellence | Customer |
| Author | rayescobar |
## Table of Contents
<hr style="border:2px solid green">
[TOC]
## Requirements
<hr style="border:2px solid red">
Google Cloud APIs:
- [x] Gmail API
- [x] Cloud Composer API
- [x] Cloud Firestore API
- [x] Kubernetes Engine API
- [x] Cloud Datastore API
- [x] Cloud Functions API
- [x] Cloud Pub/Sub API
- [x] Cloud Storage API

Google Cloud Functions:
- **Runtime service account:** the service account that the functions assume as their identity (JSON key).
- Google Cloud Datastore create/read/write permissions.
- OAuth 2.0 Client ID JSON keys:
  - gmailclient
    - Authorized redirect URIs: `https://developers.google.com/oauthplayground`
  - gmailwatch
    - Authorized JavaScript origins: `https://PROJECT_ID.cloudfunctions.net`
    - Authorized redirect URIs: `https://PROJECT_ID.cloudfunctions.net/auth_callback`
- OAuth consent screen:
  - gmail
    - Authorized domains: `PROJECT_ID.cloudfunctions.net`
- Google Cloud Pub/Sub:
  - Add `gmail-api-push@system.gserviceaccount.com` to the Pub/Sub topic `gmailwatcher` with `publish` permissions.
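The grant above can be applied with `gcloud`. As a sketch (the project ID is a placeholder; the topic name comes from this setup), the command can be assembled like this:

```python
# Sketch: assemble the gcloud command that grants Gmail's push service
# account publish rights on the notification topic.
GMAIL_PUSH_SA = "gmail-api-push@system.gserviceaccount.com"

def pubsub_grant_cmd(project_id: str, topic: str = "gmailwatcher") -> list:
    """Build the argv for `gcloud pubsub topics add-iam-policy-binding`."""
    return [
        "gcloud", "pubsub", "topics", "add-iam-policy-binding", topic,
        f"--project={project_id}",
        f"--member=serviceAccount:{GMAIL_PUSH_SA}",
        "--role=roles/pubsub.publisher",
    ]

print(" ".join(pubsub_grant_cmd("my-project")))
```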

Google Cloud Composer (PyPI packages to install in the environment):
- google-api-python-client
- google-auth-httplib2
- google-auth-oauthlib

## Airflow composition
<hr style="border:2px solid gold">
List of files that compose the whole flow for Gmail processing:
- Google Cloud Functions:
  - auth_callback
    - env_vars.yaml
    - index.js
    - package.json
  - auth_init
    - env_vars.yaml
    - index.js
    - package.json
  - GmailDAGTrigger
    - main.py
    - composer2_airflow_rest_api.py
    - requirements.txt
  - watchGmailMessages
    - env_vars.yaml
    - index.js
    - package.json
- Google Composer:
  - dag
    - gmailDAG.py
  - operators
    - gmail_operator.py
    - parse_ma_json.py
    - gmail_callbacks.py
    - config.py
  - plugins
    - plugin_name.py
<details open>
<summary>Code for Google Cloud Functions</summary>
### auth_init & auth_callback
<hr style="border:2px solid gold">
#### index.js
---
```javascript=!
// express-oauth is a Google-provided, open-source package that helps automate
// the authorization process.
const Auth = require('@google-cloud/express-oauth2-handlers');
// googleapis is the official Google Node.js client library for a number of
// Google APIs, including Gmail.
const {google} = require('googleapis');
const gmail = google.gmail('v1');

// Specify the access scopes required. If authorized, Google will grant your
// registered OAuth client access to your profile, email address, and data in
// Gmail.
const requiredScopes = [
  'profile',
  'email',
  'https://www.googleapis.com/auth/gmail.modify',
  'https://www.googleapis.com/auth/gmail.readonly'
];

const auth = Auth('datastore', requiredScopes, 'email', true);

const GCP_PROJECT = process.env.GCP_PROJECT;
const PUBSUB_TOPIC = process.env.PUBSUB_TOPIC;

// Call the Gmail API (Users.watch) to set up Gmail push notifications.
// Gmail will send a notification to the specified Cloud Pub/Sub topic
// every time a new mail arrives in the inbox, filtering on the 'UNREAD' label.
const setUpGmailPushNotifications = (email, pubsubTopic) => {
  return gmail.users.watch({
    userId: email,
    requestBody: {
      labelIds: ['UNREAD'],
      labelFilterAction: 'include',
      topicName: `projects/${GCP_PROJECT}/topics/${pubsubTopic}`
    }
  });
};

// If the authorization process completes successfully, set up Gmail push
// notifications using the tokens returned.
const onSuccess = async (req, res) => {
  let email;
  try {
    // Set up the googleapis library to use the returned tokens.
    email = await auth.auth.authedUser.getUserId(req, res);
    const OAuth2Client = await auth.auth.authedUser.getClient(req, res, email);
    google.options({auth: OAuth2Client});
  } catch (err) {
    console.log(err);
    throw err;
  }

  try {
    await setUpGmailPushNotifications(email, PUBSUB_TOPIC);
  } catch (err) {
    console.log(err);
    if (!err.toString().includes('one user push notification client allowed per developer')) {
      throw err;
    }
  }

  res.send(`Successfully set up Gmail push notifications.`);
};

// If the authorization process fails, return an error message.
const onFailure = (err, req, res) => {
  console.log(err);
  // res.send(`An error has occurred in the authorization process.`);
  res.send(JSON.stringify(err));
};

// Export the Cloud Functions for authorization.
exports.auth_init = auth.routes.init;
exports.auth_callback = auth.routes.cb(onSuccess, onFailure);
```
#### env_vars.yaml
<hr style="border:2px solid gold">
```yaml=!
GOOGLE_CLIENT_ID: #################.apps.googleusercontent.com
GOOGLE_CLIENT_SECRET: #################
GOOGLE_CALLBACK_URL: ##################
PUBSUB_TOPIC: gmailwatcher
```
#### package.json
<hr style="border:2px solid gold">
```json=!
{
  "name": "gcf-gmail-auth",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "repository": {
    "type": "git",
    "url": ""
  },
  "author": "",
  "license": "Apache-2.0",
  "dependencies": {
    "@google-cloud/express-oauth2-handlers": "^0.1.2",
    "express": "^4.16.4",
    "googleapis": "^37.2.0"
  }
}
```
### GmailDAGTrigger
<hr style="border:2px solid gold">
#### main.py
```python=!
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Trigger a DAG in a Cloud Composer 2 environment in response to an event,
using Cloud Functions.
"""

import composer2_airflow_rest_api


def trigger_dag_gcf(data, context=None):
    """
    Trigger a DAG and pass event data.

    Args:
      data: A dictionary containing the data for the event. Its format depends
        on the event.
      context: The context object for the event.

    For more information about the arguments, see:
    https://cloud.google.com/functions/docs/writing/background#function_parameters
    """
    # TODO(developer): replace with your values
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #     --location=your-composer-region \
    #     --format="value(config.airflowUri)"
    web_server_url = "COMPOSER_URL"
    # Replace with the ID of the DAG that you want to run.
    dag_id = 'Gmail_Triggered'

    composer2_airflow_rest_api.trigger_dag(web_server_url, dag_id, data)
```
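For context, the `data` argument this function receives from the `gmailwatcher` topic is a background-function event whose `data` field is the base64-encoded Gmail notification. A stdlib-only sketch with a made-up payload (the email address and historyId are illustrative):

```python
import base64
import json

# Hypothetical Gmail notification, as published by the Gmail API.
notification = {"emailAddress": "user@example.com", "historyId": 1234}

# Shape of the Pub/Sub background-function event forwarded to trigger_dag_gcf.
event = {"data": base64.b64encode(json.dumps(notification).encode()).decode()}

# Downstream, the DAG's decode step reverses this encoding.
decoded = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
print(decoded["historyId"])  # → 1234
```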
#### composer2_airflow_rest_api.py
```python=!
from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests

# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout.
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(url: str, method: str = "GET", **kwargs: Any) -> google.auth.transport.Response:
    """
    Make a request to a Cloud Composer 2 environment's web server.

    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE').
      **kwargs: Any of the parameters defined for the request function:
        https://github.com/requests/requests/blob/master/requests/api.py
        If no timeout is provided, it is set to 90 by default.
    """
    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a DAG using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """
    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account. "
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text
```
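To make the wiring concrete, this is the URL and body `trigger_dag` ends up sending to the stable REST API (the web server URL below is a placeholder, not the real environment):

```python
# Sketch of the request trigger_dag builds; no network call is made here.
web_server_url = "https://example-airflow-web-server"  # placeholder
dag_id = "Gmail_Triggered"
data = {"data": "base64-payload"}  # illustrative event payload

endpoint = f"api/v1/dags/{dag_id}/dagRuns"
request_url = f"{web_server_url}/{endpoint}"
json_data = {"conf": data}  # Airflow exposes this to tasks as dag_run.conf

print(request_url)
# → https://example-airflow-web-server/api/v1/dags/Gmail_Triggered/dagRuns
```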
#### requirements.txt
<hr style="border:2px solid gold">
```shell!
# Function dependencies, for example:
# package>=version
google-auth==2.6.2
requests==2.27.1
```
---
### watchGmailMessages
<hr style="border:2px solid gold">
#### index.js
---
```javascript=!
const Auth = require('@google-cloud/express-oauth2-handlers');
const {Datastore} = require('@google-cloud/datastore');
const {google} = require('googleapis');
const gmail = google.gmail('v1');

const datastoreClient = new Datastore();

const requiredScopes = [
  'profile',
  'email',
  'https://www.googleapis.com/auth/gmail.modify',
  'https://www.googleapis.com/auth/gmail.readonly'
];

const auth = Auth('datastore', requiredScopes, 'email', true);

// Store the messageId in Datastore; return it only if it was not seen before.
const checkForDuplicateNotifications = async (messageId) => {
  const transaction = datastoreClient.transaction();
  await transaction.run();
  const messageKey = datastoreClient.key(['emailNotifications', messageId]);
  const [message] = await transaction.get(messageKey);
  if (!message) {
    await transaction.save({
      key: messageKey,
      data: {}
    });
  }
  await transaction.commit();
  if (!message) {
    return messageId;
  }
};

const getMostRecentMessageWithTag = async (email, historyId) => {
  // Look up the most recent message.
  const listMessagesRes = await gmail.users.messages.list({
    userId: email,
    maxResults: 1
  });
  const messageId = await checkForDuplicateNotifications(listMessagesRes.data.messages[0].id);

  // Get the message using the message ID.
  if (messageId) {
    const message = await gmail.users.messages.get({
      userId: email,
      id: messageId
    });
    return message;
  }
};

exports.watchGmailMessages = async (event) => {
  // Decode the incoming Gmail push notification.
  const data = Buffer.from(event.data, 'base64').toString();
  const newMessageNotification = JSON.parse(data);
  const email = newMessageNotification.emailAddress;
  const historyId = newMessageNotification.historyId;

  try {
    await auth.auth.requireAuth(null, null, email);
  } catch (err) {
    console.log('An error has occurred in the auth process.');
    throw err;
  }
  const authClient = await auth.auth.authedUser.getClient();
  google.options({auth: authClient});

  // Process the incoming message.
  // Note: extractInfoFromMessage / extractAttachmentFromMessage are helpers
  // assumed to be defined elsewhere in the function source.
  const message = await getMostRecentMessageWithTag(email, historyId);
  if (message) {
    const messageInfo = extractInfoFromMessage(message);
    if (messageInfo.attachmentId && messageInfo.attachmentFilename) {
      const attachment = await extractAttachmentFromMessage(email, messageInfo.messageId, messageInfo.attachmentId);
      // const topLabels = await analyzeAttachment(attachment.data.data, messageInfo.attachmentFilename);
      // await updateReferenceSheet(messageInfo.from, messageInfo.attachmentFilename, topLabels);
    }
  }
};
```
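The deduplication logic in `checkForDuplicateNotifications` can be mimicked in memory; this sketch replaces the Datastore transaction with a plain set, so it is illustrative only:

```python
# In-memory stand-in for the Datastore 'emailNotifications' kind.
seen_message_ids = set()

def check_for_duplicate(message_id):
    """Return the id only the first time it is seen, like the Datastore check."""
    if message_id in seen_message_ids:
        return None
    seen_message_ids.add(message_id)
    return message_id

print(check_for_duplicate("msg-1"))  # → msg-1
print(check_for_duplicate("msg-1"))  # → None
```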
#### env_vars.yaml
```yaml=!
GOOGLE_CLIENT_ID: CLIENT-ID
GOOGLE_CLIENT_SECRET: CLIENT-SECRET
GOOGLE_CALLBACK_URL: CALLBACK-URL
```
#### package.json
```json=!
{
  "name": "gcf-gmail-pubsub",
  "version": "1.0.0",
  "description": "customer gcf to get email history id.",
  "main": "index.js",
  "author": "Google Cloud",
  "license": "Apache-2.0",
  "dependencies": {
    "@google-cloud/datastore": "^3.1.2",
    "@google-cloud/express-oauth2-handlers": "^0.1.2",
    "@google-cloud/vision": "^0.25.0",
    "express": "^4.16.4",
    "googleapis": "^37.2.0"
  }
}
```
---
</details>
---
<details open>
<summary>Google Composer code</summary>
## Google Composer DAG
<hr style="border:2px solid gold">
#### gmailDAG.py
---
```python=!
from datetime import datetime, timedelta
from functools import partial

from airflow import models
from airflow.models.variable import Variable
from airflow.operators.python import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

from operators.gmail_operator import ExtractAttachment
from operators.parse_ma_json import Parserj
from operators.gmail_callbacks import success_callback, failure_callback

PythonOperator.ui_color = '#a9c815'
DummyOperator.ui_color = '#001f3f'

START_DATE = datetime(2022, 4, 1)
PROJECT_ID = Variable.get('gcp_project')


def __catch_task(**context):
    """Catch the data sent from the Gmail watcher gcf."""
    data = str(context['dag_run'].conf['data'])
    return data


def trigger_success_callback(context, callback_type):
    """Definition of start/success event on task."""
    success_callback(context, callback_type)


def trigger_failure_callback(context):
    """Definition of failure event on task."""
    failure_callback(context)


DEFAULT_DAG_ARGS = {
    'owner': 'airflow',
    'start_date': START_DATE,
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'project_id': PROJECT_ID,
    'end_date': None,
    'provide_context': True,
}

with models.DAG(
    dag_id='Gmail_Triggered',
    description='Gmail_Pipeline',
    schedule_interval=None,
    default_args=DEFAULT_DAG_ARGS,
    render_template_as_native_obj=True,
) as dag:
    begin = DummyOperator(
        task_id='begin',
        on_success_callback=partial(trigger_success_callback, callback_type='started'),
    )

    end = DummyOperator(
        task_id='end',
        trigger_rule=TriggerRule.NONE_FAILED,
    )

    catch_data = PythonOperator(
        # Get data from the trigger gcf as reference.
        task_id='catch_data',
        python_callable=__catch_task,
    )

    decode_and_parse = Parserj(
        # Decode data to be useful.
        task_id='decode_and_parse',
        data=catch_data.output,
    )

    attachment_move = ExtractAttachment(
        # Main task to process the email attachment.
        task_id='attachment_move',
        data="{{ task_instance.xcom_pull('decode_and_parse', key='return_value') }}",
        on_success_callback=partial(trigger_success_callback, callback_type='success'),
        on_failure_callback=trigger_failure_callback,
    )

    begin >> catch_data >> decode_and_parse >> attachment_move >> end
```
---
## Google Composer Custom Gmail Operator
<hr style="border:2px solid gold">
#### gmail_operator.py
---
```python=!
import base64
import os
import pprint
import re

from airflow.exceptions import AirflowFailException
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from google.cloud import datastore
from googleapiclient.discovery import build
from httplib2 import Http
from oauth2client import client, GOOGLE_TOKEN_URI

from operators.config import (
    CLIENT_ID,
    CLIENT_SECRET,
    REFRESH_TOKEN,
    PROJECT_ID,
    BUCKET_RAW,
    LOCAL_BUCKET,
    USER_AGENT,
    SERVICE,
    SERVICE_VER,
    ENTITY_KIND
)


class ExtractAttachment(BaseOperator):
    """
    Extract the attachment from the supplier email.

    :template_fields = data: comes from the watchGmailMessages gcf.
    """
    template_fields = ['data']
    ui_color = '#a9c815'

    def __init__(self, data, *args, **kwargs):
        super(ExtractAttachment, self).__init__(*args, **kwargs)
        self.data = data

    def __get_most_recent(self, service, messageid):
        """Get a message using its message ID.

        :param service: used to create the service call to the Gmail API
        :param messageid: the most recent messageId, queried from Datastore where
            the watchGmailMessages gcf stores it from the historyId Pub/Sub event
        """
        self.service = service
        self.message_id = messageid
        # It could also be a filtered query, e.g.:
        # search_query = "Rfc822msgid:" + self.message_id + " has:attachment (filetype:xlsx OR filetype:csv) filename:asset_template_metadata.xlsx is:unread"
        # self.service.users().messages().list(userId='me', q=search_query, maxResults=1).execute()
        self.mail = self.service.users().messages().get(userId='me', id=self.message_id).execute()
        pprint.pprint(self.mail)
        return self.mail

    def __extract_email_attachment(self, messageid, mail):
        """
        Extract info from the email, get the attachment and save it.

        :param messageid: the most recent messageId, queried from Datastore where
            the watchGmailMessages gcf stores it from the historyId Pub/Sub event
        :param mail: the most recent email message, as returned by __get_most_recent
        """
        # Get metadata from the email headers.
        heads = mail['payload']['headers']
        for header in heads:
            if header['name'] == 'Subject':
                subject = header['value']
            if header['name'] == 'From':
                sender = header['value']
            if header['name'] == 'Received':
                dstamp = header['value']
            if header['name'] == 'Message-ID':
                MessageID = header['value']

        # Make sure the 'PROCESSED' label exists, creating it if needed.
        dictlist = self.service.users().labels().list(userId='me').execute()
        dictlbls = dictlist.get('labels', [])
        if 'PROCESSED' not in [lb['name'] for lb in dictlbls]:
            label = {
                "labelListVisibility": "labelShow",
                "messageListVisibility": "show",
                "name": "PROCESSED"
            }
            created = self.service.users().labels().create(userId='me', body=label).execute()
            lbID = created['id']
        else:
            lbget = list(filter(lambda lbl: lbl['name'] == 'PROCESSED', dictlbls))
            lbID = lbget[0]['id']
        pprint.pprint(lbID)

        q = "Rfc822msgid:" + MessageID + " " + "has:attachment"
        MESSAGE = self.service.users().messages().list(userId='me', labelIds=['INBOX'], q=q).execute()
        if MESSAGE['resultSizeEstimate'] != 0:
            if 'parts' in mail['payload']:
                for part in mail['payload']['parts']:
                    if part['filename'] and part['body'] and part['body']['attachmentId']:
                        attachment = self.service.users().messages().attachments().get(
                            userId='me', messageId=messageid, id=part['body']['attachmentId']).execute()
                        bytesFile = base64.urlsafe_b64decode(attachment['data'])
                        # An .xlsx file is a ZIP archive, so it must start with b'PK'.
                        if bytesFile[0:2] != b'PK':
                            raise AirflowFailException('File attachment is not an xlsx!!')
                        else:
                            filedata = base64.urlsafe_b64decode(attachment['data'].encode('UTF-8'))
                            print(f"attachment name: {part['filename']}")
                            senderm = re.sub(r"[^\s+\w]", '-', sender)
                            dir_local = f"/home/airflow/gcs/data/mail/{senderm}/"
                            dir_local_bucket = f"data/mail/{senderm}/{part['filename']}"
                            if os.path.exists(dir_local):
                                print('Directory exists')
                            else:
                                print('Directory has been created')
                                os.mkdir(dir_local)
                            path = os.path.join(dir_local, part['filename'])
                            with open(path, 'wb') as f:
                                f.write(filedata)
                            print(f"Written attachment to: {path}")
                            hook = GCSHook('google_cloud_default')
                            dest_dir = os.path.join('mail', senderm, part['filename'])
                            hook.upload(BUCKET_RAW, dest_dir, path)
                            hook.delete(LOCAL_BUCKET, dir_local_bucket)
                            # Mark the mail as read and label it PROCESSED.
                            bodymail = {
                                'removeLabelIds': ['UNREAD'],
                                'addLabelIds': [lbID],
                            }
                            markmail = self.service.users().messages().modify(
                                userId='me', id=messageid, body=bodymail).execute()
                            print("Mail attachment PROCESSED!")
                            pprint.pprint(markmail, indent=2)
        else:
            raise AirflowFailException('No file attachment on this email')

        # Useful metadata for logs or tracking.
        meta = [{'emailtstamp': dstamp, 'sender': sender, 'emailsubject': subject}]
        pprint.pprint(meta)
        return meta

    def __get_recent_messageId(self):
        """
        Get the most recent messageId from Datastore.

        Query Datastore and extract the messageId string from the key of the
        most recent entity.
        """
        client_ds = datastore.Client(PROJECT_ID)
        query = client_ds.query(kind=ENTITY_KIND)
        result = list(query.fetch())
        entity = result[-1]
        entitystr = str(entity)
        regex = r"[^']*(?=(?:[']*\b['])*[^']*$)"
        mid = re.search(regex, entitystr)
        return mid.group()

    def __make_creds(self):
        """
        Create the credentials for authentication against the Gmail API.
        For the detailed construct refer to https://developers.google.com/oauthplayground
        """
        creds = client.OAuth2Credentials(
            access_token=None,
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET,
            refresh_token=REFRESH_TOKEN,
            token_expiry=None,
            token_uri=GOOGLE_TOKEN_URI,
            user_agent=USER_AGENT
        )
        creds.refresh(Http())
        http = creds.authorize(Http())
        self.service = build(SERVICE, SERVICE_VER, http=http, cache_discovery=False)
        return self.service

    def execute(self, context):
        self.service = self.__make_creds()
        self.msgId = self.__get_recent_messageId()
        self.message = self.__get_most_recent(self.service, self.msgId)
        self.result = self.__extract_email_attachment(self.msgId, self.message)
```
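The xlsx check in `__extract_email_attachment` relies on the ZIP magic bytes: any `.xlsx` file is a ZIP archive and therefore starts with `b'PK'`. A self-contained sketch, using an in-memory ZIP as a stand-in for a real attachment:

```python
import base64
import io
import zipfile

def looks_like_xlsx(raw: bytes) -> bool:
    """True if the bytes start with the ZIP magic, as .xlsx files do."""
    return raw[:2] == b"PK"

# Build a minimal in-memory ZIP to stand in for a real attachment.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("sheet.xml", "<worksheet/>")

# The Gmail API returns attachment data urlsafe-base64 encoded.
payload = base64.urlsafe_b64encode(buf.getvalue())

print(looks_like_xlsx(base64.urlsafe_b64decode(payload)))  # → True
print(looks_like_xlsx(b"plain text attachment"))           # → False
```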
---
## Google Composer Custom Operator for parse and decode
<hr style="border:2px solid gold">
#### parse_ma_json.py
---
```python=!
import base64
import json

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class Parserj(BaseOperator):
    template_fields = ['data']
    ui_color = '#f9c915'

    @apply_defaults
    def __init__(self, data, *args, **kwargs):
        super(Parserj, self).__init__(*args, **kwargs)
        self.data = data

    def execute(self, context):
        # Decode the base64 payload and parse it as JSON.
        return json.loads(base64.b64decode(self.data).decode('utf-8'))
```
---
## Google Composer callbacks definition
<hr style="border:2px solid gold">
### gmail_callbacks.py
---
```python=!
# Plugin for success/fail callbacks on the email DAG.
import pprint
import traceback
import urllib.parse

# Airflow web server base URL for this Composer environment.
base_url = 'https://d3c36909e67a494791263efbc511b9f5-dot-us-central1.composer.googleusercontent.com'


def success_callback(context, callback_type):
    """Definition of the callback for success on Task execution.

    :param context: dag_run context from the DAG
    :param callback_type: type of callback, 'success' or 'started'
    Logs a message with the current execution details.
    """
    if callback_type == 'success':
        emoji = ':green_check:'
        message = 'Task has succeeded'
    elif callback_type == 'started':
        emoji = ':airflow:'
        message = 'Task has been triggered'
    else:
        return

    dag_id = context.get("task_instance").dag_id
    task_id = context.get("task_instance").task_id
    execution_date = context.get("execution_date").isoformat()
    encoded_execution_date = urllib.parse.quote_plus(execution_date)
    dag_url = (f'{base_url}/graph?dag_id={dag_id}'
               f'&execution_date={encoded_execution_date}')
    run_id = context.get("task_instance").xcom_pull(key='RUN_ID', task_ids='xcom_run_id')
    send_message = (f'{emoji} *{dag_id}* {message} for dp_run_id <{dag_url}|*{run_id}*>.')
    pprint.pprint(send_message)


def failure_callback(context):
    """Definition of the callback for failure on Task execution.

    :param context: dag_run context from the DAG
    Logs a message with the current execution details.
    """
    emoji = ':alert:'
    message = 'has failed'
    dag_id = context.get("task_instance").dag_id
    task_id = context.get("task_instance").task_id
    execution_date = context.get("execution_date").isoformat()
    encoded_execution_date = urllib.parse.quote_plus(execution_date)
    dag_url = (f'{base_url}/graph?dag_id={dag_id}'
               f'&execution_date={encoded_execution_date}')
    run_id = context.get("task_instance").xcom_pull(key='RUN_ID', task_ids='xcom_run_id')
    exception = context.get("exception")
    formatted_exception = ''.join(
        traceback.format_exception(etype=type(exception),
                                   value=exception, tb=exception.__traceback__)
    ).strip()
    send_message = (f'{emoji} *{dag_id}* {message} for dp_run_id <{dag_url}|*{run_id}*>! <!channel|channel>'
                    f'\n*Task:* {task_id}'
                    f'\n*Error:* {formatted_exception}')
    pprint.pprint(send_message)
```
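The DAG-run URL these callbacks log is just the graph view with a URL-encoded execution date. A stdlib sketch (the base URL and date are placeholders):

```python
import urllib.parse

base_url = "https://example.composer.googleusercontent.com"  # placeholder
dag_id = "Gmail_Triggered"
execution_date = "2022-04-01T00:00:00+00:00"  # illustrative

# ':' and '+' must be percent-encoded so the date survives as one query value.
encoded = urllib.parse.quote_plus(execution_date)
dag_url = f"{base_url}/graph?dag_id={dag_id}&execution_date={encoded}"
print(dag_url)
```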
---
## Google Composer Operator config file
<hr style="border:2px solid gold">
### config.py
---
```python=!
#Set variables for custom Operator gmail_operator.py
CLIENT_ID = "############.apps.googleusercontent.com"
CLIENT_SECRET = "#############"
REFRESH_TOKEN = "#############"
PROJECT_ID = "############"
BUCKET_RAW= "...-raw-store"
LOCAL_BUCKET ="...-compose-58d452b7-bucket"
USER_AGENT = "google_gmail"
SERVICE = "gmail"
SERVICE_VER= "v1"
ENTITY_KIND = "emailNotifications"
```
---
## Google Composer import helper for Custom Operators
<hr style="border:2px solid gold">
### plugin_name.py
---
```python=!
# plugin_name.py
from airflow.plugins_manager import AirflowPlugin

from operators.gmail_operator import *
from operators.parse_ma_json import *
# from config import *
# from utils.my_utils import *


class PluginName(AirflowPlugin):
    name = 'plugin_name'
    # hooks = [GmailHook]
    operators = [ExtractAttachment, Parserj]
    # macros = [my_util_func]
```
---
</details>
## File Directory
<hr style="border:2px solid gold">
```shell!
.
├── dags
│   ├── GCS.py
│   ├── ValidateFile.py
│   ├── getConn.py
│   ├── gmailDAG.py
│   ├── sftp_dag_creation.py
│   └── sftp_supplier_xyz.py
│
├── plugins
│   ├── plugin_name.py
│   └── operators
│       ├── config.py
│       ├── gmail_callbacks.py
│       ├── gmail_operator.py
│       └── parse_ma_json.py
```
## References
<hr style="border:2px solid gold">
[googleapis](https://googleapis.github.io/google-api-python-client/docs/dyn/gmail_v1.users.messages.html#list)
[Search Operator](https://support.google.com/mail/answer/7190)
## Google Composer Gmail solution architecture
<hr style="border:2px solid gold">

Open in a new tab to see the details.
## Flow description
<hr style="border:2px solid gold">
```gherkin!
Feature: email with a valid xlsx attachment from a supplier
  # steps
  Scenario: Supplier sends an email with the attachment to be processed
    When the email arrives in the customer inbox
    Then the Cloud Composer flow is triggered and validates it, gathers metadata from the email, and triggers the upstream DAG for validation; a callback is triggered to log that the Custom Operator, named as a Task in the DAG, processed successfully.
  # steps
  Scenario: email with an invalid attachment or no attachment
    Given the attachment is not a valid file, or there is no attachment in the email
    When the email is processed
    Then the Operator, named as a task in the DAG, raises an exception and logs via a callback that the email is invalid, breaking the flow.
```
## Email attachment flow detail
<hr style="border:2px solid gold">
In case the sequence diagram below doesn't render:

```sequence
Title: Google Cloud Composer Gmail DAG
gcf GmailDAGtrigger-->begin:Triggers GmailDAG \n passing event {data}
Note over gcf GmailDAGtrigger:gcf watchGmailMessages\nkeeps a record of the last\nhistoryId event so it\ncorresponds with the one\nof the new gmail arrival\nand no other event\npublished by the Gmail API
begin-->catch_data: get RAW data\nfrom gcf GmailDAGtrigger
catch_data-->decode_and_parse: clean data \n {email:HistoryId}
Note over attachment_move:get most recent HistoryId\nfrom Datastore
decode_and_parse-->attachment_move: {HistoryId}
attachment_move-->end:{ OK | not OK}
Note right of end:when attachment present,\nTrigger Validation DAG\nfor the attachment\nextracted and processed
```