## GA 撈資料存到 MySQL: Example Code ###### tags: `Data Engineering` ###### 更新日期: `2025-10-04` 本案例使用 OAuth2 登入 Google 帳號取憑證 Google 帳號需先開通 GA4 的存取權 ``` """ GA4 Data API (OAuth 個人帳號登入版) - 第一次執行會開瀏覽器登入 Google 帳號 -> 同意 scope - 取得的 Token 會存到 token.json, 後續自動沿用/刷新 本例對齊 GA4「流量開發 / Traffic acquisition」表格欄位 維度: sessionDefaultChannelGroup 指標: sessions, engagedSessions, engagementRate, userEngagementDuration(用來算平均參與秒數), eventsPerSession, eventCount, keyEvents, totalRevenue 另外計算: 平均單次工作階段參與時間(秒) = userEngagementDuration / sessions 工作階段重要事件發生率 = keyEvents / sessions """""" # NOTE: # This sanitized version replaces sensitive fields with placeholders in the format (your XXX). # Please replace them with your actual values before running: # - (your GA4_PROPERTY_ID) : your GA4 numeric property id. # - (your CLIENT_SECRET_JSON_PATH) : path to your OAuth client_secret.json. # - (your TOKEN_JSON_PATH) : path to your token.json. # - (your MYSQL_HOST)/(your MYSQL_DB)/(your MYSQL_USER)/(your MYSQL_PASSWORD) : MySQL connection. # - (your TRANS_DATE: YYYY-MM-DD) : the date string for the report. # - (your OUTPUT_CSV_PATH) : output CSV path and used as MySQL table name base. # import os from dataclasses import dataclass from typing import List, Dict import pandas as pd import mysql.connector from google.analytics.data_v1beta import BetaAnalyticsDataClient from google.analytics.data_v1beta.types import ( RunReportRequest, DateRange, Dimension, Metric, OrderBy ) from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request # ---- Settings ------------------------------------------------------- @dataclass class Settings: # GA4 Property property_id: str = "(your GA4_PROPERTY_ID)" # OAuth 檔案 client_secret_file: str = "(your CLIENT_SECRET_JSON_PATH)" # OAuth 2.0 Client ID JSON token_file: str = "(your TOKEN_JSON_PATH)" # 第一次登入後會自動產生 # 權杖 scope (唯讀即可) scopes: tuple = ("https://www.googleapis.com/auth/analytics.readonly",) # ---- Auth (OAuth) --------------------------------------------------- SCOPES = ["https://www.googleapis.com/auth/analytics.readonly"] def get_oauth_credentials(st): creds = None # 若 token.json 存在且有效,直接使用 if os.path.exists(st.token_file): from google.oauth2.credentials import Credentials creds = Credentials.from_authorized_user_file(st.token_file, SCOPES) # 沒有或失效就重新登入授權 if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: # 有 refresh_token 直接刷新 creds.refresh(Request()) else: # 第一次登入:開瀏覽器 → 選擇帳號 → 同意 flow = InstalledAppFlow.from_client_secrets_file( st.client_secret_file, SCOPES) # 建議使用本機 callback (會開 http://localhost:...) creds = flow.run_local_server(port=0) # 存檔以便下次直接使用 with open(st.token_file, "w") as token: token.write(creds.to_json()) return creds # ---- GA4 Helpers ---------------------------------------------------- def build_client(creds: Credentials) -> BetaAnalyticsDataClient: return BetaAnalyticsDataClient(credentials=creds) def df_from_rows(resp, dim_names: List[str], metric_names: List[str]) -> pd.DataFrame: rows: List[Dict] = [] for r in resp.rows: d = {dim: r.dimension_values[i].value for i, dim in enumerate(dim_names)} for i, m in enumerate(metric_names): v = r.metric_values[i].value try: f = float(v) d[m] = int(f) if f.is_integer() else f except Exception: d[m] = v rows.append(d) return pd.DataFrame(rows) def safe_div(a, b) -> float: try: a, b = float(a), float(b) return a / b if b != 0 else 0.0 except Exception: return 0.0 # ---- Query (單日表格,對齊 Traffic acquisition) ---------------------- def query_table_one_day(trans_date: str, # 例如 "2025-09-26" channel_dims_api: List[str], # 例如 ["sessionSource","sessionMedium","sessionCampaignName"] channel_dims_ui: List[str] # 例如 ["來源","媒介","廣告活動"] ) -> pd.DataFrame: # 設定與授權 st = Settings() creds = get_oauth_credentials(st) client = build_client(creds) metrics = [ "sessions", "engagedSessions", "engagementRate", "userEngagementDuration", "eventsPerSession", "eventCount", "totalRevenue", "screenPageViews", "totalUsers", "activeUsers", ] # 一次帶三個維度 + newVsReturning req = RunReportRequest( property=f"properties/{st.property_id}", date_ranges=[DateRange(start_date=trans_date, end_date=trans_date)], dimensions=[*(Dimension(name=d) for d in channel_dims_api), Dimension(name="newVsReturning")], metrics=[Metric(name=m) for m in metrics], order_bys=[OrderBy(metric=OrderBy.MetricOrderBy(metric_name="sessions"), desc=True)], limit=1000, ) dim_names = channel_dims_api + ["newVsReturning"] resp = client.run_report(req) # 取得原始表 df_raw = df_from_rows(resp, dim_names, metrics) # ---- 回訪者 ---- df_returning = ( df_raw[df_raw["newVsReturning"] == "returning"] .groupby(channel_dims_api, as_index=False) .agg({"activeUsers": "sum"}) .rename(columns={"activeUsers": "回訪者"}) ) # ---- 依三維度聚合 ---- df_agg = df_raw.groupby(channel_dims_api, as_index=False).agg({ "sessions": "sum", "engagedSessions": "sum", "engagementRate": "first", # 之後會重算 "userEngagementDuration": "sum", "eventsPerSession": "first", # 之後會重算 "eventCount": "sum", "totalRevenue": "sum", "screenPageViews": "sum", "totalUsers": "sum", "activeUsers": "sum", }) # 率類指標重算 df_agg["engagementRate"] = [safe_div(e, s) for e, s in zip(df_agg["engagedSessions"], df_agg["sessions"])] df_agg["eventsPerSession"] = [safe_div(e, s) for e, s in zip(df_agg["eventCount"], df_agg["sessions"])] # 併回訪者 df = pd.merge(df_agg, df_returning, on=channel_dims_api, how="left").fillna({"回訪者": 0}) # 衍生欄位 df["avgEngagementTimePerSession_sec"] = [ int(safe_div(u, s)) for u, s in zip(df["userEngagementDuration"], df["sessions"]) ] # 與原版一致:eventCount 轉字串兩位小數 df["eventCount"] = df["eventCount"].map(lambda x: f"{float(x):.2f}") # 友善欄名(把三個 API 維度換成 UI 欄名) rename_map = {api: ui for api, ui in zip(channel_dims_api, channel_dims_ui)} df = df.rename(columns={ **rename_map, "sessions": "工作階段", "engagedSessions": "互動工作階段", "engagementRate": "參與度", "avgEngagementTimePerSession_sec": "平均單次工作階段參與時間(秒)", "eventsPerSession": "每個工作階段的事件數", "eventCount": "事件計數(所有事件)", "totalRevenue": "總收益", "screenPageViews": "瀏覽", "totalUsers": "總人數", "activeUsers": "活躍使用者(總)", }) # 欄位順序:把三個維度各自獨立成三欄放在最前面 ordered = channel_dims_ui + [ "工作階段", "互動工作階段", "參與度", "平均單次工作階段參與時間(秒)", "每個工作階段的事件數", "事件計數(所有事件)", "總收益", "瀏覽", "總人數", "活躍使用者(總)", "回訪者", ] removed_cols = ["活躍使用者(總)"] # 保持與你現有表頭一致(如不想隱藏可移除) final_cols = [c for c in ordered if c in df.columns and c not in removed_cols] final_df = df[final_cols].sort_values(by="工作階段", ascending=False) return final_df def write_csv(df, table_out): out_dir = os.path.dirname(table_out) if not os.path.exists(out_dir): os.makedirs(out_dir, exist_ok=True) df.to_csv(table_out, index=False, encoding="utf-8-sig", float_format="%.2f") print(f"[OK] saved -> {table_out}\n") # ---- MySQL 資料庫操作 ---- @dataclass class MySQLConfig: """MySQL 連線設定 """ host: str = "(your MYSQL_HOST)" port: int = 3306 database: str = "(your MYSQL_DB)" user: str = "(your MYSQL_USER)" password: str = "(your MYSQL_PASSWORD)" def get_conn(config: MySQLConfig): return mysql.connector.connect( host=config.host, port=config.port, user=config.user, password=config.password, database=config.database, use_pure=True, # 避免 C 擴充不支援 auth_plugin="mysql_native_password" # 強制用舊的驗證插件 ) def create_ga4_table(config: MySQLConfig, table_name: str, df: pd.DataFrame): """建立 GA4 資料表(根據 DataFrame 欄位動態產生)""" conn = get_conn(config) cursor = conn.cursor() # 根據 df 欄位動態建立 DDL column_definitions = ["`id` INT AUTO_INCREMENT PRIMARY KEY"] for col in df.columns: if col in ["工作階段", "互動工作階段", "瀏覽", "總人數", "回訪者"]: column_definitions.append(f"`{col}` INT") elif col == "參與度": column_definitions.append(f"`{col}` DECIMAL(10,4)") elif col == "每個工作階段的事件數": column_definitions.append(f"`{col}` DECIMAL(10,2)") elif col == "事件計數(所有事件)": column_definitions.append(f"`{col}` VARCHAR(50)") elif col == "總收益": column_definitions.append(f"`{col}` DECIMAL(18,2)") elif col == "平均單次工作階段參與時間(秒)": column_definitions.append(f"`{col}` INT") else: # 其他欄位(包含動態的第一個欄位)預設為 VARCHAR column_definitions.append(f"`{col}` VARCHAR(255)") #column_definitions.append("`created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP") create_table_sql = f""" CREATE TABLE IF NOT EXISTS `{table_name}` ( {','.join(column_definitions)} ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """ cursor.execute(create_table_sql) conn.commit() cursor.close() conn.close() print(f"[OK] 建立表格: {table_name} (欄位: {list(df.columns)})\n") def write_to_mysql(df: pd.DataFrame, config: MySQLConfig, table_name: str): """將 DataFrame 寫入 MySQL 表格""" conn = get_conn(config) cursor = conn.cursor() # 準備插入語句 columns = list(df.columns) placeholders = ', '.join(['%s'] * len(columns)) insert_sql = f"INSERT INTO `{table_name}` ({', '.join([f'`{col}`' for col in columns])}) VALUES ({placeholders})" # 轉換資料並插入 for _, row in df.iterrows(): values = [] for col in columns: value = row[col] # 處理 NaN 值 if pd.isna(value): values.append(None) else: values.append(value) cursor.execute(insert_sql, values) conn.commit() cursor.close() conn.close() print(f"[OK] 寫入 {len(df)} 筆資料到表格: {table_name}\n") def save_df_to_mysql(df, table_out): # 取得 MySQL 資料庫連線參數 mysql_config = MySQLConfig() # 從 table_out 路徑提取表格名稱 table_name = os.path.splitext(os.path.basename(table_out))[0] # 移除副檔名 # 建立表格(根據 df 欄位動態產生) create_ga4_table(mysql_config, table_name, df) # 寫入資料到 MySQL 資料庫 write_to_mysql(df, mysql_config, table_name) print(f"[完成] 資料已寫入 MySQL 表格: {mysql_config.database}.{table_name}\n") # ---- Main ----------------------------------------------------------- def main(): # 查詢期間(可用 YYYY-MM-DD 或 today/yesterday/7daysAgo/28daysAgo) #trans_date: str = "2025-09-26" trans_date: str = "(your TRANS_DATE: YYYY-MM-DD)" # 三個維度(API 名稱) channel_dims_api = ["sessionSource", "sessionMedium", "sessionCampaignName"] # 表格第一欄的顯示名稱 channel_dims_ui = ["來源", "媒介", "廣告活動"] # 輸出檔 table_out: str = "(your OUTPUT_CSV_PATH)" # 查單日表格 df = query_table_one_day(trans_date, channel_dims_api, channel_dims_ui) # 輸出 CSV write_csv(df, table_out) # 寫入 MySQL save_df_to_mysql(df, table_out) if __name__ == "__main__": main() ```