--- ### [python - speed up] - [[python - speed up]利用Concurrency加速你的 Python 程式執行效率](https://hackmd.io/@YungHuiHsu/SJ5EgB5eT) - [[python - speed up] process(程序/進程) 與 thread (執行緒/線程)](https://hackmd.io/@YungHuiHsu/SJCIdO5x6) - [[python - speed up] 多執行緒(multi-threading)的基本概念](https://hackmd.io/@YungHuiHsu/r1C9Akpgp) - [python - speed up] threading 模組的使用(待補) - [python - speed up] multiprocessing模組的各種使用方法(待補) - [[python - speed up] multiprocessing模組的Pool功能使用範例。multiprocessing.Pool Sample Notes](https://hackmd.io/@YungHuiHsu/BJiMZLKxp) --- ![](https://hackmd.io/_uploads/r1lOnLtl6.png) https://datanoon.com/blog/multiprocessing_in_python/ ![](https://hackmd.io/_uploads/ryjbnUFxT.png) https://www.educba.com/python-multiprocessing/ `multiprocessing.Pool` 是 python `multiprocessing` 模塊下的一個類別,它提供一種便利的方式來平行化執行函數的多個不同版本。當你有一個函數需要對一個輸入數組(或任何可迭代的對象)中的每個項目進行操作,並且這些操作之間==沒有依賴關係==時,這將特別有用。 ### 原理 當你創建一個 `Pool` 對象並且通過 `map` 或 `apply` 等方法提交任務時,`Pool` 將會創建多個工作程序,並且平行地對輸入的資料進行處理。它通常會嘗試將資料分割成多個塊(chunk),並將每個塊分配給一個程序去處理。每個程序(process)運行在其自己的Python interpreter中,並且運行在其自己的全局變數空間。 在每個子程序中,對函數的調用和資料的操作都是獨立的。當所有的程序都完成了他們的任務後,結果會被收集並返回給主程序。 - Python Multiprocessing Pool Life-Cycle ![](https://hackmd.io/_uploads/H18o3IYxp.png =300x) https://superfastpython.com/multiprocessing-pool-python/ ### 適合的使用情境 :::info 1. **計算密集型任務**:`Pool` 適合於計算密集型的任務,因為它允許Python利用多核CPU。 2. **資料平行處理**:當有一個可以獨立處理資料塊的任務,並且不同資料塊間沒有依賴關係時。 3. **重複任務的快速執行**:當你有一個任務需要對一個資料集合的每個元素重複執行,並且這些執行可以平行化進行。 ::: ### 範例 #### Example_1 simple case ```python=! from multiprocessing import Pool def square(n): return n * n if __name__ == "__main__": inputs = [1, 2, 3, 4, 5] with Pool(2) as p: results = p.map(square, inputs) print(results) # output # [1, 4, 9, 16, 25] ``` #### Example_2 - 建立資料集 ```python=! import pandas as pd import numpy as np from multiprocessing import cpu_count, Pool # Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz 2.11 GHz print(cpu_count()) # 8 # Create a sample dataset data = pd.DataFrame(np.random.randint(0, 100, (1000000, 4)), columns=list('ABCD')) print(data.info()) # RangeIndex: 1000000 entries, 0 to 999999 # Data columns (total 4 columns): # A 1000000 non-null int64 # B 1000000 non-null int64 # C 1000000 non-null int64 # D 1000000 non-null int64 # dtypes: int64(4) # memory usage: 30.5 MB ``` - 直接運算 - excute time : 2min38.8s ```python! # Direct computation start_time = time.time() # Start measuring execution time result_direct = data.apply(np.sum, axis=1) end_time = time.time() # End measuring execution time execution_time_direct = end_time - start_time print(f"Execution time without Pool: {execution_time_direct:.2f}s") # Execution time without Pool: 73.6s ``` - 使用Pool進行平行運算 ```py! # Using Pool for parallel processing def func(df): return df.apply(np.sum, axis=1) def parallelize_dataframe(df, func): df_split = np.array_split(df, cpu_count()) with Pool(cpu_count()) as pool: start_time = time.time() # Start measuring execution time result = pd.concat(pool.map(func, df_split)) end_time = time.time() # End measuring execution time execution_time = end_time - start_time return result, execution_time result_parallel, execution_time_parallel = parallelize_dataframe(data, func) print(f"Execution time with Pool: {execution_time_parallel:.2f}s") # Execution time with Pool: 35.5s ``` 達到2.1倍的加速效果!:rocket: ![](https://hackmd.io/_uploads/r1VPXc9g6.png =250x) :::spoiler plot ``` # Calculate speedup speedup = execution_time_direct / execution_time_parallel print(f"Speedup: {speedup:.1f}x") # Set the figure size plt.figure(figsize=(4, 3)) # Create a bar chart plt.bar(['Direct', 'Parallel'], [execution_time_direct, execution_time_parallel]) plt.ylabel('Execution Time (s)') plt.title('Execution Time Comparison') plt.text(0, execution_time_direct - 10, f"{execution_time_direct:.1f}s", ha='center', color='white') plt.text(1, execution_time_parallel - 10, f"{execution_time_parallel:.1f}s", ha='center', color='white') # Show the speedup as text plt.text(1, execution_time_parallel/3, f"{speedup:.2f}x", ha='center', color='red') # Adjust the font position to avoid overlap plt.tight_layout() # Display the plot plt.show() ``` ::: ### `pool.map(main_map, inputs)` 運作流程拆解 當程式碼運行到 `pool_outputs = pool.map(main_map, inputs)` 時,背後發生了什麼呢?讓我們按步驟拆分來深入理解這個過程。 1. **資料分割**: - `pool.map()` 將 `inputs` 列表中的資料分割成幾個資料子集(chunks)。每個部分會被分發給池中的一個程序(Process)進行處理。分割數量通常與我們設定的程序數量相同(或者至少與它相關)。 - 每個程序會運行在自己的Python解釋器實例中,並在自己的GIL(全局解釋器鎖)的影響下運行。 2. **任務分發**: - 程序池(`Pool`)中的每個程序被分配了一個任務,即計算 `main_map` 函數並傳入分割後的資料。換句話說,每個程序都會執行 `main_map(i)` 函數,其中 `i` 是 `inputs` 列表的一部分。 3. **並行運算**(Concurrency): - 所有程序並行(同時)運行,每個程序處理分配給它的資料塊。這一步的效率取決於系統的CPU核心數量和當前系統負載。 4. **結果收集**: - 一旦所有程序完成它們的計算任務,`pool.map()` 會收集所有程序返回的結果並將它們組合成一個列表。 5. **返回結果**: - 最終,`pool.map()` 返回一個列表,其中包含了從每個程序收集到的結果,並將該列表賦值給 `pool_outputs` 變量。 #### 決定 `Pool` 數量的考慮因素 將 `Pool` 的數量設定為何值是一個平衡性的考慮。以下是需要考慮的要點: 1. **CPU核心數量**: - 通常來說,你應該將程序池的大小(即程序的數量)設置為你的系統的CPU核心數量。這樣可以最大化利用多核心進行平行運算。 2. **I/O密集 vs. CPU密集**: - I/O密集型任務 - 例如文件讀寫、網絡通信等),增加更多的程序可能會有助於改善性能,即使程序數量超過了CPU核心數量,因為當某些程序等待I/O操作完成時,其他程序可以運行。 - 任務是CPU密集型任務 - 例如數學、矩陣運算),則通常將程序數量設定為CPU核心數量是一個好的開始,因為添加更多的程序不太可能帶來性能的提升。 3. **系統資源**: - 你也需要考慮系統的內存使用情況。如果每個程序都消耗大量的內存,那麼創建過多的程序可能會耗盡系統資源,從而導致性能下降。 4. **任務特點**: - 確保考慮到你的任務的特點和需求。某些任務可能更適合於多線程而非多程序(例如,如果它們需要密集的資料共享或通信)。 通常情況下,進行一些實驗,並觀察不同 數量對性能的影響,可以幫助找到最佳的數值 #### 注意事項: - **資料的序列化和傳輸**:由於每個程序運行在自己的Python解釋器實例中,資料需要在程序之間進行傳輸。這通過序列化(通常使用pickle模組)將資料轉換為可以在程序之間安全傳輸的格式來完成。 - **物理核心的影響**:`multiprocessing.Pool` 的理想程序數量通常與系統的物理核心數量相匹配。這是因為每個程序對應到一個物理核心允許平行運算,而避免過多的上下文切換。不過,在I/O密集型任務中,即使程序數量多於核心數量,由於任務會經常等待I/O操作(例如網絡通信、硬碟讀寫等),使用更多的程序仍然可以增加系統吞吐量。 這個過程允許 `multiprocessing.Pool` 在多個程序之間分發任務,從而充分利用多核處理器的能力,特別適用於CPU密集型任務的平行化。 - multiprocessing.Pool的Concurrency與Parallelism 定義見[Concurrency(並發)和Parallelism(並行)](https://hackmd.io/6ey5sjDhS2awiFaKPvrziQ) :::info 當在多核心的機器上使用 multiprocessing.Pool 並分配多個工作進程時,是在做 Parallelism(平行),因為子任務是物理上同時在不同的處理器核心上運行的。 但在更高的層面,當你將一個大任務拆分成多個小任務,並使用Pool來管理這些任務和處理結果時,這個過程也涉及到 Concurrency(並行) 的概念,因為你設計程式的方式使得多個任務能夠在邏輯上同時進行處理。 因此,multiprocessing.Pool 可以涵蓋這兩個概念,取決於從哪個角度看它。它通過允許並行執行多個工作進程來實現平行(Parallelism),同時它也通過允許用戶編寫程式來處理多個任務,並且在邏輯上同時對它們進行管理和排程,來實現並行(Concurrency)。 ::: - 補充 `concurrent` 模組 在python [multiprocessing官方文件](https://docs.python.org/zh-tw/3/library/multiprocessing.html)中,提供了另一個高階 API的選擇 :::info `concurrent.futures.ProcessPoolExecutor 提供了一個更高層級的接口用來將任務推送到後台進程而不會阻塞調用方進程的執行。 與直接使用 Pool 接口相比,concurrent.futures API 能更好地允許將工作單元發往無需等待結果的下層進程池。 ::: ## Reference - [2017。louie_lu。深入 GIL: 如何寫出快速且 thread-safe 的 Python - Grok the GIL: How to write fast and thread-safe Python](https://blog.louie.lu/2017/05/19/%E6%B7%B1%E5%85%A5-gil-%E5%A6%82%E4%BD%95%E5%AF%AB%E5%87%BA%E5%BF%AB%E9%80%9F%E4%B8%94-thread-safe-%E7%9A%84-python-grok-the-gil-how-to-write-fast-and-thread-safe-python/) - [2019。datanoon.com。MULTIPROCESSING IN PYTHON](https://datanoon.com/blog/multiprocessing_in_python/) - [2022。JASON BROWNLEE(superfastpython.com)。Python Multiprocessing Pool: The Complete Guide](https://superfastpython.com/multiprocessing-pool-python/) - [2023。Priya Pedamkar(educba.com)。Python Multiprocessing](https://www.educba.com/python-multiprocessing/)