# asyncio
[](https://hackmd.io/@RogelioKG/asyncio)
## References
+ 🔗 [**MyApollo - asyncio**](https://myapollo.com.tw/blog/begin-to-asyncio/)
+ 🔗 [**MyApollo - asyncio shield**](https://myapollo.com.tw/blog/asyncio-shield/)
+ 🔗 [**Python Document : asyncio-queue**](https://docs.python.org/zh-tw/3/library/asyncio-queue.html)
+ 🔗 [**Python Asyncio Part 1 – Basic Concepts and Patterns**](https://bbc.github.io/cloudfit-public-docs/asyncio/asyncio-part-1.html)
+ 🔗 [**Python Asyncio Part 2 – Awaitables, Tasks, and Futures**](https://bbc.github.io/cloudfit-public-docs/asyncio/asyncio-part-2.html)
+ 🔗 [**Python Asyncio Part 3 – Asynchronous Context Managers and Asynchronous Iterators**](https://bbc.github.io/cloudfit-public-docs/asyncio/asyncio-part-3.html)
+ 🔗 [**Python Asyncio Part 4 – Library Support**](https://bbc.github.io/cloudfit-public-docs/asyncio/asyncio-part-4.html)
+ 🔗 [**Python Asyncio Part 5 – Mixing Synchronous and Asynchronous Code**](https://bbc.github.io/cloudfit-public-docs/asyncio/asyncio-part-5.html)
+ 🔗 [**Maxlist - Async IO Design Patterns**](https://www.maxlist.xyz/2020/04/03/async-io-design-patterns-python/)
+ 🔗 [**Aureliano's Macondo**](https://aureliano90.github.io/blog/2022/04/28/A_Brief_Introduction_of_Python_Coroutines_and__await__Attribute.html)
+ 🎞️ [**ArjanCodes**](https://youtu.be/GpqAQxH1Afc?si=iIvKy9yEoIQ_shjt)
+ 🎞️ [**GaoGaoTianTian - asyncio的理解与入门,搞不明白协程?看这个视频就够了**](https://youtu.be/brYsDi-JajI)
+ 🎞️ [**GaoGaoTianTian - await机制详解。再来个硬核内容,把并行和依赖背后的原理全给你讲明白**](https://youtu.be/K0BjgYZbgfE)
## Note
| 🚨 <span class="caution">CAUTION</span> |
| :----------------------------------------------------------------------------------------------------------------------------------------------------- |
| 此篇文章嘗試描述異步機制的觀點,是<mark>筆者在尚未學習 JavaScript 的 Promise 前所著</mark>。在此還是建議讀者先去學 Promise,會更好理解與入門異步機制。 |
| 🎞️ [**Lydia Hallie - JavaScript Visualized - Event Loop, Web APIs, (Micro)task Queue**](https://youtu.be/eiC58R16hb8) |
| 🎞️ [**Lydia Hallie - JavaScript Visualized - Promise Execution**](https://youtu.be/Xs1EMmBLpn4) |
| 📘 <span class="note">NOTE</span> : 協程 |
| :---------------------------------------------------------------------- |
| 實作:在 Python 中以 generator 實作 |
| 本質:一個可以開始 (enter) / 暫停 (exit) / 任意恢復執行 (resume) 的函式 |
| 單進程單線程 (若不使用 `to_thread`) |
| 📘 <span class="note">NOTE</span> : 異步 v.s. 多線程 |
| :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| Q : `asyncio` 和 `threading` 似乎都是處理 I/O bound task 的工具,<br />且在 CPython 中,它們都是並發 (concurrency)。<br />既然協程比線程輕量,為何我還要使用線程?<br /> |
| A :<br /><mark>異步</mark> (`asyncio`) 屬於<mark>協作式多工</mark> (cooperative multitasking),<br />意即控制權的轉讓決定在 coroutine 手上<br />(await 一個 future 或 task 的時候),因而有可能被 blocking。<br /><br /><mark>多線程</mark> (`threading`) 屬於<mark>搶占式多工</mark> (preemptive multitasking),<br />意即控制權的轉讓決定在 OS 手上,<br />時間到了就會切換,不會發生 blocking。 |
## Nouns
### awaitable / awaitable object
+ 屬於類別 `collection.abc.Awaitable` (實作 `__await__` 抽象方法)
+ 有實作 `__await__` 魔術方法的類別,即為 `collection.abc.Awaitable` 的子類別,\
其 instance 即為 awaitable object (例如:coroutine, future, task),這是透過 `__subclasshook__` 實作的,無須顯式繼承
### coroutine function
+ 以 `async def` 定義的函式,稱為 coroutine function
### coroutine / coroutine object
+ 屬於類別 `collection.abc.Coroutine` (繼承 `collection.abc.Awaitable`)
+ 調用 coroutine function 會產生一個 coroutine object
### future
+ 屬於類別 `asyncio.Future` (繼承 `collection.abc.Awaitable`)
+ 非常類似 JavaScript 的 Promise
+ 低階 API,<mark>不建議直接使用</mark>
### task
+ 屬於類別 `asyncio.Task` (繼承 `asyncio.Future`)
+ task 其實是一種特化的 future,其[專門管理 coroutine object 的執行與回傳結果](https://stackoverflow.com/a/64858226)
+ 必須將 coroutine object 包裝為 task
+ 使用 `asyncio.create_task()`
+ 此動作會順便把創建出來的 task 丟進 event loop 等待執行
### event loop
+ 屬於類別 `asyncio.AbstractEventLoop`
+ 背景運行,會不斷地排程、執行 task 和 callback (同個 thread 下)
+ 一次僅會執行 1 個 task
### executor
+ 負責在非 main thread 執行會造成阻塞的 task
## Keywords
### `async def`
+ 以此定義的函式,稱為 coroutine function
### `await`
+ 只能出現在 coroutine function 中
+ 需給定 awaitable object
+ 若給定 **coroutine**
> coroutine function 將直接被執行,直到它 return 一個值。
+ 若給定 **task** (或 **future**)
> 當前 task 會在給定 **task** (或 **future**) 掛上一個 callback,\
> 告訴它等它完成的時候,把我 (當前 task) 叫醒 (event loop 執行它),\
> 接著當前 task 暫離 event loop,event loop 將轉而執行其他 task。
+ 否則
> 嘗試調用該物件的 `__await__` 魔術方法
+ await 述句
+ 等待的 awaitable object 須執行完畢,當前 task 才會繼續往下執行
+ 這並不意味 await 述句的 task 必將在 await 述句處執行
+ 只要有 `create_task()` 或者 `gather()` (因為這些動作同時會將 task 同時放入 event loop),在 await 述句之前都有可能執行
+ evaluation 值為 task 執行完後的回傳值
## Behind the Scences
### `asyncio.sleep()`
+ 實作
```py
async def sleep(delay, result=None):
"""Coroutine that completes after a given time (in seconds)."""
if delay <= 0:
await __sleep0()
return result
if math.isnan(delay):
raise ValueError("Invalid delay: NaN (not a number)")
loop = events.get_running_loop()
future = loop.create_future()
h = loop.call_later(delay,
futures._set_result_unless_cancelled,
future, result)
try:
return await future
finally:
h.cancel()
```
+ 說明
1. `asyncio.sleep()` 即為一個 **coroutine object**,其內部等待一個 n 秒後會收到結果的 **future**
+ 所以當外層在 `await asyncio.sleep()` 時,實際上是在 await 一個 **coroutine object**
+ 根據 [await](#await) 的結論,這個 coroutine function 會即刻執行,直到它 `await future`,event loop 才會轉而執行其他 task
2. 收到結果後,`asyncio.sleep()` 執行完畢,因為 `future` 設定的結果是 `None`,所以 `return None`
+ 所以說外層若 `print(await asyncio.sleep())` 就是 `None`
3. 叫醒最外層那個 `await asyncio.sleep()` 的 **task**,讓它繼續執行。
## Usage
### Function
| function | comments |
| ----------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------- |
| `asyncio.iscoroutinefunction()` | 是否為 coroutine function |
| `asyncio.iscoroutine()` | 是否為 coroutine object |
| `asyncio.isfuture()` | 是否為 future 或 task |
| `asyncio.get_event_loop()` | 回傳一個 event loop 實例 |
| `asyncio.get_running_loop()` | 回傳當前運行中的 event loop 實例 |
| `asyncio.run()` | 建立 event loop,將傳入的 coroutine object 包裝成 task,放入此 event loop 等待執行 |
| `asyncio.create_task(coro)` | 將 coroutine object 包裝成 task,放入 event loop 等待執行 |
| `asyncio.sleep(n)` | 模擬一個 n 秒後會收到的 response (非阻塞) |
| `asyncio.gather(coro1, coro2, ...)` | 將多個 coroutine objects 包裝成 tasks,放入 event loop 等待執行 |
| `asyncio.wait_for()` | 同 `create_task()`,但為 task 設置時限,超出時限則 raise `TimeoutError` |
| `asyncio.to_thread()` | 把耗時較久 (會導致阻塞 blocked) 的 task 交給 executor,並丟到其他 thread,由 executor 負責執行這個 task (event loop 留在 main thread) |
| `asyncio.shield(task)` | task 的聖盾術,使得 task 免於一次 cancel |
### Error
| exception | comments |
| ------------------------ | --------------------------- |
| `asyncio.CancelledError` | 此 task 已被移出 event loop |
| `asyncio.TimeoutError` | 此 task 已超出時限 |
+ `Task`
| method | comments |
| -------------------------------- | ----------------------- |
| `cancel()` | 將 task 移出 event loop |
| `add_done_callback(callback)` | 新增 callback |
| `remove_done_callback(callback)` | 移除 callback |
+ `Future` (低階 API,<mark>不建議使用</mark>)
| method | comments |
| -------------- | ------------------------------------------------------------- |
| `set_result()` | 將 future 的 state 屬性標記為結束狀態,並設定 result 屬性的值 |
| `result()` | 回傳 future 的 result 屬性的值 |
| `cancelled()` | future 是否被取消 |
+ `AbstractEventLoop` (低階 API,<mark>不建議使用</mark>)
| method | comments |
| ------------------------------------ | -------------------------------------------------------------------------------------------- |
| `stop()` | 停止 event loop |
| `is_running()` | 如果 event loop 當前正在運行,則回傳 True |
| `is_closed()` | 如果 event loop 已關閉,則回傳 True |
| `close()` | 關閉 event loop |
| `run_until_complete(future)` | 運行 event loop 直到 future 完成 |
| `run_forever()` | 運行 event loop 直到 `stop()` 被呼叫 |
| `time()` | 根據 event loop 的內部單調時鐘,回傳當前時間 (單位:秒) |
| `create_task(coro)` | 將 coroutine 包裝成 task,放入 event loop 等待執行 |
| `create_future()` | 創建一個 future |
| `call_soon(callback, *args)` | 把 callback 安排在下一次 event loop 的開頭執行。類似 JavaScript 的 `setTimeout(cb, 0)` |
| `call_later(delay, callback, *args)` | 在指定的秒數後,把 callback 丟進 event loop 排程。類似 JavaScript 的 `setTimeout(cb, delay)` |
| `call_at(when, callback, *args)` | 指定一個絕對時間點 (時間參照同 `time()`) 來安排 callback 的執行 |
## Example
### 範例一:非阻塞任務
```py
import asyncio
import time
import random
class BaseTask:
def __init__(self, payload: str, task_id: int):
self.payload = payload
self.task_id = task_id
async def run(self) -> tuple[str, int]:
"""樣板方法 (一個任務固定經過三個步驟)"""
await self.step_one()
await self.step_two()
await self.step_three()
return (self.payload, self.task_id)
async def step_one(self):
"""任務 1"""
response_time = random.randint(2, 4)
print(f"Task {self.task_id} step 1, takes {response_time}s")
await asyncio.sleep(response_time)
print(f"Task {self.task_id} step 1, done")
self.payload += '1'
async def step_two(self):
"""任務 2"""
response_time = random.randint(1, 3)
print(f"Task {self.task_id} step 2, take {response_time}s")
await asyncio.sleep(response_time)
print(f"Task {self.task_id} step 2, done")
self.payload += '2'
async def step_three(self):
"""任務 3"""
response_time = random.randint(3, 5)
print(f"Task {self.task_id} step 3, take {response_time}s")
await asyncio.sleep(response_time)
print(f"Task {self.task_id} step 3, done")
self.payload += '3'
async def main():
tasks = [BaseTask("payload", n).run() for n in range(3)]
result = await asyncio.gather(*tasks)
print(result)
# [('payload123', 0), ('payload123', 1), ('payload123', 2)]
if __name__ == "__main__":
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print(f"TIME: {end - start:.2f}s")
# Task 0 step 1, takes 3s
# Task 1 step 1, takes 4s
# Task 2 step 1, takes 3s
# Task 0 step 1, done
# Task 0 step 2, take 2s
# Task 2 step 1, done
# Task 2 step 2, take 2s
# Task 1 step 1, done
# Task 1 step 2, take 2s
# Task 0 step 2, done
# Task 0 step 3, take 5s
# Task 2 step 2, done
# Task 2 step 3, take 3s
# Task 1 step 2, done
# Task 1 step 3, take 3s
# Task 2 step 3, done
# Task 1 step 3, done
# Task 0 step 3, done
# TIME: 10.04s
# 3 個任務,阻塞式等待 6 秒,原需耗費至少 18 秒,
# 在這裡因非阻塞等待,而降低到 10 秒。
```
### 範例二:如同 Promise 的 `Future`
```py
import asyncio
async def set_after(future: asyncio.Future, delay: float, value: str):
await asyncio.sleep(delay)
future.set_result(value)
def callback(future: asyncio.Future):
# 第一個參數是固定會傳進來的
print("I am callback!")
async def main():
loop = asyncio.get_running_loop()
future = loop.create_future()
future.add_done_callback(callback)
coro = set_after(future, 3, "... world") # 3 秒後返回結果
loop.create_task(coro)
print("hello ")
print(future.done())
print(await future)
print(future.done())
print(future.result())
if __name__ == "__main__":
asyncio.run(main())
# hello
# False
# I am callback!
# ... world
# True
# ... world
```
### 範例三:套套聖盾術 `shield`
```py
import asyncio
async def do_async_job():
print("do_async_job!")
await asyncio.sleep(2)
print("protect me from cancelling!")
async def main():
task = asyncio.create_task(do_async_job())
shield = asyncio.shield(task)
print("shield's type =>", type(shield))
try:
await asyncio.wait_for(shield, timeout=1)
except asyncio.TimeoutError:
print("timeout!")
print(f"shield canceled: {shield.cancelled()}")
print(f"task canceled: {task.cancelled()}")
await task
if __name__ == "__main__":
asyncio.run(main())
# shield's type => <class '_asyncio.Future'>
# do_async_job!
# timeout!
# shield canceled: True
# task canceled: False
# protect me from cancelling!
```
### 範例四:異步迭代器 AsyncIterator
```py
async for item in iterable:
...
# async for 的本質
iterator = iterable.__aiter__()
while True:
try:
item = await iterator.__anext__()
...
except StopAsyncIteration:
break
```
```py
import asyncio
import random
class WebSocketClient:
def __init__(self, name, total_messages=5):
self.name = name
self.total_messages = total_messages
self.received = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.received >= self.total_messages:
raise StopAsyncIteration
await asyncio.sleep(random.uniform(0.5, 2)) # 模擬 request 延遲
self.received += 1
return f"Message for {self.name}: {self.received}"
async def listen(self):
async for msg in self:
print("📩", msg)
async def main():
client1 = WebSocketClient("ClientA")
client2 = WebSocketClient("ClientB")
await asyncio.gather(
client1.listen(),
client2.listen(),
)
if __name__ == "__main__":
asyncio.run(main())
# 📩 Message for ClientB: 1
# 📩 Message for ClientA: 2
# 📩 Message for ClientA: 3
# 📩 Message for ClientA: 4
# 📩 Message for ClientB: 2
# 📩 Message for ClientA: 5
# 📩 Message for ClientB: 3
# 📩 Message for ClientB: 4
# 📩 Message for ClientB: 5
```
### 範例五:異步上下文管理器 AsyncContextManager
```py
async with resource as r:
...
# async with 的本質
r = await resource.__aenter__()
try:
...
finally:
await resource.__aexit__(...)
```
```py
import asyncio
import random
class WebSocketClient:
def __init__(self, name, total_messages=5):
self.name = name
self.total_messages = total_messages
self.received = 0
self.connected = False
async def __aenter__(self):
print(f"🔌 {self.name} connecting...")
await asyncio.sleep(0.5)
self.connected = True
print(f"✅ {self.name} connected")
return self
async def __aexit__(self, exc_type, exc, tb):
print(f"❌ {self.name} disconnecting...")
await asyncio.sleep(0.5)
self.connected = False
print(f"✅ {self.name} disconnected")
def __aiter__(self):
return self
async def __anext__(self):
if self.received >= self.total_messages:
raise StopAsyncIteration
await asyncio.sleep(random.uniform(0.5, 1.5)) # 模擬 request 延遲
self.received += 1
return self.received
async def listen(self):
async for msg in self:
print(f"📩 Message for {self.name}: {msg}")
async def main():
async with WebSocketClient("ClientA") as c1, WebSocketClient("ClientB") as c2:
await asyncio.gather(
c1.listen(),
c2.listen(),
)
if __name__ == "__main__":
asyncio.run(main())
# 🔌 ClientA connecting...
# ✅ ClientA connected
# 🔌 ClientB connecting...
# ✅ ClientB connected
# 📩 Message for ClientB: 1
# 📩 Message for ClientA: 1
# 📩 Message for ClientB: 2
# 📩 Message for ClientB: 3
# 📩 Message for ClientA: 2
# 📩 Message for ClientB: 4
# 📩 Message for ClientA: 3
# 📩 Message for ClientB: 5
# 📩 Message for ClientA: 4
# 📩 Message for ClientA: 5
# ❌ ClientB disconnecting...
# ✅ ClientB disconnected
# ❌ ClientA disconnecting...
# ✅ ClientA disconnected
```