
asyncio coroutines (asynchronous I/O coroutines)

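With multithreading or multiprocessing, the operating system decides when an I/O-bound worker is switched out. With coroutines, the switch points are defined by the user: a coroutine gives up control only where the code `await`s.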
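A minimal sketch of that difference (the `worker` coroutine and its step counts are illustrative, not from the original). Two coroutines share one thread, and each gives up control only at its `await asyncio.sleep(0)`. The interleaving therefore follows from where the code awaits, not from an OS scheduler.

```python
import asyncio

log = []  # records the order in which the steps run

async def worker(name, steps):
    for i in range(steps):
        log.append(f"{name}{i}")
        # The only switch point: control goes back to the event loop here,
        # and nowhere else in this coroutine.
        await asyncio.sleep(0)

async def main():
    # gather() wraps both coroutines in tasks and runs them concurrently
    await asyncio.gather(worker("A", 3), worker("B", 3))

asyncio.run(main())
print(log)  # ['A0', 'B0', 'A1', 'B1', 'A2', 'B2']
```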

Coroutines

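A coroutine can be thought of as a function that can be suspended partway through, hand an intermediate result back to other code, resume where it left off, and receive new input while it is running. An ordinary function is different. It receives its arguments only when it is called, it cannot be interrupted, and it is finished once it returns a value to its caller.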
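Python's generators are the classic illustration of this idea. The following is a sketch, and the `running_average` name is only illustrative. The function is suspended at each `yield`, hands the current average back to the caller, and resumes when a new number is passed in through `send()`. `async def` coroutines rest on the same suspend/resume idea, but the event loop drives them instead of manual `send()` calls.

```python
def running_average():
    """Generator-based coroutine: pauses at each yield and receives the next value via send()."""
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average   # suspend here; resume when send() delivers a value
        total += value
        count += 1
        average = total / count

avg = running_average()
next(avg)                                     # run up to the first yield ("prime" the coroutine)
results = [avg.send(v) for v in (10, 20, 30)]
print(results)                                # [10.0, 15.0, 20.0]
avg.close()                                   # finish the coroutine explicitly
```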

Examples

Defining coroutines

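Use `await asyncio.sleep(x)` to simulate a time-consuming I/O operation, such as a network request or a file read/write. The coroutine is suspended and the event loop hands the CPU to other coroutines.
Put simply, `asyncio.sleep(1)` is a one-second countdown timer, and it is `await` that suspends the current coroutine until the timer expires. Calling `asyncio.sleep(1)` without `await` only creates a coroutine object that is never run, and Python warns that the coroutine was never awaited. No timer starts and the current coroutine is not paused.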
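A small sketch that shows the difference. The two helper coroutines and the 0.2 s delay are illustrative. With `await`, the coroutine really pauses for the timer. Without it, `asyncio.sleep()` just returns an unstarted coroutine object and execution continues immediately.

```python
import asyncio
import time

async def with_await():
    await asyncio.sleep(0.2)    # suspends this coroutine until the 0.2 s timer fires

async def without_await():
    coro = asyncio.sleep(0.2)   # only creates a coroutine object; no timer is started
    print(type(coro).__name__)  # coroutine
    coro.close()                # discard it explicitly so Python does not warn it was never awaited

def timed(coro_fn):
    """Run coro_fn() on a fresh event loop and return the elapsed seconds."""
    start = time.perf_counter()
    asyncio.run(coro_fn())
    return time.perf_counter() - start

t_with = timed(with_await)
t_without = timed(without_await)
print(f"with await:    {t_with:.2f}s")
print(f"without await: {t_without:.2f}s")
```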

Three tasks started in order and run concurrently; each prints as soon as it finishes

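```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)              # simulated I/O: suspends this coroutine for x seconds
    print('Done after {}s'.format(x))   # printed as soon as this task finishes

# Create the loop explicitly; calling asyncio.get_event_loop() with no
# running loop is deprecated in recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

# Wrap each coroutine in a Task scheduled on the loop
tasks = [
    asyncio.ensure_future(coroutine1, loop=loop),
    asyncio.ensure_future(coroutine2, loop=loop),
    asyncio.ensure_future(coroutine3, loop=loop),
]

start = now()
loop.run_until_complete(asyncio.wait(tasks))   # returns once all three tasks are done
print('TIME: ', now() - start)
loop.close()
```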

Three tasks started in order and run concurrently; all results are printed together after every task has finished

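```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)   # return the message instead of printing it

# Create the loop explicitly (asyncio.get_event_loop() with no running loop is deprecated)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1, loop=loop),
    asyncio.ensure_future(coroutine2, loop=loop),
    asyncio.ensure_future(coroutine3, loop=loop),
]

start = now()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:                       # tasks is a list, so results come out in creation order
    print('Task ret: ', task.result())
print('TIME: ', now() - start)
loop.close()
```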

Wrapped in a function: three tasks started in order and run concurrently; each prints as soon as it finishes

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))   # printed as soon as this task finishes

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    # Inside main() the running loop is used automatically
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]
    await asyncio.wait(tasks)

start = now()
asyncio.run(main())   # creates a new event loop, runs main() to completion, then closes the loop
print('TIME: ', now() - start)
```

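Wrapped in a function: three tasks started in order and run concurrently; all results are printed together after every task has finished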

(1) Print the results inside the wrapper function

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]
    dones, pendings = await asyncio.wait(tasks)
    for task in dones:                   # dones is a set: iteration order is not guaranteed
        print('Task ret: ', task.result())

start = now()
asyncio.run(main())   # creates a new event loop, runs main() to completion, then closes the loop
print('TIME: ', now() - start)
```

(2) Return the results from the wrapper function and print them outside it

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]
    return await asyncio.wait(tasks)     # hand the (done, pending) sets back to the caller

start = now()
done, pending = asyncio.run(main())      # asyncio.run() returns main()'s return value
for task in done:                        # finished tasks keep their results after the loop closes
    print('Task ret: ', task.result())
print('TIME: ', now() - start)
```

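All versions print the same messages; they differ only in when the messages are printed. The versions that return a value print lines such as `Task ret:  Done after 1s` instead. Because `done` returned by `asyncio.wait()` is a set, looping over it does not guarantee completion order. Output of the first version: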

```
Waiting:  1
Waiting:  2
Waiting:  4
Done after 1s
Done after 2s
Done after 4s
TIME:  4.005498886108398
```
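The sets returned by `asyncio.wait()` carry no ordering. When results should be handled in the order the tasks finish, `asyncio.as_completed()` from the standard library is one option. The sketch below reuses the shape of `do_some_work` with shorter, illustrative delays.

```python
import asyncio

async def do_some_work(x):
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    tasks = [asyncio.ensure_future(do_some_work(x)) for x in (0.3, 0.1, 0.2)]
    results = []
    # as_completed() yields awaitables in the order the tasks finish
    for next_done in asyncio.as_completed(tasks):
        result = await next_done
        print('Task ret: ', result)
        results.append(result)
    return results

results = asyncio.run(main())
# results: ['Done after 0.1s', 'Done after 0.2s', 'Done after 0.3s']
```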

Adding coroutines dynamically

EX1: two tasks (4 sec and 2 sec), total time 4 + 2 = 6 sec

  1. Create an event loop.
  2. Create a thread and run the event loop forever (`run_forever()`) inside that thread.
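```python
import asyncio
import time
from threading import Thread

now = lambda: time.time()

def start_loop(loop):
    # Runs in the child thread: make this loop current for the thread, then run it forever
    asyncio.set_event_loop(loop)
    loop.run_forever()

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)        # blocking call: holds the loop's thread for x seconds
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

# Schedule plain (non-coroutine) callbacks on the loop from the main thread
new_loop.call_soon_threadsafe(more_work, 4)
new_loop.call_soon_threadsafe(more_work, 2)

# Stop the loop once the queued callbacks have run, so the thread (and the script) can exit
new_loop.call_soon_threadsafe(new_loop.stop)
t.join()
new_loop.close()
```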
```
TIME: 0.0004038810729980469
More work 4
Finished more work 4        ← printed 4 sec after "More work 4"
More work 2
Finished more work 2        ← printed 2 sec after "More work 2"
```

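Because `time.sleep(x)` blocks synchronously, it holds the event loop's only thread. The second callback cannot start until the first has returned, so the log messages appear one after another and the total time is about 6 seconds.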
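The sequential timing can be checked with a scaled-down sketch of EX1. The delays are shortened to 0.4 s and 0.2 s, and `blocking_work` is an illustrative helper. Each callback blocks the loop's only thread, so the total comes to about the sum of the delays.

```python
import asyncio
import time
from threading import Thread

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def blocking_work(x, log):
    time.sleep(x)        # blocks the loop thread; nothing else on this loop can run meanwhile
    log.append(x)

log = []
loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(loop,))
t.start()

start = time.perf_counter()
loop.call_soon_threadsafe(blocking_work, 0.4, log)
loop.call_soon_threadsafe(blocking_work, 0.2, log)
loop.call_soon_threadsafe(loop.stop)   # queued last, so it runs after both callbacks
t.join()
elapsed = time.perf_counter() - start
loop.close()

print(log)                  # [0.4, 0.2]: callbacks ran in submission order
print(f"{elapsed:.1f}s")    # roughly 0.4 + 0.2 s
```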

EX2: two tasks (4 sec and 2 sec), total time about 4 sec

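```python
import asyncio
import time
from threading import Thread

now = lambda: time.time()

def start_loop(loop):
    # Runs in the child thread: make this loop current for the thread, then run it forever
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def more_work(x):
    print('More work {}'.format(x))
    await asyncio.sleep(x)   # non-blocking: the loop keeps running other coroutines meanwhile
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

# Submit coroutines to the loop from the main thread; each call returns a concurrent.futures.Future
f1 = asyncio.run_coroutine_threadsafe(more_work(4), new_loop)
f2 = asyncio.run_coroutine_threadsafe(more_work(2), new_loop)

# Wait for both coroutines, then stop the loop so the script can exit
f1.result()
f2.result()
new_loop.call_soon_threadsafe(new_loop.stop)
t.join()
new_loop.close()
```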
```
TIME: 0.0003960132598876953
More work 4                 ← both tasks start immediately
More work 2
Finished more work 2        ← the 2 sec task finishes first
Finished more work 4        ← 2 sec later, the 4 sec task finishes
```

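EX1 used `new_loop.call_soon_threadsafe()`, which schedules a plain callback. Here, `asyncio.run_coroutine_threadsafe()` registers coroutine objects with the loop instead. The event loop in the child thread can then run the coroutines concurrently while the main thread is not blocked. The total execution time is about 4 seconds, the length of the longest task, rather than the 6-second sum.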
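A scaled-down sketch of EX2 shows the timing. The delays are shortened to 0.6 s and 0.3 s, and `async_work` is an illustrative helper. The sketch also collects the `concurrent.futures.Future` objects that `run_coroutine_threadsafe()` returns and times how long they take. The total is about the longest delay rather than the sum.

```python
import asyncio
import time
from threading import Thread

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def async_work(x):
    await asyncio.sleep(x)    # suspends only this coroutine; the loop keeps running the other one
    return x

loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(loop,))
t.start()

start = time.perf_counter()
# Each call returns a concurrent.futures.Future that the main thread can wait on
futures = [asyncio.run_coroutine_threadsafe(async_work(x), loop) for x in (0.6, 0.3)]
results = [f.result() for f in futures]   # blocks the main thread until each result is ready
elapsed = time.perf_counter() - start

loop.call_soon_threadsafe(loop.stop)
t.join()
loop.close()

print(results)              # [0.6, 0.3]
print(f"{elapsed:.1f}s")    # roughly max(0.6, 0.3) s, not the sum
```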

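References

Python黑魔法 - 异步IO( asyncio) 协程 (Python black magic: asynchronous I/O (asyncio) coroutines)
python的asyncio模組(三):建立Event Loop和定義協程 (Python's asyncio module, part 3: creating an event loop and defining coroutines)
事件循环 (Event loop)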

tags: 實作相關 (implementation-related)