--- title: 'High Performance Python (Part 1)' tags: Python disqus: hackmd --- High Performance Python (Part 1) == >[name=Cheng-Chin Chiang] [time= July 15, 2022] ## Table of Contents [TOC] ## Reference [High Performance Python](https://www.amazon.com/High-Performance-Python-Performant-Programming-ebook/dp/B087YTVL8F) by Micha Gorelick ![](https://i.imgur.com/ZKdvNdm.jpg) Please refer to the [code samples](https://github.com/mynameisfiber/high_performance_python_2e) on the GitHub. ## Python Project in the Beginning ### Set Python Enviroment and Packages Control For example, using [Miniconda](https://docs.conda.io/en/latest/miniconda.html). To install miniconda: 1. download `Miniconda3-py39_4.12.0-MacOSX-arm64.sh` (for Mac M1) 2. `$ shasum -a 256 Miniconda3-py39_4.12.0-MacOSX-arm64.sh` 3. `$ bash Miniconda3-py39_4.12.0-MacOSX-arm64.sh` To start conda enviroment: &nbsp;&nbsp;`$ conda` To deactivate conda environment: &nbsp;&nbsp;`$ conda deactivate ` To create a customized Python environment: &nbsp;&nbsp;`$ conda create -n py39 python=3.9` &nbsp;&nbsp;`$ conda activate py39` To install a package (for example `numpy`): &nbsp;&nbsp;`$conda search numpy` &nbsp;&nbsp;`$conda install numpy` ### Python IDEs You can refer to [ten best Python IDEs](https://www.stxnext.com/blog/best-python-ides-code-editors/) that are recommended by developers. For example, using [PyCharm](https://www.jetbrains.com/pycharm/) or [Visual Studio Code](https://code.visualstudio.com/). ## Prepare Testing Tools ### Use *pytest* to Check the Correctness Install it via the conda command: ``` $ conda install pytest ``` Write testing codes. For example: ```python= def test_some_fn(): """Check basic behaviours for our function""" assert some_fn(2) == 4 assert some_fn(1) == 1 assert some_fn(-1) == 1 ``` And run it: ``` $ pytest utility.py ``` See more applications on the [pytest home page](https://docs.pytest.org/en/7.1.x/). ### Use *time* Module ```python= import time t1 = time.time() ExpensiveCalculation() t2 = time.time() print(f"It took {t2 - t1} seconds") ``` ### Use *timeit* Module In the directory of the file [julia1.py](https://github.com/mynameisfiber/high_performance_python_2e/blob/master/02_profiling/julia1.py), we can execute the following command to plot the Julia set: ``` $ python -m timeit -n 5 -r 5 -s "import julia1" "julia1.calc_pure_python(True, 1000, 300)" ``` where `-s` is used to import the python file and module to be tested. `-n 5` means the number of loop for each test and `-r 5` means the number of repeated tests. ![](https://i.imgur.com/fE5drpw.png) ### Use Unix Timer ``` $ /usr/bin/time -p python julia1_nopil.py ``` ```bash= Length of x: 1000 Total elements: 1000000 calculate_z_serial_purepython took 3.1502881050109863 seconds real 3.49 user 3.35 sys 0.03 ``` ### Use *cProfile* `$ python -m cProfile -s cumulative julia1_nopil.py` ```bash= Length of x: 1000 Total elements: 1000000 calculate_z_serial_purepython took 4.452753782272339 seconds 36221995 function calls in 4.744 seconds Ordered by: cumulative time ncalls tottime percall cumtime percall filename:lineno(function) 1 0.000 0.000 4.744 4.744 {built-in method builtins.exec} 1 0.012 0.012 4.744 4.744 julia1_nopil.py:1(<module>) 1 0.229 0.229 4.733 4.733 julia1_nopil.py:23(calc_pure_python) 1 3.622 3.622 4.453 4.453 julia1_nopil.py:9(calculate_z_serial_purepython) 34219980 0.831 0.000 0.831 0.000 {built-in method builtins.abs} 2002000 0.047 0.000 0.047 0.000 {method 'append' of 'list' objects} 1 0.004 0.004 0.004 0.004 {built-in method builtins.sum} 3 0.000 0.000 0.000 0.000 {built-in method builtins.print} 2 0.000 0.000 0.000 0.000 {built-in method time.time} 1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects} 4 0.000 0.000 0.000 0.000 {built-in method builtins.len} ``` To visualize results, install `SnakeViz` package: ``` $ conda install snakeviz ``` And then run following command: ``` $ snakeviz profile.stats ``` ![](https://i.imgur.com/KMJ4vu7.png) It looks like the [flame graph](https://www.brendangregg.com/flamegraphs.html). ### Use `line_profiler` to Measure CPU-Bound The most powerful tool to find where the CPU-bound is. Can install it via the conda command: ``` $ conda install line_profiler ``` Add an annotation on the top of the function to measure its performance. For example: ```python= @profile def calculate_z_serial_purepython(maxiter, zs, cs): """Calculate output list using Julia update rule""" output = [0] * len(zs) for i in range(len(zs)): n = 0 z = zs[i] c = cs[i] while abs(z) < 2 and n < maxiter: z = z * z + c n += 1 output[i] = n return output ``` Then run it with the command: ``` $ kernprof -l -v julia1_lineprofiler.py ``` ```bash= Total time: 20.9474 s File: julia1_lineprofiler.py Function: calculate_z_serial_purepython at line 9 Line # Hits Time Per Hit % Time Line Contents ============================================================== 9 @profile 10 def calculate_z_serial_purepython(maxiter, zs, cs): 11 """Calculate output list using Julia update rule""" 12 1 2633.0 2633.0 0.0 output = [0] * len(zs) 13 1000001 161354.0 0.2 0.8 for i in range(len(zs)): 14 1000000 153136.0 0.2 0.7 n = 0 15 1000000 191255.0 0.2 0.9 z = zs[i] 16 1000000 166557.0 0.2 0.8 c = cs[i] 17 34219980 7902892.0 0.2 37.7 while abs(z) < 2 and n < maxiter: 18 33219980 6173072.0 0.2 29.5 z = z * z + c 19 33219980 5999649.0 0.2 28.6 n += 1 20 1000000 196864.0 0.2 0.9 output[i] = n 21 1 0.0 0.0 0.0 return output ``` ### Use `memory_profiler` to Measure Memory Usages Install it via the conda command: ``` $ conda install memory_profiler ``` Like the way of `line_profiler`, adding an annotation on the top of the function to measure its performance. For example: ```python= @profile def calculate_z_serial_purepython(maxiter, zs, cs): """Calculate output list using Julia update rule""" output = [0] * len(zs) for i in range(len(zs)): n = 0 z = zs[i] c = cs[i] while n < maxiter and abs(z) < 2: z = z * z + c n += 1 output[i] = n return output ``` Then run it with the command: ``` $ python -m memory_profiler julia1_memoryprofiler.py ``` ```bash= Filename: julia1_memoryprofiler.py Line # Mem usage Increment Occurences Line Contents ============================================================ 9 122.188 MiB 122.188 MiB 1 @profile 10 def calculate_z_serial_purepython(maxiter, zs, cs): 11 """Calculate output list using Julia update rule""" 12 129.828 MiB 7.641 MiB 1 output = [0] * len(zs) 13 140.047 MiB 0.000 MiB 537737 for i in range(len(zs)): 14 140.047 MiB 0.000 MiB 537737 n = 0 15 140.047 MiB 4.109 MiB 537737 z = zs[i] 16 140.047 MiB 4.109 MiB 537737 c = cs[i] 17 140.047 MiB 0.000 MiB 21587113 while n < maxiter and abs(z) < 2: 18 140.047 MiB 0.000 MiB 21049377 z = z * z + c 19 140.047 MiB 2.000 MiB 21049377 n += 1 20 140.047 MiB 0.000 MiB 537736 output[i] = n 21 return output ``` Or we can visualize the results by following commands: ``` $ mprof run julia1_memoryprofiler.py $ mprof plot ``` ![](https://i.imgur.com/dQRRFQC.png) ## How to Write High Quality Codes 節錄自原書: “編寫高性能程式只是讓成功的專案長期維持高性能的一個因素。比其提高解決方案的速度並且讓它更複雜,團隊的整體速度重要多了。團隊的整體速度有一些關鍵因素--好的結構、製作文件、除錯能力,以及共用的標準。 假設你有一個原型,你沒有對它進行徹底的測試,而且你的團隊也沒有檢查它。它看起來已經「夠好」了,並且被送至生產環境。因為你沒有用結構化的方式來編寫它,所以他缺乏測試程式與文件。突然之間,有一段引發慣性的程式碼需要別人支援,而管理層通常無法量化團隊的成本。 由於這個解決方案難以維護,大家往往彼之唯恐不及--它的架構沒有被重新整理過,也沒有可以協助團隊重構它得測試程式,沒有人喜歡碰到它,所以讓它運作的重責大任落在一個開發者身上。這可能會在壓力到來的時刻到至可怕的瓶頸,並且帶來重大的風險--如果那一位開發者離開專案怎麼辦? 這種開發形式通常會在管理層不了解難以維護的程式碼所導致的持續性慣性時出現。用長期的測試和文件來展示它可以協助團隊保持高效率,以及說服管理層分配時間來「清理」這段原型程式碼。 在研究環境中,經常有人用很爛的寫法建構許多Jupyter Notebook,並且用它來試驗許多想法以及各種資料組。他們總是想要在之後的階段再「把它寫好」,但是那個之後的階段從不會發生。最終,雖然他們得到可運作的成果,卻缺少可以重現結果和測試它的基礎設施,且大家對成果缺乏信心。這種情況的風險因素同樣很高,而且大家不太信任結果。 有一種通用的方法有很好的效果: ***讓它可以工作*** 先建立夠好的解決方案。先建構「拋棄式」的原型解決方案,用它設計更好的結構在第二版使用。先做一些規劃在開始寫程式絕對錯不了,否則你會花了一整個下午寫程式,只為了節省一小時的思考時間。這種情況稱為 Measure twice, cut once. ***讓它正確*** 借下來,你要加入一些穩健的測試程式,並且為它們編寫文件以及明確的重現指令,讓其他的團隊可以駕馭它。 ***讓它更快*** 最後,你可以把注意力放在分析、編譯或平行化上面,並使用既有的測試套件來確認這個新的、更快速的解決方案依然可以按預期工作。” ## How to Set Performance Test Environment - 停用 BIOS 內的 Turbo Boost - 停用作業系統覆寫 SpeedStep 的功能(如果你可以控制 BIOS,你可以在 BIOS 裡面找到它) - 只使用 AC 電力(永遠不使用電池電力) - 在進行實驗室停用背景工具,例如備份與 Dropbox - 執行多次實驗來取得穩定的數值 - 可能降為運行 level 1 (Unix),如此一來就沒有其他任務在運行 - 重新開機並重新執行實驗來再次確認結果 ## How to Improve CPU-boud (Parallel Calculations) ### Use `multiprocessing` It is usually used to solve the CPU-bound problem. For example, calculating $\pi$ with Monte Carlo method. We can use multi-process or multi-thread from this package and parallelize the calculation: ![](https://i.imgur.com/DXPDure.png) ```python= import random import time import argparse import os def estimate_nbr_points_in_quarter_circle(nbr_estimates): """Monte carlo estimate of the number of points in a quarter circle using pure Python""" print(f"Executing estimate_nbr_points_in_quarter_circle with {nbr_estimates:,} on pid {os.getpid()}") nbr_trials_in_quarter_unit_circle = 0 for step in range(int(nbr_estimates)): x = random.uniform(0, 1) y = random.uniform(0, 1) is_in_unit_circle = x * x + y * y <= 1.0 nbr_trials_in_quarter_unit_circle += is_in_unit_circle return nbr_trials_in_quarter_unit_circle if __name__ == "__main__": parser = argparse.ArgumentParser(description='Project description') parser.add_argument('nbr_workers', type=int, help='Number of workers e.g. 1, 2, 4, 8') parser.add_argument('--nbr_samples_in_total', type=int, default=1e8, help='Number of samples in total e.g. 100000000') parser.add_argument('--processes', action="store_true", default=False, help='True if using Processes, absent (False) for Threads') args = parser.parse_args() if args.processes: print("Using Processes") # <1> from multiprocessing import Pool else: print("Using Threads") # <2> from multiprocessing.dummy import Pool nbr_samples_in_total = args.nbr_samples_in_total # should be 1e8 nbr_parallel_blocks = args.nbr_workers # <3> pool = Pool(processes=nbr_parallel_blocks) nbr_samples_per_worker = nbr_samples_in_total / nbr_parallel_blocks print("Making {:,} samples per {} worker".format(nbr_samples_per_worker, nbr_parallel_blocks)) nbr_trials_per_process = [nbr_samples_per_worker] * nbr_parallel_blocks t1 = time.time() # <4> nbr_in_quarter_unit_circles = pool.map(estimate_nbr_points_in_quarter_circle, nbr_trials_per_process) pi_estimate = sum(nbr_in_quarter_unit_circles) * 4 / float(nbr_samples_in_total) print("Estimated pi", pi_estimate) print("Delta:", time.time() - t1) ``` ### Use `joblib` `joblib` is used to improve the multi-thread problem in the `multiprocessing` package. Since the main problem in Python parallel calculations is from its GIL (global interpreter lock). To install it, using the conda command ``` $ conda install joblib ``` A code example: ```python= import random import os import time import argparse # <1> from joblib import Parallel, delayed from pi_lists_parallel import estimate_nbr_points_in_quarter_circle if __name__ == "__main__": nbr_samples_in_total = int(1e8) nbr_parallel_blocks = 8 nbr_samples_per_worker = int(nbr_samples_in_total / nbr_parallel_blocks) print("Making {:,} samples per {} worker".format(nbr_samples_per_worker, nbr_parallel_blocks)) t1 = time.time() # <2> nbr_in_quarter_unit_circles = Parallel(n_jobs=nbr_parallel_blocks, verbose=1)(delayed(estimate_nbr_points_in_quarter_circle)(nbr_samples_per_worker) for sample_idx in range(nbr_parallel_blocks)) pi_estimate = sum(nbr_in_quarter_unit_circles) * 4 / float(nbr_samples_in_total) print("Estimated pi", pi_estimate) print("Delta:", time.time() - t1) ``` ### Use `Memory` from `joblib` to Cache Results For example, using the `@memory.cache` decorator: ```python= import random import os import time import argparse # <1> from joblib import Parallel, delayed from joblib import Memory from pi_lists_parallel import estimate_nbr_points_in_quarter_circle as estimate_nbr_points_in_quarter_circle_orig memory = Memory("./joblib_cache", verbose=0) # <2> @memory.cache def estimate_nbr_points_in_quarter_circle_with_idx(nbr_estimates, idx): print(f"Executing estimate_nbr_points_in_quarter_circle with {nbr_estimates} on sample {idx} on pid {os.getpid()}") nbr_trials_in_quarter_unit_circle = 0 for step in range(int(nbr_estimates)): x = random.uniform(0, 1) y = random.uniform(0, 1) is_in_unit_circle = x * x + y * y <= 1.0 nbr_trials_in_quarter_unit_circle += is_in_unit_circle return nbr_trials_in_quarter_unit_circle if __name__ == "__main__": nbr_samples_in_total = int(1e8) nbr_parallel_blocks = 8 nbr_samples_per_worker = int(nbr_samples_in_total / nbr_parallel_blocks) print("Making {:,} samples per {} worker".format(nbr_samples_per_worker, nbr_parallel_blocks)) t1 = time.time() # <3> # beware if you don't have a sample_idx, you cache the same result! nbr_in_quarter_unit_circles = Parallel(n_jobs=nbr_parallel_blocks)(delayed(estimate_nbr_points_in_quarter_circle_with_idx)(nbr_samples_per_worker, idx) for idx in range(nbr_parallel_blocks)) pi_estimate = sum(nbr_in_quarter_unit_circles) * 4 / float(nbr_samples_in_total) print("Estimated pi", pi_estimate) print("Delta:", time.time() - t1) ``` Another example, using `memory.cache(...)` function: ```python= import random import os import time import argparse # <1> from joblib import Parallel, delayed from joblib import Memory from pi_lists_parallel import estimate_nbr_points_in_quarter_circle as estimate_nbr_points_in_quarter_circle_orig # <2> memory = Memory("./joblib_cache", verbose=0) # <3> estimate_nbr_points_in_quarter_circle = memory.cache(estimate_nbr_points_in_quarter_circle_orig) if __name__ == "__main__": nbr_samples_in_total = int(1e8) nbr_parallel_blocks = 8 nbr_samples_per_worker = int(nbr_samples_in_total / nbr_parallel_blocks) print("Making {:,} samples per {} worker".format(nbr_samples_per_worker, nbr_parallel_blocks)) t1 = time.time() # <4> nbr_in_quarter_unit_circles = Parallel(n_jobs=nbr_parallel_blocks)(delayed(estimate_nbr_points_in_quarter_circle)(nbr_samples_per_worker) for idx in range(nbr_parallel_blocks)) pi_estimate = sum(nbr_in_quarter_unit_circles) * 4 / float(nbr_samples_in_total) print("Estimated pi", pi_estimate) print("Delta:", time.time() - t1) ``` ## How to Improve I/O bound In a working thread. We can switch the task to another (i.e., `yield` the thread) when waiting for a I/O process finished. Thus we can reduce the idle time in a working thread and effectively utilize the CPU time. It is also called `concurrency`. The differences between parallel and concurrency can refer to [Ref\[1\]](https://medium.com/mr-efacani-teatime/concurrency%E8%88%87parallelism%E7%9A%84%E4%B8%8D%E5%90%8C%E4%B9%8B%E8%99%95-1b212a020e30) and [Ref\[2\]](https://hackmd.io/@sysprog/concurrency/https%3A%2F%2Fhackmd.io%2F%40sysprog%2FS1AMIFt0D). ### Use `asyncio` By applying key words `async` and `await` in the code. For example, we send 10 requests to an URL and wait for responses: ```python= import aiohttp import asyncio def do_requests(session): return session.get('https://example.com') async def main(): # <1> async with aiohttp.ClientSession() as session: tasks = [] for _ in range(0, 10): tasks.append(do_requests(session)) # <2> results = await asyncio.gather(*tasks) for r in results: print('example.com =>', r.status) if __name__ == '__main__': # <3> asyncio.run(main()) ``` `asyncio` can also create a new thread by the syntax `asyncio.to_thread(...)` for a long waiting tasks. ```python= import asyncio import threading from time import sleep def hard_work(): print('thread id:', threading.get_ident()) sleep(10) async def do_async_job(): # <1> await asyncio.to_thread(hard_work) await asyncio.sleep(1) print('job done!') async def main(): task1 = asyncio.create_task(do_async_job()) task2 = asyncio.create_task(do_async_job()) task3 = asyncio.create_task(do_async_job()) # <2> await asyncio.gather(task1, task2, task3) asyncio.run(main()) ``` For more examples, see this [Ref](https://myapollo.com.tw/zh-tw/begin-to-asyncio/). ### Use `gevent` Generating semaphores `Semaphore(chunk_size)` to control number of tasks to be done. Creating future objects by `gevent.spawn(...)` and put them in `gevent.iwait(requests)` wait for tasks finished. For example, the crawler code: ```python= import random import string import urllib.error import urllib.parse import urllib.request from contextlib import closing import gevent from gevent import monkey from gevent.lock import Semaphore monkey.patch_socket() def generate_urls(base_url, num_urls): for i in range(num_urls): yield base_url + "".join(random.sample(string.ascii_lowercase, 10)) def download(url, semaphore): # <2> with semaphore: with closing(urllib.request.urlopen(url)) as data: return data.read() def chunked_requests(urls, chunk_size=100): """ Given an iterable of urls, this function will yield back the contents of the URLs. The requests will be batched up in "chunk_size" batches using a semaphore """ # <1> semaphore = Semaphore(chunk_size) # <3> requests = [gevent.spawn(download, u, semaphore) for u in urls] for response in gevent.iwait(requests): yield response def run_experiment(base_url, num_iter=1000): urls = generate_urls(base_url, num_iter) # <4> response_futures = chunked_requests(urls, 100) response_size = sum(len(r.value) for r in response_futures) return response_size if __name__ == "__main__": import time delay = 100 num_iter = 1000 base_url = f"http://127.0.0.1:8080/add?name=gevent&delay={delay}&" start = time.time() result = run_experiment(base_url, num_iter) end = time.time() print(f"Result: {result}, Time: {end - start}") ```