# Worker ## Main * Fungsi main adalah entry point saat program dijalankan * Parameter environment akan menentukan penggunaan config conjur atau config local (**line 3-11**) * Producer atau consumer akan dijalankan berdasarkan parameter jobnya. ```python= def main(job='consumer', schema='avro', run='once', topic='ga_2', to='dwh', environment='local', start_date='yesterday', end_date='today'): if environment == 'conjur': config = load_secrets(load_config()) Log.info(config) Log.info("CONJUR API CLIENT: SECRETS FETCHED SUCCESSFULLY & UPDATED TO CONFIG") Log.info("="*70) else: config = load_config() Log.info("USING LOCAL CONFIG") Log.info("="*70) SCHEMA.init(topic) Log.info("STARTING WORKER FOR TOPIC: ga") config.get('google_analytics').update({'start_date': start_date, 'end_date': end_date}) App.config = config try: with open('/tmp/services.pid', 'w', encoding=str) as _: pass except (FileNotFoundError, TypeError) as _: pass if run == 'once': if job == 'all': producer_job(topic, schema, config) consumer_job(topic, schema, config) elif job == 'producer': producer_job(topic, schema, config) else: consumer_job(topic, schema, config) elif run == 'minute': if job == 'all': schedule.every(5).minutes.do(lambda : producer_job(topic, schema, config)) schedule.every(5).minutes.do(lambda : consumer_job(topic, schema, config)) elif job == 'producer': schedule.every(5).minutes.do(lambda : producer_job(topic, schema, config)) else: schedule.every(5).minutes.do(lambda : consumer_job(topic, schema, config)) while not App.shutdown: schedule.run_pending() sleep(2) App.stop() else: if job == 'all': schedule.every().hour.at("18:00").do(lambda : producer_job(topic, schema, config)) schedule.every().hour.at("18:00").do(lambda : consumer_job(topic, schema, config)) elif job == 'producer': schedule.every().hour.at("18:00").do(lambda : producer_job(topic, schema, config)) else: schedule.every().hour.at("18:00").do(lambda : consumer_job(topic, schema, config)) while not App.shutdown: schedule.run_pending() sleep(2) App.stop() ``` ### Producer Job ### Consumer Job ```python= def consumer_job(topic, schema, config): try: Log.info("CONSUMER JOB - INITIALIZING") data = read_messages(topic, schema) writer_db = DBWriter(config['postgres_db']) writer_db.run_writer(data, topic) except Exception as exception: Log.warn("CONSUMER JOB - ERROR") Log.error(str(exception)) else: Log.info("CONSUMER JOB - OK") Log.info("="*70) ``` ## Schema :::info :bulb: **Summary** Data yang disediakan oleh Google Analytics terdiri dari *Dimensions* dan *Metrics*. Untuk mendukung dimensions dan metrics yang beragam maka disediakan schema supaya tidak terjadi hard-coded apabil dimensions dan metrics berubah. ::: Sebagai gambaran, berikut adalah contoh schema yang ada: ```json= { "daily_stats": { "topic": "daily_stats", "Dimensions": ["date"], "Metrics": ["activeUsers", "newUsers"], "pydantic": { "date": "str ...", "activeUsers": "str ...", "newUsers": "int ..." } } } ``` Pada schema tersebut, fields "dimensions" dan "metrics" menentukan data yang direquest pada Google Analytics. Sedangkan pada fields "pydantic" menentukan tipe data dari tiap dimensions dan metrics untuk menjaga konsistensi data. Apabila merujuk pada schema tersebut, contoh data yang didapat adalah sebagai berikut: ```json= { } ``` ## Config ## GA Worker ::: info :bulb: **Summary** GA Worker berfungsi mengambil data dari Google Analytics sesuai dengan **dimension** & **metric** yang diinginkan. ::: ::: info :pencil: Seluruh panggilan API ke Google Analytics menggunakan HTTP/HTTPS request sederhana. ::: ### Prerequisite * Untuk dapat menggunakan API Google Analytics, dibutuhkan beberapa parameter berikut, terutama **private_key**, **client_email** & **client_id** ```json= { "type": "service_account", "project_id": "project", "private_key_id": "123", "private_key": "private key", "client_email": "example@example.com", "client_id": "12345678910", "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/firebase-adminsdk-d5lrz%40byon-c930d.iam.gserviceaccount.com" } ``` ### GA API Token * **API Token** digunakan saat memanggil API, masa berlakunya adalah satu jam sehingga akan terus diperbarui setiap jam. * Line 5 mengambil **credentials** dari prerequisite sebelumnya * Saat object **GAWorker** dibuat, cache API Token akan digunakan bila tersedia & diperbarui bila tidak tersedia (**line 9-15**) ```python= class GAWorker: def __init__(self, config): self.aud = "https://www.googleapis.com/oauth2/v4/token" self.scope = "https://www.googleapis.com/auth/analytics.readonly" self.credentials = load_credentials() self.configs = config self.proxies = dict({'https': 'http://someproxy'}) if os.path.exists(API_TOKEN_PATH): Log.info("GA WORKER - USING CACHED API TOKEN") with open(API_TOKEN_PATH, 'r', encoding='utf-8') as f: self.exp = int(f.readline()) self.token = f.read() else: self.renew_token() ``` * Untuk mendapatkan API Token, aplikasi menggunakan **JSON Web Token** (JWT) sebelum melakukan request token ke Google Analytics untuk API Token. ```python= class GAWorker: # =========Truncated========== def __get_token(self, exp_time): self.exp = int(time.time()) + exp_time payload = { "iss": self.credentials.get('client_email'), "sub": self.credentials.get('client_email'), "aud": self.aud, "iat": int(time.time()), "exp": self.exp, "scope": self.scope } headers = { 'kid': self.credentials.get('private_key_id'), "alg": "RS256", "typ": "JWT" } sig = jwt.encode( payload, self.credentials['private_key'], algorithm="RS256", headers=headers) return sig ``` * Setelah mendapat JWT, request POST dikirim ke laman OAuth2 yang disediakan googleapi (**line 3**) * Parameter POST request (**line 7-10**): * grant_type: tipe token yang digunakan untuk merequest Token ke googleapi, dalam hal ini menggunakan JWT * assertion: isi token JWT * POST Request untuk API Token (**line 14**) * Menuliskan hasil API Token ke cache (**line 16-18**) ```python= class GAWorker: def __init__(self, config): self.aud = "https://www.googleapis.com/oauth2/v4/token" # ======Truncated====== def __get_oauth2(self, assertion): data = { "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer", "assertion": assertion } try: requests.packages.urllib3.disable_warnings() Log.info("GA WORKER - GENERATING API TOKEN") r = requests.post(self.aud, data=data, proxies=self.proxies, verify=False) if r.ok: with open(API_TOKEN_PATH, 'w', encoding='utf-8') as f: f.write(f"{str(self.exp)}\n") f.write(r.json()['access_token']) return r.json()['access_token'] self.exp = 0 return None except Exception as e: Log.warn(f"GA WORKER - FAILED TO GET API TOKEN: {e}") self.exp = 0 return None ``` ### RunReport API * API ini berfungsi mengambil data sesuai dengan dimension & metric berdasarkan daterange yang diinginkan * Penggunaan API dapat dilakukan dengan mengirim POST request ke endpoint yang disediakan google apis (**line 5**) * Property id: identifikasi numerik yang didapatkan dari Google Analytics * API Token yang didapat sebelumnya dipakai untuk memanggil API RunReport (**line 7**) * Parameter POST requests: * dateRanges (line 12) * dimensions & metrics (line 16 & 17) * limit (line 18): jumlah maksimal data, 10K by default & maksimal 100k dari Google Analytics * offset data, berguna untuk data RunReport yg melebihi limit(line 19) * POST request RunReport API (line 23) ```python= class GAWorker: def run_report(self, dim: list, met: list, limit, offset) -> dict: requests.packages.urllib3.disable_warnings() pro_id = self.configs.get('property_id') url = f"https://analyticsdata.googleapis.com/v1beta/properties/{pro_id}:runReport" headers = { "Authorization": "Bearer " + self.token, "Content-Type": "application/json", 'Accept': 'application/json' } data = { "dateRanges": [{ "startDate": self.configs.get('start_date'), "endDate": self.configs.get('end_date') }], "dimensions": dim, "metrics": met, "limit": str(limit), "offset": str(offset) } data = json.dumps(data) try: obj = requests.post(url=url, data=data, proxies=self.proxies, headers=headers, verify=False) return json.loads(obj.content.decode('utf-8').replace('\n', '')) except Exception as e: Log.error(f"GA WORKER - FAILED TO RUN REPORT: {e}") raise ``` ### Run GA Worker * Berfungsi mengecek API Token & menjalankan RunReport untuk pengambilan data yang melebihi limit (default 100K) * Pengecekan API Token (**line 9**) * Pembaharuan API Token (**line 11**) * Fungsi RunReport API (**line 16**) * Untuk pengambilan data melebihi limit, akan dijalankan fungsi RunReport dengan menambahkan offset data (**line 16**) & fungsi akan dijalankan sampai semua data terambil * Pengecekan error API, data kosong & data masih berlanjut (**line 17, 20 & 23**) ```python= def run_worker(self, dim, met, limit, offset): raw_ga = [] iteration_count = 0 retries = 0 try: while True: if retries > 5: raise Exception("GA-WORKER: Failed to renew API Token") if not self.is_token_alive(): Log.info("GA-WORKER: Token Expired, Renewing") self.renew_token() retries += 1 time.sleep(1) continue Log.info(f"GA-WORKER FETCHING ITERATION - {iteration_count+1}") temp = self.run_report(dim, met, limit, (limit*iteration_count)+offset) if temp.get("error"): Log.error(temp.get("error").get("message")) break if not temp.get("rows"): break raw_ga += temp["rows"] if len(temp.get("rows")) < limit: break iteration_count += 1 Log.info(f"GA-WORKER FETCHING COMPLETE: Get {len(raw_ga)} rows of data") return raw_ga except Exception as e: Log.info(f"GA-WORKER ERROR: {e}") raise ``` ### Check API Token & Renew API Token * Karena API Token expired setiap satu jam, pengecekan dapat dilakukan dengan membandingkan **timestamp** saat **API Token** akan expire tercipta sampai saat ini. * Timestamp API Token akan expire: GAWorker.exp (**line 3**) * Nilai fungsi is_token_alive akan False ketika Token mendekati waktu expire (60 detik) * Fungsi renew_token sama seperti ketika pertama kali GAWorker dijalankan (**line 8**) ```python= class GAWorker: def is_token_alive(self): if (self.exp - time.time())<60: return False return True def renew_token(self, exp_time=3600): self.token = self.__get_oauth2(self.__get_token(exp_time)) ``` ## Producer ## Consumer ## Writer