--- title: '梯度下降迴歸法(Stochastic Gradient Descent Regressor,SGDRegressor)' disqus: hackmd --- 梯度下降迴歸法(Stochastic Gradient Descent Regressor,SGDRegressor) === ## Table of Contents [TOC] ## 梯度下降迴歸法(Stochastic Gradient Descent Regressor,SGDRegressor) ### SGDRegressor 在數學上到底在做什麼? 它沒有改模型,不論是前面的: * Simple * Multiple * Polynomial 仍然使用同一個模型: $$ \hat{y} = wx + b $$ 同一個 SSE: $$ SSE = \sum_{i=1}^{n} (y_i - wx_i - b)^2 $$ 差別只在於: * **OLS**:對「整個 SSE」求導 → 解聯立方程式 * **SGD**:對「單一資料點的誤差」求導 → 逐步更新參數 ### OLS(Ordinary Least Square) 如前一節 [OLS(Ordinary Least Square,最小平方法)](https://hackmd.io/axF0Q0AmSgmEdiGtRZJgHg#OLS%EF%BC%88Ordinary-Least-Square%EF%BC%8C%E6%9C%80%E5%B0%8F%E5%B9%B3%E6%96%B9%E6%B3%95%EF%BC%89) 所推導 ### SGD(Stochastic Gradient Descent) 定義 **平均平方誤差(Mean Squared Error, MSE)** 為目標函數: $$ J(w,b)= \frac{1}{n}\sum_{i=1}^{n}(y_i - wx_i - b)^2 $$ 對n筆資料點進行偏微分(SGD 的核心) #### 對 (b) 偏微分 \begin{aligned} \frac{\partial J}{\partial b} &= \frac{\partial}{\partial b} \left( \frac{1}{n}\sum_{i=1}^{n}(y_i-wx_i-b)^2 \right) \\ &= \frac{1}{n}\sum_{i=1}^{n} 2(y_i-wx_i-b)(-1) \\ &= -\frac{2}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i) \end{aligned} #### 對 (w) 偏微分 \begin{aligned} \frac{\partial J}{\partial w} &= \frac{1}{n}\sum_{i=1}^{n} 2(y_i-wx_i-b)(-x_i) \\ &= -\frac{2}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i)x_i \end{aligned} #### SGD 的更新規則(關鍵差異在這) SGD **不會令導數 = 0**,而是設學習率(learning rate)為 $(\eta)$ $$ [ \theta \leftarrow \theta - \eta \nabla J(\theta) ] $$ 其中: * $(\nabla J(\theta))$:**梯度(函數上升最快的方向)** * $(\eta)$:學習率(步長) :::success #### 學習率 學習率 =「我相信這個方向有多準,所以我願意走多大一步」 在 SGD 裡,我們每次都在做這件事,我站在某一組參數 $(w,b)$,我知道「哪個方向會讓 SSE 下降最快」(梯度),但我不知道,要走多遠才不會走過頭?這個「走多遠」的比例,就是學習率。 ::: #### 更新 (b) \begin{aligned} b &\leftarrow b - \eta \frac{\partial J}{\partial b} \\ &= b - \eta\left( -\frac{2}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i) \right) \\ &= b + \frac{2\eta}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i) \end{aligned} #### 更新 (w) \begin{aligned} w &\leftarrow w - \eta \frac{\partial J}{\partial w} \\ &= w - \eta\left( -\frac{2}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i)x_i \right) \\ &= w + \frac{2\eta}{n}\sum_{i=1}^{n}(y_i-\hat{y}_i)x_i \end{aligned} ### SGD 更新式總結 \begin{aligned} w &\leftarrow w + \frac{2\eta}{n}\sum_{i=1}^{n}(y_i - \hat{y}_i)x_i \\ b &\leftarrow b + \frac{2\eta}{n}\sum_{i=1}^{n}(y_i - \hat{y}_i) \end{aligned} ### 為什麼要用 SGD? 正規方程式的限制: $$ \mathbf{w} = (\mathbf{X}^\top \mathbf{X})^{-1}\mathbf{X}^\top y $$ OLS 能順利用,背後其實偷偷假設了: * 特徵數 ($p$) **不大** * ($X^\top X$) **可逆** * 記憶體放得下 ($p \times p$) 矩陣 * 願意花時間算矩陣反矩陣 只要其中一個條件出問題,OLS 就會開始崩。 | 面向 | OLS(Normal Equation) | SGD | | --------------- | -------------------- | ----------- | | 解法 | 一次算出封閉解 | 逐步數值逼近 | | 是否需要反矩陣 | ✅ | ❌ | | 計算成本 | $(O(p^3))$ | $(O(p))$ 每步 | | 記憶體需求 | 高 | 低 | | 大型資料 | ❌ | ✅ | | Online learning | ❌ | ✅ | | 正則化支援 | 有,但成本高 | 天然適合 | | 解的本質 | 全域最小值 | 同一個最小值(理論上) | numpy 程式碼範例 --- 引入模組與產生資料 ```python= import numpy as np import matplotlib.pyplot as plt import matplotlib as mpl # 啟用 LaTeX 渲染 mpl.rcParams["text.usetex"] = True mpl.rcParams["text.latex.preamble"] = r"\usepackage{amsmath}" #實驗規模 seed = 42 n_samples = 800 n_features = 3 # 資料特性 noise_std = 2.0 # y 的高斯噪聲強度 bias_true = 5.0 # 真實截距 b feature_scale = 1.0 # X 的數值尺度 w_scale = 1.0 # 真實 w 的尺度 # 離群值 outlier_frac = 0.05 # 離群比例(0.0 表示不加) outlier_y_scale = 25.0 # 離群值偏移幅度(乘在 noise_std 上) # 稀疏化(模擬 one-hot/高維稀疏資料,效能比較會更有感),這邊先不啟用 sparse_frac = 0.0 # 0.0 表示不稀疏;例如 0.9 代表 90% 變 0 ``` 固定隨機種子(保證可重現) ```python= rng = np.random.default_rng(seed) ``` 生成特徵矩陣 X(n_samples × n_features) ```python= X = rng.normal(loc=0.0, scale=feature_scale, size=(n_samples, n_features)) X.shape ``` 稀疏化(把部分特徵值變成 0) ```python= if sparse_frac > 0: mask = rng.random(size=X.shape) < sparse_frac X = X.copy() X[mask] = 0.0 ``` 設定真實參數 w_true、b_true ```python= w_true = rng.normal(loc=0.0, scale=w_scale, size=(n_features,)) b_true = float(bias_true) w_true, b_true ``` 生成 y(線性關係 + 高斯噪聲) ```python= noise = rng.normal(loc=0.0, scale=noise_std, size=(n_samples,)) y = X @ w_true + b_true + noise y.shape ``` 加入 y 方向離群值(outliers) ```python= outlier_mask = np.zeros(n_samples, dtype=bool) if outlier_frac > 0: k = max(1, int(n_samples * outlier_frac)) idx = rng.choice(n_samples, size=k, replace=False) outlier_mask[idx] = True y = y.copy() y[idx] += rng.normal(loc=0.0, scale=outlier_y_scale * noise_std, size=k) outlier_mask.sum() ``` 切分 train/val/test ```python= val_ratio = 0.2 test_ratio = 0.2 idx = np.arange(n_samples) rng = np.random.default_rng(seed) # 用同 seed 保持可重現 rng.shuffle(idx) n_test = int(n_samples * test_ratio) n_val = int(n_samples * val_ratio) n_train = n_samples - n_val - n_test train_idx = idx[:n_train] val_idx = idx[n_train:n_train+n_val] test_idx = idx[n_train+n_val:] X_train, y_train = X[train_idx], y[train_idx] X_val, y_val = X[val_idx], y[val_idx] X_test, y_test = X[test_idx], y[test_idx] (X_train.shape, X_val.shape, X_test.shape) ``` 資料摘要 ```python= print("X:", X.shape, " y:", y.shape) print("train/val/test:", X_train.shape, X_val.shape, X_test.shape) print("outliers:", outlier_mask.sum()) print("w_true:", w_true) print("b_true:", b_true) ``` ### OLS(詳細講解請看前面章節) ```python= import numpy as np import time # 第一欄是 1(代表截距 b) X_design = np.c_[np.ones(len(X_train)), X_train] # shape: (n_train, 1+n_features) X_design.shape theta_ols, residuals, rank, svals = np.linalg.lstsq(X_design, y_train, rcond=None) b_ols = float(theta_ols[0]) w_ols = theta_ols[1:] print("OLS done.") print("w_ols shape:", w_ols.shape, "b_ols:", b_ols) def mse(y_true, y_pred): return np.mean((y_true - y_pred)**2) yhat_train_ols = X_train @ w_ols + b_ols train_mse_ols = mse(y_train, yhat_train_ols) print("OLS train MSE:", train_mse_ols) yhat_val_ols = X_val @ w_ols + b_ols val_mse_ols = mse(y_val, yhat_val_ols) print("OLS val MSE:", val_mse_ols) ``` ### SGD 設定SGD 超參數 ```python= seed = 42 rng = np.random.default_rng(seed) epochs = 30 eta = 0.05 # learning rate(你之後可以調) batch_size = 128 # 1 = SGD;例如 32/128 就是 mini-batch shuffle = True #初始化參數 w = np.zeros(X_train.shape[1], dtype=float) b = 0.0 print("init w shape:", w.shape, "init b:", b) ``` 開始訓練 ```python= loss_history = [] w_history = [] b_history = [] n = len(X_train) for epoch in range(1, epochs + 1): # full-batch:不需要 steps_per_epoch / batch_idx # shuffle 對 full-batch 沒差(梯度用全部資料),可以忽略或保留都一樣 yhat = X_train @ w + b err = y_train - yhat # full-batch gradients(注意這裡是 /n) grad_w = -(2.0 / n) * (X_train.T @ err) grad_b = -(2.0 / n) * np.sum(err) # one update per epoch w = w - eta * grad_w b = b - eta * grad_b # 記錄(用 train 全體算 MSE) yhat_train = X_train @ w + b train_mse = mse(y_train, yhat_train) loss_history.append(train_mse) w_history.append(w.copy()) b_history.append(b) if epoch == 1 or epoch % 5 == 0: print(f"epoch {epoch:02d} | train MSE={train_mse:.4f}") ``` 訓練結果 ```python= yhat_train_sgd = X_train @ w + b train_mse_sgd = mse(y_train, yhat_train_sgd) print("SGD train MSE:", train_mse_sgd) yhat_val_sgd = X_val @ w + b val_mse_sgd = mse(y_val, yhat_val_sgd) print("SGD val MSE:", val_mse_sgd) ``` 產生loss圖 :::success #### 注意:在colab cell執行下面程式碼,下載Latex所需資源,才能在matplotlib的圖裡使用Latex ```python= !apt update !apt install -y cm-super dvipng texlive-latex-extra texlive-fonts-recommended ``` 如果是在window本機 * 安裝 MiKTeX(建議完整安裝) * 安裝 Ghostscript(必要) * 安裝完成後,將 latex, dvipng 所在路徑加入環境變數 PATH ::: ```python= plt.figure(figsize=(6,4)) plt.plot(loss_history, marker="o") plt.xlabel("epoch") plt.ylabel("train MSE") plt.title("SGD training loss (per epoch)") plt.grid(True) plt.show() ``` ![SDG_Loss](https://hackmd.io/_uploads/H1cu7hqEbx.png) 產生gif並存取(可見SGD逐漸往OLS走) ```python= import numpy as np import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation, PillowWriter # 準備資料(只用 x[0], x[1] 畫 3D) X2 = X_train[:, :2].astype(float) y = y_train.astype(float) # OLS(只取前兩維) w2_ols = np.array(w_ols[:2], dtype=float) b_ols_ = float(b_ols) # SGD history(只取前兩維) w2_hist = [np.array(w[:2], dtype=float) for w in w_history] b_hist = [float(b) for b in b_history] loss_history = list(loss_history) n_frames = len(loss_history) #meshgrid(平面網格) x0_surf, x1_surf = np.meshgrid( np.linspace(X2[:, 0].min(), X2[:, 0].max(), 30), np.linspace(X2[:, 1].min(), X2[:, 1].max(), 30) ) # OLS 平面(固定) y_surf_ols = b_ols_ + w2_ols[0] * x0_surf + w2_ols[1] * x1_surf #建立排版:2x3(左 3D / 右兩圖 / 最右文字) fig = plt.figure(figsize=(10, 4.5), constrained_layout=True) gs = fig.add_gridspec( 2, 3, width_ratios=[2.3, 1.0, 0.6], height_ratios=[1, 1] ) ax3d = fig.add_subplot(gs[:, 0], projection="3d") # 左:3D ax_loss = fig.add_subplot(gs[0, 1]) # 右上:loss ax_param = fig.add_subplot(gs[1, 1]) # 右下:params ax_text = fig.add_subplot(gs[:, 2]) # 最右:文字欄 ax_text.axis("off") #點雲 + OLS 平面(固定) ax3d.scatter( X2[~train_outlier, 0], X2[~train_outlier, 1], y[~train_outlier], s=25, edgecolors="k", alpha=0.7, label="normal" ) if train_outlier.sum() > 0: ax3d.scatter( X2[train_outlier, 0], X2[train_outlier, 1], y[train_outlier], s=60, edgecolors="k", alpha=0.9, color="red", label="outlier" ) # OLS:surface + wireframe(固定) surf_ols = ax3d.plot_surface(x0_surf, x1_surf, y_surf_ols, alpha=0.30) #wire_ols = ax3d.plot_wireframe(x0_surf, x1_surf, y_surf_ols, rstride=2, cstride=2, linewidth=0.6) wire_ols = ax3d.plot_wireframe( x0_surf, x1_surf, y_surf_ols, rstride=2, cstride=2, linewidth=0.3, alpha=0.4 ) # SGD:先畫第 0 幀(會在 update 內更新) surf_sgd = None wire_sgd = None w0 = w2_hist[0] b0 = b_hist[0] y_surf_sgd0 = b0 + w0[0] * x0_surf + w0[1] * x1_surf surf_sgd = ax3d.plot_surface(x0_surf, x1_surf, y_surf_sgd0, alpha=0.18) # wire_sgd = ax3d.plot_wireframe(x0_surf, x1_surf, y_surf_sgd0, rstride=2, cstride=2, linewidth=0.8) wire_sgd = ax3d.plot_wireframe( x0_surf, x1_surf, y_surf_sgd0, rstride=2, cstride=2, linewidth=0.4, # 原本 0.8 alpha=0.5 # 新增 ) ax3d.set_xlabel("x[0]", labelpad=10) ax3d.set_ylabel("x[1]", labelpad=10) ax3d.set_zlabel("y", labelpad=6) ax3d.set_title("3D: Data + OLS plane (fixed) + SGD plane (moving)", pad=18) ax3d.view_init(elev=18, azim=120) # 固定 zlim(避免 outlier 把平面壓扁) y_vis = y[~train_outlier] if train_outlier.sum() > 0 else y lo, hi = np.percentile(y_vis, [2, 98]) ax3d.set_zlim(lo, hi) # legend 放外面(只放點雲 legend) ax3d.legend(loc="upper left", bbox_to_anchor=(0.0, 1.02)) # 殘差垂直線 rng = np.random.default_rng(0) sample_size = min(50, len(X2)) sample_idx = rng.choice(len(X2), size=sample_size, replace=False) res_lines = [] for i in sample_idx: ln, = ax3d.plot( [X2[i, 0], X2[i, 0]], [X2[i, 1], X2[i, 1]], [y[i], y[i]], linewidth=0.8, alpha=0.7 ) res_lines.append(ln) #右上:loss(逐步長出來) ax_loss.set_title("Training loss (MSE)") ax_loss.set_xlabel("epoch") ax_loss.set_ylabel("MSE") ax_loss.grid(True) loss_line, = ax_loss.plot([], [], linewidth=2) loss_dot, = ax_loss.plot([], [], marker="o") ax_loss.set_xlim(0, n_frames - 1) ax_loss.set_ylim(min(loss_history) * 0.95, max(loss_history) * 1.05) #右下:參數軌跡(w0, w1, b) w0_hist = [w[0] for w in w2_hist] w1_hist = [w[1] for w in w2_hist] ax_param.set_title("Parameter trace") ax_param.set_xlabel("epoch") ax_param.set_ylabel("value") ax_param.grid(True) ax_param.set_xlim(0, n_frames - 1) vals = w0_hist + w1_hist + b_hist vmin, vmax = min(vals), max(vals) pad = (vmax - vmin) * 0.1 if vmax > vmin else 1.0 ax_param.set_ylim(vmin - pad, vmax + pad) w0_line, = ax_param.plot([], [], linewidth=2, label="w[0]") w1_line, = ax_param.plot([], [], linewidth=2, label="w[1]") b_line, = ax_param.plot([], [], linewidth=2, label="b") ax_param.legend(loc="upper right") #右側文字欄(不擋圖) info_text = ax_text.text(0.02, 0.98, "", va="top", ha="left", fontsize=12) # 動畫更新:更新 SGD 平面 + 殘差線 + loss + params + 文字 def update(frame): global surf_sgd, wire_sgd # --- 更新 SGD 平面(移除舊的再畫新的)--- if surf_sgd is not None: surf_sgd.remove() if wire_sgd is not None: wire_sgd.remove() w = w2_hist[frame] b = b_hist[frame] y_surf = b + w[0] * x0_surf + w[1] * x1_surf surf_sgd = ax3d.plot_surface(x0_surf, x1_surf, y_surf, alpha=0.18) wire_sgd = ax3d.plot_wireframe(x0_surf, x1_surf, y_surf, rstride=2, cstride=2, linewidth=0.8) # --- 更新殘差線(從 SGD 平面到真實點)--- z_hat = b + X2[sample_idx, 0] * w[0] + X2[sample_idx, 1] * w[1] for k, i in enumerate(sample_idx): res_lines[k].set_data([X2[i, 0], X2[i, 0]], [X2[i, 1], X2[i, 1]]) res_lines[k].set_3d_properties([z_hat[k], y[i]]) # --- 更新 loss --- xs = np.arange(frame + 1) ys = np.array(loss_history[:frame + 1]) loss_line.set_data(xs, ys) loss_dot.set_data([frame], [loss_history[frame]]) # --- 更新 params --- w0_line.set_data(xs, np.array(w0_hist[:frame + 1])) w1_line.set_data(xs, np.array(w1_hist[:frame + 1])) b_line.set_data(xs, np.array(b_hist[:frame + 1])) # --- 更新文字欄 --- info_text.set_text( "SGD status\n" "---------\n" f"epoch = {frame}\n\n" f"w0 = {w0_hist[frame]:.4f}\n" f"w1 = {w1_hist[frame]:.4f}\n" f"b = {b_hist[frame]:.4f}\n\n" f"MSE = {loss_history[frame]:.4f}\n\n" f"outliers = {int(train_outlier.sum())}" ) return (loss_line, loss_dot, w0_line, w1_line, b_line, info_text, surf_sgd, wire_sgd, *res_lines) anim = FuncAnimation(fig, update, frames=n_frames, interval=200, blit=False) plt.show() #存 GIF gif_path = "sgd_vs_ols_3d_clear.gif" anim.save(gif_path, writer=PillowWriter(fps=5)) print("Saved:", gif_path) ``` ![sgd_vs_ols_3d_clear (5) (1)](https://hackmd.io/_uploads/HyiqpqMBbx.gif) scikit-learn 程式碼(SGD vs OLS,大筆資料X大量特徵) --- 接下來實驗會同樣使用sklearn模組的OLS和SGD來比較使用記憶體和時間,來顯示為何使用SGD。 資料規模設定 ```python= import numpy as np rng = np.random.default_rng(42) # 設定資料規模 n_samples = 200_000 # 20 萬筆 n_features = 100 # 真實參數(固定,方便解釋) w_true = rng.normal(0, 2, size=n_features) b_true = 3.0 # 生成 X X = rng.normal(0, 1, size=(n_samples, n_features)) # 生成 y(線性 + 雜訊) noise = rng.normal(0, 1.0, size=n_samples) y = X @ w_true + b_true + noise print("X shape:", X.shape) print("y shape:", y.shape) ``` 資料分割 ```python= from sklearn.model_selection import train_test_split X_train, X_val, y_train, y_val = train_test_split( X, y, test_size=0.2, random_state=42 ) print("Train size:", X_train.shape) print("Val size:", X_val.shape) ``` 引入分析模組和sklearn ```python= import time import tracemalloc from sklearn.linear_model import LinearRegression from sklearn.preprocessing import StandardScaler from sklearn.pipeline import Pipeline from sklearn.metrics import mean_squared_error ``` OLS訓練 ```python= ols = Pipeline([ ("scaler", StandardScaler()), ("model", LinearRegression(fit_intercept=True)) ]) tracemalloc.start() t0 = time.perf_counter() ols.fit(X_train, y_train) t1 = time.perf_counter() current, peak = tracemalloc.get_traced_memory() tracemalloc.stop() ols_time = t1 - t0 ols_mem = peak / (1024**2) yhat_train_ols = ols.predict(X_train) yhat_val_ols = ols.predict(X_val) mse_train_ols = mean_squared_error(y_train, yhat_train_ols) mse_val_ols = mean_squared_error(y_val, yhat_val_ols) print("\n=== OLS (n large) ===") print("fit time (sec):", ols_time) print("peak mem (MB):", ols_mem) print("train MSE:", mse_train_ols) print("val MSE:", mse_val_ols) ``` SGD訓練 ```python= from sklearn.linear_model import SGDRegressor sgd_fixed = Pipeline([ ("scaler", StandardScaler()), ("model", SGDRegressor( loss="squared_error", penalty=None, alpha=0.0, fit_intercept=True, max_iter=30, # 注意:每 iter = 掃過全部資料 tol=1e-3, learning_rate="constant", eta0=0.01, random_state=42 )) ]) tracemalloc.start() t0 = time.perf_counter() sgd_fixed.fit(X_train, y_train) t1 = time.perf_counter() current, peak = tracemalloc.get_traced_memory() tracemalloc.stop() sgd_time = t1 - t0 sgd_mem = peak / (1024**2) yhat_train_sgd = sgd_fixed.predict(X_train) yhat_val_sgd = sgd_fixed.predict(X_val) mse_train_sgd = mean_squared_error(y_train, yhat_train_sgd) mse_val_sgd = mean_squared_error(y_val, yhat_val_sgd) print("\n=== SGD (n large, fixed iter) ===") print("fit time (sec):", sgd_time) print("peak mem (MB):", sgd_mem) print("n_iter_:", sgd_fixed.named_steps["model"].n_iter_) print("train MSE:", mse_train_sgd) print("val MSE:", mse_val_sgd) ``` 比較結果 | | OLS (n large) | SGD (n large, fixed iter) | | -------- | -------- | -------- | | fit time (sec)|2.076020639000035|2.052420466000001| | peak mem (MB) |368.74022674560547|137.4093828201294| | train MSE |0.9980185576293048|1.85396486854132| | val MSE |1.0007565636951319|1.8735307768331146| 由此可看出雖然效果不一定比較好,但是在記憶體和時間上,SGD都相對要少