🚨 CAUTION |
---|
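This article presents the author's view of asynchronous mechanisms and was written before the author had learned JavaScript's Promise. Readers are still advised to learn Promise first; it makes asynchronous programming easier to understand and get started with. |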
🎞️ Lydia Hallie - JavaScript Visualized - Event Loop, Web APIs, (Micro)task Queue |
🎞️ Lydia Hallie - JavaScript Visualized - Promise Execution |
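📘 NOTE : Coroutines |
---|
Implementation: in Python, coroutines are built on generators |
Essence: a function that can be entered (enter), paused (exit), and resumed at will (resume) |
Single process, single thread (unless `to_thread` is used) |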
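
To make the enter / pause / resume idea concrete, here is a minimal sketch that uses a plain generator. The name `running_average` is made up for illustration and is not part of asyncio.

```python
def running_average():
    """Generator-based coroutine: pauses at each `yield`, resumes on `send()`."""
    total, count, average = 0.0, 0, None
    while True:
        value = yield average   # pause here; resume when the caller sends a value
        total += value
        count += 1
        average = total / count


gen = running_average()
next(gen)                # enter: run up to the first `yield`
print(gen.send(10))      # resume with 10, prints 10.0
print(gen.send(20))      # resume with 20, prints 15.0
gen.close()              # exit: raises GeneratorExit inside the generator
```

Native coroutines defined with `async def` rely on the same suspend-and-resume machinery, with the event loop playing the role of the caller.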
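📘 NOTE : Async vs. multithreading |
---|
Q: `asyncio` and `threading` both seem to be tools for I/O-bound tasks, and in CPython both provide concurrency. Since coroutines are lighter than threads, why would I still use threads? |
A: Async code (`asyncio`) is cooperative multitasking: the coroutine itself decides when to hand over control (when it awaits a future or task), so a coroutine that never awaits, for example one that makes a blocking call, stalls every other task on the event loop. Multithreading (`threading`) is preemptive multitasking: the OS (together with the interpreter, in CPython's case) decides when to switch, so a thread is switched out when its time slice ends and one blocked thread does not hold up the others. |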
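
The difference can be observed with a rough timing sketch. `blocking_io` is a hypothetical stand-in for any blocking call, and the timings in the comments are approximate.

```python
import asyncio
import time


def blocking_io(seconds: float) -> None:
    """Hypothetical blocking call; it never yields to the event loop."""
    time.sleep(seconds)


async def bad_worker(seconds: float) -> None:
    blocking_io(seconds)                           # blocks the whole event loop


async def good_worker(seconds: float) -> None:
    await asyncio.to_thread(blocking_io, seconds)  # runs in a worker thread


async def timed(worker, seconds: float = 0.2) -> float:
    """Run two workers concurrently and return the elapsed wall-clock time."""
    start = time.perf_counter()
    await asyncio.gather(worker(seconds), worker(seconds))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"blocking in coroutine: {asyncio.run(timed(bad_worker)):.2f}s")   # roughly 0.4s
    print(f"asyncio.to_thread:     {asyncio.run(timed(good_worker)):.2f}s")  # roughly 0.2s
```

With the blocking call inside the coroutine, the two workers run one after the other; handing the call to a thread lets the waits overlap.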
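- Awaitable: `collections.abc.Awaitable` (declares the `__await__` abstract method)
  - Any class that implements the `__await__` magic method counts as a subclass of `collections.abc.Awaitable`
  - This is implemented through `__subclasshook__`, so no explicit inheritance is needed
- Coroutine function: a function defined with `async def`
- Coroutine: `collections.abc.Coroutine` (inherits from `collections.abc.Awaitable`)
- Future: `asyncio.Future` (a `collections.abc.Awaitable`, since it implements `__await__`)
- Task: `asyncio.Task` (inherits from `asyncio.Future`), created with `asyncio.create_task()`
- Event loop: `asyncio.AbstractEventLoop`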
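
These relationships can be checked with `isinstance`. The sketch below assumes a made-up class `Ticket` that implements `__await__` without inheriting from anything.

```python
import asyncio
from collections.abc import Awaitable, Coroutine


class Ticket:
    """Hypothetical class: implements __await__ but does not inherit Awaitable."""

    def __await__(self):
        # __await__ must return an iterator; here we delegate to a real awaitable.
        yield from asyncio.sleep(0).__await__()
        return "done"


async def noop() -> None:
    return None


async def main():
    coro = noop()
    task = asyncio.create_task(noop())
    future = asyncio.get_running_loop().create_future()
    checks = {
        "Ticket": isinstance(Ticket(), Awaitable),   # via __subclasshook__
        "coroutine": isinstance(coro, Coroutine) and isinstance(coro, Awaitable),
        "task": isinstance(task, asyncio.Future) and isinstance(task, Awaitable),
        "future": isinstance(future, Awaitable),
    }
    await coro
    await task
    result = await Ticket()
    return checks, result


if __name__ == "__main__":
    print(asyncio.run(main()))
```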
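`async def` defines a coroutine function. `await` behaves differently depending on what it is given:
- Awaiting a coroutine object: the coroutine's body runs directly inside the current task (no new task is scheduled) until it returns a value.
- Awaiting a task (or future): the current task registers a callback on the given task (or future), telling it "wake me up when you are done" (that is, have the event loop run the current task again). The current task then steps away from the event loop, and the event loop goes on to run other tasks.
- Awaiting any other object: Python tries to call the object's `__await__` magic method.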
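
A small sketch of how `await` treats a coroutine object, a task, and a custom object with `__await__`. The names `child` and `Wrapper` are made up for illustration; the order of the log entries shows that the task begins running as soon as the inline coroutine yields control.

```python
import asyncio


async def child(name: str, log: list[str]) -> str:
    log.append(f"{name} start")
    await asyncio.sleep(0)          # yield control to the event loop once
    log.append(f"{name} end")
    return name


class Wrapper:
    """Hypothetical awaitable object; `await` calls its __await__ method."""

    def __init__(self, log: list[str]):
        self.log = log

    def __await__(self):
        return child("custom", self.log).__await__()


async def main() -> list[str]:
    log: list[str] = []
    coro = child("coro", log)                       # nothing runs yet
    task = asyncio.create_task(child("task", log))  # scheduled on the event loop
    log.append("main")
    await coro           # runs inline; "task" starts when "coro" yields
    await task           # main is woken up by a callback once the task is done
    await Wrapper(log)   # goes through Wrapper.__await__
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
    # ['main', 'coro start', 'task start', 'coro end', 'task end',
    #  'custom start', 'custom end']
```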
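Tasks created with `create_task()` or `gather()` may start running before the `await` statement that waits for them, because these calls also put the task on the event loop; the task starts as soon as the current task yields control.
asyncio.sleep()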
# Excerpt from CPython's Lib/asyncio/tasks.py
async def sleep(delay, result=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result

    if math.isnan(delay):
        raise ValueError("Invalid delay: NaN (not a number)")

    loop = events.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        h.cancel()
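
To see the mechanism in isolation, here is a simplified re-implementation using only public APIs. `my_sleep` is a hypothetical name, and unlike the real source it calls `future.set_result` directly instead of the private `_set_result_unless_cancelled` helper.

```python
import asyncio


async def my_sleep(delay: float, result=None):
    """Simplified sketch of asyncio.sleep, for illustration only."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # Ask the event loop to resolve the future after `delay` seconds.
    handle = loop.call_later(delay, future.set_result, result)
    try:
        return await future     # the current task is suspended here
    finally:
        handle.cancel()         # no-op if the callback has already run


async def main() -> list[str]:
    order: list[str] = []

    async def worker(name: str, delay: float) -> None:
        order.append(await my_sleep(delay, result=name))

    await asyncio.gather(worker("slow", 0.2), worker("fast", 0.1))
    return order


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['fast', 'slow']
```

While "slow" is suspended on its future, the event loop is free to run "fast", which is why the shorter delay finishes first.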
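- Calling `asyncio.sleep(n)` returns a coroutine object, which internally awaits a future that receives its result after n seconds.
- `await asyncio.sleep(n)` is therefore awaiting a coroutine object; only when execution reaches `await future` does the event loop switch to other tasks.
- After n seconds, `asyncio.sleep(n)` finishes. The result set on the future is `None` by default, so it returns `None`, and `print(await asyncio.sleep(n))` prints `None`.
- The event loop then wakes up the task that was waiting on `await asyncio.sleep(n)` so that it continues running.
function | comments |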
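---|---|
`asyncio.iscoroutinefunction()` | Whether the object is a coroutine function |
`asyncio.iscoroutine()` | Whether the object is a coroutine object |
`asyncio.isfuture()` | Whether the object is a future or task |
`asyncio.get_event_loop()` | Returns an event loop instance (inside a coroutine, prefer `get_running_loop()`) |
`asyncio.get_running_loop()` | Returns the currently running event loop instance |
`asyncio.run()` | Creates an event loop, wraps the given coroutine object in a task, runs it on that loop until it completes, then closes the loop |
`asyncio.create_task(coro)` | Wraps a coroutine object in a task and puts it on the event loop to be run |
`asyncio.sleep(n)` | Simulates a response that arrives after n seconds (non-blocking) |
`asyncio.gather(coro1, coro2, ...)` | Wraps multiple coroutine objects in tasks, puts them on the event loop to be run, and returns their results in argument order |
`asyncio.wait_for()` | Awaits an awaitable with a time limit; if the limit is exceeded, the awaitable is cancelled and `TimeoutError` is raised |
`asyncio.to_thread()` | Hands a long-running (blocking) function to an executor, which runs it in another thread (the event loop stays in the main thread) |
`asyncio.shield(task)` | A divine shield for the task: if the coroutine awaiting the shield is cancelled, the task itself is not |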
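
A short sketch of `gather()` and `wait_for()` from the table above. `fetch` is a hypothetical coroutine that stands in for a real request.

```python
import asyncio


async def fetch(name: str, delay: float) -> str:
    """Hypothetical request that answers after `delay` seconds."""
    await asyncio.sleep(delay)
    return name


async def main():
    # gather() returns results in argument order, not completion order.
    results = await asyncio.gather(fetch("a", 0.2), fetch("b", 0.1))

    # wait_for() cancels the awaitable and raises TimeoutError on timeout.
    try:
        await asyncio.wait_for(fetch("c", 1.0), timeout=0.1)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return results, timed_out


if __name__ == "__main__":
    print(asyncio.run(main()))  # (['a', 'b'], True)
```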
exception | comments |
---|---|
`asyncio.CancelledError` | The task has been cancelled |
`asyncio.TimeoutError` | The task has exceeded its time limit |
Task
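method | comments |
---|---|
`cancel()` | Requests cancellation of the task: `CancelledError` is raised inside the wrapped coroutine the next time the event loop runs it |
`add_done_callback(callback)` | Adds a callback that runs when the task is done |
`remove_done_callback(callback)` | Removes a callback |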
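
The sketch below ties `cancel()`, `add_done_callback()`, and `CancelledError` together. The `worker` coroutine and the event strings are made up for illustration.

```python
import asyncio


async def worker(events: list[str]) -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        events.append("worker: cleaning up")
        raise                      # re-raise so the task ends up cancelled


async def main() -> list[str]:
    events: list[str] = []
    task = asyncio.create_task(worker(events))
    # The finished task is passed to the callback as its only argument.
    task.add_done_callback(
        lambda t: events.append(f"callback: cancelled={t.cancelled()}")
    )

    await asyncio.sleep(0)         # let the worker start
    task.cancel()                  # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        events.append("main: task was cancelled")
    return events


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
    # worker: cleaning up
    # callback: cancelled=True
    # main: task was cancelled
```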
Future
(low-level API; not recommended for direct use)
method | comments |
---|---|
`set_result()` | Marks the future as done and sets its result |
`result()` | Returns the future's result |
`cancelled()` | Whether the future has been cancelled |
AbstractEventLoop
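(low-level API; not recommended for direct use)
method | comments |
---|---|
`stop()` | Stops the event loop |
`is_running()` | Returns `True` if the event loop is currently running |
`is_closed()` | Returns `True` if the event loop has been closed |
`close()` | Closes the event loop |
`run_until_complete(future)` | Runs the event loop until the future completes |
`run_forever()` | Runs the event loop until `stop()` is called |
`time()` | Returns the current time in seconds according to the event loop's internal monotonic clock |
`create_task(coro)` | Wraps a coroutine in a task and puts it on the event loop to be run |
`create_future()` | Creates a future |
`call_soon(callback, *args)` | Schedules the callback to run on the next iteration of the event loop. Similar to JavaScript's `setTimeout(cb, 0)` |
`call_later(delay, callback, *args)` | Schedules the callback on the event loop after the given number of seconds. Similar to JavaScript's `setTimeout(cb, delay)` |
`call_at(when, callback, *args)` | Schedules the callback at an absolute point in time (using the same time reference as `time()`) |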
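
A brief sketch of the three scheduling calls. The delays are arbitrary values chosen for illustration, and a future resolved by `call_later` keeps `main` alive until all callbacks have fired.

```python
import asyncio


async def main() -> list[str]:
    loop = asyncio.get_running_loop()
    order: list[str] = []
    done = loop.create_future()

    loop.call_later(0.2, done.set_result, None)                # finish after 0.2 s
    loop.call_at(loop.time() + 0.1, order.append, "call_at")   # absolute loop time
    loop.call_later(0.05, order.append, "call_later")          # relative delay
    loop.call_soon(order.append, "call_soon")                  # next loop iteration

    order.append("main")   # runs first: callbacks need the loop to regain control
    await done
    return order


if __name__ == "__main__":
    print(asyncio.run(main()))
    # ['main', 'call_soon', 'call_later', 'call_at']
```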
import asyncio
import random
import time


class BaseTask:
    def __init__(self, payload: str, task_id: int):
        self.payload = payload
        self.task_id = task_id

    async def run(self) -> tuple[str, int]:
        """Template method (every task goes through the same three steps)."""
        await self.step_one()
        await self.step_two()
        await self.step_three()
        return (self.payload, self.task_id)

    async def step_one(self):
        """Step 1"""
        response_time = random.randint(2, 4)
        print(f"Task {self.task_id} step 1, takes {response_time}s")
        await asyncio.sleep(response_time)
        print(f"Task {self.task_id} step 1, done")
        self.payload += '1'

    async def step_two(self):
        """Step 2"""
        response_time = random.randint(1, 3)
        print(f"Task {self.task_id} step 2, takes {response_time}s")
        await asyncio.sleep(response_time)
        print(f"Task {self.task_id} step 2, done")
        self.payload += '2'

    async def step_three(self):
        """Step 3"""
        response_time = random.randint(3, 5)
        print(f"Task {self.task_id} step 3, takes {response_time}s")
        await asyncio.sleep(response_time)
        print(f"Task {self.task_id} step 3, done")
        self.payload += '3'


async def main():
    tasks = [BaseTask("payload", n).run() for n in range(3)]
    result = await asyncio.gather(*tasks)
    print(result)
    # [('payload123', 0), ('payload123', 1), ('payload123', 2)]


if __name__ == "__main__":
    start = time.perf_counter()
    asyncio.run(main())
    end = time.perf_counter()
    print(f"TIME: {end - start:.2f}s")

# Task 0 step 1, takes 3s
# Task 1 step 1, takes 4s
# Task 2 step 1, takes 3s
# Task 0 step 1, done
# Task 0 step 2, takes 2s
# Task 2 step 1, done
# Task 2 step 2, takes 2s
# Task 1 step 1, done
# Task 1 step 2, takes 2s
# Task 0 step 2, done
# Task 0 step 3, takes 5s
# Task 2 step 2, done
# Task 2 step 3, takes 3s
# Task 1 step 2, done
# Task 1 step 3, takes 3s
# Task 2 step 3, done
# Task 1 step 3, done
# Task 0 step 3, done
# TIME: 10.04s

# Each of the 3 tasks waits at least 6 seconds, so running them one after
# another with blocking waits would take at least 18 seconds (27 seconds for
# this particular run). With non-blocking waits the total drops to about
# 10 seconds, the duration of the slowest task.
Future
import asyncio


async def set_after(future: asyncio.Future, delay: float, value: str):
    await asyncio.sleep(delay)
    future.set_result(value)


def callback(future: asyncio.Future):
    # The finished future is always passed in as the first argument
    print("I am callback!")


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    future.add_done_callback(callback)
    coro = set_after(future, 3, "... world")  # sets the result after 3 seconds
    task = loop.create_task(coro)  # keep a reference so the task is not garbage-collected
    print("hello ")
    print(future.done())
    print(await future)
    print(future.done())
    print(future.result())


if __name__ == "__main__":
    asyncio.run(main())

# hello
# False
# I am callback!
# ... world
# True
# ... world
shield
import asyncio


async def do_async_job():
    print("do_async_job!")
    await asyncio.sleep(2)
    print("protect me from cancelling!")


async def main():
    task = asyncio.create_task(do_async_job())
    shield = asyncio.shield(task)
    print("shield's type =>", type(shield))
    try:
        await asyncio.wait_for(shield, timeout=1)
    except asyncio.TimeoutError:
        print("timeout!")
        print(f"shield canceled: {shield.cancelled()}")
        print(f"task canceled: {task.cancelled()}")
        await task


if __name__ == "__main__":
    asyncio.run(main())

# shield's type => <class '_asyncio.Future'>
# do_async_job!
# timeout!
# shield canceled: True
# task canceled: False
# protect me from cancelling!
async for item in iterable:
    ...

# What async for boils down to
iterator = iterable.__aiter__()
while True:
    try:
        item = await iterator.__anext__()
    except StopAsyncIteration:
        break
    ...  # loop body
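
The expansion above can be checked against a real `async for` loop. `Countdown` is a hypothetical async iterator used only for this comparison.

```python
import asyncio


class Countdown:
    """Hypothetical async iterator that yields n, n-1, ..., 1."""

    def __init__(self, n: int):
        self.n = n

    def __aiter__(self):
        return self

    async def __anext__(self) -> int:
        if self.n <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0)      # pretend to wait for something
        self.n -= 1
        return self.n + 1


async def with_async_for() -> list[int]:
    items = []
    async for item in Countdown(3):
        items.append(item)
    return items


async def by_hand() -> list[int]:
    items = []
    iterator = Countdown(3).__aiter__()
    while True:
        try:
            item = await iterator.__anext__()
        except StopAsyncIteration:
            break
        items.append(item)
    return items


if __name__ == "__main__":
    print(asyncio.run(with_async_for()))  # [3, 2, 1]
    print(asyncio.run(by_hand()))         # [3, 2, 1]
```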
import asyncio
import random


class WebSocketClient:
    def __init__(self, name, total_messages=5):
        self.name = name
        self.total_messages = total_messages
        self.received = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.received >= self.total_messages:
            raise StopAsyncIteration
        await asyncio.sleep(random.uniform(0.5, 2))  # simulate request latency
        self.received += 1
        return f"Message for {self.name}: {self.received}"

    async def listen(self):
        async for msg in self:
            print("📩", msg)


async def main():
    client1 = WebSocketClient("ClientA")
    client2 = WebSocketClient("ClientB")
    await asyncio.gather(
        client1.listen(),
        client2.listen(),
    )


if __name__ == "__main__":
    asyncio.run(main())

# Sample output (the order varies between runs):
# 📩 Message for ClientB: 1
# 📩 Message for ClientA: 2
# 📩 Message for ClientA: 3
# 📩 Message for ClientA: 4
# 📩 Message for ClientB: 2
# 📩 Message for ClientA: 5
# 📩 Message for ClientB: 3
# 📩 Message for ClientB: 4
# 📩 Message for ClientB: 5
async with resource as r:
    ...

# What async with boils down to
r = await resource.__aenter__()
try:
    ...
finally:
    await resource.__aexit__(...)
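
A minimal sketch of this protocol in action, showing that `__aexit__` runs both on normal exit and when the body raises. `Resource` and the log strings are made up for illustration.

```python
import asyncio


class Resource:
    """Hypothetical resource that records its lifecycle in a shared log."""

    def __init__(self, log: list[str]):
        self.log = log

    async def __aenter__(self):
        await asyncio.sleep(0)      # pretend to open a connection
        self.log.append("enter")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await asyncio.sleep(0)      # pretend to close the connection
        reason = exc_type.__name__ if exc_type else "no error"
        self.log.append(f"exit ({reason})")
        return False                # do not suppress exceptions


async def main() -> list[str]:
    log: list[str] = []
    async with Resource(log):
        log.append("body")
    try:
        async with Resource(log):
            raise ValueError("boom")
    except ValueError:
        log.append("error propagated")
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
    # ['enter', 'body', 'exit (no error)', 'enter', 'exit (ValueError)',
    #  'error propagated']
```

Returning a true value from `__aexit__` would suppress the exception instead of letting it propagate.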
import asyncio
import random


class WebSocketClient:
    def __init__(self, name, total_messages=5):
        self.name = name
        self.total_messages = total_messages
        self.received = 0
        self.connected = False

    async def __aenter__(self):
        print(f"🔌 {self.name} connecting...")
        await asyncio.sleep(0.5)
        self.connected = True
        print(f"✅ {self.name} connected")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print(f"❌ {self.name} disconnecting...")
        await asyncio.sleep(0.5)
        self.connected = False
        print(f"✅ {self.name} disconnected")

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.received >= self.total_messages:
            raise StopAsyncIteration
        await asyncio.sleep(random.uniform(0.5, 1.5))  # simulate request latency
        self.received += 1
        return self.received

    async def listen(self):
        async for msg in self:
            print(f"📩 Message for {self.name}: {msg}")


async def main():
    async with WebSocketClient("ClientA") as c1, WebSocketClient("ClientB") as c2:
        await asyncio.gather(
            c1.listen(),
            c2.listen(),
        )


if __name__ == "__main__":
    asyncio.run(main())

# Sample output (message order varies between runs):
# 🔌 ClientA connecting...
# ✅ ClientA connected
# 🔌 ClientB connecting...
# ✅ ClientB connected
# 📩 Message for ClientB: 1
# 📩 Message for ClientA: 1
# 📩 Message for ClientB: 2
# 📩 Message for ClientB: 3
# 📩 Message for ClientA: 2
# 📩 Message for ClientB: 4
# 📩 Message for ClientA: 3
# 📩 Message for ClientB: 5
# 📩 Message for ClientA: 4
# 📩 Message for ClientA: 5
# ❌ ClientB disconnecting...
# ✅ ClientB disconnected
# ❌ ClientA disconnecting...
# ✅ ClientA disconnected