--- tags: py --- # Python_非同步協程 # Coroutine ## 什麼是 Coroutine (協程 / 微線程) 所謂 Coroutine 就是一個可以暫停將執行權讓給其他 Coroutine 或 Awaitables obj 的函數,等其執行完後再繼續執行,並可以多次的進行這樣的暫停與繼續 1. 如果要判斷一個函數是不是 Coroutine? - 可以使用 asyncio.iscoroutinefunction(func) 方法判別。 2. 如果要判斷一個函數返回的是不是 Coroutine 對象? - 可以使用 asyncio.iscoroutine(obj) 方法判別。 ## 了解 async / await 語法糖 > async / await 是 Python 3.5+ 之後出現的語法糖,讓 Coroutine 與 Coroutine 之間的調度更加清楚 簡單來說: - async:用來宣告 function 能夠有異步的功能 - await:用來標記 Coroutine 切換暫停和繼續的點 使用 async 用來宣告一個 native Coroutine ```python async def read_data(db): pass ``` 使用 await 讓 Coroutine 掛起 - 注意: await 後面必須接一個 Coroutine 對象或是 awaitable 類型的對象 - await 的目的是將控制權回傳給事件循環 (event loop) 並等待返回,而背後實現暫停掛起函數操作的是 yield - 使用方法:加在要等待的 function 前面,如下範例 ```python async def read_data(db): data = await db.fetch("SELECT ...") ``` ### Awaitables 特性 有三種主要類型:coroutines、 Tasks 、Futures 1. coroutines:一個 async def 函數就是 coroutine,也因為 awaitables 特性所以可以等待其他 coroutine。 2. tasks:tasks 是用來調度 coroutines,可通過 asyncio.create_task( ) 來打包 coroutines。 3. futures:futures 是一個異步操作 (asynchronous operation) 返回的結果 ## 如何建立事件循環 Python 3.5 - 使用 ```asyncio.get_event_loop()``` 先建立一個 event_loop,然後再將 Coroutine 放進 ```run_until_complete()``` 裡面,直到所有 Coroutine 運行結束。 ```python """ python 3.5 寫法 """ import asyncio async def hello_world(x): print("hello_world x" + str(x)) await asyncio.sleep(x) loop = asyncio.get_event_loop() loop.run_until_complete(hello_world(3)) loop.close() ``` Python 3.7之後(更簡潔) - 將 loop 封裝,只需要使用 ```asyncio.run()``` 一行程式就結束,不用在建立 event_loop 結束時也不需要 loop.close,因為他都幫你做完了 ```python import asyncio async def hello_world(x): print(“hello_world x” + str(x)) await asyncio.sleep(x) asyncio.run(hello_world(2)) ``` ## 如何建立任務 Task? 建立任務方法有兩種 - asyncio.create_task( ) :Python 3.7+ 以上可使用 - asyncio.ensure_future( ):可讀性較差 ```python # In Python 3.7+ task = asyncio.create_task(main()) # This works in all Python versions but is less readable task = asyncio.ensure_future(main()) ``` 完整範例 ```python import asyncio import time async def dosomething(num): print(‘start{}’.format(num)) await asyncio.sleep(num) print(‘sleep{}’.format(num)) async def main(): task1 = asyncio.create_task(dosomething(1)) task2 = asyncio.create_task(dosomething(2)) task3 = asyncio.create_task(dosomething(3)) await task1 await task2 await task3 if __name__ == “__main__”: time_start = time.time() asyncio.run(main()) print(time.time() - time_start) # >> start1 # >> start2 # >> start3 # >> sleep1 # >> sleep2 # >> sleep3 # >> 3.0052239894866943 ``` ## 如何同時運行多個 Tasks 任務? 使用 ```asyncio.gather( )``` ,可同時放入多個 Coroutines 或 awaitable object 進入事件循環 (event loop),等 Coroutines 都結束後,並依序收集其回傳值 ```python asyncio.gather( *aws, loop=None, return_exceptions=False) ``` 1. *aws :可傳入多個 awaitable objects 2. Loop:此參數將會在 Python version 3.10 移除 3. return_exceptions:default 是 False,當發生 exception 時會立即中斷 task,如果設定為 True 則發生錯誤的訊息會與其他成功訊息一起回傳(如下範例,最終的 results 結果裡面包含了 ValueError() 結果) > 到目前為止你已經可以運行多個協程任務 # Gevent ``` pip install gevent ``` [Link](http://www.bjhee.com/gevent.html) Gevent 是一種基於協程的 Python 網路庫,它用到 Greenlet 提供的,封裝了 libevent 事件迴圈的高層同步 API。它讓開發者在不改變程式設計習慣的同時,用同步的方式寫非同步 I/O 的程式碼 它的協程是基於 greenlet 的 使用 Gevent 的效能確實要比用傳統的執行緒高,甚至高很多 ```python spawn(function, *args, **kwargs) # → Greenlet ``` > 創建一個新的 Greenlet 對象,安排它運行函數 funtion(*args, **kwargs) ## 例子 ```python import gevent def test1(): print("12") gevent.sleep(0) print("34") def test2(): print("56") gevent.sleep(0) print("78") gevent.joinall([ gevent.spawn(test1), gevent.spawn(test2), ]) ``` ```gevent.spawn()``` 方法會創建一個新的 greenlet 協程對象,並運行它。 ```gevent.joinall()``` 方法會等待所有傳入的 greenlet 協程運行結束後再退出,這個方法可以接受一個 timeout 參數來設置超時時間,單位是秒 執行順序如下: 1. 先進入協程 test1,打印 12 2. 遇到 ```gevent.sleep(0)``` 時,test1 被阻塞,自動切換到協程 test2,打印 56 3. 之後 test2 被阻塞,這時 test1 阻塞已結束,自動切換回 test1,打印 34 4. 當 test1 運行完畢返回後,此時 test2 阻塞已結束,再自動切換回 test2,打印 78 5. 所有協程執行完畢,程序退出 > greenlet 一個協程運行完後,必須顯式切換,不然會返回其父協程 > > 而在 gevent 中,一個協程運行完後,它會自動調度那些未完成的協程 ## 更有意思的例子 ```python import gevent import socket urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org'] jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls] gevent.joinall(jobs, timeout=5) print([job.value for job in jobs]) ``` 我們通過協程分別獲取三個網站的 IP 地址,由於打開遠程地址會引起 IO 阻塞,所以 gevent 會自動調度不同的協程。另外,我們可以通過協程對象的 ```value``` 屬性,來獲取協程函數的返回值 ```python joinall(greenlets, timeout=None, raise_error=False, count=None) ``` 参数: ```greenlets ``` 一系列greenlets去等待。 ```timeout``` 如果给出,最大的等待秒数 返回: 在 timeout 結束前一系列已經結束的 greenlets 並行的核心思想是:可以將大的任務分割成一系列的子任務,這些子任務可以同時或者異步執行,而不是每次只執行一個或者同步執行。 ## 猴子補丁 Monkey patching 細心的朋友們在運行上面例子時會發現,其實程序運行的時間同不用協程是一樣的,是三個網站打開時間的總和 可是理論上協程是非阻塞的,那運行時間應該等於最長的那個網站打開時間呀? 其實這是因為 Python 標準庫裡的 socket 是阻塞式的,DNS 解析無法並發,包括像 urllib 庫也一樣,所以這種情況下用協程完全沒意義。那怎麼辦? 一種方法是使用 gevent 下的 socket 模塊,我們可以通過 ```from gevent import socket``` 來導入 不過更常用的方法是使用**猴子補丁(Monkey patching)** ```python from gevent import monkey; monkey.patch_socket() import gevent import socket urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org'] jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls] gevent.joinall(jobs, timeout=5) print([job.value for job in jobs]) ``` 上述代碼的第一行就是對 socket 標準庫打上猴子補丁,此後 socket 標準庫中的類和方法都會被替換成非阻塞式的,所有其他的代碼都不用修改,這樣協程的效率就真正體現出來了。 Python 中其它標準庫也存在阻塞的情況,gevent 提供了 ```monkey.patch_all()``` 方法將所有標準庫都替換。 ```python from gevent import monkey; monkey.patch_all() ``` 使用猴子補丁褒貶不一,但是官網上還是建議使用 ```patch_all()```,而且在程序的第一行就執行 ## 獲取協程狀態 協程狀態有已啟動和已停止,分別可以用協程對象的 ```started``` 屬性和 ```ready()``` 方法來判斷 對於已停止的協程,可以用 ```successful()``` 方法來判斷其是否成功運行且沒拋異常。如果協程執行完有返回值,可以通過 ```value``` 屬性來獲取 另外,greenlet 協程運行過程中發生的異常是不會被拋出到協程外的,因此需要用協程對象的 ```exception``` 屬性來獲取協程中的異常 下面的例子很好的演示了各種方法和屬性的使用 ```python #coding:utf8 import gevent def win(): return 'You win!' def fail(): raise Exception('You failed!') winner = gevent.spawn(win) loser = gevent.spawn(fail) print(winner.started) # True print(loser.started) # True # 在Greenlet中发生的异常,不会被抛到Greenlet外面。 # 控制台会打出Stacktrace,但程序不会停止 try: gevent.joinall([winner, loser]) except Exception as e: # 这段永远不会被执行 print('This will never be reached') print(winner.ready()) # True print(loser.ready()) # True print(winner.value) # 'You win!' print(loser.value) # None print(winner.successful()) # True print(loser.successful()) # False # 这里可以通过raise loser.exception 或 loser.get() # 来将协程中的异常抛出 print(loser.exception) ``` ## 協程運行超時 之前我們講過在 ```gevent.joinall()``` 方法中可以傳入 ```timeout``` 參數來設置超時,我們也可以在全局範圍內設置超時時間: ```python import gevent from gevent import Timeout timeout = Timeout(2) # 2 seconds timeout.start() def wait(): gevent.sleep(10) try: gevent.spawn(wait).join() except Timeout: print('Could not complete') ``` 上例中,我們將超時設為2秒,此後所有協程的運行,如果超過兩秒就會拋出 Timeout 異常 我們也可以將超時設置在with語句內,這樣該設置只在with語句塊中有效: ```python with Timeout(1): gevent.sleep(10) ``` 此外,我們可以指定超時所拋出的異常,來替換默認的Timeout異常。比如下例中超時就會拋出我們自定義的TooLong異常 ```python class TooLong(Exception): pass with Timeout(1, TooLong): gevent.sleep(10) ```