# Effective Python 2E

### Chapter 7: Concurrency and Parallelism

**`concurrent.futures`** and Coroutines (Items 58--64)

[David Ye](https://dwye.dev/) @ Houzz

<aside class="notes">
Part 1: items 52 to 57, subprocess, threading<br />
Part 2: items 59 to 64, concurrent.futures, coroutines<br />
Item 58 has no key points, so I skip it...
</aside>

---

## Outline

- [`concurrent.futures` (Items 59, 64)](#/2)
- [Coroutines (Items 60, 61)](#/3)
- [Mix `threading` and Coroutines (Items 62, 63)](#/4)

---

## `concurrent.futures`

> provides a high-level interface for asynchronously executing callables.

### Item 59: Consider `ThreadPoolExecutor` When Threads Are Necessary for Concurrency

### Item 64: Consider `concurrent.futures` for True Parallelism

----

```python=
import time

def slow_io(n):  # an I/O-bound function
    time.sleep(n)
```

```python=
from concurrent.futures import ThreadPoolExecutor

def main_thread(func):
    start = time.time()
    with ThreadPoolExecutor(max_workers=N_POOL) as pool:
        list(pool.map(func, NUMBERS))  # run the jobs in threads
    print(f'Took {time.time() - start:.3f} seconds')
```

```python=
from concurrent.futures import ProcessPoolExecutor

def main_process(func):
    start = time.time()
    with ProcessPoolExecutor(max_workers=N_POOL) as pool:
        list(pool.map(func, NUMBERS))  # run the jobs in subprocesses
    print(f'Took {time.time() - start:.3f} seconds')
```

```python=
N_JOB = 4
N_POOL = 4
NUMBERS = [1 for _ in range(N_JOB)]

# the guard lets ProcessPoolExecutor re-import this module safely
# when workers are spawned (the default on macOS and Windows)
if __name__ == '__main__':
    main_thread(slow_io)   # Took 1.002 seconds
    main_process(slow_io)  # Took 1.082 seconds
```

<aside class="notes">
A slow I/O example: ThreadPoolExecutor makes it easy to start threads, and ProcessPoolExecutor makes it easy to start subprocesses.<br />
Processes still carry somewhat more overhead.<br />
If the threads need a Lock, you still have to handle it yourself; I skip that here.
</aside>

----

```python=
def gcd(pair):  # a CPU-bound function
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i
```

```python=
import random

N_JOB = 100
N_POOL = 4  # no more than your CPU cores (my Mac has 6)
NUMBERS = [(random.randint(100000, 900000), random.randint(100000, 900000))
           for _ in range(N_JOB)]

if __name__ == '__main__':
    main_thread(gcd)   # Took 2.127 seconds
    main_process(gcd)  # Took 0.851 seconds
```

<aside class="notes">
Computing GCDs: because of the GIL, the threads do not actually run in parallel.<br />
ProcessPoolExecutor is clearly much faster.
</aside>

---

## Coroutines

### Item 60: Achieve Highly Concurrent I/O with Coroutines

```python=
# Python 3.7+
import asyncio

# define a coroutine function with "async def"
async def slow_io():
    await asyncio.sleep(1)  # stand-in for some non-blocking I/O

async def main():
    # calling it only creates a coroutine object; nothing runs yet
    coro = slow_io()
    # create_task schedules the coroutine to run on the event loop ASAP
    task = asyncio.create_task(coro)
    # wait until the task is done
    await task

# create a new event loop and run main() on it
asyncio.run(main())
```

```python=
async def main():
    # or await the coroutine directly:
    await slow_io()
```

- `await` waits for an awaitable to complete
- Awaitables: coroutines, Tasks, and Futures (low-level)

<aside class="notes">
Very similar to async/await in JavaScript.<br />
The only difference: calling a coroutine function does not start running it; it only runs once it is awaited or passed to create_task.
</aside>

----

#### Pros (over `threading`)

- Lower cost: <= 1 KB of memory per coroutine, cheap context switches, and mostly no locks (a coroutine can only be switched out at an `await`)
- Simpler refactoring
- Exceptions raised in a coroutine propagate to whoever awaits it (an exception in a `threading.Thread` does not reach the code that calls `join()`)

----

#### Gather Tasks

```python=
N_TASK = 10

async def main():
    coros = [slow_io() for _ in range(N_TASK)]
    # run many awaitables concurrently
    await asyncio.gather(*coros)

asyncio.run(main())
```

<aside class="notes">
Like Promise.all
</aside>

----

### Item 61: Know How to Port Threaded I/O to `asyncio`

Guess-the-number game:

- Given `lower` and `upper` bounds
- Closer than the last guess -> WARMER; otherwise -> COLDER

![](https://i.imgur.com/Gx5LPc8.png)

<aside class="notes">
The book's example is a number-guessing game played between a server and a client. It is first written with threads, then ported to asyncio.
</aside>

----

#### Common: Connection Base

![](https://i.imgur.com/vNjJB1I.png)

<aside class="notes">
What changes in the port: socket connection -> asyncio streams.<br />
The main logic stays the same; streams just split reading and writing into separate objects.
</aside>

----

#### Server

![](https://i.imgur.com/5e1gSsK.png)

<aside class="notes">
The server barely changes.
</aside>

----

![](https://i.imgur.com/V5N98IC.png)

<aside class="notes">
The client barely changes either; only the session part now uses asynccontextmanager, which is then used together with async with.<br />
Per https://docs.python.org/3/reference/datamodel.html#async-context-managers, `__aenter__` and `__aexit__` must return awaitables.
</aside>

----

![](https://i.imgur.com/q5vHruG.png)

<aside class="notes">
This part changes the most, since asyncio provides high-level connection APIs, unlike starting Threads by hand.<br />
The result has a bug: nothing guarantees the server is up before the client connects, so a sleep is added to wait for it, lol.
</aside>

---

## Mix `threading` and Coroutines

### Item 62: Mix Threads and Coroutines to Ease the Transition to `asyncio`

Migrate step by step. This requires:

- threads that can run coroutines
- coroutines that can start and wait on threads

**Low-Level Warning**

----

#### Top Down

- `loop = asyncio.get_event_loop()`
- `loop.run_in_executor`

```python=
async def run_tasks_mixed(handles, interval, output_path):
    loop = asyncio.get_event_loop()  # the event loop running in this thread

    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)

        def write(data):
            coro = write_async(data)
            # submit the coroutine to the event loop from a worker thread;
            # every write runs on the loop's thread, so writes are
            # serialized much like with a Lock
            future = asyncio.run_coroutine_threadsafe(coro, loop)
            future.result()  # block this worker thread until the write is done

        tasks = []
        for handle in handles:
            # run the synchronous "tail_file" in the loop's default executor
            task = loop.run_in_executor(
                None, tail_file, handle, interval, write)
            tasks.append(task)

        await asyncio.gather(*tasks)  # fan out

# at the top level
asyncio.run(run_tasks_mixed(handles, 0.1, output_path))
```

<aside class="notes">
It all comes down to handling the interface between async code and synchronous threads properly.
</aside>

----

#### Bottom Up

- `loop = asyncio.new_event_loop()`
- `loop.run_until_complete`

```python=
# at the bottom level:
# call the new async function from the old sync wrapper
def tail_file(handle, interval, write_func):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def write_async(data):
        write_func(data)

    # "tail_async" is the new async function
    coro = tail_async(handle, interval, write_async)
    # run until the coroutine completes
    loop.run_until_complete(coro)
```

----

### Item 63: Avoid Blocking the `asyncio` Event Loop to Maximize Responsiveness

A slow, blocking system call on the event loop's thread blocks the whole event loop!

```python=
import time
import asyncio

N_TASK = 10

async def slow_coroutine():
    time.sleep(0.1)

async def fan_out():
    coros = [slow_coroutine() for _ in range(N_TASK)]
    await asyncio.gather(*coros)

start = time.time()
asyncio.run(fan_out())
print(f'Took {time.time() - start:.3f} seconds')
# Took 1.023 seconds
```

<aside class="notes">
While time.sleep runs, it does not yield control back to the event loop; it just blocks until it finishes.
</aside>

----

`asyncio.sleep` asks the event loop to run other coroutines while waiting, without blocking:

```python=
async def slow_coroutine():
    await asyncio.sleep(0.1)

async def fan_out():
    coros = [slow_coroutine() for _ in range(N_TASK)]
    await asyncio.gather(*coros)

start = time.time()
asyncio.run(fan_out())
print(f'Took {time.time() - start:.3f} seconds')
# Took 0.107 seconds
```

https://stackoverflow.com/questions/56729764/

----

Use debug mode to find slow coroutines. With the blocking `time.sleep` version:

```python=
asyncio.run(fan_out(), debug=True)
```

```
Executing <Task finished name='Task-2' coro=<slow_coroutine() done, ...> took 0.101 seconds
Executing <Task finished name='Task-3' coro=<slow_coroutine() done, ...> took 0.105 seconds
Executing <Task finished name='Task-4' coro=<slow_coroutine() done, ...> took 0.103 seconds
Executing <Task finished name='Task-5' coro=<slow_coroutine() done, ...> took 0.105 seconds
Executing <Task finished name='Task-6' coro=<slow_coroutine() done, ...> took 0.101 seconds
Executing <Task finished name='Task-7' coro=<slow_coroutine() done, ...> took 0.100 seconds
Executing <Task finished name='Task-8' coro=<slow_coroutine() done, ...> took 0.101 seconds
Executing <Task finished name='Task-9' coro=<slow_coroutine() done, ...> took 0.100 seconds
Executing <Task finished name='Task-10' coro=<slow_coroutine() done, ...> took 0.101 seconds
Executing <Task finished name='Task-11' coro=<slow_coroutine() done, ...> took 0.100 seconds
Took 1.025 seconds
```

A general solution: run the blocking call in another thread. *(too much low-level detail, so I skipped it)*

---

# The End

Thanks for listening!

- [back to outline](#/1)
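
---

### Appendix: `submit` and `as_completed` (Items 59, 64)

The benchmarks in this deck only use `pool.map`. As a sketch (not the book's code), the same executors also offer `submit`, which returns a `Future` right away, and `as_completed`, which yields futures in the order they finish. `gcd` is the brute-force function from the slides; `gcd_all` and its `executor_cls` parameter are hypothetical names chosen for this example.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed


def gcd(pair):
    # CPU-bound brute-force GCD, same as in the slides
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i


def gcd_all(pairs, executor_cls=ProcessPoolExecutor, n_pool=4):
    """Map each pair to its GCD, collecting results as jobs finish."""
    results = {}  # keyed by pair, so duplicate pairs collapse into one entry
    with executor_cls(max_workers=n_pool) as pool:
        # submit() schedules one call and immediately returns a Future
        future_to_pair = {pool.submit(gcd, pair): pair for pair in pairs}
        # as_completed() yields each Future as soon as it is done,
        # in completion order rather than submission order
        for future in as_completed(future_to_pair):
            # result() re-raises here if gcd() raised in the worker
            results[future_to_pair[future]] = future.result()
    return results
```

Usage sketch: call `gcd_all(NUMBERS)` under an `if __name__ == '__main__':` guard to use processes, or `gcd_all(NUMBERS, ThreadPoolExecutor)` to see the GIL-limited thread version. Because both pools implement the same `Executor` interface, switching between them is a one-argument change.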
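
---

### Appendix: Exceptions Propagate Through `await` (Item 60)

The Item 60 slides note that coroutines raise errors where threads do not. A minimal sketch of what that means: an exception raised inside a coroutine surfaces at the `await` in its caller, here through `asyncio.gather`. `fetch` is a hypothetical stand-in for real I/O that just sleeps.

```python
import asyncio


async def fetch(n):
    # hypothetical stand-in for non-blocking I/O
    await asyncio.sleep(0.01)
    if n == 3:
        raise ValueError(f'bad input: {n}')
    return n * 10


async def main():
    # gather returns results in the order the awaitables were passed in
    ok = await asyncio.gather(*(fetch(n) for n in range(3)))
    caught = None
    try:
        await asyncio.gather(fetch(1), fetch(3))
    except ValueError as exc:
        # the exception raised inside fetch(3) surfaces here
        caught = str(exc)
    return ok, caught


ok, caught = asyncio.run(main())
print(ok)      # [0, 10, 20]
print(caught)  # bad input: 3
```

By default `gather` re-raises the first exception it sees; passing `return_exceptions=True` would instead return the exception objects inside the result list.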
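
---

### Appendix: Run Blocking Calls in Another Thread (Item 63)

The Item 63 slides skip the general fix of moving a blocking call to another thread. A minimal sketch of that idea, not the book's version: `loop.run_in_executor(None, ...)` runs `time.sleep` in the event loop's default `ThreadPoolExecutor`, so the loop stays free and the ten sleeps overlap.

```python
import asyncio
import time

N_TASK = 10


async def slow_coroutine():
    loop = asyncio.get_running_loop()
    # time.sleep still blocks, but only a worker thread from the loop's
    # default ThreadPoolExecutor, not the event loop thread itself
    await loop.run_in_executor(None, time.sleep, 0.1)


async def fan_out():
    coros = [slow_coroutine() for _ in range(N_TASK)]
    await asyncio.gather(*coros)


start = time.time()
asyncio.run(fan_out())
elapsed = time.time() - start
print(f'Took {elapsed:.3f} seconds')
```

With enough worker threads in the default executor, this should take roughly one sleep instead of ten. On Python 3.9+, `asyncio.to_thread(time.sleep, 0.1)` is a shorter way to express the same idea.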