找三個月前語法
```py=
或是用 這樣的方式用javascript 指定日期
snapDate: {
$gt: new Date(
new Date().setDate(
new Date().getDate() - 90
)
),
},
```
### X20(S000020) Code
```py=
'''
X20 情境需求/痛點描述:從信用卡交易行為發掘客戶潛在資金需求。
觸發條件:
(1) 信用卡使用行為符合以下條件任一:
a. 最近連續兩期皆未繳全清
b. 近六個月有2期未繳全清
c. 數據科學部回應率的模型屬高回應率客群
(2) 基於上述條件下,且以下條件擇一符合:
1.客戶不為房貸既有戶
2.客戶房貸可增貸額度為0
文案:優質卡友優惠信貸方案,本月立即申貸,可享有首期利率0.01%專屬優惠方案。 (無文案參數)
執行週期:每周二
input: CCSUMDP0、CFRISK_BANK30_LOAN_CLM、PLCCP_LEADS、LONP、BANCS_LOAN_ACCT
output: cust_id: 客戶 ID
'''
import json
import os
import datetime
import pymongo
import logging
import sys
import re
import argparse
import time
from dotenv import load_dotenv
from pymongo import MongoClient
from pathlib import Path
from urllib.parse import quote_plus
from copy import deepcopy
class logger(logging.Logger):
def __init__(self, name: str, level:int, log_filename:str, std_out:bool=True):
super().__init__(name, level=level)
formatter = logging.Formatter(fmt='%(asctime)s | %(levelname)s - %(message)s')
if std_out:
stream_handler = logging.StreamHandler(stream=sys.stdout)
stream_handler.setFormatter(formatter)
self.addHandler(stream_handler)
if log_filename:
log_dir = Path(os.getenv("LOG_DIR") or "logs")
log_dir.mkdir(exist_ok=True)
file_handler = logging.FileHandler(
filename=log_dir.joinpath(log_filename),
mode='a',
encoding='utf-8')
file_handler.setFormatter(formatter)
self.addHandler(file_handler)
class Scenario():
def __init__(self, data_date):
load_dotenv()
self.data_date = data_date
self.customer_info = os.getenv("MONGODB_COLLECTION_CUSTOMER_INFO")
self.scenarios = os.getenv("MONGODB_COLLECTION_SCENARIOS")
self.scenario_no = os.getenv("SCENARIO_NO")
self.scenariosText = os.getenv("SCENARIO_TEXT")
self.batch_date = datetime.datetime.today().date().isoformat()
self.batchDateTime = datetime.datetime.today()
logLevel = 10 if os.getenv("DEBUG") else 20
a=str(self.batchDateTime).replace(" ","").replace(":","")
logFilename = f"AI_BANK_SCEANRIO_{self.scenario_no}.{a}.log"
self.logger = logger(name=self.scenario_no,level=logLevel,log_filename=logFilename)
self.logger.info(f'{self.scenario_no} init, data_date: {self.batch_date}')
self.tgt_doc_template = self.get_tgt_doc_template()
self.mdp_tag = os.getenv("MONGODB_COLLECTION_MDP_TAG")
self.mdp_tagging_store = os.getenv("MONGODB_COLLECTION_MDP_TAGGING_STORE")
self.mongo_db_name = os.getenv("MONGODB_DB")
self.insert_batch_size = int(os.getenv("INSERT_BATCH_SIZE") or 1000)
def getMongoClient(self):
try:
password_pre = os.getenv("MONGO_PSWD_PRE")
password_suf = os.getenv("MONGO_PSWD_SUF")
password = quote_plus(f"{password_pre}{password_suf}")
username = quote_plus(os.getenv("MONGO_USERNAME"))
hosts = os.getenv("MONGO_HOSTS")
auth_db = os.getenv("MONGO_AUTH_DB")
connect_str = f"mongodb://{username}:{password}@{hosts}/?authMechanism=DEFAULT&authSource={auth_db}"
client = MongoClient(connect_str)
return client
except pymongo.errors.ConfigurationError:
self.logger.error("An Invalid URI host error was received.")
def getMongoDb(self):
db = self.getMongoClient()[self.mongo_db_name]
return db
def insertMongoDb(self, items, tgt_collection):
if not items:
self.logger.info("No document inserted")
return
try:
result = self.getMongoDb()[tgt_collection].insert_many(items)
except Exception as e:
self.logger.error("Exception occurred during insertMany(): %s", e)
else:
inserted_count = len(result.inserted_ids)
self.logger.info(f'Inserted {inserted_count} documents.')
def deleteMongoDb(self, query, tgt_collection):
try:
result = self.getMongoDb()[tgt_collection].delete_many(query)
except Exception as e:
self.logger.error(f'Exception occurred during deleteMany(): {e}')
else:
delete_count = result.deleted_count
self.logger.info(f'delete {delete_count} documents.')
def get_tgt_doc_template(self):
file_name = os.getenv("TEMPLATE")
file_path = os.path.join(os.path.dirname(__file__), file_name) #組裝寫入MDB格式檔案的路徑
with open(file_path) as f:
template = json.load(f)
return template
def get_mongo_aggregate_result(self): #找出所有CCSUMDP0內的客戶,按照日期由近到遠排序,列出每個客人每期的應繳金額跟繳納金額。
pipeline = [
{'$match': {'sourceDocType': 'CCSUMDP0'}},
{'$sort': {'fields.CYCLE_DATE': -1}},
{'$group': {'_id': '$fields.ACCID',
'firstNCbalb': {'$firstN': {'input': '$fields.CBALB', 'n': 6}},
'firstN+1Sueam2': {'$firstN': {'input': '$fields.SUEAM2', 'n': 7}}}}
]
result = list(self.getMongoDb()["customer_DI_appender"].aggregate(pipeline))
return result
#result like {'_id': 'A121470316', 'firstNCbalb': [8200, 7500, 8000, 100019], 'firstN+1Sueam2': [50934, 29455, 21034, 13396]}
def get_unpaid_balances_last_two_cycles(self, result): #找出最近連續兩期未繳納全額的客人
unpaid_cust = {}
for entry in result:
customer_id = entry['_id']
first_n_cbalb = entry['firstNCbalb']
first_n_add_1_sueam2 = entry['firstN+1Sueam2']
if len(first_n_cbalb) >= 2 and len(first_n_add_1_sueam2) >= 3:
sub_values = [first_n_cbalb[i] - first_n_add_1_sueam2[i + 1] for i in range(2)]
if sub_values[0] < 0 and sub_values[1] < 0:
sub_entry = {'id': customer_id, 'sub': sub_values}
unpaid_cust[customer_id] = sub_entry
return unpaid_cust
def get_unpaid_balances_two_cycles_six_months(self, result): #找出最近六期內至少有兩期未繳清的客人
unpaid_cust_6 = {}
for entry in result:
customer_id = entry['_id']
first_n_cbalb = entry['firstNCbalb']
first_n_add_1_sueam2 = entry['firstN+1Sueam2']
if 2 <= len(first_n_cbalb) <= 6 and 3 <= len(first_n_add_1_sueam2) <= 7:
sub_values = [first_n_cbalb[i] - first_n_add_1_sueam2[i+1] for i in range(len(first_n_cbalb)) if i+1 < len(first_n_add_1_sueam2)]
if sum(1 for value in sub_values if value < 0) >= 2:
sub_entry = {'id': customer_id, 'sub': sub_values}
unpaid_cust_6[customer_id] = sub_entry
return unpaid_cust_6
def get_mortgageHolder_with_0_collateral(self): #客戶房貸可增貸額度為0
pipeline = [
{'$match': {'sourceDocType': 'CFRISK_BANK30_LOAN_CLM'}},
{"$group": {"_id": "$fields.CUST_ID", "TOTAL_COLLATERAL_AMT3": {"$sum": "$fields.COLLATERAL_AMT3"}}},
{"$match": {"TOTAL_COLLATERAL_AMT3": {"$lte": 0}}},
{"$project": {"_id": 1}}
]
loan_amt_cust = list(self.getMongoDb()["customer_TI_appender"].aggregate(pipeline))
return loan_amt_cust
def highResponse(self): #找數科模型高回應客戶 -20240117修改為新定義
pipeline=[
{'$match':
{'$and': [
{'sourceDocType': 'PLCCP_LEADS'},
{'fields.MODEL_SCORE':
{'$in': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]}}]}},
{'$project': {'_id': 0, 'cust_id': 1}}
]
highRes = list(self.getMongoDb()['customer_TI_appender'].aggregate(pipeline))
return [i['cust_id'] for i in highRes]
def joinLONPData(self):
#找出非房貸既有戶:(1)joinLONPData:找出房貸既有戶
#(2)allLoanCust:找出所有貸款客戶ID
#(3)excludeMortgage:(1)-(2)找出所有非房貸既有戶
'''modify'''
mortgage_cust = []
pipeline = [
{'$match': {'sourceDocType': 'LONP'}},
{'$match': {'fields.CAT_TYPE_NAME': {'$regex': '房貸'}}},
{'$project': {
'_id':0,
'combineAccttypeIntcat': {'$concat': ['$fields.ACCT_TYPE', '$fields.INT_CAT']}}}
]
mortgage = list(self.getMongoDb()["customer_TI_appender"].aggregate(pipeline))
for doc in mortgage:
pipeline2 = [
{'$match': {'sourceDocType': 'BANCS_LOAN_ACCT'}},
{'$addFields': {'checkType': {'$concat': ['$fields.ACT_TYPE', '$fields.CAT']}}},
{'$match': {'checkType': {'$eq':doc['combineAccttypeIntcat']}}},
{'$project': {'_id': 0, 'cust_id': 1}}
]
lonpCust = list(self.getMongoDb()["customer_TI_appender"].aggregate(pipeline2))
for cust in lonpCust:
mortgage_cust.append(cust['cust_id'])
return mortgage_cust
def allLoanCust(self):
'''modify'''
pipeline = [
{'$match': {'sourceDocType': 'BANCS_LOAN_ACCT'}},
{'$project': {'_id': 0, 'cust_id': 1}}
]
loanCust = list(list(self.getMongoDb()["customer_TI_appender"].aggregate(pipeline)))
return loanCust
def excludeMortgage(self):
'''modify'''
mortage = set(self.joinLONPData())
allcust = self.allLoanCust()
allloancust = {doc['cust_id'] for doc in allcust}
exclude = mortage-allloancust
return exclude
def batch_query_result(self, resultsList): #batch query mongo db result into batchsize
batch_size = self.insert_batch_size
batch_result = []
for doc in resultsList:
batch_result.append(doc)
if len(batch_result) == batch_size:
yield batch_result
batch_result = []
if batch_result:
yield batch_result
def to_tgt_doc(self, doc): #transform mongo db result to tgt doc
tgt_doc = deepcopy(self.tgt_doc_template)
tgt_doc.update({"custId": doc,
"tagName": self.scenario_no,
"systemDtm": self.batch_date,
"taggingDtm": self.data_date
})
return tgt_doc
def getResult(self): #insert data into mongo db
st_time = time.time()
data_date = self.data_date
aggregateData1 = self.get_mongo_aggregate_result()
unpaid_cust = self.get_unpaid_balances_last_two_cycles(aggregateData1)
id1 = set(unpaid_cust.keys())
unpaid_cust_6 = self.get_unpaid_balances_two_cycles_six_months(aggregateData1)
id2 = set(unpaid_cust_6.keys())
highResp = self.highResponse()
id3 = set(highResp)
excust = self.excludeMortgage()
id4 = set(excust)
loan_amt_cust = self.get_mortgageHolder_with_0_collateral()
id5 = {entry['_id'] for entry in loan_amt_cust}
results = (id1 | id2 | id3) & ( id4 | id5)
self.deleteMongoDb({"tagName": self.scenario_no}, self.mdp_tagging_store)
write_count =0
for batch in self.batch_query_result(results):
items = list(map(self.to_tgt_doc, batch))
self.insertMongoDb(items, self.mdp_tagging_store)
write_count+=len(batch)
end_time = time.time()
self.logger.info(f'Total write {write_count} documents.')
self.logger.info(f'Processing Time: {round(end_time - st_time)} s')
return results
if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog="aibank_scenario_C5",
description="mongo2mongo logic")
yesterday_str = (datetime.datetime.today() + datetime.timedelta(-1)).strftime("%Y%m%d")
parser.add_argument("--data_date",type=str,default=yesterday_str,
help="logic date in YYYYMMDD format, default is today")
args = parser.parse_args()
if re.match(r"^\d{8}$", args.data_date):
data_date = datetime.datetime.strptime(args.data_date, "%Y%m%d")
else:
raise ValueError("data_date format error, should be YYYYMMDD")
x20 = Scenario(data_date=data_date.replace(microsecond=0, second=0, minute=0, hour=0))
x20.getResult()
```