# Python Coroutine [TOC] > Coroutines are computer program components that allow execution to be suspended and resumed, generalizing subroutines for cooperative multitasking. > Coroutines are well-suited for implementing familiar program components such as cooperative tasks, exceptions, event loops, iterators, infinite lists and pipes. > - [wiki - Coroutine](https://en.wikipedia.org/wiki/Coroutine) # Synchronous 一般不涉及 I/O 的 function, 如 `foo1()`, `foo2()` 會依序執行, 簡單 `sync.py` ```python= #!/usr/bin/env python3 def foo1(): print('foo1') def foo2(): print('foo2') def main(): foo1() foo2() if __name__ == '__main__': main() ``` ```shell= python sync.py foo1 foo2 ``` # Synchronous - I/O 當涉及 I/O, 若仍依序執行就會需要長時間等待, 下面用 `load_data()` 模擬長時間讀取檔案的情況 :::spoiler 這邊先用 synchronous 的方式實作 `sleep()`, 等後面介紹 asynchronous 再陸續改善 ::: `lib/sync.py` ```python= import time def timeit(func): def wrapper(*args, **kwargs): start_time = time.time() print(f"Starting {time.ctime(start_time)}") result = func(*args, **kwargs) end_time = time.time() print(f"Ending {time.ctime(end_time)}") print(f"Time taken: {end_time - start_time:.2f} seconds") return result return wrapper def sleep(secs): now = time.time() while time.time() < now + secs: pass def load_data(file, secs=5): sleep(secs) ``` `sync.v1.py` ```python= #!/usr/bin/env python3 from lib import sync def main(): print('Loading data') sync.load_data('a.txt') print('Data loaded') if __name__ == '__main__': main() ``` 執行後, 我們可以立刻看見 `Loading data`, 但需要等約 5 秒才能看見 `Data loaded` ```shell= python sync.v1.py Loading data 👈 print immediately Data loaded 👈 print after ~5 seconds ``` # Synchronous - blocking 雖然上面 `load_data` 的作法 *執行上* 沒有問題: 我們確實把檔案讀進來了 但是如果我們有其他事情要處理呢? 假設我們要讀兩個檔案 `sync.v2.py` ```python= #!/usr/bin/env python3 from lib import sync def load_a(): sync.load_data('a.txt') print('a.txt loaded') def load_b(): sync.load_data('b.txt') print('b.txt loaded') def main(): print('Loading data') load_a() load_b() print('Data loaded') if __name__ == '__main__': main() ``` 這次我們依序讀了兩個檔案, 且兩次讀取都需要約 5 秒, 所以總共需要 10 秒 ```shell= Loading data 👈 print immediately a.txt loaded 👈 print after ~5 seconds b.txt loaded 👈 print after ~5 seconds Data loaded 👈 print after ~10 seconds ``` TODO: 圖 可以看到, 由於我們是等 `a.txt` 讀取後才開始讀取 `b.txt`, 所以需要等 10 秒 若有更多的檔案, 或是更多的 I/O 要處理呢? 這樣處理的時間就會是所有 I/O 時間的總和 甚至在比較複雜的情境中, I/O 也會擋住其他任務的執行 那麼, 要怎麼改善呢? # Parallelism 一種方式是透過平行化 (parallelism), 讓每個任務能夠同時執行 以下介紹兩種平行化的方式: multiprocessing, multithreading ## Multiprocessing 我們讓每個任務都起一個新的 process 處理 可以用 python 的 std [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) 下面將執行所花費的時間印出方便觀察 `sync.v3.py` ```python= #!/usr/bin/env python3 from multiprocessing import Pool from lib import (sync, timeit) def load_data(file): sync.load_data(file) print(f"{file} loaded") @timeit def main(): print('Loading data') with Pool(2) as p: # 2 processes to reduce overhead p.map(load_data, ['a.txt', 'b.txt']) print('Data loaded') if __name__ == '__main__': main() ``` 可以看到僅執行了約 5 秒 ```shell= Starting Mon Jul 22 14:03:11 2024 Loading data a.txt loaded b.txt loaded Data loaded Ending Mon Jul 22 14:03:16 2024 Time taken: 5.20 seconds ``` :::info `multiprocessing` 的文件中有提到可以用 `concurrent.futures.ProcessPoolExecutor` 取代 後面講到 coroutine 時再介紹 > See also [concurrent.futures.ProcessPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor) offers a higher level interface to push tasks to a background process without blocking execution of the calling process. Compared to using the Pool interface directly, the [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures) API more readily allows the submission of work to the underlying process pool to be separated from waiting for the results. ::: ## Multithreading Multiprocessing 外, 另一種平行化的方式是 multithreading 由於 python (特指 CPython) 受限於 Global Interpreter Lock (GIL), 每次只會有一個 thread 拿到 lock 並執行 所以 multithreading 無法做到 *真正的* 平行化 其背後實際上是 Cooperative Multitasking 架構 因此本文不另外介紹, 有興趣請參考 [深入 GIL: 如何寫出快速且 thread-safe 的 Python – Grok the GIL: How to write fast and thread-safe Python](https://blog.louie.lu/2017/05/19/%E6%B7%B1%E5%85%A5-gil-%E5%A6%82%E4%BD%95%E5%AF%AB%E5%87%BA%E5%BF%AB%E9%80%9F%E4%B8%94-thread-safe-%E7%9A%84-python-grok-the-gil-how-to-write-fast-and-thread-safe-python/) ## Summary 雖然 multiprocessing 和 multithreading 都能將多個任務平行處理, 但也有各自的 overhead Multiprocessing - CPython 用 [fork](https://man7.org/linux/man-pages/man2/fork.2.html) 和 [exec](https://man7.org/linux/man-pages/man3/exec.3.html) 實做 - `fork` 會複製當前的 process 給 child process, 產生額外的 overhead - Kernel-level context switch Multithreading - CPython 用 [pthread](https://man7.org/linux/man-pages/man7/pthreads.7.html) 實作 - 和當前 process 共用 memory space - kernel-level context switch - 理論上比 process context switch 快 雖然我們用平行化縮短了執行時間, 但是並未解決 I/O blocking 的問題 # Asynchronous Process-based, thread-based, generator-based, coroutine ## Process-based 和 multiprocessing 類似, 我們可以 fork 出一個 process 執行 I/O 任務 等待結果時執行其他任務, I/O 結束後再繼續執行 `sync.v4.py` ```python= #!/usr/bin/env python3 from multiprocessing import Process from lib import (sync, timeit) def load_data(file): p = Process(target=sync.load_data, args=(file,)) p.start() print(f"{file} loaded") return p @timeit def main(): print('Loading data') p1 = load_data('a.txt') p2 = load_data('b.txt') print('Do other tasks') p1.join() print('a.txt done') p2.join() print('b.txt done') print('Data loaded') if __name__ == '__main__': main() ``` 缺點是需要自己 join ```shell= Starting Mon Jul 22 16:27:03 2024 Loading data a.txt loaded b.txt loaded Do other tasks a.txt done b.txt done Data loaded Ending Mon Jul 22 16:27:08 2024 Time taken: 5.14 seconds ``` ## Thread-based 和 process-based 類似, 但不需要複製當前的 process, 所以理論上會快一些 `sync.v5.py` ```python= #!/usr/bin/env python3 from threading import Thread from lib import (sync, timeit) def load_data(file): t = Thread(target=sync.load_data, args=(file,)) t.start() print(f"{file} loaded") return t @timeit def main(): print('Loading data') t1 = load_data('a.txt') t2 = load_data('b.txt') print('Do other tasks') t1.join() print('a.txt done') t2.join() print('b.txt done') print('Data loaded') if __name__ == '__main__': main() ``` 結果如預期比 process-based 快一點點 ```shell= Starting Mon Jul 22 16:45:03 2024 Loading data a.txt loaded b.txt loaded Do other tasks a.txt done b.txt done Data loaded Ending Mon Jul 22 16:45:08 2024 Time taken: 5.04 seconds ``` 缺點是需要自己 join threads, 且即使有 GIL, 也可能寫出 thread-unsafe function ## Generator-based ### Generator Python 提供 generator function 讓我們可以暫停 function 的執行並轉移控制權給 caller, 之後再從暫停的地方繼續執行 以 `fib()` generator function 為例, 我們印出每個 fibonacci 的數值 ```python= def fib(n): a, b = 1, 0 for _ in range(n): a, b = b, a + b yield b def main(): n = 5 gen = fib(n) while True: try: print(next(gen)) except StopIteration: break ``` ```shell= 1 1 2 3 5 ``` 這個例子是從 `fib` generator function 將每次的結果回傳給 caller Python 3.5 開始支援 generator [send(value)](https://peps.python.org/pep-0342/#new-generator-method-send-value) method 讓我們可以將數值從 caller 傳給 callee 來改寫一下前面的範例 ```python= def fib(n): a = 0 b = 0 a = yield a b = yield b for _ in range(n): yield b a, b = b, a + b yield b def main(): n = 5 gen = fib(n) gen.send(None) gen.send(1) gen.send(0) while True: try: print(next(gen)) except StopIteration: break ``` 我們將初始值透過 generator 的 `send()` method 傳給 `fib`, 並印出前 n 個數值 這樣我們就達成了 Generator 間的溝通 ### Refactor with generator functions 由於 process-based 和 thread-based 都有佔用資源和 kernel-level context switch overhead 的缺點 我們可以將 `sleep` 和 `load_data` 改寫成 generator function `lib/async_impl.py` ```python= def sleep(secs): now = time.time() while time.time() < now + secs: yield pass def load_data(file, secs=5): yield from sleep(secs) ``` `async_util.py` ```python= from lib import (async_util, timeit) def load_data(file): yield from async_util.load_data(file) print(f"{file} loaded") @timeit def main(): print('Loading data') gen_a = load_data('a.txt') gen_b = load_data('b.txt') pending_tasks = [gen_a, gen_b] tasks = {task: None for task in pending_tasks} print('Do other tasks') while pending_tasks: for task in pending_tasks: try: tasks[task] = task.send(tasks[task]) except StopIteration as e: pending_tasks.remove(task) tasks[task] = e.args[0] if len(e.args) > 0 else None print('Data loaded') if __name__ == '__main__': main() ``` ```shell= Starting Tue Jul 23 10:41:46 2024 Loading data Do other tasks b.txt loaded a.txt loaded Data loaded Ending Tue Jul 23 10:41:51 2024 Time taken: 5.00 seconds ``` 由於我們每次都會檢查 pending task 的狀態, 所以能夠充分利用 I/O 時閒置的時間去執行其他任務 可以看到執行時間又比 thread-based 快了一些 這就是 Coroutine! ## Coroutines with Event Loop 我們用 `event_loop` annotation 將 `main()` function 中的 `while` loop 封裝起來 並新增 `regular_task` 和 `async_task` annotations 將 wrapped function 加入 event loop 中 `event_loop` 會依序執行 regular 和 async functions regular functions 執行一次即結束 async functions 則在每一次 loop 時都會從之前 pending (yield) 的地方繼續執行 (resume) TODO: 圖 `lib/async_util.py` ```python= import time regular_tasks = [] async_tasks = [] def async_task(func): global async_tasks def wrapper(*args, **kwargs): gen = func(*args, **kwargs) async_tasks.append(gen) return wrapper def regular_task(func): global regular_tasks def wrapper(*args, **kwargs): regular_tasks.append((func, args, kwargs)) return wrapper def event_loop(target_func): global regular_tasks, async_tasks def wrapper(*args, **kwargs): target_func(*args, **kwargs) tasks = { task: None for task in async_tasks } while async_tasks or regular_tasks: for regular_task in regular_tasks: func, args, kwargs = regular_task try: func(*args, **kwargs) finally: regular_tasks.remove(regular_task) for async_task in async_tasks: try: tasks[async_task] = async_task.send(tasks[async_task]) except StopIteration as e: tasks[async_task] = e.args[0] if len(e.args) > 0 else None async_tasks.remove(async_task) return wrapper def sleep(secs): now = time.time() while time.time() < now + secs: yield pass def load_data(file, secs=5): yield from sleep(secs) ``` `async_util.py` ```python= #!/usr/bin/env python3 from lib import (async_util, timeit) @async_util.async_task def load_data(file): print(file) yield from async.load_data(file) print(f"{file} loaded") @async_util.regular_task def do_other_tasks(): print('Do other tasks') @timeit @async_util.event_loop def main(): print('Loading data') load_data('a.txt') load_data('b.txt') do_other_tasks() print('Data loaded') if __name__ == '__main__': main() ``` 我們將想要給 event loop 執行的 task 用 decorator 表示 最終會得到下面結果 ```shell= Starting Mon Jul 22 18:02:30 2024 Loading data a.txt b.txt Do other tasks a.txt loaded b.txt loaded Data loaded Ending Mon Jul 22 18:02:35 2024 Time taken: 5.00 seconds ``` 由於我們將 I/O 任務都交給 event loop 管理, 且 event loop 每次都會繼續 (resume) I/O 任務以確認其狀態 所以就不會再擋住其他任務的執行 這就是 asyncio 的雛型! ## Asyncio 先介紹一下 `asyncio` library `asyncio` 提供了很多方便的 API 來達成 coroutines, 這邊解釋一些 `asyncio` 的物件 Awaitables - 可以使用 `await` 表達式的物件 - 包含 - `Coroutines` - `Tasks` - `Futures` Coroutines - 用 `async/await` 表示, 取代前面的 generator functions - 可以是 *coroutine function* 或是 *coroutine object* - coroutine function - 用 `async def` 宣告的 function - coroutine object - coroutine function 回傳的物件 Tasks - 當 `coroutine` 物件被 `asyncio.create_task()` 加入 Task, 就代表已經加入排程 Futures - low-level object - 代表 awaitable object 最終的結果 - `asyncio` 用來支援 coroutine callback 的實作 - 少數 API 會回傳 Futures 物件, ex. `loop.run_in_executor()` - 通常用在 multithreading 或 multiprocessing - 詳見 [Executing code in thread or process pools](https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools) :::spoiler import asyncio import concurrent.futures def blocking_io(): # File operations (such as logging) can block the # event loop: run them in a thread pool. with open('/dev/urandom', 'rb') as f: return f.read(100) def cpu_bound(): # CPU-bound operations will block the event loop: # in general it is preferable to run them in a # process pool. return sum(i * i for i in range(10 ** 7)) async def main(): loop = asyncio.get_running_loop() ## Options: # 1. Run in the default loop's executor: result = await loop.run_in_executor( None, blocking_io) print('default thread pool', result) # 2. Run in a custom thread pool: with concurrent.futures.ThreadPoolExecutor() as pool: result = await loop.run_in_executor( pool, blocking_io) print('custom thread pool', result) # 3. Run in a custom process pool: with concurrent.futures.ProcessPoolExecutor() as pool: result = await loop.run_in_executor( pool, cpu_bound) print('custom process pool', result) if __name__ == '__main__': asyncio.run(main()) ::: 更多用法請見 [asyncio 官方文件](https://docs.python.org/3/library/asyncio-task.html) 前面我們用 generator 搭配 event loop 實做了簡單的 coroutine 架構 但是實作很複雜, 且不方便管理和維護 下面我們用 Python 提供的 `asyncio` library 和 `async def` keyword 來改寫前面的範例 `lib/async_util.py` ```python= import time def timeit(func): async def wrapper(*args, **kwargs): start_time = time.time() print(f"Starting {time.ctime(start_time)}") result = await func(*args, **kwargs) end_time = time.time() print(f"Ending {time.ctime(end_time)}") print(f"Time taken: {end_time - start_time:.2f} seconds") return result return wrapper async def sleep(secs): await asyncio.sleep(secs) async def load_data(file, secs=5): await sleep(secs) ``` `async.py` ```python= import asyncio import time async def do_other_tasks(): print('Do other tasks') async def _load_data(file): await load_data(file) print(f"{file} loaded") @timeit async def main(): print('Loading data') task1 = asyncio.create_task(_load_data('data.txt')) task2 = asyncio.create_task(_load_data('data2.txt')) task3 = asyncio.create_task(do_other_tasks()) await asyncio.gather(task1, task2, task3) print('Data loaded') if __name__ == '__main__': asyncio.run(main()) ``` `asyncio.run()` 會產生 event loop, 類似我們前面用的 `@event_loop` annotation `asyncio.create_task()` 會將 `coroutine` 物件封裝成 `Task` 物件加入排程, 類似前面的 `@regular_task` 和 `@async_task` annotations Refactor 完後看起來簡潔多了 ```shell= Starting Tue Jul 23 12:05:31 2024 Loading data Do other tasks data.txt loaded data2.txt loaded Data loaded Ending Tue Jul 23 12:05:36 2024 Time taken: 5.02 seconds ``` 執行時間也比 multiprocessing 和 multithreading 快 # Summary 我們介紹並比較了 Synchronous 和 Asynchronous 的差別 發現 coroutine 執行時間確實比 multiprocessing 和 threading 要快 在 Synchronous 的介紹中, 說明了如何透過平行化 (Parallelism) 改善 IO-bound functions 效能 並介紹兩種平行化的方式: multiprocessing, multithreading, 其中 multithreading 在 Python 因為 GIL (Global Interpreter Lock) 的關係, 其背後實作其實是 coroutine 在 Asynchronous 中, 我們分別用 processing, threading, generator 實作了 asynchronous function 並從 generator 開始, 實作出簡單版本的 coroutine 架構 最後介紹了 `asyncio` library 的 `Awaitables` 等物件, 並用其改寫前面自己刻的 coroutine 架構 和 JavaScript 原生就有 event loop 不同, Python 需要透過 `asyncio` 提供的 API 產生 event loop 但是搭配 `async def`, `async for`, `async with` 等 Coroutine syntax Python 也能享受到 coroutine 帶來的好處 搭配 `multiprocessing` 和 `threading`, 又有更多的彈性 之後如果程式有 IO-bound / CPU-bound 的需求, 不妨試試 coroutine 吧! # Reference - [PEP 255 - Simple Generators](https://peps.python.org/pep-0255/) - [PEP 342 - Coroutines via Enhanced Generators](https://peps.python.org/pep-0342/) - [Coroutines and Tasks](https://docs.python.org/3/library/asyncio-task.html) - [Compound statements](https://docs.python.org/3/reference/compound_stmts.html) - [multiprocessing — Process-based parallelism](https://docs.python.org/3/library/multiprocessing.html) - [深入 GIL: 如何寫出快速且 thread-safe 的 Python – Grok the GIL: How to write fast and thread-safe Python](https://blog.louie.lu/2017/05/19/%E6%B7%B1%E5%85%A5-gil-%E5%A6%82%E4%BD%95%E5%AF%AB%E5%87%BA%E5%BF%AB%E9%80%9F%E4%B8%94-thread-safe-%E7%9A%84-python-grok-the-gil-how-to-write-fast-and-thread-safe-python/) - [[Python]Concurreny效能實測 — MultiThreading/MultiProcessing/Coroutine](https://wuyiru.medium.com/python-concurreny%E6%95%88%E8%83%BD%E5%AF%A6%E6%B8%AC-multithreading-multiprocessing-coroutine-6043793e0328)