# 1 Python資料庫DB-API介詔 ###### tags: `backend` Python中如果要連結到資料庫,不管是MySQL、SQL Server、PostgreSQL亦或是SQLite,使用時都是採用`Cursor`物件,也就是透過了Python DB-API所定義的介面。 Python所有的資料庫介面程式都在一定程度上遵守了Python DB-API的規範。`DB-API`定義了一組的物件和資料庫存取方式的介面,以便為各種底層資料庫系統和多種多樣的資料庫介面程式提供一致的訪問介面。 由於DB-API 為不同的資料庫提供了一致的訪問介面, 在不同的資料庫之間移植程式碼成為一件輕鬆的事情。 🥓python online editor: https://replit.com/~ ## Python連線資料庫流程 ### 使用`connect()`建立`connection`連線物件 `connect()`方法生成一個`connection`物件, 我們通過這個物件來訪問資料庫。符合標準的模組都會實現`connect()`方法。 connect()方法的引數如下所示: * **user** - Username * **password** - Password * **host** - Hostname * **database** - Database name * **dsn** - Data source name 資料庫連線引數可以以一個`DSN字串`的形式提供,例如: ```python connect(dsn='host:MYDB',user='root',password='') ``` 當然,不同的資料庫介面程式可能有些差異,並非都是嚴格按照規範實現,例如MySQLdb則使用db引數而不是規範推薦的database引數來表示要訪問的資料庫。 #### MySQLdb連線時可用引數 * host - 資料庫主機名.預設是用本地主機 * user - 資料庫登陸名.預設是當前使用者 * passwd - 資料庫登陸的祕密.預設為空 * db - 要使用的資料庫名.沒有預設值 * port - MySQL服務使用的TCP埠.預設是3306 * charset - 資料庫編碼 #### psycopg2連線時可用引數: * dbname – 資料庫名稱 (dsn連線模式) * database – 資料庫名稱 * user – 使用者名稱 * password – 密碼 * host – 伺服器地址 (如果不提供預設連線Unix Socket) * port – 連線埠 (預設5432) ### `connection`物件 connect物件定義了如下方法: * **close()**: 關閉此connect物件, 關閉後無法再進行操作,除非再次建立連線 * **commit()**: 提交當前事務,如果是支援事務的資料庫執行增刪改後沒有commit則資料庫預設回滾 * **rollback()**: 取消當前事務 * **cursor()**: 建立遊標物件 ### `cursor`遊標物件 `cursor`遊標物件常用的方法: * **close()**: 關閉此遊標物件 * **fetchone()**: 得到結果集的下一行 * **fetchmany([size = cursor.arraysize])**: 得到結果集的下幾行 * **fetchall()**: 得到結果集中剩下的所有行 * **excute(sql[, args])**: 執行一個資料庫查詢或命令 * **excutemany(sql, args)**: 執行多個資料庫查詢或命令 常用屬性: * **connection**: 建立此遊標物件的資料庫連線 * **arraysize**: 使用fetchmany()方法一次取出多少條記錄,預設為1 * **lastrowid**: 相當於PHP的last_inset_id() 其它的方法: * **__iter__()**:建立一個可迭代物件(可選) * **next()**:獲取結果集的下一行(如果支援迭代的話) * **nextset()**:移到下一個結果集(如果支援的話) * **callproc(func[,args])**:呼叫一個儲存過程 * **setinputsizes(sizes)**:設定輸入最大值(必須有,但具體實現是可選的) * **setoutputsizes(sizes[,col])**:設定大列 fetch 的最大緩衝區大小 其它屬性: * **description**:返回遊標活動狀態(包含7個元素的tuple):(name, type_code, display_size, internal_size, precision, scale, null_ok)只有 name 和 type_code 是必需的 * **rowcount**:最近一次 execute() 建立或影響的行數 * **messages**:遊標執行後資料庫返回的資訊元組(可選) * **rownumber**:當前結果集中游標所在行的索引(起始行號為 0) ### DB-API中定義的錯誤 錯誤類別的上下關係: ```bash StandardError |__Warning |__Error |__InterfaceError |__DatabaseError |__DataError |__OperationalError |__IntegrityError |__InternalError |__ProgrammingError |__NotSupportedError ``` ## DB-API的資料庫操作範例 要能夠連接到不同的資料庫, 必需要在作業系統上安裝一些資料特定的native client以及不同資料庫的DBAPI實作的函式庫。為了便利學員學習,事先己經在本次的環境上安裝好了以下的connector。 **不同資料庫的DBAPI的Driver:** Database |PyPI package :------------|:--------------- Azure MS SQL |pip install pymssql SQL ServerL |pip install pyodbc MySQL |pip install mysqlclient Oracle |pip install cx_Oracle PostgreSQL |pip install psycopg2 SQLite |(python 內含) 同時也使用Docker啟動了四種不同類型的資料庫來供大家練習: 1. Oracle 12c 2. SQL Server (SQL Server 2019) 3. MariaDB 4. PostgreSQL ### 連接到資料庫`DATABASE` 以下範例顯示了如何連接到一個現有的資料庫, 結果會返回一個`connection`物件。 #### sqlite3 ```python import sqlite3 try: # 連接到資料庫 db_conn = sqlite3.connect('test.db') print(type(db_conn)) print('Connect [sqlite3_example.db] database successfully!') except Exception as e: print(f"Encounter exception: {e}") finally: # 斷開資料庫的連線 db_conn.close() ``` <class 'sqlite3.Connection'> Connect [sqlite3_example.db] database successfully! #### PostgreSQL ```python import psycopg2 # "host='localhost' dbname='my_database' user='postgres' password='secret'" try: db_conn = psycopg2.connect(host='active.deacademydev.service.paas.wistron.com', dbname='dvdrental', user='dxlab', password='wistron888', port='15067') print('Connect [postgres] database successfully!') except Exception as e: print(f"Encounter exception: {e}") finally: # 斷開資料庫的連線 db_conn.close() ``` Connect [postgres] database successfully! #### MSSQL ```python # 使用pyodbc的connector import pyodbc try: server = 'tcp:10.34.124.114' database = 'master' username = 'sa' password = 'wistron888' db_conn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password) print('Connect [mssql] database successfully!') except Exception as e: print(f"Encounter exception: {e}") finally: # 斷開資料庫的連線 db_conn.close() ``` Connect [mssql] database successfully! ```python import pymssql try: db_conn = pymssql.connect(server='10.34.124.114', database='master', user='sa', password='wistron888') print('Connect [mssql] database successfully!') except Exception as e: print(f"Encounter exception: {e}") finally: # 斷開資料庫的連線 db_conn.close() ``` Connect [mssql] database successfully! #### Mysql/Maria ```python import mysql.connector try: db_conn = mysql.connector.connect(host='10.34.124.114', database='dxlab', user='dxlab', password='wistron888') print('Connect [mysql/maria] database successfully!') except Exception as e: print(f"Encounter exception: {e}") finally: # 斷開資料庫的連線 db_conn.close() ``` Connect [mysql/maria] database successfully! #### Oracle ```python # 使用oracle官方提供的connector import cx_Oracle try: # oracle在連接是的特別之處是需要構建一個dsn的字串 dsn_str = cx_Oracle.makedsn("10.34.124.114", "1521", sid="orclcdb") # 以SID的方式來連結 print(dsn_str) db_conn = cx_Oracle.connect(user='sys', password='Oradoc_db1', dsn=dsn_str, mode=cx_Oracle.SYSDBA) print('Connect [oracle] database successfully!') except Exception as e: print(f"Encounter exception: {e}") finally: # 斷開資料庫的連線 db_conn.close() ``` (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=10.34.124.114)(PORT=1521))(CONNECT_DATA=(SID=orclcdb))) Connect [oracle] database successfully! ```python # 使用oracle官方提供的connector import cx_Oracle try: # oracle在連接是的特別之處是需要構建一個dsn的字串 dsn_str = cx_Oracle.makedsn("10.34.124.114", "1521", service_name="ORCLCDB.localdomain")# 以Service Name的方式來連結 print(dsn_str) db_conn = cx_Oracle.connect(user='sys', password='Oradoc_db1', dsn=dsn_str, mode=cx_Oracle.SYSDBA) print('Connect [oracle] database successfully!') except Exception as e: print(f"Encounter exception: {e}") finally: # 斷開資料庫的連線 db_conn.close() ``` (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=10.34.124.114)(PORT=1521))(CONNECT_DATA=(SERVICE_NAME=ORCLCDB.localdomain))) Connect [oracle] database successfully! ```python import cx_Oracle try: dsn_str = cx_Oracle.makedsn("10.34.124.114", "1521", sid="orclcdb") print(dsn_str) db_conn = cx_Oracle.connect(user='admin123', password='admin123', dsn=dsn_str) # 以一般使用者來連線 print('Connect [oracle] database successfully!') except Exception as e: print(f"Encounter exception: {e}") finally: # 斷開資料庫的連線 db_conn.close() ``` (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=10.34.124.114)(PORT=1521))(CONNECT_DATA=(SID=orclcdb))) Connect [oracle] database successfully! ### 創建資料表`TABLE` 以下範例將開啟一個既存的資料庫並且創建一個資料表: ```python import sqlite3 try: # 連接到資料庫 db_conn = sqlite3.connect('test.db') # 取得cursor物件 cursor = db_conn.cursor() # 創建資料表(sql) cursor.execute( ''' CREATE TABLE COMPANY ( ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, AGE INT NOT NULL, ADDRESS CHAR(50), SALARY REAL ); ''' ) print('Table [COMPANY] created successfully!') except Exception as e: print(f"Encounter exception: {e}") finally: # 斷開資料庫的連線 db_conn.close() ``` Table [COMPANY] created successfully! ### 新增資料`INSERT`操作 以下範例將在資料表上新增加一些數據: ```python import sqlite3 try: # 連接到資料庫 db_conn = sqlite3.connect('test.db') # 取得cursor物件 cursor = db_conn.cursor() # 執行INSERT的SQL語句 cursor.execute( ''' INSERT INTO COMPANY (ID,NAME,AGE,ADDRESS,SALARY) VALUES (1, 'Paul', 32, 'California', 20000.00 ) ''' ) cursor.execute( ''' INSERT INTO COMPANY (ID,NAME,AGE,ADDRESS,SALARY) VALUES (2, 'Allen', 25, 'Texas', 15000.00 ) ''' ) cursor.execute( ''' INSERT INTO COMPANY (ID,NAME,AGE,ADDRESS,SALARY) VALUES (3, 'Teddy', 23, 'Norway', 20000.00 ) ''' ) cursor.execute( ''' INSERT INTO COMPANY (ID,NAME,AGE,ADDRESS,SALARY) VALUES (4, 'Mark', 25, 'Rich-Mond ', 65000.00 ) ''' ) # 確認交易 db_conn.commit() print('Records created successfully!') except Exception as e: print(f"Encounter exception: {e}") db_conn.rollback() finally: # 斷開資料庫的連線 db_conn.close() ``` Records created successfully! ### 資料選取 `SELECT` 操作 `cursor`遊標物件取回SELECT結果常用的方法: * **fetchone()**: 得到結果集的下一行或是None * **fetchmany([size = cursor.arraysize])**: 得到結果集的下幾行list of tuple或是一個空的list物件 * **fetchall()**: 得到結果集的所有的資料list of tuple或是一個空的list物件 以下範例將從資料表上取回一些數據: ```python import sqlite3 try: # 連接到資料庫 db_conn = sqlite3.connect('test.db') # 取得cursor物件 cursor = db_conn.cursor() # 使用fetchone方法取回一筆資料 cursor.execute("SELECT COUNT(*) FROM COMPANY") result = cursor.fetchone() print(type(result)) print(f"Table[COMPANY] count: {result}") # 方法:1. 透過迭代一筆一筆來取出數據 (當數據量大的時候, 這個方法可以有效處理) # 執行sql語句並返回可迭代的cursor物件 print("\nMethod#01: fetchone()") cursor.execute('SELECT id, name, address, salary from COMPANY') while True: row = cursor.fetchone() if row == None: break print(row) # 方法:2. 透過迭代一次多筆地來取出數據 (當數據量大的時候, 這個方法可以有效處理) # 執行sql語句並返回可迭代的cursor物件 print("\nMethod#02: fetchmany()") cursor.execute('SELECT id, name, address, salary from COMPANY') while True: retrive_batch_size = 2 rows = cursor.fetchmany(retrive_batch_size) if len(rows) == 0: break print(rows) # 方法:3. 一次性把所有的資料載入到list中 (當數據量大的時候, 這個方法導致Out of Memory) print("\nMethod#03: fetchall()") cursor.execute('SELECT id, name, address, salary from COMPANY') rows = cursor.fetchall() print(rows) print('SELECT operation is executed successfully') except Exception as e: print(f"Encounter exception: {e}") finally: # 斷開資料庫的連線 db_conn.close() ``` <class 'tuple'> Table[COMPANY] count: (4,) Method#01: fetchone() (1, 'Paul', 'California', 20000.0) (2, 'Allen', 'Texas', 15000.0) (3, 'Teddy', 'Norway', 20000.0) (4, 'Mark', 'Rich-Mond ', 65000.0) Method#02: fetchmany() [(1, 'Paul', 'California', 20000.0), (2, 'Allen', 'Texas', 15000.0)] [(3, 'Teddy', 'Norway', 20000.0), (4, 'Mark', 'Rich-Mond ', 65000.0)] Method#03: fetchall() [(1, 'Paul', 'California', 20000.0), (2, 'Allen', 'Texas', 15000.0), (3, 'Teddy', 'Norway', 20000.0), (4, 'Mark', 'Rich-Mond ', 65000.0)] SELECT operation is executed successfully ### 資料更新 `UPDATE` 操作 以下範例將更新資料表上的一些數據: ```python import sqlite3 try: # 連接到資料庫 db_conn = sqlite3.connect('test.db') # 執行sql語句 db_conn.execute('UPDATE COMPANY set SALARY=25000.00 where ID=1') # 確認交易 db_conn.commit() print('Total number of rows updated: {db_conn.total_changes}') # 執行sql語句並返回可迭代的cursor物件 cursor = db_conn.execute('SELECT id, name, address, salary from COMPANY where ID=1') # 取出數據 for row in cursor: print(f'{row}') print('UPDATE operation is executed successfully') except Exception as e: print(f"Encounter exception: {e}") finally: # 斷開資料庫的連線 db_conn.close() ``` Total number of rows updated: {db_conn.total_changes} (1, 'Paul', 'California', 25000.0) UPDATE operation is executed successfully ### 資料刪除 `DELETE` 操作 以下範例將刪除資料表上的一些數據: ```python import sqlite3 try: # 連接到資料庫 db_conn = sqlite3.connect('test.db') # 執行sql語句並返回可迭代的cursor物件 db_conn.execute('DELETE from COMPANY where ID=2;') # 確認交易 db_conn.commit() print('Total number of rows deleted: {db_conn.total_changes}') # 執行sql語句並返回可迭代的cursor物件 cursor = db_conn.execute('SELECT id, name, address, salary from COMPANY') # 取出數據 for row in cursor: print(f'{row}') print('DELETE operation is executed successfully') except Exception as e: print(f"Encounter exception: {e}") finally: # 斷開資料庫的連線 db_conn.close() ``` Total number of rows deleted: {db_conn.total_changes} (1, 'Paul', 'California', 25000.0) (3, 'Teddy', 'Norway', 20000.0) (4, 'Mark', 'Rich-Mond ', 65000.0) DELETE operation is executed successfully # 2 SQLAlchemy 介詔 官網: https://www.sqlalchemy.org/ Python SQL工具包和對象關係映射器(Object Relational Mapper) **SQLAlchemy**是Python SQL工具箱和對象關係映射器(Object Relational Mapper),它為應用程序開發人員提供了SQL的全部功能和靈活性。 透過它我們可以使用Python來高效,高性能地訪問不同類型的資料庫。 SQLAlchemy透過`dialect`來支援多種不同類型的資料庫, 包括: * SQLite * Postgresql * MySQL * Oracle * MS-SQL * Firebird * Sybase 透過抽象化DBAPI介面的實現, 應用程序開發人員可以以一致的的方法來跟不同的資料庫進行資料的處理。 SQLAlchemy由兩個不同的組件組成,稱為**Core**和**ORM**。 其中**Core**模組本身是功能齊全的SQL抽象工具包,可為各種資料庫互動介面DBAPI實現和行為提供抽象層,以及一種SQL Expression Language,它允許通過生成的Python表達式表達SQL語言。 SQLAlchemy具有三種與資料庫互動的方式: * Raw SQL * SQL Expression language * ORM 雖然我們總是可以使用各資料庫原生的SQL語法來操作資料庫。 但是如果可以的話, SQL Expression API可能是更好的一種選擇。 SQL Expression API允許你使用Python物件和運算符來構建SQL查詢語句。 SQL Expression API是對SQL語句的抽象層,並且處理數據庫之間的各種實現差異。 SQLAlchemy對象關係映射器(ORM)提供了一種將用戶定義的Python類別與數據庫表相映對的手法。 SQLAlchemy ORM基於SQL Expression API。 ![](https://i.imgur.com/IwhuWvq.png) SQLAlchemy 環境安裝: ```bash pip install sqlalchemy ``` ```python import sqlalchemy print(sqlalchemy.__version__) ``` 1.4.32 ## SQLAlchemy Core 操作範例 SQLAlchemy `core`包括SQL rendering引擎,DBAPI集成,transaction集成和schema描述服務。 `Expression Language`是SQLAlchemy的核心組件之一。它允許使用Python程式碼來定義要生成SQL語句。`Expression Language`獨立於不同的資料庫,並且全面涵蓋原始SQL的多個面向。它比SQLAlchemy中的任何其他組件都更接近原始SQL。 ### 連接到資料庫`DATABASE` `Engine`類別將`Pool`和`Dialect`連接在一起,以提供資料庫連接性和行為的來源。使用`create_engine()`函數實例化`Engine`。 `create_engine()`將資料庫的連線字串作為輸入的參數。 ```python from sqlalchemy import create_engine engine = create_engine('sqlite:///colleage.db', echo=True) # echo標誌是設置SQLAlchemy日誌記錄的快捷方式 ``` 創建一個SQLAlchemy連接物件 除了在Python安裝SqlAlchemy的套件以外, 我們還需要安裝database connector套件才能夠連接到資料庫,但是這個database connector套件取決於將要連接到的資料庫的類型。 **不同資料庫的DBAPI的Driver:** Database |PyPI package :------------|:--------------- Amazon Redshift |pip install sqlalchemy-redshift Apache Hive |pip install pyhive Apache Impala |pip install impala Apache Spark SQL |pip install pyhive Azure MS SQL |pip install pymssql Big Query |pip install pybigquery Elasticsearch |pip install elasticsearch-dbapi MySQL |pip install mysqlclient Oracle |pip install cx_Oracle PostgreSQL |pip install psycopg2 SQLite |(python 內含) SQL Server |pip install pymssql `Engine`物件有以下重要的方法: * **connect()**: 返回connection物件 * **execute()**: 執行一個SQL語句 * **begin()**: 返回一個上下文管理器context manager,該上下文管理器通過已建立的事務傳遞連接。成功操作後,將提交事務,否則將回滾 * **dispose()**: 回收Engine物件使用的資料庫連接 * **driver()**: Engine物件正在使用的Dialect的Driver名稱 * **table_names()**: 返回數據庫中所有可用資料表名稱的列表 * **transaction()**: 使用資料庫交易物件來執行相關操作 ### SQLAlchemy 資料庫連線字串`URIs` create_engine()函數是根據`資料庫連線URL`來生成`Engine`物件。SqlAlchemy的資料庫連線字串有以下的模版: `dialect+driver://username:password@host:port/database` 以下列出常用資料庫的連線模版。 #### SQLite 預設情況下,SQLite使用Python內置模塊`sqlite3`連接到基於文件的sqlite數據庫。 當SQLite連接到本地文件時,URL格式略有不同。對於相對文件路徑,這需要三個斜杠: ```python # sqlite://<nohostname>/<path> # where <path> is relative: engine = create_engine('sqlite:///foo.db') ``` 對於絕對文件路徑,三個斜杠後再加上絕對路徑: ```python engine = create_engine('sqlite:////absolute/path/to/foo.db') # Windows engine = create_engine('sqlite:///C:\\path\\to\\foo.db') # Windows alternative using raw string engine = create_engine(r'sqlite:///C:\path\to\foo.db') ``` 要使用SQLite:memory:數據庫,請指定一個空URL: ```python engine = create_engine('sqlite://') ``` #### PostgreSQL PostgreSQL的dialet使用`psycopg2`作為預設的DBAPI。 ```python # default engine = create_engine('postgresql://scott:tiger@localhost/mydatabase') # psycopg2 engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/mydatabase') ``` ```python from sqlalchemy import create_engine # 原始DBAPI的設定 # db_conn = psycopg2.connect(host='10.34.124.114', dbname='dvdrental', user='demo', password='demo8888') try: pg_engine = create_engine('postgresql+psycopg2://demo:demo8888@10.34.124.114/dvdrental', echo=True) # echo標誌是設置SQLAlchemy日誌記錄的快捷方式 print(type(pg_engine)) print('Connect [PostgreSQL] database successfully!') except Exception as e: print(f"Encounter exception: {e}") finally: # 斷開資料庫的連線 pg_engine.dispose() ``` <class 'sqlalchemy.engine.base.Engine'> Connect [PostgreSQL] database successfully! #### MySQL/MariaDB MySQL的dialet使用`mysql-python`作為預設DBAPI(由於Airflow使用`mysql-connector-python`)。 ```python # default engine = create_engine('mysql://scott:tiger@localhost/foo') # mysql-connector-python engine = create_engine('mysql+mysqlconnector://scott:tiger@localhost/foo') # mysqlclient (a maintained fork of MySQL-Python) engine = create_engine('mysql+mysqldb://scott:tiger@localhost/foo') # PyMySQL engine = create_engine('mysql+pymysql://scott:tiger@localhost/foo') ``` ```python from sqlalchemy import create_engine # 原始DBAPI的設定 # db_conn = mysql.connector.connect(host='10.34.124.114', database='dxlab', user='dxlab', password='wistron888') try: mariadb_engine = create_engine('mysql+mysqlconnector://dxlab:wistron888@10.34.124.114/dxlab', echo=True) # echo標誌是設置SQLAlchemy日誌記錄的快捷方式 print(type(mariadb_engine)) print('Connect [MariaDB] database successfully!') except Exception as e: print(f"Encounter exception: {e}") finally: # 斷開資料庫的連線 mariadb_engine.dispose() ``` <class 'sqlalchemy.engine.base.Engine'> Connect [MariaDB] database successfully! #### Oracle Oracle的dialet使用`cx_oracle`作為作為預設DBAPI。 ```python engine = create_engine('oracle://scott:tiger@127.0.0.1:1521/sidname') engine = create_engine('oracle+cx_oracle://scott:tiger@tnsname') ``` ```python from sqlalchemy import create_engine import cx_Oracle # 原始DBAPI的設定 # db_conn = cx_Oracle.connect(user='admin123', password='admin123', dsn=dsn_str) try: # Oracle的連線特點在於必需構建dsn連線描述字串, 你可以使用sid或service_name來做為設定 dsn_str = cx_Oracle.makedsn("10.34.124.114", "1521", sid="orclcdb") # 使用sid來連線 print(dsn_str) dsn_str = cx_Oracle.makedsn("10.34.124.114", "1521", service_name="ORCLCDB.localdomain") # 使用service_name來連線 print(dsn_str) oracle_engine = create_engine(f'oracle+cx_oracle://admin123:admin123@{dsn_str}', echo=True) # echo標誌是設置SQLAlchemy日誌記錄的快捷方式 print(type(oracle_engine)) print('Connect [Oracle] database successfully!') except Exception as e: print(f"Encounter exception: {e}") finally: # 斷開資料庫的連線 oracle_engine.dispose() ``` (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=10.34.124.114)(PORT=1521))(CONNECT_DATA=(SID=orclcdb))) (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=10.34.124.114)(PORT=1521))(CONNECT_DATA=(SERVICE_NAME=ORCLCDB.localdomain))) <class 'sqlalchemy.engine.base.Engine'> Connect [Oracle] database successfully! #### Microsoft SQL Server SQL Server的dialet使用`pyodbc `作為作為預設DBAPI。 ```python # pyodbc engine = create_engine('mssql+pyodbc://scott:tiger@mydsn') # pymssql engine = create_engine('mssql+pymssql://scott:tiger@hostname:port/dbname') ``` ```python from sqlalchemy import create_engine # 原始DBAPI的設定 # db_conn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password) try: dsn_str = '?driver=ODBC+Driver+17+for+SQL+Server' mssql_engine = create_engine(f'mssql+pyodbc://sa:wistron888@10.34.124.114/master{dsn_str}', echo=True) # echo標誌是設置SQLAlchemy日誌記錄的快捷方式 print(type(mssql_engine)) print('Connect [MSSQL] database successfully!') except Exception as e: print(f"Encounter exception: {e}") finally: # 斷開資料庫的連線 mssql_engine.dispose() ``` <class 'sqlalchemy.engine.base.Engine'> Connect [MSSQL] database successfully! ```python from sqlalchemy import create_engine # 原始DBAPI的設定 # db_conn = pymssql.connect(server='10.34.124.114', database='master', user='sa', password='wistron888') try: mssql_engine = create_engine('mssql+pymssql://sa:wistron888@10.34.124.114/master', echo=True) # echo標誌是設置SQLAlchemy日誌記錄的快捷方式 print(type(mssql_engine)) print('Connect [MSSQL] database successfully!') except Exception as e: print(f"Encounter exception: {e}") finally: # 斷開資料庫的連線 mssql_engine.dispose() ``` <class 'sqlalchemy.engine.base.Engine'> Connect [MSSQL] database successfully! ### 創建資料表`TABLE` SQLAlchemy的`MetaData`是`Table`物件及其關聯的schema構造的集合。 ```python from sqlalchemy import MetaData meta = MetaData() ``` Table類別的物件表示數據庫中的對應的資料表。在構建`Table`物件時需要定義每一個欄位`column`的屬性與其中的data型別。 SQLAlchemy定義了一些通用型來適應不同的資料庫: * Boolean * Date * Time * DateTime * SmallInteger * Integer * BigInteger * Float * Numeric * String * Text 以下範例用來創立一個`Table`物件: ```python from sqlalchemy import Table, Column, Integer, String, MetaData meta = MetaData() students = Table( 'students', meta, Column('id', Integer, primary_key = True), Column('name', String), Column('lastname', String), ) ``` `create_all()`函數使用`Engine`物件來創建定義的`Table`物件,並將信息存儲在`metadata`中。 ```python meta.create_all(engine) ``` 2022-10-19 01:09:08,525 INFO sqlalchemy.engine.Engine BEGIN (implicit) 2022-10-19 01:09:08,526 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("students") 2022-10-19 01:09:08,527 INFO sqlalchemy.engine.Engine [raw sql] () 2022-10-19 01:09:08,528 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("students") 2022-10-19 01:09:08,528 INFO sqlalchemy.engine.Engine [raw sql] () 2022-10-19 01:09:08,530 INFO sqlalchemy.engine.Engine CREATE TABLE students ( id INTEGER NOT NULL, name VARCHAR, lastname VARCHAR, PRIMARY KEY (id) ) 2022-10-19 01:09:08,531 INFO sqlalchemy.engine.Engine [no key 0.00094s] () 2022-10-19 01:09:08,549 INFO sqlalchemy.engine.Engine COMMIT ### 新增資料`INSERT`操作 以下範例將在資料表上新增加一些數據: ```python insertObj = students.insert().values(name='Simon', lastname='Lin') db_conn = engine.connect() result = db_conn.execute(insertObj) print(result) ``` 2022-10-19 01:09:11,748 INFO sqlalchemy.engine.Engine INSERT INTO students (name, lastname) VALUES (?, ?) 2022-10-19 01:09:11,749 INFO sqlalchemy.engine.Engine [generated in 0.00103s] ('Simon', 'Lin') 2022-10-19 01:09:11,753 INFO sqlalchemy.engine.Engine COMMIT <sqlalchemy.engine.cursor.LegacyCursorResult object at 0x0000024A82324CD0> 如果一次要新增多筆資料, 我們可以發送字典列表作為參數。 ```python db_conn.execute(students.insert(), [ {'name':'Rajiv', 'lastname' : 'Khanna'}, {'name':'Komal','lastname' : 'Bhandari'}, {'name':'Abdul','lastname' : 'Sattar'}, {'name':'Priya','lastname' : 'Rajhans'}, ]) ``` 2022-10-19 01:09:16,176 INFO sqlalchemy.engine.Engine INSERT INTO students (name, lastname) VALUES (?, ?) 2022-10-19 01:09:16,177 INFO sqlalchemy.engine.Engine [generated in 0.00081s] (('Rajiv', 'Khanna'), ('Komal', 'Bhandari'), ('Abdul', 'Sattar'), ('Priya', 'Rajhans')) 2022-10-19 01:09:16,180 INFO sqlalchemy.engine.Engine COMMIT <sqlalchemy.engine.cursor.LegacyCursorResult at 0x24a80aefa00> ### 資料選取 `SELECT` 操作 `table`物件的`select()`方法使我們能夠對資料表進行資料`SELECT`的操作。 ```python selectOp = students.select() db_conn = engine.connect() result = db_conn.execute(selectOp) for row in result: print(row) print(type(row[0])) print(type(row[1])) print(type(row[2])) ``` 2022-10-19 01:09:19,521 INFO sqlalchemy.engine.Engine SELECT students.id, students.name, students.lastname FROM students 2022-10-19 01:09:19,522 INFO sqlalchemy.engine.Engine [generated in 0.00099s] () (1, 'Simon', 'Lin') <class 'int'> <class 'str'> <class 'str'> (2, 'Rajiv', 'Khanna') <class 'int'> <class 'str'> <class 'str'> (3, 'Komal', 'Bhandari') <class 'int'> <class 'str'> <class 'str'> (4, 'Abdul', 'Sattar') <class 'int'> <class 'str'> <class 'str'> (5, 'Priya', 'Rajhans') <class 'int'> <class 'str'> <class 'str'> 可以通過使用`Select.where()`來應用SELECT查詢的WHERE子句。例如,如果我們要顯示ID>2的資料: ```python selectOp = students.select().where(students.columns.id>2) result = db_conn.execute(selectOp) for row in result: print(row) ``` 2022-10-19 01:09:23,415 INFO sqlalchemy.engine.Engine SELECT students.id, students.name, students.lastname FROM students WHERE students.id > ? 2022-10-19 01:09:23,416 INFO sqlalchemy.engine.Engine [generated in 0.00082s] (2,) (3, 'Komal', 'Bhandari') (4, 'Abdul', 'Sattar') (5, 'Priya', 'Rajhans') 對於已經知道SQL並且不需要強烈支持動態功能的語句的情況,SQLAlchemy允許使用字符串。 `text()`用於組成一個文本語句,該語句幾乎不變地傳遞到數據庫。 ```python from sqlalchemy.sql import text from sqlalchemy.sql import bindparam stmt = text("SELECT * FROM students WHERE students.name BETWEEN :x AND :y") stmt.bindparams stmt = stmt.bindparams( bindparam("x", type_= String), bindparam("y", type_= String) ) result = db_conn.execute(stmt, {"x": "A", "y": "L"}) ``` 2022-10-19 01:09:26,355 INFO sqlalchemy.engine.Engine SELECT * FROM students WHERE students.name BETWEEN ? AND ? 2022-10-19 01:09:26,356 INFO sqlalchemy.engine.Engine [generated in 0.00070s] ('A', 'L') 當然SQLAlchemy也允許直接去執行raw sql的語句: ```python result = db_conn.execute('SELECT * FROM students') for row in result: print(row) ``` 2022-10-19 01:09:28,962 INFO sqlalchemy.engine.Engine SELECT * FROM students 2022-10-19 01:09:28,963 INFO sqlalchemy.engine.Engine [raw sql] () (1, 'Simon', 'Lin') (2, 'Rajiv', 'Khanna') (3, 'Komal', 'Bhandari') (4, 'Abdul', 'Sattar') (5, 'Priya', 'Rajhans') ### 資料更新 `UPDATE` 操作 以下範例將更新資料表上的一些數據: ```python stmt=students.update().where(students.c.lastname=='Khanna').values(lastname='Kapoor') db_conn.execute(stmt) selectOp = students.select() result = db_conn.execute(selectOp) for row in result: print(row) ``` 2022-10-19 01:09:31,888 INFO sqlalchemy.engine.Engine UPDATE students SET lastname=? WHERE students.lastname = ? 2022-10-19 01:09:31,889 INFO sqlalchemy.engine.Engine [generated in 0.00076s] ('Kapoor', 'Khanna') 2022-10-19 01:09:31,892 INFO sqlalchemy.engine.Engine COMMIT 2022-10-19 01:09:31,906 INFO sqlalchemy.engine.Engine SELECT students.id, students.name, students.lastname FROM students 2022-10-19 01:09:31,908 INFO sqlalchemy.engine.Engine [cached since 12.39s ago] () (1, 'Simon', 'Lin') (2, 'Rajiv', 'Kapoor') (3, 'Komal', 'Bhandari') (4, 'Abdul', 'Sattar') (5, 'Priya', 'Rajhans') ### 資料刪除 `DELETE` 操作 以下範例將刪除資料表上的一些數據: ```python stmt = students.delete().where(students.columns.lastname == 'Kapoor') db_conn.execute(stmt) result = db_conn.execute(students.select()) for row in result: print(row) ``` 2022-10-19 01:09:36,991 INFO sqlalchemy.engine.Engine DELETE FROM students WHERE students.lastname = ? 2022-10-19 01:09:36,992 INFO sqlalchemy.engine.Engine [generated in 0.00080s] ('Kapoor',) 2022-10-19 01:09:36,995 INFO sqlalchemy.engine.Engine COMMIT 2022-10-19 01:09:37,011 INFO sqlalchemy.engine.Engine SELECT students.id, students.name, students.lastname FROM students 2022-10-19 01:09:37,012 INFO sqlalchemy.engine.Engine [cached since 17.49s ago] () (1, 'Simon', 'Lin') (3, 'Komal', 'Bhandari') (4, 'Abdul', 'Sattar') (5, 'Priya', 'Rajhans') ### 如何使用己存在的資料庫(讓SQLAlchemy自動構建Metadata) ```python from sqlalchemy import create_engine from sqlalchemy import MetaData from sqlalchemy.ext.declarative import declarative_base metadata = MetaData() Base = declarative_base() Base.metadata = metadata # create engine to connect Postgresql engine = create_engine('sqlite:///test.db') metadata.reflect(bind=engine) # check how many tables for table in metadata.tables: print(f'Table: {table}') # retrieve each TABLE object for t in metadata.sorted_tables: print(type(t)) print(t.name) print(t.columns) ``` Table: COMPANY <class 'sqlalchemy.sql.schema.Table'> COMPANY ImmutableColumnCollection(COMPANY.ID, COMPANY.NAME, COMPANY.AGE, COMPANY.ADDRESS, COMPANY.SALARY) # 3 Pandas與資料庫的整合 `pandas.io.sql`模組提供了從不同資料庫提取資料並轉換成DataFrame的整合。Pandas透過**SQLAlchemy**來提供對數據庫互動的抽象層。除了SQLAlchemy以外,也會需要安裝不同資料庫的驅動函式庫。 主有有以下的方法: * **read_sql_table** (table_name, con[, schema, ...]) - 將資料庫中的一個資料表讀取成為DataFrame物件。 * **read_sql_query** (sql, con[, index_col, ...]) - 將SQL查詢讀取成為DataFrame物件。 * **read_sql** (sql, con[, index_col, ...]) - 將SQL查詢或資料表讀取到DataFrame中。 * **DataFrame.to_sql** (name, col[, schema, ...]) - 將DataFrame中的記錄寫入到SQL數據庫的某特定資料表(append only)。 **範例資料庫**: DVD rental database 說明: DVD出租數據庫代表DVD出租商店的業務流程。 DVD出租數據庫具有許多資料表,包括: * 15 tables * 1 trigger * 7 views * 8 functions * 1 domain * 13 sequences **DVD Rental ER Model** ![](https://i.imgur.com/DfkFjFJ.png) 在下面的範例中,我們使用Postgre資料庫來作為資料庫引擎。要與SQLAlchemy連接,請使用`create_engine()`函數從數據庫`URI`創建`Engine`物件。 ```python from sqlalchemy import create_engine import pandas as pd # create engine to connect Postgresql pg_engine_source = create_engine('postgresql+psycopg2://dxlab:wistron888@active.deacademydev.service.paas.wistron.com:15067/dvdrental', echo=True) ``` ## 3.1 SQL查詢轉換成DataFrame 你可以在`read_sql_query()`函數中使用原始SQL語句來進行查詢。在這種情況下,必須使用適合特定資料庫的SQL語法。 由於pandas使用SQLAlchemy作為與資料庫的抽象互動層,所以你也還選擇與數據庫無關的SQLAlchemy表達式語言來進行SQL查詢。 ```python df_actor = pd.read_sql_query("select * from actor", con=pg_engine_source) print(df_actor.info()) print(df_actor.head()) ``` 2022-10-19 10:34:30,629 INFO sqlalchemy.engine.Engine select * from actor 2022-10-19 10:34:30,631 INFO sqlalchemy.engine.Engine [raw sql] {} <class 'pandas.core.frame.DataFrame'> RangeIndex: 200 entries, 0 to 199 Data columns (total 4 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 actor_id 200 non-null int64 1 first_name 200 non-null object 2 last_name 200 non-null object 3 last_update 200 non-null datetime64[ns] dtypes: datetime64[ns](1), int64(1), object(2) memory usage: 6.4+ KB None actor_id first_name last_name last_update 0 1 Penelope Guiness 2013-05-26 14:47:57.620 1 2 Nick Wahlberg 2013-05-26 14:47:57.620 2 3 Ed Chase 2013-05-26 14:47:57.620 3 4 Jennifer Davis 2013-05-26 14:47:57.620 4 5 Johnny Lollobrigida 2013-05-26 14:47:57.620 ```python # 由於read_sql()包裝了read_sql_query()與read_sql_table()的兩個方法, 建議學員未來使用read_sql() df_actor = pd.read_sql("select * from actor", con=pg_engine_source) print(df_actor.info()) print(df_actor.head()) ``` 2022-10-19 10:34:41,058 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s 2022-10-19 10:34:41,059 INFO sqlalchemy.engine.Engine [generated in 0.00181s] {'name': 'select * from actor'} 2022-10-19 10:34:41,097 INFO sqlalchemy.engine.Engine select * from actor 2022-10-19 10:34:41,098 INFO sqlalchemy.engine.Engine [raw sql] {} <class 'pandas.core.frame.DataFrame'> RangeIndex: 200 entries, 0 to 199 Data columns (total 4 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 actor_id 200 non-null int64 1 first_name 200 non-null object 2 last_name 200 non-null object 3 last_update 200 non-null datetime64[ns] dtypes: datetime64[ns](1), int64(1), object(2) memory usage: 6.4+ KB None actor_id first_name last_name last_update 0 1 Penelope Guiness 2013-05-26 14:47:57.620 1 2 Nick Wahlberg 2013-05-26 14:47:57.620 2 3 Ed Chase 2013-05-26 14:47:57.620 3 4 Jennifer Davis 2013-05-26 14:47:57.620 4 5 Johnny Lollobrigida 2013-05-26 14:47:57.620 由於Pandas會把數據都先讀取dataframe中(在記憶體裡), 如果SQL的資料量太大(比如上千萬筆), 那麼很容易導致記憶體不足而失敗。建議根據預計讀入的資料量預估使用的記憶量。若是要讀入dataframe的筆數單次超過**500萬筆**的數據時, 需要透過某些邏輯來切割資料後再交由Pandas來分析。比如根據某種partition鍵值來切割資料。 ```python SELECT case_id, text FROM first_case limit 1000 offset 0 ``` > 請注意,pandas從query結果的資料來推斷列的`dtype`,而不是通過在資料庫本身的schema類型來推斷。 你也可以將某特定列指定為DataFrame的索引。 ```python df_actor = pd.read_sql("select * from actor", con=pg_engine_source, index_col="actor_id") print(df_actor.info()) print(df_actor.head()) ``` 2022-10-19 10:35:04,953 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s 2022-10-19 10:35:04,954 INFO sqlalchemy.engine.Engine [cached since 23.9s ago] {'name': 'select * from actor'} 2022-10-19 10:35:04,985 INFO sqlalchemy.engine.Engine select * from actor 2022-10-19 10:35:04,986 INFO sqlalchemy.engine.Engine [raw sql] {} <class 'pandas.core.frame.DataFrame'> Int64Index: 200 entries, 1 to 200 Data columns (total 3 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 first_name 200 non-null object 1 last_name 200 non-null object 2 last_update 200 non-null datetime64[ns] dtypes: datetime64[ns](1), object(2) memory usage: 6.2+ KB None first_name last_name last_update actor_id 1 Penelope Guiness 2013-05-26 14:47:57.620 2 Nick Wahlberg 2013-05-26 14:47:57.620 3 Ed Chase 2013-05-26 14:47:57.620 4 Jennifer Davis 2013-05-26 14:47:57.620 5 Johnny Lollobrigida 2013-05-26 14:47:57.620 ## 3.2 將DataFrame直接寫進SQL資料表 Pandas將dataframe寫入資料表時, 並不會去偵測修改或刪除的業務情境, 它純粹是以`append`的模型來把資料倒入指定的資料表。此種方法很適合用於資料的交換/整合的使用情境。 我們將透過`Dataframe.to_sql()`方法來把DataFrame物件的資料寫進指定的資料表。 為了讓每個學員可以練習資料庫的寫入, 學員幫每個學員創建了一個Postgres的資料庫。 > 舉例:工號 8008888 -> 資料庫: db_8008888 , 帳號: user_8008888 , 密碼: xxxxxxx ```python # 請學員連線到學員專屬的資料庫 from sqlalchemy import create_engine # 原始DBAPI的設定 # pg_engine = create_engine('postgresql+psycopg2://{user_工號}:{指定的密碼}@10.34.124.114/{db_工號}', echo=True) # 舉例:工號 8008888 -> 資料庫: db_8008888 , 帳號: user_8008888 , 密碼: xxxxxxx # pg_engine = create_engine('postgresql+psycopg2://user_8008888:xxxxxxx@10.34.124.114/db_8008888', echo=True) pg_engine_destination = create_engine('postgresql+psycopg2://dxlab:wistron888@active.deacademydev.service.paas.wistron.com:15067/dvdrental', echo=True) # echo標誌是設置SQLAlchemy日誌記錄的快捷方式 print(type(pg_engine_destination)) print('Connect [PostgreSQL] database successfully!') ``` <class 'sqlalchemy.engine.base.Engine'> Connect [PostgreSQL] database successfully! ```python df_actor.to_sql("staging_actor", con=pg_engine_destination, index=False, if_exists='append') ``` 2022-10-19 10:35:53,066 INFO sqlalchemy.engine.Engine select pg_catalog.version() 2022-10-19 10:35:53,067 INFO sqlalchemy.engine.Engine [raw sql] {} 2022-10-19 10:35:53,091 INFO sqlalchemy.engine.Engine select current_schema() 2022-10-19 10:35:53,092 INFO sqlalchemy.engine.Engine [raw sql] {} 2022-10-19 10:35:53,114 INFO sqlalchemy.engine.Engine show standard_conforming_strings 2022-10-19 10:35:53,116 INFO sqlalchemy.engine.Engine [raw sql] {} 2022-10-19 10:35:53,138 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s 2022-10-19 10:35:53,140 INFO sqlalchemy.engine.Engine [generated in 0.00238s] {'name': 'staging_actor'} 2022-10-19 10:35:53,179 INFO sqlalchemy.engine.Engine BEGIN (implicit) 2022-10-19 10:35:53,187 INFO sqlalchemy.engine.Engine CREATE TABLE staging_actor ( first_name TEXT, last_name TEXT, last_update TIMESTAMP WITHOUT TIME ZONE ) 2022-10-19 10:35:53,190 INFO sqlalchemy.engine.Engine [no key 0.00172s] {} 2022-10-19 10:35:53,233 INFO sqlalchemy.engine.Engine COMMIT 2022-10-19 10:35:53,250 INFO sqlalchemy.engine.Engine BEGIN (implicit) 2022-10-19 10:35:53,262 INFO sqlalchemy.engine.Engine INSERT INTO staging_actor (first_name, last_name, last_update) VALUES (%(first_name)s, %(last_name)s, %(last_update)s) 2022-10-19 10:35:53,265 INFO sqlalchemy.engine.Engine [generated in 0.00428s] ({'first_name': 'Penelope', 'last_name': 'Guiness', 'last_update': datetime.datetime(2013, 5, 26, 14, 47, 57, 620000)}, {'first_name': 'Nick', 'last_name': 'Wahlberg', 'last_update': datetime.datetime(2013, 5, 26, 14, 47, 57, 620000)}, {'first_name': 'Ed', 'last_name': 'Chase', 'last_update': datetime.datetime(2013, 5, 26, 14, 47, 57, 620000)}, {'first_name': 'Jennifer', 'last_name': 'Davis', 'last_update': datetime.datetime(2013, 5, 26, 14, 47, 57, 620000)}, {'first_name': 'Johnny', 'last_name': 'Lollobrigida', 'last_update': datetime.datetime(2013, 5, 26, 14, 47, 57, 620000)}, {'first_name': 'Bette', 'last_name': 'Nicholson', 'last_update': datetime.datetime(2013, 5, 26, 14, 47, 57, 620000)}, {'first_name': 'Grace', 'last_name': 'Mostel', 'last_update': datetime.datetime(2013, 5, 26, 14, 47, 57, 620000)}, {'first_name': 'Matthew', 'last_name': 'Johansson', 'last_update': datetime.datetime(2013, 5, 26, 14, 47, 57, 620000)} ... displaying 10 of 200 total bound parameter sets ... {'first_name': 'Julia', 'last_name': 'Fawcett', 'last_update': datetime.datetime(2013, 5, 26, 14, 47, 57, 620000)}, {'first_name': 'Thora', 'last_name': 'Temple', 'last_update': datetime.datetime(2013, 5, 26, 14, 47, 57, 620000)}) 2022-10-19 10:35:53,316 INFO sqlalchemy.engine.Engine COMMIT 200