> # SQL 與 Python 程式碼之間的等價轉換 Python 是目前資料處理上的主流程式語言之一,SQL 則是使用於關聯式資料庫的資料查詢與操作語言。兩者都是屬於圖靈完備的程式語言 (※註),並且在概念上有許多等價的行為,所以可依據情境的不同,適時地使用這兩種不同的工具。 Python 由於有廣大的支援,因此使用情境更為廣泛,例如用來處理非結構化資料、多行程/多執行緒的程式設計,這些也是資料處理時的常見議題;至於 SQL 則是擅長於結構化資料的處理。但由於資料處理通常是屬於一種無序至有序的轉換過程 (非結構化轉換為結構化),因此關聯式資料庫通常還是 Data Pipiline 的處理終點,所以即使 SQL or 關聯式資料庫這個工具問世已久,但仍是資料處理時的重要工具。 ※註:SQL 自從 SQL:1999 標準增加 recursive Common Table Expressions (CTEs) 的機制之後,就正式成為圖靈完備的程式語言。 ## 大綱: * 主題 1:欄位選取、篩選、排序 (select ... from ... where ... order) * 主題 2:資料聚合、篩選 (group by ... having ...) * 主題 3:Window Function (over ... partition by ... order by ...) * 主題 4:資料合併 (join) * 主題 5:資料聯集、交集、差集 (union、intersect、except) * 主題 6:其他各種常見的欄位操作 * 去除空白字元 (trim) * 擷取字串:以位置長度切 (substring)、以分割字元切 (split_part) * 取代字串 (replace) * 查表並異動欄位內容 (update) * 查表並新增欄位內容 (map) * 依據規則填值,並指定儲存類型 (列舉值、短整數、浮點數、...) * 固定值 * 是否在一集合中 (例如:有在集合中則為Y,無則為N) * 比較兩欄位是否相同 * 類型轉換 (int/float to string、string to int/float) * 其他工作使用案例 ## 主題 1: 欄位選取、篩選、排序 (select ... from ... where ... order) <div style="display: flex; flex-wrap: wrap; gap: 20px;"> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> SQL 範例 1: ```sql select col1, col2, col3 from table where key1 = 'value1' and key2 = 'value2' and col1 between 1 and 1000 order by col1, col2; ``` </div> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> Python 方法 1-1:資料來源為 CSV 檔案,且使用 pandas <br> * 測試結果:以 1 GB、1000萬筆資料為例,執行時間為 28.614 secs. ```python import pandas as pd # 讀取整個 CSV 檔案 # 可加上 usecols=['col1', 'col2', 'col3'] 限制讀取欄位,或加上 encoding='utf-8-sig' 設定編碼,並建議加上:dtype=str, low_memory=False df = pd.read_csv('table.csv') # 篩選符合條件的列,選取特定欄位,並依 col1 與 col2 排序 result = df.loc[(df['key1'] == 'value1') & (df['key2'] == 'value2') & df['col1'].between(1, 1000)][['col1', 'col2', 'col3']].sort_values(by=['col1', 'col2']) print(result) # print(df.loc[(df['key1'] == 'A') & (df['key2'] == 'I'), 'col1']) # 取出滿足該條件的 col1 欄位 ``` </div> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> Python 方法 1-2:資料來源為 CSV 檔案,且使用原生 Python 結構 (list[dict]) <br> * 測試結果:以 1 GB、1000萬筆資料為例,執行時間為 64.184 secs. ```python import csv with open('table.csv', newline='') as csvfile: # 可加上 encoding='utf-8-sig' 設定編碼 reader = csv.DictReader(csvfile) # 篩選條件:key1 = 'value1' 且 key2 = 'value2' 且 col1 between 1 and 1000 filtered = [ {'col1': row['col1'], 'col2': row['col2'], 'col3': row['col3']} for row in reader if row['key1'] == 'value1' and row['key2'] == 'value2' and int(row['col1']) >= 1 and int(row['col1']) &lt;= 1000 ] # 排序依 col1, col2 result = sorted(filtered, key=lambda x: (x['col1'], x['col2'])) print(result) ``` * 備註:若使用 polars,以 1 GB、1000萬筆資料為例,執行時間為 3.726 secs. </div> </div> ## 主題 2: 資料聚合、篩選 (group by ... having ...) <div style="display: flex; flex-wrap: wrap; gap: 20px;"> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> SQL 範例 2: ```sql select key1, key2, count(1), sum(col1), max(col1), min(col1) from table group by key1, key2 having count(1) > 2 order by key1, key2; ``` </div> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> Python 方法 2-1:資料來源為 CSV 檔案,且使用 pandas ```python import pandas as pd # 讀取 CSV 檔案 df = pd.read_csv('table.csv') # 使用 groupby 聚合資料 agg_df = ( df.groupby(['key1', 'key2'], as_index=False) .agg( count=('col1', 'size'), sum_col1=('col1', 'sum'), max_col1=('col1', 'max'), min_col1=('col1', 'min') ) ) # 套用 HAVING 條件 agg_df = agg_df[agg_df['count'] > 2] # 排序 agg_df = agg_df.sort_values(by=['key1', 'key2']) # 顯示結果 print(agg_df) ``` </div> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> Python 方法 2-2:資料來源為 CSV 檔案,且使用原生 Python 結構 (list[dict]) ```python import csv from collections import defaultdict # 用來儲存聚合資料 agg = defaultdict(lambda: { 'count': 0, 'sum': 0, 'max': None, 'min': None }) # 開始讀取並聚合 with open('table.csv', newline='') as csvfile: # 可加上 encoding='utf-8-sig' 設定編碼 reader = csv.DictReader(csvfile) for row in reader: key1 = row['key1'] key2 = row['key2'] col1 = int(row['col1']) # 若 col1 是 float,也可用 float() k = (key1, key2) agg[k]['count'] += 1 agg[k]['sum'] += col1 agg[k]['max'] = col1 if agg[k]['max'] is None else max(agg[k]['max'], col1) agg[k]['min'] = col1 if agg[k]['min'] is None else min(agg[k]['min'], col1) # 將結果整理為 list 並過濾 HAVING 條件 result = [] for (key1, key2), stats in agg.items(): if stats['count'] > 2: # HAVING count(*) > 2 result.append({ 'key1': key1, 'key2': key2, 'count': stats['count'], 'sum_col1': stats['sum'], 'max_col1': stats['max'], 'min_col1': stats['min'] }) # 排序(ORDER BY key1, key2) result.sort(key=lambda x: (x['key1'], x['key2'])) # 印出結果 for row in result: print(row) ``` </div> </div> ## 主題 3: Window Function (over ... partition by ... order by ...) 例如:取各 key 值排序後的第一筆 <div style="display: flex; flex-wrap: wrap; gap: 20px;"> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> SQL 範例 3: ```sql select key1, key2, key3, col1, col2, col3 from ( select key1, key2, row_number() over (partition by key1, key2 order by col1, col2) as nrow from table ) as t where nrow = 1 order by key1, key2 ``` </div> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> Python 方法 3-1-1:資料來源為 CSV 檔案,且使用 pandas ```python import pandas as pd # 讀取 CSV df = pd.read_csv('table.csv') # 確保 col1 和 col2 可以正確排序(數值型) df['col1'] = pd.to_numeric(df['col1'], errors='coerce') df['col2'] = pd.to_numeric(df['col2'], errors='coerce') # 模擬 row_number() over (partition by key1, key2 order by col1, col2) as nrow df['nrow'] = ( df.sort_values(['col1', 'col2']) .groupby(['key1', 'key2'], sort=False) .cumcount() + 1 ) # 篩選 nrow = 1(等價於每組中排序第一筆) result = df[df['nrow'] == 1] # 排序 key1, key2 result = result.sort_values(['key1', 'key2']).reset_index(drop=True) # 只選出要的欄位 result = result[['key1', 'key2', 'key3', 'col1', 'col2', 'col3']] print(result) ``` </div> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> Python 方法 3-1-2:資料來源為 CSV 檔案,且使用 pandas (另一種寫法) ```python import pandas as pd # 讀取 CSV df = pd.read_csv('table.csv') # 確保 col1 和 col2 可以正確排序(數值型) df['col1'] = pd.to_numeric(df['col1'], errors='coerce') df['col2'] = pd.to_numeric(df['col2'], errors='coerce') # 另一種寫法:根據 key1, key2 分組,排序後取每組第一筆 result = df.sort_values(['key1', 'key2', 'col1', 'col2']) .groupby(['key1', 'key2'], as_index=False, sort=False) .first() print(result) ``` </div> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> Python 方法 3-2:資料來源為 CSV 檔案,且使用原生 Python 結構 (list[dict]) ```python import csv from collections import defaultdict with open('table.csv', newline='') as csvfile: # 可加上 encoding='utf-8-sig' 設定編碼 reader = csv.DictReader(csvfile) data = [row for row in reader] # 儲存所有資料(每列為 dict) # 將每列的 col1, col2 轉為比較用格式(這裡假設為數值) for row in data: row['col1'] = float(row['col1']) # 根據實際型別改為 int/float row['col2'] = float(row['col2']) # 用 key1+key2 作為分組依據 grouped = defaultdict(list) for row in data: key = (row['key1'], row['key2']) grouped[key].append(row) # 取每組排序後的第一筆(根據 col1, col2 排序) result = [] for key, rows in grouped.items(): rows.sort(key=lambda x: (x['col1'], x['col2'])) # SQL 的 ORDER BY col1, col2 result.append(rows[0]) # row_number() = 1 # 最後根據 key1, key2 排序 result.sort(key=lambda x: (x['key1'], x['key2'])) # 印出結果 for row in result: print({ 'key1': row['key1'], 'key2': row['key2'], 'key3': row['key3'], 'col1': row['col1'], 'col2': row['col2'], 'col3': row['col3'] }) ``` </div> </div> ## 主題 4: 資料合併 (join) <div style="display: flex; flex-wrap: wrap; gap: 20px;"> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> SQL 範例 4: ```sql select t1.key1, t1.key2, t1.key3, t2.col4 from table as t1, table2 as t2 where t1.key1 = t2.key1 and t1.key2 = t2.key2 and t1.key3 = t2.key3 ``` </div> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> Python 方法 4-1:資料來源為 CSV 檔案,且使用 pandas ```python import pandas as pd # 讀取 CSV 檔 df1 = pd.read_csv('table.csv') df2 = pd.read_csv('table2.csv') # 依照 key1, key2, key3 做 inner join result = pd.merge( df1, df2[['key1', 'key2', 'key3', 'col4']], # 只保留需要的欄位 on=['key1', 'key2', 'key3'], # 左右欄位名稱不同時使用 left_on, right_on how='inner' ) # 只取指定欄位 result = result[['key1', 'key2', 'key3', 'col4']] print(result) ``` </div> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> Python 方法 4-2:資料來源為 CSV 檔案,且使用原生 Python 結構 (list[dict]) ```python import csv # 讀取 table.csv with open('table.csv', newline='') as csvfile1: # 可加上 encoding='utf-8-sig' 設定編碼 reader1 = csv.DictReader(csvfile1) table = [row for row in reader1] # 讀取 table2.csv,並將 key 組合成 dict 方便快速查找 with open('table2.csv', newline='') as csvfile2: # 可加上 encoding='utf-8-sig' 設定編碼 reader2 = csv.DictReader(csvfile2) table2_dict = { (row['key1'], row['key2'], row['key3']): row['col4'] for row in reader2 } # 執行 join result = [] for row in table: key = (row['key1'], row['key2'], row['key3']) if key in table2_dict: result.append({ 'key1': row['key1'], 'key2': row['key2'], 'key3': row['key3'], 'col4': table2_dict[key] }) print(result) ``` </div> </div> ## 主題 5: 資料聯集、交集、差集 (union、intersect、except) <div style="display: flex; flex-wrap: wrap; gap: 20px;"> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> SQL 範例 5: ```sql select key1, key2, key3 from table where key1 in ('value1', 'value2') union all select key1, key2, key3 from table where key1 = 'value3' intersect select key1, key2, key3 from table where key1 in ('value2', 'value3') except select key1, key2, key3 from table where key1 = 'value2' ``` </div> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> Python 方法 5-1:資料來源為 CSV 檔案,且使用 pandas ```python import pandas as pd df = pd.read_csv("table.csv") # 各子查詢的 DataFrame df1 = df[df['key1'].isin(['value1', 'value2'])][['key1', 'key2', 'key3']] df2 = df[df['key1'] == 'value3'][['key1', 'key2', 'key3']] df3 = df[df['key1'].isin(['value2', 'value3'])][['key1', 'key2', 'key3']] df4 = df[df['key1'] == 'value2'][['key1', 'key2', 'key3']] # UNION ALL:保留重複值 union_all = pd.concat([df1, df2], ignore_index=True) # INTERSECT:找出與 df3 的交集(轉成 set of tuples) intersect = union_all[union_all.apply(tuple, axis=1).isin(set(map(tuple, df3.values)))] # EXCEPT:排除 df4 中的資料 result = intersect[~intersect.apply(tuple, axis=1).isin(set(map(tuple, df4.values)))] result = result.reset_index(drop=True) # 重設索引(可選) print(result) ``` </div> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> Python 方法 5-2:資料來源為 CSV 檔案,且使用原生 Python 結構 (list[dict]) ```python import csv # 擷取 key1, key2, key3 欄位 def extract_keys(row): return {'key1': row['key1'], 'key2': row['key2'], 'key3': row['key3']} # 讀取 CSV 檔 with open("table.csv", newline='') as f: # 可加上 encoding='utf-8-sig' 設定編碼 reader = csv.DictReader(f) data = [row for row in reader] # 取出符合條件的列(都保留為 dict 結構) df1 = [extract_keys(r) for r in data if r['key1'] in ('value1', 'value2')] df2 = [extract_keys(r) for r in data if r['key1'] == 'value3'] df3 = [extract_keys(r) for r in data if r['key1'] in ('value2', 'value3')] df4 = [extract_keys(r) for r in data if r['key1'] == 'value2'] union_all = df1 + df2 # UNION ALL intersect = [r for r in union_all if r in df3] # INTERSECT:只保留 key 在 df3 中的 result = [r for r in intersect if r not in df4] # EXCEPT:排除 key 在 df4 中的 print(result) ``` </div> </div> --- ## 暫存: 欄位選取、篩選、排序 (select、from、where、order) <div style="display: flex; flex-wrap: wrap; gap: 20px;"> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> Python 方法 XX:資料來源為 PostgreSQL 資料庫,且使用 pandas + sqlalchemy ```python import pandas as pd from sqlalchemy import create_engine # 建立 PostgreSQL 連線(請替換以下參數) engine = create_engine('postgresql://username:password@localhost:5432/your_database') # SQL 查詢 query = """ select col1, col2, col3 from table where key1 = 'value1' and key2 = 'value2' order by col1, col2; """ # 執行查詢並存入 DataFrame df = pd.read_sql_query(query, engine) print(df) ``` </div> <div style="flex: 1 1 500px; overflow: auto; padding: 10px;"> Python 方法 XX:資料來源為 PostgreSQL 資料庫,且使用原生 Python 結構 (list[dict]) ```python import psycopg2 # 建立連線 conn = psycopg2.connect( host="localhost", database="your_database", user="username", password="password" ) cur = conn.cursor() # 執行 SQL 查詢 cur.execute(""" select col1, col2, col3 from table where key1 = 'value1' and key2 = 'value2' order by col1, col2; """) # 取得欄位名稱 columns = [desc[0] for desc in cur.description] # 將查詢結果轉為 list[dict] result = [dict(zip(columns, row)) for row in cur.fetchall()] cur.close() conn.close() print(result) ``` </div> </div>