---
title: 'Stochastic Gradient Descent Regressor (SGDRegressor)'
disqus: hackmd
---
Stochastic Gradient Descent Regressor (SGDRegressor)
===
## Table of Contents
[TOC]
## Stochastic Gradient Descent Regressor (SGDRegressor)
### What is SGDRegressor actually doing mathematically?
It does not change the model. Whether the earlier regression was:
* Simple
* Multiple
* Polynomial

it still fits the same model:
$$
\hat{y} = wx + b
$$
and the same SSE:
$$
SSE = \sum_{i=1}^{n} (y_i - wx_i - b)^2
$$
The only difference is:
* **OLS**: differentiate the *entire* SSE → solve the resulting system of equations
* **SGD**: differentiate the error of individual data points → update the parameters step by step
### OLS (Ordinary Least Squares)
See the derivation in the previous section, [OLS (Ordinary Least Squares)](https://hackmd.io/axF0Q0AmSgmEdiGtRZJgHg#OLS%EF%BC%88Ordinary-Least-Square%EF%BC%8C%E6%9C%80%E5%B0%8F%E5%B9%B3%E6%96%B9%E6%B3%95%EF%BC%89).
### SGD (Stochastic Gradient Descent)
Define the **mean squared error (MSE)** as the objective function:
$$
J(w,b)=
\frac{1}{n}\sum_{i=1}^{n}(y_i - wx_i - b)^2
$$
Differentiate it with respect to each parameter over the n data points (the core of SGD).
#### Partial derivative with respect to $b$
$$
\begin{aligned}
\frac{\partial J}{\partial b}
&=
\frac{\partial}{\partial b}
\left(
\frac{1}{n}\sum_{i=1}^{n}(y_i-wx_i-b)^2
\right) \\
&=
\frac{1}{n}\sum_{i=1}^{n} 2(y_i-wx_i-b)(-1) \\
&=
-\frac{2}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i)
\end{aligned}
$$
#### Partial derivative with respect to $w$
$$
\begin{aligned}
\frac{\partial J}{\partial w}
&=
\frac{1}{n}\sum_{i=1}^{n} 2(y_i-wx_i-b)(-x_i) \\
&=
-\frac{2}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i)x_i
\end{aligned}
$$
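Both derivatives can be sanity-checked numerically. In the sketch below (the toy data and the evaluation point $(w,b)=(0.5,0.1)$ are arbitrary choices for illustration), the analytic gradients are compared against central finite differences of $J$:

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.normal(size=50)
y = 2.0 * x + 1.0 + rng.normal(scale=0.1, size=50)

def J(w, b):
    # the MSE objective J(w, b) defined above
    return np.mean((y - w * x - b) ** 2)

w, b, eps = 0.5, 0.1, 1e-6
# analytic gradients from the derivation
grad_w = -(2 / len(x)) * np.sum((y - w * x - b) * x)
grad_b = -(2 / len(x)) * np.sum(y - w * x - b)
# central finite differences
num_w = (J(w + eps, b) - J(w - eps, b)) / (2 * eps)
num_b = (J(w, b + eps) - J(w, b - eps)) / (2 * eps)
print(abs(grad_w - num_w), abs(grad_b - num_b))  # both differences should be tiny
```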
#### The SGD update rule (this is the key difference)
SGD does **not** set the derivatives to 0. Instead, given a learning rate $\eta$, it repeatedly applies
$$
\theta \leftarrow \theta - \eta \nabla J(\theta)
$$
where:
* $\nabla J(\theta)$: the **gradient (the direction of steepest ascent)**
* $\eta$: the learning rate (step size)
:::success
#### Learning rate
The learning rate answers: "how much do I trust this direction, and therefore how big a step am I willing to take?"
Every SGD step works like this: standing at some parameters $(w,b)$, I know which direction makes the SSE decrease fastest (the gradient tells me), but I do not know how far I can move without overshooting. That "how far" is exactly the learning rate.
:::
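To make "overshooting" concrete, here is a minimal sketch (not part of the derivation above) running plain gradient descent on $J(w)=w^2$, whose gradient is $2w$, with three different step sizes:

```python
def descend(eta, steps=50):
    """Gradient descent on J(w) = w^2 (gradient 2w), starting from w = 1."""
    w = 1.0
    for _ in range(steps):
        w -= eta * 2 * w  # w <- w - eta * dJ/dw
    return w

print(descend(0.01))  # too small: after 50 steps, still far from the minimum at 0
print(descend(0.4))   # reasonable: essentially at the minimum
print(descend(1.1))   # too large: every step overshoots and |w| blows up
```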
#### Updating $b$
$$
\begin{aligned}
b
&\leftarrow
b - \eta \frac{\partial J}{\partial b} \\
&=
b - \eta\left(
-\frac{2}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i)
\right) \\
&=
b + \frac{2\eta}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i)
\end{aligned}
$$
#### Updating $w$
$$
\begin{aligned}
w
&\leftarrow
w - \eta \frac{\partial J}{\partial w} \\
&=
w - \eta\left(
-\frac{2}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i)x_i
\right) \\
&=
w + \frac{2\eta}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i)x_i
\end{aligned}
$$
### Summary of the SGD update rules
$$
\begin{aligned}
w &\leftarrow w + \frac{2\eta}{n}\sum_{i=1}^{n}(y_i - \hat{y}_i)x_i \\
b &\leftarrow b + \frac{2\eta}{n}\sum_{i=1}^{n}(y_i - \hat{y}_i)
\end{aligned}
$$
### Why use SGD at all?
The normal equation has limits:
$$
\mathbf{w} = (\mathbf{X}^\top \mathbf{X})^{-1}\mathbf{X}^\top y
$$
For OLS to work smoothly, it quietly assumes that:
* the number of features $p$ is **not large**
* $X^\top X$ is **invertible**
* a $p \times p$ matrix **fits in memory**
* you can afford the time to invert that matrix
As soon as any one of these assumptions fails, OLS starts to break down.
| Aspect | OLS (normal equation) | SGD |
| --------------- | -------------------- | ----------- |
| Solution | Closed form, in one shot | Iterative numerical approximation |
| Needs a matrix inverse | ✅ | ❌ |
| Computational cost | $O(p^3)$ | $O(p)$ per step |
| Memory usage | High | Low |
| Large datasets | ❌ | ✅ |
| Online learning | ❌ | ✅ |
| Regularization support | Yes, but costly | A natural fit |
| Nature of the solution | Global minimum | The same minimum (in theory) |
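Despite the different mechanics, on a well-conditioned problem both routes land on the same parameters. A small sketch on toy data (using `np.linalg.solve` rather than an explicit inverse, which is the numerically preferred way to evaluate the normal equation):

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(500, 3))
y = X @ np.array([1.0, -2.0, 0.5]) + 0.3  # known true w and b, noise-free

Xd = np.c_[np.ones(len(X)), X]  # design matrix with an intercept column

# Normal equation: solve (Xd^T Xd) theta = Xd^T y  -- O(p^3), one shot
theta_ols = np.linalg.solve(Xd.T @ Xd, Xd.T @ y)

# Gradient descent: many cheap steps toward the same minimum
theta = np.zeros(Xd.shape[1])
for _ in range(2000):
    grad = -(2 / len(y)) * Xd.T @ (y - Xd @ theta)
    theta -= 0.1 * grad

print(np.allclose(theta, theta_ols, atol=1e-4))  # both reach the same solution
```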
numpy code example
---
Import the modules and generate the data
```python=
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
# Enable LaTeX rendering
mpl.rcParams["text.usetex"] = True
mpl.rcParams["text.latex.preamble"] = r"\usepackage{amsmath}"
# Experiment scale
seed = 42
n_samples = 800
n_features = 3
# Data characteristics
noise_std = 2.0       # standard deviation of the Gaussian noise on y
bias_true = 5.0       # true intercept b
feature_scale = 1.0   # scale of the values in X
w_scale = 1.0         # scale of the true w
# Outliers
outlier_frac = 0.05     # fraction of outliers (0.0 = none)
outlier_y_scale = 25.0  # outlier offset magnitude (multiplied by noise_std)
# Sparsification (simulates one-hot / high-dimensional sparse data, which makes the
# performance comparison more pronounced); disabled for now
sparse_frac = 0.0  # 0.0 = dense; e.g. 0.9 zeroes out 90% of the entries
```
Fix the random seed (guarantees reproducibility)
```python=
rng = np.random.default_rng(seed)
```
Generate the feature matrix X (n_samples × n_features)
```python=
X = rng.normal(loc=0.0, scale=feature_scale, size=(n_samples, n_features))
X.shape
```
Sparsify (zero out part of the feature values)
```python=
if sparse_frac > 0:
mask = rng.random(size=X.shape) < sparse_frac
X = X.copy()
X[mask] = 0.0
```
Set the true parameters w_true and b_true
```python=
w_true = rng.normal(loc=0.0, scale=w_scale, size=(n_features,))
b_true = float(bias_true)
w_true, b_true
```
Generate y (linear relationship + Gaussian noise)
```python=
noise = rng.normal(loc=0.0, scale=noise_std, size=(n_samples,))
y = X @ w_true + b_true + noise
y.shape
```
Add outliers in the y direction
```python=
outlier_mask = np.zeros(n_samples, dtype=bool)
if outlier_frac > 0:
k = max(1, int(n_samples * outlier_frac))
idx = rng.choice(n_samples, size=k, replace=False)
outlier_mask[idx] = True
y = y.copy()
y[idx] += rng.normal(loc=0.0, scale=outlier_y_scale * noise_std, size=k)
outlier_mask.sum()
```
Split into train/val/test
```python=
val_ratio = 0.2
test_ratio = 0.2
idx = np.arange(n_samples)
rng = np.random.default_rng(seed)  # same seed, for reproducibility
rng.shuffle(idx)
n_test = int(n_samples * test_ratio)
n_val = int(n_samples * val_ratio)
n_train = n_samples - n_val - n_test
train_idx = idx[:n_train]
val_idx = idx[n_train:n_train+n_val]
test_idx = idx[n_train+n_val:]
X_train, y_train = X[train_idx], y[train_idx]
X_val, y_val = X[val_idx], y[val_idx]
X_test, y_test = X[test_idx], y[test_idx]
(X_train.shape, X_val.shape, X_test.shape)
```
Data summary
```python=
print("X:", X.shape, " y:", y.shape)
print("train/val/test:", X_train.shape, X_val.shape, X_test.shape)
print("outliers:", outlier_mask.sum())
print("w_true:", w_true)
print("b_true:", b_true)
```
### OLS (see the earlier chapter for the detailed walkthrough)
```python=
import numpy as np
import time
# first column is all 1s (represents the intercept b)
X_design = np.c_[np.ones(len(X_train)), X_train] # shape: (n_train, 1+n_features)
X_design.shape
theta_ols, residuals, rank, svals = np.linalg.lstsq(X_design, y_train, rcond=None)
b_ols = float(theta_ols[0])
w_ols = theta_ols[1:]
print("OLS done.")
print("w_ols shape:", w_ols.shape, "b_ols:", b_ols)
def mse(y_true, y_pred):
return np.mean((y_true - y_pred)**2)
yhat_train_ols = X_train @ w_ols + b_ols
train_mse_ols = mse(y_train, yhat_train_ols)
print("OLS train MSE:", train_mse_ols)
yhat_val_ols = X_val @ w_ols + b_ols
val_mse_ols = mse(y_val, yhat_val_ols)
print("OLS val MSE:", val_mse_ols)
```
### SGD
Set the SGD hyperparameters
```python=
seed = 42
rng = np.random.default_rng(seed)
epochs = 30
eta = 0.05        # learning rate (worth tuning later)
batch_size = 128  # 1 = pure SGD; 32/128 = mini-batch (unused by the full-batch loop below)
shuffle = True
# Initialize parameters
w = np.zeros(X_train.shape[1], dtype=float)
b = 0.0
print("init w shape:", w.shape, "init b:", b)
```
Start training
```python=
loss_history = []
w_history = []
b_history = []
n = len(X_train)
for epoch in range(1, epochs + 1):
    # full batch: no steps_per_epoch / batch_idx needed
    # shuffling makes no difference for full batch (the gradient uses all the data)
    yhat = X_train @ w + b
    err = y_train - yhat
    # full-batch gradients (note the division by n)
    grad_w = -(2.0 / n) * (X_train.T @ err)
    grad_b = -(2.0 / n) * np.sum(err)
    # one update per epoch
    w = w - eta * grad_w
    b = b - eta * grad_b
    # log the MSE over the whole training set
    yhat_train = X_train @ w + b
    train_mse = mse(y_train, yhat_train)
    loss_history.append(train_mse)
    w_history.append(w.copy())
    b_history.append(b)
    if epoch == 1 or epoch % 5 == 0:
        print(f"epoch {epoch:02d} | train MSE={train_mse:.4f}")
```
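The loop above is actually full-batch gradient descent: one update per epoch using all the rows. To put the `batch_size` hyperparameter to work, an inner loop would iterate over shuffled mini-batches. Here is a self-contained sketch of that variant (toy data with known parameters; the variable names mirror the ones above but are local to this example):

```python
import numpy as np

rng = np.random.default_rng(42)
X_train = rng.normal(size=(800, 3))
y_train = X_train @ np.array([1.5, -0.7, 0.3]) + 5.0  # noise-free, so the optimum is exact

w = np.zeros(X_train.shape[1])
b = 0.0
eta, batch_size, n = 0.05, 128, len(X_train)

for epoch in range(30):
    idx = rng.permutation(n)  # reshuffle every epoch (this is where shuffle matters)
    for start in range(0, n, batch_size):
        batch = idx[start:start + batch_size]
        Xb, yb = X_train[batch], y_train[batch]
        err = yb - (Xb @ w + b)
        # mini-batch gradients: average over the batch, not over all n rows
        w += (2 * eta / len(batch)) * (Xb.T @ err)
        b += (2 * eta / len(batch)) * np.sum(err)

print(np.round(w, 3), round(b, 3))  # close to [1.5, -0.7, 0.3] and 5.0
```

With `batch_size = 1` this becomes pure SGD; with `batch_size = n` it collapses back to the full-batch loop above.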
Training results
```python=
yhat_train_sgd = X_train @ w + b
train_mse_sgd = mse(y_train, yhat_train_sgd)
print("SGD train MSE:", train_mse_sgd)
yhat_val_sgd = X_val @ w + b
val_mse_sgd = mse(y_val, yhat_val_sgd)
print("SGD val MSE:", val_mse_sgd)
```
Plot the loss curve
:::success
#### Note: run the code below in a Colab cell first to download the LaTeX resources needed to use LaTeX inside matplotlib figures
```python=
!apt update
!apt install -y cm-super dvipng texlive-latex-extra texlive-fonts-recommended
```
On a local Windows machine:
* Install MiKTeX (a full installation is recommended)
* Install Ghostscript (required)
* After installing, add the directories containing latex and dvipng to the PATH environment variable
:::
```python=
plt.figure(figsize=(6,4))
plt.plot(loss_history, marker="o")
plt.xlabel("epoch")
plt.ylabel("train MSE")
plt.title("SGD training loss (per epoch)")
plt.grid(True)
plt.show()
```

Generate and save a gif (you can see SGD gradually moving toward OLS)
```python=
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation, PillowWriter
# Prepare the data (use only x[0], x[1] for the 3D plot)
X2 = X_train[:, :2].astype(float)
y = y_train.astype(float)
train_outlier = outlier_mask[train_idx]  # outlier flags for the training split (used below)
# OLS (first two dimensions only)
w2_ols = np.array(w_ols[:2], dtype=float)
b_ols_ = float(b_ols)
# SGD history (first two dimensions only)
w2_hist = [np.array(w[:2], dtype=float) for w in w_history]
b_hist = [float(b) for b in b_history]
loss_history = list(loss_history)
n_frames = len(loss_history)
# meshgrid for the plane surfaces
x0_surf, x1_surf = np.meshgrid(
np.linspace(X2[:, 0].min(), X2[:, 0].max(), 30),
np.linspace(X2[:, 1].min(), X2[:, 1].max(), 30)
)
# OLS plane (fixed)
y_surf_ols = b_ols_ + w2_ols[0] * x0_surf + w2_ols[1] * x1_surf
# Layout: 2x3 (3D on the left / two plots in the middle / text panel on the right)
fig = plt.figure(figsize=(10, 4.5), constrained_layout=True)
gs = fig.add_gridspec(
2, 3,
width_ratios=[2.3, 1.0, 0.6],
height_ratios=[1, 1]
)
ax3d = fig.add_subplot(gs[:, 0], projection="3d")  # left: 3D
ax_loss = fig.add_subplot(gs[0, 1])                # top right: loss
ax_param = fig.add_subplot(gs[1, 1])               # bottom right: params
ax_text = fig.add_subplot(gs[:, 2])                # rightmost: text panel
ax_text.axis("off")
# Point cloud + OLS plane (fixed)
ax3d.scatter(
X2[~train_outlier, 0], X2[~train_outlier, 1], y[~train_outlier],
s=25, edgecolors="k", alpha=0.7, label="normal"
)
if train_outlier.sum() > 0:
ax3d.scatter(
X2[train_outlier, 0], X2[train_outlier, 1], y[train_outlier],
s=60, edgecolors="k", alpha=0.9, color="red", label="outlier"
)
# OLS: surface + wireframe (fixed)
surf_ols = ax3d.plot_surface(x0_surf, x1_surf, y_surf_ols, alpha=0.30)
wire_ols = ax3d.plot_wireframe(
    x0_surf, x1_surf, y_surf_ols,
    rstride=2, cstride=2,
    linewidth=0.3,
    alpha=0.4
)
# SGD: draw frame 0 first (updated inside update())
surf_sgd = None
wire_sgd = None
w0 = w2_hist[0]
b0 = b_hist[0]
y_surf_sgd0 = b0 + w0[0] * x0_surf + w0[1] * x1_surf
surf_sgd = ax3d.plot_surface(x0_surf, x1_surf, y_surf_sgd0, alpha=0.18)
wire_sgd = ax3d.plot_wireframe(
    x0_surf, x1_surf, y_surf_sgd0,
    rstride=2, cstride=2,
    linewidth=0.4,
    alpha=0.5
)
ax3d.set_xlabel("x[0]", labelpad=10)
ax3d.set_ylabel("x[1]", labelpad=10)
ax3d.set_zlabel("y", labelpad=6)
ax3d.set_title("3D: Data + OLS plane (fixed) + SGD plane (moving)", pad=18)
ax3d.view_init(elev=18, azim=120)
# fix zlim (so outliers do not flatten the planes)
y_vis = y[~train_outlier] if train_outlier.sum() > 0 else y
lo, hi = np.percentile(y_vis, [2, 98])
ax3d.set_zlim(lo, hi)
# place the legend outside (point-cloud legend only)
ax3d.legend(loc="upper left", bbox_to_anchor=(0.0, 1.02))
# vertical residual lines
rng = np.random.default_rng(0)
sample_size = min(50, len(X2))
sample_idx = rng.choice(len(X2), size=sample_size, replace=False)
res_lines = []
for i in sample_idx:
ln, = ax3d.plot(
[X2[i, 0], X2[i, 0]],
[X2[i, 1], X2[i, 1]],
[y[i], y[i]],
linewidth=0.8,
alpha=0.7
)
res_lines.append(ln)
# top right: the loss curve (grows frame by frame)
ax_loss.set_title("Training loss (MSE)")
ax_loss.set_xlabel("epoch")
ax_loss.set_ylabel("MSE")
ax_loss.grid(True)
loss_line, = ax_loss.plot([], [], linewidth=2)
loss_dot, = ax_loss.plot([], [], marker="o")
ax_loss.set_xlim(0, n_frames - 1)
ax_loss.set_ylim(min(loss_history) * 0.95, max(loss_history) * 1.05)
# bottom right: parameter traces (w0, w1, b)
w0_hist = [w[0] for w in w2_hist]
w1_hist = [w[1] for w in w2_hist]
ax_param.set_title("Parameter trace")
ax_param.set_xlabel("epoch")
ax_param.set_ylabel("value")
ax_param.grid(True)
ax_param.set_xlim(0, n_frames - 1)
vals = w0_hist + w1_hist + b_hist
vmin, vmax = min(vals), max(vals)
pad = (vmax - vmin) * 0.1 if vmax > vmin else 1.0
ax_param.set_ylim(vmin - pad, vmax + pad)
w0_line, = ax_param.plot([], [], linewidth=2, label="w[0]")
w1_line, = ax_param.plot([], [], linewidth=2, label="w[1]")
b_line, = ax_param.plot([], [], linewidth=2, label="b")
ax_param.legend(loc="upper right")
# right-hand text panel (does not cover the plots)
info_text = ax_text.text(0.02, 0.98, "", va="top", ha="left", fontsize=12)
# Animation update: SGD plane + residual lines + loss + params + text
def update(frame):
    global surf_sgd, wire_sgd
    # --- update the SGD plane (remove the old one, then draw the new one) ---
    if surf_sgd is not None:
        surf_sgd.remove()
    if wire_sgd is not None:
        wire_sgd.remove()
    w = w2_hist[frame]
    b = b_hist[frame]
    y_surf = b + w[0] * x0_surf + w[1] * x1_surf
    surf_sgd = ax3d.plot_surface(x0_surf, x1_surf, y_surf, alpha=0.18)
    wire_sgd = ax3d.plot_wireframe(x0_surf, x1_surf, y_surf, rstride=2, cstride=2, linewidth=0.4, alpha=0.5)
    # --- update the residual lines (from the SGD plane to the actual points) ---
z_hat = b + X2[sample_idx, 0] * w[0] + X2[sample_idx, 1] * w[1]
for k, i in enumerate(sample_idx):
res_lines[k].set_data([X2[i, 0], X2[i, 0]], [X2[i, 1], X2[i, 1]])
res_lines[k].set_3d_properties([z_hat[k], y[i]])
    # --- update the loss curve ---
xs = np.arange(frame + 1)
ys = np.array(loss_history[:frame + 1])
loss_line.set_data(xs, ys)
loss_dot.set_data([frame], [loss_history[frame]])
    # --- update the parameter traces ---
w0_line.set_data(xs, np.array(w0_hist[:frame + 1]))
w1_line.set_data(xs, np.array(w1_hist[:frame + 1]))
b_line.set_data(xs, np.array(b_hist[:frame + 1]))
    # --- update the text panel ---
info_text.set_text(
"SGD status\n"
"---------\n"
f"epoch = {frame}\n\n"
f"w0 = {w0_hist[frame]:.4f}\n"
f"w1 = {w1_hist[frame]:.4f}\n"
f"b = {b_hist[frame]:.4f}\n\n"
f"MSE = {loss_history[frame]:.4f}\n\n"
f"outliers = {int(train_outlier.sum())}"
)
return (loss_line, loss_dot, w0_line, w1_line, b_line, info_text, surf_sgd, wire_sgd, *res_lines)
anim = FuncAnimation(fig, update, frames=n_frames, interval=200, blit=False)
plt.show()
# Save as a GIF
gif_path = "sgd_vs_ols_3d_clear.gif"
anim.save(gif_path, writer=PillowWriter(fps=5))
print("Saved:", gif_path)
```

scikit-learn code (SGD vs OLS, many rows × many features)
---
Next, the same comparison using sklearn's OLS and SGD implementations, measuring memory usage and runtime to show why SGD is worth using.
Data scale settings
```python=
import numpy as np
rng = np.random.default_rng(42)
# Data scale
n_samples = 200_000  # 200k rows
n_features = 100
# True parameters (fixed, to keep the results easy to interpret)
w_true = rng.normal(0, 2, size=n_features)
b_true = 3.0
# Generate X
X = rng.normal(0, 1, size=(n_samples, n_features))
# Generate y (linear + noise)
noise = rng.normal(0, 1.0, size=n_samples)
y = X @ w_true + b_true + noise
print("X shape:", X.shape)
print("y shape:", y.shape)
```
Split the data
```python=
from sklearn.model_selection import train_test_split
X_train, X_val, y_train, y_val = train_test_split(
X, y,
test_size=0.2,
random_state=42
)
print("Train size:", X_train.shape)
print("Val size:", X_val.shape)
```
Import the profiling modules and sklearn
```python=
import time
import tracemalloc
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_squared_error
```
Train OLS
```python=
ols = Pipeline([
("scaler", StandardScaler()),
("model", LinearRegression(fit_intercept=True))
])
tracemalloc.start()
t0 = time.perf_counter()
ols.fit(X_train, y_train)
t1 = time.perf_counter()
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
ols_time = t1 - t0
ols_mem = peak / (1024**2)
yhat_train_ols = ols.predict(X_train)
yhat_val_ols = ols.predict(X_val)
mse_train_ols = mean_squared_error(y_train, yhat_train_ols)
mse_val_ols = mean_squared_error(y_val, yhat_val_ols)
print("\n=== OLS (n large) ===")
print("fit time (sec):", ols_time)
print("peak mem (MB):", ols_mem)
print("train MSE:", mse_train_ols)
print("val MSE:", mse_val_ols)
```
Train SGD
```python=
from sklearn.linear_model import SGDRegressor
sgd_fixed = Pipeline([
("scaler", StandardScaler()),
("model", SGDRegressor(
loss="squared_error",
penalty=None,
alpha=0.0,
fit_intercept=True,
        max_iter=30,            # note: each iter = one full pass over the data
tol=1e-3,
learning_rate="constant",
eta0=0.01,
random_state=42
))
])
tracemalloc.start()
t0 = time.perf_counter()
sgd_fixed.fit(X_train, y_train)
t1 = time.perf_counter()
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
sgd_time = t1 - t0
sgd_mem = peak / (1024**2)
yhat_train_sgd = sgd_fixed.predict(X_train)
yhat_val_sgd = sgd_fixed.predict(X_val)
mse_train_sgd = mean_squared_error(y_train, yhat_train_sgd)
mse_val_sgd = mean_squared_error(y_val, yhat_val_sgd)
print("\n=== SGD (n large, fixed iter) ===")
print("fit time (sec):", sgd_time)
print("peak mem (MB):", sgd_mem)
print("n_iter_:", sgd_fixed.named_steps["model"].n_iter_)
print("train MSE:", mse_train_sgd)
print("val MSE:", mse_val_sgd)
```
Comparison
| | OLS (n large) | SGD (n large, fixed iter) |
| -------- | -------- | -------- |
| fit time (sec) | 2.076 | 2.052 |
| peak mem (MB) | 368.74 | 137.41 |
| train MSE | 0.998 | 1.854 |
| val MSE | 1.001 | 1.874 |
This shows that while SGD's fit is not necessarily better, it uses noticeably less memory and no more time than OLS.