網路編成3
===
> 複習:
> - 服務器: 轉發, 合併header, body數據
> - 框架:
```python
# 最基本要有這個可以調
def application(env, start_response):
# 調用響應頭要傳狀態碼跟有的沒的資訊回去
start_response('200 OK', [('DGG', 'SGG')])
# 最後的結果要回傳回Body
return "GodJJJJJ"
```
### 第一版-面向對象服務器
```python
#coding=utf-8
import re
from socket import *
from multiprocessing import Process
class WSGIServer(object):
def __init__(self):
self.t_socket = socket(AF_INET, SOCK_STREAM)
self.t_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
self.t_socket.bind(("", 5566))
self.t_socket.listen(128)
def server_client(self, new_socket):
request = new_socket.recv(1024).decode("utf-8")
request_lines = request.splitlines()
print(request_lines)
file_name = ""
ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0])
print("-"*10)
print(ret)
if ret:
file_name = ret.group(1)
if file_name == "/":
file_name = "/index.html"
try:
f = open("./html" + file_name, "rb")
except:
response = "HTTP/1.1 404 NOT FOUND\r\n"
response += "\r\n"
response += "file not found"
new_socket.send(response.encode("utf-8"))
else:
html_content = f.read()
f.close
response = "HTTP/1.1 200 OK\r\n"
response += "\r\n"
new_socket.send(response.encode("utf-8"))
new_socket.send(html_content)
new_socket.close()
def run_forever(self):
while True:
new_socket, client_addr = self.t_socket.accept()
p = Process(target=self.server_client, args=(new_socket,))
p.start()
new_socket.close()
self.t_socket.close()
def main():
wsgi_server = WSGIServer()
wsgi_server.run_forever()
if __name__ == "__main__":
main()
```
### 第二版-動態請求
```python
import re
import time
from socket import *
from multiprocessing import Process
class WSGIServer(object):
def server_client(self, new_socket):
request = new_socket.recv(1024).decode("utf-8")
request_lines = request.splitlines()
print(request_lines)
file_name = ""
ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0])
print("-"*10)
print(ret)
if ret:
file_name = ret.group(1)
if file_name == "/":
file_name = "/index.html"
# 改這, 不是py結尾就開html
if not file_name.endswith(".py"):
try:
f = open("./html" + file_name, "rb")
except:
response = "HTTP/1.1 404 NOT FOUND\r\n"
response += "\r\n"
response += "file not found"
new_socket.send(response.encode("utf-8"))
else:
html_content = f.read()
f.close
response = "HTTP/1.1 200 OK\r\n"
response += "\r\n"
new_socket.send(response.encode("utf-8"))
new_socket.send(html_content)
# 是py結尾, 調用時間模塊
else:
header = "HTTP/1.1 200 OK\r\n"
header += "\r\n"
body = "haha, %s" % time.ctime()
response = header + body
new_socket.send(response.encode("utf-8"))
new_socket.close()
```
### 第三版-服務器與動態處理解藕
```python
import re
#import time
import mini_frame
from socket import *
from multiprocessing import Process
class WSGIServer(object):
def server_client(self, new_socket):
request = new_socket.recv(1024).decode("utf-8")
request_lines = request.splitlines()
print(request_lines)
file_name = ""
ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0])
print("-"*10)
print(ret)
if ret:
file_name = ret.group(1)
if file_name == "/":
file_name = "/index.html"
if not file_name.endswith(".py"):
try:
f = open("./html" + file_name, "rb")
except:
response = "HTTP/1.1 404 NOT FOUND\r\n"
response += "\r\n"
response += "file not found"
new_socket.send(response.encode("utf-8"))
else:
html_content = f.read()
f.close
response = "HTTP/1.1 200 OK\r\n"
response += "\r\n"
new_socket.send(response.encode("utf-8"))
new_socket.send(html_content)
# 把動態模塊出來
else:
header = "HTTP/1.1 200 OK\r\n"
header += "\r\n"
# 版本一: 直接在服務器實現
# body = "haha,%s" %time.ctime()
# 版本二: 解耦第一版
# 問題: mini_frame每多一個功能, 這裡就要多寫一次判斷
# if file_name == "/test.py":
# body = mini_frame.test()
# elif file_name == "/test2.py":
# body = mini_frame.test2()
# elif ...
# 把那些判斷的拿去application判斷, 調用application就好
body = mini_frame.application(file_name)
response = header + body
new_socket.send(response.encode("utf-8"))
new_socket.close()
```
`vi mini_frame.py`
```python
import time
def test():
return "AlimamaDO, %s" % time.ctime()
def test2():
return "test2, %s" % time.ctime()
def test3():
return "test3, %s" % time.ctime()
def application(file_name):
if file_name == "/test.py":
return test()
elif file_name == "/test2.py":
return test2()
elif file_name == "/test3.py":
return test3()
else:
return "Not Found"
```
### 第四版-WSGI
> - 實際上服務器不會用自己的, 如此的話自己寫框架就不能被導入
> - 框架必須遵循WSGI, 才能被服務器正常調用
> - 比較有名的服務器如 nginx, apache
```python
import re
import mini_frame
from socket import *
from multiprocessing import Process
class WSGIServer(object):
def server_client(self, new_socket):
request = new_socket.recv(1024).decode("utf-8")
request_lines = request.splitlines()
print(request_lines)
file_name = ""
ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0])
print("-"*10)
print(ret)
if ret:
file_name = ret.group(1)
if file_name == "/":
file_name = "/index.html"
if not file_name.endswith(".py"):
try:
f = open("./html" + file_name, "rb")
except:
response = "HTTP/1.1 404 NOT FOUND\r\n"
response += "\r\n"
response += "file not found"
new_socket.send(response.encode("utf-8"))
else:
html_content = f.read()
f.close
response = "HTTP/1.1 200 OK\r\n"
response += "\r\n"
new_socket.send(response.encode("utf-8"))
new_socket.send(html_content)
else:
env = {}
body = mini_frame.application(env, self.start_response)
header = "HTTP/1.1 %s\r\n" % self.status
for temp in self.headers:
header += "%s:%s\r\n" %(temp[0], temp[1])
header += "\r\n"
response = header + body
new_socket.send(response.encode("utf-8"))
new_socket.close()
def start_response(self, status, headers):
self.status = status
# 服務器可能會寫自己是幾版
self.headers = [("Server", "mini_frame v87")]
self.headers = headers
```
`vi mini_frame`
```python
# 沒有設定編碼的話, 中文會亂碼
# def application(env, start_response):
# start_response('200 OK', [('xxoo', 'ooxx')])
# return "GodJJJJJ 東踏曲覓"
def application(env, start_response):
start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')])
return "GodJJJJJ 東踏曲覓"
```
```python
# 傳env參數
def server_client(self, new_socket):
request = new_socket.recv(1024).decode("utf-8")
request_lines = request.splitlines()
print(request_lines)
file_name = ""
ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0])
print("-"*10)
print(ret)
if ret:
file_name = ret.group(1)
if file_name == "/":
file_name = "/index.html"
if not file_name.endswith(".py"):
try:
f = open("./html" + file_name, "rb")
except:
response = "HTTP/1.1 404 NOT FOUND\r\n"
response += "\r\n"
response += "file not found"
new_socket.send(response.encode("utf-8"))
else:
html_content = f.read()
f.close
response = "HTTP/1.1 200 OK\r\n"
response += "\r\n"
new_socket.send(response.encode("utf-8"))
new_socket.send(html_content)
else:
# 這裏
env = {}
env["PATH_INFO"] = file_name
# {"PATH_INFO" : file_name}
body = mini_frame.application(env, self.start_response)
header = "HTTP/1.1 %s\r\n" % self.status
for temp in self.headers:
header += "%s:%s\r\n" %(temp[0], temp[1])
header += "\r\n"
response = header + body
new_socket.send(response.encode("utf-8"))
new_socket.close()
```
`$ vi mini_frame.py`
```python
def test1():
return "test1"
def test2():
return "test2"
def application(env, start_response):
start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')])
file_name = env["PATH_INFO"]
if file_name == "/test1.py":
return test1()
elif file_name == "/test2.py":
return test2()
else:
return "GodJJJJJ 東踏曲覓"
# 問題: 框架可能有好幾個模塊, 而不只有一個mini_frame.py
# 一旦模塊多了可能會很亂 > 裝成一個資料夾
```
### 模板文件
```
$ tree
.
├── html
│ └── index.html
└── mini_frame.py
└── webServer.py
# 一般會有三個資料夾: dynamic, static, templates
# templates 裝模板
# dynamic 裝動態模塊(py, php)
# static 裝靜態模塊(html, js, jpg..)
$ tree
.
├── dynamic
│ ├── __init__.py
│ └── mini_frame.py
├── static
│ ├── css
│ └── js
├── templates
│ └── index.html
└── webServer.py
```
`vi webServer.py`
```python
# 改導入
import dynamic.mini_frame
class WSGIServer(object):
def server_client(self, new_socket):
request = new_socket.recv(1024).decode("utf-8")
request_lines = request.splitlines()
print(request_lines)
file_name = ""
ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0])
print("-"*10)
print(ret)
if ret:
file_name = ret.group(1)
if file_name == "/":
file_name = "/index.html"
if not file_name.endswith(".py"):
try:
# 去靜態資源取
f = open("./static" + file_name, "rb")
except:
response = "HTTP/1.1 404 NOT FOUND\r\n"
response += "\r\n"
response += "file not found"
new_socket.send(response.encode("utf-8"))
else:
html_content = f.read()
f.close
response = "HTTP/1.1 200 OK\r\n"
response += "\r\n"
new_socket.send(response.encode("utf-8"))
new_socket.send(html_content)
else:
env = {}
env["PATH_INFO"] = file_name
# 改導入
body = dynamic.mini_frame.application(env, self.start_response)
header = "HTTP/1.1 %s\r\n" % self.status
for temp in self.headers:
header += "%s:%s\r\n" %(temp[0], temp[1])
header += "\r\n"
response = header + body
new_socket.send(response.encode("utf-8"))
```
`vi dynamic/mini_frame.py`
```python
# 前後端解藕
# def index():
# return """
# <html>
# <head>
# <title>toyz</title>
# </head>
# <body>
# <h1>GodJJ</h1>
# </body>
# </html>
# """
def index():
# with open("../templates/index.html") as f:
# 註一
with open("./templates/index.html") as f:
content = f.read()
return content
def application(env, start_response):
start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')])
file_name = env["PATH_INFO"]
if file_name == "/index.py":
return index()
else:
return "GodJJJJJ 東踏曲覓"
```
```python
# 註一之一
# open的路徑是以啟用端來算的
$ tree
.
├── dynamic
│ ├── __init__.py
│ └── mini_frame.py
├── static
│ ├── css
│ └── js
├── templates
│ └── index.html
└── webServer.py
# 啟用端是webServer , 故是當前路徑下的templates(./templates)
# 註一之二
# with
#
# 傳統開檔案會有兩個問題
# 1. 開啟的時候有問題
# 2. 忘記關
file = open("1.txt")
data = file.read()
file.close()
# 用try就能解決, 但冗長
f = open("1.txt", "rb")
try:
data = file.read()
finally:
f.close()
# with 開始時會調用__enter__, 結束時會調用__end__釋放資源
with open('file.txt', 'rb') as f:
data = f.read()
---
class Open(object):
def __init__(self, file_name):
self.file_name = file_name
def __enter__(self):
self.open_file = open(self.file_name, "rb")
return self.open_file
def __exit__(self, type, value, traceback):
self.open_file.close()
```
> - 小結:
> - 目前完成瀏覽器訪問服務器, 服務器把static丟給框架, 框架把headbody回傳, 服務器組裝後傳給瀏覽器, 呈現畫面
> - 問題:
> 1. 還是沒解決import指定框架問題
> 2. 是否能指定端口, 而非服務器固定的端口
```
# 目的: 給程序傳參數
# $ python3 webserver.py 5566 mini_frame:applocation
```
`$ vi test.py`
```
import sys
print(sys.argv)
```
```
$ python3 test.py haha
['test.py', 'haha']
$ python3 test.py haha 你 好
['test.py', 'haha', '你', '好']
$ python3 test.py 9898 mini_frame:application
['test.py', '9898', 'mini_frame:application']
# sys.argv 會返回[], 每個參數都變成str返回
```
```python
# 搞定端口
import re
import dynamic.mini_frame
import sys
from socket import *
from multiprocessing import Process
class WSGIServer(object):
# 接收參數
def __init__(self, port):
self.t_socket = socket(AF_INET, SOCK_STREAM)
self.t_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
# 使用參數
self.t_socket.bind(("", port))
self.t_socket.listen(128)
def main():
# 確認有輸入包含py有兩個參數
if len(sys.argv) == 2:
# 避免用戶不是輸入數字, try
try:
port = int(sys.argv[1])
except Exception as ret:
print("端口輸入錯誤")
return
else:
print("請照以下格式")
print("python3 webserver.py port")
return
# 把參數傳進init
wsgi_server = WSGIServer(port)
wsgi_server.run_forever()
```
```python
#coding=utf-8
import re
# 目的: 指定框架, 而非固定框架
# import dynamic.mini_frame
import sys
from socket import *
from multiprocessing import Process
class WSGIServer(object):
# 導入app
def __init__(self, port, app):
self.t_socket = socket(AF_INET, SOCK_STREAM)
self.t_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
self.t_socket.bind(("", port))
self.t_socket.listen(128)
self.application = app
def server_client(self, new_socket):
request = new_socket.recv(1024).decode("utf-8")
request_lines = request.splitlines()
print(request_lines)
file_name = ""
ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0])
print("-"*10)
print(ret)
if ret:
file_name = ret.group(1)
if file_name == "/":
file_name = "/index.html"
if not file_name.endswith(".py"):
try:
f = open("./static" + file_name, "rb")
except:
response = "HTTP/1.1 404 NOT FOUND\r\n"
response += "\r\n"
response += "file not found"
new_socket.send(response.encode("utf-8"))
else:
html_content = f.read()
f.close
response = "HTTP/1.1 200 OK\r\n"
response += "\r\n"
new_socket.send(response.encode("utf-8"))
new_socket.send(html_content)
else:
env = {}
env["PATH_INFO"] = file_name
# {"PATH_INFO" : file_name}
# 最後調用函數
#body = dynamic.mini_frame.application(env, self.start_response)
body = self.application(env, self.start_response)
header = "HTTP/1.1 %s\r\n" % self.status
for temp in self.headers:
header += "%s:%s\r\n" %(temp[0], temp[1])
header += "\r\n"
response = header + body
new_socket.send(response.encode("utf-8"))
new_socket.close()
def start_response(self, status, headers):
self.status = status
self.headers = headers
def run_forever(self):
while True:
new_socket, client_addr = self.t_socket.accept()
p = Process(target=self.server_client, args=(new_socket,))
p.start()
new_socket.close()
self.t_socket.close()
def main():
if len(sys.argv) == 3:
try:
port = int(sys.argv[1])
frame_app_name = sys.argv[2] # mini_frame:application
except Exception as ret:
print("端口輸入錯誤")
return
else:
print("請照以下格式")
print("python3 webserver.py port, 框架:函數")
return
# 取出來
ret = re.match(r"([^:]+):(.+)", frame_app_name)
if ret:
frame_name = ret.group(1) # mini_frame
app_name = ret.group(2) # application
else:
print("請照以下格式")
print("python3 webserver.py port, 框架:函數")
return
# 註一
# 由於mini_frame在dynamic資料夾中
# 使用sys.path 搜索路徑, 並插入指定路徑,
# 否則從當前路徑往上找會找不到
sys.path.append("./dynamic")
# 導入
# import frame_name
# import 會把frame_name直接當成模塊名(frame_name)去找,
# 而不是他指向得值(mini_frame)
frame = __import__(frame_name) # 如此才得以找mini_frame
# 註二
app = getattr(frame, app_name) # 此時app指向
# ./dynamic/mini_frame
# 的application函數
wsgi_server = WSGIServer(port, app)
wsgi_server.run_forever()
if __name__ == "__main__":
main()
# 問題: f = open("./static" + file_name, "rb") 會找不到
# 目標: 再不用動服務器的情況下, 自己設定PATH
```
```python
# 註一: 搜索路徑
In [4]: import sys
In [5]: sys.path
Out[5]:
['/Library/Frameworks/Python.framework/Versions/3.7/bin',
...
'packages/IPython/extensions',
'/Users/haha/.ipython']
# 註二 getattr(object, name[, default])
# 返回一個對象屬性
In [1]: class Test(object):
...: a = 100
...: def haha(self):
...: print("hahhaa")
...:
In [2]: test = Test()
In [4]: godjj = getattr(test, "a")
In [5]: godjj
Out[5]: 100
In [6]: toyz = getattr(test, "haha")
In [7]: toyz
Out[7]: <bound method Test.haha of <__main__.Test object at 0x1111fe208>>
In [8]: toyz()
hahhaa
# 第三個參數是預設, 找無時指向
In [9]: zero = getattr(test, "c")
AttributeError: 'Test' object has no attribute 'c'
In [10]: zero = getattr(test, "c", "bonjo")
In [11]: zero
Out[11]: 'bonjo'
```
### 配置
> - 服務器不可能配合本地資料配置而改服務器的code
> - 故服務器可以開放配置, 讓本地自行設置
#### 本地設定一個路徑配置
`$ werServer.conf`
```
{
"static_path" : "./static"
"dynamic_path" : "./dynamic"
}
```
> - 基本上cnf或conf結尾的都是配置文件
> - 裡面的東西全部都是字符串
> - 當然可以不要寫成{}的形式而告訴服務器第一行是什麼,第二行是什麼, 但這樣很麻煩也容易忘記第一行寫什麼, 直接用字典的形式來調用比較方便
```python
#coding=utf-8
import re
import sys
from socket import *
from multiprocessing import Process
class WSGIServer(object):
# 導入路徑
def __init__(self, port, app, static_path):
self.t_socket = socket(AF_INET, SOCK_STREAM)
self.t_socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
self.t_socket.bind(("", port))
self.t_socket.listen(128)
self.application = app
self.static_path = static_path
def server_client(self, new_socket):
request = new_socket.recv(1024).decode("utf-8")
request_lines = request.splitlines()
print(request_lines)
file_name = ""
ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0])
print("-"*10)
print(ret)
if ret:
file_name = ret.group(1)
if file_name == "/":
file_name = "/index.html"
if not file_name.endswith(".py"):
try:
# 使用
f = open(self.static_path + file_name, "rb")
except:
response = "HTTP/1.1 404 NOT FOUND\r\n"
response += "\r\n"
response += "file not found"
new_socket.send(response.encode("utf-8"))
else:
html_content = f.read()
f.close
response = "HTTP/1.1 200 OK\r\n"
response += "\r\n"
new_socket.send(response.encode("utf-8"))
new_socket.send(html_content)
else:
env = {}
env["PATH_INFO"] = file_name
body = self.application(env, self.start_response)
header = "HTTP/1.1 %s\r\n" % self.status
for temp in self.headers:
header += "%s:%s\r\n" %(temp[0], temp[1])
header += "\r\n"
response = header + body
new_socket.send(response.encode("utf-8"))
new_socket.close()
def start_response(self, status, headers):
self.status = status
self.headers = headers
def run_forever(self):
while True:
new_socket, client_addr = self.t_socket.accept()
p = Process(target=self.server_client, args=(new_socket,))
p.start()
new_socket.close()
self.t_socket.close()
def main():
if len(sys.argv) == 3:
try:
port = int(sys.argv[1])
frame_app_name = sys.argv[2]
except Exception as ret:
print("端口輸入錯誤")
return
else:
print("請照以下格式")
print("python3 webserver.py port, 框架:函數")
return
ret = re.match(r"([^:]+):(.+)", frame_app_name)
if ret:
frame_name = ret.group(1)
app_name = ret.group(2)
else:
print("請照以下格式")
print("python3 webserver.py port, 框架:函數")
return
# 打開配置
with open("./webServer.conf") as f:
# 因為cnf裡面都是str,
# 使用eval()來執行裡面的str
# 註
conf_info = eval(f.read())
# 這裡改用字典調用值
sys.path.append(conf_info["dynamic_path"])
frame = __import__(frame_name)
app = getattr(frame, app_name)
# 傳參
wsgi_server = WSGIServer(port, app, conf_info["static_path"])
wsgi_server.run_forever()
if __name__ == "__main__":
main()
```
```python
In [1]: a = """
...: {
...: "haha" : 123,
...: "heyhey" : 456
...: }
...: """
# a 是一個str
In [2]: a
Out[2]: '\n{\n "haha" : 123,\n "heyhey" : 456\n}\n'
# eval可以執行字符串表達式
# 白話就是脫掉""
#
# 將字符串轉成相應值
In [3]: b = eval(a)
In [4]: b
Out[4]: {'haha': 123, 'heyhey': 456}
# 計算
In [5]: eval( '3 * 7' )
Out[5]: 21
```
### Shell腳本
> 執行服務器時, 都需要輸入一長串
> `$ python3 xx.py port 框架:函數`
> - Shell腳本用法, 將Linux命令放到一個文件中, 執行該文件, 裡面的命令會由上往下通通執行, 且可以if, for等
`$ vi run.sh`
```
python3 webServer.py 5566 mini_frame:application
```
```
# 此時還無法執行, 因為沒有執行權限
$ ls -lh
-rw-r--r-- 1 ooxx staff 49B Sep 15 17:57 run.sh
# 給權限
$ chmod +x run.sh
$ ls -lh
-rwxr-xr-x 1 ooxx staff 49B Sep 15 17:57 run.sh
```
> 最後寫使用說明書txt
`$ vi readme.txt`
```
使用方法: ./run.sh 或 python3 xx.py port 框架:函數
```
```
$ tree
.
├── dynamic
│ ├── __init__.py
│ └── mini_frame.py
├── readme.txt
├── run.sh
├── static
│ ├── css
│ └── js
├── templates
│ └── index.html
├── webServer.conf
└── webServer.py
```
## 框架路由
> 根據請求不同而調用不同
```python
def index():
with open("./templates/index.html") as f:
return f.read()
def center():
with open("./templates/center.html") as f:
return f.read()
# 創建一個字典
# 解釋器由上到下,
# 故要先讓解釋器知道index跟center的存在
# 否則找不到會報錯鬧脾氣
URL_INFO_DICT = {
"/index.py" : index,
"/center.py": center
}
def application(env, start_response):
start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')])
file_name = env["PATH_INFO"]
# 如果有一百個, 那就要判斷一百次?
"""
if file_name == "/index.py":
return index()
else:
return "GodJJJJJ 東踏曲覓"
"""
# 取得value
# index = U_I_D["/index.html"]
func = URL_INFO_DICT[file_name]
# 執行value, 並將結果返回
return func()
# 問題: 能否自動新增字典內容?
```
```python
# 利用裝飾器在調用之前就執行的特點
URL_INFO_DICT = dict()
def route(url):
def set_func(func):
# url > 傳進來的參數("/index.py")
# func > 被裝飾的變量(index)
URL_INFO_DICT[url] = func
def call_func(*args, **kwargs):
return func(*args, **kwargs)
return call_func
return set_func
@route("/index.py")
def index():
with open("./templates/index.html") as f:
return f.read()
@route("/center.py")
def center():
with open("./templates/center.html") as f:
return f.read()
"""
URL_INFO_DICT = {
"/index.py" : index,
"/center.py": center
}
"""
def application(env, start_response):
start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')])
file_name = env["PATH_INFO"]
try:
#func = URL_INFO_DICT[file_name]
#return func()
return URL_INFO_DICT[file_name]()
except Exception as ret:
return "異常: %s" %str(ret)
```
## 偽靜態
#### 靜態
- `網域/xxx.html`
- 靜態URL開啟快速(直接使用), 對[SEO](https://zh.wikipedia.org/wiki/%E6%90%9C%E5%B0%8B%E5%BC%95%E6%93%8E%E6%9C%80%E4%BD%B3%E5%8C%96)有正面影響
#### 動態
- `域名/haha.php?id=1` `域名/haha.asp?id=1`...
- 修改頁面方便, 但開啟速度比較慢(須經過運算)
#### 偽靜態
`$ vi webserver.py`
```python
# 把html導入框架
# if not file_name.endswith(".py"):
if not file_name.endswith(".html"):
try:
f = open(self.static_path + file_name, "rb")
except:
response = "HTTP/1.1 404 NOT FOUND\r\n"
response += "\r\n"
response += "file not found"
new_socket.send(response.encode("utf-8"))
else:
html_content = f.read()
f.close
response = "HTTP/1.1 200 OK\r\n"
response += "\r\n"
new_socket.send(response.encode("utf-8"))
new_socket.send(html_content)
else:
env = {}
env["PATH_INFO"] = file_name
body = self.application(env, self.start_response)
header = "HTTP/1.1 %s\r\n" % self.status
for temp in self.headers:
header += "%s:%s\r\n" %(temp[0], temp[1])
header += "\r\n"
response = header + body
new_socket.send(response.encode("utf-8"))
```
```python
# url接收參數改成html
# @route("/index.py")
@route("/index.html")
def index():
with open("./templates/index.html") as f:
return f.read()
# @route("/center.py")
@route("/center.html")
def center():
with open("./templates/center.html") as f:
return f.read()
```
## 導入SQL數據
```python
import re
from pymysql import connect
URL_INFO_DICT = dict()
def route(url):
def set_func(func):
URL_INFO_DICT[url] = func
def call_func(*args, **kwargs):
return func(*args, **kwargs)
return call_func
return set_func
@route("/index.html")
def index():
with open("./templates/index.html") as f:
content = f.read()
# SQL操作
conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8")
cs = conn.cursor()
cs.execute("select * from info;")
stock_info = cs.fetchall()
cs.close()
conn.close()
# 在html內容裡寫{%content%}, 然後在這裡把他換掉
# sub(搜,改,搜索內容)
content = re.sub(r"\{%content%\}", str(stock_info), content)
return content
@route("/center.html")
def center():
with open("./templates/center.html") as f:
return f.read()
def application(env, start_response):
start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')])
file_name = env["PATH_INFO"]
try:
return URL_INFO_DICT[file_name]()
except Exception as ret:
return "異常: %s" %str(ret)
# 模板html基本上是固定的
# 但是SQL數據是會變動的,
# 故兩者應該分開
```
#### 替換模板
```python
@route("/index.html")
def index():
# 打開模板
with open("./templates/index.html") as f:
content = f.read()
# 查詢數據
conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8")
cs = conn.cursor()
cs.execute("select * from info;")
stock_info = cs.fetchall()
cs.close()
conn.close()
# 組織數據
# html的表格是以
# <table>表格
# <tr>列
# <td>行
# 組成
# 數據總共有八種, 故tr八列
tr="""
<tr>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
</tr>
"""
html = ""
html += "<table>"
# 依序套入數據
for lines in stock_info:
html += tr %(lines[0],lines[1],lines[2],lines[3],lines[4],lines[5],lines[6],lines[7])
html += "</table>"
# 返回數據
content = re.sub(r"\{%content%\}", html, content)
return content
```
#### 支持正則
> - url往往會帶參數, 如/add/001.html
> - 如果沒有正則, 就須每個參數都手動寫入對應的函數進字典
> - 支援正則之後, /add/001跟/add/002 都以/add/\d+判斷即可
```python
import re
from pymysql import connect
URL_INFO_DICT = dict()
def route(url):
def set_func(func):
URL_INFO_DICT[url] = func
def call_func(*args, **kwargs):
return func(*args, **kwargs)
return call_func
return set_func
@route(r"/index.html")
def index():
with open("./templates/index.html") as f:
content = f.read()
conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8")
cs = conn.cursor()
cs.execute("select * from info;")
stock_info = cs.fetchall()
cs.close()
conn.close()
tr="""
<tr>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
</tr>
"""
html = ""
html += "<table>"
for lines in stock_info:
#print(lines)
html += tr %(lines[0],lines[1],lines[2],lines[3],lines[4],lines[5],lines[6],lines[7])
html += "</table>"
content = re.sub(r"\{%content%\}", html, content)
return content
@route(r"/center.html")
def center():
with open("./templates/center.html") as f:
content = f.read()
conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8")
cs = conn.cursor()
cs.execute("select i.code, i.short, i.chg, i.turnover, i.price, i.highs, f.note_info from info as i inner join focus as f on i.id = f.info_id;")
stock_info = cs.fetchall()
cs.close()
conn.close()
tr="""
<tr>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
</tr>
"""
html = ""
html += "<table>"
for lines in stock_info:
#print(lines)
html += tr %(lines[0],lines[1],lines[2],lines[3],lines[4],lines[5],lines[6])
html += "</table>"
content = re.sub(r"\{%content%\}", html, content)
return content
# 將所有url參數都改成正則來當成match第一個參數
@route(r"/add/\d+\.html")
def add_focus():
return "haha"
def application(env, start_response):
start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')])
file_name = env["PATH_INFO"]
try:
# return URL_INFO_DICT[file_name]()
# 註一
for url, func in URL_INFO_DICT.items():
# {
# r"/index.html":index,
# r"/center.html":center,
# r"/add/\d+\.html":add_focus
# }
# 註二
ret = re.match(url, file_name)
if ret:
return func()
# 這個else放for循環裡面的話, 只會循環第一圈就return出來了..
# 故應該放外面
else:
return "沒有對應的函數(%s)"%file_name
except Exception as ret:
return "異常: %s" %str(ret)
```
```python
# 註一: dict.items()
# 以列表返回可遍歷的元祖
In [1]: a = {"godjj":1, "toyz":2}
In [2]: a
Out[2]: {'godjj': 1, 'toyz': 2}
In [3]: a.items()
Out[3]: dict_items([('godjj', 1), ('toyz', 2)]) #
In [5]: for b,c in a.items():
...: print(b)
...: print("----")
...: print(c)
...:
godjj
----
1
toyz
----
2
# 註二: match第一參數是規則, 第二參數才是配對客體
In [46]: a = "123"
In [47]: b = r"\d"
In [48]: re.match(a,b)
In [49]: re.match(b,a)
Out[49]: <re.Match object; span=(0, 1), match='1'>
```
### 增刪數據
```python
import re
from pymysql import connect
URL_INFO_DICT = dict()
def route(url):
def set_func(func):
URL_INFO_DICT[url] = func
def call_func(*args, **kwargs):
return func(*args, **kwargs)
return call_func
return set_func
# 記得開行參
@route(r"/index.html")
def index(ret):
with open("./templates/index.html") as f:
content = f.read()
conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8")
cs = conn.cursor()
cs.execute("select * from info;")
stock_info = cs.fetchall()
cs.close()
conn.close()
tr="""
<tr>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
</tr>
"""
html = ""
html += "<table>"
for lines in stock_info:
#print(lines)
html += tr %(lines[0],lines[1],lines[2],lines[3],lines[4],lines[5],lines[6],lines[7])
html += "</table>"
content = re.sub(r"\{%content%\}", html, content)
return content
# 記得開行參
@route(r"/center.html")
def center(ret):
with open("./templates/center.html") as f:
content = f.read()
conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8")
cs = conn.cursor()
cs.execute("select i.code, i.short, i.chg, i.turnover, i.price, i.highs, f.note_info from info as i inner join focus as f on i.id = f.info_id;")
stock_info = cs.fetchall()
cs.close()
conn.close()
tr="""
<tr>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
</tr>
"""
html = ""
html += "<table>"
for lines in stock_info:
#print(lines)
html += tr %(lines[0],lines[1],lines[2],lines[3],lines[4],lines[5],lines[6])
html += "</table>"
content = re.sub(r"\{%content%\}", html, content)
return content
# 參數的地方(), 要拿來使用
@route(r"/add/(\d+)\.html")
def add_focus(ret):
# 1.取得參數
stock_code = ret.group(1)
# 2.判斷是否有這檔股票
conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8")
cs = conn.cursor()
# 避免SQL注入
# sql = """select * from info where code = '%s';"""%stock_code
sql = """select * from info where code = %s;"""
cs.execute(sql, (stock_code,))
# 取一次就沒東西時, 表示根本沒有這檔股票
# 如果沒有就關起來返回沒有
if not cs.fetchone():
cs.close()
conn.close()
return "沒有這檔股票(%s)"%stock_code
# 3.判斷是否已經關注過
sql = """select * from info as i inner join focus as f on f.info_id = i.id where i.code=%s;"""
cs.execute(sql, (stock_code,))
if cs.fetchone():
cs.close()
conn.close()
return "這檔股票(%s)已關注過"%stock_code
# 4. 新增關注
# 複習:
# 一般新增
# insert into 表 value (值),(值),..;
# 部分插入
# insert into 表(列,..) value (值,..),(值,..),..;
# 子查詢插入(沒有value!!!)
# insert into 表(列,..) select ...;
sql = """insert into focus(info_id) select id from info where code = %s;"""
cs.execute(sql, [stock_code])
# 修改數據記得提交
conn.commit()
cs.close()
conn.close()
return "關注成功(%s)"% stock_code
@route(r"/del/(\d+)\.html")
def del_focus(ret):
# 1.取得參數
stock_code = ret.group(1)
# 2.判斷是否有這檔股票
conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8")
cs = conn.cursor()
# 避免SQL注入
# sql = """select * from info where code = '%s';"""%stock_code
sql = """select * from info where code = %s;"""
cs.execute(sql, (stock_code,))
if not cs.fetchone():
cs.close()
conn.close()
return "沒有這檔股票(%s)"%stock_code
# 3.判斷是否有關注過
sql = """select * from info as i inner join focus as f on f.info_id = i.id where i.code=%s;"""
cs.execute(sql, (stock_code,))
# 如果沒關注過就沒辦法刪, 故返回
if not cs.fetchone():
cs.close()
conn.close()
return "這檔股票(%s)尚未關注過"%stock_code
# 4. 新增關注
# 複習:
# 物理刪除
# delete from 表名 where 條件;
#sql = """insert into focus(info_id) select id from info where code = %s;"""
sql = """delete from focus where info_id = (select id from info where code = %s);"""
cs.execute(sql, [stock_code])
# 修改數據記得提交
conn.commit()
cs.close()
conn.close()
return "取消關注成功(%s)"% stock_code
@route(r"/update/(\d+)\.html")
def show_update_info(ret):
with open("./templates/update.html") as f:
content = f.read()
stock_code = ret.group(1)
conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8")
cs = conn.cursor()
# 1. 確認股票有無(略)
# 2. 確認有無關注(略)
sql = """select note_info from focus where info_id = (select id from info where code = %s);"""
cs.execute(sql, (stock_code,))
# 3. 確認note_info有無
if cs.fetchone():
# 因為if判斷把cs存放的內容取掉了, 故再執行一次sql語句
cs.execute(sql, (stock_code,))
stock_info = cs.fetchone()
note_info = stock_info[0]
# 沒有備註的話, 讓備註顯示空白
else:
note_info = " "
cs.close()
conn.close()
content = re.sub(r"\{%code%\}", stock_code, content)
content = re.sub(r"\{%note_indfo%\}", note_info, content)
return content
@route(r"/update/(\d+)/(.*)\.html")
def update_info(ret):
stock_code = ret.group(1)
comment = ret.group(2)
conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8")
cs = conn.cursor()
# 確認股票有無(略)
# 確認有無關注(略)
# 修改
# 複習:
# 修改數據
# update 表名 set 列1=值1,列2=值2,... where 條件;
sql = """ update focus set note_info=%s where info_id = (select id from info where code =%s);"""
cs.execute(sql, (comment, stock_code,))
conn.commit()
cs.close()
conn.close()
return "修改成功"
def application(env, start_response):
start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')])
file_name = env["PATH_INFO"]
try:
for url, func in URL_INFO_DICT.items():
ret = re.match(url, file_name)
if ret:
# 把url傳進去, 才能取得參數來用
return func(ret)
else:
return "沒有對應的函數(%s)"%file_name
except Exception as ret:
return "異常: %s" %str(ret)
```
### URL編碼解碼
> - URL傳進來的參數遇到非英文字母會編碼
> - 故傳到框架時應該將其解碼後再傳入SQL
```python
In [1]: import urllib.parse
# URL編碼
In [2]: urllib.parse.quote("Toyz喊在")
Out[2]: 'Toyz%E5%96%8A%E5%9C%A8'
# URL解碼
In [4]: urllib.parse.unquote("Toyz%E5%96%8A%E5%9C%A8")
Out[4]: 'Toyz喊在'
```
```python
import re
# 導入模塊
import urllib.parse
from pymysql import connect
@route(r"/update/(\d+)/(.*)\.html")
def update_info(ret):
stock_code = ret.group(1)
comment = ret.group(2)
# 解碼
comment = urllib.parse.unquote(comment)
conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8")
cs = conn.cursor()
sql = """ update focus set note_info=%s where info_id = (select id from info where code =%s);"""
cs.execute(sql, (comment, stock_code,))
conn.commit()
cs.close()
conn.close()
return "修改成功"
```
> - 問題: SQL不論幾個空格, 瀏覽器都只顯示一個?
> - 瀏覽器遇到多個空格皆案一個空格來解析
> - 解法: 前端解決 > &nbps;
## Log日誌
> - 日誌級別:
> - DEBUG: 診斷問題紀錄
> - INFO: 一般資訊
> - WARNING: 低錯誤, 不影響運行
> - ERROR: 中錯誤, 影響部分運行
> - CRITICAL: 高錯誤, 無法運行
### 顯示log在Terminal上
```python
import logging
# level: 開啟級別, 級別以下的不會紀錄
# format: 格式
logging.basicConfig(level=logging.WARNING,
format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')
# 開啟功能
logging.debug('loggging debug message')
logging.info('loggging info message')
logging.warning('loggging a warning message')
logging.error('loggging error message')
logging.critical('loggging critical message')
```
### 記錄在文件中
```python
import logging
# 在參數設定檔名跟寫入方式
logging.basicConfig(level=logging.WARNING,
filename='./log.txt',
filemode='a',
format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')
logging.debug('loggging debug message')
logging.info('loggging info message')
logging.warning('loggging a warning message')
logging.error('loggging error message')
logging.critical('loggging critical message')
```
### 我全都要
```python
import logging
# 創建logger
logger = logging.getLogger()
logger.setLevel(logging.INFO) # 總等級
# FileHandler用於寫入文件
logfile = './log.txt'
fh = logging.FileHandler(logfile, mode='a')
fh.setLevel(logging.DEBUG)
# StreamHandler用於輸出控制台
ch = logging.StreamHandler()
ch.setLevel(logging.WARNING)
# Formatter用於設定格式
formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# addHandler添加
logger.addHandler(fh)
logger.addHandler(ch)
logging.debug('loggging debug message')
logging.info('loggging info message')
logging.warning('loggging a warning message')
logging.error('loggging error message')
logging.critical('loggging critical message')
# 如果總等級開的比其他handler來得高
# 那就不會顯示低於總等級的等級
# 例如logger開到warning而handler開到debug
# 那麼handler的debug不會顯示, 只會顯示warning以上
```
> - 格式參數
> - %(levelno)s: 日誌級別數值
> - %(levelname)s: 日誌級別名稱
> - %(pathname)s: 當前執行路徑,亦即sys.argv[0]
> - %(filename)s: 程序名
> - %(funcName)s: 函數名
> - %(lineno)d: 行號
> - %(asctime)s: 時間
> - %(thread)d: 線程ID
> - %(threadName)s: 線程名
> - %(process)d: 進程ID
> - %(message)s: 日誌信息
### 實作
```python
import re
import urllib.parse
# 導入
import logging
from pymysql import connect
URL_INFO_DICT = dict()
def route(url):
def set_func(func):
URL_INFO_DICT[url] = func
def call_func(*args, **kwargs):
return func(*args, **kwargs)
return call_func
return set_func
@route(r"/index.html")
def index(ret):
with open("./templates/index.html") as f:
content = f.read()
conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8")
cs = conn.cursor()
cs.execute("select * from info;")
stock_info = cs.fetchall()
cs.close()
conn.close()
tr="""
<tr>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
</tr>
"""
html = ""
html += "<table>"
for lines in stock_info:
html += tr %(lines[0],lines[1],lines[2],lines[3],lines[4],lines[5],lines[6],lines[7])
html += "</table>"
content = re.sub(r"\{%content%\}", html, content)
return content
@route(r"/center.html")
def center(ret):
with open("./templates/center.html") as f:
content = f.read()
conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8")
cs = conn.cursor()
cs.execute("select i.code, i.short, i.chg, i.turnover, i.price, i.highs, f.note_info from info as i inner join focus as f on i.id = f.info_id;")
stock_info = cs.fetchall()
cs.close()
conn.close()
tr="""
<tr>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
</tr>
"""
html = ""
html += "<table>"
for lines in stock_info:
html += tr %(lines[0],lines[1],lines[2],lines[3],lines[4],lines[5],lines[6])
html += "</table>"
content = re.sub(r"\{%content%\}", html, content)
return content
@route(r"/add/(\d+)\.html")
def add_focus(ret):
stock_code = ret.group(1)
conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8")
cs = conn.cursor()
sql = """select * from info where code = %s;"""
cs.execute(sql, (stock_code,))
if not cs.fetchone():
cs.close()
conn.close()
return "沒有這檔股票(%s)"%stock_code
sql = """select * from info as i inner join focus as f on f.info_id = i.id where i.code=%s;"""
cs.execute(sql, (stock_code,))
if cs.fetchone():
cs.close()
conn.close()
return "這檔股票(%s)已關注過"%stock_code
sql = """insert into focus(info_id) select id from info where code = %s;"""
cs.execute(sql, [stock_code])
conn.commit()
cs.close()
conn.close()
return "關注成功(%s)"% stock_code
@route(r"/del/(\d+)\.html")
def del_focus(ret):
stock_code = ret.group(1)
conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8")
cs = conn.cursor()
sql = """select * from info where code = %s;"""
cs.execute(sql, (stock_code,))
if not cs.fetchone():
cs.close()
conn.close()
return "沒有這檔股票(%s)"%stock_code
sql = """select * from info as i inner join focus as f on f.info_id = i.id where i.code=%s;"""
cs.execute(sql, (stock_code,))
if not cs.fetchone():
cs.close()
conn.close()
return "這檔股票(%s)尚未關注過"%stock_code
sql = """delete from focus where info_id = (select id from info where code = %s);"""
cs.execute(sql, [stock_code])
conn.commit()
cs.close()
conn.close()
return "取消關注成功(%s)"% stock_code
@route(r"/update/(\d+)\.html")
def show_update_info(ret):
with open("./templates/update.html") as f:
content = f.read()
stock_code = ret.group(1)
conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8")
cs = conn.cursor()
sql = """select note_info from focus where info_id = (select id from info where code = %s);"""
cs.execute(sql, (stock_code,))
if cs.fetchone():
cs.execute(sql, (stock_code,))
stock_info = cs.fetchone()
note_info = stock_info[0]
else:
note_info = " "
cs.close()
conn.close()
content = re.sub(r"\{%code%\}", stock_code, content)
content = re.sub(r"\{%note_indfo%\}", note_info, content)
return content
@route(r"/update/(\d+)/(.*)\.html")
def update_info(ret):
stock_code = ret.group(1)
comment = ret.group(2)
comment = urllib.parse.unquote(comment)
conn = connect(host="localhost", port=3306, user="root", password="youknow.", database="stock_db", charset="utf8")
cs = conn.cursor()
sql = """ update focus set note_info=%s where info_id = (select id from info where code =%s);"""
cs.execute(sql, (comment, stock_code,))
conn.commit()
cs.close()
conn.close()
return "修改成功"
def application(env, start_response):
start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')])
file_name = env["PATH_INFO"]
# 格式
logging.basicConfig(level=logging.INFO,
filename='./log.txt',
filemode='a',
format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')
# 寫
logging.info('訪問:%s'%file_name)
try:
for url, func in URL_INFO_DICT.items():
ret = re.match(url, file_name)
if ret:
return func(ret)
else:
# 寫
logging.warning('沒有對應的函數(%s)'%file_name)
return "沒有對應的函數(%s)"%file_name
except Exception as ret:
return "異常: %s" %str(ret)
```
### 主從
> - 修改處訪問主服務器
> - 查詢相關訪問從服務器
## Djongo 改Type邏輯
> Djongo為了省略常常打SQL語句,
> 故利用元類改了輸出的類
> 並將所有參數乾坤大挪移成SQL語句,
> 最終達成只需輸入參數及調用函數, 即可完成SQL功能
```python
class XX(type):
# name = "User"
# bases = ()
# attrs = {"uid":(..), "password":(..)}
def __new__(cls, name, bases, attrs):
mappings = dict()
# print(attrs)
# {
# '__module__': '__main__',
# '__qualname__': 'User',
# 'uid': ('uid', 'int unsigned'),
# 'password': ('password', 'varchar(30)')
# }
# dirt.items(): [(k:v), (k:v)]
for k,v in attrs.items():
# isintance 判斷對象是否為同類型
# isintance(object, classinfo)
if isinstance(v, tuple):
mappings[k] = v
# dirt.keys()返回 ([k1,k2])
for k in mappings.keys():
attrs.pop(k)
attrs["__mappings__"] = mappings
attrs["__tables__"] = name
# print(attrs)
# {
# '__module__': '__main__',
# '__qualname__': 'User',
# '__mappings__': {
# 'uid': ('uid, 'int unsigned'),
# 'password': ('password', 'varchar(30)')
# },
# '__tables__': 'User'
# }
return type.__new__(cls, name, bases, attrs)
class User(metaclass=XX):
uid = ("uid", "int unsigned")
password = ("password", "varchar(30)")
def __init__(self, **kwargs):
# print(kwargs) # {'uid': 12345, 'password': 'kkbox'}
for name, value in kwargs.items():
# self.name = value
# 這只會保存self.name = "kkbox"
# 而不是保存想保存的屬性 uid=12345a, password = "kkbox"
# setattr用來設置數性質, 且屬性不一定要存在
# setattr(object, name, value) # 註
setattr(self, name, value)
# self.uid = 12345
# self.password = "kkbox"
def save(self):
field = []
arg = []
for k,v in self.__mappings__.items():
# k = "uid", "password"
# v = ("int", "int unsigned"), ("password", "varchar(30)")
field.append(v[0]) # "int", "password"
arg.append(getattr(self, k, None)) # 12345, "kkbox"
#print(field)
#print(arg)
# insert into 表名(列,...) values (值,...);
sql = "insert into %s(%s) values(%s)" %(self.__tables__, ",".join(field), ",".join([str(i) for i in arg]))
print(sql)
u = User(uid=12345, password="kkbox")
u.save()
# insert into User(int,password) values(12345,kkbox)
# 此時有個bug就是kkbox沒有引號
```
```python
class XX(type):
def __new__(cls, name, bases, attrs):
mappings = dict()
for k,v in attrs.items():
if isinstance(v, tuple):
mappings[k] = v
for k in mappings.keys():
attrs.pop(k)
attrs["__mappings__"] = mappings
attrs["__tables__"] = name
return type.__new__(cls, name, bases, attrs)
class User(metaclass=XX):
uid = ("int", "int unsigned")
password = ("password", "varchar(30)")
def __init__(self, **kwargs):
for name, value in kwargs.items():
setattr(self, name, value)
def save(self):
field = []
arg = []
for k,v in self.__mappings__.items():
field.append(v[0])
arg.append(getattr(self, k, None))
# insert into 表名(列,...) values (值,...);
# sql = "insert into %s(%s) values(%s)" %(self.__tables__, ",".join(field), ",".join([str(i) for i in arg]))
# print(sql)
arg_temp = []
for i in arg:
if isinstance(i ,int):
arg_temp.append(str(i))
elif isinstance(i, str):
arg_temp.append("""'%s'"""%i)
sql = "insert into %s(%s) values(%s)" %(self.__tables__, ",".join(field), ",".join(arg_temp))
print(sql)
u = User(uid=12345, password="kkbox")
u.save()
```
```python
# setattr
# 已存在的屬性會被重設
In [5]: class A:
...: a = 100
...:
In [6]: b = A()
In [8]: getattr(b,"a")
Out[8]: 100
In [9]: setattr(b, "a", 200)
In [10]: b.a
Out[10]: 200
In [11]: getattr(b, "a")
Out[11]: 200
# 不存在的屬性會被創建
In [12]: class C():
...: pass
...:
In [13]: c = C()
In [14]: setattr(c, "d", "godjj")
In [15]: c.d
Out[15]: 'godjj'
# join
# 用指定的元素將序列的str串起來
In [1]: a = ["int", "password"]
In [2]: ",".join(a)
Out[2]: 'int,password'
In [3]: "大中天".join(a)
Out[3]: 'int大中天password'
```
```python
# 最後整理
class XX(type):
def __new__(cls, name, bases, attrs):
mappings = dict()
for k,v in attrs.items():
if isinstance(v, tuple):
mappings[k] = v
for k in mappings.keys():
attrs.pop(k)
attrs["__mappings__"] = mappings
attrs["__tables__"] = name
return type.__new__(cls, name, bases, attrs)
class Model(metaclass=XX):
def __init__(self, **kwargs):
for name, value in kwargs.items():
setattr(self, name, value)
def save(self):
field = []
arg = []
for k,v in self.__mappings__.items():
field.append(v[0])
arg.append(getattr(self, k, None))
arg_temp = []
for i in arg:
if isinstance(i ,int):
arg_temp.append(str(i))
elif isinstance(i, str):
arg_temp.append("""'%s'"""%i)
sql = "insert into %s(%s) values(%s)" %(self.__tables__, ",".join(field), ",".join(arg_temp)) # 12345,'kkbox'
print(sql)
##############################################
# 經過乾坤大挪移後,只要創一個函數就能創表,傳參就能寫SQL
# 創建表
class User(Model):
uid = ("int", "int unsigned")
password = ("password", "varchar(30)")
class Haha(Model):
uid = ("Uid", "int unsigned")
email = ("Email", "varchar(30)")
phone = ("Phone", "int unsigned")
# 傳參數設值
u = User(uid=12345, password="kkbox")
# 調用功能
u.save()
h = Haha(uid=22222, phone=9999999, email="haha@gmail.com")
h.save()
```