# Coroutines and Tasks(翻譯) ###### tags: `翻譯` :::danger * [官方文件](https://docs.python.org/3.8/library/asyncio-task.html) ::: 這章節概略說明用於協程、任務的高階asyncio的APIs。 [TOC] ## Coroutines 使用async/await語法宣告的協程是寫asyncio應用程式最好的方法。舉例來說,下面片段程式碼(需要Python 3.7+)列印"hello",等待1秒,然後列印"world": ```python >>> import asyncio >>> async def main(): ... print('hello') ... await asyncio.sleep(1) ... print('world') >>> asyncio.run(main()) hello world ``` 注意到,單純的呼叫協程並不會調度它被執行: ```python >>> main() <coroutine object main at 0x1053bb7c8> ``` 要實際的執行協程,asyncio提供三種機制: * 使用函數[asyncio.run()](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.run)來執行頂層的入口"main()"函數(見上範例) * 協程中的等待。下面程式碼片段會在等待1秒之後列印"hello",,然後在等待另外2秒之後列印"world": ```python import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print(f"started at {time.strftime('%X')}") await say_after(1, 'hello') await say_after(2, 'world') print(f"finished at {time.strftime('%X')}") asyncio.run(main()) ``` 預期輸出如下: ```shell started at 17:13:52 hello world finished at 17:13:55 ``` * 函數[asyncio.create_task()](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.create_task)作為asyncio.Tasks同時執行協程 讓我們調整上面範例,並且同時執行兩個協程`say_after`: ```python async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take # around 2 seconds.) await task1 await task2 print(f"finished at {time.strftime('%X')}") ``` 注意到,預期輸出現在顯示出這程式碼片段比之前還要快一秒: ```shell started at 17:14:32 hello world finished at 17:14:34 ``` ## Awaitables 我們說,如果一個物件可以用於await表達示中,那它就是一個可等待物件。許多asyncio APIs被設計為接受等待。 有三種類型的可等待物件:**coroutines**, **Tasks**, and **Futures**。 ### Coroutines Python協程是可等待的,因此可以從其它協程中等待: ```python import asyncio async def nested(): return 42 async def main(): # Nothing happens if we just call "nested()". # A coroutine object is created but not awaited, # so it *won't run at all*. nested() # Let's do it differently now and await it: print(await nested()) # will print "42". asyncio.run(main()) ``` 重要:這文件中的術語"協程"可以用於兩個緊密相關的概念: * 協程函數:一個[async def](https://docs.python.org/3.8/reference/compound_stmts.html#async-def)函數 * 協程物件:透過呼叫協程函數回傳的物件 asyncio亦支援傳統基於生程的協程。 ### Tasks Tasks用於並行調度協程。 當協程被包裝到具有像是`asyncio.create_task()`函數的任務內時,這個協程會很快的自動調度來執行: ```python import asyncio async def nested(): return 42 async def main(): # Schedule nested() to run soon concurrently # with "main()". task = asyncio.create_task(nested()) # "task" can now be used to cancel "nested()", or # can simply be awaited to wait until it is complete: await task asyncio.run(main()) ``` ### Futures [Future](https://docs.python.org/3.8/library/asyncio-future.html#asyncio.Future)是一個特別的低階可等待物件,代表一個非同步(異步)操作的事件結果。 當Future物件為awaited的時候,意味著協程會等待,一直到Future在其它位置被解析。 asyncio中的Future物件需要允許基於回呼程式碼與async/await一起使用。 一般來說,不需要在應用程式級別程式碼中建立Future物件。 Future objects, sometimes exposed by libraries and some asyncio APIs, can be awaited: Future物件(有些時候會由套件或asyncio APIs公開)可以是awaited: ```python await function_that_returns_a_future_object() # this is also valid: await asyncio.gather( function_that_returns_a_future_object(), some_python_coroutine() ) ``` 一個回傳Future物件的低階函數範例為[loop.run_in_executor()](https://docs.python.org/3.8/library/asyncio-eventloop.html#asyncio.loop.run_in_executor)。 ## Running an asyncio Program * asyncio.run(coro, *, debug=False) 執行[協程](https://docs.python.org/3.8/glossary.html#term-coroutine)並回傳結果。 這個函數執行一個傳遞過來的協程,並負責管理asyncio事件迴圈以及完成非同步生成器。 當另一個asyncio事件迴圈在相同thread(執行緒)執行的時候,這函數無法被呼叫。 如果`debug=True`,那事件迴圈會在除錯模式中執行。 這函數總是建立一個新的事件迴圈,並在最後關閉它。它應該用做為asyncio程式的主要入口點,而且理想情況下應該只調用一次。 範例: ```python async def main(): await asyncio.sleep(1) print('hello') asyncio.run(main()) ``` New in version 3.7. :::info 原始碼可以在[Lib/asyncio/runners.py](https://github.com/python/cpython/tree/3.8/Lib/asyncio/runners.py)找到 ::: ## Creating Tasks * asyncio.create_task(coro, *, name=None) 將協程包裝到Task中,並調度它的執行。回傳Task物件。 如果它的`name`不為None,則使用[Task.set_name()](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.Task.set_name)來設置task名稱。 task在[get_running_loop()](https://docs.python.org/3.8/library/asyncio-eventloop.html#asyncio.get_running_loop)回傳的迴圈中執行,如果當前的執行緒(thread)沒有正在執行中的迴圈,那就拋出[RuntimeError](https://docs.python.org/3.8/library/asyncio-task.html)。 這個函數在Python 3.7中被加入。Python 3.7之前的版本,以低階函數[asyncio.ensure_future()](https://docs.python.org/3.8/library/asyncio-future.html#asyncio.ensure_future)替代: ```python async def coro(): ... # In Python 3.7+ task = asyncio.create_task(coro()) ... # This works in all Python versions but is less readable task = asyncio.ensure_future(coro()) ... ``` New in version 3.7. Changed in version 3.8:增加參數`name` ## Sleeping * coroutine asyncio.sleep(delay, result=None, *, loop=None) 延遲秒數的阻塞。 如果提供結果,那在協程完成的時候會回傳給調用者。 `sleep()`總是暫停當前task,允許其它tasks執行。 Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數`loop`。 協程範例,每秒顯示當前日期5秒: ```python import asyncio import datetime async def display_date(): loop = asyncio.get_running_loop() end_time = loop.time() + 5.0 while True: print(datetime.datetime.now()) if (loop.time() + 1.0) >= end_time: break await asyncio.sleep(1) asyncio.run(display_date()) ``` ## Running Tasks Concurrently * awaitable asyncio.gather(*aws, loop=None, return_exceptions=False) 在asw序列中同時執行[awaitable物件](https://docs.python.org/3.8/library/asyncio-task.html#asyncio-awaitables) 如果任一個awaitable在aws中是一個協程,那就視為Task自動調度。 如果所有的awaitables都成功的完成,彙總清單(list)。其順序相對應於aws中的awaitables的順序。 如果`return_exceptions`為False(預設),那第一個拋出的異常會立即被傳送到在`gather()`上等待的task。 如果`return_exceptions`為True,那異常會跟成功的結果一樣的處理方法,並彙總於結果清單。 如果`gather()`被取消,所有提交的awaitables(未完成的)也會一併被取消。 如果任一來自aws序列的Task或Future被取消,那就將它視為引發[CancelledError](https://docs.python.org/3.8/library/asyncio-exceptions.html#asyncio.CancelledError),這種情況下不會取消`gather()`的呼叫。 Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數`loop`。 範例: ```python import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({i})...") await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}") async def main(): # Schedule three calls *concurrently*: await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), ) asyncio.run(main()) # Expected output: # # Task A: Compute factorial(2)... # Task B: Compute factorial(2)... # Task C: Compute factorial(2)... # Task A: factorial(2) = 2 # Task B: Compute factorial(3)... # Task C: Compute factorial(3)... # Task B: factorial(3) = 6 # Task C: Compute factorial(4)... # Task C: factorial(4) = 24 ``` Changed in version 3.7:如果gather本身被取消,那取消傳播與return_exceptions無關。 ## Shielding From Cancellation * awaitable asyncio.shield(aw, *, loop=None) 保護一個[awaitable物件](https://docs.python.org/3.8/library/asyncio-task.html#asyncio-awaitables)不被[取消](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.Task.cancel)。 如果aw是一個協程,那就視為Task自動調度。 語法: ```python res = await shield(something()) ``` 等價於: ```python res = await something() ``` 除非包含它的協程被取消,那在something()中執行的Task就不會被取消。從something()的觀點來看,取消沒有發生。儘管其調用者仍然被取消,但"await"表達式依然會拋出[CancelledError](https://docs.python.org/3.8/library/asyncio-exceptions.html#asyncio.CancelledError)。 如果`something()`被其它方式取消(由自身內部),那會同時取消`shield()`。 如果希望完全忽略取消(不建議),那函數`shield()`應該結合try/except,如下: ```python try: res = await shield(something()) except CancelledError: res = None ``` Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數`loop`。 ## Timeouts * coroutine asyncio.wait_for(aw, timeout, \*, loop=None) 等待aw awaitable以超時完成。 如果aw是一個協程,那就視為Task自動調度。 `timeout`可以是`None`或是float或是int做為等待的秒數。如果`timeout`是`None`,那阻塞會一直到future完成。 避免task[取消](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.Task.cancel),可以將task包裝在[shield()](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.shield) 函數會等待,一直到future確實的取消,因此總等待時間也許會超過`timeout`。 Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數`loop`。 範例: ```python async def eternity(): # Sleep for one hour await asyncio.sleep(3600) print('yay!') async def main(): # Wait for at most 1 second try: await asyncio.wait_for(eternity(), timeout=1.0) except asyncio.TimeoutError: print('timeout!') asyncio.run(main()) # Expected output: # # timeout! ``` Changed in version 3.7:當aw因為timeout而取消,則`wait_for`等待aw被取消。之前的版本會立即拋出[asyncio.TimeoutError](https://docs.python.org/3.8/library/asyncio-exceptions.html#asyncio.TimeoutError)。 ## Waiting Primitives * coroutine asyncio.wait(aws, \*, loop=None, timeout=None, return_when=ALL_COMPLETED) 同時在aws上執行awaitable物件,並阻塞直到`return_when`指定的條件為止。 回傳兩組Tasks/Futures:`(done, pending)` 用法: ```python done, pending = await asyncio.wait(aws) ``` `timeout`(float or ing),如果指定,可以用來控制回傳前等待的最大秒數。 注意,這個函數並不會拋出[`asyncio.TimeoutError`](https://docs.python.org/3.8/library/asyncio-exceptions.html#asyncio.TimeoutError)。發生timeout未完成的Futures或Tasks僅在第二組中回傳。 `return_when`指示這函數何時應該回傳。它必須為下面項目之一: | Constant | Description | |-----------------|-----------------------------------------------------| | FIRST_COMPLETED | The function will return when any future finishes or is cancelled. | | FIRST_EXCEPTION | The function will return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to ALL_COMPLETED. | | ALL_COMPLETED | The function will return when all futures finish or are cancelled. | 不像[wait_for()](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.wait_for),當發生timeout的時候,`wait`並不會取消futures。 Python 3.8之後不建議使用:如果任一個awaitable在aws中是一個協程,那就視為Task自動調度。不建議將協程物件直接傳遞給`wait()`,因為它會導致混亂的行為。 Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數`loop`。 :::info **Note:**`wait()`自動的將協程視為Task調度,稍後會回傳隱式建立的Task物件`(done, pending)`。因此,下面程式碼將無法如預期般執行: ```python async def foo(): return 42 coro = foo() done, pending = await asyncio.wait({coro}) if coro in done: # This branch will never be run! ``` 修復上面片段程式碼如下: ```python async def foo(): return 42 task = asyncio.create_task(foo()) done, pending = await asyncio.wait({task}) if task in done: # Everything will work as expected now. ``` ::: Python 3.8之後不建議使用:不建議直接將協程物件直接傳遞給`wait()` * asyncio.as_completed(aws, \*, loop=None, timeout=None) 同時在aws集中執行[awaitable物件](https://docs.python.org/3.8/library/asyncio-task.html#asyncio-awaitables)。回傳一個Future物件的迭代器。回傳的每一個Future物件表示剩餘的awaitables集的最早結果。 如果在所有的Futures完成之前發生timeout,則拋出[asyncio.TimeoutError](https://docs.python.org/3.8/library/asyncio-exceptions.html#asyncio.TimeoutError)。 Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數`loop`。 範例: ```python for f in as_completed(aws): earliest_result = await f # ... ``` ## Scheduling From Other Threads * asyncio.run_coroutine_threadsafe(coro, loop) 提交協程到給定的事件迴圈。線程安全。 回傳[concurrent.futures.Future](https://docs.python.org/3.8/library/concurrent.futures.html#concurrent.futures.Future)以等待其它OS線程的結果。 這個函數意思是,從不同於執行事件循環的OS線程中調用。範例: ```python # Create a coroutine coro = asyncio.sleep(1, result=3) # Submit the coroutine to a given loop future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument assert future.result(timeout) == 3 ``` 如果協程中拋出異常,那將通知回傳的Future。這也可以用來取消事件迴圈中的task: ```python try: result = future.result(timeout) except asyncio.TimeoutError: print('The coroutine took too long, cancelling the task...') future.cancel() except Exception as exc: print(f'The coroutine raised an exception: {exc!r}') else: print(f'The coroutine returned: {result!r}') ``` 參考[concurrency and multithreading](https://docs.python.org/3.8/library/asyncio-dev.html#asyncio-multithreading) 不像其它asyncio函數,這個函數要求顯式的傳遞參數`loop` New in version 3.5.1. ## Introspection * asyncio.current_task(loop=None) 回傳目前執行中的[Task](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.Task)實例,如果沒有Task在執行,則回傳`None` 如果`loop is None`,那就用[get_running_loop()](https://docs.python.org/3.8/library/asyncio-eventloop.html#asyncio.get_running_loop)來取得當前迴圈。 New in version 3.7. * asyncio.all_tasks(loop=None) 回傳由迴圈執行的一組未完成的Task物件。 如果`loop is None`,那就用[get_running_loop()](https://docs.python.org/3.8/library/asyncio-eventloop.html#asyncio.get_running_loop)來取得當前迴圈。 New in version 3.7. ## Task Object * class asyncio.Task(coro, \*, loop=None, name=None) 執行Python協程的[Future-like](https://docs.python.org/3.8/library/asyncio-future.html#asyncio.Future)物件。非線程安全。 Tasks用於在事件迴圈中執行協程。如果協程等待Future,那Task會暫停協程的執行並等待Future完成。當Future完成,將繼續執行包裝的協程。 事件迴圈使用協同調度:事件迴圈一次執行一個Task。當Task等待協程完成的時候,事件迴圈會執行其它Task、回呼或執行IO操作。 使用高階函數[asyncio.create_task()](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.create_task),或低階函數[loop.create_task()](https://docs.python.org/3.8/library/asyncio-eventloop.html#asyncio.loop.create_task)或[ensure_future()](https://docs.python.org/3.8/library/asyncio-future.html#asyncio.ensure_future)建立Tasks。不建議手動實例化Tasks。 要取消執行中的Task,請使用[cancal()](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.Task.cancel)方法。呼叫它將導致Task向包裝的協程中拋出[CancelledError](https://docs.python.org/3.8/library/asyncio-exceptions.html#asyncio.CancelledError)異常。如果在取消過程中某個Future物件正在等待協程,那這個Future物件將被取消。 [calcelled()](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.Task.cancelled)可以用來確認Task是否已經被取消。如果包裝的協程沒有抑制[CancelledError](https://docs.python.org/3.8/library/asyncio-exceptions.html#asyncio.CancelledError)異常並確實的取消,那就回傳True。 [asyncio.Task](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.Task)繼承[Future](https://docs.python.org/3.8/library/asyncio-future.html#asyncio.Future)所有的APIs,除了[Future.set_result()](https://docs.python.org/3.8/library/asyncio-future.html#asyncio.Future.set_result)與[Future.set_exception()](https://docs.python.org/3.8/library/asyncio-future.html#asyncio.Future.set_exception)。 Tasks支援[contextvars](https://docs.python.org/3.8/library/contextvars.html#module-contextvars)模組。建立Task的時候,它會複製當前的上下文,然後在複製的上下文中執行協程。 Changed in version 3.7: 增加支援[contextvars](https://docs.python.org/3.8/library/contextvars.html#module-contextvars)模組。 Changed in version 3.8: 增加參數`name`。 Python 3.8之後不建議使用,將在Python 3.10的時候移除:參數`loop`。 ### **cancel()** 請求取消Task。 這安排在事件迴圈的下一個週期將CancelledError異常拋出到包裝的協程中。 然後,協程有機會清楚,或甚至拒絕請求,透過利用`try except CancelledError finally`區塊來抑制異常。 然而,不同於[Future.cancel()](https://docs.python.org/3.8/library/asyncio-future.html#asyncio.Future.cancel),[Task.cancel()](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.Task.cancel)並不保證Task一定被取消,儘管完全抑制取消並不常見,並且主動阻止。 下面範例說明協程如何攔截取消請求: ```python async def cancel_me(): print('cancel_me(): before sleep') try: # Wait for 1 hour await asyncio.sleep(3600) except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep') async def main(): # Create a "cancel_me" Task task = asyncio.create_task(cancel_me()) # Wait for 1 second await asyncio.sleep(1) task.cancel() try: await task except asyncio.CancelledError: print("main(): cancel_me is cancelled now") asyncio.run(main()) # Expected output: # # cancel_me(): before sleep # cancel_me(): cancel sleep # cancel_me(): after sleep # main(): cancel_me is cancelled now ``` ### **cancelled()** 如果Task已經取消,則回傳True。 當使用[calcel()](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.Task.cancel)請求取消,Task將被取消,並且包裝的協程將[CancelledError](https://docs.python.org/3.8/library/asyncio-exceptions.html#asyncio.CancelledError)異常傳播到Task中。 ### **done()** 如果Task完成,則回傳True。 當包裝的協程回傳值,拋出異常、或Task被取消,Task就算完成了。 ### **result()** 回傳Task的結果。 如果Task完成,那就回傳包裝的協程的結果(或者如何協程拋出異常,則重新拋出該異常) 如果Task已經被取消,這個方法將拋出[CancelledError](https://docs.python.org/3.8/library/asyncio-exceptions.html#asyncio.CancelledError)異常。 如果Task的結果尚不可用,這個方法會拋出[InvalidStateError](https://docs.python.org/3.8/library/asyncio-exceptions.html#asyncio.InvalidStateError)異常。 ### **exception()** 回傳Task的異常。 如果包裝的協程拋出異常,則回傳該異常。如果包裝的協程正常回傳,這個方法將回傳`None`。 如果Task已經被取消,這個方法將拋出[CancelledError](https://docs.python.org/3.8/library/asyncio-exceptions.html#asyncio.CancelledError)異常。 如果Task尚未完成,這個方法會拋出[InvalidStateError](https://docs.python.org/3.8/library/asyncio-exceptions.html#asyncio.InvalidStateError)異常。 ### **add_done_callback**(callback, \*, context=None) 增加一個callback function給Task完成的時候執行。 這個方法只能在基於低階回呼的程式碼中使用。 更多細節請參考[Future.add_done_callback()](Future.add_done_callback()https://docs.python.org/3.8/library/asyncio-future.html#asyncio.Future.add_done_callback) ### **remove_done_callback**(callback) 從callback清單中取消callback。 這個方法只能在基於低階回呼的程式碼中使用。 更多細節請參考[Future.remove_done_callback()](https://docs.python.org/3.8/library/asyncio-future.html#asyncio.Future.remove_done_callback) ### **get_stack**(\*, limit=None) 回傳該Task的堆疊框的清單(list of stack frames)。 如果包裝的協程還沒有完成,那就回傳掛起它的堆疊。如果協程已經成功地完成或被取消,那就回傳空清單(empty list)。如果協程被異常終止,那就回傳回溯框清單(list of traceback frames)。 框(frames)總是依著舊到新的排序。 對懸置的協程,只會回傳一個堆疊框(stack frame)。 選項參數`limit`設置回傳的框(frame)的最大數量;預設情況下回傳所有可用的框(frame)。回傳的清單順序依回傳是堆疊或回溯而有所不同:回傳最新的堆疊,但回傳最舊的回溯。(這與回溯模組的行為匹配。) ### **print_stack**(\*, limit=None, file=None) 列印該Task的堆棧或回溯。 對於[get_stack()](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.Task.get_stack)檢索到的frames,將生成與回溯模組類似的輸出。 ### **get_coro()** 回傳[Task](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.Task)包裝的協程。 New in version 3.8. ### **get_name()** 回傳Task的名稱。 如果沒有為Task明確的分配名稱,則預設asyncio Task實現在實例化過程中生成一個預設名稱。 New in version 3.8. ### **set_name**(value) 設置Task的名稱。 參數`value`可以是任意物件,然後將它轉為字串。 在預設的Task實現中,名稱將在Task物件的`repr()`的輸出看的見。 New in version 3.8. ### classmethod **all_tasks**(loop=None) 回傳事件迴圈的所有Tasks集合。 預設情況下,回傳當前事件迴圈的所有Tasks。如果`loop is None`,則使用函數[get_event_loop()](https://docs.python.org/3.8/library/asyncio-eventloop.html#asyncio.get_event_loop)取得當前迴圈。 版本3.7之後不建議使用,將在版本3.9中移除:不要將此做為task方法調用。改用[asyncio.all_tasks()](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.all_tasks)。 ### classmethod **current_task**(loop=None) 回傳當前的task或None。 如果`loop is None`,則使用函數[get_event_loop()](https://docs.python.org/3.8/library/asyncio-eventloop.html#asyncio.get_event_loop)取得當前迴圈。 版本3.7之後不建議使用,將在版本3.9中移除:不要將此做為task方法調用。改用[asyncio.current_task() ](https://docs.python.org/3.8/library/asyncio-task.html#asyncio.current_task)。 ## 延伸閱讀 [Coroutines](https://docs.python.org/3.8/reference/compound_stmts.html#coroutines) [Python黑魔法--- 異步IO( asyncio) 協程_by人世間](https://www.jianshu.com/p/b5e347b3a17c) [RealPython_Async IO in Python: A Complete Walkthrough](https://realpython.com/async-io-python/#setting-up-your-environment)