```py= ''' X20 情境需求/痛點描述:從信用卡交易行為發掘客戶潛在資金需求。 觸發條件: (1) 信用卡使用行為符合以下條件任一: a. 最近連續兩期皆未繳全清 b. 近六個月有2期未繳全清 c. 數據科學部回應率的模型屬高回應率客群 (2) 基於上述條件下,且以下條件擇一符合: 1.客戶不為房貸既有戶 2.客戶房貸可增貸額度為0 文案:優質卡友優惠信貸方案,本月立即申貸,可享有首期利率0.01%專屬優惠方案。 (無文案參數) 執行週期:每周二 input: CCSUMDP0、CFRISK_BANK30_LOAN_CLM、PLCCP_LEADS、LONP、BANCS_LOAN_ACCT output: cust_id: 客戶 ID ''' import json import os import datetime import pymongo import logging import sys import re import argparse import time from dotenv import load_dotenv from pymongo import MongoClient from pathlib import Path from urllib.parse import quote_plus from copy import deepcopy class logger(logging.Logger): def __init__(self, name: str, level:int, log_filename:str, std_out:bool=True): super().__init__(name, level=level) formatter = logging.Formatter(fmt='%(asctime)s | %(levelname)s - %(message)s') if std_out: stream_handler = logging.StreamHandler(stream=sys.stdout) stream_handler.setFormatter(formatter) self.addHandler(stream_handler) if log_filename: log_dir = Path(os.getenv("LOG_DIR") or "logs") log_dir.mkdir(exist_ok=True) file_handler = logging.FileHandler( filename=log_dir.joinpath(log_filename), mode='a', encoding='utf-8') file_handler.setFormatter(formatter) self.addHandler(file_handler) class Scenario(): def __init__(self, data_date): load_dotenv() self.data_date = data_date self.customer_info = os.getenv("MONGODB_COLLECTION_CUSTOMER_INFO") self.scenarios = os.getenv("MONGODB_COLLECTION_SCENARIOS") self.scenario_no = os.getenv("SCENARIO_NO") self.scenariosText = os.getenv("SCENARIO_TEXT") self.batch_date = datetime.datetime.today().date().isoformat() self.batchDateTime = datetime.datetime.today() logLevel = 10 if os.getenv("DEBUG") else 20 a=str(self.batchDateTime).replace(" ","").replace(":","") logFilename = f"AI_BANK_SCEANRIO_{self.scenario_no}.{a}.log" self.logger = logger(name=self.scenario_no,level=logLevel,log_filename=logFilename) self.logger.info(f'{self.scenario_no} init, data_date: {self.batch_date}') self.tgt_doc_template = self.get_tgt_doc_template() self.mdp_tag = os.getenv("MONGODB_COLLECTION_MDP_TAG") self.mdp_tagging_store = os.getenv("MONGODB_COLLECTION_MDP_TAGGING_STORE") self.mongo_db_name = os.getenv("MONGODB_DB") self.insert_batch_size = int(os.getenv("INSERT_BATCH_SIZE") or 1000) def getMongoClient(self): try: password_pre = os.getenv("MONGO_PSWD_PRE") password_suf = os.getenv("MONGO_PSWD_SUF") password = quote_plus(f"{password_pre}{password_suf}") username = quote_plus(os.getenv("MONGO_USERNAME")) hosts = os.getenv("MONGO_HOSTS") auth_db = os.getenv("MONGO_AUTH_DB") connect_str = f"mongodb://{username}:{password}@{hosts}/?authMechanism=DEFAULT&authSource={auth_db}" print(connect_str) client = MongoClient(connect_str) return client except pymongo.errors.ConfigurationError: self.logger.error("An Invalid URI host error was received.") def getMongoDb(self): db = self.getMongoClient()[self.mongo_db_name] return db # def insertMongoDb(self, items, tgt_collection): # if not items: # self.logger.info("No document inserted") # return # try: # result = self.getMongoDb()[tgt_collection].insert_many(items) # except Exception as e: # self.logger.error("Exception occurred during insertMany(): %s", e) # else: # inserted_count = len(result.inserted_ids) # self.logger.info(f'Inserted {inserted_count} documents.') def deleteMongoDb(self, query, tgt_collection): try: result = self.getMongoDb()[tgt_collection].delete_many(query) except Exception as e: self.logger.error(f'Exception occurred during deleteMany(): {e}') else: delete_count = result.deleted_count self.logger.info(f'delete {delete_count} documents.') def get_tgt_doc_template(self): file_name = os.getenv("TEMPLATE") file_path = os.path.join(os.path.dirname(__file__), file_name) #組裝寫入MDB格式檔案的路徑 with open(file_path) as f: template = json.load(f) return template def get_mongo_aggregate_result(self): #找出所有CCSUMDP0內的客戶,按照日期由近到遠排序,列出每個客人每期的應繳金額跟繳納金額。 pipeline = [ {'$match': {'sourceDocType': 'CCSUMDP0'}}, {'$sort': {'fields.CYCLE_DATE': -1}}, {'$group': {'_id': '$fields.ACCID', 'firstNCbalb': {'$firstN': {'input': '$fields.CBALB', 'n': 6}}, 'firstN+1Sueam2': {'$firstN': {'input': '$fields.SUEAM2', 'n': 7}}}} ] result = list(self.getMongoDb()["customer_DI_appender"].aggregate(pipeline)) return result #result like {'_id': 'A121470316', 'firstNCbalb': [8200, 7500, 8000, 100019], 'firstN+1Sueam2': [50934, 29455, 21034, 13396]} def get_unpaid_balances_last_two_cycles(self, result): #找出最近連續兩期未繳納全額的客人 unpaid_cust = {} for entry in result: customer_id = entry['_id'] first_n_cbalb = entry['firstNCbalb'] first_n_add_1_sueam2 = entry['firstN+1Sueam2'] if len(first_n_cbalb) >= 2 and len(first_n_add_1_sueam2) >= 3: sub_values = [first_n_cbalb[i] - first_n_add_1_sueam2[i + 1] for i in range(2)] if sub_values[0] < 0 and sub_values[1] < 0: sub_entry = {'id': customer_id, 'sub': sub_values} unpaid_cust[customer_id] = sub_entry return unpaid_cust def get_unpaid_balances_two_cycles_six_months(self, result): #找出最近六期內至少有兩期未繳清的客人 unpaid_cust_6 = {} for entry in result: customer_id = entry['_id'] first_n_cbalb = entry['firstNCbalb'] first_n_add_1_sueam2 = entry['firstN+1Sueam2'] if 2 <= len(first_n_cbalb) <= 6 and 3 <= len(first_n_add_1_sueam2) <= 7: sub_values = [first_n_cbalb[i] - first_n_add_1_sueam2[i+1] for i in range(len(first_n_cbalb)) if i+1 < len(first_n_add_1_sueam2)] if sum(1 for value in sub_values if value < 0) >= 2: sub_entry = {'id': customer_id, 'sub': sub_values} unpaid_cust_6[customer_id] = sub_entry return unpaid_cust_6 def get_mortgageHolder_with_0_collateral(self): #客戶房貸可增貸額度為0 pipeline = [ {'$match': {'sourceDocType': 'CFRISK_BANK30_LOAN_CLM'}}, {"$group": {"_id": "$fields.CUST_ID", "TOTAL_COLLATERAL_AMT3": {"$sum": "$fields.COLLATERAL_AMT3"}}}, {"$match": {"TOTAL_COLLATERAL_AMT3": {"$lte": 0}}}, {"$project": {"_id": 1}} ] loan_amt_cust = list(self.getMongoDb()["customer_TI_appender"].aggregate(pipeline)) return loan_amt_cust def highResponse(self): #找數科模型高回應客戶 pipeline=[ {'$match': {'$and': [ {'sourceDocType': 'PLCCP_LEADS'}, {'fields.MODEL_RISK_LEVEL_OUT': {'$in': ['Level 1', 'Level 2', 'Level 3', 'Level 4', 'Level 5', 'Level 6', 'Level 7']}}]}}, {'$project': {'_id': 0, 'cust_id': 1}} ] highRes = list(self.getMongoDb()['customer_TI_appender'].aggregate(pipeline)) return [i['cust_id'] for i in highRes] #result like [{'cust_id': 'Q123154872'}, {'cust_id': 'Q123154881'}}] def joinLONPData(self): #找非房貸既有戶 cust = [] pipeline = [ {'$match': {'sourceDocType': 'LONP'}}, {'$match': {'fields.CAT_TYPE_NAME': {'$regex': '房貸'}}}, {'$project': { '_id':0, 'combineAccttypeIntcat': {'$concat': ['$fields.ACCT_TYPE', '$fields.INT_CAT']}}} ] mortgage = list(self.getMongoDb()["customer_TI_appender"].aggregate(pipeline)) for doc in mortgage: pipepline2 = [ {'$match': {'sourceDocType': 'BANCS_LOAN_ACCT'}}, {'$addFields': {'checkType': {'$concat': ['$fields.ACT_TYPE', '$fields.CAT']}}}, {'$match': {'checkType': {'$eq':doc['combineAccttypeIntcat']}}}, {'$project': {'_id': 0, 'cust_id': 1}}] isCurrent = list(self.getMongoDb()["customer_TI_appender"].aggregate(pipepline2)) pipepline3 = [ {'$match': {'sourceDocType': 'BANCS_LOAN_ACCT'}}, {'$project': {'_id': 0, 'cust_id': 1}}] allBancsLoanACCT = list(self.getMongoDb()["customer_TI_appender"].aggregate(pipepline3)) isNotCurrent = set([i['cust_id'] for i in allBancsLoanACCT]) - set([i['cust_id'] for i in isCurrent]) for doc in isNotCurrent: cust.append(doc) return cust def batch_query_result(self, resultsList): #batch query mongo db result into batchsize batch_size = self.insert_batch_size batch_result = [] for doc in resultsList: batch_result.append(doc) if len(batch_result) == batch_size: yield batch_result batch_result = [] if batch_result: yield batch_result def to_tgt_doc(self, doc): #transform mongo db result to tgt doc tgt_doc = deepcopy(self.tgt_doc_template) # cust_id = doc["CUST_ID"] tgt_doc.update({"custId": doc, "tagName": self.scenario_no, "taggingDtm": self.data_date, "systemDtm": self.batch_date }) return tgt_doc def getResult(self): #insert data into mongo db st_time = time.time() data_date = self.data_date aggregateData1 = self.get_mongo_aggregate_result() unpaid_cust = self.get_unpaid_balances_last_two_cycles(aggregateData1) id1 = set(unpaid_cust.keys()) unpaid_cust_6 = self.get_unpaid_balances_two_cycles_six_months(aggregateData1) id2 = set(unpaid_cust_6.keys()) loan_amt_cust = self.get_mortgageHolder_with_0_collateral() id4 = {entry['_id'] for entry in loan_amt_cust} lonp_data = self.joinLONPData() id3 = set(lonp_data) highResp = self.highResponse() id5 = set(highResp) results = (id1 | id2 | id5) & ( id4 | id3) self.deleteMongoDb({"tagName": self.scenario_no}, self.mdp_tagging_store) write_count =0 for batch in self.batch_query_result(results): items = map(self.to_tgt_doc, batch) tmpFileName = 'customer'+str(write_count)+'.txt' with open(tmpFileName, 'a', encoding='utf-8') as f: json.dump(items, f, ensure_ascii=False, indent=4) # self.insertMongoDb(items, self.mdp_tagging_store) write_count+=len(batch) end_time = time.time() self.logger.info(f'Total write {write_count} documents.') self.logger.info(f'Processing Time: {round(end_time - st_time)} s') return results if __name__ == "__main__": parser = argparse.ArgumentParser( prog="aibank_scenario_C5", description="mongo2mongo logic") yesterday_str = (datetime.datetime.today() + datetime.timedelta(-1)).strftime("%Y%m%d") parser.add_argument("--data_date",type=str,default=yesterday_str, help="logic date in YYYYMMDD format, default is today") args = parser.parse_args() if re.match(r"^\d{8}$", args.data_date): data_date = datetime.datetime.strptime(args.data_date, "%Y%m%d") else: raise ValueError("data_date format error, should be YYYYMMDD") x20 = Scenario(data_date=data_date.replace(microsecond=0, second=0, minute=0, hour=0)) x20.getResult() # t1 = datetime.datetime.now() # print(f'程式開始執行時間: {t1}') # x20 = Scenario() # output = x20.createDoc() # # print("X20 Customers:", output) # t2 = datetime.datetime.now() # print(f'程式結束執行時間: {t2}') ``` Traceback (most recent call last): File "c:\Users\samantha.tc.yu\Documents\06-情境\X20\X20.py", line 299, in <module> x20.getResult() File "c:\Users\samantha.tc.yu\Documents\06-情境\X20\X20.py", line 274, in getResult json.dump(items, f, ensure_ascii=False, indent=4) File "C:\Users\samantha.tc.yu\AppData\Local\Programs\Python\Python310\lib\json\__init__.py", line 179, in dump for chunk in iterable: File "C:\Users\samantha.tc.yu\AppData\Local\Programs\Python\Python310\lib\json\encoder.py", line 438, in _iterencode o = _default(o) File "C:\Users\samantha.tc.yu\AppData\Local\Programs\Python\Python310\lib\json\encoder.py", line 179, in default raise TypeError(f'Object of type {o.__class__.__name__} ' TypeError: Object of type map is not JSON serializable ```py= # Mongo #MONGODB_URI="mongodb://mdpsys:FubonTsid2@tpebnkdpmgdb01t:7017,tpebnkdpmgdb02t:7017,tpebnkdpmgdb03t:7017/?authMechanism=DEFAULT&authSource=admin" MONGODB_DB="MDP_TAG" MONGO_PSWD_PRE="Fubon" MONGO_PSWD_SUF="Tsid2" MONGO_USERNAME="mdpsys" MONGO_HOSTS="tpebnkdpmgdb01t:7017,tpebnkdpmgdb02t:7017,tpebnkdpmgdb03t:7017" MONGO_AUTH_DB="admin" MONGODB_COLLECTION_CUSTOMER_INFO="customer_Info" MONGODB_COLLECTION_MDP_TAGGING_STORE="MDP_TAGGING_STORE" MONGODB_COLLECTION_MDP_TAG="MDP_TAG" #business logic SCENARIO_NO="X20" #S000020 SCENARIO_TEXT="優質卡友優惠信貸方案,本月立即申貸,可享有首期利率0.01%專屬優惠方案。" #system #DEBUG="" LOG_DIR="C:\Users\samantha.tc.yu\Documents\06-情境\X20\Log" TEMPLATE = "tgt_doc_template" ```