# BULK API ZOHO :::warning ## DOCUMENTATION https://www.zoho.com/creator/help/api/v2/bulk-api/overview.html ## HOW TO USE IT To use the bulk api you must create first a bulk read job to export records. #### Create Bulk Read job (Bulk Export) ##### Request URL ``` [POST](https://<base_url>/api/bulk/v2/<account_owner_name>/<app_link_name>/report/<report_link_name>/read`) ````` ##### Request method ``` POST ``` ##### AUTH TOKEN ``` scope=ZohoCreator.bulk.CREATE ``` ##### SPECIFICATIONS ##### Header | Header | Value | Description | | -------- | -------- | -------- | | Authorization | Zoho-oauthtoken 1000.8cb99dxxxxxxxxxxxxx9be93.9b8xxxxxxxxxxxxxxxf | An authentication token (authtoken) allows users to access apps and APIs without having to enter their login credentials each time. | ##### Response ```python { "code": 3000, "details": { "created_time": "03-Sep-2021 14:55:46", "id": "2000000018003", "operation": "read", "created_by": "jason18", "status": "In-progress" } } ````` - operation (string)- Specifies the type of action the API completed. Sample - "operation" : "read”. - created_by (string)- Specifies the name of the user who initiated the bulk read job. Sample - "created_by": "jason18", - created_time (string)- Specifies the created time of the bulk read job. - status (string)- Specifies the current status of the bulk read job. Example: "state": "IN PROGRESS", "COMPLETED", or "FAILED". - id (string)- Specifies the unique identifier of the bulk read job. Use this ID to check the job status and download the result. Sample - "id": "2000000018003". #### Get the Status of the Bulk Read Job ##### Request URL ``` [GET](https://<base_url>/api/bulk/v2/<account_owner_name>/<app_link_name>/report/<report_link_name>/read/<job_id>) ````` ##### Request method ``` GET ``` ##### AUTH TOKEN ``` scope=ZohoCreator.bulk.READ ``` ##### SPECIFICATIONS ##### Header | Header | Value | Description | | -------- | -------- | -------- | | Authorization | Zoho-oauthtoken 1000.8cb99dxxxxxxxxxxxxx9be93.9b8xxxxxxxxxxxxxxxf | An authentication token (authtoken) allows users to access apps and APIs without having to enter their login credentials each time. | ##### Response ```python { "code": 3000, "details": { "created_time": "06-Sep-2021 16:47:18", "id": "2000000018003", "operation": "read", "created_by": "jason18", "status": "In-progress" } } ````` - operation (string)- Specifies the type of action the API completed. Sample - "operation" : "read”. - created_by (string)- Specifies the name of the user who initiated the bulk read job. Sample - "created_by": "jason18", - created_time (string)- Specifies the created time of the bulk read job. - status (string)- Specifies the current status of the bulk read job. Example: "state": "IN PROGRESS", "COMPLETED", or "FAILED". - id (string)- Specifies the unique identifier of the bulk read job. Use this ID to check the job status and download the result. Sample - "id": "2000000018003". ### Download the result #### First Method Get The job status ```python { "code": 3000, "details": { "result": { "count": 16, "download_url": "/api/bulk/v2/jason_18/one/report/b_Report/read/2000000010003/result" }, "created_time": "06-Sep-2021 16:47:18", "id": "2000000010003", "operation": "read", "created_by": "jason_18", "status": "Completed" } } ````` and acess to the "download_url" value. #### Second Method ```python https://<base_url>/api/bulk/v2/<account_owner_name>/<app_link_name>/report/<report_link_name>/read/<job_id>/result ````` ::: :::success ## WHAT IS ALREADY DONE - Both read and create Tokens I already create one read token for the `GET` Method `(scope=ZohoCreator.bulk.READ)`. I already create one create token for the `POST` Method `(scope=ZohoCreator.bulk.CREATE)`. ```python #Bulk Api PARAMS = { 'client_id': "1000.4OGKBPK6EX2LI9W90JCWEOWET5J9VH", 'client_secret': "eec06585422c9b948a2f749b229a9045dcdef81e5d", } BULK_API_TOKEN = { 'access_token': '1000.0555c6e14759d86cf125a822e659fe4e.949cb3a62547aa39fda893ccc76f071f', 'refresh_token': '1000.56f97b786b5e47b27074928b4766ccd4.5d0bf02bb7ba952db65dfc9b17d3dfe9', 'api_domain': 'https://www.zohoapis.eu', 'token_type': 'Bearer', 'expires_in': 3600 } BULK_READ_TOKEN = { 'access_token': '1000.3c04ad204c580fc176fa3dbfdb298c5f.3168442bd77ee7fa16c9e1ab578224f7', 'refresh_token': '1000.902b29d1a30d611fe63f95eefddb0583.3553736da5dd344f61753b74b0771d86', 'api_domain': 'https://www.zohoapis.eu', 'token_type': 'Bearer', 'expires_in': 3600 } ``` - Create Bulk read job for a report. - Get Bulk read Job Status. - Get Bulk read Job Download URL. - Send GET request on the download URL to extract data. ```python class BulkAPI: _app_owner: str _app_code: str _access_token_gen: AccessTokenGenerator _read_token_gen: AccessTokenGenerator _api_url: str def __init__( self, app_owner: str, app_code: str, access_token_gen: AccessTokenGenerator, read_token_gen: AccessTokenGenerator, api_url: str, ) -> None: self._app_owner = app_owner self._app_code = app_code self._access_token_gen = access_token_gen self._read_token_gen = read_token_gen self._api_url = api_url self._download_urls = {} self._job = {} @property def _token(self) -> str: return self._access_token_gen.get() @property def _read_token(self) -> str: return self._read_token_gen.get() def refresh_token(self) -> None: logger.info("Refreshing create token…") self._access_token_gen.refresh() def refresh_read_token(self) -> None: logger.info("Refreshing read token…") self._read_token_gen.refresh() @property def _auth_headers(self) -> Dict[str, str]: return { 'Authorization': f'Zoho-oauthtoken {self._token}', } @property def _read_auth_headers(self) -> Dict[str, str]: return { 'Authorization': f'Zoho-oauthtoken {self._read_token}', } def _get_report_url(self, report_name: str) -> str: return '/'.join([ self._api_url, self._app_owner, self._app_code, 'report', report_name, 'read', ]) def _get_job_url(self, report_name: str, id: str) -> str: return '/'.join([ self._api_url, self._app_owner, self._app_code, 'report', report_name, 'read', id, ]) def _create_bulk_read_job( self, model_link, remaining_retry: int = 1 ) -> dict: logger.info(f"Creating bulk read job for report {model_link}…") url = self._get_report_url(model_link) r = requests.post(url, headers=self._auth_headers) data = r.json() if r.status_code == 401: logger.info(f"Creation Failed for Bulk read job for report {model_link}…") self.refresh_token() if remaining_retry >= 1: logger.info(f"Retrying post read job for {model_link} record…") return self._create_bulk_read_job( model_link, remaining_retry=remaining_retry - 1) return data def get_record( self, model_link: str, criteria: str, ) -> List[Dict[str, Any]]: logger.info(f'Posting api request for {model_link} record...') key = 'details' job = self._create_bulk_read_job(model_link) job_details = [] if key in job: job_details = job[key] self._job[model_link] = job_details else: logger.error(f"Could not post read job for {model_link} record") time.sleep(120) while self.get_export_status(model_link) == None: time.sleep(60) return self.fetch_bulk_datas(model_link) def _get_job_status( self, model_link: str, remaining_retry: int = 1, ): logger.info(f"Try get read job status for {model_link} record") if model_link in self._job: job_id = self._job[model_link].get('id') url = self._get_job_url(model_link, job_id) r = requests.get(url, headers=self._read_auth_headers) data = r.json() if r.status_code == 401: self.refresh_read_token() if remaining_retry >= 1: logger.info("Retrying get_status…") return self._get_job_status( model_link, remaining_retry=remaining_retry - 1) return data else: logger.error(f"No read job for {model_link} record") def get_export_status(self, model_link) -> str: logger.info(f"Fetching export status for {model_link} record") if model_link in self._job: try: status = self._get_job_status(model_link) if status.get('details').get('status') != 'Completed': return None else: self._download_urls[model_link] = "https://creator.zoho.eu" + status.get('details').get('result').get('download_url') return "found" except Exception as e: logger.error(f"Could not get export status for {model_link}.") else: logger.error(f"No read job for {model_link} record") def fetch_bulk_datas(self, model_link: str) -> List[Dict[str, Any]]: logger.info(f"Downloading response for {model_link} record") if (model_link in self._job) and (model_link in self._download_urls): print(f"found {model_link}") url = self._download_urls[model_link] r = requests.get(url, headers=self._read_auth_headers) data = r.json() print(data) return data else: logger.error(f"No read job for {model_link} record") ``` ::: :::danger ## THE PROBLEM #### THE FETCH_BULK_DATAS function don't return anything so i can't get datats ```python def fetch_bulk_datas(self, model_link: str) -> List[Dict[str, Any]]: logger.info(f"Downloading response for {model_link} record") if (model_link in self._job) and (model_link in self._download_urls): print(f"found {model_link}") url = self._download_urls[model_link] r = requests.get(url, headers=self._read_auth_headers) data = r.json() print(data) return data else: logger.error(f"No read job for {model_link} record") ``` ::: :::info ## HOW TO TEST IT #### SETTINGS FILE ```python from environs import Env env = Env() env.read_env() #Bulk Api PARAMS = { 'client_id': "1000.4OGKBPK6EX2LI9W90JCWEOWET5J9VH", 'client_secret': "eec06585422c9b948a2f749b229a9045dcdef81e5d", } BULK_API_TOKEN = { 'access_token': '1000.0555c6e14759d86cf125a822e659fe4e.949cb3a62547aa39fda893ccc76f071f', 'refresh_token': '1000.56f97b786b5e47b27074928b4766ccd4.5d0bf02bb7ba952db65dfc9b17d3dfe9', 'api_domain': 'https://www.zohoapis.eu', 'token_type': 'Bearer', 'expires_in': 3600 } BULK_READ_TOKEN = { 'access_token': '1000.3c04ad204c580fc176fa3dbfdb298c5f.3168442bd77ee7fa16c9e1ab578224f7', 'refresh_token': '1000.902b29d1a30d611fe63f95eefddb0583.3553736da5dd344f61753b74b0771d86', 'api_domain': 'https://www.zohoapis.eu', 'token_type': 'Bearer', 'expires_in': 3600 } BENIN_ZOHO_APP_CODE = env.str('BENIN_ZOHO_APP_CODE', 'crm') BURKINA_ZOHO_APP_CODE = env.str('BURKINA_ZOHO_APP_CODE', 'qotto-burkina') ZOHO_CLIENT_ID = env.str('ZOHO_CLIENT_ID', PARAMS['client_id']) ZOHO_CLIENT_SECRET = env.str('ZOHO_CLIENT_SECRET', PARAMS['client_secret']) ZOHO_REFRESH_TOKEN = env.str('ZOHO_REFRESH_TOKEN', BULK_API_TOKEN['refresh_token']) ZOHO_READ_TOKEN = env.str('ZOHO_READ_TOKEN', BULK_READ_TOKEN['refresh_token']) ZOHO_APP_OWNER = env.str('ZOHO_APP_OWNER', 'fabricedegaudemar') ZOHO_AUTH_URL = env.str('ZOHO_AUTH_URL', 'https://accounts.zoho.eu/oauth/v2') ZOHO_API_URL = env.str('ZOHO_API_URL', 'https://creator.zoho.eu/api/bulk/v2') UPDATE_BENIN = env.bool('UPDATE_BENIN', True) UPDATE_BURKINA = env.bool('UPDATE_BURKINA', False) ``` #### BULK API CLASS ```python # Copyright (c) Qotto, 2022. All rights reserved. import logging import requests import time import csv from typing import Any, Dict, List from gw_crm.gw.access_token_generator import AccessTokenGenerator __all__ = [ 'BulkAPI', ] logger = logging.getLogger(__name__) class BulkAPI: _app_owner: str _app_code: str _access_token_gen: AccessTokenGenerator _read_token_gen: AccessTokenGenerator _api_url: str def __init__( self, app_owner: str, app_code: str, access_token_gen: AccessTokenGenerator, read_token_gen: AccessTokenGenerator, api_url: str, ) -> None: self._app_owner = app_owner self._app_code = app_code self._access_token_gen = access_token_gen self._read_token_gen = read_token_gen self._api_url = api_url self._download_urls = {} self._job = {} @property def _token(self) -> str: return self._access_token_gen.get() @property def _read_token(self) -> str: return self._read_token_gen.get() def refresh_token(self) -> None: logger.info("Refreshing create token…") self._access_token_gen.refresh() def refresh_read_token(self) -> None: logger.info("Refreshing read token…") self._read_token_gen.refresh() @property def _auth_headers(self) -> Dict[str, str]: return { 'Authorization': f'Zoho-oauthtoken {self._token}', } @property def _read_auth_headers(self) -> Dict[str, str]: return { 'Authorization': f'Zoho-oauthtoken {self._read_token}', } def _get_report_url(self, report_name: str) -> str: return '/'.join([ self._api_url, self._app_owner, self._app_code, 'report', report_name, 'read', ]) def _get_job_url(self, report_name: str, id: str) -> str: return '/'.join([ self._api_url, self._app_owner, self._app_code, 'report', report_name, 'read', id, ]) def _create_bulk_read_job( self, model_link, remaining_retry: int = 1 ) -> dict: logger.info(f"Creating bulk read job for report {model_link}…") url = self._get_report_url(model_link) r = requests.post(url, headers=self._auth_headers) data = r.json() if r.status_code == 401: logger.info(f"Creation Failed for Bulk read job for report {model_link}…") self.refresh_token() if remaining_retry >= 1: logger.info(f"Retrying post read job for {model_link} record…") return self._create_bulk_read_job( model_link, remaining_retry=remaining_retry - 1) return data def get_record( self, model_link: str, criteria: str, ) -> List[Dict[str, Any]]: logger.info(f'Posting api request for {model_link} record...') key = 'details' job = self._create_bulk_read_job(model_link) job_details = [] if key in job: job_details = job[key] self._job[model_link] = job_details else: logger.error(f"Could not post read job for {model_link} record") time.sleep(120) while self.get_export_status(model_link) == None: time.sleep(60) return self.fetch_bulk_datas(model_link) def _get_job_status( self, model_link: str, remaining_retry: int = 1, ): logger.info(f"Try get read job status for {model_link} record") if model_link in self._job: job_id = self._job[model_link].get('id') url = self._get_job_url(model_link, job_id) r = requests.get(url, headers=self._read_auth_headers) data = r.json() if r.status_code == 401: self.refresh_read_token() if remaining_retry >= 1: logger.info("Retrying get_status…") return self._get_job_status( model_link, remaining_retry=remaining_retry - 1) return data else: logger.error(f"No read job for {model_link} record") def get_export_status(self, model_link) -> str: logger.info(f"Fetching export status for {model_link} record") if model_link in self._job: try: status = self._get_job_status(model_link) if status.get('details').get('status') != 'Completed': return None else: self._download_urls[model_link] = "https://creator.zoho.eu" + status.get('details').get('result').get('download_url') return "found" except Exception as e: logger.error(f"Could not get export status for {model_link}.") else: logger.error(f"No read job for {model_link} record") def fetch_bulk_datas(self, model_link: str) -> List[Dict[str, Any]]: logger.info(f"Downloading response for {model_link} record") if (model_link in self._job) and (model_link in self._download_urls): print(f"found {model_link}") url = self._download_urls[model_link] r = requests.get(url, headers=self._read_auth_headers) data = r.json() print(data) return data else: logger.error(f"No read job for {model_link} record") ``` #### INIT FILE ```python bulk_api_access_token_gen = AccessTokenGenerator( auth_url=cf.ZOHO_AUTH_URL, client_id=cf.ZOHO_CLIENT_ID, client_secret=cf.ZOHO_CLIENT_SECRET, refresh_token=cf.ZOHO_REFRESH_TOKEN, ) bulk_api_read_access_token_gen = AccessTokenGenerator( auth_url=cf.ZOHO_AUTH_URL, client_id=cf.ZOHO_CLIENT_ID, client_secret=cf.ZOHO_CLIENT_SECRET, refresh_token=cf.ZOHO_READ_TOKEN, ) bulk_api_benin = BulkAPI( app_owner=cf.ZOHO_APP_OWNER, app_code=cf.BENIN_ZOHO_APP_CODE, access_token_gen=bulk_api_access_token_gen, read_token_gen=bulk_api_read_access_token_gen, api_url=cf.ZOHO_API_URL, ) bulk_api_burkina = BulkAPI( app_owner=cf.ZOHO_APP_OWNER, app_code=cf.BURKINA_ZOHO_APP_CODE, access_token_gen=bulk_api_access_token_gen, read_token_gen=bulk_api_read_access_token_gen, api_url=cf.ZOHO_API_URL, ) ``` #### TEST ```python def run_update( api: BulkAPI, ): try: api.get_record('Tous_les_Offres', '') # api.get_job_status('Tous_les_Offres') # api.get_export_status() except Exception as e: logger.error(f"Could not update sheet error is {e}.") if cf.UPDATE_BENIN: with local_trace(correlation_id=gen_trace_id('benin')): logger.info(f"Will start update for Benin.") run_update( api=bulk_api_benin, ) if cf.UPDATE_BURKINA: with local_trace(correlation_id=gen_trace_id('burkina')): logger.info(f"Will start update for Burkina Faso.") run_update( api=bulk_api_burkina, ) ``` :::