```py=
'''
X20 情境需求/痛點描述:從信用卡交易行為發掘客戶潛在資金需求。
觸發條件:
(1) 信用卡使用行為符合以下條件任一:
a. 最近連續兩期皆未繳全清
b. 近六個月有2期未繳全清
c. 數據科學部回應率的模型屬高回應率客群
(2) 基於上述條件下,且以下條件擇一符合:
1.客戶不為房貸既有戶
2.客戶房貸可增貸額度為0
文案:優質卡友優惠信貸方案,本月立即申貸,可享有首期利率0.01%專屬優惠方案。 (無文案參數)
執行週期:每周二
input: CCSUMDP0、CFRISK_BANK30_LOAN_CLM、PLCCP_LEADS、LONP、BANCS_LOAN_ACCT
output: cust_id: 客戶 ID
'''
import json
import os
import datetime
import pymongo
import logging
import sys
import re
import argparse
import time
from dotenv import load_dotenv
from pymongo import MongoClient
from pathlib import Path
from urllib.parse import quote_plus
from copy import deepcopy
class logger(logging.Logger):
    """Logger that can write to stdout and/or to a file under ``LOG_DIR``.

    Every attached handler uses the same
    ``<asctime> | <levelname> - <message>`` format.
    """

    def __init__(self, name: str, level: int, log_filename: str, std_out: bool = True):
        """Create the logger and attach the requested handlers.

        Args:
            name: Logger name passed to ``logging.Logger``.
            level: Logging threshold (e.g. ``logging.INFO``).
            log_filename: File name created inside the log directory; a falsy
                value disables file logging.
            std_out: When true, also emit records to ``sys.stdout``.
        """
        super().__init__(name, level=level)
        formatter = logging.Formatter(fmt='%(asctime)s | %(levelname)s - %(message)s')

        if std_out:
            stream_handler = logging.StreamHandler(stream=sys.stdout)
            stream_handler.setFormatter(formatter)
            self.addHandler(stream_handler)

        if log_filename:
            # The directory comes from the LOG_DIR environment variable and
            # falls back to ./logs when it is unset or empty.
            log_dir = Path(os.getenv("LOG_DIR") or "logs")
            # parents=True so a nested LOG_DIR whose parent folders do not
            # exist yet is created instead of raising FileNotFoundError.
            log_dir.mkdir(parents=True, exist_ok=True)
            file_handler = logging.FileHandler(
                filename=log_dir.joinpath(log_filename),
                mode='a',
                encoding='utf-8')
            file_handler.setFormatter(formatter)
            self.addHandler(file_handler)
class Scenario():
    """X20 scenario: find credit-card customers with a potential funding need.

    A customer is selected when at least one card-behaviour condition holds
    (two most recent cycles not paid in full, at least two unpaid cycles in
    the last six, or a high model response level) AND at least one mortgage
    condition holds (not an existing mortgage customer, or total additional
    mortgage amount <= 0).
    """

    # MODEL_RISK_LEVEL_OUT values treated as "high response" by highResponse().
    HIGH_RESPONSE_LEVELS = ['Level 1', 'Level 2', 'Level 3', 'Level 4',
                            'Level 5', 'Level 6', 'Level 7']

    # Lazily created MongoClient shared by all queries of one instance.
    _mongo_client = None

    def __init__(self, data_date):
        """Load settings from the environment (.env) and set up logging.

        Args:
            data_date: Logic date of the run; stored as ``taggingDtm`` on
                every output document.
        """
        load_dotenv()
        self.data_date = data_date
        self.customer_info = os.getenv("MONGODB_COLLECTION_CUSTOMER_INFO")
        self.scenarios = os.getenv("MONGODB_COLLECTION_SCENARIOS")
        self.scenario_no = os.getenv("SCENARIO_NO")
        self.scenariosText = os.getenv("SCENARIO_TEXT")
        # Read the clock once so the batch date and the batch timestamp can
        # never disagree when the run starts right around midnight.
        self.batchDateTime = datetime.datetime.today()
        self.batch_date = self.batchDateTime.date().isoformat()
        log_level = logging.DEBUG if os.getenv("DEBUG") else logging.INFO
        stamp = str(self.batchDateTime).replace(" ", "").replace(":", "")
        log_filename = f"AI_BANK_SCEANRIO_{self.scenario_no}.{stamp}.log"
        self.logger = logger(name=self.scenario_no, level=log_level, log_filename=log_filename)
        self.logger.info(f'{self.scenario_no} init, data_date: {self.data_date}')
        self.tgt_doc_template = self.get_tgt_doc_template()
        self.mdp_tag = os.getenv("MONGODB_COLLECTION_MDP_TAG")
        self.mdp_tagging_store = os.getenv("MONGODB_COLLECTION_MDP_TAGGING_STORE")
        self.mongo_db_name = os.getenv("MONGODB_DB")
        self.insert_batch_size = int(os.getenv("INSERT_BATCH_SIZE") or 1000)

    def getMongoClient(self):
        """Return the MongoClient for this run, creating it on first use.

        The connection string is assembled from the MONGO_* environment
        variables. It contains the password, so it is never printed or logged.

        Raises:
            pymongo.errors.ConfigurationError: If the URI is rejected; the
                error is logged and re-raised instead of returning ``None``.
        """
        if self._mongo_client is not None:
            return self._mongo_client
        try:
            password_pre = os.getenv("MONGO_PSWD_PRE")
            password_suf = os.getenv("MONGO_PSWD_SUF")
            password = quote_plus(f"{password_pre}{password_suf}")
            username = quote_plus(os.getenv("MONGO_USERNAME"))
            hosts = os.getenv("MONGO_HOSTS")
            auth_db = os.getenv("MONGO_AUTH_DB")
            connect_str = f"mongodb://{username}:{password}@{hosts}/?authMechanism=DEFAULT&authSource={auth_db}"
            client = MongoClient(connect_str)
        except pymongo.errors.ConfigurationError:
            self.logger.error("An Invalid URI host error was received.")
            raise
        # Reuse one client (and its connection pool) for every query.
        self._mongo_client = client
        return client

    def getMongoDb(self):
        """Return the database named by the MONGODB_DB setting."""
        return self.getMongoClient()[self.mongo_db_name]

    # Disabled while getResult() dumps its output to local files instead of
    # inserting into MongoDB.
    # def insertMongoDb(self, items, tgt_collection):
    #     if not items:
    #         self.logger.info("No document inserted")
    #         return
    #     try:
    #         result = self.getMongoDb()[tgt_collection].insert_many(items)
    #     except Exception as e:
    #         self.logger.error("Exception occurred during insertMany(): %s", e)
    #     else:
    #         inserted_count = len(result.inserted_ids)
    #         self.logger.info(f'Inserted {inserted_count} documents.')

    def deleteMongoDb(self, query, tgt_collection):
        """Delete every document matching ``query`` from ``tgt_collection``.

        Failures are logged and not re-raised (best effort).
        """
        try:
            result = self.getMongoDb()[tgt_collection].delete_many(query)
        except Exception as e:
            self.logger.error(f'Exception occurred during deleteMany(): {e}')
        else:
            delete_count = result.deleted_count
            self.logger.info(f'delete {delete_count} documents.')

    def get_tgt_doc_template(self):
        """Load the JSON template of an output document.

        The file name comes from the TEMPLATE environment variable and is
        resolved relative to the directory of this script.
        """
        file_name = os.getenv("TEMPLATE")
        file_path = os.path.join(os.path.dirname(__file__), file_name)
        # Explicit encoding so the result does not depend on the OS locale.
        with open(file_path, encoding='utf-8') as f:
            return json.load(f)

    def get_mongo_aggregate_result(self):
        """Collect per-account CBALB and SUEAM2 values, most recent cycle first.

        Returns:
            A list of documents such as
            ``{'_id': <ACCID>, 'firstNCbalb': [...], 'firstN+1Sueam2': [...]}``
            holding up to 6 CBALB values and up to 7 SUEAM2 values per
            account, sorted by CYCLE_DATE descending.
        """
        pipeline = [
            {'$match': {'sourceDocType': 'CCSUMDP0'}},
            {'$sort': {'fields.CYCLE_DATE': -1}},
            {'$group': {'_id': '$fields.ACCID',
                        'firstNCbalb': {'$firstN': {'input': '$fields.CBALB', 'n': 6}},
                        'firstN+1Sueam2': {'$firstN': {'input': '$fields.SUEAM2', 'n': 7}}}}
        ]
        return list(self.getMongoDb()["customer_DI_appender"].aggregate(pipeline))

    @staticmethod
    def _cycle_shortfalls(entry, max_cycles=None):
        """Return ``CBALB[i] - SUEAM2[i + 1]`` for each available cycle.

        Each CBALB value is paired with the SUEAM2 value one position later
        in the list; a negative difference is what the callers count as a
        cycle that was not paid in full. ``max_cycles`` limits how many of
        the most recent cycles are evaluated (``None`` means all).
        """
        cbalb = entry['firstNCbalb']
        sueam2 = entry['firstN+1Sueam2']
        return [paid - due for paid, due in zip(cbalb[:max_cycles], sueam2[1:])]

    def get_unpaid_balances_last_two_cycles(self, result):
        """Select customers whose two most recent cycles both show a shortfall.

        Args:
            result: Output of :meth:`get_mongo_aggregate_result`.

        Returns:
            Dict keyed by customer id; each value is
            ``{'id': <customer id>, 'sub': [diff_cycle_0, diff_cycle_1]}``.
        """
        unpaid_cust = {}
        for entry in result:
            # Two cycles need two CBALB values and three SUEAM2 values.
            if len(entry['firstNCbalb']) < 2 or len(entry['firstN+1Sueam2']) < 3:
                continue
            diffs = self._cycle_shortfalls(entry, max_cycles=2)
            if all(diff < 0 for diff in diffs):
                customer_id = entry['_id']
                unpaid_cust[customer_id] = {'id': customer_id, 'sub': diffs}
        return unpaid_cust

    def get_unpaid_balances_two_cycles_six_months(self, result):
        """Select customers with at least two shortfall cycles in the last six.

        Args:
            result: Output of :meth:`get_mongo_aggregate_result`.

        Returns:
            Dict keyed by customer id; each value is
            ``{'id': <customer id>, 'sub': [diff per evaluated cycle]}``.
        """
        unpaid_cust_6 = {}
        for entry in result:
            if not (2 <= len(entry['firstNCbalb']) <= 6
                    and 3 <= len(entry['firstN+1Sueam2']) <= 7):
                continue
            diffs = self._cycle_shortfalls(entry)
            if sum(1 for diff in diffs if diff < 0) >= 2:
                customer_id = entry['_id']
                unpaid_cust_6[customer_id] = {'id': customer_id, 'sub': diffs}
        return unpaid_cust_6

    def get_mortgageHolder_with_0_collateral(self):
        """Find customers whose summed COLLATERAL_AMT3 is less than or equal to 0.

        Returns:
            A list of ``{'_id': <CUST_ID>}`` documents.
        """
        pipeline = [
            {'$match': {'sourceDocType': 'CFRISK_BANK30_LOAN_CLM'}},
            {"$group": {"_id": "$fields.CUST_ID", "TOTAL_COLLATERAL_AMT3": {"$sum": "$fields.COLLATERAL_AMT3"}}},
            {"$match": {"TOTAL_COLLATERAL_AMT3": {"$lte": 0}}},
            {"$project": {"_id": 1}}
        ]
        return list(self.getMongoDb()["customer_TI_appender"].aggregate(pipeline))

    def highResponse(self):
        """Return the cust_id of every PLCCP_LEADS record with a high response level."""
        pipeline = [
            {'$match': {'sourceDocType': 'PLCCP_LEADS',
                        'fields.MODEL_RISK_LEVEL_OUT': {'$in': self.HIGH_RESPONSE_LEVELS}}},
            {'$project': {'_id': 0, 'cust_id': 1}}
        ]
        high_res = self.getMongoDb()['customer_TI_appender'].aggregate(pipeline)
        return [doc['cust_id'] for doc in high_res]

    def joinLONPData(self):
        """Return loan customers who do not hold any mortgage product.

        Mortgage product codes are the ACCT_TYPE + INT_CAT of LONP records
        whose CAT_TYPE_NAME contains '房貸'. A BANCS_LOAN_ACCT record belongs
        to a mortgage when its ACT_TYPE + CAT equals one of those codes. The
        result is every BANCS_LOAN_ACCT customer minus the customers holding
        at least one mortgage account.

        NOTE(review): customers with no BANCS_LOAN_ACCT record at all are not
        returned here - confirm whether they should also count as
        non-mortgage customers.

        Returns:
            A list of distinct customer ids (empty when no mortgage product
            code could be read from LONP).
        """
        collection = self.getMongoDb()["customer_TI_appender"]

        mortgage_products = collection.aggregate([
            {'$match': {'sourceDocType': 'LONP',
                        'fields.CAT_TYPE_NAME': {'$regex': '房貸'}}},
            {'$project': {
                '_id': 0,
                'combineAccttypeIntcat': {'$concat': ['$fields.ACCT_TYPE', '$fields.INT_CAT']}}}
        ])
        # $concat yields null when either part is missing; such codes cannot
        # identify a product and are ignored.
        mortgage_codes = {
            doc.get('combineAccttypeIntcat')
            for doc in mortgage_products
        } - {None}
        if not mortgage_codes:
            self.logger.warning('No mortgage product code found in LONP; '
                                'no customer can be classified as non-mortgage.')
            return []

        # One pass over the loan accounts instead of two queries per product.
        loan_accounts = collection.aggregate([
            {'$match': {'sourceDocType': 'BANCS_LOAN_ACCT'}},
            {'$project': {'_id': 0, 'cust_id': 1,
                          'checkType': {'$concat': ['$fields.ACT_TYPE', '$fields.CAT']}}}
        ])
        all_customers = set()
        mortgage_holders = set()
        for account in loan_accounts:
            cust_id = account['cust_id']
            all_customers.add(cust_id)
            if account.get('checkType') in mortgage_codes:
                mortgage_holders.add(cust_id)
        return list(all_customers - mortgage_holders)

    def batch_query_result(self, resultsList):
        """Yield the items of ``resultsList`` in lists of ``insert_batch_size``.

        The final list may be shorter; nothing is yielded for empty input.
        """
        batch_size = self.insert_batch_size
        batch_result = []
        for doc in resultsList:
            batch_result.append(doc)
            if len(batch_result) == batch_size:
                yield batch_result
                batch_result = []
        if batch_result:
            yield batch_result

    def to_tgt_doc(self, doc):
        """Build one output document for the customer id ``doc``.

        The template is deep-copied so documents never share nested objects.
        """
        tgt_doc = deepcopy(self.tgt_doc_template)
        tgt_doc.update({"custId": doc,
                        "tagName": self.scenario_no,
                        "taggingDtm": self.data_date,
                        "systemDtm": self.batch_date
                        })
        return tgt_doc

    @staticmethod
    def _json_default(value):
        """Serialize date/datetime values as ISO-8601 strings for json.dump.

        Any other unsupported type still raises ``TypeError`` so unexpected
        values are not silently stringified.
        """
        if isinstance(value, (datetime.datetime, datetime.date)):
            return value.isoformat()
        raise TypeError(f'Object of type {value.__class__.__name__} '
                        f'is not JSON serializable')

    def getResult(self):
        """Run the scenario and dump the selected customers to local files.

        Existing documents tagged with this scenario are deleted from the
        tagging store first. Each batch is then written as a JSON array to
        ``customer<offset>.txt`` in the current working directory.

        Returns:
            The set of selected customer ids.
        """
        st_time = time.time()
        aggregateData1 = self.get_mongo_aggregate_result()
        # Card-behaviour conditions (any one is enough).
        id1 = set(self.get_unpaid_balances_last_two_cycles(aggregateData1))
        id2 = set(self.get_unpaid_balances_two_cycles_six_months(aggregateData1))
        id5 = set(self.highResponse())
        # Mortgage conditions (any one is enough).
        id4 = {entry['_id'] for entry in self.get_mortgageHolder_with_0_collateral()}
        id3 = set(self.joinLONPData())
        results = (id1 | id2 | id5) & (id4 | id3)

        # NOTE(review): the old tags are removed here although the MongoDB
        # insert below is currently disabled - confirm this is intended.
        self.deleteMongoDb({"tagName": self.scenario_no}, self.mdp_tagging_store)
        write_count = 0
        for batch in self.batch_query_result(results):
            # A list, not a lazy map object: json.dump cannot serialize map.
            items = [self.to_tgt_doc(cust_id) for cust_id in batch]
            tmpFileName = 'customer' + str(write_count) + '.txt'
            # NOTE(review): append mode means a rerun adds a second JSON
            # array to an existing file - confirm whether 'w' is wanted.
            with open(tmpFileName, 'a', encoding='utf-8') as f:
                # taggingDtm is a datetime, hence the explicit serializer.
                json.dump(items, f, ensure_ascii=False, indent=4,
                          default=self._json_default)
            # self.insertMongoDb(items, self.mdp_tagging_store)
            write_count += len(batch)
        end_time = time.time()
        self.logger.info(f'Total write {write_count} documents.')
        self.logger.info(f'Processing Time: {round(end_time - st_time)} s')
        return results
def parse_data_date(value):
    """Convert a ``YYYYMMDD`` string into a ``datetime`` at midnight.

    Args:
        value: Logic date given on the command line.

    Returns:
        ``datetime.datetime`` for that day with the time part set to 00:00:00.

    Raises:
        ValueError: If ``value`` is not exactly eight digits or is not a
            valid calendar date.
    """
    if not re.fullmatch(r"\d{8}", value):
        raise ValueError("data_date format error, should be YYYYMMDD")
    # Parsing with a date-only format already leaves the time at midnight.
    return datetime.datetime.strptime(value, "%Y%m%d")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="aibank_scenario_X20",
        description="mongo2mongo logic")
    yesterday_str = (datetime.datetime.today() + datetime.timedelta(-1)).strftime("%Y%m%d")
    parser.add_argument("--data_date", type=str, default=yesterday_str,
                        help="logic date in YYYYMMDD format, default is yesterday")
    args = parser.parse_args()
    x20 = Scenario(data_date=parse_data_date(args.data_date))
    x20.getResult()
# t1 = datetime.datetime.now()
# print(f'程式開始執行時間: {t1}')
# x20 = Scenario()
# output = x20.createDoc()
# # print("X20 Customers:", output)
# t2 = datetime.datetime.now()
# print(f'程式結束執行時間: {t2}')
```
Traceback (most recent call last):
File "c:\Users\samantha.tc.yu\Documents\06-情境\X20\X20.py", line 299, in <module>
x20.getResult()
File "c:\Users\samantha.tc.yu\Documents\06-情境\X20\X20.py", line 274, in getResult
json.dump(items, f, ensure_ascii=False, indent=4)
File "C:\Users\samantha.tc.yu\AppData\Local\Programs\Python\Python310\lib\json\__init__.py", line 179, in dump
for chunk in iterable:
File "C:\Users\samantha.tc.yu\AppData\Local\Programs\Python\Python310\lib\json\encoder.py", line 438, in _iterencode
o = _default(o)
File "C:\Users\samantha.tc.yu\AppData\Local\Programs\Python\Python310\lib\json\encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type map is not JSON serializable
```py=
# Mongo
#MONGODB_URI="mongodb://mdpsys:FubonTsid2@tpebnkdpmgdb01t:7017,tpebnkdpmgdb02t:7017,tpebnkdpmgdb03t:7017/?authMechanism=DEFAULT&authSource=admin"
MONGODB_DB="MDP_TAG"
MONGO_PSWD_PRE="Fubon"
MONGO_PSWD_SUF="Tsid2"
MONGO_USERNAME="mdpsys"
MONGO_HOSTS="tpebnkdpmgdb01t:7017,tpebnkdpmgdb02t:7017,tpebnkdpmgdb03t:7017"
MONGO_AUTH_DB="admin"
MONGODB_COLLECTION_CUSTOMER_INFO="customer_Info"
MONGODB_COLLECTION_MDP_TAGGING_STORE="MDP_TAGGING_STORE"
MONGODB_COLLECTION_MDP_TAG="MDP_TAG"
#business logic
SCENARIO_NO="X20" #S000020
SCENARIO_TEXT="優質卡友優惠信貸方案,本月立即申貸,可享有首期利率0.01%專屬優惠方案。"
#system
#DEBUG=""
LOG_DIR="C:\Users\samantha.tc.yu\Documents\06-情境\X20\Log"
TEMPLATE = "tgt_doc_template"
```