# threading
[](https://hackmd.io/@RogelioKG/threading)
### 參考
+ 🔗 [**louie_lu's blog: 深入 GIL**](https://blog.louie.lu/2017/05/19/%E6%B7%B1%E5%85%A5-gil-%E5%A6%82%E4%BD%95%E5%AF%AB%E5%87%BA%E5%BF%AB%E9%80%9F%E4%B8%94-thread-safe-%E7%9A%84-python-grok-the-gil-how-to-write-fast-and-thread-safe-python/)
+ 🔗 [**Maxlist**](https://www.maxlist.xyz/2020/03/15/python-threading/)
+ 🔗 [**GTW**](https://blog.gtwang.org/programming/python-threading-multithreaded-programming-tutorial/)
+ 🔗 [**Python Parallel Programmning Cookbook**](https://python-parallel-programmning-cookbook.readthedocs.io/zh-cn/latest/index.html)
+ 🔗 [**利用 Concurrency 加速你的 Python 程式執行效率**](https://hackmd.io/@YungHuiHsu/SJ5EgB5eT)
### 說明
| 🔮 <span class="important">IMPORTANT</span> |
| :------------------------------------------- |
| 🪄 使用 multi-threading 解決 I/O-bound tasks |
| 🪄 使用 multi-processing 解決 CPU-bound tasks |
### 函式
| function | description |
| ------------------ | ---------------------------------------------------------------- |
| `active_count()` | 返回當前存活的線程數量 |
| `enumerate()` | 返回當前所有存活的線程物件 list |
| `current_thread()` | 返回當前線程物件 |
| `get_ident()` | 返回當前線程的 ident |
| `main_thread()` | 返回主線程物件 |
| `settrace(func)` | 為從 threading 模組啟動的所有線程設置追蹤函式 |
| `setprofile(func)` | 為從 threading 模組啟動的所有線程設置分析函式 |
| `stack_size(size)` | 返回新線程使用的堆疊大小,或者如果提供了大小參數,則設置堆疊大小 |
| `local()` | 創建線程本地數據 |
### 類別
| class | description |
| ---------------------------- | ----------------------------------------------------------------------------------------------------------------------------- |
| `Mutex()` | (🗑️deprecated) 創建一個互斥鎖 |
| `Lock()` | 創建一個互斥鎖,用於防止多個線程同時訪問共享資源 |
| `RLock()` | 創建一個可重入互斥鎖,允許同一線程多次獲取鎖<br>(單純在 `Lock()` 的 C API 上套一層皮,其中 attribute `_block` 就是 Lock 實例) |
| `Condition(lock)` | 創建一個條件變數,用於更高級的線程同步 |
| `Semaphore(value)` | 創建一個號幟,用於管理固定數量的可用資源 |
| `BoundedSemaphore(value)` | 創建一個號幟,但防止計數器超過初始值 |
| [`Event()`][Myapollo: Event] | 創建一個事件,用於線程之間的訊號傳遞 |
| `Thread(target, args, name)` | 創建一個線程物件,`target` 指定線程執行的函式,`args` 指定傳遞給該函式的引數,<br>`name` 可指定別名 |
| `Timer(interval, func)` | 創建一個計時器,在指定的間隔後執行給定的函式 |
| `Barrier()` | 創建一個同步屏障 |
### 方法 / 屬性
+ `Thread`
> 使用方法
> 1. 使用 `target` 指定線程執行的函式
> 2. 繼承 `Thread` 然後將工作內容覆寫至 `run()` 方法
| method / attribute | description |
| ----------------------------------- | ------------------------------------------------------ |
| `start()` | 啟動線程 |
| `join()` | 阻塞當前線程,直到該線程完成才會進行後續動作 |
| `run()` | 當 `start()`後會調用此函式,其執行指定的 `target` 函式 |
| `is_alive()` | 線程是否啟動 |
| `name` | 線程別名 |
| `ident` | 取得 ID (當前 process 中 thread 的 ID) |
| `native_id` | 取得 ID (OS 給予此 thread 的 ID) |
| [`daemon`][Myapollo: Daemon Thread] | 主線程運行結束時,此線程也強制結束,預設為 `False` |
+ `Lock` / `RLock`
```py
class _RLock:
def __init__(self):
self._block = Lock()
self._owner = None
self._count = 0
```
> `Lock` 規則
> 1. 狀態 unlocked + 調用 `acquire()` : 狀態改為 locked
> 3. 狀態 unlocked + 調用 `release()` : `RuntimeError`
> 2. 狀態 locked + 調用 `acquire()` : 被阻塞直到另一線程調用 `release()` 釋放鎖<br>(若為 `RLock`,不阻塞,僅遞迴計數 +1)
> 4. 狀態 locked + 調用 `release()` : 狀態改為 unlocked<br>(若為 `RLock`,不阻塞,僅遞迴計數 -1,直到為 0 則狀態改為 unlocked)
| method / attribute | description |
| ----------------------------------- | ------------------------------------------------------------------------------------------- |
| `acquire(blocking=True,timeout=-1)` | 獲取鎖<br>`blocking`:若未能成功獲取鎖 thread 是否休眠<br>`timeout`:嘗試在指定秒數內獲取鎖 |
| `release()` | 釋放鎖 |
| `__enter__` | 直接被定義為 `acquire` 方法 (可使用 `with` 述句獲取鎖並自動釋放鎖) |
| `__exit__` | 直接調用 `release` 方法 |
+ `Condition`
```py
class Condition:
def __init__(self, lock=None):
if lock is None:
lock = RLock()
self._lock = lock
self.acquire = lock.acquire
self.release = lock.release
self._waiters = _deque()
```
> 內部維護一個 deque (等待佇列),當線程 wait 的時候,創建一個互斥鎖 `Lock`,\
> 然後讓線程獲取鎖,並將這個鎖推入 deque,然後再次嘗試獲取它。\
> 由於互斥鎖不可重入的特性,線程就會進入休眠 (或者反覆甦醒嘗試獲取互斥鎖)。\
> 而 `notify()` 的奇效就在於它會 `release()` 等待佇列中的互斥鎖。\
> 這就讓被互斥鎖卡住的線程,下次甦醒時能成功獲取互斥鎖並繼續執行\
> (然後線程離開 `wait()` 後互斥鎖就被丟掉了,因為它只是拿來排隊用的)。
| 📘 <span class="note">NOTE</span> : 預設為可重入互斥鎖 `RLock` |
| :------------------------------------------------------------ |
| method / attribute | description |
| ------------------------------ | --------------------------------------------------------------------------------------- |
| `acquire(blocking,timeout)` | 直接被定義為鎖的 `acquire` 方法 |
| `release()` | 直接被定義為鎖的 `release` 方法 |
| `wait(timeout)` | 釋放鎖後休眠,在條件變數的佇列中等待,直到被 `notify()` 喚醒,或超過指定的 timeout 時間 |
| `wait_for(predicate, timeout)` | 若指定條件不滿足,持續 `wait()` |
| `notify(n)` | 喚醒在條件變數的佇列中等待的前 n 個線程 |
| `notify_all()` | 喚醒在條件變數的佇列中等待的所有線程 |
| `__enter__` | 直接調用鎖的 `__enter__` 方法 |
| `__exit__` | 直接調用鎖的 `__exit__` 方法 |
+ `Semaphore`
> 獲取鎖和釋放鎖的邏輯都交給 `Condition`,`Semaphore` 單純就是套一層可使用資源數量 `value` 的皮罷了
```py
class Semaphore:
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock())
self._value = value
```
| method / attribute | description |
| --------------------------- | ------------------------------------------------------------------ |
| `acquire(blocking,timeout)` | 獲取鎖 |
| `release()` | 釋放鎖 |
| `__enter__` | 直接被定義為 `acquire` 方法 (可使用 `with` 述句獲取鎖並自動釋放鎖) |
| `__exit__` | 直接調用 `release` 方法 |
+ `Event`
> flag 若為 False,釋放鎖後休眠,在條件變數的佇列中等待,直到被 `notify()` 喚醒,或超過指定的 timeout 時間
```py
class Event:
def __init__(self):
self._cond = Condition(Lock())
self._flag = False
```
| method | description |
| -------------------- | ---------------------------------------------------------------- |
| `set()` | 將 flag 設置為 True,所有等待該事件的線程將被喚醒並繼續執行。 |
| `clear()` | 將 flag 設置為 False,後續等待該事件的線程將休眠。 |
| `wait(timeout=None)` | 當前線程休眠,直到 flag 為 True 或超過指定的 timeout 時間。 |
| `is_set()` | 返回 flag 的狀態。如果 flag 為 True,返回 True,否則返回 False。 |
+ `Timer`
```py
class Timer(Thread):
def __init__(self, interval, function, args=None, kwargs=None):
Thread.__init__(self)
self.interval = interval
self.function = function
self.args = args if args is not None else []
self.kwargs = kwargs if kwargs is not None else {}
self.finished = Event()
def cancel(self):
self.finished.set()
def run(self):
self.finished.wait(self.interval)
if not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.set()
```
| method | description |
| ------------ | ----------------------------------------------------------------- |
| `start()` | 啟動定時器。這會使定時器在指定的 `interval` 時間後執行`function` |
| `cancel()` | 停止定時器。如果定時器尚未執行,則此方法將阻止其執行 |
| `is_alive()` | 返回定時器是否仍在計時中。如果定時器已啟動但尚未觸發,返回 `True` |
+ `Barrier`
| method | description |
| -------------------- | ------------------------------------------------------------------------------- |
| `wait(timeout=None)` | 阻塞直到所有線程到達 `Barrier`,`timeout` 設定等待時間 |
| `reset()` | 重置 `Barrier`,使其能夠重新使用,會使所有等待中的線程引發 `BrokenBarrierError` |
| `abort()` | 中止 `Barrier`,所有等待中的線程都會引發 `BrokenBarrierError` |
| `parties` | 返回初始化時設置的需要等待的線程數量 |
| `n_waiting` | 返回目前已經到達 `Barrier` 並正在等待的線程數量 |
| `broken` | 返回 `True` 如果 `Barrier` 已經被中止或重置,否則返回 `False` |
<!-- Link -->
[Myapollo: Daemon Thread]: https://myapollo.com.tw/blog/python-daemon-thread/
[Myapollo: Event]: https://myapollo.com.tw/blog/python-event-objects/