## GA 撈資料存到 MySQL: Example Code
###### tags: `Data Engineering`
###### 更新日期: `2025-10-04`
本案例使用 OAuth2 登入 Google 帳號取憑證
Google 帳號需先開通 GA4 的存取權
```
"""
GA4 Data API (OAuth 個人帳號登入版)
- 第一次執行會開瀏覽器登入 Google 帳號 -> 同意 scope
- 取得的 Token 會存到 token.json, 後續自動沿用/刷新
本例對齊 GA4「流量開發 / Traffic acquisition」表格欄位
維度: sessionDefaultChannelGroup
指標: sessions, engagedSessions, engagementRate, userEngagementDuration(用來算平均參與秒數),
eventsPerSession, eventCount, keyEvents, totalRevenue
另外計算:
平均單次工作階段參與時間(秒) = userEngagementDuration / sessions
工作階段重要事件發生率 = keyEvents / sessions
""""""
# NOTE:
# This sanitized version replaces sensitive fields with placeholders in the format (your XXX).
# Please replace them with your actual values before running:
# - (your GA4_PROPERTY_ID) : your GA4 numeric property id.
# - (your CLIENT_SECRET_JSON_PATH) : path to your OAuth client_secret.json.
# - (your TOKEN_JSON_PATH) : path to your token.json.
# - (your MYSQL_HOST)/(your MYSQL_DB)/(your MYSQL_USER)/(your MYSQL_PASSWORD) : MySQL connection.
# - (your TRANS_DATE: YYYY-MM-DD) : the date string for the report.
# - (your OUTPUT_CSV_PATH) : output CSV path and used as MySQL table name base.
#
import os
from dataclasses import dataclass
from typing import List, Dict
import pandas as pd
import mysql.connector
from google.analytics.data_v1beta import BetaAnalyticsDataClient
from google.analytics.data_v1beta.types import (
RunReportRequest, DateRange, Dimension, Metric, OrderBy
)
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
# ---- Settings -------------------------------------------------------
@dataclass
class Settings:
# GA4 Property
property_id: str = "(your GA4_PROPERTY_ID)"
# OAuth 檔案
client_secret_file: str = "(your CLIENT_SECRET_JSON_PATH)" # OAuth 2.0 Client ID JSON
token_file: str = "(your TOKEN_JSON_PATH)" # 第一次登入後會自動產生
# 權杖 scope (唯讀即可)
scopes: tuple = ("https://www.googleapis.com/auth/analytics.readonly",)
# ---- Auth (OAuth) ---------------------------------------------------
SCOPES = ["https://www.googleapis.com/auth/analytics.readonly"]
def get_oauth_credentials(st):
creds = None
# 若 token.json 存在且有效,直接使用
if os.path.exists(st.token_file):
from google.oauth2.credentials import Credentials
creds = Credentials.from_authorized_user_file(st.token_file, SCOPES)
# 沒有或失效就重新登入授權
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
# 有 refresh_token 直接刷新
creds.refresh(Request())
else:
# 第一次登入:開瀏覽器 → 選擇帳號 → 同意
flow = InstalledAppFlow.from_client_secrets_file(
st.client_secret_file, SCOPES)
# 建議使用本機 callback (會開 http://localhost:...)
creds = flow.run_local_server(port=0)
# 存檔以便下次直接使用
with open(st.token_file, "w") as token:
token.write(creds.to_json())
return creds
# ---- GA4 Helpers ----------------------------------------------------
def build_client(creds: Credentials) -> BetaAnalyticsDataClient:
return BetaAnalyticsDataClient(credentials=creds)
def df_from_rows(resp, dim_names: List[str], metric_names: List[str]) -> pd.DataFrame:
rows: List[Dict] = []
for r in resp.rows:
d = {dim: r.dimension_values[i].value for i, dim in enumerate(dim_names)}
for i, m in enumerate(metric_names):
v = r.metric_values[i].value
try:
f = float(v)
d[m] = int(f) if f.is_integer() else f
except Exception:
d[m] = v
rows.append(d)
return pd.DataFrame(rows)
def safe_div(a, b) -> float:
try:
a, b = float(a), float(b)
return a / b if b != 0 else 0.0
except Exception:
return 0.0
# ---- Query (單日表格,對齊 Traffic acquisition) ----------------------
def query_table_one_day(trans_date: str, # 例如 "2025-09-26"
channel_dims_api: List[str], # 例如 ["sessionSource","sessionMedium","sessionCampaignName"]
channel_dims_ui: List[str] # 例如 ["來源","媒介","廣告活動"]
) -> pd.DataFrame:
# 設定與授權
st = Settings()
creds = get_oauth_credentials(st)
client = build_client(creds)
metrics = [
"sessions",
"engagedSessions",
"engagementRate",
"userEngagementDuration",
"eventsPerSession",
"eventCount",
"totalRevenue",
"screenPageViews",
"totalUsers",
"activeUsers",
]
# 一次帶三個維度 + newVsReturning
req = RunReportRequest(
property=f"properties/{st.property_id}",
date_ranges=[DateRange(start_date=trans_date, end_date=trans_date)],
dimensions=[*(Dimension(name=d) for d in channel_dims_api),
Dimension(name="newVsReturning")],
metrics=[Metric(name=m) for m in metrics],
order_bys=[OrderBy(metric=OrderBy.MetricOrderBy(metric_name="sessions"), desc=True)],
limit=1000,
)
dim_names = channel_dims_api + ["newVsReturning"]
resp = client.run_report(req)
# 取得原始表
df_raw = df_from_rows(resp, dim_names, metrics)
# ---- 回訪者 ----
df_returning = (
df_raw[df_raw["newVsReturning"] == "returning"]
.groupby(channel_dims_api, as_index=False)
.agg({"activeUsers": "sum"})
.rename(columns={"activeUsers": "回訪者"})
)
# ---- 依三維度聚合 ----
df_agg = df_raw.groupby(channel_dims_api, as_index=False).agg({
"sessions": "sum",
"engagedSessions": "sum",
"engagementRate": "first", # 之後會重算
"userEngagementDuration": "sum",
"eventsPerSession": "first", # 之後會重算
"eventCount": "sum",
"totalRevenue": "sum",
"screenPageViews": "sum",
"totalUsers": "sum",
"activeUsers": "sum",
})
# 率類指標重算
df_agg["engagementRate"] = [safe_div(e, s) for e, s in zip(df_agg["engagedSessions"], df_agg["sessions"])]
df_agg["eventsPerSession"] = [safe_div(e, s) for e, s in zip(df_agg["eventCount"], df_agg["sessions"])]
# 併回訪者
df = pd.merge(df_agg, df_returning, on=channel_dims_api, how="left").fillna({"回訪者": 0})
# 衍生欄位
df["avgEngagementTimePerSession_sec"] = [
int(safe_div(u, s)) for u, s in zip(df["userEngagementDuration"], df["sessions"])
]
# 與原版一致:eventCount 轉字串兩位小數
df["eventCount"] = df["eventCount"].map(lambda x: f"{float(x):.2f}")
# 友善欄名(把三個 API 維度換成 UI 欄名)
rename_map = {api: ui for api, ui in zip(channel_dims_api, channel_dims_ui)}
df = df.rename(columns={
**rename_map,
"sessions": "工作階段",
"engagedSessions": "互動工作階段",
"engagementRate": "參與度",
"avgEngagementTimePerSession_sec": "平均單次工作階段參與時間(秒)",
"eventsPerSession": "每個工作階段的事件數",
"eventCount": "事件計數(所有事件)",
"totalRevenue": "總收益",
"screenPageViews": "瀏覽",
"totalUsers": "總人數",
"activeUsers": "活躍使用者(總)",
})
# 欄位順序:把三個維度各自獨立成三欄放在最前面
ordered = channel_dims_ui + [
"工作階段", "互動工作階段", "參與度",
"平均單次工作階段參與時間(秒)",
"每個工作階段的事件數",
"事件計數(所有事件)",
"總收益", "瀏覽", "總人數", "活躍使用者(總)", "回訪者",
]
removed_cols = ["活躍使用者(總)"] # 保持與你現有表頭一致(如不想隱藏可移除)
final_cols = [c for c in ordered if c in df.columns and c not in removed_cols]
final_df = df[final_cols].sort_values(by="工作階段", ascending=False)
return final_df
def write_csv(df, table_out):
out_dir = os.path.dirname(table_out)
if not os.path.exists(out_dir):
os.makedirs(out_dir, exist_ok=True)
df.to_csv(table_out, index=False, encoding="utf-8-sig", float_format="%.2f")
print(f"[OK] saved -> {table_out}\n")
# ---- MySQL 資料庫操作 ----
@dataclass
class MySQLConfig:
"""MySQL 連線設定 """
host: str = "(your MYSQL_HOST)"
port: int = 3306
database: str = "(your MYSQL_DB)"
user: str = "(your MYSQL_USER)"
password: str = "(your MYSQL_PASSWORD)"
def get_conn(config: MySQLConfig):
return mysql.connector.connect(
host=config.host,
port=config.port,
user=config.user,
password=config.password,
database=config.database,
use_pure=True, # 避免 C 擴充不支援
auth_plugin="mysql_native_password" # 強制用舊的驗證插件
)
def create_ga4_table(config: MySQLConfig, table_name: str, df: pd.DataFrame):
"""建立 GA4 資料表(根據 DataFrame 欄位動態產生)"""
conn = get_conn(config)
cursor = conn.cursor()
# 根據 df 欄位動態建立 DDL
column_definitions = ["`id` INT AUTO_INCREMENT PRIMARY KEY"]
for col in df.columns:
if col in ["工作階段", "互動工作階段", "瀏覽", "總人數", "回訪者"]:
column_definitions.append(f"`{col}` INT")
elif col == "參與度":
column_definitions.append(f"`{col}` DECIMAL(10,4)")
elif col == "每個工作階段的事件數":
column_definitions.append(f"`{col}` DECIMAL(10,2)")
elif col == "事件計數(所有事件)":
column_definitions.append(f"`{col}` VARCHAR(50)")
elif col == "總收益":
column_definitions.append(f"`{col}` DECIMAL(18,2)")
elif col == "平均單次工作階段參與時間(秒)":
column_definitions.append(f"`{col}` INT")
else:
# 其他欄位(包含動態的第一個欄位)預設為 VARCHAR
column_definitions.append(f"`{col}` VARCHAR(255)")
#column_definitions.append("`created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS `{table_name}` (
{','.join(column_definitions)}
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
"""
cursor.execute(create_table_sql)
conn.commit()
cursor.close()
conn.close()
print(f"[OK] 建立表格: {table_name} (欄位: {list(df.columns)})\n")
def write_to_mysql(df: pd.DataFrame, config: MySQLConfig, table_name: str):
"""將 DataFrame 寫入 MySQL 表格"""
conn = get_conn(config)
cursor = conn.cursor()
# 準備插入語句
columns = list(df.columns)
placeholders = ', '.join(['%s'] * len(columns))
insert_sql = f"INSERT INTO `{table_name}` ({', '.join([f'`{col}`' for col in columns])}) VALUES ({placeholders})"
# 轉換資料並插入
for _, row in df.iterrows():
values = []
for col in columns:
value = row[col]
# 處理 NaN 值
if pd.isna(value):
values.append(None)
else:
values.append(value)
cursor.execute(insert_sql, values)
conn.commit()
cursor.close()
conn.close()
print(f"[OK] 寫入 {len(df)} 筆資料到表格: {table_name}\n")
def save_df_to_mysql(df, table_out):
# 取得 MySQL 資料庫連線參數
mysql_config = MySQLConfig()
# 從 table_out 路徑提取表格名稱
table_name = os.path.splitext(os.path.basename(table_out))[0] # 移除副檔名
# 建立表格(根據 df 欄位動態產生)
create_ga4_table(mysql_config, table_name, df)
# 寫入資料到 MySQL 資料庫
write_to_mysql(df, mysql_config, table_name)
print(f"[完成] 資料已寫入 MySQL 表格: {mysql_config.database}.{table_name}\n")
# ---- Main -----------------------------------------------------------
def main():
# 查詢期間(可用 YYYY-MM-DD 或 today/yesterday/7daysAgo/28daysAgo)
#trans_date: str = "2025-09-26"
trans_date: str = "(your TRANS_DATE: YYYY-MM-DD)"
# 三個維度(API 名稱)
channel_dims_api = ["sessionSource", "sessionMedium", "sessionCampaignName"]
# 表格第一欄的顯示名稱
channel_dims_ui = ["來源", "媒介", "廣告活動"]
# 輸出檔
table_out: str = "(your OUTPUT_CSV_PATH)"
# 查單日表格
df = query_table_one_day(trans_date, channel_dims_api, channel_dims_ui)
# 輸出 CSV
write_csv(df, table_out)
# 寫入 MySQL
save_df_to_mysql(df, table_out)
if __name__ == "__main__":
main()
```