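With multithreading or multiprocessing, the operating system decides when to switch between IO tasks, whereas a coroutine switches only at the points the user defines.
A coroutine can be viewed as a function that can be suspended midway, hand intermediate results to other coroutines, be resumed later, and receive arguments midway. This differs from an ordinary function, which receives arguments only when it is called, cannot be suspended, and is finished once it returns its value to the caller.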
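A plain Python generator is perhaps the simplest way to see these abilities (suspend, hand back a value, resume, receive a value midway) in isolation. The sketch below is only an illustration of the idea, not asyncio itself, and running_average is a hypothetical name that does not appear in the original post.

```python
def running_average():
    """Generator-based coroutine: suspends at each yield, hands back the
    current average, and resumes when a new value is sent in."""
    total = 0.0
    count = 0
    average = None
    while True:
        # Suspend here; execution resumes when the caller invokes send().
        value = yield average
        total += value
        count += 1
        average = total / count


avg = running_average()
next(avg)               # run up to the first yield (the suspension point)
first = avg.send(10)    # resume with 10, get the average so far
second = avg.send(20)   # resume again; internal state was preserved
third = avg.send(30)
avg.close()             # finish the coroutine explicitly

print(first, second, third)
```

Each send() call both passes an argument into the suspended function and receives its next intermediate result, which an ordinary function call cannot do.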
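await asyncio.sleep(x) is used to simulate time-consuming IO work (for example, network requests or reading and writing files): it suspends the current coroutine and hands control of the CPU back to the event loop so that other coroutines can run.
Put simply, awaiting asyncio.sleep(1) works like a one-second countdown timer during which the event loop is free to run other coroutines. Without await, the call only creates a coroutine object that is never run, so the current coroutine does not pause at all (Python reports this with a "coroutine ... was never awaited" RuntimeWarning).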
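The difference between awaiting and not awaiting can be measured directly. The following is a minimal sketch; the helper names with_await and without_await are hypothetical, and the 0.2 second delay is chosen only to keep the run short.

```python
import asyncio
import time
import warnings


async def with_await(delay):
    start = time.perf_counter()
    await asyncio.sleep(delay)      # suspends this coroutine for `delay` seconds
    return time.perf_counter() - start


async def without_await(delay):
    start = time.perf_counter()
    asyncio.sleep(delay)            # only creates a coroutine object; it never runs
    return time.perf_counter() - start


with warnings.catch_warnings():
    # Silence the "coroutine 'sleep' was never awaited" RuntimeWarning
    # that the un-awaited call triggers.
    warnings.simplefilter("ignore", RuntimeWarning)
    waited = asyncio.run(with_await(0.2))
    skipped = asyncio.run(without_await(0.2))

print('with await:    {:.3f}s'.format(waited))
print('without await: {:.3f}s'.format(skipped))
```

The first call takes roughly the requested delay, while the second returns almost immediately because nothing was actually awaited.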
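import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    # Suspend this coroutine for x seconds and let the event loop run others
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))

# Calling a coroutine function only creates a coroutine object; nothing runs yet
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

# Wrap each coroutine in a Task so the event loop can schedule it
tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

start = now()
loop = asyncio.get_event_loop()
# Run the loop until all three tasks have finished
loop.run_until_complete(asyncio.wait(tasks))
print('TIME: ', now() - start)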
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    # Return the result instead of printing it inside the coroutine
    return 'Done after {}s'.format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

# Every task is finished at this point, so result() returns immediately
for task in tasks:
    print('Task ret: ', task.result())
print('TIME: ', now() - start)
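On Python 3.7 and later, the same "run several coroutines and collect their return values" pattern is often written with asyncio.run() and asyncio.gather(). The sketch below is an alternative to the version above, not the original author's code; the delays are shortened to fractions of a second purely so it finishes quickly.

```python
import asyncio
import time


async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)


async def main():
    # gather() runs the coroutines concurrently and returns their results
    # in the same order as the arguments, regardless of completion order.
    return await asyncio.gather(
        do_some_work(0.1),
        do_some_work(0.2),
        do_some_work(0.4),
    )


start = time.perf_counter()
results = asyncio.run(main())   # creates, runs, and closes the event loop
elapsed = time.perf_counter() - start

for result in results:
    print('Task ret: ', result)
print('TIME: ', elapsed)
```

The total time is close to the longest delay rather than the sum of all three, which is the same effect the explicit loop version demonstrates.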
(1) Process the results inside main()
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    # asyncio.wait() returns two sets: finished tasks and unfinished tasks
    done, pending = await asyncio.wait(tasks)
    # Print the results here, inside the wrapping coroutine
    for task in done:
        print('Task ret: ', task.result())

start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print('TIME: ', now() - start)
(2) Return the results from the wrapping coroutine and process them outside
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    # Hand the (done, pending) pair back to the caller instead of printing here
    return await asyncio.wait(tasks)

start = now()
loop = asyncio.get_event_loop()
# run_until_complete() returns whatever main() returned
done, pending = loop.run_until_complete(main())
for task in done:
    print('Task ret: ', task.result())
print('TIME: ', now() - start)
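All of the versions above print the same information; only the moment at which the results are printed differs. The first version, for example, prints the following (the later versions prefix each result line with "Task ret:", and iterating over the done set returned by asyncio.wait() does not guarantee any particular order):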
Waiting: 1
Waiting: 2
Waiting: 4
Done after 1s
Done after 2s
Done after 4s
TIME: 4.005498886108398
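If the goal is to print each result as soon as its task finishes, rather than after all of them are done, asyncio.as_completed() is one option. This is a hedged sketch that is not taken from the original post; the delays are shortened and deliberately listed out of order to show that results arrive in completion order.

```python
import asyncio


async def do_some_work(x):
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)


async def main():
    # Tasks are created in a deliberately "wrong" order.
    tasks = [asyncio.create_task(do_some_work(x)) for x in (0.3, 0.1, 0.2)]
    finished = []
    # as_completed() yields awaitables in the order the tasks finish.
    for next_done in asyncio.as_completed(tasks):
        result = await next_done
        print('Task ret: ', result)
        finished.append(result)
    return finished


finished = asyncio.run(main())
```

Here the shortest task is reported first even though it was created second, so the printing time of each result tracks its own completion.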
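# Ex1: schedule blocking callbacks on an event loop running in a child thread
import asyncio
import time
from threading import Thread

now = lambda: time.time()

def start_loop(loop):
    # Make `loop` the current event loop of this thread and run it forever
    asyncio.set_event_loop(loop)
    loop.run_forever()

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)  # blocking sleep: the loop cannot run anything else meanwhile
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

# Thread-safe way to schedule plain callbacks from the main thread
new_loop.call_soon_threadsafe(more_work, 4)
new_loop.call_soon_threadsafe(more_work, 2)
# Note: the loop keeps running; new_loop.call_soon_threadsafe(new_loop.stop) would stop it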
TIME: 0.0004038810729980469
More work 4
Finished more work 4    <- printed 4 seconds later
More work 2
Finished more work 2    <- printed 2 seconds after that
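Because time.sleep(x) blocks synchronously, the event loop can run only one callback at a time, so the log messages are printed strictly in sequence.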
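When a blocking call such as time.sleep() cannot be replaced with an awaitable, one common workaround is to hand it to a thread pool with loop.run_in_executor() so that the event loop itself stays free. The sketch below only illustrates that idea and is not part of the original example; blocking_work is a hypothetical stand-in for any blocking function.

```python
import asyncio
import time


def blocking_work(x):
    time.sleep(x)   # blocks the worker thread, not the event loop
    return x


async def main():
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # Passing None selects the loop's default ThreadPoolExecutor.
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_work, 0.5),
        loop.run_in_executor(None, blocking_work, 0.5),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results, 'TIME: {:.2f}'.format(elapsed))
```

Both blocking calls overlap, so the elapsed time is close to one delay instead of the sum of the two.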
# Ex2: submit coroutines to an event loop running in a child thread
import asyncio
import time
from threading import Thread

now = lambda: time.time()

def start_loop(loop):
    # Make `loop` the current event loop of this thread and run it forever
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def more_work(x):
    print('More work {}'.format(x))
    await asyncio.sleep(x)  # non-blocking: the loop can run other coroutines
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

# Thread-safe way to submit coroutine objects from the main thread
asyncio.run_coroutine_threadsafe(more_work(4), new_loop)
asyncio.run_coroutine_threadsafe(more_work(2), new_loop)
TIME: 0.0003960132598876953
More work 4             <- both tasks start at the same time
More work 2             <- (same)
Finished more work 2    <- the 2-second task finishes first
Finished more work 4    <- 2 seconds later, the 4-second task finishes
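Unlike Ex1, which uses new_loop.call_soon_threadsafe(), this example uses run_coroutine_threadsafe() to register coroutine objects. This lets the event loop in the child thread run the coroutines concurrently while the main thread is not blocked. The total run time is about 4 seconds.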
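run_coroutine_threadsafe() returns a concurrent.futures.Future, so the main thread can also collect return values and then shut the loop down. The following sketch extends the idea under a few assumptions of its own: more_work returns a string instead of printing, the delays are shortened, and the thread is stopped explicitly at the end.

```python
import asyncio
import time
from threading import Thread


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def more_work(x):
    await asyncio.sleep(x)
    return 'Finished more work {}'.format(x)


new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,), daemon=True)
t.start()

start = time.perf_counter()
future_a = asyncio.run_coroutine_threadsafe(more_work(0.4), new_loop)
future_b = asyncio.run_coroutine_threadsafe(more_work(0.3), new_loop)

# result() blocks the main thread until the coroutine has finished.
result_a = future_a.result(timeout=5)
result_b = future_b.result(timeout=5)
elapsed = time.perf_counter() - start

# Ask the loop to stop from the main thread, then clean up.
new_loop.call_soon_threadsafe(new_loop.stop)
t.join()
new_loop.close()

print(result_a, result_b, 'TIME: {:.2f}'.format(elapsed))
```

The two coroutines overlap in the child thread, so waiting for both takes about as long as the slower one.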