asyncio coroutin (非同步IO協程)
===
MultiThreading 或 MultiProcessing 的 IO 工作取決於系統,而**協程**取決於 user 的定義。
## 協程 (coroutin)
協程可以看做是"能在中途中斷、能中途return 結果給其他協程、中途恢復、中途傳入參數的函數",和一般的函數只能在起始傳入參數,不能中斷,而且最後返回值給父函數之後就結束的概念不一樣。
## 範例
### 定義協程
利用 `await asyncio.sleep(x)` 來模擬耗時的 IO 工作 (ex: 網路請求, 讀寫檔案), 中斷並輪轉 cpu 的控制權。
`asyncio.sleep(1)` 簡單來說就是啟動一個只有一秒的倒數計時器,如果沒有加入`await`的話,那 Event Loop 只會把計時器放在那邊跑,就不會讓自己的協程暫停了。
#### 三個任務依序執行, 有任務完成後就印出
```python=
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
print('Done after {}s'.format(x))
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print('TIME: ', now() - start)
```
#### 三個任務依序執行, 所有任務完成後同時印出
```python=
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print('Task ret: ', task.result())
print('TIME: ', now() - start)
```
#### 模組化包成函式, 三個任務依序執行, 有任務完成後就印出
```python=
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
(dones, pendings) = await asyncio.wait(tasks)
for task in dones:
print('Task ret: ', task.result())
start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print('TIME: ', now() - start)
```
#### 模組化包成函式, 三個任務依序執行, 所有任務完成後同時印出
(1)
```python=
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
(dones, pendings) = await asyncio.wait(tasks)
for task in dones:
print('Task ret: ', task.result())
start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print('TIME: ', now() - start)
```
(2) 回傳模組化函式處理完的結果
```python=
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
return await asyncio.wait(tasks)
start = now()
loop = asyncio.get_event_loop()
done, pending = loop.run_until_complete(main())
for task in done:
print('Task ret: ', task.result())
print('TIME: ', now() - start)
```
輸出都是一樣的結果, 只是顯示印出的時機不同:
```
Waiting: 1
Waiting: 2
Waiting: 4
Done after 1s
Done after 2s
Done after 4s
TIME: 4.005498886108398
```
### 動態添加協程
#### EX1: 兩任務 4 sec 與 2 sec, 執行時間 4 + 2 sec
1. 建立 event loop
2. 建立 thread,在此 thread 內啟動無限循環的 event loop
```python=
import asyncio
import time
from threading import Thread
now = lambda: time.time()
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
def more_work(x):
print('More work {}'.format(x))
time.sleep(x)
print('Finished more work {}'.format(x))
start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))
new_loop.call_soon_threadsafe(more_work, 4)
new_loop.call_soon_threadsafe(more_work, 2)
```
```
TIME: 0.0004038810729980469
More work 4 ⤦ 四秒後印出
Finished more work 4
More work 2 ⤦ 兩秒後印出
Finished more work 2
```
因為`time.sleep(x)` 會同步 block,因此可以看到 log 訊息是依序印出。
#### EX2: 兩任務 4 sec 與 2 sec, 執行時間 4 sec
```python=
import asyncio
import time
from threading import Thread
now = lambda: time.time()
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
async def more_work(x):
print('More work {}'.format(x))
await asyncio.sleep(x)
print('Finished more work {}'.format(x))
start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))
asyncio.run_coroutine_threadsafe(more_work(4), new_loop)
asyncio.run_coroutine_threadsafe(more_work(2), new_loop)
```
```
TIME: 0.0003960132598876953
More work 4 同時執行兩任務
More work 2 〃
Finished more work 2 2秒的任務做完先印出
Finished more work 4 再過2秒後, 4秒的任務完成後印出
```
與 **Ex1** 使用 `new_loop.call_soon_threadsafe()` 不同,這裏是使用 `run_coroutine_threadsafe()` 來註冊協程對象。这样就能在子线程中进行 event loop 的 concurrent 操作,同时主线程又不会被 block。一共执行的时间大概在4秒左右。
## Ref.
[Python黑魔法 --- 异步IO( asyncio) 协程](https://www.jianshu.com/p/b5e347b3a17c)
[python的asyncio模組(三):建立Event Loop和定義協程](https://ithelp.ithome.com.tw/articles/10199408)
[事件循环](https://docs.python.org/zh-tw/3.8/library/asyncio-eventloop.html)
###### tags: `實作相關`