--- title: Pytorch 分布式系統 # 簡報的名稱 tags: LAB # 簡報的標籤 --- # 分布式系統 分布式系統是由多個獨立計算機或節點組成的計算系統,這些節點通過網絡進行通信和協作,共同完成一個或多個共享的任務。分布式系統的設計目標是提供高性能、可靠性、可擴展性和容錯性。 **簡單來說,就是你想訓練一份資料,我們可以把資料發到不同的電腦和顯卡上進行訓練,這個發送的過程就叫做分布 (數據在不同地方)** [Toc] > [name=DongDong] > [time=Mon, May 8, 2023 11:54 PM] > [color=#c82def] ## 為什麼要使用分布式系統並行訓練 1. 模型太大,GPU 放不下 2. 多塊 GPU 可以增加訓練速度 ## 常用的分布式方式 1. **Model Parallelism:** 將模型拆解成多個子 Module,將這些子 Module 分布到不同設備上面訓練 2. **Data Parallelism:** 將資料拆解成多個 sub data,將這些 sub data 分布到不同設備上面訓練 ## 並行訓練概念 我們先來介紹一下專有名詞 - process: 進程,是一個執行中的程序,它擁有獨立的記憶體空間,包含程序的代碼、數據和運行環境 - thread: 線程,進程中的執行單元,它是一個輕量級的執行緒,共享進程的記憶體空間和資源 - group: 進程組,進程的組別,默認為 1 - world_size: 並行數 - rank: 進程的 ID (0 通常表示主進程) - local_rank: 每個 node 的進程 ID (與 rank 獨立) - node: 節點,可以是物理節點 (機台) 或者虛擬機 (container or virtual machine) 看了定義是不是一頭霧水,我們來看個實際例子。假設我有兩台 server,第一台有兩張 GPU,第二台有四張 GPU ![](https://hackmd.io/_uploads/r1ENO2wEn.png) > [name=DongDong] 如上圖所示,rank 0 代表主進程,rank 代表進程的編號,local_rank 代表一個節點的進程編號,全局共有兩個節點,六個進程在運行 ## DDP 基本概念 DDP (DistributedDataParallel),假設我們有 N 張 GPU,當我們啟動 DDP 程序時,會同時啟動 N 個進程,每個進程都分配一張 GPU,我們可以在終端輸入 ```nvidia-smi``` 查看 GPU 使用情況 假設我們做的是 Data Parallelism,我們必須把資料分配到不同的 GPU 上面,我們可以透過 `torch.utils.data.distributed.DistributedSampler` 做到,其原理如下 假設我們有 5 個數據,要分配到兩個進程中,那麼我們會先將數據打亂,為了均分數據,我們將第一筆數據 (data 2) 複製一次,然後再把新的 6 個數據均分到兩個進程中 (每個進程 3 個數據) ![](https://hackmd.io/_uploads/HJAk_sOV2.png) > [name=DongDong] 訓練完成之後我們在把各個進程的數據梯度做加總更新一次參數 ## Pytorch 實戰 實戰的流程如下: 1. 初始化進程設置 2. 設置分布式模型 3. 設置分配資料方式 4. 開始執行進程 5. 銷毀進程組 ### 初始化進程設置 1. 查看裝置上有幾張 GPU ``` ngpus_per_node = torch.cuda.device_count() ``` 2. 要使用多進程設置,我們只需指定 `mp.spawn` 即可 `mp.spawn(fn, args, nprocs, join, daemon)` 參數 - fn: 進程要執行的程序 - nprocs: 有幾個進程 - join: 是否進入同一個進程組 - daemon: 是否創建守護進程 我們只需要簡單調用以下函數即可 ```python! def parse_args(): parser = argparse.ArgumentParser(description='PyTorch training script') parser.add_argument("--seed", type=int, default=42, help="random seed") parser.add_argument("--world_size", type=int, default=2, help="number of process") parser.add_argument("--rank", type=int, default=0, help="id of process") parser.add_argument("--multiprocessing_distributed", type=bool, default=True, help="use multi-gpus training or not") parser.add_argument("--dist_backend", type=str, default="nccl", help="the backend of the distributed") parser.add_argument("--dist_url", type=str, default="tcp://127.0.0.1:8009", help="the url of the distributed") args = parser.parse_args() return args def main(args): # some code ...... mp.spawn(main_worker, nprocs=world_size, args=(args,), join=True) ``` 該程式會開啟 nprocs 個進程,每個進程去執行 main_worker 函數的任務,args 作為每個 main_worker 的傳入參數。所以我們只需要在 main_worker 寫入每次訓練以及推理的代碼即可 在每一個進程要執行的程序中,我們必須透過函數 `torch.distributed.init_process_group` 初始化進程組 `torch.distributed.init_process_group(backend, init_method, timeout, world_size, store, group_name)` 參數 - init_method: 指定與後端通信方式 (tcp) - world_size: node 進程數量 - rank: 進程 ID - timeout: 連線等待時間 (建議設置 30s) - backend: 指定通信後端 (通常使用 nccl) - group_name: 進程名 ```python! def main_worker(gpu, args): # If your are not specify init_method, you have to define the following # os.environ['MASTER_ADDR'] = '127.0.0.1' # os.environ['MASTER_PORT'] = '2500' args.gpu = gpu if args.gpu is not None: logger.log("Use GPU %s" % args.gpu, "INFO") if args.distributed: if args.multiprocessing_distributed: args.rank = args.rank * ngpus_per_node + args.gpu torch.distributed.init_process_group( backend=args.dist_backend, init_method=args.dist_url, world_size=args.world_size, rank=args.rank, timeout=timedelta(seconds=30) ) ``` #### TCP 初始化 `init_method` 可以指定 tcp 模式,並且每個進程的 IP 位置和 port 必須一致 通訊地址為 `tcp://ip:port`,例如我們可以設置為 `tcp://127.0.0.1:8009` ### 設置分布模型 由於 `torch.distributed` 把函數包裝的很高級,不會讓使用者看到冗長的數據處理代碼,所以只需要以下一行就可以設置分部模型 ```python! model = DDP(model, device_ids=[args.rank], output_device=args.rank) ``` :::info 1. 有些優化器是基於模型參數的,DDP 會稍微影響優化器,所以優化器必須在 DDP 之後創建 2. 在 DDP 進程中,BN 層在計算 moving mean 和 moving variance 時,只用到主進程的 mean 和 variance,所以我們必須用 SyncBN 去解決 ::: ### 設置分配資料方式 在創建 dataloader 過程中,我們必須指定 sampler 為 `DistributedSampler` ```python! def get_dataloader(base_root, csv_name, batch_size, img_size, collate_fn): train_transform = get_transform(train=True, img_size=img_size, rotate_degree=10) val_transform = get_transform(train=False, img_size=img_size, rotate_degree=10) df_train, df_valid, df_test = split(base_root, csv_name) traindSet = CXRDataset(df_train, train_transform, False) validSet = CXRDataset(df_valid, val_transform, False) testSet = CXRDataset(df_test, val_transform, False) train_sampler=torch.utils.data.distributed.DistributedSampler(traindSet) val_sampler=torch.utils.data.distributed.DistributedSampler(validSet) test_sampler=torch.utils.data.distributed.DistributedSampler(testSet) trainloader = torch.utils.data.DataLoader(traindSet, batch_size=batch_size, sampler=train_sampler, collate_fn=collate_fn) validloader = torch.utils.data.DataLoader(validSet, batch_size=batch_size, sampler=val_sampler, collate_fn=collate_fn) testloader = torch.utils.data.DataLoader(testSet, batch_size=batch_size, sampler=test_sampler, collate_fn=collate_fn) return trainloader, validloader, testloader ``` ### 開始執行進程 這部分的代碼就跟你平常訓練 Pytorch 程式一樣。值得注意的一點是,我們在打印模型訓練結果以及儲存模型時,我們只需要在主進程中進行打印以及儲存即可 ```python! pass ``` ### 銷毀進程組 我們只要調用方法 ```python! destory_process_group() ``` 就可以銷毀當前進程組 ## 分布系統進階概念 接下來我們會介紹分布式系統的進階概念 ### 隨機種子 從上面的例子來看,每一個進程都會執行同一個任務,當我們在任務中指定隨機種子時,要是隨機種子的值是固定的,這會導致不同進程使用同一個隨機種子,如 ```python! def main_worker(args): seed_everything(42) ``` 在上述的例子中,每一個進程都會調用 main_worker 任務,但是他們都使用了 42 號種子,我們可以稍加修改上述代碼 ```python! def main_worker(args): seed_everything(42 + args.rank) ``` 用進程的唯一 ID 去讓每個進程都拿到不同的隨機種子,這樣就可以保證進程之間隨機種子是不同的 ### DistributedSampler pytorch 讀取資料的流程如下: 1. 隨機 shuffle 數據 (如果 shuffle 為真) 2. 透過 Sampler 抽出數據 indices (DistributedSampler 保證不同進程之間數據 indices 基本上不重複) 3. 將 indices 傳入 Dataset 得到回傳值 (`__getitem__` 方法) 4. 蒐集一個 batch 的回傳值 5. 對一個 batch 資料做 collate_fn 操作 在上述過程中,DistributedSampler 必須保證不同進程之間 shuffle 後的數據一致。所以我們必須為 DistributedSampler 設置一個隨機種子 (不同進程之間要用同一個隨機種子)。然而官方對於 DistributedSampler 使用的隨機種子為當前的 epoch 數 ```python! trainloader.sampler.set_epoch(epoch) ``` ### 不同進程之間怎麼通信的? 不同進程之間是透過一種叫做 Ring-Reduce 的方式通信的,這個大家有興趣在去查。分布式系統將數據發送到不同進程上進行處理 (reduce),然後再從各個進程上回收數據,然後再發送,周而復始。 **所謂的 reduce 指的是發送數據,處理指的是加總**。我們常常看到的 `all_reduce` 方法其實只是把 reduce 完的數據發送回各個進程而已。舉個實際的例子: pytorch 在進行分布式訓練時是使用 `all_reduce` 蒐集每個進程中的梯度 ,然後做加總,最後在廣播到每個進程中,以保證每個進程使用的都是同一個模型 (此行為在使用 loss.backward() 時執行) ### DDP vs Gradient Accumulation 假設我們有 N 個進程,DDP 會把數據分成 N 等分,然後進行 `all_reduce` 計算,這相當於把 batch size 放大 N 倍,然後做梯度累加,最後再更新參數 理論上,以下兩種操作應該要相等 1. N 次的 Gradient Accumulation 2. N 進程的 DDP 訓練 ### 蒐集各進程數據 當我們要對模型做 evaluation 時,我們最好是對整個資料做 evaluation,這時候我們可以透過 `all_gather` 蒐集各個進程中的數據,具體用法如下 ```python! # For distributed parallel, collect all data and then run metrics. if torch.distributed.is_initialized(): logits_gather_list = [torch.zeros_like(test_pred) for _ in range(ngpus_per_node)] torch.distributed.all_gather(logits_gather_list, test_pred) logits = torch.cat(logits_gather_list, dim=0) targets_gather_list = [torch.zeros_like(test_true) for _ in range(ngpus_per_node)] torch.distributed.all_gather(targets_gather_list, test_true) targets = torch.cat(targets_gather_list, dim=0) ``` ### 控制進程順序 通常,每個進程處理速度基本不會一致,因此我們必須管理進程之間的順序。大部分的任務,我們只需要再主進程中進行即可 (如打印、儲存模型)。或者是我們希望進程執行到某個地方時,要等全部進程執行完才繼續往前走,這時我們可以透過 `barrier` 方法設置進程的屏障,方法如下: ```python! torch.distributed.barrier() ``` ## 參考資料 [https://zhuanlan.zhihu.com/p/178402798](https://zhuanlan.zhihu.com/p/178402798) [https://zhuanlan.zhihu.com/p/187610959](https://zhuanlan.zhihu.com/p/187610959) [https://zhuanlan.zhihu.com/p/250471767](https://zhuanlan.zhihu.com/p/250471767) [https://zhuanlan.zhihu.com/p/402198819](https://zhuanlan.zhihu.com/p/402198819) [https://murphypei.github.io/blog/2020/09/pytorch-distributed](https://murphypei.github.io/blog/2020/09/pytorch-distributed) [網址太長了,用這個代替](https://www.zhihu.com/question/453920336#:~:text=word%20size%20%EF%BC%9A%20%E5%85%A8%E5%B1%80%EF%BC%88%E4%B8%80%E4%B8%AA%E5%88%86%E5%B8%83%E5%BC%8F%E4%BB%BB%E5%8A%A1%EF%BC%89%E4%B8%AD%EF%BC%8Crank%E7%9A%84%E6%95%B0%E9%87%8F%E3%80%82%20%E4%B8%8A%E4%B8%80%E4%B8%AA%E8%BF%90%E7%AE%97%E9%A2%98%EF%BC%9A,%E6%AF%8F%E4%B8%AAnode%E5%8C%85%E5%90%AB16%E4%B8%AAGPU%EF%BC%8C%E4%B8%94nproc_per_node%3D8%EF%BC%8Cnnodes%3D3%EF%BC%8C%E6%9C%BA%E5%99%A8%E7%9A%84node_rank%3D5%EF%BC%8C%E8%AF%B7%E9%97%AEword_size%E6%98%AF%E5%A4%9A%E5%B0%91%EF%BC%9F%20%E7%AD%94%E6%A1%88%EF%BC%9Aword_size%20%3D%203%2A8%20%3D%2024)