---
# System prepended metadata

title: '梯度下降迴歸法(Stochastic Gradient Descent Regressor,SGDRegressor)'
tags: [regression, machine learning, linear regression]

---

---
title: '梯度下降迴歸法(Stochastic Gradient Descent Regressor,SGDRegressor)'
disqus: hackmd
---

梯度下降迴歸法(Stochastic Gradient Descent Regressor,SGDRegressor)
===



## Table of Contents

[TOC]

## 梯度下降迴歸法(Stochastic Gradient Descent Regressor,SGDRegressor)

### SGDRegressor 在數學上到底在做什麼？

它沒有改模型，不論是前面的：

* Simple
* Multiple
* Polynomial

仍然使用同一個模型：

$$
\hat{y} = wx + b
$$

同一個 SSE：

$$
SSE = \sum_{i=1}^{n} (y_i - wx_i - b)^2
$$

差別只在於：

* **OLS**：對「整個 SSE」求導 → 解聯立方程式
* **SGD**：對「單一資料點的誤差」求導 → 逐步更新參數

### OLS（Ordinary Least Square）

如前一節 [OLS（Ordinary Least Square，最小平方法）](https://hackmd.io/axF0Q0AmSgmEdiGtRZJgHg#OLS%EF%BC%88Ordinary-Least-Square%EF%BC%8C%E6%9C%80%E5%B0%8F%E5%B9%B3%E6%96%B9%E6%B3%95%EF%BC%89) 所推導



### SGD（Stochastic Gradient Descent）

定義 **平均平方誤差（Mean Squared Error, MSE）** 為目標函數：

$$
J(w,b)=
\frac{1}{n}\sum_{i=1}^{n}(y_i - wx_i - b)^2
$$



對n筆資料點進行偏微分（SGD 的核心）

#### 對 (b) 偏微分


\begin{aligned}
\frac{\partial J}{\partial b}
&=
\frac{\partial}{\partial b}
\left(
\frac{1}{n}\sum_{i=1}^{n}(y_i-wx_i-b)^2
\right) \\
&=
\frac{1}{n}\sum_{i=1}^{n} 2(y_i-wx_i-b)(-1) \\
&=
-\frac{2}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i)
\end{aligned}



#### 對 (w) 偏微分


\begin{aligned}
\frac{\partial J}{\partial w}
&=
\frac{1}{n}\sum_{i=1}^{n} 2(y_i-wx_i-b)(-x_i) \\
&=
-\frac{2}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i)x_i
\end{aligned}


#### SGD 的更新規則（關鍵差異在這）

SGD **不會令導數 = 0**，而是設學習率（learning rate）為 $(\eta)$

$$
[
\theta \leftarrow \theta - \eta \nabla J(\theta)
]
$$
其中：

* $(\nabla J(\theta))$：**梯度（函數上升最快的方向）**
* $(\eta)$：學習率（步長）
:::success
#### 學習率

學習率 =「我相信這個方向有多準，所以我願意走多大一步」
在 SGD 裡，我們每次都在做這件事，我站在某一組參數 $(w,b)$，我知道「哪個方向會讓 SSE 下降最快」（梯度），但我不知道，要走多遠才不會走過頭?這個「走多遠」的比例，就是學習率。
:::


#### 更新 (b)



\begin{aligned}
b
&\leftarrow
b - \eta \frac{\partial J}{\partial b} \\
&=
b - \eta\left(
-\frac{2}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i)
\right) \\
&=
b + \frac{2\eta}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i)
\end{aligned}


#### 更新 (w)

\begin{aligned}
w
&\leftarrow
w - \eta \frac{\partial J}{\partial w} \\
&=
w - \eta\left(
-\frac{2}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i)x_i
\right) \\
&=
w + \frac{2\eta}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i)x_i
\end{aligned}

### SGD 更新式總結


\begin{aligned}
w &\leftarrow w + \frac{2\eta}{n}\sum_{i=1}^{n}(y_i - \hat{y}_i)x_i \\
b &\leftarrow b + \frac{2\eta}{n}\sum_{i=1}^{n}(y_i - \hat{y}_i)
\end{aligned}


### 為什麼要用 SGD？

正規方程式的限制:

$$
\mathbf{w} = (\mathbf{X}^\top \mathbf{X})^{-1}\mathbf{X}^\top y
$$

OLS 能順利用，背後其實偷偷假設了：

* 特徵數 ($p$) **不大**
* ($X^\top X$) **可逆**
* 記憶體放得下 ($p \times p$) 矩陣
* 願意花時間算矩陣反矩陣

只要其中一個條件出問題，OLS 就會開始崩。


| 面向              | OLS（Normal Equation） | SGD         |
| --------------- | -------------------- | ----------- |
| 解法              | 一次算出封閉解              | 逐步數值逼近      |
| 是否需要反矩陣         | ✅                    | ❌           |
| 計算成本            | $(O(p^3))$             | $(O(p))$ 每步   |
| 記憶體需求           | 高                    | 低           |
| 大型資料            | ❌                    | ✅           |
| Online learning | ❌                    | ✅           |
| 正則化支援           | 有，但成本高               | 天然適合        |
| 解的本質            | 全域最小值                | 同一個最小值（理論上） |


numpy 程式碼範例
---


引入模組與產生資料
```python=
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl

# 啟用 LaTeX 渲染
mpl.rcParams["text.usetex"] = True
mpl.rcParams["text.latex.preamble"] = r"\usepackage{amsmath}"

#實驗規模
seed = 42
n_samples = 800
n_features = 3

# 資料特性
noise_std = 2.0            # y 的高斯噪聲強度
bias_true = 5.0            # 真實截距 b
feature_scale = 1.0        # X 的數值尺度
w_scale = 1.0              # 真實 w 的尺度

# 離群值
outlier_frac = 0.05        # 離群比例（0.0 表示不加）
outlier_y_scale = 25.0     # 離群值偏移幅度（乘在 noise_std 上）

# 稀疏化（模擬 one-hot/高維稀疏資料，效能比較會更有感），這邊先不啟用
sparse_frac = 0.0          # 0.0 表示不稀疏；例如 0.9 代表 90% 變 0
```

固定隨機種子（保證可重現）

```python=
rng = np.random.default_rng(seed)
```

生成特徵矩陣 X（n_samples × n_features）

```python=
X = rng.normal(loc=0.0, scale=feature_scale, size=(n_samples, n_features))
X.shape
```

稀疏化（把部分特徵值變成 0）

```python=
if sparse_frac > 0:
    mask = rng.random(size=X.shape) < sparse_frac
    X = X.copy()
    X[mask] = 0.0
```

設定真實參數 w_true、b_true

```python=
w_true = rng.normal(loc=0.0, scale=w_scale, size=(n_features,))
b_true = float(bias_true)

w_true, b_true
```
生成 y（線性關係 + 高斯噪聲）

```python=
noise = rng.normal(loc=0.0, scale=noise_std, size=(n_samples,))
y = X @ w_true + b_true + noise
y.shape
```
加入 y 方向離群值（outliers）

```python=
outlier_mask = np.zeros(n_samples, dtype=bool)

if outlier_frac > 0:
    k = max(1, int(n_samples * outlier_frac))
    idx = rng.choice(n_samples, size=k, replace=False)
    outlier_mask[idx] = True

    y = y.copy()
    y[idx] += rng.normal(loc=0.0, scale=outlier_y_scale * noise_std, size=k)

outlier_mask.sum()
```
切分 train/val/test

```python=
val_ratio = 0.2
test_ratio = 0.2

idx = np.arange(n_samples)
rng = np.random.default_rng(seed)  # 用同 seed 保持可重現
rng.shuffle(idx)

n_test = int(n_samples * test_ratio)
n_val  = int(n_samples * val_ratio)
n_train = n_samples - n_val - n_test

train_idx = idx[:n_train]
val_idx   = idx[n_train:n_train+n_val]
test_idx  = idx[n_train+n_val:]

X_train, y_train = X[train_idx], y[train_idx]
X_val, y_val     = X[val_idx], y[val_idx]
X_test, y_test   = X[test_idx], y[test_idx]

(X_train.shape, X_val.shape, X_test.shape)
```

資料摘要

```python=
print("X:", X.shape, " y:", y.shape)
print("train/val/test:", X_train.shape, X_val.shape, X_test.shape)
print("outliers:", outlier_mask.sum())
print("w_true:", w_true)
print("b_true:", b_true)
```

### OLS(詳細講解請看前面章節)

```python=
import numpy as np
import time

# 第一欄是 1（代表截距 b）
X_design = np.c_[np.ones(len(X_train)), X_train]  # shape: (n_train, 1+n_features)
X_design.shape



theta_ols, residuals, rank, svals = np.linalg.lstsq(X_design, y_train, rcond=None)



b_ols = float(theta_ols[0])
w_ols = theta_ols[1:]

print("OLS done.")
print("w_ols shape:", w_ols.shape, "b_ols:", b_ols)

def mse(y_true, y_pred):
    return np.mean((y_true - y_pred)**2)

yhat_train_ols = X_train @ w_ols + b_ols
train_mse_ols = mse(y_train, yhat_train_ols)

print("OLS train MSE:", train_mse_ols)


yhat_val_ols = X_val @ w_ols + b_ols
val_mse_ols = mse(y_val, yhat_val_ols)
print("OLS val   MSE:", val_mse_ols)

```




### SGD 

設定SGD 超參數

```python=

seed = 42
rng = np.random.default_rng(seed)

epochs = 30
eta = 0.05            # learning rate（你之後可以調）
batch_size = 128        # 1 = SGD；例如 32/128 就是 mini-batch
shuffle = True

#初始化參數 
w = np.zeros(X_train.shape[1], dtype=float)
b = 0.0

print("init w shape:", w.shape, "init b:", b)
```

開始訓練

```python=
loss_history = []
w_history = []
b_history = []

n = len(X_train)

for epoch in range(1, epochs + 1):
    # full-batch：不需要 steps_per_epoch / batch_idx
    # shuffle 對 full-batch 沒差（梯度用全部資料），可以忽略或保留都一樣
    yhat = X_train @ w + b
    err = y_train - yhat

    # full-batch gradients（注意這裡是 /n）
    grad_w = -(2.0 / n) * (X_train.T @ err)
    grad_b = -(2.0 / n) * np.sum(err)

    # one update per epoch
    w = w - eta * grad_w
    b = b - eta * grad_b

    # 記錄（用 train 全體算 MSE）
    yhat_train = X_train @ w + b
    train_mse = mse(y_train, yhat_train)

    loss_history.append(train_mse)
    w_history.append(w.copy())
    b_history.append(b)

    if epoch == 1 or epoch % 5 == 0:
        print(f"epoch {epoch:02d} | train MSE={train_mse:.4f}")

```

訓練結果

```python=
yhat_train_sgd = X_train @ w + b
train_mse_sgd = mse(y_train, yhat_train_sgd)
print("SGD train MSE:", train_mse_sgd)

yhat_val_sgd = X_val @ w + b
val_mse_sgd = mse(y_val, yhat_val_sgd)
print("SGD val   MSE:", val_mse_sgd)

```

產生loss圖

:::success
#### 注意:在colab cell執行下面程式碼，下載Latex所需資源，才能在matplotlib的圖裡使用Latex
```python=
!apt update
!apt install -y cm-super dvipng texlive-latex-extra texlive-fonts-recommended
```
如果是在window本機
* 安裝 MiKTeX（建議完整安裝）
* 安裝 Ghostscript（必要）
* 安裝完成後，將 latex, dvipng 所在路徑加入環境變數 PATH
:::

```python=
plt.figure(figsize=(6,4))
plt.plot(loss_history, marker="o")
plt.xlabel("epoch")
plt.ylabel("train MSE")
plt.title("SGD training loss (per epoch)")
plt.grid(True)
plt.show()
```

![SDG_Loss](https://hackmd.io/_uploads/H1cu7hqEbx.png)

產生gif並存取(可見SGD逐漸往OLS走)

```python=
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation, PillowWriter


# 準備資料（只用 x[0], x[1] 畫 3D）

X2 = X_train[:, :2].astype(float)
y = y_train.astype(float)

# OLS（只取前兩維）
w2_ols = np.array(w_ols[:2], dtype=float)
b_ols_ = float(b_ols)

# SGD history（只取前兩維）
w2_hist = [np.array(w[:2], dtype=float) for w in w_history]
b_hist = [float(b) for b in b_history]

loss_history = list(loss_history)
n_frames = len(loss_history)




#meshgrid（平面網格）


x0_surf, x1_surf = np.meshgrid(
    np.linspace(X2[:, 0].min(), X2[:, 0].max(), 30),
    np.linspace(X2[:, 1].min(), X2[:, 1].max(), 30)
)

# OLS 平面（固定）
y_surf_ols = b_ols_ + w2_ols[0] * x0_surf + w2_ols[1] * x1_surf


#建立排版：2x3（左 3D / 右兩圖 / 最右文字）

fig = plt.figure(figsize=(10, 4.5), constrained_layout=True)
gs = fig.add_gridspec(
    2, 3,
    width_ratios=[2.3, 1.0, 0.6],
    height_ratios=[1, 1]
)

ax3d = fig.add_subplot(gs[:, 0], projection="3d")  # 左：3D
ax_loss = fig.add_subplot(gs[0, 1])                # 右上：loss
ax_param = fig.add_subplot(gs[1, 1])               # 右下：params
ax_text = fig.add_subplot(gs[:, 2])                # 最右：文字欄
ax_text.axis("off")


#點雲 + OLS 平面（固定）

ax3d.scatter(
    X2[~train_outlier, 0], X2[~train_outlier, 1], y[~train_outlier],
    s=25, edgecolors="k", alpha=0.7, label="normal"
)
if train_outlier.sum() > 0:
    ax3d.scatter(
        X2[train_outlier, 0], X2[train_outlier, 1], y[train_outlier],
        s=60, edgecolors="k", alpha=0.9, color="red", label="outlier"
    )

# OLS：surface + wireframe（固定）
surf_ols = ax3d.plot_surface(x0_surf, x1_surf, y_surf_ols, alpha=0.30)
#wire_ols = ax3d.plot_wireframe(x0_surf, x1_surf, y_surf_ols, rstride=2, cstride=2, linewidth=0.6)
wire_ols = ax3d.plot_wireframe(
    x0_surf, x1_surf, y_surf_ols,
    rstride=2, cstride=2,
    linewidth=0.3,
    alpha=0.4
)

# SGD：先畫第 0 幀（會在 update 內更新）
surf_sgd = None
wire_sgd = None

w0 = w2_hist[0]
b0 = b_hist[0]
y_surf_sgd0 = b0 + w0[0] * x0_surf + w0[1] * x1_surf
surf_sgd = ax3d.plot_surface(x0_surf, x1_surf, y_surf_sgd0, alpha=0.18)
# wire_sgd = ax3d.plot_wireframe(x0_surf, x1_surf, y_surf_sgd0, rstride=2, cstride=2, linewidth=0.8)
wire_sgd = ax3d.plot_wireframe(
    x0_surf, x1_surf, y_surf_sgd0,
    rstride=2, cstride=2,
    linewidth=0.4,      # 原本 0.8
    alpha=0.5           # 新增
)


ax3d.set_xlabel("x[0]", labelpad=10)
ax3d.set_ylabel("x[1]", labelpad=10)
ax3d.set_zlabel("y", labelpad=6)
ax3d.set_title("3D: Data + OLS plane (fixed) + SGD plane (moving)", pad=18)
ax3d.view_init(elev=18, azim=120)

# 固定 zlim（避免 outlier 把平面壓扁）
y_vis = y[~train_outlier] if train_outlier.sum() > 0 else y
lo, hi = np.percentile(y_vis, [2, 98])
ax3d.set_zlim(lo, hi)

# legend 放外面（只放點雲 legend）
ax3d.legend(loc="upper left", bbox_to_anchor=(0.0, 1.02))


# 殘差垂直線

rng = np.random.default_rng(0)
sample_size = min(50, len(X2))
sample_idx = rng.choice(len(X2), size=sample_size, replace=False)

res_lines = []
for i in sample_idx:
    ln, = ax3d.plot(
        [X2[i, 0], X2[i, 0]],
        [X2[i, 1], X2[i, 1]],
        [y[i], y[i]],
        linewidth=0.8,
        alpha=0.7
    )
    res_lines.append(ln)


#右上：loss（逐步長出來）

ax_loss.set_title("Training loss (MSE)")
ax_loss.set_xlabel("epoch")
ax_loss.set_ylabel("MSE")
ax_loss.grid(True)

loss_line, = ax_loss.plot([], [], linewidth=2)
loss_dot, = ax_loss.plot([], [], marker="o")

ax_loss.set_xlim(0, n_frames - 1)
ax_loss.set_ylim(min(loss_history) * 0.95, max(loss_history) * 1.05)


#右下：參數軌跡（w0, w1, b）

w0_hist = [w[0] for w in w2_hist]
w1_hist = [w[1] for w in w2_hist]

ax_param.set_title("Parameter trace")
ax_param.set_xlabel("epoch")
ax_param.set_ylabel("value")
ax_param.grid(True)
ax_param.set_xlim(0, n_frames - 1)

vals = w0_hist + w1_hist + b_hist
vmin, vmax = min(vals), max(vals)
pad = (vmax - vmin) * 0.1 if vmax > vmin else 1.0
ax_param.set_ylim(vmin - pad, vmax + pad)

w0_line, = ax_param.plot([], [], linewidth=2, label="w[0]")
w1_line, = ax_param.plot([], [], linewidth=2, label="w[1]")
b_line,  = ax_param.plot([], [], linewidth=2, label="b")
ax_param.legend(loc="upper right")


#右側文字欄（不擋圖）

info_text = ax_text.text(0.02, 0.98, "", va="top", ha="left", fontsize=12)


# 動畫更新：更新 SGD 平面 + 殘差線 + loss + params + 文字

def update(frame):
    global surf_sgd, wire_sgd

    # --- 更新 SGD 平面（移除舊的再畫新的）---
    if surf_sgd is not None:
        surf_sgd.remove()
    if wire_sgd is not None:
        wire_sgd.remove()

    w = w2_hist[frame]
    b = b_hist[frame]
    y_surf = b + w[0] * x0_surf + w[1] * x1_surf

    surf_sgd = ax3d.plot_surface(x0_surf, x1_surf, y_surf, alpha=0.18)
    wire_sgd = ax3d.plot_wireframe(x0_surf, x1_surf, y_surf, rstride=2, cstride=2, linewidth=0.8)

    # --- 更新殘差線（從 SGD 平面到真實點）---
    z_hat = b + X2[sample_idx, 0] * w[0] + X2[sample_idx, 1] * w[1]
    for k, i in enumerate(sample_idx):
        res_lines[k].set_data([X2[i, 0], X2[i, 0]], [X2[i, 1], X2[i, 1]])
        res_lines[k].set_3d_properties([z_hat[k], y[i]])

    # --- 更新 loss ---
    xs = np.arange(frame + 1)
    ys = np.array(loss_history[:frame + 1])
    loss_line.set_data(xs, ys)
    loss_dot.set_data([frame], [loss_history[frame]])

    # --- 更新 params ---
    w0_line.set_data(xs, np.array(w0_hist[:frame + 1]))
    w1_line.set_data(xs, np.array(w1_hist[:frame + 1]))
    b_line.set_data(xs, np.array(b_hist[:frame + 1]))

    # --- 更新文字欄 ---
    info_text.set_text(
        "SGD status\n"
        "---------\n"
        f"epoch = {frame}\n\n"
        f"w0 = {w0_hist[frame]:.4f}\n"
        f"w1 = {w1_hist[frame]:.4f}\n"
        f"b  = {b_hist[frame]:.4f}\n\n"
        f"MSE = {loss_history[frame]:.4f}\n\n"
        f"outliers = {int(train_outlier.sum())}"
    )

    return (loss_line, loss_dot, w0_line, w1_line, b_line, info_text, surf_sgd, wire_sgd, *res_lines)

anim = FuncAnimation(fig, update, frames=n_frames, interval=200, blit=False)
plt.show()


#存 GIF

gif_path = "sgd_vs_ols_3d_clear.gif"
anim.save(gif_path, writer=PillowWriter(fps=5))
print("Saved:", gif_path)

```


![sgd_vs_ols_3d_clear (5) (1)](https://hackmd.io/_uploads/HyiqpqMBbx.gif)




scikit-learn 程式碼(SGD vs OLS，大筆資料X大量特徵)
---

接下來實驗會同樣使用sklearn模組的OLS和SGD來比較使用記憶體和時間，來顯示為何使用SGD。

資料規模設定
```python=
import numpy as np

rng = np.random.default_rng(42)

# 設定資料規模
n_samples = 200_000   # 20 萬筆
n_features = 100

# 真實參數（固定，方便解釋）
w_true = rng.normal(0, 2, size=n_features)
b_true = 3.0

# 生成 X
X = rng.normal(0, 1, size=(n_samples, n_features))

# 生成 y（線性 + 雜訊）
noise = rng.normal(0, 1.0, size=n_samples)
y = X @ w_true + b_true + noise

print("X shape:", X.shape)
print("y shape:", y.shape)

```

資料分割

```python=
from sklearn.model_selection import train_test_split

X_train, X_val, y_train, y_val = train_test_split(
    X, y,
    test_size=0.2,
    random_state=42
)

print("Train size:", X_train.shape)
print("Val size:", X_val.shape)

```

引入分析模組和sklearn

```python=
import time
import tracemalloc
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_squared_error
```

OLS訓練

```python=
ols = Pipeline([
    ("scaler", StandardScaler()),
    ("model", LinearRegression(fit_intercept=True))
])

tracemalloc.start()
t0 = time.perf_counter()

ols.fit(X_train, y_train)

t1 = time.perf_counter()
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

ols_time = t1 - t0
ols_mem = peak / (1024**2)

yhat_train_ols = ols.predict(X_train)
yhat_val_ols = ols.predict(X_val)

mse_train_ols = mean_squared_error(y_train, yhat_train_ols)
mse_val_ols = mean_squared_error(y_val, yhat_val_ols)

print("\n=== OLS (n large) ===")
print("fit time (sec):", ols_time)
print("peak mem (MB):", ols_mem)
print("train MSE:", mse_train_ols)
print("val   MSE:", mse_val_ols)

```
 
 SGD訓練
 
 
 ```python=
from sklearn.linear_model import SGDRegressor

sgd_fixed = Pipeline([
    ("scaler", StandardScaler()),
    ("model", SGDRegressor(
        loss="squared_error",
        penalty=None,
        alpha=0.0,
        fit_intercept=True,
        max_iter=30,          # 注意：每 iter = 掃過全部資料
        tol=1e-3,
        learning_rate="constant",
        eta0=0.01,
        random_state=42
    ))
])

tracemalloc.start()
t0 = time.perf_counter()

sgd_fixed.fit(X_train, y_train)

t1 = time.perf_counter()
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

sgd_time = t1 - t0
sgd_mem = peak / (1024**2)

yhat_train_sgd = sgd_fixed.predict(X_train)
yhat_val_sgd = sgd_fixed.predict(X_val)

mse_train_sgd = mean_squared_error(y_train, yhat_train_sgd)
mse_val_sgd = mean_squared_error(y_val, yhat_val_sgd)

print("\n=== SGD (n large, fixed iter) ===")
print("fit time (sec):", sgd_time)
print("peak mem (MB):", sgd_mem)
print("n_iter_:", sgd_fixed.named_steps["model"].n_iter_)
print("train MSE:", mse_train_sgd)
print("val   MSE:", mse_val_sgd)

 
 ```
 
 比較結果
 
 
 | | OLS (n large) | SGD (n large, fixed iter) |
| -------- | -------- | -------- |
| fit time (sec)|2.076020639000035|2.052420466000001|
| peak mem (MB) |368.74022674560547|137.4093828201294|
| train MSE |0.9980185576293048|1.85396486854132|
| val   MSE |1.0007565636951319|1.8735307768331146|

由此可看出雖然效果不一定比較好，但是在記憶體和時間上，SGD都相對要少


