網路編成3 === > 複習: > - 服務器: 轉發, 合併header, body數據 > - 框架: ```python # 最基本要有這個可以調 def application(env, start_response): # 調用響應頭要傳狀態碼跟有的沒的資訊回去 start_response('200 OK', [('DGG', 'SGG')]) # 最後的結果要回傳回Body return "GodJJJJJ" ``` ### 第一版-面向對象服務器 ```python #coding=utf-8 import re from socket import * from multiprocessing import Process class WSGIServer(object): def __init__(self): self.t_socket = socket(AF_INET, SOCK_STREAM) self.t_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) self.t_socket.bind(("", 5566)) self.t_socket.listen(128) def server_client(self, new_socket): request = new_socket.recv(1024).decode("utf-8") request_lines = request.splitlines() print(request_lines) file_name = "" ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0]) print("-"*10) print(ret) if ret: file_name = ret.group(1) if file_name == "/": file_name = "/index.html" try: f = open("./html" + file_name, "rb") except: response = "HTTP/1.1 404 NOT FOUND\r\n" response += "\r\n" response += "file not found" new_socket.send(response.encode("utf-8")) else: html_content = f.read() f.close response = "HTTP/1.1 200 OK\r\n" response += "\r\n" new_socket.send(response.encode("utf-8")) new_socket.send(html_content) new_socket.close() def run_forever(self): while True: new_socket, client_addr = self.t_socket.accept() p = Process(target=self.server_client, args=(new_socket,)) p.start() new_socket.close() self.t_socket.close() def main(): wsgi_server = WSGIServer() wsgi_server.run_forever() if __name__ == "__main__": main() ``` ### 第二版-動態請求 ```python import re import time from socket import * from multiprocessing import Process class WSGIServer(object): def server_client(self, new_socket): request = new_socket.recv(1024).decode("utf-8") request_lines = request.splitlines() print(request_lines) file_name = "" ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0]) print("-"*10) print(ret) if ret: file_name = ret.group(1) if file_name == "/": file_name = "/index.html" # 改這, 不是py結尾就開html if not file_name.endswith(".py"): try: f = open("./html" + file_name, "rb") except: response = "HTTP/1.1 404 NOT FOUND\r\n" response += "\r\n" response += "file not found" new_socket.send(response.encode("utf-8")) else: html_content = f.read() f.close response = "HTTP/1.1 200 OK\r\n" response += "\r\n" new_socket.send(response.encode("utf-8")) new_socket.send(html_content) # 是py結尾, 調用時間模塊 else: header = "HTTP/1.1 200 OK\r\n" header += "\r\n" body = "haha, %s" % time.ctime() response = header + body new_socket.send(response.encode("utf-8")) new_socket.close() ``` ### 第三版-服務器與動態處理解藕 ```python import re #import time import mini_frame from socket import * from multiprocessing import Process class WSGIServer(object): def server_client(self, new_socket): request = new_socket.recv(1024).decode("utf-8") request_lines = request.splitlines() print(request_lines) file_name = "" ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0]) print("-"*10) print(ret) if ret: file_name = ret.group(1) if file_name == "/": file_name = "/index.html" if not file_name.endswith(".py"): try: f = open("./html" + file_name, "rb") except: response = "HTTP/1.1 404 NOT FOUND\r\n" response += "\r\n" response += "file not found" new_socket.send(response.encode("utf-8")) else: html_content = f.read() f.close response = "HTTP/1.1 200 OK\r\n" response += "\r\n" new_socket.send(response.encode("utf-8")) new_socket.send(html_content) # 把動態模塊出來 else: header = "HTTP/1.1 200 OK\r\n" header += "\r\n" # 版本一: 直接在服務器實現 # body = "haha,%s" %time.ctime() # 版本二: 解耦第一版 # 問題: mini_frame每多一個功能, 這裡就要多寫一次判斷 # if file_name == "/test.py": # body = mini_frame.test() # elif file_name == "/test2.py": # body = mini_frame.test2() # elif ... # 把那些判斷的拿去application判斷, 調用application就好 body = mini_frame.application(file_name) response = header + body new_socket.send(response.encode("utf-8")) new_socket.close() ``` `vi mini_frame.py` ```python import time def test(): return "AlimamaDO, %s" % time.ctime() def test2(): return "test2, %s" % time.ctime() def test3(): return "test3, %s" % time.ctime() def application(file_name): if file_name == "/test.py": return test() elif file_name == "/test2.py": return test2() elif file_name == "/test3.py": return test3() else: return "Not Found" ``` ### 第四版-WSGI > - 實際上服務器不會用自己的, 如此的話自己寫框架就不能被導入 > - 框架必須遵循WSGI, 才能被服務器正常調用 > - 比較有名的服務器如 nginx, apache ```python import re import mini_frame from socket import * from multiprocessing import Process class WSGIServer(object): def server_client(self, new_socket): request = new_socket.recv(1024).decode("utf-8") request_lines = request.splitlines() print(request_lines) file_name = "" ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0]) print("-"*10) print(ret) if ret: file_name = ret.group(1) if file_name == "/": file_name = "/index.html" if not file_name.endswith(".py"): try: f = open("./html" + file_name, "rb") except: response = "HTTP/1.1 404 NOT FOUND\r\n" response += "\r\n" response += "file not found" new_socket.send(response.encode("utf-8")) else: html_content = f.read() f.close response = "HTTP/1.1 200 OK\r\n" response += "\r\n" new_socket.send(response.encode("utf-8")) new_socket.send(html_content) else: env = {} body = mini_frame.application(env, self.start_response) header = "HTTP/1.1 %s\r\n" % self.status for temp in self.headers: header += "%s:%s\r\n" %(temp[0], temp[1]) header += "\r\n" response = header + body new_socket.send(response.encode("utf-8")) new_socket.close() def start_response(self, status, headers): self.status = status # 服務器可能會寫自己是幾版 self.headers = [("Server", "mini_frame v87")] self.headers = headers ``` `vi mini_frame` ```python # 沒有設定編碼的話, 中文會亂碼 # def application(env, start_response): # start_response('200 OK', [('xxoo', 'ooxx')]) # return "GodJJJJJ 東踏曲覓" def application(env, start_response): start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')]) return "GodJJJJJ 東踏曲覓" ``` ```python # 傳env參數 def server_client(self, new_socket): request = new_socket.recv(1024).decode("utf-8") request_lines = request.splitlines() print(request_lines) file_name = "" ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0]) print("-"*10) print(ret) if ret: file_name = ret.group(1) if file_name == "/": file_name = "/index.html" if not file_name.endswith(".py"): try: f = open("./html" + file_name, "rb") except: response = "HTTP/1.1 404 NOT FOUND\r\n" response += "\r\n" response += "file not found" new_socket.send(response.encode("utf-8")) else: html_content = f.read() f.close response = "HTTP/1.1 200 OK\r\n" response += "\r\n" new_socket.send(response.encode("utf-8")) new_socket.send(html_content) else: # 這裏 env = {} env["PATH_INFO"] = file_name # {"PATH_INFO" : file_name} body = mini_frame.application(env, self.start_response) header = "HTTP/1.1 %s\r\n" % self.status for temp in self.headers: header += "%s:%s\r\n" %(temp[0], temp[1]) header += "\r\n" response = header + body new_socket.send(response.encode("utf-8")) new_socket.close() ``` `$ vi mini_frame.py` ```python def test1(): return "test1" def test2(): return "test2" def application(env, start_response): start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')]) file_name = env["PATH_INFO"] if file_name == "/test1.py": return test1() elif file_name == "/test2.py": return test2() else: return "GodJJJJJ 東踏曲覓" # 問題: 框架可能有好幾個模塊, 而不只有一個mini_frame.py # 一旦模塊多了可能會很亂 > 裝成一個資料夾 ``` ### 模板文件 ``` $ tree . ├── html │   └── index.html └── mini_frame.py └── webServer.py # 一般會有三個資料夾: dynamic, static, templates # templates 裝模板 # dynamic 裝動態模塊(py, php) # static 裝靜態模塊(html, js, jpg..) $ tree . ├── dynamic │   ├── __init__.py │   └── mini_frame.py ├── static │   ├── css │   └── js ├── templates │   └── index.html └── webServer.py ``` `vi webServer.py` ```python # 改導入 import dynamic.mini_frame class WSGIServer(object): def server_client(self, new_socket): request = new_socket.recv(1024).decode("utf-8") request_lines = request.splitlines() print(request_lines) file_name = "" ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0]) print("-"*10) print(ret) if ret: file_name = ret.group(1) if file_name == "/": file_name = "/index.html" if not file_name.endswith(".py"): try: # 去靜態資源取 f = open("./static" + file_name, "rb") except: response = "HTTP/1.1 404 NOT FOUND\r\n" response += "\r\n" response += "file not found" new_socket.send(response.encode("utf-8")) else: html_content = f.read() f.close response = "HTTP/1.1 200 OK\r\n" response += "\r\n" new_socket.send(response.encode("utf-8")) new_socket.send(html_content) else: env = {} env["PATH_INFO"] = file_name # 改導入 body = dynamic.mini_frame.application(env, self.start_response) header = "HTTP/1.1 %s\r\n" % self.status for temp in self.headers: header += "%s:%s\r\n" %(temp[0], temp[1]) header += "\r\n" response = header + body new_socket.send(response.encode("utf-8")) ``` `vi dynamic/mini_frame.py` ```python # 前後端解藕 # def index(): # return """ # <html> # <head> # <title>toyz</title> # </head> # <body> # <h1>GodJJ</h1> # </body> # </html> # """ def index(): # with open("../templates/index.html") as f: # 註一 with open("./templates/index.html") as f: content = f.read() return content def application(env, start_response): start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')]) file_name = env["PATH_INFO"] if file_name == "/index.py": return index() else: return "GodJJJJJ 東踏曲覓" ``` ```python # 註一之一 # open的路徑是以啟用端來算的 $ tree . ├── dynamic │ ├── __init__.py │ └── mini_frame.py ├── static │ ├── css │ └── js ├── templates │ └── index.html └── webServer.py # 啟用端是webServer , 故是當前路徑下的templates(./templates) # 註一之二 # with # # 傳統開檔案會有兩個問題 # 1. 開啟的時候有問題 # 2. 忘記關 file = open("1.txt") data = file.read() file.close() # 用try就能解決, 但冗長 f = open("1.txt", "rb") try: data = file.read() finally: f.close() # with 開始時會調用__enter__, 結束時會調用__end__釋放資源 with open('file.txt', 'rb') as f: data = f.read() --- class Open(object): def __init__(self, file_name): self.file_name = file_name def __enter__(self): self.open_file = open(self.file_name, "rb") return self.open_file def __exit__(self, type, value, traceback): self.open_file.close() ``` > - 小結: > - 目前完成瀏覽器訪問服務器, 服務器把static丟給框架, 框架把headbody回傳, 服務器組裝後傳給瀏覽器, 呈現畫面 > - 問題: > 1. 還是沒解決import指定框架問題 > 2. 是否能指定端口, 而非服務器固定的端口 ``` # 目的: 給程序傳參數 # $ python3 webserver.py 5566 mini_frame:applocation ``` `$ vi test.py` ``` import sys print(sys.argv) ``` ``` $ python3 test.py haha ['test.py', 'haha'] $ python3 test.py haha 你 好 ['test.py', 'haha', '你', '好'] $ python3 test.py 9898 mini_frame:application ['test.py', '9898', 'mini_frame:application'] # sys.argv 會返回[], 每個參數都變成str返回 ``` ```python # 搞定端口 import re import dynamic.mini_frame import sys from socket import * from multiprocessing import Process class WSGIServer(object): # 接收參數 def __init__(self, port): self.t_socket = socket(AF_INET, SOCK_STREAM) self.t_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) # 使用參數 self.t_socket.bind(("", port)) self.t_socket.listen(128) def main(): # 確認有輸入包含py有兩個參數 if len(sys.argv) == 2: # 避免用戶不是輸入數字, try try: port = int(sys.argv[1]) except Exception as ret: print("端口輸入錯誤") return else: print("請照以下格式") print("python3 webserver.py port") return # 把參數傳進init wsgi_server = WSGIServer(port) wsgi_server.run_forever() ``` ```python #coding=utf-8 import re # 目的: 指定框架, 而非固定框架 # import dynamic.mini_frame import sys from socket import * from multiprocessing import Process class WSGIServer(object): # 導入app def __init__(self, port, app): self.t_socket = socket(AF_INET, SOCK_STREAM) self.t_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) self.t_socket.bind(("", port)) self.t_socket.listen(128) self.application = app def server_client(self, new_socket): request = new_socket.recv(1024).decode("utf-8") request_lines = request.splitlines() print(request_lines) file_name = "" ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0]) print("-"*10) print(ret) if ret: file_name = ret.group(1) if file_name == "/": file_name = "/index.html" if not file_name.endswith(".py"): try: f = open("./static" + file_name, "rb") except: response = "HTTP/1.1 404 NOT FOUND\r\n" response += "\r\n" response += "file not found" new_socket.send(response.encode("utf-8")) else: html_content = f.read() f.close response = "HTTP/1.1 200 OK\r\n" response += "\r\n" new_socket.send(response.encode("utf-8")) new_socket.send(html_content) else: env = {} env["PATH_INFO"] = file_name # {"PATH_INFO" : file_name} # 最後調用函數 #body = dynamic.mini_frame.application(env, self.start_response) body = self.application(env, self.start_response) header = "HTTP/1.1 %s\r\n" % self.status for temp in self.headers: header += "%s:%s\r\n" %(temp[0], temp[1]) header += "\r\n" response = header + body new_socket.send(response.encode("utf-8")) new_socket.close() def start_response(self, status, headers): self.status = status self.headers = headers def run_forever(self): while True: new_socket, client_addr = self.t_socket.accept() p = Process(target=self.server_client, args=(new_socket,)) p.start() new_socket.close() self.t_socket.close() def main(): if len(sys.argv) == 3: try: port = int(sys.argv[1]) frame_app_name = sys.argv[2] # mini_frame:application except Exception as ret: print("端口輸入錯誤") return else: print("請照以下格式") print("python3 webserver.py port, 框架:函數") return # 取出來 ret = re.match(r"([^:]+):(.+)", frame_app_name) if ret: frame_name = ret.group(1) # mini_frame app_name = ret.group(2) # application else: print("請照以下格式") print("python3 webserver.py port, 框架:函數") return # 註一 # 由於mini_frame在dynamic資料夾中 # 使用sys.path 搜索路徑, 並插入指定路徑, # 否則從當前路徑往上找會找不到 sys.path.append("./dynamic") # 導入 # import frame_name # import 會把frame_name直接當成模塊名(frame_name)去找, # 而不是他指向得值(mini_frame) frame = __import__(frame_name) # 如此才得以找mini_frame # 註二 app = getattr(frame, app_name) # 此時app指向 # ./dynamic/mini_frame # 的application函數 wsgi_server = WSGIServer(port, app) wsgi_server.run_forever() if __name__ == "__main__": main() # 問題: f = open("./static" + file_name, "rb") 會找不到 # 目標: 再不用動服務器的情況下, 自己設定PATH ``` ```python # 註一: 搜索路徑 In [4]: import sys In [5]: sys.path Out[5]: ['/Library/Frameworks/Python.framework/Versions/3.7/bin', ... 'packages/IPython/extensions', '/Users/haha/.ipython'] # 註二 getattr(object, name[, default]) # 返回一個對象屬性 In [1]: class Test(object): ...: a = 100 ...: def haha(self): ...: print("hahhaa") ...: In [2]: test = Test() In [4]: godjj = getattr(test, "a") In [5]: godjj Out[5]: 100 In [6]: toyz = getattr(test, "haha") In [7]: toyz Out[7]: <bound method Test.haha of <__main__.Test object at 0x1111fe208>> In [8]: toyz() hahhaa # 第三個參數是預設, 找無時指向 In [9]: zero = getattr(test, "c") AttributeError: 'Test' object has no attribute 'c' In [10]: zero = getattr(test, "c", "bonjo") In [11]: zero Out[11]: 'bonjo' ``` ### 配置 > - 服務器不可能配合本地資料配置而改服務器的code > - 故服務器可以開放配置, 讓本地自行設置 #### 本地設定一個路徑配置 `$ werServer.conf` ``` { "static_path" : "./static" "dynamic_path" : "./dynamic" } ``` > - 基本上cnf或conf結尾的都是配置文件 > - 裡面的東西全部都是字符串 > - 當然可以不要寫成{}的形式而告訴服務器第一行是什麼,第二行是什麼, 但這樣很麻煩也容易忘記第一行寫什麼, 直接用字典的形式來調用比較方便 ```python #coding=utf-8 import re import sys from socket import * from multiprocessing import Process class WSGIServer(object): # 導入路徑 def __init__(self, port, app, static_path): self.t_socket = socket(AF_INET, SOCK_STREAM) self.t_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) self.t_socket.bind(("", port)) self.t_socket.listen(128) self.application = app self.static_path = static_path def server_client(self, new_socket): request = new_socket.recv(1024).decode("utf-8") request_lines = request.splitlines() print(request_lines) file_name = "" ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0]) print("-"*10) print(ret) if ret: file_name = ret.group(1) if file_name == "/": file_name = "/index.html" if not file_name.endswith(".py"): try: # 使用 f = open(self.static_path + file_name, "rb") except: response = "HTTP/1.1 404 NOT FOUND\r\n" response += "\r\n" response += "file not found" new_socket.send(response.encode("utf-8")) else: html_content = f.read() f.close response = "HTTP/1.1 200 OK\r\n" response += "\r\n" new_socket.send(response.encode("utf-8")) new_socket.send(html_content) else: env = {} env["PATH_INFO"] = file_name body = self.application(env, self.start_response) header = "HTTP/1.1 %s\r\n" % self.status for temp in self.headers: header += "%s:%s\r\n" %(temp[0], temp[1]) header += "\r\n" response = header + body new_socket.send(response.encode("utf-8")) new_socket.close() def start_response(self, status, headers): self.status = status self.headers = headers def run_forever(self): while True: new_socket, client_addr = self.t_socket.accept() p = Process(target=self.server_client, args=(new_socket,)) p.start() new_socket.close() self.t_socket.close() def main(): if len(sys.argv) == 3: try: port = int(sys.argv[1]) frame_app_name = sys.argv[2] except Exception as ret: print("端口輸入錯誤") return else: print("請照以下格式") print("python3 webserver.py port, 框架:函數") return ret = re.match(r"([^:]+):(.+)", frame_app_name) if ret: frame_name = ret.group(1) app_name = ret.group(2) else: print("請照以下格式") print("python3 webserver.py port, 框架:函數") return # 打開配置 with open("./webServer.conf") as f: # 因為cnf裡面都是str, # 使用eval()來執行裡面的str # 註 conf_info = eval(f.read()) # 這裡改用字典調用值 sys.path.append(conf_info["dynamic_path"]) frame = __import__(frame_name) app = getattr(frame, app_name) # 傳參 wsgi_server = WSGIServer(port, app, conf_info["static_path"]) wsgi_server.run_forever() if __name__ == "__main__": main() ``` ```python In [1]: a = """ ...: { ...: "haha" : 123, ...: "heyhey" : 456 ...: } ...: """ # a 是一個str In [2]: a Out[2]: '\n{\n "haha" : 123,\n "heyhey" : 456\n}\n' # eval可以執行字符串表達式 # 白話就是脫掉"" # # 將字符串轉成相應值 In [3]: b = eval(a) In [4]: b Out[4]: {'haha': 123, 'heyhey': 456} # 計算 In [5]: eval( '3 * 7' ) Out[5]: 21 ``` ### Shell腳本 > 執行服務器時, 都需要輸入一長串 > `$ python3 xx.py port 框架:函數` > - Shell腳本用法, 將Linux命令放到一個文件中, 執行該文件, 裡面的命令會由上往下通通執行, 且可以if, for等 `$ vi run.sh` ``` python3 webServer.py 5566 mini_frame:application ``` ``` # 此時還無法執行, 因為沒有執行權限 $ ls -lh -rw-r--r-- 1 ooxx staff 49B Sep 15 17:57 run.sh # 給權限 $ chmod +x run.sh $ ls -lh -rwxr-xr-x 1 ooxx staff 49B Sep 15 17:57 run.sh ``` > 最後寫使用說明書txt `$ vi readme.txt` ``` 使用方法: ./run.sh 或 python3 xx.py port 框架:函數 ``` ``` $ tree . ├── dynamic │   ├── __init__.py │   └── mini_frame.py ├── readme.txt ├── run.sh ├── static │   ├── css │   └── js ├── templates │   └── index.html ├── webServer.conf └── webServer.py ``` ## 框架路由 > 根據請求不同而調用不同 ```python def index(): with open("./templates/index.html") as f: return f.read() def center(): with open("./templates/center.html") as f: return f.read() # 創建一個字典 # 解釋器由上到下, # 故要先讓解釋器知道index跟center的存在 # 否則找不到會報錯鬧脾氣 URL_INFO_DICT = { "/index.py" : index, "/center.py": center } def application(env, start_response): start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')]) file_name = env["PATH_INFO"] # 如果有一百個, 那就要判斷一百次? """ if file_name == "/index.py": return index() else: return "GodJJJJJ 東踏曲覓" """ # 取得value # index = U_I_D["/index.html"] func = URL_INFO_DICT[file_name] # 執行value, 並將結果返回 return func() # 問題: 能否自動新增字典內容? ``` ```python # 利用裝飾器在調用之前就執行的特點 URL_INFO_DICT = dict() def route(url): def set_func(func): # url > 傳進來的參數("/index.py") # func > 被裝飾的變量(index) URL_INFO_DICT[url] = func def call_func(*args, **kwargs): return func(*args, **kwargs) return call_func return set_func @route("/index.py") def index(): with open("./templates/index.html") as f: return f.read() @route("/center.py") def center(): with open("./templates/center.html") as f: return f.read() """ URL_INFO_DICT = { "/index.py" : index, "/center.py": center } """ def application(env, start_response): start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')]) file_name = env["PATH_INFO"] try: #func = URL_INFO_DICT[file_name] #return func() return URL_INFO_DICT[file_name]() except Exception as ret: return "異常: %s" %str(ret) ``` ## 偽靜態 #### 靜態 - `網域/xxx.html` - 靜態URL開啟快速(直接使用), 對[SEO](https://zh.wikipedia.org/wiki/%E6%90%9C%E5%B0%8B%E5%BC%95%E6%93%8E%E6%9C%80%E4%BD%B3%E5%8C%96)有正面影響 #### 動態 - `域名/haha.php?id=1` `域名/haha.asp?id=1`... - 修改頁面方便, 但開啟速度比較慢(須經過運算) #### 偽靜態 `$ vi webserver.py` ```python # 把html導入框架 # if not file_name.endswith(".py"): if not file_name.endswith(".html"): try: f = open(self.static_path + file_name, "rb") except: response = "HTTP/1.1 404 NOT FOUND\r\n" response += "\r\n" response += "file not found" new_socket.send(response.encode("utf-8")) else: html_content = f.read() f.close response = "HTTP/1.1 200 OK\r\n" response += "\r\n" new_socket.send(response.encode("utf-8")) new_socket.send(html_content) else: env = {} env["PATH_INFO"] = file_name body = self.application(env, self.start_response) header = "HTTP/1.1 %s\r\n" % self.status for temp in self.headers: header += "%s:%s\r\n" %(temp[0], temp[1]) header += "\r\n" response = header + body new_socket.send(response.encode("utf-8")) ``` ```python # url接收參數改成html # @route("/index.py") @route("/index.html") def index(): with open("./templates/index.html") as f: return f.read() # @route("/center.py") @route("/center.html") def center(): with open("./templates/center.html") as f: return f.read() ``` ## 導入SQL數據 ```python import re from pymysql import connect URL_INFO_DICT = dict() def route(url): def set_func(func): URL_INFO_DICT[url] = func def call_func(*args, **kwargs): return func(*args, **kwargs) return call_func return set_func @route("/index.html") def index(): with open("./templates/index.html") as f: content = f.read() # SQL操作 conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8") cs = conn.cursor() cs.execute("select * from info;") stock_info = cs.fetchall() cs.close() conn.close() # 在html內容裡寫{%content%}, 然後在這裡把他換掉 # sub(搜,改,搜索內容) content = re.sub(r"\{%content%\}", str(stock_info), content) return content @route("/center.html") def center(): with open("./templates/center.html") as f: return f.read() def application(env, start_response): start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')]) file_name = env["PATH_INFO"] try: return URL_INFO_DICT[file_name]() except Exception as ret: return "異常: %s" %str(ret) # 模板html基本上是固定的 # 但是SQL數據是會變動的, # 故兩者應該分開 ``` #### 替換模板 ```python @route("/index.html") def index(): # 打開模板 with open("./templates/index.html") as f: content = f.read() # 查詢數據 conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8") cs = conn.cursor() cs.execute("select * from info;") stock_info = cs.fetchall() cs.close() conn.close() # 組織數據 # html的表格是以 # <table>表格 # <tr>列 # <td>行 # 組成 # 數據總共有八種, 故tr八列 tr=""" <tr> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> </tr> """ html = "" html += "<table>" # 依序套入數據 for lines in stock_info: html += tr %(lines[0],lines[1],lines[2],lines[3],lines[4],lines[5],lines[6],lines[7]) html += "</table>" # 返回數據 content = re.sub(r"\{%content%\}", html, content) return content ``` #### 支持正則 > - url往往會帶參數, 如/add/001.html > - 如果沒有正則, 就須每個參數都手動寫入對應的函數進字典 > - 支援正則之後, /add/001跟/add/002 都以/add/\d+判斷即可 ```python import re from pymysql import connect URL_INFO_DICT = dict() def route(url): def set_func(func): URL_INFO_DICT[url] = func def call_func(*args, **kwargs): return func(*args, **kwargs) return call_func return set_func @route(r"/index.html") def index(): with open("./templates/index.html") as f: content = f.read() conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8") cs = conn.cursor() cs.execute("select * from info;") stock_info = cs.fetchall() cs.close() conn.close() tr=""" <tr> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> </tr> """ html = "" html += "<table>" for lines in stock_info: #print(lines) html += tr %(lines[0],lines[1],lines[2],lines[3],lines[4],lines[5],lines[6],lines[7]) html += "</table>" content = re.sub(r"\{%content%\}", html, content) return content @route(r"/center.html") def center(): with open("./templates/center.html") as f: content = f.read() conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8") cs = conn.cursor() cs.execute("select i.code, i.short, i.chg, i.turnover, i.price, i.highs, f.note_info from info as i inner join focus as f on i.id = f.info_id;") stock_info = cs.fetchall() cs.close() conn.close() tr=""" <tr> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> </tr> """ html = "" html += "<table>" for lines in stock_info: #print(lines) html += tr %(lines[0],lines[1],lines[2],lines[3],lines[4],lines[5],lines[6]) html += "</table>" content = re.sub(r"\{%content%\}", html, content) return content # 將所有url參數都改成正則來當成match第一個參數 @route(r"/add/\d+\.html") def add_focus(): return "haha" def application(env, start_response): start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')]) file_name = env["PATH_INFO"] try: # return URL_INFO_DICT[file_name]() # 註一 for url, func in URL_INFO_DICT.items(): # { # r"/index.html":index, # r"/center.html":center, # r"/add/\d+\.html":add_focus # } # 註二 ret = re.match(url, file_name) if ret: return func() # 這個else放for循環裡面的話, 只會循環第一圈就return出來了.. # 故應該放外面 else: return "沒有對應的函數(%s)"%file_name except Exception as ret: return "異常: %s" %str(ret) ``` ```python # 註一: dict.items() # 以列表返回可遍歷的元祖 In [1]: a = {"godjj":1, "toyz":2} In [2]: a Out[2]: {'godjj': 1, 'toyz': 2} In [3]: a.items() Out[3]: dict_items([('godjj', 1), ('toyz', 2)]) # In [5]: for b,c in a.items(): ...: print(b) ...: print("----") ...: print(c) ...: godjj ---- 1 toyz ---- 2 # 註二: match第一參數是規則, 第二參數才是配對客體 In [46]: a = "123" In [47]: b = r"\d" In [48]: re.match(a,b) In [49]: re.match(b,a) Out[49]: <re.Match object; span=(0, 1), match='1'> ``` ### 增刪數據 ```python import re from pymysql import connect URL_INFO_DICT = dict() def route(url): def set_func(func): URL_INFO_DICT[url] = func def call_func(*args, **kwargs): return func(*args, **kwargs) return call_func return set_func # 記得開行參 @route(r"/index.html") def index(ret): with open("./templates/index.html") as f: content = f.read() conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8") cs = conn.cursor() cs.execute("select * from info;") stock_info = cs.fetchall() cs.close() conn.close() tr=""" <tr> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> </tr> """ html = "" html += "<table>" for lines in stock_info: #print(lines) html += tr %(lines[0],lines[1],lines[2],lines[3],lines[4],lines[5],lines[6],lines[7]) html += "</table>" content = re.sub(r"\{%content%\}", html, content) return content # 記得開行參 @route(r"/center.html") def center(ret): with open("./templates/center.html") as f: content = f.read() conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8") cs = conn.cursor() cs.execute("select i.code, i.short, i.chg, i.turnover, i.price, i.highs, f.note_info from info as i inner join focus as f on i.id = f.info_id;") stock_info = cs.fetchall() cs.close() conn.close() tr=""" <tr> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> </tr> """ html = "" html += "<table>" for lines in stock_info: #print(lines) html += tr %(lines[0],lines[1],lines[2],lines[3],lines[4],lines[5],lines[6]) html += "</table>" content = re.sub(r"\{%content%\}", html, content) return content # 參數的地方(), 要拿來使用 @route(r"/add/(\d+)\.html") def add_focus(ret): # 1.取得參數 stock_code = ret.group(1) # 2.判斷是否有這檔股票 conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8") cs = conn.cursor() # 避免SQL注入 # sql = """select * from info where code = '%s';"""%stock_code sql = """select * from info where code = %s;""" cs.execute(sql, (stock_code,)) # 取一次就沒東西時, 表示根本沒有這檔股票 # 如果沒有就關起來返回沒有 if not cs.fetchone(): cs.close() conn.close() return "沒有這檔股票(%s)"%stock_code # 3.判斷是否已經關注過 sql = """select * from info as i inner join focus as f on f.info_id = i.id where i.code=%s;""" cs.execute(sql, (stock_code,)) if cs.fetchone(): cs.close() conn.close() return "這檔股票(%s)已關注過"%stock_code # 4. 新增關注 # 複習: # 一般新增 # insert into 表 value (值),(值),..; # 部分插入 # insert into 表(列,..) value (值,..),(值,..),..; # 子查詢插入(沒有value!!!) # insert into 表(列,..) select ...; sql = """insert into focus(info_id) select id from info where code = %s;""" cs.execute(sql, [stock_code]) # 修改數據記得提交 conn.commit() cs.close() conn.close() return "關注成功(%s)"% stock_code @route(r"/del/(\d+)\.html") def del_focus(ret): # 1.取得參數 stock_code = ret.group(1) # 2.判斷是否有這檔股票 conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8") cs = conn.cursor() # 避免SQL注入 # sql = """select * from info where code = '%s';"""%stock_code sql = """select * from info where code = %s;""" cs.execute(sql, (stock_code,)) if not cs.fetchone(): cs.close() conn.close() return "沒有這檔股票(%s)"%stock_code # 3.判斷是否有關注過 sql = """select * from info as i inner join focus as f on f.info_id = i.id where i.code=%s;""" cs.execute(sql, (stock_code,)) # 如果沒關注過就沒辦法刪, 故返回 if not cs.fetchone(): cs.close() conn.close() return "這檔股票(%s)尚未關注過"%stock_code # 4. 新增關注 # 複習: # 物理刪除 # delete from 表名 where 條件; #sql = """insert into focus(info_id) select id from info where code = %s;""" sql = """delete from focus where info_id = (select id from info where code = %s);""" cs.execute(sql, [stock_code]) # 修改數據記得提交 conn.commit() cs.close() conn.close() return "取消關注成功(%s)"% stock_code @route(r"/update/(\d+)\.html") def show_update_info(ret): with open("./templates/update.html") as f: content = f.read() stock_code = ret.group(1) conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8") cs = conn.cursor() # 1. 確認股票有無(略) # 2. 確認有無關注(略) sql = """select note_info from focus where info_id = (select id from info where code = %s);""" cs.execute(sql, (stock_code,)) # 3. 確認note_info有無 if cs.fetchone(): # 因為if判斷把cs存放的內容取掉了, 故再執行一次sql語句 cs.execute(sql, (stock_code,)) stock_info = cs.fetchone() note_info = stock_info[0] # 沒有備註的話, 讓備註顯示空白 else: note_info = " " cs.close() conn.close() content = re.sub(r"\{%code%\}", stock_code, content) content = re.sub(r"\{%note_indfo%\}", note_info, content) return content @route(r"/update/(\d+)/(.*)\.html") def update_info(ret): stock_code = ret.group(1) comment = ret.group(2) conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8") cs = conn.cursor() # 確認股票有無(略) # 確認有無關注(略) # 修改 # 複習: # 修改數據 # update 表名 set 列1=值1,列2=值2,... where 條件; sql = """ update focus set note_info=%s where info_id = (select id from info where code =%s);""" cs.execute(sql, (comment, stock_code,)) conn.commit() cs.close() conn.close() return "修改成功" def application(env, start_response): start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')]) file_name = env["PATH_INFO"] try: for url, func in URL_INFO_DICT.items(): ret = re.match(url, file_name) if ret: # 把url傳進去, 才能取得參數來用 return func(ret) else: return "沒有對應的函數(%s)"%file_name except Exception as ret: return "異常: %s" %str(ret) ``` ### URL編碼解碼 > - URL傳進來的參數遇到非英文字母會編碼 > - 故傳到框架時應該將其解碼後再傳入SQL ```python In [1]: import urllib.parse # URL編碼 In [2]: urllib.parse.quote("Toyz喊在") Out[2]: 'Toyz%E5%96%8A%E5%9C%A8' # URL解碼 In [4]: urllib.parse.unquote("Toyz%E5%96%8A%E5%9C%A8") Out[4]: 'Toyz喊在' ``` ```python import re # 導入模塊 import urllib.parse from pymysql import connect @route(r"/update/(\d+)/(.*)\.html") def update_info(ret): stock_code = ret.group(1) comment = ret.group(2) # 解碼 comment = urllib.parse.unquote(comment) conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8") cs = conn.cursor() sql = """ update focus set note_info=%s where info_id = (select id from info where code =%s);""" cs.execute(sql, (comment, stock_code,)) conn.commit() cs.close() conn.close() return "修改成功" ``` > - 問題: SQL不論幾個空格, 瀏覽器都只顯示一個? > - 瀏覽器遇到多個空格皆案一個空格來解析 > - 解法: 前端解決 > &nbps; ## Log日誌 > - 日誌級別: > - DEBUG: 診斷問題紀錄 > - INFO: 一般資訊 > - WARNING: 低錯誤, 不影響運行 > - ERROR: 中錯誤, 影響部分運行 > - CRITICAL: 高錯誤, 無法運行 ### 顯示log在Terminal上 ```python import logging # level: 開啟級別, 級別以下的不會紀錄 # format: 格式 logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s') # 開啟功能 logging.debug('loggging debug message') logging.info('loggging info message') logging.warning('loggging a warning message') logging.error('loggging error message') logging.critical('loggging critical message') ``` ### 記錄在文件中 ```python import logging # 在參數設定檔名跟寫入方式 logging.basicConfig(level=logging.WARNING, filename='./log.txt', filemode='a', format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s') logging.debug('loggging debug message') logging.info('loggging info message') logging.warning('loggging a warning message') logging.error('loggging error message') logging.critical('loggging critical message') ``` ### 我全都要 ```python import logging # 創建logger logger = logging.getLogger() logger.setLevel(logging.INFO) # 總等級 # FileHandler用於寫入文件 logfile = './log.txt' fh = logging.FileHandler(logfile, mode='a') fh.setLevel(logging.DEBUG) # StreamHandler用於輸出控制台 ch = logging.StreamHandler() ch.setLevel(logging.WARNING) # Formatter用於設定格式 formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s") fh.setFormatter(formatter) ch.setFormatter(formatter) # addHandler添加 logger.addHandler(fh) logger.addHandler(ch) logging.debug('loggging debug message') logging.info('loggging info message') logging.warning('loggging a warning message') logging.error('loggging error message') logging.critical('loggging critical message') # 如果總等級開的比其他handler來得高 # 那就不會顯示低於總等級的等級 # 例如logger開到warning而handler開到debug # 那麼handler的debug不會顯示, 只會顯示warning以上 ``` > - 格式參數 > - %(levelno)s: 日誌級別數值 > - %(levelname)s: 日誌級別名稱 > - %(pathname)s: 當前執行路徑,亦即sys.argv[0] > - %(filename)s: 程序名 > - %(funcName)s: 函數名 > - %(lineno)d: 行號 > - %(asctime)s: 時間 > - %(thread)d: 線程ID > - %(threadName)s: 線程名 > - %(process)d: 進程ID > - %(message)s: 日誌信息 ### 實作 ```python import re import urllib.parse # 導入 import logging from pymysql import connect URL_INFO_DICT = dict() def route(url): def set_func(func): URL_INFO_DICT[url] = func def call_func(*args, **kwargs): return func(*args, **kwargs) return call_func return set_func @route(r"/index.html") def index(ret): with open("./templates/index.html") as f: content = f.read() conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8") cs = conn.cursor() cs.execute("select * from info;") stock_info = cs.fetchall() cs.close() conn.close() tr=""" <tr> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> </tr> """ html = "" html += "<table>" for lines in stock_info: html += tr %(lines[0],lines[1],lines[2],lines[3],lines[4],lines[5],lines[6],lines[7]) html += "</table>" content = re.sub(r"\{%content%\}", html, content) return content @route(r"/center.html") def center(ret): with open("./templates/center.html") as f: content = f.read() conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8") cs = conn.cursor() cs.execute("select i.code, i.short, i.chg, i.turnover, i.price, i.highs, f.note_info from info as i inner join focus as f on i.id = f.info_id;") stock_info = cs.fetchall() cs.close() conn.close() tr=""" <tr> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> </tr> """ html = "" html += "<table>" for lines in stock_info: html += tr %(lines[0],lines[1],lines[2],lines[3],lines[4],lines[5],lines[6]) html += "</table>" content = re.sub(r"\{%content%\}", html, content) return content @route(r"/add/(\d+)\.html") def add_focus(ret): stock_code = ret.group(1) conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8") cs = conn.cursor() sql = """select * from info where code = %s;""" cs.execute(sql, (stock_code,)) if not cs.fetchone(): cs.close() conn.close() return "沒有這檔股票(%s)"%stock_code sql = """select * from info as i inner join focus as f on f.info_id = i.id where i.code=%s;""" cs.execute(sql, (stock_code,)) if cs.fetchone(): cs.close() conn.close() return "這檔股票(%s)已關注過"%stock_code sql = """insert into focus(info_id) select id from info where code = %s;""" cs.execute(sql, [stock_code]) conn.commit() cs.close() conn.close() return "關注成功(%s)"% stock_code @route(r"/del/(\d+)\.html") def del_focus(ret): stock_code = ret.group(1) conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8") cs = conn.cursor() sql = """select * from info where code = %s;""" cs.execute(sql, (stock_code,)) if not cs.fetchone(): cs.close() conn.close() return "沒有這檔股票(%s)"%stock_code sql = """select * from info as i inner join focus as f on f.info_id = i.id where i.code=%s;""" cs.execute(sql, (stock_code,)) if not cs.fetchone(): cs.close() conn.close() return "這檔股票(%s)尚未關注過"%stock_code sql = """delete from focus where info_id = (select id from info where code = %s);""" cs.execute(sql, [stock_code]) conn.commit() cs.close() conn.close() return "取消關注成功(%s)"% stock_code @route(r"/update/(\d+)\.html") def show_update_info(ret): with open("./templates/update.html") as f: content = f.read() stock_code = ret.group(1) conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8") cs = conn.cursor() sql = """select note_info from focus where info_id = (select id from info where code = %s);""" cs.execute(sql, (stock_code,)) if cs.fetchone(): cs.execute(sql, (stock_code,)) stock_info = cs.fetchone() note_info = stock_info[0] else: note_info = " " cs.close() conn.close() content = re.sub(r"\{%code%\}", stock_code, content) content = re.sub(r"\{%note_indfo%\}", note_info, content) return content @route(r"/update/(\d+)/(.*)\.html") def update_info(ret): stock_code = ret.group(1) comment = ret.group(2) comment = urllib.parse.unquote(comment) conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8") cs = conn.cursor() sql = """ update focus set note_info=%s where info_id = (select id from info where code =%s);""" cs.execute(sql, (comment, stock_code,)) conn.commit() cs.close() conn.close() return "修改成功" def application(env, start_response): start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')]) file_name = env["PATH_INFO"] # 格式 logging.basicConfig(level=logging.INFO, filename='./log.txt', filemode='a', format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s') # 寫 logging.info('訪問:%s'%file_name) try: for url, func in URL_INFO_DICT.items(): ret = re.match(url, file_name) if ret: return func(ret) else: # 寫 logging.warning('沒有對應的函數(%s)'%file_name) return "沒有對應的函數(%s)"%file_name except Exception as ret: return "異常: %s" %str(ret) ``` ### 主從 > - 修改處訪問主服務器 > - 查詢相關訪問從服務器 ## Djongo 改Type邏輯 > Djongo為了省略常常打SQL語句, > 故利用元類改了輸出的類 > 並將所有參數乾坤大挪移成SQL語句, > 最終達成只需輸入參數及調用函數, 即可完成SQL功能 ```python class XX(type): # name = "User" # bases = () # attrs = {"uid":(..), "password":(..)} def __new__(cls, name, bases, attrs): mappings = dict() # print(attrs) # { # '__module__': '__main__', # '__qualname__': 'User', # 'uid': ('uid', 'int unsigned'), # 'password': ('password', 'varchar(30)') # } # dirt.items(): [(k:v), (k:v)] for k,v in attrs.items(): # isintance 判斷對象是否為同類型 # isintance(object, classinfo) if isinstance(v, tuple): mappings[k] = v # dirt.keys()返回 ([k1,k2]) for k in mappings.keys(): attrs.pop(k) attrs["__mappings__"] = mappings attrs["__tables__"] = name # print(attrs) # { # '__module__': '__main__', # '__qualname__': 'User', # '__mappings__': { # 'uid': ('uid, 'int unsigned'), # 'password': ('password', 'varchar(30)') # }, # '__tables__': 'User' # } return type.__new__(cls, name, bases, attrs) class User(metaclass=XX): uid = ("uid", "int unsigned") password = ("password", "varchar(30)") def __init__(self, **kwargs): # print(kwargs) # {'uid': 12345, 'password': 'kkbox'} for name, value in kwargs.items(): # self.name = value # 這只會保存self.name = "kkbox" # 而不是保存想保存的屬性 uid=12345a, password = "kkbox" # setattr用來設置數性質, 且屬性不一定要存在 # setattr(object, name, value) # 註 setattr(self, name, value) # self.uid = 12345 # self.password = "kkbox" def save(self): field = [] arg = [] for k,v in self.__mappings__.items(): # k = "uid", "password" # v = ("int", "int unsigned"), ("password", "varchar(30)") field.append(v[0]) # "int", "password" arg.append(getattr(self, k, None)) # 12345, "kkbox" #print(field) #print(arg) # insert into 表名(列,...) values (值,...); sql = "insert into %s(%s) values(%s)" %(self.__tables__, ",".join(field), ",".join([str(i) for i in arg])) print(sql) u = User(uid=12345, password="kkbox") u.save() # insert into User(int,password) values(12345,kkbox) # 此時有個bug就是kkbox沒有引號 ``` ```python class XX(type): def __new__(cls, name, bases, attrs): mappings = dict() for k,v in attrs.items(): if isinstance(v, tuple): mappings[k] = v for k in mappings.keys(): attrs.pop(k) attrs["__mappings__"] = mappings attrs["__tables__"] = name return type.__new__(cls, name, bases, attrs) class User(metaclass=XX): uid = ("int", "int unsigned") password = ("password", "varchar(30)") def __init__(self, **kwargs): for name, value in kwargs.items(): setattr(self, name, value) def save(self): field = [] arg = [] for k,v in self.__mappings__.items(): field.append(v[0]) arg.append(getattr(self, k, None)) # insert into 表名(列,...) values (值,...); # sql = "insert into %s(%s) values(%s)" %(self.__tables__, ",".join(field), ",".join([str(i) for i in arg])) # print(sql) arg_temp = [] for i in arg: if isinstance(i ,int): arg_temp.append(str(i)) elif isinstance(i, str): arg_temp.append("""'%s'"""%i) sql = "insert into %s(%s) values(%s)" %(self.__tables__, ",".join(field), ",".join(arg_temp)) print(sql) u = User(uid=12345, password="kkbox") u.save() ``` ```python # setattr # 已存在的屬性會被重設 In [5]: class A: ...: a = 100 ...: In [6]: b = A() In [8]: getattr(b,"a") Out[8]: 100 In [9]: setattr(b, "a", 200) In [10]: b.a Out[10]: 200 In [11]: getattr(b, "a") Out[11]: 200 # 不存在的屬性會被創建 In [12]: class C(): ...: pass ...: In [13]: c = C() In [14]: setattr(c, "d", "godjj") In [15]: c.d Out[15]: 'godjj' # join # 用指定的元素將序列的str串起來 In [1]: a = ["int", "password"] In [2]: ",".join(a) Out[2]: 'int,password' In [3]: "大中天".join(a) Out[3]: 'int大中天password' ``` ```python # 最後整理 class XX(type): def __new__(cls, name, bases, attrs): mappings = dict() for k,v in attrs.items(): if isinstance(v, tuple): mappings[k] = v for k in mappings.keys(): attrs.pop(k) attrs["__mappings__"] = mappings attrs["__tables__"] = name return type.__new__(cls, name, bases, attrs) class Model(metaclass=XX): def __init__(self, **kwargs): for name, value in kwargs.items(): setattr(self, name, value) def save(self): field = [] arg = [] for k,v in self.__mappings__.items(): field.append(v[0]) arg.append(getattr(self, k, None)) arg_temp = [] for i in arg: if isinstance(i ,int): arg_temp.append(str(i)) elif isinstance(i, str): arg_temp.append("""'%s'"""%i) sql = "insert into %s(%s) values(%s)" %(self.__tables__, ",".join(field), ",".join(arg_temp)) # 12345,'kkbox' print(sql) ############################################## # 經過乾坤大挪移後,只要創一個函數就能創表,傳參就能寫SQL # 創建表 class User(Model): uid = ("int", "int unsigned") password = ("password", "varchar(30)") class Haha(Model): uid = ("Uid", "int unsigned") email = ("Email", "varchar(30)") phone = ("Phone", "int unsigned") # 傳參數設值 u = User(uid=12345, password="kkbox") # 調用功能 u.save() h = Haha(uid=22222, phone=9999999, email="haha@gmail.com") h.save() ```