# Note: query syntax for selecting records from the last three months.
# Alternatively, the date can be specified with JavaScript like this:
#
#   snapDate: {
#       $gt: new Date( new Date().setDate( new Date().getDate() - 90 ) ),
#   },
#
# X20 (S000020) code
"""X20 scenario.

Scenario need / pain point: uncover customers' potential funding needs from
their credit-card transaction behavior.

Trigger conditions:
(1) Credit-card usage meets ANY of the following:
    a. the two most recent consecutive statements were both not paid in full
    b. two statements within the last six months were not paid in full
    c. the customer belongs to the high-response segment of the Data Science
       department's response-rate model
(2) Given the above, ONE of the following also holds:
    1. the customer is not an existing mortgage customer
    2. the customer's available mortgage top-up amount is 0

Copy: preferential personal-loan offer for valued cardholders; apply this
month to enjoy an exclusive first-period rate of 0.01%. (no copy parameters)

Schedule: every Tuesday
Input: CCSUMDP0, CFRISK_BANK30_LOAN_CLM, PLCCP_LEADS, LONP, BANCS_LOAN_ACCT
Output: cust_id: customer ID
"""
import json
import os
import datetime
import logging
import sys
import re
import argparse
import time
from pathlib import Path
from urllib.parse import quote_plus
from copy import deepcopy

import pymongo
from dotenv import load_dotenv
from pymongo import MongoClient


class logger(logging.Logger):
    """Logger that writes to stdout and/or to a file under ``LOG_DIR``.

    Args:
        name: Logger name.
        level: Numeric logging level (e.g. ``logging.INFO``).
        log_filename: File name created inside ``LOG_DIR`` (default ``logs``);
            a falsy value disables file logging.
        std_out: When true, also log to ``sys.stdout``.
    """

    def __init__(self, name: str, level: int, log_filename: str, std_out: bool = True):
        super().__init__(name, level=level)
        formatter = logging.Formatter(fmt='%(asctime)s | %(levelname)s - %(message)s')

        if std_out:
            stream_handler = logging.StreamHandler(stream=sys.stdout)
            stream_handler.setFormatter(formatter)
            self.addHandler(stream_handler)

        if log_filename:
            log_dir = Path(os.getenv("LOG_DIR") or "logs")
            # parents=True so a nested LOG_DIR (e.g. /var/log/app/x20) works.
            log_dir.mkdir(parents=True, exist_ok=True)
            file_handler = logging.FileHandler(
                filename=log_dir.joinpath(log_filename),
                mode='a',
                encoding='utf-8')
            file_handler.setFormatter(formatter)
            self.addHandler(file_handler)


class Scenario():
    """Compute the X20 customer list from MongoDB and write it back as tags.

    Configuration is read from environment variables (loaded via ``.env``).

    Args:
        data_date: Logical data date stored as ``taggingDtm`` on every
            written document.
    """

    def __init__(self, data_date):
        load_dotenv()
        self.data_date = data_date
        self.customer_info = os.getenv("MONGODB_COLLECTION_CUSTOMER_INFO")
        self.scenarios = os.getenv("MONGODB_COLLECTION_SCENARIOS")
        self.scenario_no = os.getenv("SCENARIO_NO")
        self.scenariosText = os.getenv("SCENARIO_TEXT")
        # Take one timestamp so batch_date and batchDateTime can never
        # disagree when the job starts right at midnight.
        self.batchDateTime = datetime.datetime.today()
        self.batch_date = self.batchDateTime.date().isoformat()

        logLevel = logging.DEBUG if os.getenv("DEBUG") else logging.INFO
        stamp = str(self.batchDateTime).replace(" ", "").replace(":", "")
        # The file-name prefix (including its spelling) is kept unchanged so
        # existing log collection keeps matching it.
        logFilename = f"AI_BANK_SCEANRIO_{self.scenario_no}.{stamp}.log"
        self.logger = logger(name=self.scenario_no, level=logLevel, log_filename=logFilename)
        self.logger.info(
            f'{self.scenario_no} init, data_date: {self.data_date}, '
            f'batch_date: {self.batch_date}')

        self.tgt_doc_template = self.get_tgt_doc_template()
        self.mdp_tag = os.getenv("MONGODB_COLLECTION_MDP_TAG")
        self.mdp_tagging_store = os.getenv("MONGODB_COLLECTION_MDP_TAGGING_STORE")
        self.mongo_db_name = os.getenv("MONGODB_DB")
        self.insert_batch_size = int(os.getenv("INSERT_BATCH_SIZE") or 1000)
        # Created lazily by getMongoClient() and reused afterwards.
        self._mongo_client = None

    def getMongoClient(self):
        """Return the shared ``MongoClient``, creating it on first use.

        The client is cached because every query and every insert batch goes
        through this method; building a new client each time would open a new
        connection pool per call.

        Raises:
            pymongo.errors.ConfigurationError: If the connection URI is
                invalid (logged, then re-raised).
        """
        cached = getattr(self, "_mongo_client", None)
        if cached is not None:
            return cached

        password_pre = os.getenv("MONGO_PSWD_PRE")
        password_suf = os.getenv("MONGO_PSWD_SUF")
        password = quote_plus(f"{password_pre}{password_suf}")
        username = quote_plus(os.getenv("MONGO_USERNAME"))
        hosts = os.getenv("MONGO_HOSTS")
        auth_db = os.getenv("MONGO_AUTH_DB")
        connect_str = (
            f"mongodb://{username}:{password}@{hosts}/"
            f"?authMechanism=DEFAULT&authSource={auth_db}"
        )
        try:
            client = MongoClient(connect_str)
        except pymongo.errors.ConfigurationError:
            self.logger.error("An Invalid URI host error was received.")
            # Re-raise: returning None here only postpones the failure to an
            # unrelated-looking TypeError in getMongoDb().
            raise
        self._mongo_client = client
        return client

    def getMongoDb(self):
        """Return the database named by ``MONGODB_DB``."""
        return self.getMongoClient()[self.mongo_db_name]

    def insertMongoDb(self, items, tgt_collection):
        """Insert ``items`` into ``tgt_collection``; errors are logged, not raised.

        Returns:
            The number of documents actually inserted (0 when ``items`` is
            empty or the insert failed outright).
        """
        if not items:
            self.logger.info("No document inserted")
            return 0
        try:
            result = self.getMongoDb()[tgt_collection].insert_many(items)
        except pymongo.errors.BulkWriteError as e:
            # A bulk failure may still have written part of the batch.
            inserted_count = (e.details or {}).get("nInserted", 0)
            self.logger.error("Exception occurred during insertMany(): %s", e)
            self.logger.info(f'Inserted {inserted_count} documents.')
            return inserted_count
        except Exception as e:
            self.logger.error("Exception occurred during insertMany(): %s", e)
            return 0
        inserted_count = len(result.inserted_ids)
        self.logger.info(f'Inserted {inserted_count} documents.')
        return inserted_count

    def deleteMongoDb(self, query, tgt_collection):
        """Delete every document matching ``query``; errors are logged, not raised."""
        try:
            result = self.getMongoDb()[tgt_collection].delete_many(query)
        except Exception as e:
            self.logger.error(f'Exception occurred during deleteMany(): {e}')
        else:
            delete_count = result.deleted_count
            self.logger.info(f'delete {delete_count} documents.')

    def get_tgt_doc_template(self):
        """Load the JSON template (``TEMPLATE`` env var) for written documents.

        The path is resolved relative to this script's directory.
        """
        file_name = os.getenv("TEMPLATE")
        # Build the path of the file that describes the MongoDB output format.
        file_path = os.path.join(os.path.dirname(__file__), file_name)
        with open(file_path, encoding="utf-8") as f:
            return json.load(f)

    def get_mongo_aggregate_result(self):
        """Return, per ``fields.ACCID``, the latest CBALB and SUEAM2 values.

        Documents of type CCSUMDP0 are sorted by ``fields.CYCLE_DATE``
        descending, so each list is ordered newest first: up to 6 CBALB
        values and up to 7 SUEAM2 values per account.

        Example element::

            {'_id': 'A121470316',
             'firstNCbalb': [8200, 7500, 8000, 100019],
             'firstN+1Sueam2': [50934, 29455, 21034, 13396]}
        """
        pipeline = [
            {'$match': {'sourceDocType': 'CCSUMDP0'}},
            {'$sort': {'fields.CYCLE_DATE': -1}},
            {'$group': {
                '_id': '$fields.ACCID',
                'firstNCbalb': {'$firstN': {'input': '$fields.CBALB', 'n': 6}},
                'firstN+1Sueam2': {'$firstN': {'input': '$fields.SUEAM2', 'n': 7}}}}
        ]
        return list(self.getMongoDb()["customer_DI_appender"].aggregate(pipeline))

    @staticmethod
    def _payment_gaps(cbalb, sueam2, limit=None):
        """Return ``cbalb[i] - sueam2[i + 1]`` for each available cycle.

        Each CBALB value is paired with the SUEAM2 value one cycle older.
        A negative gap is what the callers treat as "not paid in full".
        NOTE(review): which field is the amount due and which the amount
        paid is not visible here; confirm against the CCSUMDP0 layout.
        """
        gaps = [current - older for current, older in zip(cbalb, sueam2[1:])]
        return gaps if limit is None else gaps[:limit]

    def get_unpaid_balances_last_two_cycles(self, result):
        """Find customers whose two most recent cycles both have a negative gap.

        Args:
            result: Output of :meth:`get_mongo_aggregate_result`.

        Returns:
            Dict mapping customer id to ``{'id': ..., 'sub': [gap0, gap1]}``.
        """
        unpaid_cust = {}
        for entry in result:
            cbalb = entry['firstNCbalb']
            sueam2 = entry['firstN+1Sueam2']
            # Two gaps need two CBALB values and three SUEAM2 values.
            if len(cbalb) < 2 or len(sueam2) < 3:
                continue
            gaps = self._payment_gaps(cbalb, sueam2, limit=2)
            if gaps[0] < 0 and gaps[1] < 0:
                customer_id = entry['_id']
                unpaid_cust[customer_id] = {'id': customer_id, 'sub': gaps}
        return unpaid_cust

    def get_unpaid_balances_two_cycles_six_months(self, result):
        """Find customers with at least two negative gaps in the last six cycles.

        Args:
            result: Output of :meth:`get_mongo_aggregate_result`.

        Returns:
            Dict mapping customer id to ``{'id': ..., 'sub': [gaps...]}``.
        """
        unpaid_cust_6 = {}
        for entry in result:
            cbalb = entry['firstNCbalb']
            sueam2 = entry['firstN+1Sueam2']
            if not (2 <= len(cbalb) <= 6 and 3 <= len(sueam2) <= 7):
                continue
            gaps = self._payment_gaps(cbalb, sueam2)
            if sum(1 for gap in gaps if gap < 0) >= 2:
                customer_id = entry['_id']
                unpaid_cust_6[customer_id] = {'id': customer_id, 'sub': gaps}
        return unpaid_cust_6

    def get_mortgageHolder_with_0_collateral(self):
        """Return customers whose summed ``COLLATERAL_AMT3`` is <= 0.

        This implements "available mortgage top-up amount is 0". Each
        element is ``{'_id': <fields.CUST_ID>}``.
        """
        pipeline = [
            {'$match': {'sourceDocType': 'CFRISK_BANK30_LOAN_CLM'}},
            {"$group": {"_id": "$fields.CUST_ID",
                        "TOTAL_COLLATERAL_AMT3": {"$sum": "$fields.COLLATERAL_AMT3"}}},
            {"$match": {"TOTAL_COLLATERAL_AMT3": {"$lte": 0}}},
            {"$project": {"_id": 1}}
        ]
        return list(self.getMongoDb()["customer_TI_appender"].aggregate(pipeline))

    def highResponse(self):
        """Return cust_ids flagged as high-response by the data-science model.

        High response means ``fields.MODEL_SCORE`` between 1 and 14 inclusive
        (definition revised on 2024-01-17).
        """
        pipeline = [
            {'$match': {'$and': [
                {'sourceDocType': 'PLCCP_LEADS'},
                {'fields.MODEL_SCORE': {'$in': list(range(1, 15))}}]}},
            {'$project': {'_id': 0, 'cust_id': 1}}
        ]
        cursor = self.getMongoDb()['customer_TI_appender'].aggregate(pipeline)
        # $project omits cust_id when the source document has none.
        return [doc['cust_id'] for doc in cursor if 'cust_id' in doc]

    def joinLONPData(self):
        """Return cust_ids of existing mortgage customers.

        Step 1 of finding non-mortgage customers:
          (1) joinLONPData: existing mortgage customers
          (2) allLoanCust: all loan customers
          (3) excludeMortgage: (2) - (1), loan customers without a mortgage

        LONP rows whose ``CAT_TYPE_NAME`` contains "房貸" (mortgage) define the
        mortgage product keys (``ACCT_TYPE`` + ``INT_CAT``); BANCS_LOAN_ACCT
        rows whose ``ACT_TYPE`` + ``CAT`` equals one of those keys belong to
        mortgage customers.
        """
        collection = self.getMongoDb()["customer_TI_appender"]
        type_pipeline = [
            {'$match': {'sourceDocType': 'LONP'}},
            {'$match': {'fields.CAT_TYPE_NAME': {'$regex': '房貸'}}},
            {'$project': {
                '_id': 0,
                'combineAccttypeIntcat': {'$concat': ['$fields.ACCT_TYPE', '$fields.INT_CAT']}}}
        ]
        mortgage_types = {
            doc.get('combineAccttypeIntcat')
            for doc in collection.aggregate(type_pipeline)
        }
        # $concat yields null when either part is missing; matching on null
        # would select every loan account that lacks a type, so drop it.
        mortgage_types.discard(None)
        if not mortgage_types:
            return []

        # One query for all mortgage keys instead of one query per LONP row.
        cust_pipeline = [
            {'$match': {'sourceDocType': 'BANCS_LOAN_ACCT'}},
            {'$addFields': {'checkType': {'$concat': ['$fields.ACT_TYPE', '$fields.CAT']}}},
            {'$match': {'checkType': {'$in': sorted(mortgage_types)}}},
            {'$project': {'_id': 0, 'cust_id': 1}}
        ]
        return [
            doc['cust_id']
            for doc in collection.aggregate(cust_pipeline)
            if 'cust_id' in doc
        ]

    def allLoanCust(self):
        """Return ``{'cust_id': ...}`` documents for every BANCS_LOAN_ACCT row."""
        pipeline = [
            {'$match': {'sourceDocType': 'BANCS_LOAN_ACCT'}},
            {'$project': {'_id': 0, 'cust_id': 1}}
        ]
        return list(self.getMongoDb()["customer_TI_appender"].aggregate(pipeline))

    def excludeMortgage(self):
        """Return the set of loan customers that are NOT mortgage customers.

        NOTE: this used to compute ``mortgage - all_loan_customers``. Mortgage
        customers are drawn from the same BANCS_LOAN_ACCT rows, so that
        difference was always empty and the "not an existing mortgage
        customer" condition never matched anyone.
        """
        mortgage = set(self.joinLONPData())
        all_loan_cust = {doc['cust_id'] for doc in self.allLoanCust() if 'cust_id' in doc}
        return all_loan_cust - mortgage

    def batch_query_result(self, resultsList):
        """Yield lists of at most ``insert_batch_size`` items from ``resultsList``."""
        batch_size = self.insert_batch_size
        batch_result = []
        for doc in resultsList:
            batch_result.append(doc)
            if len(batch_result) == batch_size:
                yield batch_result
                batch_result = []
        if batch_result:
            yield batch_result

    def to_tgt_doc(self, doc):
        """Build one output document for customer id ``doc`` from the template."""
        tgt_doc = deepcopy(self.tgt_doc_template)
        tgt_doc.update({
            "custId": doc,
            "tagName": self.scenario_no,
            "systemDtm": self.batch_date,
            "taggingDtm": self.data_date,
        })
        return tgt_doc

    def getResult(self):
        """Compute the X20 customer set and rewrite its tags in MongoDB.

        Existing documents with this scenario's ``tagName`` are deleted first,
        then the new set is inserted in batches.

        Returns:
            The set of customer ids that satisfy the scenario.
        """
        st_time = time.time()

        # Condition (1): any credit-card trigger.
        card_history = self.get_mongo_aggregate_result()
        id1 = set(self.get_unpaid_balances_last_two_cycles(card_history))
        id2 = set(self.get_unpaid_balances_two_cycles_six_months(card_history))
        id3 = set(self.highResponse())

        # Condition (2): no mortgage, or mortgage top-up amount of 0.
        id4 = set(self.excludeMortgage())
        id5 = {entry['_id'] for entry in self.get_mortgageHolder_with_0_collateral()}

        results = (id1 | id2 | id3) & (id4 | id5)

        self.deleteMongoDb({"tagName": self.scenario_no}, self.mdp_tagging_store)

        write_count = 0
        for batch in self.batch_query_result(results):
            items = [self.to_tgt_doc(cust_id) for cust_id in batch]
            # Count what was really inserted, not what was attempted.
            write_count += self.insertMongoDb(items, self.mdp_tagging_store) or 0

        end_time = time.time()
        self.logger.info(f'Total write {write_count} documents.')
        self.logger.info(f'Processing Time: {round(end_time - st_time)} s')
        return results


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="aibank_scenario_X20",
        description="mongo2mongo logic")
    yesterday_str = (datetime.datetime.today() + datetime.timedelta(-1)).strftime("%Y%m%d")
    parser.add_argument(
        "--data_date", type=str, default=yesterday_str,
        help="logic date in YYYYMMDD format, default is yesterday")
    args = parser.parse_args()

    if re.fullmatch(r"\d{8}", args.data_date):
        data_date = datetime.datetime.strptime(args.data_date, "%Y%m%d")
    else:
        raise ValueError("data_date format error, should be YYYYMMDD")

    x20 = Scenario(data_date=data_date.replace(microsecond=0, second=0, minute=0, hour=0))
    x20.getResult()