Try   HackMD

asyncio

RogelioKG/asyncio

References

Note

🚨 CAUTION
此篇文章嘗試描述異步機制的觀點,是筆者在尚未學習 JavaScript 的 Promise 前所著。在此還是建議讀者先去學 Promise,會更好理解與入門異步機制。
🎞️ Lydia Hallie - JavaScript Visualized - Event Loop, Web APIs, (Micro)task Queue
🎞️ Lydia Hallie - JavaScript Visualized - Promise Execution
📘 NOTE : 協程
實作:在 Python 中以 generator 實作
本質:一個可以開始 (enter) / 暫停 (exit) / 任意恢復執行 (resume) 的函式
單進程單線程 (若不使用 to_thread)
📘 NOTE : 異步 v.s. 多線程
Q : asynciothreading 似乎都是處理 I/O bound task 的工具,
且在 CPython 中,它們都是並發 (concurrency)。
既然協程比線程輕量,為何我還要使用線程?
A :
異步 (asyncio) 屬於協作式多工 (cooperative multitasking),
意即控制權的轉讓決定在 coroutine 手上
(await 一個 future 或 task 的時候),因而有可能被 blocking。

多線程 (threading) 屬於搶占式多工 (preemptive multitasking),
意即控制權的轉讓決定在 OS 手上,
時間到了就會切換,不會發生 blocking。

Nouns

awaitable / awaitable object

  • 屬於類別 collection.abc.Awaitable (實作 __await__ 抽象方法)
  • 有實作 __await__ 魔術方法的類別,即為 collection.abc.Awaitable 的子類別,
    其 instance 即為 awaitable object (例如:coroutine, future, task),這是透過 __subclasshook__ 實作的,無須顯式繼承

coroutine function

  • async def 定義的函式,稱為 coroutine function

coroutine / coroutine object

  • 屬於類別 collection.abc.Coroutine (繼承 collection.abc.Awaitable)
  • 調用 coroutine function 會產生一個 coroutine object

future

  • 屬於類別 asyncio.Future (繼承 collection.abc.Awaitable)
  • 非常類似 JavaScript 的 Promise
  • 低階 API,不建議直接使用

task

  • 屬於類別 asyncio.Task (繼承 asyncio.Future)
  • task 其實是一種特化的 future,其專門管理 coroutine object 的執行與回傳結果
  • 必須將 coroutine object 包裝為 task
    • 使用 asyncio.create_task()
    • 此動作會順便把創建出來的 task 丟進 event loop 等待執行

event loop

  • 屬於類別 asyncio.AbstractEventLoop
  • 背景運行,會不斷地排程、執行 task 和 callback (同個 thread 下)
  • 一次僅會執行 1 個 task

executor

  • 負責在非 main thread 執行會造成阻塞的 task

Keywords

async def

  • 以此定義的函式,稱為 coroutine function

await

  • 只能出現在 coroutine function 中
  • 需給定 awaitable object
    • 若給定 coroutine

      coroutine function 將直接被執行,直到它 return 一個值。

    • 若給定 task (或 future)

      當前 task 會在給定 task (或 future) 掛上一個 callback,
      告訴它等它完成的時候,把我 (當前 task) 叫醒 (event loop 執行它),
      接著當前 task 暫離 event loop,event loop 將轉而執行其他 task。

    • 否則

      嘗試調用該物件的 __await__ 魔術方法

  • await 述句
    • 等待的 awaitable object 須執行完畢,當前 task 才會繼續往下執行
      • 這並不意味 await 述句的 task 必將在 await 述句處執行
      • 只要有 create_task() 或者 gather() (因為這些動作同時會將 task 同時放入 event loop),在 await 述句之前都有可能執行
    • evaluation 值為 task 執行完後的回傳值

Behind the Scences

asyncio.sleep()

  • 實作
    ​​async def sleep(delay, result=None):
    ​​    """Coroutine that completes after a given time (in seconds)."""
    ​​    if delay <= 0:
    ​​        await __sleep0()
    ​​        return result
    
    ​​    if math.isnan(delay):
    ​​        raise ValueError("Invalid delay: NaN (not a number)")
    
    ​​    loop = events.get_running_loop()
    ​​    future = loop.create_future()
    ​​    h = loop.call_later(delay,
    ​​                        futures._set_result_unless_cancelled,
    ​​                        future, result)
    ​​    try:
    ​​        return await future
    ​​    finally:
    ​​        h.cancel()
    
  • 說明
    1. asyncio.sleep() 即為一個 coroutine object,其內部等待一個 n 秒後會收到結果的 future
      • 所以當外層在 await asyncio.sleep() 時,實際上是在 await 一個 coroutine object
      • 根據 await 的結論,這個 coroutine function 會即刻執行,直到它 await future,event loop 才會轉而執行其他 task
    2. 收到結果後,asyncio.sleep() 執行完畢,因為 future 設定的結果是 None,所以 return None
      • 所以說外層若 print(await asyncio.sleep()) 就是 None
    3. 叫醒最外層那個 await asyncio.sleep()task,讓它繼續執行。

Usage

Function

function comments
asyncio.iscoroutinefunction() 是否為 coroutine function
asyncio.iscoroutine() 是否為 coroutine object
asyncio.isfuture() 是否為 future 或 task
asyncio.get_event_loop() 回傳一個 event loop 實例
asyncio.get_running_loop() 回傳當前運行中的 event loop 實例
asyncio.run() 建立 event loop,將傳入的 coroutine object 包裝成 task,放入此 event loop 等待執行
asyncio.create_task(coro) 將 coroutine object 包裝成 task,放入 event loop 等待執行
asyncio.sleep(n) 模擬一個 n 秒後會收到的 response (非阻塞)
asyncio.gather(coro1, coro2, ...) 將多個 coroutine objects 包裝成 tasks,放入 event loop 等待執行
asyncio.wait_for() create_task(),但為 task 設置時限,超出時限則 raise TimeoutError
asyncio.to_thread() 把耗時較久 (會導致阻塞 blocked) 的 task 交給 executor,並丟到其他 thread,由 executor 負責執行這個 task (event loop 留在 main thread)
asyncio.shield(task) task 的聖盾術,使得 task 免於一次 cancel

Error

exception comments
asyncio.CancelledError 此 task 已被移出 event loop
asyncio.TimeoutError 此 task 已超出時限
  • Task

    method comments
    cancel() 將 task 移出 event loop
    add_done_callback(callback) 新增 callback
    remove_done_callback(callback) 移除 callback
  • Future (低階 API,不建議使用)

    method comments
    set_result() 將 future 的 state 屬性標記為結束狀態,並設定 result 屬性的值
    result() 回傳 future 的 result 屬性的值
    cancelled() future 是否被取消
  • AbstractEventLoop (低階 API,不建議使用)

    method comments
    stop() 停止 event loop
    is_running() 如果 event loop 當前正在運行,則回傳 True
    is_closed() 如果 event loop 已關閉,則回傳 True
    close() 關閉 event loop
    run_until_complete(future) 運行 event loop 直到 future 完成
    run_forever() 運行 event loop 直到 stop() 被呼叫
    time() 根據 event loop 的內部單調時鐘,回傳當前時間 (單位:秒)
    create_task(coro) 將 coroutine 包裝成 task,放入 event loop 等待執行
    create_future() 創建一個 future
    call_soon(callback, *args) 把 callback 安排在下一次 event loop 的開頭執行。類似 JavaScript 的 setTimeout(cb, 0)
    call_later(delay, callback, *args) 在指定的秒數後,把 callback 丟進 event loop 排程。類似 JavaScript 的 setTimeout(cb, delay)
    call_at(when, callback, *args) 指定一個絕對時間點 (時間參照同 time()) 來安排 callback 的執行

Example

範例一:非阻塞任務

import asyncio
import time
import random


class BaseTask:
    def __init__(self, payload: str, task_id: int):
        self.payload = payload
        self.task_id = task_id

    async def run(self) -> tuple[str, int]:
        """樣板方法 (一個任務固定經過三個步驟)"""
        await self.step_one()
        await self.step_two()
        await self.step_three()
        return (self.payload, self.task_id)

    async def step_one(self):
        """任務 1"""
        response_time = random.randint(2, 4)
        print(f"Task {self.task_id} step 1, takes {response_time}s")
        await asyncio.sleep(response_time)
        print(f"Task {self.task_id} step 1, done")
        self.payload += '1'

    async def step_two(self):
        """任務 2"""
        response_time = random.randint(1, 3)
        print(f"Task {self.task_id} step 2, take {response_time}s")
        await asyncio.sleep(response_time)
        print(f"Task {self.task_id} step 2, done")
        self.payload += '2'

    async def step_three(self):
        """任務 3"""
        response_time = random.randint(3, 5)
        print(f"Task {self.task_id} step 3, take {response_time}s")
        await asyncio.sleep(response_time)
        print(f"Task {self.task_id} step 3, done")
        self.payload += '3'


async def main():
    tasks = [BaseTask("payload", n).run() for n in range(3)]
    result = await asyncio.gather(*tasks)
    print(result)
    # [('payload123', 0), ('payload123', 1), ('payload123', 2)]


if __name__ == "__main__":
    start = time.perf_counter()
    asyncio.run(main())
    end = time.perf_counter()
    print(f"TIME: {end - start:.2f}s")

    # Task 0 step 1, takes 3s                                                                                                                                                               
    # Task 1 step 1, takes 4s
    # Task 2 step 1, takes 3s
    # Task 0 step 1, done
    # Task 0 step 2, take 2s
    # Task 2 step 1, done
    # Task 2 step 2, take 2s
    # Task 1 step 1, done
    # Task 1 step 2, take 2s
    # Task 0 step 2, done
    # Task 0 step 3, take 5s
    # Task 2 step 2, done
    # Task 2 step 3, take 3s
    # Task 1 step 2, done
    # Task 1 step 3, take 3s
    # Task 2 step 3, done
    # Task 1 step 3, done
    # Task 0 step 3, done
    # TIME: 10.04s
    
    # 3 個任務,阻塞式等待 6 秒,原需耗費至少 18 秒,
    # 在這裡因非阻塞等待,而降低到 10 秒。

範例二:如同 Promise 的 Future

import asyncio


async def set_after(future: asyncio.Future, delay: float, value: str):
    await asyncio.sleep(delay)
    future.set_result(value)


def callback(future: asyncio.Future):
    # 第一個參數是固定會傳進來的
    print("I am callback!")


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    future.add_done_callback(callback)
    coro = set_after(future, 3, "... world")  # 3 秒後返回結果
    loop.create_task(coro)
    print("hello ")
    print(future.done())
    print(await future)
    print(future.done())
    print(future.result())


if __name__ == "__main__":
    asyncio.run(main())
    # hello
    # False
    # I am callback!
    # ... world
    # True
    # ... world

範例三:套套聖盾術 shield

import asyncio


async def do_async_job():
    print("do_async_job!")
    await asyncio.sleep(2)
    print("protect me from cancelling!")


async def main():
    task = asyncio.create_task(do_async_job())
    shield = asyncio.shield(task)
    print("shield's type =>", type(shield))
    try:
        await asyncio.wait_for(shield, timeout=1)
    except asyncio.TimeoutError:
        print("timeout!")
        print(f"shield canceled: {shield.cancelled()}")
        print(f"task canceled: {task.cancelled()}")
        await task


if __name__ == "__main__":
    asyncio.run(main())
    # shield's type => <class '_asyncio.Future'>
    # do_async_job!
    # timeout!
    # shield canceled: True
    # task canceled: False
    # protect me from cancelling!

範例四:異步迭代器 AsyncIterator

async for item in iterable:
    ...

# async for 的本質
iterator = iterable.__aiter__()
while True:
    try:
        item = await iterator.__anext__()
        ...
    except StopAsyncIteration:
        break
import asyncio
import random


class WebSocketClient:
    def __init__(self, name, total_messages=5):
        self.name = name
        self.total_messages = total_messages
        self.received = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.received >= self.total_messages:
            raise StopAsyncIteration
        await asyncio.sleep(random.uniform(0.5, 2))  # 模擬 request 延遲
        self.received += 1
        return f"Message for {self.name}: {self.received}"

    async def listen(self):
        async for msg in self:
            print("📩", msg)


async def main():
    client1 = WebSocketClient("ClientA")
    client2 = WebSocketClient("ClientB")

    await asyncio.gather(
        client1.listen(),
        client2.listen(),
    )


if __name__ == "__main__":
    asyncio.run(main())
    # 📩 Message for ClientB: 1
    # 📩 Message for ClientA: 2
    # 📩 Message for ClientA: 3
    # 📩 Message for ClientA: 4
    # 📩 Message for ClientB: 2
    # 📩 Message for ClientA: 5
    # 📩 Message for ClientB: 3
    # 📩 Message for ClientB: 4
    # 📩 Message for ClientB: 5

範例五:異步上下文管理器 AsyncContextManager

async with resource as r:
    ...

# async with 的本質
r = await resource.__aenter__()
try:
    ...
finally:
    await resource.__aexit__(...)
import asyncio
import random


class WebSocketClient:
    def __init__(self, name, total_messages=5):
        self.name = name
        self.total_messages = total_messages
        self.received = 0
        self.connected = False

    async def __aenter__(self):
        print(f"🔌 {self.name} connecting...")
        await asyncio.sleep(0.5)
        self.connected = True
        print(f"✅ {self.name} connected")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print(f"❌ {self.name} disconnecting...")
        await asyncio.sleep(0.5)
        self.connected = False
        print(f"✅ {self.name} disconnected")

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.received >= self.total_messages:
            raise StopAsyncIteration
        await asyncio.sleep(random.uniform(0.5, 1.5))  # 模擬 request 延遲
        self.received += 1
        return self.received

    async def listen(self):
        async for msg in self:
            print(f"📩 Message for {self.name}: {msg}")


async def main():
    async with WebSocketClient("ClientA") as c1, WebSocketClient("ClientB") as c2:
        await asyncio.gather(
            c1.listen(),
            c2.listen(),
        )


if __name__ == "__main__":
    asyncio.run(main())
    # 🔌 ClientA connecting...
    # ✅ ClientA connected
    # 🔌 ClientB connecting...
    # ✅ ClientB connected
    # 📩 Message for ClientB: 1
    # 📩 Message for ClientA: 1
    # 📩 Message for ClientB: 2
    # 📩 Message for ClientB: 3
    # 📩 Message for ClientA: 2
    # 📩 Message for ClientB: 4
    # 📩 Message for ClientA: 3
    # 📩 Message for ClientB: 5
    # 📩 Message for ClientA: 4
    # 📩 Message for ClientA: 5
    # ❌ ClientB disconnecting...
    # ✅ ClientB disconnected
    # ❌ ClientA disconnecting...
    # ✅ ClientA disconnected