# Python Coroutine
[TOC]
> Coroutines are computer program components that allow execution to be suspended and resumed, generalizing subroutines for cooperative multitasking.
> Coroutines are well-suited for implementing familiar program components such as cooperative tasks, exceptions, event loops, iterators, infinite lists and pipes.
> - [wiki - Coroutine](https://en.wikipedia.org/wiki/Coroutine)
# Synchronous
Functions that don't involve I/O, such as `foo1()` and `foo2()` below, simply run one after the other.
`sync.py`
```python=
#!/usr/bin/env python3


def foo1():
    print('foo1')


def foo2():
    print('foo2')


def main():
    foo1()
    foo2()


if __name__ == '__main__':
    main()
```
```shell=
python sync.py
foo1
foo2
```
# Synchronous - I/O
When I/O is involved, running everything sequentially means long waits. Below, `load_data()` simulates reading a file that takes a long time.
:::spoiler
For now, `sleep()` is implemented synchronously; we will improve it step by step once we get to the asynchronous versions.
:::
`lib/sync.py`
```python=
import time


def timeit(func):
    # Print the start/end time and the elapsed seconds of the wrapped function.
    def wrapper(*args, **kwargs):
        start_time = time.time()
        print(f"Starting {time.ctime(start_time)}")
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"Ending {time.ctime(end_time)}")
        print(f"Time taken: {end_time - start_time:.2f} seconds")
        return result
    return wrapper


def sleep(secs):
    # Busy-wait to simulate a blocking, synchronous I/O call.
    now = time.time()
    while time.time() < now + secs:
        pass


def load_data(file, secs=5):
    # Pretend that reading `file` takes `secs` seconds.
    sleep(secs)
```
`sync.v1.py`
```python=
#!/usr/bin/env python3
from lib import sync


def main():
    print('Loading data')
    sync.load_data('a.txt')
    print('Data loaded')


if __name__ == '__main__':
    main()
```
After running it, we see `Loading data` immediately, but have to wait about 5 seconds before `Data loaded` appears.
```shell=
python sync.v1.py
Loading data 👈 print immediately
Data loaded 👈 print after ~5 seconds
```
# Synchronous - blocking
The `load_data` approach above *works* in the sense that the file does get read,
but what if we have other things to do in the meantime?
Suppose we need to read two files.
`sync.v2.py`
```python=
#!/usr/bin/env python3
from lib import sync


def load_a():
    sync.load_data('a.txt')
    print('a.txt loaded')


def load_b():
    sync.load_data('b.txt')
    print('b.txt loaded')


def main():
    print('Loading data')
    load_a()
    load_b()
    print('Data loaded')


if __name__ == '__main__':
    main()
```
This time we read the two files one after the other, and each read takes about 5 seconds, so the whole run takes about 10 seconds.
```shell=
Loading data 👈 print immediately
a.txt loaded 👈 print after ~5 seconds
b.txt loaded 👈 print after ~5 seconds
Data loaded 👈 print after ~10 seconds
```
TODO: diagram
Because we only start reading `b.txt` after `a.txt` has finished, we have to wait 10 seconds.
What if there are more files, or more I/O to handle?
The total time becomes the sum of all the I/O times.
In more complex scenarios, the I/O can even block other tasks from running at all.
So, how can we improve this?
# Parallelism
One way is parallelism: let the tasks run at the same time.
Below we look at two ways to parallelize: multiprocessing and multithreading.
## Multiprocessing
We spawn a new process for each task,
using Python's standard [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) library.
The elapsed time is printed below to make the results easier to compare.
`sync.v3.py`
```python=
#!/usr/bin/env python3
from multiprocessing import Pool

from lib import (sync, timeit)


def load_data(file):
    sync.load_data(file)
    print(f"{file} loaded")


@timeit
def main():
    print('Loading data')
    with Pool(2) as p:  # 2 processes to reduce overhead
        p.map(load_data, ['a.txt', 'b.txt'])
    print('Data loaded')


if __name__ == '__main__':
    main()
```
As you can see, the run only took about 5 seconds.
```shell=
Starting Mon Jul 22 14:03:11 2024
Loading data
a.txt loaded
b.txt loaded
Data loaded
Ending Mon Jul 22 14:03:16 2024
Time taken: 5.20 seconds
```
:::info
The `multiprocessing` documentation mentions that `concurrent.futures.ProcessPoolExecutor` can be used instead.
We will come back to it when we get to coroutines.
> See also [concurrent.futures.ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor) offers a higher level interface to push tasks to a background process without blocking execution of the calling process. Compared to using the Pool interface directly, the [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) API more readily allows the submission of work to the underlying process pool to be separated from waiting for the results.
:::
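As a quick taste of that interface, here is a minimal sketch (reusing the `lib/sync.py` module from above) that swaps `Pool` for `ProcessPoolExecutor`:
```python=
#!/usr/bin/env python3
# Minimal sketch: the same two-file example with concurrent.futures
# instead of multiprocessing.Pool (assumes lib/sync.py from above).
from concurrent.futures import ProcessPoolExecutor

from lib import sync


def load_data(file):
    sync.load_data(file)
    print(f"{file} loaded")


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as executor:
        # map() submits the calls and collects the results lazily.
        list(executor.map(load_data, ['a.txt', 'b.txt']))
```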
## Multithreading
Besides multiprocessing, another way to parallelize is multithreading.
However, Python (CPython in particular) is constrained by the Global Interpreter Lock (GIL): only one thread holds the lock and executes Python bytecode at any given moment,
so multithreading cannot achieve *true* parallelism for CPU-bound work; in effect the threads take turns running, much like cooperative multitasking.
This article therefore does not cover it separately; if you are interested, see [深入 GIL: 如何寫出快速且 thread-safe 的 Python – Grok the GIL: How to write fast and thread-safe Python](https://blog.louie.lu/2017/05/19/%E6%B7%B1%E5%85%A5-gil-%E5%A6%82%E4%BD%95%E5%AF%AB%E5%87%BA%E5%BF%AB%E9%80%9F%E4%B8%94-thread-safe-%E7%9A%84-python-grok-the-gil-how-to-write-fast-and-thread-safe-python/)
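To see the effect of the GIL, here is a small, hypothetical micro-benchmark: the same CPU-bound loop run twice sequentially and then in two threads. On CPython both take roughly the same time, because only one thread executes bytecode at a time.
```python=
# Hypothetical micro-benchmark: CPU-bound work does not speed up with threads
# under CPython's GIL. Exact numbers depend on your machine and Python version.
import time
from threading import Thread


def count(n=10_000_000):
    while n > 0:
        n -= 1


start = time.perf_counter()
count()
count()
print(f"sequential: {time.perf_counter() - start:.2f}s")

start = time.perf_counter()
threads = [Thread(target=count) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"threaded:   {time.perf_counter() - start:.2f}s")
```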
## Summary
Although both multiprocessing and multithreading let us handle several tasks in parallel, each has its own overhead (see the rough comparison sketch after this list).
Multiprocessing
- On Unix, CPython creates worker processes via [fork](https://man7.org/linux/man-pages/man2/fork.2.html) (and, depending on the start method, [exec](https://man7.org/linux/man-pages/man3/exec.3.html))
- `fork` copies the current process into the child, which adds overhead
- Kernel-level context switches
Multithreading
- CPython threads are implemented with [pthread](https://man7.org/linux/man-pages/man7/pthreads.7.html)
- Threads share the memory space of the current process
- Kernel-level context switches
  - In theory faster than a process context switch
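As a rough illustration of that overhead, this hypothetical micro-benchmark times starting and joining a single process versus a single thread (numbers vary a lot by OS and start method):
```python=
# Rough, environment-dependent comparison of startup overhead.
import time
from multiprocessing import Process
from threading import Thread


def noop():
    pass


if __name__ == '__main__':
    start = time.perf_counter()
    p = Process(target=noop)
    p.start()
    p.join()
    print(f"process: {time.perf_counter() - start:.4f}s")

    start = time.perf_counter()
    t = Thread(target=noop)
    t.start()
    t.join()
    print(f"thread:  {time.perf_counter() - start:.4f}s")
```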
Parallelism shortened the run time, but it did not solve the underlying I/O blocking problem.
# Asynchronous
There are several approaches: process-based, thread-based, generator-based, and finally coroutines.
## Process-based
Similar to multiprocessing, we can fork a process to run the I/O task,
do other work while waiting for the result, and pick up again once the I/O has finished.
`sync.v4.py`
```python=
#!/usr/bin/env python3
from multiprocessing import Process

from lib import (sync, timeit)


def load_data(file):
    p = Process(target=sync.load_data, args=(file,))
    p.start()
    # Printed as soon as the child process starts; the load keeps running in the background.
    print(f"{file} loaded")
    return p


@timeit
def main():
    print('Loading data')
    p1 = load_data('a.txt')
    p2 = load_data('b.txt')
    print('Do other tasks')
    p1.join()
    print('a.txt done')
    p2.join()
    print('b.txt done')
    print('Data loaded')


if __name__ == '__main__':
    main()
```
The downside is that we have to `join()` the processes ourselves.
```shell=
Starting Mon Jul 22 16:27:03 2024
Loading data
a.txt loaded
b.txt loaded
Do other tasks
a.txt done
b.txt done
Data loaded
Ending Mon Jul 22 16:27:08 2024
Time taken: 5.14 seconds
```
## Thread-based
Similar to the process-based version, but without copying the current process, so in theory it should be a bit faster.
`sync.v5.py`
```python=
#!/usr/bin/env python3
from threading import Thread

from lib import (sync, timeit)


def load_data(file):
    t = Thread(target=sync.load_data, args=(file,))
    t.start()
    print(f"{file} loaded")
    return t


@timeit
def main():
    print('Loading data')
    t1 = load_data('a.txt')
    t2 = load_data('b.txt')
    print('Do other tasks')
    t1.join()
    print('a.txt done')
    t2.join()
    print('b.txt done')
    print('Data loaded')


if __name__ == '__main__':
    main()
```
As expected, the result is slightly faster than the process-based version.
```shell=
Starting Mon Jul 22 16:45:03 2024
Loading data
a.txt loaded
b.txt loaded
Do other tasks
a.txt done
b.txt done
Data loaded
Ending Mon Jul 22 16:45:08 2024
Time taken: 5.04 seconds
```
The downsides: we still have to `join()` the threads ourselves, and even with the GIL it is easy to write thread-unsafe functions (see the sketch below).
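As an illustration (a hypothetical example, unrelated to the file-loading code above), the classic unsynchronized counter shows that the GIL does not make read-modify-write sequences atomic:
```python=
# Hypothetical example: the GIL does not make read-modify-write sequences atomic.
# time.sleep(0) is used here only to make the interleaving easy to reproduce.
import time
from threading import Thread

counter = 0


def increment(n=10_000):
    global counter
    for _ in range(n):
        current = counter      # read
        time.sleep(0)          # yield to other threads between read and write
        counter = current + 1  # write: updates made in between are lost


threads = [Thread(target=increment) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # usually far less than 40000
```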
## Generator-based
### Generator
Python's generator functions let us suspend a function's execution, hand control back to the caller, and later resume from where we left off.
Take a `fib()` generator function as an example: we print each Fibonacci number.
```python=
def fib(n):
    a, b = 1, 0
    for _ in range(n):
        a, b = b, a + b
        yield b


def main():
    n = 5
    gen = fib(n)
    while True:
        try:
            print(next(gen))
        except StopIteration:
            break


if __name__ == '__main__':
    main()
```
```shell=
1
1
2
3
5
```
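Since generators implement the iterator protocol, the `while`/`try` loop above can be replaced with a plain `for` loop, which handles `StopIteration` for us:
```python=
# Equivalent caller: a for loop consumes the generator and
# stops automatically when StopIteration is raised.
# (fib() is the generator function defined above.)
for value in fib(5):
    print(value)
```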
In this example, `fib` simply hands each result back to the caller.
Generators also support the [send(value)](https://peps.python.org/pep-0342/#new-generator-method-send-value) method (added by PEP 342 in Python 2.5),
which lets the caller pass values into the generator.
Let's rewrite the previous example.
```python=
def fib(n):
    a = 0
    b = 0
    a = yield a  # receive the initial value of a from the caller
    b = yield b  # receive the initial value of b from the caller
    for _ in range(n):
        yield b
        a, b = b, a + b
    yield b


def main():
    n = 5
    gen = fib(n)
    gen.send(None)  # prime the generator (run it up to the first yield)
    gen.send(1)     # a = 1
    gen.send(0)     # b = 0
    while True:
        try:
            print(next(gen))
        except StopIteration:
            break


if __name__ == '__main__':
    main()
```
We pass the initial values into `fib` through the generator's `send()` method and then print the first n values.
This gives us two-way communication between the caller and the generator.
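The `send()` pattern is easier to see with a simpler, hypothetical example: a running-average coroutine that consumes whatever values the caller pushes into it:
```python=
# Hypothetical example of send(): a coroutine that keeps a running average
# of every value the caller pushes into it.
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average  # hand back the current average, wait for the next value
        total += value
        count += 1
        average = total / count


avg = averager()
avg.send(None)       # prime the coroutine
print(avg.send(10))  # 10.0
print(avg.send(20))  # 15.0
print(avg.send(5))   # 11.666...
```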
### Refactor with generator functions
Since both the process-based and thread-based approaches pay for extra resources and kernel-level context switches,
we can instead rewrite `sleep` and `load_data` as generator functions.
`lib/async_util.py`
```python=
import time


def sleep(secs):
    # Yield control back to the caller while waiting, instead of blocking.
    now = time.time()
    while time.time() < now + secs:
        yield


def load_data(file, secs=5):
    yield from sleep(secs)
```
`async_util.py`
```python=
#!/usr/bin/env python3
from lib import (async_util, timeit)


def load_data(file):
    yield from async_util.load_data(file)
    print(f"{file} loaded")


@timeit
def main():
    print('Loading data')
    gen_a = load_data('a.txt')
    gen_b = load_data('b.txt')
    pending_tasks = [gen_a, gen_b]
    tasks = {task: None for task in pending_tasks}
    print('Do other tasks')
    while pending_tasks:
        # Iterate over a copy so finished tasks can be removed safely.
        for task in list(pending_tasks):
            try:
                tasks[task] = task.send(tasks[task])
            except StopIteration as e:
                pending_tasks.remove(task)
                tasks[task] = e.args[0] if len(e.args) > 0 else None
    print('Data loaded')


if __name__ == '__main__':
    main()
```
```shell=
Starting Tue Jul 23 10:41:46 2024
Loading data
Do other tasks
b.txt loaded
a.txt loaded
Data loaded
Ending Tue Jul 23 10:41:51 2024
Time taken: 5.00 seconds
```
Because we keep polling the state of the pending tasks, the time that would otherwise be spent idling on I/O is used to run other tasks.
The run time is again slightly shorter than the thread-based version.
This is a coroutine!
## Coroutines with Event Loop
We wrap the `while` loop from `main()` into an `event_loop` decorator,
and add `regular_task` and `async_task` decorators that register the wrapped functions with the event loop.
`event_loop` runs the regular and async functions in turn:
regular functions run once and are done;
async functions are resumed on every pass of the loop from wherever they last yielded.
TODO: diagram
`lib/async_util.py`
```python=
import time

regular_tasks = []
async_tasks = []


def async_task(func):
    # Register the generator returned by `func` so the event loop can resume it.
    def wrapper(*args, **kwargs):
        gen = func(*args, **kwargs)
        async_tasks.append(gen)
    return wrapper


def regular_task(func):
    # Register a plain function to be run exactly once by the event loop.
    def wrapper(*args, **kwargs):
        regular_tasks.append((func, args, kwargs))
    return wrapper


def event_loop(target_func):
    def wrapper(*args, **kwargs):
        target_func(*args, **kwargs)
        tasks = {task: None for task in async_tasks}
        while async_tasks or regular_tasks:
            # Iterate over copies so tasks can be removed while looping.
            for task in list(regular_tasks):
                func, args, kwargs = task
                try:
                    func(*args, **kwargs)
                finally:
                    regular_tasks.remove(task)
            for gen in list(async_tasks):
                try:
                    tasks[gen] = gen.send(tasks[gen])
                except StopIteration as e:
                    tasks[gen] = e.args[0] if len(e.args) > 0 else None
                    async_tasks.remove(gen)
    return wrapper


def sleep(secs):
    # Yield control back to the event loop while waiting.
    now = time.time()
    while time.time() < now + secs:
        yield


def load_data(file, secs=5):
    yield from sleep(secs)
```
`async_util.py`
```python=
#!/usr/bin/env python3
from lib import (async_util, timeit)


@async_util.async_task
def load_data(file):
    print(file)
    yield from async_util.load_data(file)
    print(f"{file} loaded")


@async_util.regular_task
def do_other_tasks():
    print('Do other tasks')


@timeit
@async_util.event_loop
def main():
    print('Loading data')
    load_data('a.txt')
    load_data('b.txt')
    do_other_tasks()
    print('Data loaded')


if __name__ == '__main__':
    main()
```
Each task we want the event loop to run is marked with a decorator.
The final result:
```shell=
Starting Mon Jul 22 18:02:30 2024
Loading data
a.txt
b.txt
Do other tasks
a.txt loaded
b.txt loaded
Data loaded
Ending Mon Jul 22 18:02:35 2024
Time taken: 5.00 seconds
```
Because the I/O tasks are all handed over to the event loop, which resumes them on every pass to check their state,
they no longer block the other tasks.
This is the prototype of asyncio!
## Asyncio
Let's first introduce the `asyncio` library.
`asyncio` provides many convenient APIs for working with coroutines; here are a few of its core objects.
Awaitables
- Objects that can be used in an `await` expression
- Include
  - `Coroutines`
  - `Tasks`
  - `Futures`
Coroutines
- Written with `async/await` syntax, replacing the generator functions we used above
- Can refer to either a *coroutine function* or a *coroutine object*
  - coroutine function
    - a function declared with `async def`
  - coroutine object
    - the object returned by calling a coroutine function
Tasks
- When a coroutine object is wrapped with `asyncio.create_task()`, it is scheduled to run on the event loop
Futures
- Low-level objects
- Represent the eventual result of an awaitable
- Used by `asyncio` to implement coroutine callbacks
- A few APIs return Future objects, e.g. `loop.run_in_executor()`
  - Typically used with multithreading or multiprocessing
  - See [Executing code in thread or process pools](https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools)
:::spoiler
```python=
import asyncio
import concurrent.futures


def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)


def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))


async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)


if __name__ == '__main__':
    asyncio.run(main())
```
:::
For more details, see the [official asyncio documentation](https://docs.python.org/3/library/asyncio-task.html).
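To make the terminology above concrete, here is a minimal, hypothetical sketch showing a coroutine function, the coroutine object it returns, and the Task that schedules it:
```python=
# Hypothetical sketch of the asyncio terminology above.
import asyncio


async def greet(name):  # coroutine function (declared with `async def`)
    await asyncio.sleep(1)
    return f"hello {name}"


async def main():
    coro = greet('world')             # coroutine object: nothing runs yet
    task = asyncio.create_task(coro)  # Task: now scheduled on the event loop
    result = await task               # await the Task (an awaitable) for its result
    print(result)


asyncio.run(main())
```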
Earlier we built a simple coroutine framework out of generators and a hand-rolled event loop,
but the implementation is fiddly and hard to maintain.
Below we rewrite the example with Python's `asyncio` library and the `async def` syntax.
`lib/async_util.py`
```python=
import asyncio
import time


def timeit(func):
    # Async-aware version of the earlier decorator: it awaits the wrapped coroutine.
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        print(f"Starting {time.ctime(start_time)}")
        result = await func(*args, **kwargs)
        end_time = time.time()
        print(f"Ending {time.ctime(end_time)}")
        print(f"Time taken: {end_time - start_time:.2f} seconds")
        return result
    return wrapper


async def sleep(secs):
    # Hand control back to the event loop while waiting.
    await asyncio.sleep(secs)


async def load_data(file, secs=5):
    await sleep(secs)
```
`async.py`
```python=
#!/usr/bin/env python3
import asyncio

from lib.async_util import (load_data, timeit)


async def do_other_tasks():
    print('Do other tasks')


async def _load_data(file):
    await load_data(file)
    print(f"{file} loaded")


@timeit
async def main():
    print('Loading data')
    task1 = asyncio.create_task(_load_data('data.txt'))
    task2 = asyncio.create_task(_load_data('data2.txt'))
    task3 = asyncio.create_task(do_other_tasks())
    await asyncio.gather(task1, task2, task3)
    print('Data loaded')


if __name__ == '__main__':
    asyncio.run(main())
```
`asyncio.run()` creates the event loop, much like our `@event_loop` decorator above.
`asyncio.create_task()` wraps a coroutine object into a `Task` and schedules it, much like the `@regular_task` and `@async_task` decorators.
After the refactor, the code is a lot cleaner.
```shell=
Starting Tue Jul 23 12:05:31 2024
Loading data
Do other tasks
data.txt loaded
data2.txt loaded
Data loaded
Ending Tue Jul 23 12:05:36 2024
Time taken: 5.02 seconds
```
The run time is also a bit shorter than the multiprocessing and multithreading versions.
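As a side note, `asyncio.gather()` also accepts coroutine objects directly and wraps them in Tasks for us, so the explicit `create_task()` calls are optional; a minimal sketch reusing the helpers from `async.py` above:
```python=
# Minimal sketch: gather() wraps coroutine objects into Tasks itself
# (reuses _load_data, do_other_tasks and timeit from async.py above).
@timeit
async def main():
    print('Loading data')
    await asyncio.gather(
        _load_data('data.txt'),
        _load_data('data2.txt'),
        do_other_tasks(),
    )
    print('Data loaded')
```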
# Summary
We introduced and compared synchronous and asynchronous execution,
and saw that the coroutine version indeed runs faster than the multiprocessing and multithreading versions.
On the synchronous side, we showed how parallelism can improve the performance of I/O-bound functions,
and introduced two ways to parallelize: multiprocessing and multithreading. For multithreading, the GIL (Global Interpreter Lock) means only one thread executes Python bytecode at a time, so it gives concurrency rather than true parallelism.
On the asynchronous side, we implemented asynchronous functions with processes, threads, and generators,
and starting from generators we built a simple coroutine framework of our own.
Finally, we introduced `asyncio` objects such as `Awaitables` and used the library to rewrite our hand-rolled coroutine framework.
Unlike JavaScript, which has an event loop built in, Python creates its event loop through the APIs provided by `asyncio`.
Combined with coroutine syntax such as `async def`, `async for`, and `async with` (sketched briefly below),
Python can enjoy the same benefits of coroutines,
and together with `multiprocessing` and `threading` there is even more flexibility.
If your program has I/O-bound or CPU-bound work, give coroutines a try!
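Here is a small, hypothetical sketch of that syntax: an async context manager used with `async with`, and an async generator consumed with `async for`:
```python=
# Hypothetical sketch of `async with` and `async for`.
import asyncio
import contextlib


@contextlib.asynccontextmanager
async def open_connection():
    print('connect')
    try:
        yield 'conn'
    finally:
        print('disconnect')


async def read_lines(conn, n=3):
    # An async generator: each item may involve waiting on I/O.
    for i in range(n):
        await asyncio.sleep(0.1)  # simulate waiting on I/O
        yield f'line {i} from {conn}'


async def main():
    async with open_connection() as conn:
        async for line in read_lines(conn):
            print(line)


asyncio.run(main())
```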
# Reference
- [PEP 255 - Simple Generators](https://peps.python.org/pep-0255/)
- [PEP 342 - Coroutines via Enhanced Generators](https://peps.python.org/pep-0342/)
- [Coroutines and Tasks](https://docs.python.org/3/library/asyncio-task.html)
- [Compound statements](https://docs.python.org/3/reference/compound_stmts.html)
- [multiprocessing — Process-based parallelism](https://docs.python.org/3/library/multiprocessing.html)
- [深入 GIL: 如何寫出快速且 thread-safe 的 Python – Grok the GIL: How to write fast and thread-safe Python](https://blog.louie.lu/2017/05/19/%E6%B7%B1%E5%85%A5-gil-%E5%A6%82%E4%BD%95%E5%AF%AB%E5%87%BA%E5%BF%AB%E9%80%9F%E4%B8%94-thread-safe-%E7%9A%84-python-grok-the-gil-how-to-write-fast-and-thread-safe-python/)
- [[Python]Concurreny效能實測 — MultiThreading/MultiProcessing/Coroutine](https://wuyiru.medium.com/python-concurreny%E6%95%88%E8%83%BD%E5%AF%A6%E6%B8%AC-multithreading-multiprocessing-coroutine-6043793e0328)