--- tags: flower --- # [使用手冊] Flower Federated Learning Framework (Kuihao撰) <!-- 摺疊語法 --> <details> <!-- 摺疊標題 --> <summary> <b><mark>本站更新紀錄</mark> 2021/11/18 新增 Colab 版本 Flower 單機模擬版</b> </summary> <!-- 列表 --> <div> <ul> <li>2021/11/18 新增 Colab 版本 Flower 單機模擬版</li> <li>2021/10/18 新增 SOTA non-iid dataset source</li> <li>2021/10/09 新增「Flower FedAvg 程式碼」解說 & Linux GPU trouble shooting</li> <li>2021/10/01 進階篇 撰寫完成</li> </ul> </div> <!-- 本網誌著作權為 張揆晧 Kuihao Chang 所有 --> </details> <br> 目錄超連結 [TOC] --- ## 本教學說明: * **教學重點:** 本教學重點是「如何架設Flower環境,以及重要的Function API使用說明」,並包含可能遇到的Bug與解決方法。由於Flower仍在不斷更新,更多資訊請參考Flower官方文件: * 官網: https://flower.dev/ * 一般文件: https://flower.dev/docs/ * **API 文件(查找特定函式時使用)**: https://flower.dev/docs/apiref-flwr.html * Github: * 首頁: https://github.com/adap/flower * **Python 函式程式碼(當網頁文件找不到時,可到此查找):** https://github.com/adap/flower/tree/main/src/py * Flower相關論文 * Flower Framework: https://arxiv.org/abs/2007.14390 * Flower with IoT (TX2): https://arxiv.org/abs/2104.03042 * Flower with Carbon Footprint: https://arxiv.org/abs/2010.06537 * **AI軟體庫:** Flower可以相容Tensorflow、Pytorch、MXNet、Scikit-learn等四種機器學習軟體庫,官方皆有相應的基本範例(其差異只在model建構、訓練、測試的語法不同,聯合學習的溝通方式是相同的),為簡化說明 ==**本教學皆以Tensorflow撰寫**==。 ## 安裝步驟 1. **安裝python:** 建議3.7以上版本 2. **安裝Tensorflow** 使用python pip套件安裝當前最新版本即可 ```linux $ pip install tensorflow ``` 4. **(選擇性, option) 安裝python虛擬環境,建置flower專屬的開發環境:** 可安裝Anaconda或Miniconda管理python虛擬環境 https://medium.com/python4u/%E7%94%A8conda%E5%BB%BA%E7%AB%8B%E5%8F%8A%E7%AE%A1%E7%90%86python%E8%99%9B%E6%93%AC%E7%92%B0%E5%A2%83-b61fd2a76566 5. **透過 python pip套件,下載安裝flower** (==本教學為 flwr 0.17.0版本==) (==本教學為 flwr 0.17.0版本==) (==本教學為 flwr 0.17.0版本==) ```linux $ pip install flwr ``` * 正常安裝時所顯示的訊息畫面: ![prompt message of installation](https://i.imgur.com/ryI4m11.png) * flwr套件安裝位置(可用來追查flower程式碼的實際寫法) * Windows ``` 路徑結構: (Anaconda的安裝路徑)\envs\(自訂的虛擬環境名稱)\Lib\site-packages 例如: D:\K_Install\K_Anaconda\envs\flower\Lib\site-packages ``` * Linux ``` 路徑結構: /home/(使用者名稱)/(conda版本名稱)/envs/(自訂的虛擬環境名稱)/lib/(python版本)/site-packages/ 例如: /home/kuihao/miniconda3/envs/Tensorflow-GPU/lib/python3.9/site-packages/ ``` 若成功安裝,可在site-packages資料夾中發現**flwr、flwr_example、flwr_experimental、flwr-0.17.0.dist-info**四個flower框架的相關資料夾 ## **[程式碼範例] 基本篇:** Client-Server建立連線 此段落的程式碼改寫自官方[Quickstart (TensorFlow)](https://flower.dev/docs/quickstart_tensorflow.html),其中Client-side的dataset從cifar10改成mnist、Local model從MobileNetV2改成CNN。 * 基本篇程式碼的**FL溝通示意圖**: ![FlowerFramework-basic](https://i.imgur.com/Qkdrf2E.png) * Flower的FL框架是由一台機器作為Server-side(伺服端)、多台機器作為Client-side(用戶端)所構成;Server-side使用FedAvg演算法進行aggregation(聚合) * **基本篇的範例程式碼**是採用**1台Server及2台Client**,並且可以選擇**單機模擬**或**多機連線**的方式實作,以下將詳細介紹這兩種實作方法 * **基本篇**的範例程式碼的FL架構是**由Client-side預先建立Local model的結構及權重**,進階篇會說明如何由Server-side發送Global model的結構、參數及初始權重。 * 示意圖中的Server會設定「**FL Strategy**」,Strategy 包含FL的溝通次數、Client的連線數量、Training及Evaluating時的hyperparameter、model config(model結構)、model weight(model權重)等。**基本篇程式碼的Strategy僅設定FL的溝通次數。** * ==**執行程式的注意事項**== 1. **連線前的準備** Flower是由python程式碼撰寫的FL框架,因此建立連線時需要預先撰寫Server-side及Client-side的程式碼(以下將代稱為 server.py 及 client.py),後續有程式碼範例及說明。 2. **程式的執行順序** 請**先執行 server.py,再執行 client.py**;否則,若先執行 client.py,會因為client找不到對應的Server,而自動中斷連線並結束程式。 3. ==**建立兩個Client**== server.py 的範例程式碼 **==預設==需要與至少2位Client建立連線,整個聯合學習才會開始運作**,因此請依序執行 server.py、client.py (client-1)、client.py (client-2) ### 程式碼 * **單機模擬 (Windows版本)** * ==**server.py 完整程式碼**==<br> Server建立連線的基本語法,僅需短短兩行程式碼。 ```python= import flwr as fl fl.server.start_server("localhost:8080", config={"num_rounds": 3}) ``` * 執行程式,於 Command Line 輸入: ```linux python server.py ``` * **fl.server.start_server()** Server開啟連線的語法 * **"localhost:8080"** 字串,表示Server對外的Address及Port * **config={"num_rounds": 3}** **num_rounds表示FL的Communication次數**,接收Client-side回傳的結果為溝通一次,即1 round(輪),此處設定為3輪。 <br>目前此語法的config參數只能傳入**溝通的輪替次數**,官方尚無開放其他參數。 * **範例程式碼預設只能與2個client建立連線,進階篇將介紹如何更改client連線數量** * ==**client.py 完整程式碼**==<br> 注意: 此範例是由Client端自行建立Local Model。(後續教學會介紹如何由Server建立Global Model並傳遞給Client) ```python= ''' 引入套件 ''' import flwr as fl import tensorflow as tf from tensorflow.keras import layers, models # 建立CNN架構 import numpy as np # 資料前處理 ''' Step 1. Build Local Model (建立本地模型) ''' # Hyperparameter超參數 num_classes = 10 input_shape = (28, 28, 1) # Build Model model = models.Sequential([ tf.keras.Input(shape=input_shape), layers.Conv2D(32, kernel_size=(3, 3), activation="relu"), layers.MaxPooling2D(pool_size=(2, 2)), layers.Conv2D(64, kernel_size=(3, 3), activation="relu"), layers.MaxPooling2D(pool_size=(2, 2)), layers.Flatten(), layers.Dropout(0.5), layers.Dense(num_classes, activation="softmax"), ]) #model.summary() # Defines the loss function, the optimizer and the metrics model.compile("adam", "categorical_crossentropy", metrics=["accuracy"]) ''' Step 2. Load local dataset (引入本地端資料集) ''' # Load data (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data() # Data preprocessing ## Scale images to the [0, 1] range x_train = x_train.astype("float32") / 255 x_test = x_test.astype("float32") / 255 ## Make sure images have shape (28, 28, 1) x_train = np.expand_dims(x_train, -1) x_test = np.expand_dims(x_test, -1) print("x_train shape:", x_train.shape) print(x_train.shape[0], "train samples") print(x_test.shape[0], "test samples") ## convert class vectors to binary class matrices y_train = tf.keras.utils.to_categorical(y_train, num_classes) y_test = tf.keras.utils.to_categorical(y_test, num_classes) ''' Step 3. Get the configuration (weights of global model, hyperparameters of training and evaluating) from the server-side. (繼承NumPyClient類別,定義匯入模型權重、模型訓練及評估等的函式,其中的權重值、訓練及評估的超參數皆來自Server端) ''' class MnistClient(fl.client.NumPyClient): def get_parameters(self): return model.get_weights() def fit(self, parameters, config): model.set_weights(parameters) model.fit(x_train, y_train, epochs=3, batch_size=256) # steps_per_epoch=3 return model.get_weights(), len(x_train), {} def evaluate(self, parameters, config): model.set_weights(parameters) loss, accuracy = model.evaluate(x_test, y_test) return loss, len(x_test), {"accuracy": accuracy} ''' Step 4. Create an instance of our New-NumPyClient and add one line to actually run this client. (使用一行指令,同時產生NumPyClient類別的實例(物件)以執行模型的訓練及評估,並建立Client-to-Server的連線) ''' fl.client.start_numpy_client("localhost:8080", client=MnistClient()) ``` * 執行程式,於 Command Line 輸入: ```linux python client.py ``` * Client端的程式碼共分成四大部分: 1. **建立local model**<br> 此處與一般的深度學習相同。聯合學習是由Client進行model的訓練,此範例是由Client自訂Model架構,後續教學會介紹如何接收並重建來自Server的Global model。 2. **匯入local data**<br> 此處與一般的深度學習相同。 3. **接收Server端的model weights及訓練評估的超參數設定**<br> 此為聯合學習Client端的核心部分,需先接收並解碼Server傳過來的訊息封包,取得模型的權重初始化、訓練、評估等設定,再配置於local model進行訓練及評估。<br> 訓練及評估結束後,會在client的terminal印出結果,同時也會將訓練後的權重、結果等訊息回傳至Server。 4. **建立連線**<br> * **fl.client.start_numpy_client()** Client建立連線的函式 * **"localhost:8080"** 目標Server的Address及Port * **client=MnistClient()** 參數client傳入自訂義的Client類別的實例(繼承自Flower提供的NumPyClient類別,此類別已實作Server封包的接收與解碼) * 成功執行訊息 * Server.py 當Terminal停滯於以下訊息時是**正常**的狀況,表示Server已經啟動並等待Client與之連線,**此時要開2個新的Terminal啟動Client** (範例程式預設要2位Client連線,Server才會繼續執行) ```linux INFO flower 2021-10-01 10:54:11,022 | app.py:73 | Flower server running (insecure, 3 rounds) INFO flower 2021-10-01 10:54:11,022 | server.py:118 | Getting initial parameters ``` 後續執行訊息 ```linux INFO flower 2021-10-01 10:55:26,781 | server.py:306 | Received initial parameters from one random client INFO flower 2021-10-01 10:55:26,781 | server.py:120 | Evaluating initial parameters INFO flower 2021-10-01 10:55:26,781 | server.py:133 | FL starting DEBUG flower 2021-10-01 10:55:43,531 | server.py:251 | fit_round: strategy sampled 2 clients (out of 2) DEBUG flower 2021-10-01 10:56:19,241 | server.py:260 | fit_round received 2 results and 0 failures DEBUG flower 2021-10-01 10:56:19,241 | server.py:201 | evaluate_round: strategy sampled 2 clients (out of 2) DEBUG flower 2021-10-01 10:56:22,139 | server.py:210 | evaluate_round received 2 results and 0 failures DEBUG flower 2021-10-01 10:56:22,139 | server.py:251 | fit_round: strategy sampled 2 clients (out of 2) DEBUG flower 2021-10-01 10:56:40,401 | server.py:260 | fit_round received 2 results and 0 failures DEBUG flower 2021-10-01 10:56:40,401 | server.py:201 | evaluate_round: strategy sampled 2 clients (out of 2) DEBUG flower 2021-10-01 10:56:43,041 | server.py:210 | evaluate_round received 2 results and 0 failures DEBUG flower 2021-10-01 10:56:43,041 | server.py:251 | fit_round: strategy sampled 2 clients (out of 2) DEBUG flower 2021-10-01 10:57:00,901 | server.py:260 | fit_round received 2 results and 0 failures DEBUG flower 2021-10-01 10:57:00,911 | server.py:201 | evaluate_round: strategy sampled 2 clients (out of 2) DEBUG flower 2021-10-01 10:57:02,591 | server.py:210 | evaluate_round received 2 results and 0 failures INFO flower 2021-10-01 10:57:02,591 | server.py:172 | FL finished in 95.80974900000001 INFO flower 2021-10-01 10:57:02,591 | app.py:109 | app_fit: losses_distributed [(1, 0.05556971952319145), (2, 0.03884832561016083), (3, 0.031007489189505577)] INFO flower 2021-10-01 10:57:02,591 | app.py:110 | app_fit: metrics_distributed {} INFO flower 2021-10-01 10:57:02,591 | app.py:111 | app_fit: losses_centralized [] INFO flower 2021-10-01 10:57:02,591 | app.py:112 | app_fit: metrics_centralized {} DEBUG flower 2021-10-01 10:57:02,591 | server.py:201 | evaluate_round: strategy sampled 2 clients (out of 2) DEBUG flower 2021-10-01 10:57:05,161 | server.py:210 | evaluate_round received 2 results and 0 failures INFO flower 2021-10-01 10:57:05,161 | app.py:121 | app_evaluate: federated loss: 0.031007489189505577 INFO flower 2021-10-01 10:57:05,161 | app.py:122 | app_evaluate: results [('ipv6:[::1]:57002', EvaluateRes(loss=0.031007489189505577, num_examples=10000, accuracy=0.0, metrics={'accuracy': 0.9894999861717224})), ('ipv6:[::1]:57004', EvaluateRes(loss=0.031007489189505577, num_examples=10000, accuracy=0.0, metrics={'accuracy': 0.9894999861717224}))] INFO flower 2021-10-01 10:57:05,161 | app.py:127 | app_evaluate: failures [] ``` * Client.py 當畫面停滯於以下訊息是正常情況,表示已與Server連線,但Server尚在等待其他Client連線。請繼續啟動更多Client直到滿足Server最低連線數量需求。 ```linux DEBUG flower 2021-10-01 11:10:49,885 | connection.py:36 | ChannelConnectivity.IDLE DEBUG flower 2021-10-01 11:10:49,885 | connection.py:36 | ChannelConnectivity.CONNECTING INFO flower 2021-10-01 11:10:49,885 | app.py:61 | Opened (insecure) gRPC connection DEBUG flower 2021-10-01 11:10:49,890 | connection.py:36 | ChannelConnectivity.READY ``` 後續執行訊息 ```linux 2021-10-01 10:55:43.837267: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2) Epoch 1/3 2021-10-01 10:55:44.361202: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library cudnn64_8.dll 2021-10-01 10:55:48.670356: I tensorflow/stream_executor/cuda/cuda_dnn.cc:359] Loaded cuDNN version 8202 2021-10-01 10:55:51.858640: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library cublas64_11.dll 2021-10-01 10:55:56.857417: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library cublasLt64_11.dll 235/235 [==============================] - 23s 29ms/step - loss: 0.4643 - accuracy: 0.8622 Epoch 2/3 235/235 [==============================] - 6s 27ms/step - loss: 0.1324 - accuracy: 0.9594 Epoch 3/3 235/235 [==============================] - 6s 26ms/step - loss: 0.0989 - accuracy: 0.9694 313/313 [==============================] - 3s 9ms/step - loss: 0.0556 - accuracy: 0.9829 Epoch 1/3 235/235 [==============================] - 6s 27ms/step - loss: 0.0825 - accuracy: 0.9747 Epoch 2/3 235/235 [==============================] - 4s 17ms/step - loss: 0.0716 - accuracy: 0.9781 Epoch 3/3 235/235 [==============================] - 6s 27ms/step - loss: 0.0657 - accuracy: 0.9797 313/313 [==============================] - 1s 2ms/step - loss: 0.0388 - accuracy: 0.9869 Epoch 1/3 235/235 [==============================] - 4s 17ms/step - loss: 0.0587 - accuracy: 0.9820 Epoch 2/3 235/235 [==============================] - 6s 17ms/step - loss: 0.0543 - accuracy: 0.9833 Epoch 3/3 235/235 [==============================] - 6s 25ms/step - loss: 0.0521 - accuracy: 0.9834 313/313 [==============================] - 1s 2ms/step - loss: 0.0310 - accuracy: 0.9895 313/313 [==============================] - 3s 8ms/step - loss: 0.0310 - accuracy: 0.9895 DEBUG flower 2021-10-01 10:57:05,171 | connection.py:68 | Insecure gRPC channel closed INFO flower 2021-10-01 10:57:05,171 | app.py:72 | Disconnect and shut down ``` * **單機模擬 (Linux版本)** * **server.py 部分程式碼**<br> 與Windows版本大致相同,僅需更動 IP address and port 即可。 ```python= # ipv4 version fl.server.start_server("0.0.0.0:8080", config={"num_rounds": 3}) ``` 或 ```python= # ipv6 version (address要加中括弧) fl.server.start_server("[::]:8080", config={"num_rounds": 3}) ``` * **client.py 部分程式碼**<br> 與Windows版本大致相同,僅需更動 IP address and port 即可。 ```python= # ipv4 version fl.client.start_numpy_client("0.0.0.0:8080", client=MnistClient()) ``` 或 ```python= # ipv6 version (address要加中括弧) fl.client.start_numpy_client("[::]:8080", client=MnistClient()) ``` * **多機連線**<br> 2021/07/24測試結果: **僅Linux-to-Linux可以正常運作**,Windows作業系統等待官方修正Bug。 * **PC互連 (Desktop-to-Desktop):** 理論上更改IP address及port即可,請先確定雙方SSH連線可正常運作(gRPC的底層是SSH) * **PC與物連網裝置互連 (Desktop-to-Embedded Devices)** **(Jetson & Raspi)** * Nvidia Jetson Nano: 搭載GPU可進行model訓練,目前已成功讓上述程式碼運行於Jetson Nano * Raspberry Pi: 無法進行Training,但可利用Flower架構進行model的部屬。 * 官方Github有範例 * 詳細內容: coming soon --- ## **[程式碼範例] 進階篇 (訓練策略、設定Client連線數量、改寫聚合演算法、儲存聚合參數、傳送Model結構)** ### ==**進階篇目錄(學習重點)**== 1. 前言 2. FL-strategy (制定聯合學習策略) 1. [制定聯合學習策略-1] Hyperparameter strategy:**如何設定FL溝通次數**、**如何設定Client連線數量**、**如何設定client的Train及Evaluate的超參數** 2. [制定聯合學習策略-2] Aggregation strategy: **如何改寫聚合演算法**、**如何儲存聚合參數**、**如何取出Client回傳的結果** 3. 傳送Model結構: **Server如何發送model config、Client如何接收並重建model** --- **前言:** 在聯合學習框架中,**Server-side是主動方**,負責建置Global model、制定聯合學習訓練策略、發送Global Model權重、接收Local Model的權重並聚合更新Global model;**Client-side是被動方**,擁有Local Data、接收Global model權重以更新Local model、負責model訓練。 因此進階篇介紹的各種Flower API**大都是更動Server-side的程式碼**,Client-side的程式碼則較少更動。 <br> ### ==FL-strategy (制定聯合學習策略)== * 定義: **FL-strategy (Federated learning strategy, 聯合學習策略)** 是本人為了方便稱呼而定義的名詞,由於Flower官方文件對於strategy(策略)的描述相當**混亂、沒有一個清楚的定義導致文件閱讀困難**,因此本人**新定義FL-strategy是Flower文件所提到的各種strategy的統稱。**<br> 而Flower的strategy**可粗略分成兩大類,Hyperparameter strategy (超參數的策略)、Aggregation strategy (聚合的策略)**。 * FL-strategy程式碼的**FL溝通示意圖**: ![](https://i.imgur.com/mYS1BcD.png) * 注意此架構與基本篇不同的地方是「Server-side會預先建立**Global Model**」以及「Server-side擁有用來**評估(測試)Global model的Dataset**」 * 當然,Server-side沒有依定要有Global dataset,這是可選擇性的。至於Global model,通常FL架構都一定會有Global model,這也是FedAvg的核心要素,由此可知基本篇的架構是不完整的FL架構。 #### ==[制定聯合學習策略-1] **Hyperparameter strategy (超參數策略)**== 實際上Hyperparameter strategy**又可細分成兩類「FL Hyperparameter (聯合學習溝通方面的超參數)」、「Model Hyperparameter (Model訓練與評估方面的超參數)」** * server.py 完整程式碼 (程式碼方塊下方有**程式碼解說**) ```python= ''' 引入套件 ''' from typing import Any, Callable, Dict, List, Optional, Tuple import flwr as fl import tensorflow as tf # 建立Global model並取得初始參數 from tensorflow.keras import Input, Model, layers, models # 建立CNN架構 import numpy as np # 資料前處理 ''' Step 1. Build Global Model (建立全域模型) ''' # Hyperparameter超參數 num_classes = 10 input_shape = (28, 28, 1) # Build Model def CNN_Model(input_shape, number_classes): # define Input layer input_tensor = Input(shape=input_shape) # Input: convert normal numpy to Tensor (float32) # define layer connection x = layers.Conv2D(filters = 32, kernel_size=(3, 3), activation="relu")(input_tensor) x = layers.MaxPooling2D(pool_size=(2, 2))(x) x = layers.Conv2D(filters = 64, kernel_size=(3, 3), activation="relu")(x) x = layers.MaxPooling2D(pool_size=(2, 2))(x) x = layers.Flatten()(x) x = layers.Dropout(0.5)(x) outputs = layers.Dense(number_classes, activation="softmax")(x) # define model model = Model(inputs=input_tensor, outputs=outputs, name="mnist_model") return model ''' Step 2. Start server and run the strategy (套用所設定的策略,啟動Server) ''' def main() -> None: # Load and compile model for # 1. server-side parameter initialization # 2. server-side parameter evaluation model = CNN_Model(input_shape=input_shape, number_classes=num_classes) #model.summary() model.compile("adam", "categorical_crossentropy", metrics=["accuracy"]) # Create strategy strategy = fl.server.strategy.FedAvg( fraction_fit=0.5, # 每一輪參與Training的Client比例 fraction_eval=0.5, # 每一輪參與Evaluating的Client比例 min_fit_clients=2, # 每一輪參與Training的最少Client連線數量 (與比例衝突時,以此為準) min_eval_clients=1, # 每一輪參與Evaluating的最少Client連線數量 (與比例衝突時,以此為準) min_available_clients=2, # 啟動聯合學習之前,Client連線的最小數量 on_fit_config_fn=fit_config, # 設定 Client-side Training Hyperparameter on_evaluate_config_fn=evaluate_config, # 設定 Client-side Evaluating Hyperparameter eval_fn=get_eval_fn(model), # 設定 Server-side Evaluating Hyperparameter (用Global Dataset進行評估) initial_parameters=fl.common.weights_to_parameters(model.get_weights()), # Global Model 初始參數設定 ) # Start Flower server for four rounds of federated learning fl.server.start_server("localhost:8080", config={"num_rounds": 2}, strategy=strategy) #windows ''' [Model Hyperparameter](Client-side, train strategy) * 設定Client Training 的 Hyperparameter: 包含batch_size、epochs、learning-rate...皆可設定。 * 甚至可以設定不同 FL round 給予 client 不同的 Hyperparameter ''' def fit_config(rnd: int): """Return training configuration dict for each round. Keep batch size fixed at 128, perform two rounds of training with one local epoch, increase to two local epochs afterwards. """ config = { "batch_size": 128, "local_epochs": 1 if rnd < 2 else 2, # Client 進行 local model Training時,前兩輪的epoch設為1,之後epoch設為2 } return config ''' [Model Hyperparameter](Client-side, evaluate strategy) * 設定Client Testing 的 Hyperparameter: 包含epochs、steps(Total number of steps, 也就是 batche個數 (batches of samples))。 * 可以設定不同 FL round 給予 client 不同的 Hyperparameter ''' def evaluate_config(rnd: int): """Return evaluation configuration dict for each round. Perform five local evaluation steps on each client (i.e., use five batches) during rounds one to three, then increase to ten local evaluation steps. """ val_steps = 5 if rnd < 4 else 10 # Client 進行 local model evaluate時,前4輪 step 為 5,之後 step 為 10 return {"val_steps": val_steps} ''' [Model Hyperparameter](Server-side, evaluate strategy) 用 Global Dataset 評估 Global model (不含訓練) ''' def get_eval_fn(model): """Return an evaluation function for server-side evaluation.""" # Load data and model here to avoid the overhead of doing it in `evaluate` itself (x_train, y_train), _ = tf.keras.datasets.mnist.load_data() # Train sample size: 60000 # Data preprocessing x_train = x_train.astype("float32") / 255 x_train = np.expand_dims(x_train, -1) y_train = tf.keras.utils.to_categorical(y_train, 10) # Use the last 5k training examples as a validation set x_val, y_val = x_train[60000-5000:], y_train[60000-5000:] # The `evaluate` function will be called after every round def evaluate( weights: fl.common.Weights, ) -> Optional[Tuple[float, Dict[str, fl.common.Scalar]]]: model.set_weights(weights) # Update model with the latest parameters loss, accuracy = model.evaluate(x_val, y_val) return loss, {"accuracy": accuracy} return evaluate ''' main ''' if __name__ == "__main__": main() ``` * 執行程式,於 Command Line 輸入: ``` python server.py ``` * 程式碼解說: * **FL Hyperparameter,意指專門控制Client與Server溝通方面的超參數**, **fl.server.strategy.FedAvg()** 可帶入的參數中,fraction_fit、fraction_eval、min_fit_clients、min_eval_clients、min_available_clients皆屬之。 * **Model Hyperparameter,意指Client的Local model訓練或評估時的超參數**,**fl.server.strategy.FedAvg()** 可帶入的參數中 on_fit_config_fn、on_evaluate_config_fn、eval_fn,以及其對應的函式fit_config()、evaluate_config()、get_eval_fn()皆屬之。 * 欲瞭解更多可查看: * FL Hyperparameter相關文件: https://flower.dev/docs/evaluation.html * Model Hyperparameter相關文件: https://flower.dev/docs/strategies.html * client.py 完整程式碼 * 與基本篇程式碼相異之處: * Step 3: fit()、evaluate()提取Server傳來的strategy的參數 * **fit()的"config"參數**就是 Server-side 的 fit_config() 的 return 值 * **evaluate()的"config"參數**就是 Server-side 的 evaluate_config() 的 return 值 * Step 4: 使用argparse套件,簡易地為每個Client切割等份的dataset ```python= ''' 引入套件 ''' import flwr as fl import tensorflow as tf from tensorflow.keras import Input, Model, layers, models # 建立CNN架構 import numpy as np # 資料前處理 import argparse # CmmandLine 輸入控制參數 import os # 更改tensorflow的Log訊息的顯示模式 # Make TensorFlow logs less verbose (減少不必要的訊息顯示) os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" ''' Step 1. Build Local Model (建立本地模型) ''' # Hyperparameter超參數 num_classes = 10 input_shape = (28, 28, 1) # Build Model def CNN_Model(input_shape, number_classes): # define Input layer input_tensor = Input(shape=input_shape) # Input: convert normal numpy to Tensor (float32) # define layer connection x = layers.Conv2D(filters = 32, kernel_size=(3, 3), activation="relu")(input_tensor) x = layers.MaxPooling2D(pool_size=(2, 2))(x) x = layers.Conv2D(filters = 64, kernel_size=(3, 3), activation="relu")(x) x = layers.MaxPooling2D(pool_size=(2, 2))(x) x = layers.Flatten()(x) x = layers.Dropout(0.5)(x) outputs = layers.Dense(number_classes, activation="softmax")(x) # define model model = Model(inputs=input_tensor, outputs=outputs, name="mnist_model") return model ''' Step 2. Load local dataset (引入本地端資料集),對Dataset進行切割 ''' def load_partition(idx: int): """Load 1/10th of the training and test data to simulate a partition.""" #assert idx in range(10) # limit it can't more than 10... (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data() # Train 60000-5000, Test 10000 # Data preprocessing x_train = x_train.astype("float32") / 255 x_test = x_test.astype("float32") / 255 x_train = np.expand_dims(x_train, -1) x_test = np.expand_dims(x_test, -1) y_train = tf.keras.utils.to_categorical(y_train, 10) y_test = tf.keras.utils.to_categorical(y_test, 10) sample_size = 60000 #x_train.shape[0] sample_size -= 5000 # Server-side 預留做驗證 training_size = int(sample_size/2) testing_size = int(10000/2) return ( x_train[idx * training_size : (idx + 1) * training_size], y_train[idx * training_size : (idx + 1) * training_size], ), ( x_test[idx * testing_size : (idx + 1) * testing_size], y_test[idx * testing_size : (idx + 1) * testing_size], ) ''' Step 3. Define Flower client (定義client的相關設定: 接收Server-side的global model weight、hyperparameters) ''' class MnistClient(fl.client.NumPyClient): # Class初始化: local model、dataset def __init__(self, model, x_train, y_train, x_test, y_test): self.model = model self.x_train, self.y_train = x_train, y_train self.x_test, self.y_test = x_test, y_test # 此時已無作用,原用來取得 local model 的 ini-weight, # 目前初始權重值是來自 Server-side 而非 client 自己 def get_parameters(self): """Get parameters of the local model.""" raise Exception("Not implemented (server-side parameter initialization)") def fit(self, parameters, config): """Train parameters on the locally held training set.""" # Update local model parameters self.model.set_weights(parameters) # Get hyperparameters for this round batch_size: int = config["batch_size"] epochs: int = config["local_epochs"] # Train the model using hyperparameters from config # (依 Server-side 的 hyperparameters 進行訓練) history = self.model.fit( self.x_train, self.y_train, batch_size, epochs, validation_split=0.1, ) # Return updated model parameters and results # 將訓練後的權重、資料集筆數、正確率/loss值等,回傳至server-side parameters_prime = self.model.get_weights() num_examples_train = len(self.x_train) results = { "loss": history.history["loss"][-1], "accuracy": history.history["accuracy"][-1], "val_loss": history.history["val_loss"][-1], "val_accuracy": history.history["val_accuracy"][-1], } return parameters_prime, num_examples_train, results def evaluate(self, parameters, config): """Evaluate parameters on the locally held test set.""" # Update local model with global parameters self.model.set_weights(parameters) # Get config values steps: int = config["val_steps"] # Evaluate global model parameters on the local test data and return results loss, accuracy = self.model.evaluate(self.x_test, self.y_test, 32, steps=steps) num_examples_test = len(self.x_test) return loss, num_examples_test, {"accuracy": accuracy} ''' Step 4. Create an instance of our flower client and add one line to actually run this client. (建立Client-to-Server的連線) ''' def main() -> None: # Parse command line argument `partition` # 從 CommandLine 輸入 Client 編號,對 Dataset進行切割 parser = argparse.ArgumentParser(description="Flower") parser.add_argument("--partition", type=int, choices=range(0, 10), required=True) args = parser.parse_args() # Load and compile Keras model model = CNN_Model(input_shape=(28, 28, 1), number_classes=10) #model.summary() model.compile("adam", "categorical_crossentropy", metrics=["accuracy"]) # Load a subset of CIFAR-10 to simulate the local data partition (x_train, y_train), (x_test, y_test) = load_partition(args.partition) # Start Flower client client = MnistClient(model, x_train, y_train, x_test, y_test) fl.client.start_numpy_client("localhost:8080", client=client) # windows if __name__ == "__main__": main() ``` * 執行程式,於 Command Line 輸入: ``` python client.py --partition <client-id> ``` 例如: client-1,輸入 python client.py --partition 0 client-2,輸入 python client.py --partition 1 #### ==[制定聯合學習策略-2] **Aggregation strategy (聚合策略)**== * 針對聚合策略,Flower提供的FedAvg Class提供四種API: 1. **configure_fit():** 即前述FL Hyperparameter的實作部分,內部程式碼實現**如何挑選Client(隨機)、傳遞給Client-side的資料傳輸格式等** * API資料傳遞方向: Server-to-Client, 發送訓練策略。 * [內部程式碼](https://flower.dev/docs/_modules/flwr/server/strategy/fedavg.html#FedAvg.configure_fit) 2. **aggregate_fit():** 實作聚合演算法,將Clent回傳的model參數加以聚合。 * API資料傳遞方向: Client-to-Server,接收訓練策略。 * [內部程式碼](https://flower.dev/docs/_modules/flwr/server/strategy/fedavg.html#FedAvg.aggregate_fit) 4. **configure_evaluate():** 與configure_fit()類似,只是改成evaluate方面。 * API資料傳遞方向: Server-to-Client,發送評估策略。 * [內部程式碼](https://flower.dev/docs/_modules/flwr/server/strategy/fedavg.html#FedAvg.configure_evaluate) 5. **aggregate_evaluate():** 與aggregate_fit()類似,但evaluate不需要計算model參數,因此只會將聚合演算法套用至正確率或loss值。 * API資料傳遞方向: Client-to-Server,接收評估結果。 * [內部程式碼](https://flower.dev/docs/_modules/flwr/server/strategy/fedavg.html#FedAvg.aggregate_evaluate) * 此段落參照Flower官方文件 [Implementing Strategies](https://flower.dev/docs/implementing-strategies.html) 及 [Saving Progress](https://flower.dev/docs/saving-progress.html ) 將詳細介紹aggregate_fit()及aggregate_evaluate()的使用方法。 1. **如何改寫聚合演算法、取得Client回傳的訓練結果/模型權重參數、儲存Global model 權重值** * server.py 完整程式碼 ```python= ''' 引入套件 ''' from typing import Any, Callable, Dict, List, Optional, Tuple import flwr as fl import tensorflow as tf # 建立Global model並取得初始參數 from tensorflow.keras import Input, Model, layers, models # 建立CNN架構 import numpy as np # 資料前處理 ''' Step 1. Build Global Model (建立全域模型) ''' # Hyperparameter超參數 num_classes = 10 input_shape = (28, 28, 1) # Build Model def CNN_Model(input_shape, number_classes): # define Input layer input_tensor = Input(shape=input_shape) # Input: convert normal numpy to Tensor (float32) # define layer connection x = layers.Conv2D(filters = 32, kernel_size=(3, 3), activation="relu")(input_tensor) x = layers.MaxPooling2D(pool_size=(2, 2))(x) x = layers.Conv2D(filters = 64, kernel_size=(3, 3), activation="relu")(x) x = layers.MaxPooling2D(pool_size=(2, 2))(x) x = layers.Flatten()(x) x = layers.Dropout(0.5)(x) outputs = layers.Dense(number_classes, activation="softmax")(x) # define model model = Model(inputs=input_tensor, outputs=outputs, name="mnist_model") return model ''' Step 2. Override fl.server.strategy.FedAvg 覆寫FedAvg class ''' class SaveModelStrategy(fl.server.strategy.FedAvg): def aggregate_fit( self, rnd: int, results: List[Tuple[fl.server.client_proxy.ClientProxy, fl.common.FitRes]], failures: List[BaseException], ) -> Optional[fl.common.Weights]: # [!!!] 此處再次使用原本 fl.server.strategy.FedAvg 的 aggregate_fit, # aggregate_fit() 會回傳聚合後的模型權重參數 # 相關官方文件: #https://flower.dev/docs/_modules/flwr/server/strategy/fedavg.html#FedAvg.aggregate_fit # [!!!] 若要改寫聚合演算法,此處就不能寫 aggregated_weights = super().aggregate_fit(rnd, results, failures) # 而是要參考文件中 aggregate_fit() 的程式碼,去改寫聚合演算法 aggregated_weights = super().aggregate_fit(rnd, results, failures) # 取出 Ckient 回傳的結果: https://flower.dev/docs/_modules/flwr/common/typing.html#FitRes # accuracies = [r.metrics["accuracy"] * r.num_examples for _, r in results] # type(r.metrics) = dict for _, r in results: # 取得 Client 回傳的訓練結果 print(f"\n[!!!!!!!!] Client loss = { r.metrics['loss'] }") print(f"[!!!!!!!!] Client accuracy = { r.metrics['accuracy'] }") print(f"[!!!!!!!!] Client val_loss = { r.metrics['val_loss'] }") print(f"[!!!!!!!!] Client val_accuracy = { r.metrics['val_accuracy'] }\n") # 將Model權重參數存起來 if aggregated_weights is not None: # Save aggregated_weights print(f"Saving round {rnd} aggregated_weights...") np.savez(f"round-{rnd}-weights.npz", *aggregated_weights) return aggregated_weights ''' Step 3. Start server and run the strategy (套用所設定的策略,啟動Server) ''' def main() -> None: # Load and compile model for # 1. server-side parameter initialization # 2. server-side parameter evaluation model = CNN_Model(input_shape=input_shape, number_classes=num_classes) #model.summary() model.compile("adam", "categorical_crossentropy", metrics=["accuracy"]) # Create strategy strategy = SaveModelStrategy( fraction_fit=0.5, # 每一輪參與Training的Client比例 fraction_eval=0.5, # 每一輪參與Evaluating的Client比例 min_fit_clients=2, # 每一輪參與Training的最少Client連線數量 (與比例衝突時,以此為準) min_eval_clients=1, # 每一輪參與Evaluating的最少Client連線數量 (與比例衝突時,以此為準) min_available_clients=2, # 啟動聯合學習之前,Client連線的最小數量 on_fit_config_fn=fit_config, # 設定 Client-side Training Hyperparameter on_evaluate_config_fn=evaluate_config, # 設定 Client-side Evaluating Hyperparameter eval_fn=get_eval_fn(model), # 設定 Server-side Evaluating Hyperparameter (用Global Dataset進行評估) initial_parameters=fl.common.weights_to_parameters(model.get_weights()), # Global Model 初始參數設定 ) # Start Flower server for four rounds of federated learning fl.server.start_server("localhost:8080", config={"num_rounds": 2}, strategy=strategy) #windows ''' [Model Hyperparameter](Client-side, train strategy) * 設定Client Training 的 Hyperparameter: 包含batch_size、epochs、learning-rate...皆可設定。 * 甚至可以設定不同 FL round 給予 client 不同的 Hyperparameter ''' def fit_config(rnd: int): """Return training configuration dict for each round. Keep batch size fixed at 128, perform two rounds of training with one local epoch, increase to two local epochs afterwards. """ config = { "batch_size": 128, "local_epochs": 1 if rnd < 2 else 2, # Client 進行 local model Training時,前兩輪的epoch設為1,之後epoch設為2 } return config ''' [Model Hyperparameter](Client-side, evaluate strategy) * 設定Client Testing 的 Hyperparameter: 包含epochs、steps(Total number of steps, 也就是 batche個數 (batches of samples))。 * 可以設定不同 FL round 給予 client 不同的 Hyperparameter ''' def evaluate_config(rnd: int): """Return evaluation configuration dict for each round. Perform five local evaluation steps on each client (i.e., use five batches) during rounds one to three, then increase to ten local evaluation steps. """ val_steps = 5 if rnd < 4 else 10 # Client 進行 local model evaluate時,前4輪 step 為 5,之後 step 為 10 return {"val_steps": val_steps} ''' [Model Hyperparameter](Server-side, evaluate strategy) 用 Global Dataset 評估 Global model (不含訓練) ''' def get_eval_fn(model): """Return an evaluation function for server-side evaluation.""" # Load data and model here to avoid the overhead of doing it in `evaluate` itself (x_train, y_train), _ = tf.keras.datasets.mnist.load_data() # Train sample size: 60000 # Data preprocessing x_train = x_train.astype("float32") / 255 x_train = np.expand_dims(x_train, -1) y_train = tf.keras.utils.to_categorical(y_train, 10) # Use the last 5k training examples as a validation set x_val, y_val = x_train[60000-5000:], y_train[60000-5000:] # The `evaluate` function will be called after every round def evaluate( weights: fl.common.Weights, ) -> Optional[Tuple[float, Dict[str, fl.common.Scalar]]]: model.set_weights(weights) # Update model with the latest parameters loss, accuracy = model.evaluate(x_val, y_val) return loss, {"accuracy": accuracy} return evaluate ''' main ''' if __name__ == "__main__": main() ``` * 執行程式,於 Command Line 輸入: ``` python server.py ``` * 與前段Hyperparameter strategy程式碼相異之處: Step 2: 定義新的SaveModelStrategy Class,並繼承自Flower提供的FedAvg Class。如此一來便可以override(覆寫)FedAvg原有的功能、演算法,亦可提取Client回傳的參數、訓練結果。 * client.py 程式碼 與前述 Hyperparameter strategy 的 client.py 相同 2. **對Client-side回傳的 evaluate 結果,套用聚合演算法,計算聚合後的正確率、loss值** * Server.py 完整程式碼 ```python ''' 引入套件 ''' from typing import Any, Callable, Dict, List, Optional, Tuple import flwr as fl import tensorflow as tf # 建立Global model並取得初始參數 from tensorflow.keras import Input, Model, layers, models # 建立CNN架構 import numpy as np # 資料前處理 ''' Step 1. Build Global Model (建立全域模型) ''' # Hyperparameter超參數 num_classes = 10 input_shape = (28, 28, 1) # Build Model def CNN_Model(input_shape, number_classes): # define Input layer input_tensor = Input(shape=input_shape) # Input: convert normal numpy to Tensor (float32) # define layer connection x = layers.Conv2D(filters = 32, kernel_size=(3, 3), activation="relu")(input_tensor) x = layers.MaxPooling2D(pool_size=(2, 2))(x) x = layers.Conv2D(filters = 64, kernel_size=(3, 3), activation="relu")(x) x = layers.MaxPooling2D(pool_size=(2, 2))(x) x = layers.Flatten()(x) x = layers.Dropout(0.5)(x) outputs = layers.Dense(number_classes, activation="softmax")(x) # define model model = Model(inputs=input_tensor, outputs=outputs, name="mnist_model") return model ''' Step 2. Override fl.server.strategy.FedAvg (覆寫FedAvg class) ''' class AggregateCustomMetricStrategy(fl.server.strategy.FedAvg): def aggregate_evaluate( self, rnd: int, results: List[Tuple[fl.server.client_proxy.ClientProxy, fl.common.EvaluateRes]], # [!!!] I fix this failures: List[BaseException], ) -> Optional[float]: """Aggregate evaluation losses using weighted average.""" if not results: return None # Weigh accuracy of each client by number of examples used accuracies = [r.metrics["accuracy"] * r.num_examples for _, r in results] examples = [r.num_examples for _, r in results] # Aggregate and print custom metric accuracy_aggregated = sum(accuracies) / sum(examples) print(f"Round {rnd} accuracy aggregated from client results: {accuracy_aggregated}") print("*** SHOW: list of accuracies ***") for index, item in enumerate(accuracies): print(f"[{index}], {item}") print("*** SHOW: list of examples ***") for index, item in enumerate(examples): print(f"[{index}], {item}") # Call aggregate_evaluate from base class (FedAvg) return super().aggregate_evaluate(rnd, results, failures) ''' Step 3. Start server and run the strategy (套用所設定的策略,啟動Server) ''' def main() -> None: # Load and compile model for # 1. server-side parameter initialization # 2. server-side parameter evaluation model = CNN_Model(input_shape=input_shape, number_classes=num_classes) #model.summary() model.compile("adam", "categorical_crossentropy", metrics=["accuracy"]) # Create strategy strategy = AggregateCustomMetricStrategy( fraction_fit=0.5, # 每一輪參與Training的Client比例 fraction_eval=0.5, # 每一輪參與Evaluating的Client比例 min_fit_clients=2, # 每一輪參與Training的最少Client連線數量 (與比例衝突時,以此為準) min_eval_clients=1, # 每一輪參與Evaluating的最少Client連線數量 (與比例衝突時,以此為準) min_available_clients=2, # 啟動聯合學習之前,Client連線的最小數量 on_fit_config_fn=fit_config, # 設定 Client-side Training Hyperparameter on_evaluate_config_fn=evaluate_config, # 設定 Client-side Evaluating Hyperparameter eval_fn=get_eval_fn(model), # 設定 Server-side Evaluating Hyperparameter (用Global Dataset進行評估) initial_parameters=fl.common.weights_to_parameters(model.get_weights()), # Global Model 初始參數設定 ) # Start Flower server for four rounds of federated learning fl.server.start_server("localhost:8080", config={"num_rounds": 2}, strategy=strategy) #windows ''' [Model Hyperparameter](Client-side, train strategy) * 設定Client Training 的 Hyperparameter: 包含batch_size、epochs、learning-rate...皆可設定。 * 甚至可以設定不同 FL round 給予 client 不同的 Hyperparameter ''' def fit_config(rnd: int): """Return training configuration dict for each round. Keep batch size fixed at 128, perform two rounds of training with one local epoch, increase to two local epochs afterwards. """ config = { "batch_size": 128, "local_epochs": 1 if rnd < 2 else 2, # Client 進行 local model Training時,前兩輪的epoch設為1,之後epoch設為2 } return config ''' [Model Hyperparameter](Client-side, evaluate strategy) * 設定Client Testing 的 Hyperparameter: 包含epochs、steps(Total number of steps, 也就是 batche個數 (batches of samples))。 * 可以設定不同 FL round 給予 client 不同的 Hyperparameter ''' def evaluate_config(rnd: int): """Return evaluation configuration dict for each round. Perform five local evaluation steps on each client (i.e., use five batches) during rounds one to three, then increase to ten local evaluation steps. """ val_steps = 5 if rnd < 4 else 10 # Client 進行 local model evaluate時,前4輪 step 為 5,之後 step 為 10 return {"val_steps": val_steps} ''' [Model Hyperparameter](Server-side, evaluate strategy) 用 Global Dataset 評估 Global model (不含訓練) ''' def get_eval_fn(model): """Return an evaluation function for server-side evaluation.""" # Load data and model here to avoid the overhead of doing it in `evaluate` itself (x_train, y_train), _ = tf.keras.datasets.mnist.load_data() # Train sample size: 60000 # Data preprocessing x_train = x_train.astype("float32") / 255 x_train = np.expand_dims(x_train, -1) y_train = tf.keras.utils.to_categorical(y_train, 10) # Use the last 5k training examples as a validation set x_val, y_val = x_train[60000-5000:], y_train[60000-5000:] # The `evaluate` function will be called after every round def evaluate( weights: fl.common.Weights, ) -> Optional[Tuple[float, Dict[str, fl.common.Scalar]]]: model.set_weights(weights) # Update model with the latest parameters loss, accuracy = model.evaluate(x_val, y_val) return loss, {"accuracy": accuracy} return evaluate ''' main ''' if __name__ == "__main__": main() ``` * 執行程式,於 Command Line 輸入: ``` python client.py --partition <client-id> ``` 例如: client-1,輸入 python client.py --partition 0 client-2,輸入 python client.py --partition 1 * 與前段Hyperparameter strategy程式碼相異之處: Step 2: 定義新的AggregateCustomMetricStrategy Class,並繼承自Flower提供的FedAvg Class。如此一來便可以override(覆寫)FedAvg原有的aggregate_evaluate()。 * Client.py 程式碼 與前述 Hyperparameter strategy 的 client.py 相同 * **將FL-strategy套用至 Server.py** 所有 FL-strategy 最終會 assign 給 "變數strategy",並作為 fl.server.start_server() 的參數,當執行 Server.py 便會由這段語法去執行所有自行設定的strategy,如下所示: ```python= fl.server.start_server("localhost:8080", config={"num_rounds": 3}, strategy=strategy) # Windows ``` ### ==傳送Model結構== * 前言: 此為本人開發的功能,前述的溝通架構中都有個Bug,就是Client居然會預先把model的結構寫死,這對FL而言勢必會造成困擾,例如Server-side更改Global model的層數設定,Client-side也需更改model的結構。然而官方的API尚未提供一個傳送並重建model結構的功能。<br> 對此,本人發現可以利用Json套件及Server-side的fit_config()函式,便可將model結構(Tensorflow的config資料結構) 傳送至Client,並於 Client 重建 model結構。 * 傳送Model結構程式碼的FL溝通示意圖 ![](https://i.imgur.com/RduTyF5.png) * 與之前的溝通架構不同的地方是Client-side的**Local model並非預先寫死的**,而是透過Flower框架傳送Global model結構,並**於Client-side重建model**。 * server.py 完整程式碼 ```python= ''' 引入套件 ''' from typing import Any, Callable, Dict, List, Optional, Tuple import flwr as fl import tensorflow as tf # 建立Global model並取得初始參數 from tensorflow.keras import Input, Model, layers, models # 建立CNN架構 import json # 傳遞model config 所需 import numpy as np # 資料前處理 ''' # Hyperparameter ''' K_Model_config = None K_Model_optimizer = 'adam' K_Model_loss = 'categorical_crossentropy' num_classes = 10 input_shape = (28, 28, 1) ''' Step 1. Build Global Model (建立全域模型) ''' # Build Model def CNN_Model(input_shape, number_classes): # define Input layer input_tensor = Input(shape=input_shape) # Input: convert normal numpy to Tensor (float32) # define layer connection x = layers.Conv2D(filters = 32, kernel_size=(3, 3), activation="relu")(input_tensor) x = layers.MaxPooling2D(pool_size=(2, 2))(x) x = layers.Conv2D(filters = 64, kernel_size=(3, 3), activation="relu")(x) x = layers.MaxPooling2D(pool_size=(2, 2))(x) x = layers.Flatten()(x) x = layers.Dropout(0.5)(x) outputs = layers.Dense(number_classes, activation="softmax")(x) # define model model = Model(inputs=input_tensor, outputs=outputs, name="mnist_model") return model ''' Step 2. Start server and run the strategy (套用所設定的策略,啟動Server) ''' def main() -> None: # Load and compile model for # 1. server-side parameter initialization # 2. server-side parameter evaluation model = CNN_Model(input_shape=input_shape, number_classes=num_classes) #model.summary() model.compile("adam", "categorical_crossentropy", metrics=["accuracy"]) global K_Model_config # 取出model結構(dict型態),並轉成json字串(string型態) K_Model_config = json.dumps(model.get_config()) # Create strategy strategy = fl.server.strategy.FedAvg( fraction_fit=0.5, # 每一輪參與Training的Client比例 fraction_eval=0.5, # 每一輪參與Evaluating的Client比例 min_fit_clients=2, # 每一輪參與Training的最少Client連線數量 (與比例衝突時,以此為準) min_eval_clients=1, # 每一輪參與Evaluating的最少Client連線數量 (與比例衝突時,以此為準) min_available_clients=2, # 啟動聯合學習之前,Client連線的最小數量 on_fit_config_fn=fit_config, # 設定 Client-side Training Hyperparameter on_evaluate_config_fn=evaluate_config, # 設定 Client-side Evaluating Hyperparameter eval_fn=get_eval_fn(model), # 設定 Server-side Evaluating Hyperparameter (用Global Dataset進行評估) initial_parameters=fl.common.weights_to_parameters(model.get_weights()), # Global Model 初始參數設定 ) # Start Flower server for four rounds of federated learning fl.server.start_server("localhost:8080", config={"num_rounds": 2}, strategy=strategy) #windows ''' [Model Hyperparameter](Client-side, train strategy) * 設定Client Training 的 Hyperparameter: 包含batch_size、epochs、learning-rate...皆可設定。 * 甚至可以設定不同 FL round 給予 client 不同的 Hyperparameter ''' def fit_config(rnd: int): """Return training configuration dict for each round. Keep batch size fixed at 128, perform two rounds of training with one local epoch, increase to two local epochs afterwards. """ config = { "batch_size": 128, "local_epochs": 1 if rnd < 2 else 2, # Client 進行 local model Training時,前兩輪的epoch設為1,之後epoch設為2 "config": K_Model_config, "optimizer": K_Model_optimizer, "loss_fn": K_Model_loss, } return config ''' [Model Hyperparameter](Client-side, evaluate strategy) * 設定Client Testing 的 Hyperparameter: 包含epochs、steps(Total number of steps, 也就是 batche個數 (batches of samples))。 * 可以設定不同 FL round 給予 client 不同的 Hyperparameter ''' def evaluate_config(rnd: int): """Return evaluation configuration dict for each round. Perform five local evaluation steps on each client (i.e., use five batches) during rounds one to three, then increase to ten local evaluation steps. """ config = { "val_steps": 5 if rnd < 4 else 10, # Client 進行 local model evaluate時,前4輪 step 為 5,之後 step 為 10 "config": K_Model_config, "optimizer": K_Model_optimizer, "loss_fn": K_Model_loss, } return config ''' [Model Hyperparameter](Server-side, evaluate strategy) 用 Global Dataset 評估 Global model (不含訓練) ''' def get_eval_fn(model): """Return an evaluation function for server-side evaluation.""" # Load data and model here to avoid the overhead of doing it in `evaluate` itself (x_train, y_train), _ = tf.keras.datasets.mnist.load_data() # Train sample size: 60000 # Data preprocessing x_train = x_train.astype("float32") / 255 x_train = np.expand_dims(x_train, -1) y_train = tf.keras.utils.to_categorical(y_train, 10) # Use the last 5k training examples as a validation set x_val, y_val = x_train[60000-5000:], y_train[60000-5000:] # The `evaluate` function will be called after every round def evaluate( weights: fl.common.Weights, ) -> Optional[Tuple[float, Dict[str, fl.common.Scalar]]]: model.set_weights(weights) # Update model with the latest parameters loss, accuracy = model.evaluate(x_val, y_val) return loss, {"accuracy": accuracy} return evaluate ''' main ''' if __name__ == "__main__": main() ``` * 執行程式,於 Command Line 輸入: ``` python server.py ``` * 程式碼說明 1. 新定義3個Hyperparameter * **K_Model_config = None** 用來裝 global model 結構 * **K_Model_optimizer = 'adam'** 用來儲存 local model 的 activation function 設定 * **K_Model_loss = 'categorical_crossentropy'** 用來儲存 local model 的 loss function 設定 2. 於Step 2 的 main(),用 K_Model_config 儲存 Global model 的結構 (轉成json字串格式) 3. 透過 **fit_config()、evaluate_config()** 來傳遞這3個 Hyperparameter * client.py 完整程式碼 ```python= ''' 引入套件 ''' import os # Make TensorFlow logs less verbose # 更改tensorflow的Log訊息的顯示模式 os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" ''' Level | Level for Humans | Level Description -------|------------------|------------------------------------ 0 | DEBUG | [Default] Print all messages 1 | INFO | Filter out INFO messages 2 | WARNING | Filter out INFO & WARNING messages 3 | ERROR | Filter out all messages ''' import flwr as fl import tensorflow as tf from tensorflow import keras import numpy as np # 資料前處理 import json # 重建 global model import argparse # CmmandLine 輸入控制參數 # Make TensorFlow logs less verbose (減少不必要的訊息顯示) os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" # Step 0. Do not need to build Local Model ''' Step 1. Load local dataset (引入本地端資料集),對Dataset進行切割 ''' def load_partition(idx: int): """Load 1/10th of the training and test data to simulate a partition.""" #assert idx in range(10) # limit it can't more than 10... (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data() # Train 60000-5000, Test 10000 # Data preprocessing x_train = x_train.astype("float32") / 255 x_test = x_test.astype("float32") / 255 x_train = np.expand_dims(x_train, -1) x_test = np.expand_dims(x_test, -1) y_train = tf.keras.utils.to_categorical(y_train, 10) y_test = tf.keras.utils.to_categorical(y_test, 10) sample_size = 60000 #x_train.shape[0] sample_size -= 5000 # Server-side 預留做驗證 training_size = int(sample_size/2) # !!! 此 2 是指 client-side 的總數量為 2 位,若有 10 個 clients,要填 10 testing_size = int(10000/2) # !!! 此 2 是指 client-side 的總數量為 2 位,若有 10 個 clients,要填 10 return ( x_train[idx * training_size : (idx + 1) * training_size], y_train[idx * training_size : (idx + 1) * training_size], ), ( x_test[idx * testing_size : (idx + 1) * testing_size], y_test[idx * testing_size : (idx + 1) * testing_size], ) ''' Step 2. Define Flower client (定義client的相關設定: 接收Server-side的global model weight、hyperparameters) ''' class MnistClient(fl.client.NumPyClient): # Class初始化: 盛裝local model、dataset def __init__(self, model, x_train, y_train, x_test, y_test): self.model = model self.x_train, self.y_train = x_train, y_train self.x_test, self.y_test = x_test, y_test # 此時已無作用,原用來取得 local model 的 ini-weight, # 目前初始權重值是來自 Server-side 而非 client 自己 def get_parameters(self): """Get parameters of the local model.""" raise Exception("Not implemented (server-side parameter initialization)") def fit(self, parameters, config): """Train parameters on the locally held training set.""" print("*** Training Start ***") # [!!!!] 接收並重建 Local Model model_config = json.loads(config['config']) self.model = tf.keras.Model().from_config(model_config) self.model.compile(optimizer=config['optimizer'], loss=config['loss_fn'], metrics=["accuracy"]) # Update local model parameters: from server-side initial_parameters self.model.set_weights(parameters) # Get hyperparameters for this round batch_size: int = config["batch_size"] epochs: int = config["local_epochs"] # Train the model using hyperparameters from config # (依 Server-side 的 hyperparameters 進行訓練) history = self.model.fit( self.x_train, self.y_train, batch_size, epochs, validation_split=0.1, ) # Return updated model parameters and results # 將訓練後的權重、資料集筆數、正確率/loss值等,回傳至server-side parameters_prime = self.model.get_weights() num_examples_train = len(self.x_train) results = { # return server-side: fl.common.FitRes.metrics.keys() "Client_loss": history.history["loss"][-1], "Client_accuracy": history.history["accuracy"][-1], "Client_val_loss": history.history["val_loss"][-1], "Client_val_accuracy": history.history["val_accuracy"][-1], } return parameters_prime, num_examples_train, results # return to server-side: app_fit def evaluate(self, parameters, config): """Evaluate parameters on the locally held test set.""" print("*** Evaluating Start ***") # [!!!!] 接收並重建 Local Model model_config = json.loads(config['config']) self.model = tf.keras.Model().from_config(model_config) self.model.compile(optimizer=config['optimizer'], loss=config['loss_fn'], metrics=["accuracy"]) # Update local model with global parameters self.model.set_weights(parameters) # Get config values steps: int = config["val_steps"] # Evaluate global model parameters on the local test data and return results loss, accuracy = self.model.evaluate(self.x_test, self.y_test, batch_size=32, steps=steps) #batch_size default=32 num_examples_test = len(self.x_test) return loss, num_examples_test, {"Client_accuracy": accuracy} # return to server-side: app_evaluate ''' Step 3. Create an instance of our flower client and add one line to actually run this client. (建立Client-to-Server的連線) ''' def main() -> None: # Parse command line argument `partition` # 從 CommandLine 輸入 Client 編號,對 Dataset進行切割 parser = argparse.ArgumentParser(description="Flower") parser.add_argument("--partition", type=int, choices=range(0, 10), required=True) args = parser.parse_args() # Load a subset of CIFAR-10 to simulate the local data partition (x_train, y_train), (x_test, y_test) = load_partition(args.partition) # Start Flower client client = MnistClient(None, x_train, y_train, x_test, y_test) # 不用輸入 local model fl.client.start_numpy_client("localhost:8080", client=client) # windows if __name__ == "__main__": main() ``` * 執行程式,於 Command Line 輸入: ``` python client.py --partition <client-id> ``` 例如: client-1,輸入 python client.py --partition 0 client-2,輸入 python client.py --partition 1 * 程式碼解說 * 於 Step 2,"config"變數就是儲存Server傳過來的Strategy,其為 dict 資料結構。<br>取出 model 結構之後 ( config['config'] ),需用 json.load() 對其解碼,轉換回 dict 結構,才能匯入 local model * **fit()的"config"變數**就是 Server-side 的 fit_config() 的 return 值 * **evaluate()的"config"變數**就是 Server-side 的 evaluate_config() 的 return 值 ## ==**[Flower Simulation] 在Google Colab執行Flower**== Flower作者已開發單機模擬的套件,省去gRPC連線,實現如同TFF的單機模擬功能,可以模擬大量Client測試演算法。 [官方 merged code 單機模擬 ipynb](https://github.com/adap/flower/blob/main/examples/simulation_tensorflow/sim.ipynb) [原始碼Github issue連結](https://github.com/adap/flower/pull/833) ```python= # -*- coding: utf-8 -*- """Flower_Simulation Automatically generated by Colaboratory. Original file is located at https://colab.research.google.com/gist/kuihao/bc4aae1a9bc23295a06a2b1d81e79ac9/flower_simulation.ipynb # Flower Quickstart (Simulation with TensorFlow/Keras) **Origianl Auther: Danieljanes ([Link](https://github.com/adap/flower/pull/833))** Welcome to Flower, a friendly federated learning framework! In this notebook, we'll simulate a federated learning system with 100 clients. The clients will use TensorFlow/Keras to define model training and evaluation. Let's start by installing Flower Nightly, published as `flwr-nightly` on PyPI: ## IMPORT PKG """ !pip install git+https://github.com/adap/flower.git@release/0.17#egg=flwr["simulation"] # For a specific branch (release/0.17) w/ extra ("simulation") # !pip install -U flwr["simulation"] # Once 0.17.1 is released """Next, we import the required dependencies. The most important imports are Flower (`flwr`) and TensorFlow:""" import os import math # Make TensorFlow logs less verbose os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" import flwr as fl import tensorflow as tf """##Client-side With that out of the way, let's move on to the interesting bits. Federated learning systems consist of a server and multiple clients. In Flower, we create clients by implementing subclasses of `flwr.client.Client` or `flwr.client.NumPyClient`. We use `NumPyClient` in this tutorial because it is easier to implement and requires us to write less boilerplate. To implement the Flower client, we create a subclass of `flwr.client.NumPyClient` and implement the three methods `get_parameters`, `fit`, and `evaluate`: - `get_parameters`: Return the current local model parameters - `fit`: Receive model parameters from the server, train the model parameters on the local data, and return the (updated) model parameters to the server - `evaluate`: Received model parameters from the server, evaluate the model parameters on the local data, and return the evaluation result to the server We mentioned that our clients will use TensorFlow/Keras for the model training and evaluation. Keras models provide methods that make the implementation staightforward: we can update the local model with server-provides parameters through `model.set_weights`, we can train/evaluate the model through `fit/evaluate`, and we can get the updated model parameters through `model.get_weights`. Let's see a simple implementation: """ class FlowerClient(fl.client.NumPyClient): def __init__(self, model, x_train, y_train, x_val, y_val) -> None: self.model = model self.x_train, self.y_train = x_train, y_train self.x_val, self.y_val = x_train, y_train def get_parameters(self): return self.model.get_weights() def fit(self, parameters, config): self.model.set_weights(parameters) self.model.fit(self.x_train, self.y_train, epochs=1, verbose=2) return self.model.get_weights(), len(self.x_train), {} def evaluate(self, parameters, config): self.model.set_weights(parameters) loss, acc = self.model.evaluate(self.x_val, self.y_val, verbose=2) return loss, len(self.x_val), {"accuracy": acc} """Our class `FlowerClient` defines how local training/evaluation will be performed and allows Flower to call the local training/evaluation through `fit` and `evaluate`. Each instance of `FlowerClient` represents a *single client* in our federated learning system. Federated learning systems have multiple clients (otherwise there's not much to federate, is there?), so each client will be represented by its own instance of `FlowerClient`. If we have, for example, three clients in our workload, we'd have three instances of `FlowerClient`. Flower calls `FlowerClient.fit` on the respective instance when the server selects a particular client for training (and `FlowerClient.evaluate` for evaluation). In this notebook, we want to simulate a federated learning system with 100 clients on a single machine. This means that the server and all 100 clients will live on a single machine and share resources such as CPU, GPU, and memory. Having 100 clients would mean having 100 instances of `FlowerClient` im memory. Doing this on a single machine can quickly exhaust the available memory resources, even if only a subset of these clients participates in a single round of federated learning. In addition to the regular capabilities where server and clients run on multiple machines, Flower therefore provides special simulation capabilities that create `FlowerClient` instances only when they are actually necessary for training or evaluation. To enable the Flower framework to create clients when necessary, we need to implement a function called `client_fn` that creates a `FlowerClient` instance on demand. Flower calls `client_fn` whenever it needs an instance of one particular client to call `fit` or `evaluate` (those instances are usually discarded after use). Clients are identified by a client ID, or short `cid`. The `cid` can be used, for example, to load different local data partitions for each client: """ # Client main function def client_fn(cid: str) -> fl.client.Client: # Create model model = tf.keras.models.Sequential( [ tf.keras.layers.Flatten(input_shape=(28, 28)), tf.keras.layers.Dense(128, activation="relu"), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(10, activation="softmax"), ] ) model.compile("adam", "sparse_categorical_crossentropy", metrics=["accuracy"]) # Load data partition (divide MNIST into NUM_CLIENTS distinct partitions) (x_train, y_train), _ = tf.keras.datasets.mnist.load_data() partition_size = math.floor(len(x_train) / NUM_CLIENTS) idx_from, idx_to = int(cid) * partition_size, (int(cid) + 1) * partition_size x_train_cid = x_train[idx_from:idx_to] / 255.0 y_train_cid = y_train[idx_from:idx_to] # Use 10% of the client's training data for validation split_idx = math.floor(len(x_train) * 0.9) x_train_cid, y_train_cid = x_train_cid[:split_idx], y_train_cid[:split_idx] x_val_cid, y_val_cid = x_train_cid[split_idx:], y_train_cid[split_idx:] # Create and return client return FlowerClient(model, x_train_cid, y_train_cid, x_val_cid, y_val_cid) """We now have `FlowerClient` which defines client-side training and evaluation and `client_fn` which allows Flower to create `FlowerClient` instances whenever it needs to call `fit` or `evaluate` on one particular client. The last step is to start the actual simulation using `flwr.simulation.start_simulation`. The function `start_simulation` accepts a number of arguments, amongst them the `client_fn` used to create `FlowerClient` instances, the number of clients to simulate `num_clients`, the number of rounds `num_rounds`, and the strategy. The strategy encapsulates the federated learning approach/algorithm, for example, *Federated Averaging* (FedAvg). Flower comes with a number of built-in strategies, but we can also use our own strategy implementations to customize nearly all aspects of the federated learning approach. For this example, we use the built-in `FedAvg` implementation and customize it using a few basic parameters. The last step is the actual call to `start_simulation` which - you guessed it - actually starts the simulation. ##Server-side """ NUM_CLIENTS = 10 # Create FedAvg strategy strategy=fl.server.strategy.FedAvg( fraction_fit=0.1, # Sample 10% of available clients for training fraction_eval=0.05, # Sample 5% of available clients for evaluation min_fit_clients=10, # Never sample less than 10 clients for training min_eval_clients=10, # Never sample less than 5 clients for evaluation min_available_clients=int(NUM_CLIENTS * 0.75), # Wait until at least 75 clients are available ) # Start simulation fl.simulation.start_simulation( client_fn=client_fn, num_clients=NUM_CLIENTS, num_rounds=5, strategy=strategy, ) """Congratulations! With that, you built a Flower client, customized it's instantiation through the `client_fn`, customized the server-side execution through a `FedAvg` strategy configured for this workload, and started a simulation with 100 clients (each holding their own individual partition of the MNIST dataset). Next, you can continue to explore more advanced Flower topics: - Deploy server and clients on different machines using `start_server` and `start_client` - Customize the server-side execution through custom strategies - Customize the client-side exectution through `config` dictionaries """ ``` ## Others ### ==**Flower Aggregate 程式碼解說**== #### **FedAvg: aggregate()** * FedAvg 演算法: ![](https://i.imgur.com/Xu6mg8K.png) * [官方原始程式碼(Github file)](https://github.com/adap/flower/blob/main/src/py/flwr/server/strategy/aggregate.py) * Code ```python= def aggregate(results: List[Tuple[Weights, int]]) -> Weights: """Compute weighted average.""" # Calculate the total number of examples used during training num_examples_total = sum([num_examples for _, num_examples in results]) # Create a list of weights, each multiplied by the related number of examples weighted_weights = [ [layer * num_examples for layer in weights] for weights, num_examples in results ] # Compute average weights of each layer weights_prime: Weights = [ reduce(np.add, layer_updates) / num_examples_total for layer_updates in zip(*weighted_weights) ] return weights_prime ``` * **aggregate() 重點說明** * **輸入參數 (Parameters):** * **Flower List**: node數量等於client數量 * **Flower Tuple**: (Weights, int) * **Weights:** Keras模型權重 * **int:** 資料集size * **回傳值 (Return):** * 聚合後的Keras模型權重 * **Return type:** * **Keras模型權重的資料結構** [tensorflow.keras.model.get_weights()](https://www.tensorflow.org/api_docs/python/tf/keras/layers/Layer#get_weights) * **Others** * **第4行**: 取出所有client的dataset size,裝進list,再用sum(),算出資料集總筆數(num_examples_total) * **第7~9行**: weighted_weights是一個二維list,第一層node數量等於client數量,第二層node數量等於model的layer數量;第二層每個node裝「該層的參數」與「該client的資料集筆數」的乘積。 * **第12~15行**: 實作FedAvg * **zip(*)**: 會把每位client同一層的weighted_weight打包成tuple * **reduce(np.add, 變數)**: 更底層的 numpy.sum() 的寫法,對 list 的所有 node 做加總,此為編譯最佳化寫法。 * **weights_prime就是聚合後的model參數**,資料結構是一維list,每個node是「對所有client某一層的參數」做FedAvg的結果。 #### **(已理解) FedOpt** (FL+ Adaptive Optimizatoin) 2021/10/15-26 Code-tracing... 解說文待補 * [Flower FedOpt](https://github.com/adap/flower/blob/main/src/py/flwr/server/strategy/fedopt.py) * [Flower FedAdagrad](https://github.com/adap/flower/blob/main/src/py/flwr/server/strategy/fedadagrad.py) * [Flower FedAdam ](https://github.com/adap/flower/blob/main/src/py/flwr/server/strategy/fedadam.py) * [Flower FedYogi](https://github.com/adap/flower/blob/main/src/py/flwr/server/strategy/fedyogi.py) #### **(待研究) Other aggreation method** * [Flower qfedavg](https://github.com/adap/flower/blob/main/src/py/flwr/server/strategy/qfedavg.py) * [Flower fedfs_v0](https://github.com/adap/flower/blob/main/src/py/flwr/server/strategy/fedfs_v0.py) ### [BUG解決] Linux 聯合學習時 GPU 記憶體溢出問題 * source: https://www.tensorflow.org/guide/gpu * 說明 * Tensorflow 的 linux 版本,預設會限制GPU的記憶體使用量,防止程式不斷索求記憶體,然而這可能會導致 model 運作異常 * 解決方法 * 於 Client 端的程式加入以下程式碼,便可以解除 Tensorflow 使用 GPU 時的限制 * client.py 部分程式碼 ```python= ''' # Linux need to add this # Limiting GPU memory growth: https://www.tensorflow.org/guide/gpu ''' gpus = tf.config.experimental.list_physical_devices('GPU') if gpus: try: # Currently, memory growth needs to be the same across GPUs for gpu in gpus: tf.config.experimental.set_memory_growth(gpu, True) # [Key]: Limiting GPU memory growth 只有必要時才會延展GPU記憶體的使用空間 logical_gpus = tf.config.experimental.list_logical_devices('GPU') print(f"\n*** {len(gpus)} Physical GPUs, {len(logical_gpus)} Logical GPUs ***\n") except RuntimeError as e: # Memory growth must be set before GPUs have been initialized print(e) ``` ### ==SOTA non-iid 資料集== * **說明:** 目前大部分 FL 文獻所使用的資料集都源自於[LEAF: A Benchmark for Federated Settings (2018)](https://arxiv.org/abs/1812.01097)的[Github repository](https://github.com/TalwalkarLab/leaf),但LEAF的API難以使用。**所幸 Google 研究員已將部分LEAF的資料集收編進 Tensorflow Federated (TFF),供大家輕鬆存取。** * **下載來源:** * **Google TFF 團隊** * https://www.tensorflow.org/federated/api_docs/python/tff/simulation/datasets * 提供 cifar100、emnist (=FEMNIST)、gldv2、shakespeare、stackoverflow * **LEAF** * [LEAF Github](https://github.com/TalwalkarLab/leaf) * 提供 FEMNIST、Sentiment140、Shakespeare、Celeba、Synthetic Dataset、Reddit * 其中 FEMNIST 及 Shakespeare 已被 Google 收編進 TFF dataset hub * **Flower** * 埋沒於Flower Github之中,有空再找... * **Dataset簡述:** * EMNIST: (Federated) Extend-MNIST,除了原先MNIST的10個手寫阿拉伯數字之外,又增加26個手寫英文字母(含大小寫)。