> # SQL 與 Python 程式碼之間的等價轉換
Python 是目前資料處理上的主流程式語言之一,SQL 則是使用於關聯式資料庫的資料查詢與操作語言。兩者都是屬於圖靈完備的程式語言 (※註),並且在概念上有許多等價的行為,所以可依據情境的不同,適時地使用這兩種不同的工具。
Python 由於有廣大的支援,因此使用情境更為廣泛,例如用來處理非結構化資料、多行程/多執行緒的程式設計,這些也是資料處理時的常見議題;至於 SQL 則是擅長於結構化資料的處理。但由於資料處理通常是屬於一種無序至有序的轉換過程 (非結構化轉換為結構化),因此關聯式資料庫通常還是 Data Pipiline 的處理終點,所以即使 SQL or 關聯式資料庫這個工具問世已久,但仍是資料處理時的重要工具。
※註:SQL 自從 SQL:1999 標準增加 recursive Common Table Expressions (CTEs) 的機制之後,就正式成為圖靈完備的程式語言。
## 大綱:
* 主題 1:欄位選取、篩選、排序 (select ... from ... where ... order)
* 主題 2:資料聚合、篩選 (group by ... having ...)
* 主題 3:Window Function (over ... partition by ... order by ...)
* 主題 4:資料合併 (join)
* 主題 5:資料聯集、交集、差集 (union、intersect、except)
* 主題 6:其他各種常見的欄位操作
* 去除空白字元 (trim)
* 擷取字串:以位置長度切 (substring)、以分割字元切 (split_part)
* 取代字串 (replace)
* 查表並異動欄位內容 (update)
* 查表並新增欄位內容 (map)
* 依據規則填值,並指定儲存類型 (列舉值、短整數、浮點數、...)
* 固定值
* 是否在一集合中 (例如:有在集合中則為Y,無則為N)
* 比較兩欄位是否相同
* 類型轉換 (int/float to string、string to int/float)
* 其他工作使用案例
## 主題 1:
欄位選取、篩選、排序 (select ... from ... where ... order)
<div style="display: flex; flex-wrap: wrap; gap: 20px;">
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
SQL 範例 1:
```sql
select col1, col2, col3
from table
where key1 = 'value1' and key2 = 'value2'
and col1 between 1 and 1000
order by col1, col2;
```
</div>
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
Python 方法 1-1:資料來源為 CSV 檔案,且使用 pandas <br>
* 測試結果:以 1 GB、1000萬筆資料為例,執行時間為 28.614 secs.
```python
import pandas as pd
# 讀取整個 CSV 檔案
# 可加上 usecols=['col1', 'col2', 'col3'] 限制讀取欄位,或加上 encoding='utf-8-sig' 設定編碼,並建議加上:dtype=str, low_memory=False
df = pd.read_csv('table.csv')
# 篩選符合條件的列,選取特定欄位,並依 col1 與 col2 排序
result = df.loc[(df['key1'] == 'value1') & (df['key2'] == 'value2') & df['col1'].between(1, 1000)][['col1', 'col2', 'col3']].sort_values(by=['col1', 'col2'])
print(result)
# print(df.loc[(df['key1'] == 'A') & (df['key2'] == 'I'), 'col1']) # 取出滿足該條件的 col1 欄位
```
</div>
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
Python 方法 1-2:資料來源為 CSV 檔案,且使用原生 Python 結構 (list[dict]) <br>
* 測試結果:以 1 GB、1000萬筆資料為例,執行時間為 64.184 secs.
```python
import csv
with open('table.csv', newline='') as csvfile: # 可加上 encoding='utf-8-sig' 設定編碼
reader = csv.DictReader(csvfile)
# 篩選條件:key1 = 'value1' 且 key2 = 'value2' 且 col1 between 1 and 1000
filtered = [
{'col1': row['col1'], 'col2': row['col2'], 'col3': row['col3']}
for row in reader
if row['key1'] == 'value1' and row['key2'] == 'value2' and int(row['col1']) >= 1 and int(row['col1']) <= 1000
]
# 排序依 col1, col2
result = sorted(filtered, key=lambda x: (x['col1'], x['col2']))
print(result)
```
* 備註:若使用 polars,以 1 GB、1000萬筆資料為例,執行時間為 3.726 secs.
</div>
</div>
## 主題 2:
資料聚合、篩選 (group by ... having ...)
<div style="display: flex; flex-wrap: wrap; gap: 20px;">
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
SQL 範例 2:
```sql
select key1, key2, count(1), sum(col1), max(col1), min(col1)
from table
group by key1, key2
having count(1) > 2
order by key1, key2;
```
</div>
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
Python 方法 2-1:資料來源為 CSV 檔案,且使用 pandas
```python
import pandas as pd
# 讀取 CSV 檔案
df = pd.read_csv('table.csv')
# 使用 groupby 聚合資料
agg_df = (
df.groupby(['key1', 'key2'], as_index=False)
.agg(
count=('col1', 'size'),
sum_col1=('col1', 'sum'),
max_col1=('col1', 'max'),
min_col1=('col1', 'min')
)
)
# 套用 HAVING 條件
agg_df = agg_df[agg_df['count'] > 2]
# 排序
agg_df = agg_df.sort_values(by=['key1', 'key2'])
# 顯示結果
print(agg_df)
```
</div>
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
Python 方法 2-2:資料來源為 CSV 檔案,且使用原生 Python 結構 (list[dict])
```python
import csv
from collections import defaultdict
# 用來儲存聚合資料
agg = defaultdict(lambda: {
'count': 0,
'sum': 0,
'max': None,
'min': None
})
# 開始讀取並聚合
with open('table.csv', newline='') as csvfile: # 可加上 encoding='utf-8-sig' 設定編碼
reader = csv.DictReader(csvfile)
for row in reader:
key1 = row['key1']
key2 = row['key2']
col1 = int(row['col1']) # 若 col1 是 float,也可用 float()
k = (key1, key2)
agg[k]['count'] += 1
agg[k]['sum'] += col1
agg[k]['max'] = col1 if agg[k]['max'] is None else max(agg[k]['max'], col1)
agg[k]['min'] = col1 if agg[k]['min'] is None else min(agg[k]['min'], col1)
# 將結果整理為 list 並過濾 HAVING 條件
result = []
for (key1, key2), stats in agg.items():
if stats['count'] > 2: # HAVING count(*) > 2
result.append({
'key1': key1,
'key2': key2,
'count': stats['count'],
'sum_col1': stats['sum'],
'max_col1': stats['max'],
'min_col1': stats['min']
})
# 排序(ORDER BY key1, key2)
result.sort(key=lambda x: (x['key1'], x['key2']))
# 印出結果
for row in result:
print(row)
```
</div>
</div>
## 主題 3:
Window Function (over ... partition by ... order by ...)
例如:取各 key 值排序後的第一筆
<div style="display: flex; flex-wrap: wrap; gap: 20px;">
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
SQL 範例 3:
```sql
select key1, key2, key3, col1, col2, col3
from (
select key1, key2, row_number() over (partition by key1, key2 order by col1, col2) as nrow
from table
) as t
where nrow = 1
order by key1, key2
```
</div>
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
Python 方法 3-1-1:資料來源為 CSV 檔案,且使用 pandas
```python
import pandas as pd
# 讀取 CSV
df = pd.read_csv('table.csv')
# 確保 col1 和 col2 可以正確排序(數值型)
df['col1'] = pd.to_numeric(df['col1'], errors='coerce')
df['col2'] = pd.to_numeric(df['col2'], errors='coerce')
# 模擬 row_number() over (partition by key1, key2 order by col1, col2) as nrow
df['nrow'] = (
df.sort_values(['col1', 'col2'])
.groupby(['key1', 'key2'], sort=False)
.cumcount() + 1
)
# 篩選 nrow = 1(等價於每組中排序第一筆)
result = df[df['nrow'] == 1]
# 排序 key1, key2
result = result.sort_values(['key1', 'key2']).reset_index(drop=True)
# 只選出要的欄位
result = result[['key1', 'key2', 'key3', 'col1', 'col2', 'col3']]
print(result)
```
</div>
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
Python 方法 3-1-2:資料來源為 CSV 檔案,且使用 pandas (另一種寫法)
```python
import pandas as pd
# 讀取 CSV
df = pd.read_csv('table.csv')
# 確保 col1 和 col2 可以正確排序(數值型)
df['col1'] = pd.to_numeric(df['col1'], errors='coerce')
df['col2'] = pd.to_numeric(df['col2'], errors='coerce')
# 另一種寫法:根據 key1, key2 分組,排序後取每組第一筆
result = df.sort_values(['key1', 'key2', 'col1', 'col2'])
.groupby(['key1', 'key2'], as_index=False, sort=False)
.first()
print(result)
```
</div>
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
Python 方法 3-2:資料來源為 CSV 檔案,且使用原生 Python 結構 (list[dict])
```python
import csv
from collections import defaultdict
with open('table.csv', newline='') as csvfile: # 可加上 encoding='utf-8-sig' 設定編碼
reader = csv.DictReader(csvfile)
data = [row for row in reader] # 儲存所有資料(每列為 dict)
# 將每列的 col1, col2 轉為比較用格式(這裡假設為數值)
for row in data:
row['col1'] = float(row['col1']) # 根據實際型別改為 int/float
row['col2'] = float(row['col2'])
# 用 key1+key2 作為分組依據
grouped = defaultdict(list)
for row in data:
key = (row['key1'], row['key2'])
grouped[key].append(row)
# 取每組排序後的第一筆(根據 col1, col2 排序)
result = []
for key, rows in grouped.items():
rows.sort(key=lambda x: (x['col1'], x['col2'])) # SQL 的 ORDER BY col1, col2
result.append(rows[0]) # row_number() = 1
# 最後根據 key1, key2 排序
result.sort(key=lambda x: (x['key1'], x['key2']))
# 印出結果
for row in result:
print({
'key1': row['key1'],
'key2': row['key2'],
'key3': row['key3'],
'col1': row['col1'],
'col2': row['col2'],
'col3': row['col3']
})
```
</div>
</div>
## 主題 4:
資料合併 (join)
<div style="display: flex; flex-wrap: wrap; gap: 20px;">
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
SQL 範例 4:
```sql
select t1.key1, t1.key2, t1.key3, t2.col4
from table as t1, table2 as t2
where t1.key1 = t2.key1 and t1.key2 = t2.key2 and t1.key3 = t2.key3
```
</div>
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
Python 方法 4-1:資料來源為 CSV 檔案,且使用 pandas
```python
import pandas as pd
# 讀取 CSV 檔
df1 = pd.read_csv('table.csv')
df2 = pd.read_csv('table2.csv')
# 依照 key1, key2, key3 做 inner join
result = pd.merge(
df1,
df2[['key1', 'key2', 'key3', 'col4']], # 只保留需要的欄位
on=['key1', 'key2', 'key3'], # 左右欄位名稱不同時使用 left_on, right_on
how='inner'
)
# 只取指定欄位
result = result[['key1', 'key2', 'key3', 'col4']]
print(result)
```
</div>
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
Python 方法 4-2:資料來源為 CSV 檔案,且使用原生 Python 結構 (list[dict])
```python
import csv
# 讀取 table.csv
with open('table.csv', newline='') as csvfile1: # 可加上 encoding='utf-8-sig' 設定編碼
reader1 = csv.DictReader(csvfile1)
table = [row for row in reader1]
# 讀取 table2.csv,並將 key 組合成 dict 方便快速查找
with open('table2.csv', newline='') as csvfile2: # 可加上 encoding='utf-8-sig' 設定編碼
reader2 = csv.DictReader(csvfile2)
table2_dict = {
(row['key1'], row['key2'], row['key3']): row['col4']
for row in reader2
}
# 執行 join
result = []
for row in table:
key = (row['key1'], row['key2'], row['key3'])
if key in table2_dict:
result.append({
'key1': row['key1'],
'key2': row['key2'],
'key3': row['key3'],
'col4': table2_dict[key]
})
print(result)
```
</div>
</div>
## 主題 5:
資料聯集、交集、差集 (union、intersect、except)
<div style="display: flex; flex-wrap: wrap; gap: 20px;">
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
SQL 範例 5:
```sql
select key1, key2, key3 from table where key1 in ('value1', 'value2')
union all
select key1, key2, key3 from table where key1 = 'value3'
intersect
select key1, key2, key3 from table where key1 in ('value2', 'value3')
except
select key1, key2, key3 from table where key1 = 'value2'
```
</div>
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
Python 方法 5-1:資料來源為 CSV 檔案,且使用 pandas
```python
import pandas as pd
df = pd.read_csv("table.csv")
# 各子查詢的 DataFrame
df1 = df[df['key1'].isin(['value1', 'value2'])][['key1', 'key2', 'key3']]
df2 = df[df['key1'] == 'value3'][['key1', 'key2', 'key3']]
df3 = df[df['key1'].isin(['value2', 'value3'])][['key1', 'key2', 'key3']]
df4 = df[df['key1'] == 'value2'][['key1', 'key2', 'key3']]
# UNION ALL:保留重複值
union_all = pd.concat([df1, df2], ignore_index=True)
# INTERSECT:找出與 df3 的交集(轉成 set of tuples)
intersect = union_all[union_all.apply(tuple, axis=1).isin(set(map(tuple, df3.values)))]
# EXCEPT:排除 df4 中的資料
result = intersect[~intersect.apply(tuple, axis=1).isin(set(map(tuple, df4.values)))]
result = result.reset_index(drop=True) # 重設索引(可選)
print(result)
```
</div>
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
Python 方法 5-2:資料來源為 CSV 檔案,且使用原生 Python 結構 (list[dict])
```python
import csv
# 擷取 key1, key2, key3 欄位
def extract_keys(row):
return {'key1': row['key1'], 'key2': row['key2'], 'key3': row['key3']}
# 讀取 CSV 檔
with open("table.csv", newline='') as f: # 可加上 encoding='utf-8-sig' 設定編碼
reader = csv.DictReader(f)
data = [row for row in reader]
# 取出符合條件的列(都保留為 dict 結構)
df1 = [extract_keys(r) for r in data if r['key1'] in ('value1', 'value2')]
df2 = [extract_keys(r) for r in data if r['key1'] == 'value3']
df3 = [extract_keys(r) for r in data if r['key1'] in ('value2', 'value3')]
df4 = [extract_keys(r) for r in data if r['key1'] == 'value2']
union_all = df1 + df2 # UNION ALL
intersect = [r for r in union_all if r in df3] # INTERSECT:只保留 key 在 df3 中的
result = [r for r in intersect if r not in df4] # EXCEPT:排除 key 在 df4 中的
print(result)
```
</div>
</div>
---
## 暫存:
欄位選取、篩選、排序 (select、from、where、order)
<div style="display: flex; flex-wrap: wrap; gap: 20px;">
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
Python 方法 XX:資料來源為 PostgreSQL 資料庫,且使用 pandas + sqlalchemy
```python
import pandas as pd
from sqlalchemy import create_engine
# 建立 PostgreSQL 連線(請替換以下參數)
engine = create_engine('postgresql://username:password@localhost:5432/your_database')
# SQL 查詢
query = """
select col1, col2, col3
from table
where key1 = 'value1' and key2 = 'value2'
order by col1, col2;
"""
# 執行查詢並存入 DataFrame
df = pd.read_sql_query(query, engine)
print(df)
```
</div>
<div style="flex: 1 1 500px; overflow: auto; padding: 10px;">
Python 方法 XX:資料來源為 PostgreSQL 資料庫,且使用原生 Python 結構 (list[dict])
```python
import psycopg2
# 建立連線
conn = psycopg2.connect(
host="localhost",
database="your_database",
user="username",
password="password"
)
cur = conn.cursor()
# 執行 SQL 查詢
cur.execute("""
select col1, col2, col3
from table
where key1 = 'value1' and key2 = 'value2'
order by col1, col2;
""")
# 取得欄位名稱
columns = [desc[0] for desc in cur.description]
# 將查詢結果轉為 list[dict]
result = [dict(zip(columns, row)) for row in cur.fetchall()]
cur.close()
conn.close()
print(result)
```
</div>
</div>