# Effective Python 2E
### Chapter 7: Concurrency and Parallelism
**`concurrent.futures`** and Coroutines (Items 58--64)
[David Ye](https://dwye.dev/) @ Houzz
<aside class="notes">
First half: Items 52 to 57, subprocess, threading<br />
Second half: Items 59 to 64, concurrent.futures, coroutines<br />
Item 58 doesn't have much to it, so it's skipped...
</aside>
---
## Outline
- [`concurrent.futures` (Items 59, 64)](#/2)
- [Coroutines (Items 60, 61)](#/3)
- [Mix `threading` and Coroutines (Items 62, 63)](#/4)
---
## `concurrent.futures`
> provides a high-level interface for asynchronously executing callables.
### Item 59: Consider `ThreadPoolExecutor` When Threads Are Necessary for Concurrency
### Item 64: Consider `concurrent.futures` for True Parallelism
----
```python=
import time

def slow_io(n):  # an I/O-bound function
    time.sleep(n)
```
```python=
from concurrent.futures import ThreadPoolExecutor

def main_thread():
    start = time.time()
    pool = ThreadPoolExecutor(max_workers=N_POOL)
    list(pool.map(slow_io, NUMBERS))  # run the jobs on threads
    print(f'Took {time.time() - start:.3f} seconds')
```
```python=
from concurrent.futures import ProcessPoolExecutor

def main_process():
    start = time.time()
    pool = ProcessPoolExecutor(max_workers=N_POOL)
    list(pool.map(slow_io, NUMBERS))  # run the jobs on subprocesses
    print(f'Took {time.time() - start:.3f} seconds')
```
```python=
N_JOB = 4
N_POOL = 4
NUMBERS = [1 for _ in range(N_JOB)]
main_thread() # Took 1.002 seconds
main_process() # Took 1.082 seconds
```
<aside class="notes">
For the slow I/O example, ThreadPoolExecutor makes it trivial to spin up threads, and ProcessPoolExecutor makes it just as easy to spawn subprocesses<br />
The process overhead is still a bit larger<br />
With threads you still have to manage Locks yourself; skipping that here
</aside>
----
```python=
def gcd(pair):  # CPU bound
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i
```
```python=
import random

N_JOB = 100
N_POOL = 4  # <= your CPU core count (6 cores on my Mac)
NUMBERS = [(random.randint(100000, 900000),
            random.randint(100000, 900000)) for _ in range(N_JOB)]
main_thread()   # Took 2.127 seconds
main_process()  # Took 0.851 seconds
```
<aside class="notes">
Computing greatest common divisors: the thread version never truly runs in parallel because of the GIL<br />
ProcessPoolExecutor is noticeably faster
</aside>
---
## Coroutines
### Item 60: Achieve Highly Concurrent I/O with Coroutines
```python=
# Python 3.7+
import asyncio

# define a coroutine with "async"
async def slow_io():
    # do some I/O-blocking things
    ...

async def main():
    # create a coroutine
    coro = slow_io()
    # create_task will "run" the job ASAP
    task = asyncio.create_task(coro)
    # wait until it is "resolved"
    await task

# create a new event loop to run main()
asyncio.run(main())
```
```python=
async def main():
    # or await the coroutine directly:
    await slow_io()
```
- `await` waits for an `awaitable` to complete
- `awaitable`: coroutines, Tasks, and Futures (low-level)
<aside class="notes">
Very similar to JavaScript's async/await<br />
The one difference: a coroutine does not start running when it is called; it only runs once you await it or wrap it in create_task
</aside>
----
#### Pros (over `threading`)
- Lower cost: under 1 KB of memory each, cheap context switches, no locks needed
- Simpler refactoring
- Exceptions propagate to the awaiting caller (a `threading` thread's exception never reaches the caller); see the sketch below
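A minimal sketch of the error-propagation difference (the `faulty` functions are made up for illustration):
```python=
import asyncio
import threading

def faulty():
    raise ValueError('boom')

async def faulty_async():
    raise ValueError('boom')

# the thread's exception is printed to stderr but never reaches the caller
thread = threading.Thread(target=faulty)
thread.start()
thread.join()  # no exception raised here

# the coroutine's exception propagates to the awaiting code
try:
    asyncio.run(faulty_async())
except ValueError as e:
    print(f'Caught: {e}')
```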
----
#### Gather Tasks
```python=
N_TASK = 10

async def main():
    coros = [slow_io() for _ in range(N_TASK)]
    # run many awaitables concurrently
    await asyncio.gather(*coros)

asyncio.run(main())
```
<aside class="notes">
Promise.all
</aside>
----
### Item 61: Know How to Port Threaded I/O to `asyncio`
The guess-the-number game:
- Given lower and upper bounds
- A guess closer than the last one -> WARMER; otherwise -> COLDER

<aside class="notes">
The book's example is a client/server number-guessing game: first written with threads, then rewritten with asyncio
</aside>
----
#### Common: Connection Base

<aside class="notes">
What changes: socket connections -> asyncio streams<br />
The main logic stays the same; streams just split reading and writing apart
</aside>
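The book's connection class isn't reproduced on this slide; as a rough sketch of the stream-based layer the note describes (class and method names are hypothetical), `asyncio` hands you a separate reader and writer instead of a single socket:
```python=
class AsyncConnection:
    def __init__(self, reader, writer):  # StreamReader / StreamWriter pair
        self.reader = reader
        self.writer = writer

    async def send(self, command):
        line = command + '\n'
        self.writer.write(line.encode())  # buffer the bytes
        await self.writer.drain()         # flush without blocking the loop

    async def receive(self):
        line = await self.reader.readline()
        if not line:
            raise EOFError('Connection closed')
        return line.decode().rstrip('\n')
```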
----
#### Server

<aside class="notes">
The server barely changes
</aside>
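Roughly, the asyncio version swaps the hand-rolled accept loop for `asyncio.start_server`; this sketch uses a trivial echo handler standing in for the game logic:
```python=
import asyncio

async def handle_connection(reader, writer):
    # called once per connected client; real game logic omitted
    try:
        while True:
            line = await reader.readline()
            if not line:
                break
            writer.write(line)   # echo back, standing in for the real reply
            await writer.drain()
    finally:
        writer.close()

async def run_server(host, port):
    server = await asyncio.start_server(handle_connection, host, port)
    async with server:
        await server.serve_forever()
```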
----

<aside class="notes">
The client barely changes either; only the session part uses asynccontextmanager, which is then consumed with async with<br />
https://docs.python.org/3/reference/datamodel.html#async-context-managers (the methods must return awaitables)
</aside>
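A minimal sketch of the `asynccontextmanager` pattern mentioned in the note (the session body is omitted):
```python=
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def client_session(host, port):
    # open an asyncio stream connection for the duration of the session
    reader, writer = await asyncio.open_connection(host, port)
    try:
        yield reader, writer
    finally:
        writer.close()

# inside a coroutine it is consumed with "async with":
#     async with client_session(host, port) as (reader, writer):
#         ...
```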
----

<aside class="notes">
This is where the diff is largest, partly because asyncio provides some high-level connection APIs, unlike spawning a Thread yourself<br />
It turned out to have a bug: there is no guarantee the server starts listening before the client connects, so a sleep was added to wait for it :)
</aside>
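Roughly what the note describes, reusing the `run_server` sketch above plus a hypothetical `run_client` helper:
```python=
async def main_async(host, port):
    # start the server as a background task; create_task returns
    # before the server is actually listening
    server_task = asyncio.create_task(run_server(host, port))
    await asyncio.sleep(0.1)  # crude workaround: give the server a moment to start
    await run_client(host, port)  # hypothetical client coroutine

asyncio.run(main_async('127.0.0.1', 1234))
```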
---
## Mix `threading` and Coroutines
### Item 62: Mix Threads and Coroutines to Ease the Transition to `asyncio`
Migrate step by step:
- threads need to be able to run coroutines
- coroutines need to be able to start and wait on threads

**Warning: this relies on low-level APIs**
----
#### Top Down
- `loop = asyncio.get_event_loop()`
- `loop.run_in_executor`
```python=
async def run_tasks_mixed(handles, interval, output_path):
    loop = asyncio.get_event_loop()  # the event loop of this thread
    with open(output_path, 'wb') as output:
        async def write_async(data):
            output.write(data)

        def write(data):
            coro = write_async(data)
            future = asyncio.run_coroutine_threadsafe(
                coro, loop  # thread-safe hand-off to the loop
            )
            future.result()  # block until the "future" is complete

        tasks = []
        for handle in handles:
            task = loop.run_in_executor(
                # Run in the default loop's executor
                # "tail_file" is the sync function
                None, tail_file, handle, interval, write
            )
            tasks.append(task)
        # fan out
        await asyncio.gather(*tasks)

# at the top level
asyncio.run(run_tasks_mixed(handles, 0.1, output_path))
```
<aside class="notes">
It all comes down to handling the interface between the async side and the sync threads properly.
</aside>
----
#### Bottom Up
- `loop = asyncio.new_event_loop()`
- `loop.run_until_complete`
```python=
# bottom level
# call the new async function inside the sync wrapper
def tail_file(handle, interval, write_func):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def write_async(data):
        write_func(data)

    # "tail_async" is the new async function
    coro = tail_async(handle, interval, write_async)
    # run until the coroutine is complete
    loop.run_until_complete(coro)
```
----
### Item 63: Avoid Blocking the `asyncio` Event Loop to Maximize Responsiveness
A slow, blocking call inside a coroutine blocks the whole event loop!
```python=
import time
import asyncio

N_TASK = 10

async def slow_coroutine():
    time.sleep(0.1)  # blocking call: never yields to the event loop

async def fan_out():
    coros = [slow_coroutine() for _ in range(N_TASK)]
    await asyncio.gather(*coros)

start = time.time()
asyncio.run(fan_out())
print(f'Took {time.time() - start:.3f} seconds')
# Took 1.023 seconds
```
<aside class="notes">
While time.sleep runs it never yields control back to the loop; everything just waits for it to finish
</aside>
----
`asyncio.sleep` yields to the event loop so other coroutines can run while it waits:
```python=
async def slow_coroutine():
    await asyncio.sleep(0.1)

async def fan_out():
    coros = [slow_coroutine() for _ in range(N_TASK)]
    await asyncio.gather(*coros)

start = time.time()
asyncio.run(fan_out())
print(f'Took {time.time() - start:.3f} seconds')
# Took 0.107 seconds
```
https://stackoverflow.com/questions/56729764/
----
Use debug mode to find slow coroutines:
```python=
asyncio.run(fan_out(), debug=True)
>>>
Executing <Task finished name='Task-2' coro=<slow_coroutine() done, ...> took 0.101 seconds
Executing <Task finished name='Task-3' coro=<slow_coroutine() done, ...> took 0.105 seconds
Executing <Task finished name='Task-4' coro=<slow_coroutine() done, ...> took 0.103 seconds
Executing <Task finished name='Task-5' coro=<slow_coroutine() done, ...> took 0.105 seconds
Executing <Task finished name='Task-6' coro=<slow_coroutine() done, ...> took 0.101 seconds
Executing <Task finished name='Task-7' coro=<slow_coroutine() done, ...> took 0.100 seconds
Executing <Task finished name='Task-8' coro=<slow_coroutine() done, ...> took 0.101 seconds
Executing <Task finished name='Task-9' coro=<slow_coroutine() done, ...> took 0.100 seconds
Executing <Task finished name='Task-10' coro=<slow_coroutine() done, ...> took 0.101 seconds
Executing <Task finished name='Task-11' coro=<slow_coroutine() done, ...> took 0.100 seconds
Took 1.025 seconds
```
A general solution: run the blocking call in another thread. *(too much low-level detail, so I skipped it)*
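Not the book's implementation (it builds a dedicated worker thread), but the same idea can be sketched with `asyncio.to_thread` (Python 3.9+), which runs a blocking call in a thread so the event loop stays responsive:
```python=
import time
import asyncio

N_TASK = 10

async def slow_coroutine():
    # the blocking time.sleep now runs in a worker thread,
    # so the event loop keeps scheduling other coroutines
    await asyncio.to_thread(time.sleep, 0.1)

async def fan_out():
    coros = [slow_coroutine() for _ in range(N_TASK)]
    await asyncio.gather(*coros)

start = time.time()
asyncio.run(fan_out())
print(f'Took {time.time() - start:.3f} seconds')  # roughly 0.1 seconds again
```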
---
# The End
Thanks for listening!
- [back to outline](#/1)
{"metaMigratedAt":"2023-06-17T00:28:46.750Z","metaMigratedFrom":"YAML","title":"Effective Python Chp 7-2: Concurrency and Parallelism","breaks":true,"slideOptions":"{\"height\":1000,\"width\":1500,\"theme\":\"white\"}","contributors":"[{\"id\":\"915f29e1-3f9c-4908-bbd4-a58795589e48\",\"add\":21166,\"del\":12477}]"}