Try   HackMD

Crust of Rust : Atomics and Memory Ordering

直播錄影

  • 主機資訊
    ​​​​wilson@wilson-HP-Pavilion-Plus-Laptop-14-eh0xxx ~/CrustOfRust> neofetch --stdout
    ​​​​wilson@wilson-HP-Pavilion-Plus-Laptop-14-eh0xxx 
    ​​​​----------------------------------------------- 
    ​​​​OS: Ubuntu 22.04.3 LTS x86_64 
    ​​​​Host: HP Pavilion Plus Laptop 14-eh0xxx 
    ​​​​Kernel: 6.2.0-37-generic 
    ​​​​Uptime: 22 mins 
    ​​​​Packages: 2367 (dpkg), 11 (snap) 
    ​​​​Shell: bash 5.1.16 
    ​​​​Resolution: 2880x1800 
    ​​​​DE: GNOME 42.9 
    ​​​​WM: Mutter 
    ​​​​WM Theme: Adwaita 
    ​​​​Theme: Yaru-dark [GTK2/3] 
    ​​​​Icons: Yaru [GTK2/3] 
    ​​​​Terminal: gnome-terminal 
    ​​​​CPU: 12th Gen Intel i5-12500H (16) @ 4.500GHz 
    ​​​​GPU: Intel Alder Lake-P 
    ​​​​Memory: 1996MiB / 15695MiB 
    
  • Rust 編譯器版本 :
    ​​​​wilson@wilson-HP-Pavilion-Plus-Laptop-14-eh0xxx ~/CrustOfRust> rustc --version
    ​​​​rustc 1.70.0 (90c541806 2023-05-31) (built from a source tarball)
    

Introduction

0:00:00

In this episode of Crust of Rust, we go over Rust's atomic types, including the mysterious Ordering enum. In particular, we explore the std::sync::atomic module, and look at how its components can be used to implement concurrency primitives like mutexes. We also investigate some of the gotchas and sometimes counter-intuitive behaviors of the atomic primitives with different memory orderings, as well as strategies for testing for and debugging errors in concurrent code.

Q : Are we going to also talk about the asm level details or just the rust abstactions?
A : 一開始我們會稍微討論這個問題,儘管討論內容是與 Rust 相關的,但只要是遵循 LLVM, C++ 或是 C11 memory model 的程式語言都可以套用。

本次直播會專注於 Rust atomic 型別以及 Rust 中觀察到的 memory ordering,雖然都是圍繞在 Rust 上討論的,但我們可以學習到底層的東西,有助於其他程式語言的開發。

What are atomics?

0:02:03

Module std::sync::atomic

Atomic 的使用兩個原因,主要的原因是確保 thread safety,第二個原因是限制編譯器應該產生什麼程式碼,不要讓編譯器的最佳化破壞了你對共享記憶體使用的規則。

如果你有共享記憶體的值,你需要關於存取該值的額外訊息,讓CPU 知道不同的執行緒何時應該看到其他執行緒所做的操作,它們是如何同步的。當一個執行緒寫入某個值,而另一個執行緒讀取該值時,我們如何確定讀取的執行緒將讀取到哪些值?它是否總是讀取到最新的值?"最新"的意義是什麼?此外,在程式中的其他讀取和寫入操作在這兩個不同的執行緒中,哪些對於另一個執行緒是可見的?

如果你不使用 Atomic 的話,你將沒辦法保證並行程式的執行緒一定可以讀到另一個執行緒的寫入值。

The Memory Model

0:05:26

直播談到“規格”或“編譯器的規則”時,實際上指的是程式語言的 memory model。而 Rust 參考手冊的概念是,它應該完全規定語言,以便如果其他人想來實作 Rust 編譯器,他們會知道確切要實作和這些事物的行為。

Rust memory model 還沒有定義。Rust 依賴於 LLVM,一般來說 Rust 遵循 C11 memory model。本次直播在 memory model 的部分會對應到 C++ 的 memory odering 文件,因為這個文件很好地說明不同 memory model 的含義,並且有一些很好的錯誤範例。

:pencil2: Rust Memory model : Rust does not yet have a defined memory model. Various academics and industry professionals are working on various proposals, but for now, this is an under-defined place in the language.

AtomicUsize

0:07:33

Struct std::sync::atomic::AtomicUsize

AtomicUsize 只能透過它所提供的方法來去存取變數。usize 與 AtomicUsize 所佔用的記憶體大小相同,唯一真正的區別是存取該值時,編譯器所產生的指令。

以下為 AtomicUsize 的方法簽章 :

pub const fn new(v: usize) -> AtomicUsize pub fn load(&self, order: Ordering) -> usize // AtomicUsize 與 usize 最大的區別是, // AtomicUsize 可以使用 self 的`共享`參考,即可修改它的值, // usize 則是使用 exclusive 參考,才可以修改它的值。 // 原因是編譯器會針對 AtomicUsize 產生一些指令確保 thread safe。 pub fn store(&self, val: usize, order: Ordering) pub fn swap(&self, val: usize, order: Ordering) -> usize // load, store, swap 都需要傳入 Ordering, // 我們將會大量討論 Ordering。 pub fn compare_and_swap( &self, current: usize, new: usize, order: Ordering ) -> usize pub fn compare_exchange( &self, current: usize, new: usize, success: Ordering, failure: Ordering ) -> Result<usize, usize> // 我們將會討論 compare_exchange 與 compare_exchange_weak 的區別。 pub fn compare_exchange_weak( &self, current: usize, new: usize, success: Ordering, failure: Ordering ) -> Result<usize, usize> pub fn fetch_add(&self, val: usize, order: Ordering) -> usize pub fn fetch_sub(&self, val: usize, order: Ordering) -> usize ... // Line 11 - Line 33 函式主要功能 : // Atomic operation : 避免執行緒在更新值的過程中,其他執行緒可以操作該值。

AtomicUsize (與其它 Atomic 型別) 本質上並非共享。它們是放在 stack 上的值,如果你想跨執行緒共享它們,不能只是創建一個單獨的 AtomicUsize 並將其提供給其他執行緒。儘管可以向這些執行緒提供共享參考,但對於 stack 上的某物的共享參考不會是 static,所以通常會讓你陷入困境。因此,通常在獲得這些 Atomic 類型之後,你會使用像 Box 或更常見的 Arc 之類的東西將其放在 heap 上,這將允許你共享指向該共享 atomic 值的參考(指標),然後你可以更新它。

Ordering 的列舉成員如下,我們將會討論這些成員 :

#[non_exhaustive] pub enum Ordering { Relaxed, Release, Acquire, AcqRel, SeqCst, }

Ordering 的作用是告訴編譯器對於這個特定的記憶體存取,相對於其他執行緒中可能同時發生的事情,你期望哪一組保證。

Questions so far

0:12:23

Q : don't u64s on x86 have atomic access? Or am I confusing it with something else?
A : 在某些平台上,non-atomic 類型有額外的保證。例如,在 Intel x86 的平台上,如果你有 64 位元的值,對它的任何存取都是 automatically atomic,理論上,如果你在原始組合語言中並且知道你正在使用的是 Intel x64,你可以在不使用 AtomicUsize 的情況下進行這樣的操作。但實際上,在標準函式庫中,我們想要做的是揭露一個總是有效的通用介面

Q : why is it non exhaustive?
A : 這基本上是說不允許任何程式碼假設這些成員一定是這樣的排列順序。因此,如果你在 Ordering 進行匹配操作,你總是需要包含一個 _ branch (_ => "abc"),或者 else branch (else ...),因為標準庫希望能夠在以後添加額外的成員。例如像是 Consume 成員,這是 C++ 有的一個東西,但 Rust 目前還沒有。如果我們想要有添加後續內容的能力,它需要被標記為 #[non_exhaustive]

Q : Is the ordering enum related to the diferent memory models of the architectures (x86 or arm)?
A : 不同的 Ordering 並不是與記憶體架構有關,它們與操作提供的保證相關。不同的架構如何實現這些保證將因架構而異,我們將在稍後看到。

Q : Whats the different between atomic and mutex?
A : Atomic 沒有涉及到 locking 機制。如果是 Atomic 的話,多個執行緒可以以某種 well-defined 的方式同時對這個值進行操作;如果是 Mutex 的話,一次只有一個執行緒可以存取該值,所有其他執行緒都必須等待。

Q : do those atomic operations then not block other threads potentially?
A : 不是。所有 atomic operation 都是 lock-free,但它們不一定要是 wait-free。因為在沒有 fetch_and_add 的某些架構上,它們是透過 compare_and_swap 來實作的,所以你可能必須等待其他執行緒,所以某些架構就不是 wait-free,而只是 lock-free。

Atomic 操作不僅僅是每個 CPU,而是每個 CPU 架構。它們不僅僅修改指令,還改變了編譯器的語義 (我們稍後會看到)。Atomic 操作限制了 CPU 可以對特定記憶體存取進行的操作,以及編譯器對其的處理。

Q : store, swap & friends all take &self, not &mut self. I guess that means it rely on UnsafeCell at some point?
A : 是的,參照原始碼
UnsafeCell 是在基本層級上透過共用參考獲得 mutable 存取的唯一方法。

Q : You said atomics are generally shared via Arc rather than Box, but the only reason for that is that Arc is Send/Sync while Box is not, so it's simply more convenient, right?
A : 不完全對。如果你將一個值放入 Box 並將其放在 heap 上,那麼現在你就擁有了這個值。但如果你啟動了兩個執行緒,這兩個執行緒通常都需要你傳遞具有 static 生命週期的 closure。如果你將對 Box 的參考傳遞給兩個執行緒,那麼現在這個對 Box 的參考就不具有 static 生命週期,它與 Box 的 heap 上的生命週期相關聯。使用 Arc,你可以 clone Arc 並給每個執行緒一個獨立擁有的 static 生命週期的 Arc,因此可以使用在 AtomicUsize 上。這就是為什麼通常使用 Arc 而不是 Box 的原因。當然,你不一定非得這樣做。例如,你也可使用 Box::leak() 將一個值 leak 到 heap 上。因此,Box 不會呼叫解構函式,它將給你一個 static 生命週期的參考,然後你可以將它傳遞給兩個執行緒。

Implementing a (bad) Mutex

0:20:20

開始建置 Rust 專案 :

$ cargo new --bin atomics
$ cd atomics
$ vim src/main.rs

先實作 Mutex 的原型 :

use std::cell::UnsafeCell; use std::sync::atomic::{AtomicBool, Ordering}; const LOCKED: bool = true; const UNLOCKED: bool = false; pub struct Mutex<T> { locked: AtomicBool, // 用來標記是否有人持有 lock v: UnsafeCell<T>, // 指向內部型別 } impl<T> Mutex<T> { pub fn new(t: T) -> Self { Self { locked: AtomicBool::new(UNLOCKED), v: UnsafeCell::new(t), } } // with_lock 會接收一個 closure, // 即一個在取得 lock 之後將被呼叫的函式。 pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { f() } } fn main() {}

:bulb: 本次直播不實作 guard 機制。

我們將實作的 Mutex 會用到 Spinlock,因為 Spinlock 版的 Mutex 實作很適合拿來練習。

:warning: 不要使用 Spinlock,你幾乎永遠不會想使用 Spinlock,你幾乎永遠不會想要實作自己的 Mutex 並用在你的程式。

參考文章 : Spinlocks Considered Harmful

開始實作 with_lock 函式 :

impl<T> Mutex<T> { ... pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { while self.locked.load() != UNLOCKED {} self.locked.store(LOCKED); // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED); ret } }

with_lock 函式有一些問題 (Tag),第一個問題是它無法被編譯,因為 AtomicBool::load() 以及 AtomicBool::store() 都需要再傳入一個 Ordering 參數 :

impl<T> Mutex<T> { ... pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { - while self.locked.load() != UNLOCKED {} + while self.locked.load(Ordering::Relaxed) != UNLOCKED {} - self.locked.store(LOCKED); + self.locked.store(LOCKED, Ordering::Relaxed); // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); - self.locked.store(UNLOCKED); + self.locked.store(UNLOCKED, Ordering::Relaxed); ret } }
目前程式碼
use std::cell::UnsafeCell; use std::sync::atomic::{AtomicBool, Ordering}; const LOCKED: bool = true; const UNLOCKED: bool = false; pub struct Mutex<T> { locked: AtomicBool, v: UnsafeCell<T>, } impl<T> Mutex<T> { pub fn new(t: T) -> Self { Self { locked: AtomicBool::new(UNLOCKED), v: UnsafeCell::new(t), } } pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { while self.locked.load(Ordering::Relaxed) != UNLOCKED {} self.locked.store(LOCKED, Ordering::Relaxed); // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Relaxed); ret } } fn main() {}

Our Mutex works!

0:27:39

來測試我們的 Mutex :

... use std::thread::spawn; fn main() { let l: &'static _ = Box::leak(Box::new(Mutex::new(1))); let mut handles: Vec<_> = (0..100) .map(|_| { spawn(move || { for _ in 0..1000 { l.with_lock(|v| { *v += 1; }); } }) }) .collect(); for handle in handles { handle.join().unwrap(); } assert_eq!(l.with_lock(|v| *v), 100 * 1000); }
目前程式碼
use std::cell::UnsafeCell; use std::sync::atomic::{AtomicBool, Ordering}; const LOCKED: bool = true; const UNLOCKED: bool = false; pub struct Mutex<T> { locked: AtomicBool, v: UnsafeCell<T>, } impl<T> Mutex<T> { pub fn new(t: T) -> Self { Self { locked: AtomicBool::new(UNLOCKED), v: UnsafeCell::new(t), } } pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { while self.locked.load(Ordering::Relaxed) != UNLOCKED {} self.locked.store(LOCKED, Ordering::Relaxed); // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Relaxed); ret } } use std::thread::spawn; fn main() { let l: &'static _ = Box::leak(Box::new(Mutex::new(0))); let handles: Vec<_> = (0..10) .map(|_| { spawn(move || { for _ in 0..100 { l.with_lock(|v| { *v += 1; }); } }) }) .collect(); // 如果不使用 collect,這仍然是個迭代器沒錯, // 但是你將在生成每個執行緒的時候就 join 它, // 這樣你一次只能跑一個執行緒。 for handle in handles { handle.join().unwrap(); } assert_eq!(l.with_lock(|v| *v), 10 * 100) }

編譯會出現以下錯誤 :

$ cargo run --release error[E0277]: `UnsafeCell<i32>` cannot be shared between threads safely --> src\main.rs:39:15 | 39 | spawn(move || { | _________-----_^ | | | | | required by a bound introduced by this call 40 | | for _ in 0..100 { 41 | | l.with_lock(|v| { 42 | | *v += 1; 43 | | }); 44 | | } 45 | | }) | |_________^ `UnsafeCell<i32>` cannot be shared between threads safely

但我們現在在使用 UnsafeCell 的時候有用 Mutex 保護,所以現在要讓 UnsafeCell 可以跨執行緒執行 :

unsafe impl<T> Sync for Mutex<T> where T: Send {} // T 有 Send bound 的原因是, // lock 可以從多個不同的執行緒獲取, // 並且每個執行緒都可以獲取該值。 // T 沒有 Sync bound 的原因是, // 我們從不同的執行緒同時並行地存取內部值。
目前程式碼
use std::cell::UnsafeCell; use std::sync::atomic::{AtomicBool, Ordering}; const LOCKED: bool = true; const UNLOCKED: bool = false; pub struct Mutex<T> { locked: AtomicBool, v: UnsafeCell<T>, } unsafe impl<T> Sync for Mutex<T> where T: Send {} impl<T> Mutex<T> { pub fn new(t: T) -> Self { Self { locked: AtomicBool::new(UNLOCKED), v: UnsafeCell::new(t), } } pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { while self.locked.load(Ordering::Relaxed) != UNLOCKED {} self.locked.store(LOCKED, Ordering::Relaxed); // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Relaxed); ret } } use std::thread::spawn; fn main() { let l: &'static _ = Box::leak(Box::new(Mutex::new(0))); let handles: Vec<_> = (0..10) .map(|_| { spawn(move || { for _ in 0..100 { l.with_lock(|v| { *v += 1; }); } }) }) .collect(); for handle in handles { handle.join().unwrap(); } assert_eq!(l.with_lock(|v| *v), 10 * 100) }

成功執行 :

$ cargo run --release Compiling atomics v0.1.0 (/home/wilson/CrustOfRust/atomics) Finished release [optimized] target(s) in 0.30s Running `target/release/atomics`

Pesky thread interleavings

0:33:04

with_lock 函式有一些問題 (Tag),第二個問題是 :

impl<T> Mutex<T> { ... pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { while self.locked.load() != UNLOCKED {} // 因為多個執行緒可能都通過了 while 迴圈, // 因為此時還沒有執行緒來得及將 locked 設為 LOCKED, // 最終導致 critial section 有多個執行緒進入。 // maybe another thread runs here self.locked.store(LOCKED, Ordering::Relaxed); // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Relaxed); ret } }

雖然我們剛剛驗證的結果是正確的,但也是基於在電腦非常快的速度上才讓我們的 Mutex 看起來是正確的,為了示範我們的 Mutex 其實並非完全正確,我們主動讓執行緒在通過 while 迴圈之後先讓出 CPU :

impl<T> Mutex<T> { ... pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { while self.locked.load() != UNLOCKED {} // maybe another thread runs here + std::thread::yield_now(); self.locked.store(LOCKED, Ordering::Relaxed); // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Relaxed); ret } }
目前程式碼
use std::cell::UnsafeCell; use std::sync::atomic::{AtomicBool, Ordering}; const LOCKED: bool = true; const UNLOCKED: bool = false; pub struct Mutex<T> { locked: AtomicBool, v: UnsafeCell<T>, } unsafe impl<T> Sync for Mutex<T> where T: Send {} impl<T> Mutex<T> { pub fn new(t: T) -> Self { Self { locked: AtomicBool::new(UNLOCKED), v: UnsafeCell::new(t), } } pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { while self.locked.load(Ordering::Relaxed) != UNLOCKED {} // maybe another thread runs here std::thread::yield_now(); self.locked.store(LOCKED, Ordering::Relaxed); // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Relaxed); ret } } use std::thread::spawn; fn main() { let l: &'static _ = Box::leak(Box::new(Mutex::new(0))); let handles: Vec<_> = (0..100) .map(|_| { spawn(move || { for _ in 0..1000 { l.with_lock(|v| { *v += 1; }); } }) }) .collect(); for handle in handles { handle.join().unwrap(); } assert_eq!(l.with_lock(|v| *v), 100 * 1000); }

這次執行就會得到無法預期的結果了 :

$ cargo run --release
...
  left: `92996`,
 right: `100000`', src\main.rs:55:5
...
$ cargo run --release
...
  left: `94743`,
 right: `100000`', src\main.rs:55:5
...

讓 critical section 有多個執行緒進入的原因 :

  • 在單核的電腦上,不同的執行緒同時要去搶同一個 CPU,在某執行緒通過 while 迴圈之前就因為 timer interrupt 被強制讓出 CPU (我們剛剛的 yield_now 就是在假設這種情況的發生),導致某執行緒還沒將 locked 設定為 LOCKED,其他執行緒得以通過 while 迴圈。
  • 在多核的電腦上,不同的執行緒同時跑在不同的核上,導致多個執行緒都一起通過 while 迴圈。

Q : Would sleeping a bit while doing += 1 help create concurrency issues?
A : 可能不會。因為如果你在 += 1 時,你已經取得了 lock ,而不是在上鎖的情況下去操作值導致資料爭用,所以在這裡進行休眠可能不會像剛剛插入 yield 那樣產生相同的效果。

Q : isn't the compiler already pre determining the sum ?
A : 沒有。儘管在技術上這可以在靜態地計算,但編譯器不會跨執行緒邊界執行此操作。

Q : would that ThreadSanitizer catch?
A : ThreadSanitizer 會捕捉到這樣的問題,因為基本上你有兩個執行緒同時寫入一個記憶體位置,這是 ThreadSanitizer 通常會捕捉到的寫入衝突。等等會更詳細介紹 ThreadSanitizer。

Q : it seems like sleeping there would result in more lock contention and thus more races
A : Jon 不認為睡眠有那麼重要,而且沒有必要證明這個問題。

Q : So are atomics related to multithreading or concurrency?
A : atomics 本身對於並型很有用,而多執行緒是並行的一種形式。如果你沒有多個執行緒,則不太可能需要 atomics;如果你有多個執行緒,即使你的電腦只有一個單核,atomics 也是必要的。

Q : Isn't Sequential consistency by default in Rust?
A : 沒有。對於不使用 atomic 操作的程式碼,例如單執行緒操作,你確實不必擔心這些問題,原因是在 memory model 中,給定執行緒內的任何一系列操作都保證按照順序觀察到。因此,它們可能以 out-of-order 方式執行,但效果將顯示為程式以 sequential 方式執行,這可能是你所指的。但並沒有預設為 sequential consistency,這就是為什麼所有這些方法都需要明確傳遞一個 Ordering 參數的原因。
參考 : Superscalar_processor

compare_exchange

0:39:42

修復我們的 Mutex 可以使用 compare_exchange :

pub fn compare_exchange( &self, current: bool, new: bool, success: Ordering, failure: Ordering // 等等會討論 success 以及 failure 的參數功能 ) -> Result<bool, bool> // Stores a value into the bool if the current value is the same as the current value. // pub fn compare_and_swap( // &self, // current: bool, // new: bool, // order: Ordering // ) -> bool // 👎 // Deprecated since 1.50.0: Use compare_exchange or compare_exchange_weak instead // compare_and_swap 只是去呼叫 compare_exchange 而已 : // pub fn compare_and_swap(&self, current: bool, new: bool, order: Ordering) -> bool { // match self.compare_exchange(current, new, order, strongest_failure_ordering(order)) { // Ok(x) => x, // Err(x) => x, // } // }

compare_exchangecompare_and_swap 強大的原因是,前者允許你為操作成功或失敗的情況指定不同的 memory ordering (等等會解釋這是什麼意思),而後者則不允許。

開始修復我們的 Mutex :

impl<T> Mutex<T> { ... pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { - while self.locked.load() != UNLOCKED {} + while self + .locked + .compare_exchange(UNLOCKED, LOCKED, Ordering::Relaxed, Ordering::Relaxed) + .is_err() // 若回傳 error -> while True,需要再去搶奪 lock。 + {} + // read and write in single operation ! - // maybe another thread runs here - std::thread::yield_now(); - self.locked.store(LOCKED, Ordering::Relaxed); // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Relaxed); ret } }
目前程式碼
use std::cell::UnsafeCell; use std::sync::atomic::{AtomicBool, Ordering}; const LOCKED: bool = true; const UNLOCKED: bool = false; pub struct Mutex<T> { locked: AtomicBool, v: UnsafeCell<T>, } unsafe impl<T> Sync for Mutex<T> where T: Send {} impl<T> Mutex<T> { pub fn new(t: T) -> Self { Self { locked: AtomicBool::new(UNLOCKED), v: UnsafeCell::new(t), } } pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { while self .locked .compare_exchange( UNLOCKED, LOCKED, Ordering::Relaxed, Ordering::Relaxed) .is_err() {} // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Relaxed); ret } } use std::thread::spawn; fn main() { let l: &'static _ = Box::leak(Box::new(Mutex::new(0))); let handles: Vec<_> = (0..100) .map(|_| { spawn(move || { for _ in 0..1000 { l.with_lock(|v| { *v += 1; }); } }) }) .collect(); for handle in handles { handle.join().unwrap(); } assert_eq!(l.with_lock(|v| *v), 100 * 1000); }

Mitigating compare_exchange contention

0:44:54

:warning: 本段內容實際上是在談論 cache coherence 和 cache lines,但 Jon 將其稱為記憶體位置。

compare_exchange 是一個相當昂貴的操作。如果每個 CPU 在 compare_exchange 期間都在 spin,它們都會嘗試獲得對底層記憶體位置的 exclusive 存取權。實際上,這意味著,假設你有八個核,其中一個當前持有 lock,所有其他執行緒都在嘗試獲取對保存 true 或 false 的值的 exclusive 存取權。因此,每個核都會說,給我這個值的 exclusive 存取權,這是一種協調的 effort,以確保沒有其他執行緒同時對其進行寫入。然後,某 CPU 核會查看這個值並得知它不是當前的值,然後,另一個CPU 核會說,現在給我,導致記憶體中該位置的所有權將在 CPU 核之間彈來彈去,這是非常低效的。CPU 並不是它們不擅長這樣做。這只是在所有這些核之間協調 exclusive 存取權的成本很高。

詳情參照 : MESI_protocol

MESI protocol 基本上說明了記憶體中的一個位置。記憶體中的一個位置可以是 S(Shared) 或 E(Exclusive),還有一些其他狀態,但基本上是 S 或 E。因此,在 compare_exchange 中,CPU 需要對記憶體中的該位置進行 exclusive 存取,這需要與所有其他 CPU 協調。或者,記憶體中的某個位置可以被標記為 S,多個核被允許同時處於共享狀態。總的來說,如果我們可以使一個值在持有 lock 的同時保持在共享狀態,這將避免所有記憶體所有權的彈跳。

改進我們的 Mutex 的效能 :

impl<T> Mutex<T> { ... pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { while self .locked .compare_exchange(UNLOCKED, LOCKED, Ordering::Relaxed, Ordering::Relaxed) .is_err() - {} + { + // MESI protocal: stay in S when locked + while self.locked.load(Ordering::Relaxed) == LOCKED {} + } // read and write in single operation ! // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Relaxed); ret } }

加了 Line 14 之後,實際上並未改變任何東西,行為仍然是一樣的,只是如果我們未能取得 lock,那麼我們將只是 read 該值。請注意,Line 14 之所以不使用 compare_exchange 是因為執行緒不需要 exclusive 存取,只是 read-only 的存取。因此,如果我們未能取得 lock,那麼我們將 spin 並執行 read,這允許該值保持在共享狀態,這意味著我們不會有所有這種擁有權的彈跳。然後,當它更改時(因為某個核獲得了對它的 exclusive 存取,可能是因為它正在執行 Line 19),我們才回到執行昂貴的 compare_exchange,嘗試獲得 exclusive 存取。如果我們在那時未能取得 lock,我們將退回到執行 read-only 的迴圈 (Line 19)。因此,當存在 high contention 時,這實際上更有效率。但再次強調,不要使用 spinlock。

Q : Do you think the performance degradation would be visible if we just redid your tests again? Would be cute to see.
A : 不是的。它們只在你對特定值存在大量 contention 的情況下才真正重要。當你有很多執行緒、很多核時都會進行這種 contention。因此,通常情況下,如果你繪製性能圖,例如通過核數繪製吞吐量的圖表,你會看到,如果存在大量 contention,即每個執行緒都試圖獲得對值的 exclusive 存取,那麼隨著核數的增加,吞吐量開始變得趨於平緩,甚至下降,因為核花費所有時間僅僅是為了爭奪誰擁有對該值的 exclusive 存取權。而如果你採用剛剛 Line 14 的方法,你可以避免這種性能崩潰。因此,你可能仍然看不到線性增長,即如果你將核數翻倍,吞吐量也翻倍,但你通常會看到更好的曲線,因為你避免了一些性能崩潰。
image

Q : Is compare_exchange then much faster than locking a mutex?
A : 很難說。單個 compare_exchange 不會太昂貴。compare_exchange 與 Mutex 的最大區別在於 Mutex 必須等待;compare_exchange 永遠不會等待。單個 compare_exchange 呼叫將嘗試執行你告訴它的操作,然後它將說明它是成功還是失敗,如果失敗,compare_exchange 不是 block 執行緒,但你可以選擇這樣做,而在我們實作的 Mutex 中,我們選擇 spin,因此我們正在等待它成功;另一方面,如果你 lock Mutex,那麼你的執行緒將被 block,直到你取得 lock。這是其中的區別。一般來說,你會發現 compare_exchange 常常(但不總是)搭配迴圈使用,然而有一些演算法 compare_exchange 並沒有搭配迴圈使用,這些演算法在進行 compare_exchange 失敗時,有一些其他有用的工作可以進行,因此,你會在稍後的某個時間點再次嘗試 compare_exchange

目前程式碼
use std::cell::UnsafeCell; use std::sync::atomic::{AtomicBool, Ordering}; const LOCKED: bool = true; const UNLOCKED: bool = false; pub struct Mutex<T> { locked: AtomicBool, v: UnsafeCell<T>, } unsafe impl<T> Sync for Mutex<T> where T: Send {} impl<T> Mutex<T> { pub fn new(t: T) -> Self { Self { locked: AtomicBool::new(UNLOCKED), v: UnsafeCell::new(t), } } pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { while self .locked .compare_exchange( UNLOCKED, LOCKED, Ordering::Relaxed, Ordering::Relaxed) .is_err() { // MESI protocal: stay in S when locked while self.locked.load(Ordering::Relaxed) == LOCKED {} } // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Relaxed); ret } } use std::thread::spawn; fn main() { let l: &'static _ = Box::leak(Box::new(Mutex::new(0))); let handles: Vec<_> = (0..100) .map(|_| { spawn(move || { for _ in 0..1000 { l.with_lock(|v| { *v += 1; }); } }) }) .collect(); for handle in handles { handle.join().unwrap(); } assert_eq!(l.with_lock(|v| *v), 100 * 1000); }

compare_exchange_weak

0:50:43

compare_exchange_weakcompare_exchange 差異為小,這歸結為 compare_exchange 只有在當前值與你傳入的值不匹配時才允許失敗,它不被允許在任何其他情況下失敗。而 compare_exchange_weak 允許出現虛假失敗,即使當前值與你傳遞的值相符,compare_exchange_weak 也是允許失敗的。通常情況下它不會失敗,但是允許發生。

這之所以重要是因為涉及到指令集支援的不同 :

// x86: CMPXCHG // ARM: LDREX (load-link) STREX (store-conditional) // - compare_exchange: impl using a loop of LDREX and STREX // - compare_exchange_weak: LDREX STREX

LDREX 可以理解為獲得記憶體位置的 exclusive 所有權,並將其值載入到暫存器中。而 STREX 則是只有在仍然對該記憶體位置具有 exclusive 存取權時才能 store,否則操作將失敗。失敗的原因可能是因為其他執行緒只是讀取該值,或者將其覆蓋為已經存在的相同值,在這種情況下,ARM 上的 STREX 操作將失敗,即使值可能仍然相同,但它仍然會失敗。

使用 LDREX 以及 STREX 操作的優點是 STREX 是非常便宜的,你不必去獲取 exclusive 存取權,但這可能意味著你在無需失敗的情況下失敗了。在 ARM 處理器上,compare_exchange 實際上是使用 LDREX 和 STREX 的迴圈實作的。因為它需要實現 CAS (compare and swap) 的語義,即只有在當前值保持不變時才失敗。這意味著在 ARM 處理器上,這個迴圈最終會變成一個巢狀迴圈 :

while compare_exchange() {} // ARM pseudocode while { loop compare_exchange_weak(){ ... } }
function compare_exchange_weak(address, expected, new_value): // Perform locked load loaded_value = locked_load(address) // Compare loaded value with expected value if loaded_value != expected: return (false, loaded_value) // Return failure and actual value // Attempt conditional store success = conditional_store(address, new_value) if success: return (true, expected) // Return success and old value else: return (false, loaded_value) // Return failure, which might be spurious function compare_exchange(address, expected, new_value): while true: result, actual = compare_exchange_weak(address, expected, new_value) if result or actual != expected: return (result, actual) // If conditional_store fails, it could be because: // 1. The value was changed between load and store // 2. Couldn't obtain exclusive access // We continue the loop to retry, rather than immediately failing

雖然巢狀迴圈本身不是問題,但它們傾向讓編譯器產生不太高效的程式碼,因為巢狀迴圈會導致更多的暫存器使用和其他壓力,這就是為什麼有 compare_exchange_weak 的原因。由於 compare_exchange_weak 允許出現虛假失敗,因此 ARM 平台上可以直接使用 LDREX 和 STREX 實作 compare_exchange_weak。在 x86 平台上,compare_exchange_weak 就是一個 CAS,它不會產生虛假失敗。

如果你是在迴圈中呼叫 CAS,你應該使用 compare_exchange_weak (避免巢狀迴圈的使用);如果你沒有在迴圈中呼叫 CAS 並且希望只有在當前值更改時才失敗,那麼你應該使用 compare_exchange (:question: 如果 compare_exchange 沒有搭配 while 迴圈使用不會有多個執行緒可以同時進入 critical section 的問題嗎 :question: 這裡的重點是到底要不要處理 false spuriously 的情形)。由於我們 Mutex 的實作是在迴圈中呼叫 CAS,所以應該使用 compare_exchange_weak :

impl<T> Mutex<T> { ... pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { while self .locked - .compare_exchange(UNLOCKED, LOCKED, Ordering::Relaxed, Ordering::Relaxed) + .compare_exchange_weak(UNLOCKED, LOCKED, Ordering::Relaxed, Ordering::Relaxed) .is_err() { // MESI protocal: stay in S when locked while self.locked.load(Ordering::Relaxed) == LOCKED {} } // read and write in single operation ! // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Relaxed); ret } }
目前程式碼
use std::cell::UnsafeCell; use std::sync::atomic::{AtomicBool, Ordering}; const LOCKED: bool = true; const UNLOCKED: bool = false; pub struct Mutex<T> { locked: AtomicBool, v: UnsafeCell<T>, } unsafe impl<T> Sync for Mutex<T> where T: Send {} impl<T> Mutex<T> { pub fn new(t: T) -> Self { Self { locked: AtomicBool::new(UNLOCKED), v: UnsafeCell::new(t), } } pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { // x86: CMPXCHG // ARM: LDREX (load-link) STREX (store-conditional) // - compare_exchange: impl using a loop of LDREX and STREX // - compare_exchange_weak: LDREX STREX while self .locked .compare_exchange_weak( UNLOCKED, LOCKED, Ordering::Relaxed, Ordering::Relaxed) .is_err() { // MESI protocal: stay in S when locked while self.locked.load(Ordering::Relaxed) == LOCKED {} } // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Relaxed); ret } } use std::thread::spawn; fn main() { let l: &'static _ = Box::leak(Box::new(Mutex::new(0))); let handles: Vec<_> = (0..100) .map(|_| { spawn(move || { for _ in 0..1000 { l.with_lock(|v| { *v += 1; }); } }) }) .collect(); for handle in handles { handle.join().unwrap(); } assert_eq!(l.with_lock(|v| *v), 100 * 1000); }

參考文章 : Rust原子类型和内存排序

Q : ARM64 has CAS
A : ARM64 確實具有 CAS,但還有許多其他 ARM 變體沒有。

Q : is that all the conditions when weak is allowed to fail
A : compare_exchange_weak 允許虛假失敗。它只會在你期望的一種條件下成功,但也可能因任何原因而失敗。而 compare_exchange 只有在目前值發生變更時才會失敗。

Ordering::Relaxed

0:57:02

讓執行緒主動讓出 CPU :

impl<T> Mutex<T> { ... pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { while self .locked .compare_exchange_weak(UNLOCKED, LOCKED, Ordering::Relaxed, Ordering::Relaxed) .is_err() { // MESI protocal: stay in S when locked - while self.locked.load(Ordering::Relaxed) == LOCKED {} + while self.locked.load(Ordering::Relaxed) == LOCKED { + std::thread::yield_now(); + } + std::thread::yield_now(); } // read and write in single operation ! // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Relaxed); ret } }
目前程式碼
use std::cell::UnsafeCell; use std::sync::atomic::{AtomicBool, Ordering}; const LOCKED: bool = true; const UNLOCKED: bool = false; pub struct Mutex<T> { locked: AtomicBool, v: UnsafeCell<T>, } unsafe impl<T> Sync for Mutex<T> where T: Send {} impl<T> Mutex<T> { pub fn new(t: T) -> Self { Self { locked: AtomicBool::new(UNLOCKED), v: UnsafeCell::new(t), } } pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { // x86: CMPXCHG // ARM: LDREX (load-link) STREX (store-conditional) // - compare_exchange: impl using a loop of LDREX and STREX // - compare_exchange_weak: LDREX STREX while self .locked .compare_exchange_weak( UNLOCKED, LOCKED, Ordering::Relaxed, Ordering::Relaxed) .is_err() { // MESI protocal: stay in S when locked while self.locked.load(Ordering::Relaxed) == LOCKED { std::thread::yield_now(); } std::thread::yield_now(); } // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Relaxed); ret } } use std::thread::spawn; fn main() { let l: &'static _ = Box::leak(Box::new(Mutex::new(0))); let handles: Vec<_> = (0..100) .map(|_| { spawn(move || { for _ in 0..1000 { l.with_lock(|v| { *v += 1; }); } }) }) .collect(); for handle in handles { handle.join().unwrap(); } assert_eq!(l.with_lock(|v| *v), 100 * 1000); }

成功執行 :

$ cargo run --release Compiling atomics v0.1.0 (/home/wilson/CrustOfRust/atomics) Finished release [optimized] target(s) in 0.25s Running `target/release/atomics`

看似我們的 Mutex 已經完成了,但其實還是有問題,而這問題跟 Ordering 有關。Ordering 基本上決定了在多個執行緒在某個記憶體位置互動時允許的可觀察行為,它們在你執行此程式碼時指示發生的事情,或者允許發生什麼。

Ordering::Relaxed 只保證編譯器看到 atomic 指令仍會編譯成 atomic 指令,但不保證不會因為最佳化而被重排指令,寫個測試來說明 :

#[test] fn too_relaxed() { use::std::sync::atomic::AtomicUsize; let x: &'static _ = Box::leak(Box::new(AtomicUsize::new(0))); let y: &'static _ = Box::leak(Box::new(AtomicUsize::new(0))); let t1 = spawn(move || { let r1 = y.load(Ordering::Relaxed); x.store(r1, Ordering::Relaxed); r1 }); let t2 = spawn(move || { // 這次對 X 的讀取可以查看到 X 曾經 store 的任何值, // 包括 42,因為 42 在 Modification(x) 的集合內。 let r2 = x.load(Ordering::Relaxed); y.store(42, Ordering::Relaxed); // 以下都是為了最佳化 : // 1. 編譯器透過重排指令讓 Line 16 有可能先於 Line 15 執行。 // 2. CPU 也可以讓 Line 15 與 Line 16 out-of-order 執行,。 // Line 15 與 Line 16 沒有 data dependency, // 所以在"這個"執行緒內 Line 15 或 Line 16 先執行都是一樣的"效果"。 // CPU 讓 Line 16 先執行的可能原因是 : // t2 的執行緒先拿到 y 的 exclusive 存取權, // 但還沒有 x 的 shared 存取權, // 為了避免要等 x 拿到 shared 存取權, // Line 16 可以先執行。 r2 }); // Modification(x): {0, 42} // Modification(y): {0, 42} let r1 = t1.join().unwrap(); let r2 = t2.join().unwrap(); // r1 == r2 == 42 (possible) println!("r1 = {}, r2 = {}", r1, r2); }

在使用 Ordering::Relaxed 時,當你 load 一個值,你可以看到任何執行緒寫入該位置的值。對於 Ordering::Relaxed,沒有關於它何時相對於你發生的限制,唯一的限制是是否存在兩者之間的同步點。規範在 happens-before 關係方面進行了解釋,我們稍後會再回到這個主題。

:pencil2: Speculative execution
主要用於讓 CPU 推測 branch 的執行 (有可能引入 SpectreMeltdown):

let r2 = x.load(Ordering::Relaxed); // CPU 在還不知道 z == 3 為 True if z == 3 { // 就先推測為 True,並執行 if 內的程式碼 y.store(42, Ordering::Relaxed); // 如果推測錯的話需要 undo。 }

Q : Wait so when I write code the CPU can take any instruction that does not depend on others and start by that?
A : 是的,有點像。所以有一堆關於 CPU 和編譯器允許和不允許進行的重新排序的約束。但一般來說,它們允許重新排序任何沒有明確的 happens before 關係的東西。就像如果你的程式是依賴圖的話。如果 A 依賴於 B,那麼你就不能重新排序它們,因為 A 依賴於 B。但如果 A 和 B 之間沒有關係,如果沒有依賴關係,如果沒有說 B 必須先發生的東西,那麼它們是可以互換的。這就是規範所規定的。

目前程式碼
use std::cell::UnsafeCell; use std::sync::atomic::{AtomicBool, Ordering}; const LOCKED: bool = true; const UNLOCKED: bool = false; pub struct Mutex<T> { locked: AtomicBool, v: UnsafeCell<T>, } unsafe impl<T> Sync for Mutex<T> where T: Send {} impl<T> Mutex<T> { pub fn new(t: T) -> Self { Self { locked: AtomicBool::new(UNLOCKED), v: UnsafeCell::new(t), } } pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { // x86: CMPXCHG // ARM: LDREX (load-link) STREX (store-conditional) // - compare_exchange: impl using a loop of LDREX and STREX // - compare_exchange_weak: LDREX STREX while self .locked .compare_exchange_weak( UNLOCKED, LOCKED, Ordering::Relaxed, Ordering::Relaxed) .is_err() { // MESI protocal: stay in S when locked while self.locked.load(Ordering::Relaxed) == LOCKED { std::thread::yield_now(); } std::thread::yield_now(); } // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Relaxed); ret } } use std::thread::spawn; fn main() { let l: &'static _ = Box::leak(Box::new(Mutex::new(0))); let handles: Vec<_> = (0..100) .map(|_| { spawn(move || { for _ in 0..1000 { l.with_lock(|v| { *v += 1; }); } }) }) .collect(); for handle in handles { handle.join().unwrap(); } assert_eq!(l.with_lock(|v| *v), 100 * 1000); } #[test] fn too_relaxed() { use::std::sync::atomic::AtomicUsize; let x: &'static _ = Box::leak(Box::new(AtomicUsize::new(0))); let y: &'static _ = Box::leak(Box::new(AtomicUsize::new(0))); let t1 = spawn(move || { let r1 = y.load(Ordering::Relaxed); x.store(r1, Ordering::Relaxed); r1 }); let t2 = spawn(move || { let r2 = x.load(Ordering::Relaxed); y.store(42, Ordering::Relaxed); r2 }); let r1 = t1.join().unwrap(); let r2 = t2.join().unwrap(); }

Ordering::Relaxed 對我們程式的影響如下,如果新增 Line 21 :

impl<T> Mutex<T> { ... pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { + while self .locked .compare_exchange_weak(UNLOCKED, LOCKED, Ordering::Relaxed, Ordering::Relaxed) .is_err() { // MESI protocal: stay in S when locked while self.locked.load(Ordering::Relaxed) == LOCKED { std::thread::yield_now(); } std::thread::yield_now(); } // read and write in single operation ! // SAFETY: we hold the lock, therefore we can create a mutable reference + unsafe { &mut *self.v.get() } += 1; let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Relaxed); ret } }

Line 21 有可能因為 Line 9 的第四個參數 (failure) 設為 Ordering::Relaxed 被編譯器重排到 Line 6,因為 self.lockself.v 為不同記憶體位置,不存在相依關係。但我們不應該允許編譯器這樣重排,因為會導致某個執行緒持有 lock,其他執行緒仍能讀 self.v 的值,因而導致違反了 mutable 參考的 exclusivity。

Ordering::Acquire/Release

1:12:13

Ordering::Acquire 以及 Ordering::Release 聽起來有點像是我們使用 lock 的術語 : acquire lock, release lock。這並非偶然。這是因為這些 memory ordering 基本上是為在 acquire 或 release 資源的上下文中使用而設計的。

self.locked.store 也因為傳入 Ordering::Relaxed 參數可能被重排成以下這樣 :

impl<T> Mutex<T> { ... pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { while self .locked .compare_exchange_weak(UNLOCKED, LOCKED, Ordering::Relaxed, Ordering::Relaxed) .is_err() { // MESI protocal: stay in S when locked while self.locked.load(Ordering::Relaxed) == LOCKED { std::thread::yield_now(); } std::thread::yield_now(); } // read and write in single operation ! // SAFETY: we hold the lock, therefore we can create a mutable reference + self.locked.store(UNLOCKED, Ordering::Relaxed); let ret = f(unsafe { &mut *self.v.get() }); - self.locked.store(UNLOCKED, Ordering::Relaxed); ret } }

但我們不應該允許編譯器這樣排,因為這樣會導致多個執行緒進入了 critical section。這時候 Ordering::Release 就要發揮作用了。

:pencil2: Release

When coupled with a store, all previous operations become ordered before any load of this value with Acquire (or stronger) ordering. In particular, all previous writes become visible to all threads that perform an Acquire (or stronger) load of this value.

Notice that using this ordering for an operation that combines loads and stores leads to a Relaxed load operation!

This ordering is only applicable for operations that can perform a store.

Corresponds to memory_order_release in C++20.

:pencil2: Release-Acquire_ordering

  • memory_order_release : A store operation with this memory order performs the release operation: no reads or writes in the current thread can be reordered after this store (Rust 的 Release 規範應該要寫到這個限制). All writes in the current thread are visible in other threads that acquire the same atomic variable (see Release-Acquire ordering below) and writes that carry a dependency into the atomic variable become visible in other threads that consume the same atomic (see Release-Consume ordering below).
  • memory_order_acquire : A load operation with this memory order performs the acquire operation on the affected memory location: no reads or writes in the current thread can be reordered before this load. All writes in other threads that release the same atomic variable are visible in the current thread (see Release-Acquire ordering below).

先修改 Ordering :

impl<T> Mutex<T> { ... pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { ... while self .locked .compare_exchange_weak( UNLOCKED, LOCKED, - Ordering::Relaxed, + Ordering::Acquire, Ordering::Relaxed) .is_err() { ... } // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); - self.locked.store(UNLOCKED, Ordering::Relaxed); + self.locked.store(UNLOCKED, Ordering::Release); ret } }

接著用修改後的程式碼說明規範的意思 :

impl<T> Mutex<T> { ... pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { ... while self .locked .compare_exchange_weak( UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed) .is_err() // 性質 : 如果這個 CAS 中發生的 load 使用了 Ordering::Acquire, // 那麼它必須看到 Line 24 (Ordering::Release) 的之前的所有操作。 // 上述性質之所以重要是因為,如果 Line 24 是 Ordering::Relaxed, // 那麼下一個拿到 lock 的人就"不保證"能看見 Line 23 的記憶體修改結果。 { ... } // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Release); // 基本上,規範寫的 Release 意味著如果我們使用 Ordering::Release 進行 store, // 那麼任何使用 Ordering::Acquire 的相同值的 load 操作都必須將在 store 之前發生的所有操作視為在 store 之前發生。 // Line 23 (Ordering::Release) 也不能重排到 Line 7 (Ordering::Acquire) 之前。 ret } }

happens-before 關係確保在執行 store 操作的事物之前發生的任何事情也確實發生在 load 操作之後發生的任何事情之前。

目前程式碼
use std::cell::UnsafeCell; use std::sync::atomic::{AtomicBool, Ordering}; const LOCKED: bool = true; const UNLOCKED: bool = false; pub struct Mutex<T> { locked: AtomicBool, v: UnsafeCell<T>, } unsafe impl<T> Sync for Mutex<T> where T: Send {} impl<T> Mutex<T> { pub fn new(t: T) -> Self { Self { locked: AtomicBool::new(UNLOCKED), v: UnsafeCell::new(t), } } pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { // x86: CMPXCHG // ARM: LDREX (load-link) STREX (store-conditional) // - compare_exchange: impl using a loop of LDREX and STREX // - compare_exchange_weak: LDREX STREX while self .locked .compare_exchange_weak( UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed) .is_err() { // MESI protocal: stay in S when locked while self.locked.load(Ordering::Relaxed) == LOCKED { std::thread::yield_now(); } std::thread::yield_now(); } // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Release); ret } } use std::thread::spawn; fn main() { let l: &'static _ = Box::leak(Box::new(Mutex::new(0))); let handles: Vec<_> = (0..100) .map(|_| { spawn(move || { for _ in 0..1000 { l.with_lock(|v| { *v += 1; }); } }) }) .collect(); for handle in handles { handle.join().unwrap(); } assert_eq!(l.with_lock(|v| *v), 100 * 1000); } #[test] fn too_relaxed() { use::std::sync::atomic::AtomicUsize; let x: &'static _ = Box::leak(Box::new(AtomicUsize::new(0))); let y: &'static _ = Box::leak(Box::new(AtomicUsize::new(0))); let t1 = spawn(move || { let r1 = y.load(Ordering::Relaxed); x.store(r1, Ordering::Relaxed); r1 }); let t2 = spawn(move || { let r2 = x.load(Ordering::Relaxed); y.store(42, Ordering::Relaxed); r2 }); let r1 = t1.join().unwrap(); let r2 = t2.join().unwrap(); }

再來介紹 Ordering::AcqRel,它會傳遞給具有讀寫的操作,compare_exchange 就是個例子,該指令有讀出值以及寫入值的操作。Ordering::AcqRel 表示,使用 Ordering::Acquire 語意進行 load,使用 Ordering::Release 語意進行 store :

impl<T> Mutex<T> { ... pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { ... while self .locked .compare_exchange_weak( UNLOCKED, LOCKED, - Ordering::Acquire, + Ordering::AcqRel + // 我們的 `compare_exchange` 其實不用在乎 store 是 Ordering::Release, + // 因為 Line 23 已經建立 Ordering::Release 的語意了。 Ordering::Relaxed) .is_err() { ... } // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Release); ret } }

Ordering::AcqRel 更常用在 fetch_xxx 這種單一改變值的操作,這種操作不涉及 critical section,但希望與其他執行緒同步,所以我們繼續使用 Ordering::Acquire 作為我們的傳入參數。

:question: 1:19:58
Q : Is it only Acquire ordering or Acquire and stronger ordering?
A : So sequentially consistent ordering is acquire and release and more.
A : Ordering::SeqCst 包括 Ordering::Acquire、Ordering::Release 和更強的 Ordering。

接著講解 compare_exchange_weak 的最後一個參數 (failure) :

  • 當前想改變某個記憶體位置的值的時候,發現它的值已經被其他執行緒改變了,當前執行緒就會放棄 store 值到某個記憶體位置。這個 Ordering 表示如果 load 指示不應該 store,那麼 load 應該具有什麼 Ordering。
  • 在我們的程式,你可以將其視為未能取得 lock 的 Ordering 是什麼。你可以想像一些情況,即使未能獲取 lock,你仍希望建立一種 happens-before 關係,雖然這種情況相當罕見,但確實存在。在我們的程式,我們的程式並不需要在未能獲取 lock 的時候建立 happens-before 關係。
  • 如果當前執行緒執行緒未能取得 lock,這並不表示當前執行緒現在必須與上次釋放 lock 的執行緒同步,因為這並不重要。重要的是,在當前執行緒取得 lock 的時刻,你必須在最後持有 lock 的執行緒和你自己之間建立一種 Ordering 關係,或者更確切地說是一種 happens-before 關係,因為你將要對其中的內容進行操作。因此,在我們的程式,可以將這個參數保持為 Ordering::Relaxed 即可。

為什麼在我們剛剛一開始都使用 Relaxed 一樣沒有問,因為我的 CPU 是 x86 架構,該架構基本上保證了 Ordering::Acquire/ Ordering::Release 語義,即使你使用的是 Ordering::Relax,但如果你的 CPU 若為 ARM 架構,你使用了 Ordering::Relax,就會得到 Ordering::Relax 的語義 :
image
這就造成了,明明我的電腦就可以跑,為什麼你的不行的情況。即使在某台電腦成功執行了數百數千次,也有可能在別台電腦不行跑,因為使用的編譯器不同或者是使用的 CPU 架構不同。所以你的程式碼必須要考量到不同平台,必須將 Ordering 完全寫正確。
image

Q : What about that relaxed in the loop?
A : 維持 Ordering::Relaxed 即可。在這不需要建立 happens-before 關係,因為我們還沒取得 lock。

使用 Ordering::Relaxed 的情境是,每個執行緒看到什麼並不重要。比如說你在維護一個計數器,你可以讓 Ordering 為 Relaxed,因為如果一個執行緒沒有看到另一個執行緒的 increment,這並不重要。如果計數器在執行順序相對較早或稍後地更新,這也並不重要。因此,Ordering::Relaxed 的優點在於它對 CPU 和編譯器的限制最小。這樣編譯器可以產生更有效的程式碼,CPU 可以更高效地執行指令。但對於任何需要關心執行緒之間順序和指令相對順序的情況,你可能需要考慮更強的 Ordering。

The fetch_ methods

1:26:00

接著討論的 fetch_ 方法,它永遠不會失敗。fetch_ 方法告訴 CPU,你就去取那個值,不用管那個值本來是多少,直接操作那個值並寫回記憶體即可。fetch_ 方法最後會回傳在操作之前的值本來是多少,因為該方法假設你關心在計算之前的值是什麼。

:question: 1:29:07
如果只有像 atomic 加法這樣的操作,你無法將其與 load 結合起來以找出增加或減少的值 (這裡是啥意思?)。因為如果你進行了 load,然後是 atomic 遞增,那麼在它們之間存在一個微小的空間,其他執行緒可能會插入。同樣地,如果你進行了加法然後是 load,也存在一個空間。

fetch_ 方法的精神是,與其僅指定更新後的值,不如告訴它應該執行的操作

fetch_update 讓你可以傳入 closure :

pub fn fetch_update<F>( &self, set_order: Ordering, fetch_order: Ordering, mut f: F, ) -> Result<bool, bool> where F: FnMut(bool) -> Option<bool>, { let mut prev = self.load(fetch_order); while let Some(next) = f(prev) { // 因為傳進來的 closure 可能不只有做加減那種可以用 atomic 就完成的操作, // 所以要用到 CAS 來確保沒有其他執行緒更新目標值。 match self.compare_exchange_weak(prev, next, set_order, fetch_order) { x @ Ok(_) => return x, Err(next_prev) => prev = next_prev, } } Err(prev) }

:pencil2: Atomic Portability
All atomic types in this module are guaranteed to be lock-free if they’re available. This means they don’t internally acquire a global mutex. Atomic types and operations are not guaranteed to be wait-free. This means that operations like fetch_or may be implemented with a compare-and-swap loop.

fetch_ 方法用於一些情況,例如,如果你想為同時發生的一系列操作提供唯一的序列號,而不是使用 lock 的方式,即取得下一個序列號時,你先取得 lock,然後讀取序列號,增加它,最後釋放 lock。相反地,你只需對一個 AtomicUsize 執行 fetch_add 操作即可。這保證了每次獲取序列號的呼叫都會獲得一個獨立的序列號,因為每次增加都會發生,而且 fetch 會確保你讀取的是在更新時存在的值。因此,如果每個執行緒都執行 fetch_add 操作,則不會有執行緒取得相同的序列號。這種 lock-free 的操作會比 lock 的使用還有效率。

Ordering::SeqCst

1:34:07

先將 main 函式改為 mutex_test :

-fn main() #[test] +fn mutex_test() { ... }

再來實作下一個 main 函式 (先忽略程式的 Ordering::SeqCst ,等等會講解) :

// 該範例可以在 C++ 找到對應的程式碼。 fn main() { use::std::sync::atomic::AtomicUsize; let x: &'static _ = Box::leak(Box::new(AtomicBool::new(false))); let y: &'static _ = Box::leak(Box::new(AtomicBool::new(false))); let z: &'static _ = Box::leak(Box::new(AtomicUsize::new(0))); // Release is for stores, Acquire is for loads let _tx = spawn(move || { x.store(true, Ordering::Release); }); let _ty = spawn(move || { y.store(true, Ordering::Release); }); let t1 = spawn(move || { while !x.load(Ordering::Acquire) {} if y.load(Ordering::Acquire) { z.fetch_add(1, Ordering::Relaxed); } }); let t2 = spawn(move || { while !y.load(Ordering::Acquire) {} if x.load(Ordering::Acquire) { z.fetch_add(1, Ordering::Relaxed); } }); t1.join().unwrap(); t2.join().unwrap(); let z = z.load(Ordering::SeqCst); // What are the possible values for z? // - Is 0 possible? // Restriction: // we know that t1 must run "after" _tx // we know that t2 must run "after" _ty // Given that // .. _tx .. t1 .. // _ty t2 _tx t1(+1) = 1 // _ty _tx t2(+1) t1(+1) = 2 // _tx _ty t2(+1) t1(+1) = 2 // _tx t1 _ty t2(+1) = 1 // "Seems" impossible to have a thread schedule where z == 0 // // 執行緒看得到 x, y 的什麼值 : // t2 t1,t2 // Modification(x) : {false, true} // // t1 t1,t2 // Modification(y) : {false, true} // // - Is 1 possible? // Yes: tx, t1(+0), ty, t2(+1) = 1 // - Is 2 possible? // Yes: _tx, _ty, t1(+1), t2(+1) = 2 }
目前程式碼
use std::cell::UnsafeCell; use std::sync::atomic::{AtomicBool, Ordering}; const LOCKED: bool = true; const UNLOCKED: bool = false; pub struct Mutex<T> { locked: AtomicBool, v: UnsafeCell<T>, } unsafe impl<T> Sync for Mutex<T> where T: Send {} impl<T> Mutex<T> { pub fn new(t: T) -> Self { Self { locked: AtomicBool::new(UNLOCKED), v: UnsafeCell::new(t), } } pub fn with_lock<R>(&self, f: impl FnOnce(&mut T) -> R) -> R { // x86: CMPXCHG // ARM: LDREX (load-link) STREX (store-conditional) // - compare_exchange: impl using a loop of LDREX and STREX // - compare_exchange_weak: LDREX STREX while self .locked .compare_exchange_weak( UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed) .is_err() { // MESI protocal: stay in S when locked while self.locked.load(Ordering::Acquire) == LOCKED { std::thread::yield_now(); } std::thread::yield_now(); } // SAFETY: we hold the lock, therefore we can create a mutable reference let ret = f(unsafe { &mut *self.v.get() }); self.locked.store(UNLOCKED, Ordering::Release); ret } } use std::thread::spawn; #[test] fn mutex_test() { let l: &'static _ = Box::leak(Box::new(Mutex::new(0))); let handles: Vec<_> = (0..100) .map(|_| { spawn(move || { for _ in 0..1000 { l.with_lock(|v| { *v += 1; }); } }) }) .collect(); for handle in handles { handle.join().unwrap(); } assert_eq!(l.with_lock(|v| *v), 100 * 1000); } #[test] fn too_relaxed() { use::std::sync::atomic::AtomicUsize; let x: &'static _ = Box::leak(Box::new(AtomicUsize::new(0))); let y: &'static _ = Box::leak(Box::new(AtomicUsize::new(0))); let t1 = spawn(move || { let r1 = y.load(Ordering::Relaxed); x.store(r1, Ordering::Relaxed); r1 }); let t2 = spawn(move || { let r2 = x.load(Ordering::Relaxed); y.store(42, Ordering::Relaxed); r2 }); let r1 = t1.join().unwrap(); let r2 = t2.join().unwrap(); } fn main() { use::std::sync::atomic::AtomicUsize; let x: &'static _ = Box::leak(Box::new(AtomicBool::new(false))); let y: &'static _ = Box::leak(Box::new(AtomicBool::new(false))); let z: &'static _ = Box::leak(Box::new(AtomicUsize::new(0))); spawn(move || { x.store(true, Ordering::Release); }); spawn(move || { y.store(true, Ordering::Release); }); let t1 = spawn(move || { while !x.load(Ordering::Acquire) {} if y.load(Ordering::Acquire) { z.fetch_add(1, Ordering::Relaxed); } }); let t2 = spawn(move || { while !y.load(Ordering::Acquire) {} if x.load(Ordering::Acquire) { z.fetch_add(1, Ordering::Relaxed); } }); t1.join().unwrap(); t2.join().unwrap(); let z = z.load(Ordering::SeqCst); }

在我們這個例子,是有可能產生 z = 0 的情形。如果是執行緒排程的話,確實不可能發生 z = 0 的情形,但我們不受執行緒排程的束縛,執行緒排程就像人類渴望將事物井然有序一樣。但實際上,電腦的運作不必遵循單一順序。你電腦的 CPU 擁有多個核,這些核可以顯示舊值、新值,我們所受制於的僅僅是 Ordering::Require 和 Ordering::Release 語義的確切規則,這就是我們在這裡所提供的。

Line 44 - Line 49 說明 :

  • 當執行到 Line 17(Ordering::Acquire) 的時候,唯一讓 xtrue 的機會是,我們必須從 Line 11(Ordering::Release) 取得 x 值,因此而建立了 happans-before 的關係,也就是 Line 11 之前的其他修改必須被 Line 17 看見 (在我們的例子,Line 11 之前沒任何其他修改) :
    ​​​​Line 17 : Acquire (如果你從 load 觀察到值) ​​​​... ​​​​Line 11 : Release (你必須看到 store 之前的所有操作)
    程式本身沒有限制 Line 11 跟 Line 17 要建立 happens-before 的關係,是因為要通過 while 迴圈,才去建立這種關係的。

    :pencil2: Line 23 說明相似於 Line 17 說明

  • 接著當我們執行到 Line 18(Ordering::Acquire),其 y 值可能來自 Line 6 或者是 Line 14 (Ordering::Release),所以 t1 就有可能看到 ytrue 或是 false 的情形。即使 _ty 先被執行了,我們的記憶體系統仍有可能顯示 yfalse,原因如下 :
    ​​​​Line 14 : Release (store) ​​​​... ​​​​Line 18 : Acquire (並不需要看到 store 之前的所有操作, ​​​​ 包含 Line 14 本身,因為沒這種同步要求。) ​​​​// Line 14 對 y 的修改可能在 Cache A, ​​​​// Line 18 對 y 的讀取可能在 Cache B, ​​​​// 因為 Line 14 與 Line 18 沒要求同步, ​​​​// 所以 Cache A 跟 Cache B 的 y 值可以不一致。
    若 Line 14 與 Line 18 之間要建立了 happens-before 的關係,必須 Line 18 先執行,Line 14 後執行才可以。Line 14 先執行,Line 18 後執行並不會建立 happans-before 的關係。

    :pencil2: Line 24 說明相似於 Line 18 說明

  • 最後的結果 : Line 14/ Line 18 可能看到 yfalse,Line 11/ Line 24 可能看到 xfalse,最終導致 z 都沒有被修改,仍為 0

:bulb: 編譯器不被允許將 Acquire 後面的操作移到 Acquire 前面,以本程式為例子來說,編譯器不被允許 Line 24 - Line 26 的操作移到 Line 23 之前。

為什麼開發 Rust 語言要讓 Line 44 - Line 49 這種情況可以發生 ? 答案是因為如果你看一下類似 Mutex 的東西,這種情況是可以的。這不會引起任何問題,並且這樣做給了 CPU 和編譯器更多重排指令的自由。

x 和 y 只是獨立的變數。那麼為什麼我們應該在它們之間建立一種任意的連結呢?如果有的話,突然之間我們正在強制 CPU 和編譯器必須按順序執行事物。他們必須獲得對某些操作的 exclusive 存取權 (而這實際上是不必要的),這將帶來你本來可以避免的成本。

你對 CPU 和編譯器的限制越多,成本將會越高;你對 CPU 和編譯器的限制越少,成本將會越低,相對應的,你要自己確定這些重排不會造成非你預期的結果。如果真的非得限制先後的執行順序,才能達到你預期程式的行為,這時候你才會使用到 Ordering::SeqCst,讓 CPU 和編譯器必須 sequentially 執行你的指令。

:pencil2: SeqCst
Like Acquire/Release/AcqRel (for load, store, and load-with-store operations, respectively) with the additional guarantee that all threads see all sequentially consistent operations in the same order.

Corresponds to memory_order_seq_cst in C++20.

Q : if something are implemented under the hood with compare_exchange, why doesn't everything return a Result? How do they handle errors from compare_exchange?
A : 因為 fetch_ 方法總是成功,所以從不回傳 Result。這就是為什麼如果實作 CAS,它會在一個迴圈中執行 compare_exchange_weak,直到成功為止的原因。

Q : is this mutex panic safe?
A : 是,但它不會傳遞 panic。

將程式做以下修改,z 就不可能是 0 了 :

fn main() { ... spawn(move || { - x.store(true, Ordering::Release); + x.store(true, Ordering::SeqCst); }); spawn(move || { - y.store(true, Ordering::Release); + y.store(true, Ordering::SeqCst); }); let t1 = spawn(move || { - while !x.load(Ordering::Acquire) {} + while !x.load(Ordering::SeqCst) {} - if y.load(Ordering::Acquire) { + if y.load(Ordering::SeqCst) { z.fetch_add(1, Ordering::Relaxed); } }); let t2 = spawn(move || { - while !y.load(Ordering::Acquire) {} + while !y.load(Ordering::SeqCst) {} - if x.load(Ordering::Acquire) { + if x.load(Ordering::SeqCst) { z.fetch_add(1, Ordering::Relaxed); } }); t1.join().unwrap(); t2.join().unwrap(); let z = z.load(Ordering::SeqCst); }

:question: 1:56:53 - 2:00:08
整段聽不懂,為什麼 z 不可能為 0。

Breather

2:00:08

ThreadSanitizer

2:00:40

並行程式最麻煩的地方並不是如何實作,而是如何驗證你實作的結果是否正確,需要考慮到 CPU 架構、作業系統的排程器、編譯器版本、開啟哪種最佳化。你的程式也有可能在重複驗證數兆次才出現一次 bug,因此我們需要更好的方法來驗證我們程式的正確性。

可能你的程式有問題,即便你的程式沒有 panic,像是我們前面的 counter 被遞增的數字並非預期,但這也只是依賴我們寫的 assert 來確認我們的程式是否正確。這種 assert 的作法,當程式對值的操作越加複雜,我們就不能輕易地得知實作的哪部分有 bug (critical section 有多個執行緒進入、Mutex 有 bug、寫入同一筆資料到 log 兩次、log 被截斷,備份最終是空的等等各種可能),這也意味著,即使你在不同硬體上的許多核心上執行它長達 10 年,你的程式碼可能會遇到 bug,但沒有任何機制會注意到你遇到了一個 bug。

如何確保測試所有可能的合法執行方式,以及如何知道是否發生了問題,是兩個有些不同的問題,它們通常會以稍微不同的方式處理。對於第二個問題,即如何偵測這些錯誤?在撰寫像這樣使用 atomic 操作的並行程式碼時,你會想要插入許多 assert 來確保你的實作是正確的。如果你真的想這樣做,你可以將它們設為debug assertions,然後以 release 模式執行測試套件,但開啟 debug assertions,這樣不會太影響釋出版本。但你確實需要許多 assert 來偵測這些問題。還有一些自動化系統可以用來進行偵測,Google 有很多不同的 sanitizers,現在已經內建在很多編譯器 (clang, gcc, msvc) 中,其中一種 sanitizer 即為 ThreadSanitizer

ThreadSanitizer 會以一種檢測的方式執行你的程式碼,你程式碼中的每個記憶體操作 (load, store)都會被追蹤。它會添加特殊的指令來追蹤修改的內容和時間。當你的程式碼執行時,ThreadSanitizer 在背景中會偵測是否存在兩個執行緒寫入相同的記憶體位置,或者是否存在一個執行緒在另一個執行緒讀取它之後對其進行寫入,但它們之間沒有 happens-before 關係。因此,ThreadSanitizer 基本上嘗試偵測在單個記憶體位置上有非同步操作的情況,但它不能偵測到所有的問題。

你執行程式碼,ThreadSanitizer 將偵測程式碼的執行是否有問題,可能這個特定的執行是正確的,但你仍然可能有一個並行 bug。這又回到了一個問題,那就是如何確保你的程式的每種合法執行都被測試到?這實際上更難解決,因為你需要找出如何探索所有可能的行為,使用所有可能的編譯器、CPU和架構,符合規範。Jon 目前已知最好的工具是 loom。

loom

2:05:49

loom 是用 Rust 開發的工具,它是照著這篇論文實作出來的。基本上,該論文概述的內容和 Loom 實作的內容是採用並行程式並對其進行檢測 (不是自動檢測)的策略。loom 不會用到標準函式庫提供的 atomic 操作,而是使用自定義的 atomic Mutex, channel, 等等。

當你透過 loom 的資料結構進行載入時,loom 會向你回傳可能的合法值,並且會多次執行你提供的測試 closure。當你執行 store 操作時,loom 會記錄已 store 的所有值,這樣在其他 load 操作中,它就能夠將你的執行揭露給其中一個可能的值。然後每次執行 closure 都會將你揭露給不同的可能執行。

因此,當 loom 執行時,它將執行所有可能的執行緒交錯,所有可能的 memory ordering。記得我們談到每個變數的修改順序,其中 t1 可以看到 xy 的任一值,而 t2 可以看到 x 的任一值。loom 將確保有一個執行,其中 t2 看到了 xfalse 值,還有一個執行,其中 t2 看到了 xtrue 值。總的來說,我們將對程式中的所有操作進行這樣的處理。

示範程式 :

// 使用 loom 提供的 atomic 物件來做 loom 測試 // 若要 release 你的程式,使用標準函式庫提供的 atomic 物件。 use loom::sync::Arc; use loom::sync::atomic::AtomicUsize; use loom::sync::atomic::Ordering::{Acquire, Release, Relaxed}; use loom::thread; #[test] #[should_panic] fn buggy_concurrent_inc() { // loom::model 會試所有可能的程式路徑 // 如果可能的程式路徑過多,將會測試不完。 loom::model(|| { let num = Arc::new(AtomicUsize::new(0)); let ths: Vec<_> = (0..2) .map(|_| { let num = num.clone(); thread::spawn(move || { let curr = num.load(Acquire); num.store(curr + 1, Release); }) }) .collect(); for th in ths { th.join().unwrap(); } assert_eq!(2, num.load(Relaxed)); }); }

Q : Are there any sort of "toy programs" one can think of to try and drill this into my brain? Like the spinlock was a pretty good example but this last example seems crazysauce
A : Jon 沒有更簡單的例子。Jon 建議查閱一些實作並行資料結構的論文,注意它們在何處使用 Ordering::SeqCst,而不是其他 Ordering。通常,論文會解釋為什麼使用特定的 Ordering。

loom 有一些限制,其中一些是已知的問題。有些是該方法的更根本問題,或者是不可能建模的問題。例如,Ordering::Relaxed 是如此鬆散,即使使用像 loom 這樣的工具,也無法完全建模所有可能的執行 :

#[test] fn too_relaxed() { use::std::sync::atomic::AtomicUsize; let x: &'static _ = Box::leak(Box::new(AtomicUsize::new(0))); let y: &'static _ = Box::leak(Box::new(AtomicUsize::new(0))); let t1 = spawn(move || { let r1 = y.load(Ordering::Relaxed); x.store(r1, Ordering::Relaxed); r1 }); let t2 = spawn(move || { let r2 = x.load(Ordering::Relaxed); y.store(42, Ordering::Relaxed); r2 }); let r1 = t1.join().unwrap(); let r2 = t2.join().unwrap(); }

當你將這個函式傳遞到 loom::model,loom 必須預測 Line 13 會 load 什麼值,但 42 並不在它的預測集合,因為 Line 14 的 store 還沒有發生。但實際情形是,Ordering::Relaxed 允許 Line 13 load 42,所以 loom 沒有測試到這種程式執行路徑。

如果你閱讀這篇引人入勝的論文,裡面有一些方法 (但無法完全解決問題),因為你知道你會多次執行這個 closure,你可以執行這個 closure,記住所有你看到的 Ordering::Relaxed store (如剛剛的 Line 14 例子)。下一次執行時,從 load 回傳一個稍後 store 的值,繼續執行,看看該 store 是否仍然 store 該值。如果是,你成功地完成了反向依賴。如果不是,你就放棄了整個執行,因為那個 race 不再發生。

loom 不支援 Ordering::SeqCst,它會把它降級成更弱的保證,Ordering::AcqRel,雖然 loom 有可能出現 false positive 的情形,但不可能出現 false negative 的情形。但重要的是,即使你的測試平台是 x86 架構,loom 能夠正確地模擬 Acquire/Release。

Q : @jonhoo I see that Tokio uses Loom, has it caught serious issues?
A : 確實有。Tokio 使用了 loom,尤其是對於排程器,該排程器有大量的 lock-free,loom 有造成重大的錯誤。但幸運的是,據 Jon 所知,這些錯誤從未進入 Production。不過 loom 確實引起了實際問題。

Loom 還要求執行是 deterministic。比如,如果執行了一個系統呼叫,Loom 將每次都重新執行該系統呼叫。因此,為了能夠有效地使用 Loom,您可能需要進行一些模擬,就像在正常測試中一樣。

Atomic fences

2:22:09

compiler_fence 限制編譯器不能將 fence 之後的程式碼再編譯之後移動到前面,但 CPU 仍可以 out-of-order 執行。所以你很少需要用到 compiler_fence,可能需要用的情況是防止執行緒與其自身 race,這種情況通常只有在使用 signal handlers 時才會發生。

fence 就重要多了,fence 基本上是一個 atomic 操作,它在兩個執行緒之間建立 happens-before 的關係,但不涉及特定的記憶體位置

volatile

2:27:27

Function std::ptr::read_volatile, Function std::ptr::write_volatile 的文件分類並不是 std::atomics,因為它們跟 atomic 本來就沒關係。

read_volatile 聽起來似乎與 atomic 操作有關,因為它通常被解釋為確保你存取記憶體。因此,人們可能會認為並行操作可能會發生 race 是因為一個 CPU 在暫存器中執行操作而不是在記憶體中。但如果 CPU 存取記憶體,其他執行緒肯定會看到它,因此要使用 volatile。這實際上不是 volatile 的目的。此外,volatile 不能在跨執行緒邊界上建立 happens-before 關係。

volatile 的用途是與 memory-mapped 設備進行互動。想像一下你有一張網卡,它必須發送和接封包,它將自己映射到記憶體的特定區域,例如在特定的地址範圍內。假設封包佇列映射到該記憶體區域,從該記憶體的某個部分讀取可能會導致修改該記憶體

讀取某個部分導致修改該記憶體的例子是,許多 memory-mapped 設備具有記憶體區域,其中有一個 ring buffer,並且有指向該 ring buffer 的 head 和 tail 的指標。通常,head 和 tail 決定新的寫入位置以及讀取器正在從哪裡讀取,而這些可能是不同的執行緒,其中一個只操作 head,另一個只操作 tail。而這些設備對應的記憶體區域通常具有這樣的 side-effects,即如果你從 tail 讀取,同時也會重設 tail (讀取還有其他的一些 side-effects)。

在這種情況下,想像一下,如果你從對應到記憶體的給定變數中讀取兩次,如果你沒有使用 read_volatile,編譯器可能會進行第一次讀取,並意識到讀取只是 read-only (因為編譯器看不到讀取的 side-effects),所以將第一次讀取 cache 到暫存器中,而第二次讀取只是直接從暫存器中讀取。

但實際上,你需要兩者都去存取記憶體,才能對設備映射記憶體產生 side-effects。這就是 read_volatile 的作用,它是一種告訴編譯器必須存取記憶體且操作不能相對於其他操作進行重新排序的方法。因此,編譯器不能將 volatile 操作上移或下移,因為它實際上可能有 side-effects。

:pencil2: Relationship with volatile

Within a thread of execution, accesses (reads and writes) through volatile glvalues cannot be reordered past observable side-effects (including other volatile accesses) that are sequenced-before or sequenced-after within the same thread, but this order is not guaranteed to be observed by another thread, since volatile access does not establish inter-thread synchronization.

In addition, volatile accesses are not atomic (concurrent read and write is a data race) and do not order memory (non-volatile memory accesses may be freely reordered around the volatile access).

AtomicPtr

2:32:18

Struct std::sync::atomic::AtomicPtr 是一個 AtomicUsize 大小,其中的方法被專門設計用來操作底層的指標 :

pub struct AtomicPtr<T> { p: UnsafeCell<*mut T>, }

AtomicPtr 的方法 :

// fetch_add // 沒有 fetch_add 方法,因為有可能造成 UB pub fn load(&self, order: Ordering) -> *mut T pub fn store(&self, ptr: *mut T, order: Ordering) pub fn compare_exchange( &self, current: *mut T, new: *mut T, success: Ordering, failure: Ordering ) -> Result<*mut T, *mut T>

除了 AtomicPtr 的 get_mut 方法是 unsafe (因為要操作 raw pointer),其他都是 safe,以 AtomicUsize 為例 :

pub fn get_mut(&mut self) -> &mut usize // 傳入 self 的 mutable 參考,並回傳對內部類型的 mutable 參考。 // 這是 safe 的,因為如果你擁有對 atomic 本身的 exclusive 參考, // 那麼你不需要使用任何特殊的 memory ordering、exclusvie 操作或其他任何東西。 // 因為沒有其他人擁有對該 atomic 的參考。因此,你不需要對其使用原子操作。

AtomicUsize 不可以實作 Deref,而 DerefMut 又是 Deref 的擴展,所以 DerefMut 也不可以實作。as_mut 不能實作的原因也是因為 as_ref 不可以實作。所以即使從技術上來說這個類型可以實作其中之一,實際上 Rust 語言開發團隊並沒有這樣做。

Atomics through FFI

2:35:13

:question: 2:35:13
Q : how do atomics interact with regular stores? So if I cast an atomic to a *mut u32 and pass it to C?
A : 在 C 中,您也可以選擇使用 memory order 執行 atomic 操作,在這種情況下,它將受到相同的影響。Jon 實際上不知道編譯器在這方面是如何操作的,但是如果你在 C 中執行 atomic 操作,比如使用 Ordering::SeqCst 的 store 操作,那仍然會對該操作強制執行。可以理解為並不是該值特殊,而是當你使用 AtomicUsize 或 AtomicU32 時,編譯器知道在產生指令時要對其進行特殊處理。如果你將其作為原始指標傳遞給 C,然後透過 * 進行解參考,這甚至比對該值進行 Ordering::Relaxed load 還要弱。所以這就好比如果你將其轉換為原始指標指向 Usize,然後對其進行解參考,而實際上在 Rust 中你甚至不能這樣做。如果你將其轉換為指向 Usize 的原始指標,然後對其進行解參考,這樣做可能是 unsafe 的,可能會遇到未定義行為。

Consume ordering?

2:36:44

Rust 還沒有實作 Ordering::Consume,Jon 不知道 Consume 在 C++ 常不常用,也不太清楚用途。現在也不太適合學它,C++ 官方文件寫道 :

The specification of release-consume ordering is being revised, and the use of memory_order_consume is temporarily discouraged.
(since C++17)

Closing thoughts

2:38:08

非必要請不要實作 lock-free 程式。如果非得使用,請搭配 loom, ThreadSanitizer 工具以及閱讀他人的論文再來實作。

待整理

  1. 如果 compare_exchange 沒有搭配 while 迴圈使用不會有多個執行緒可以同時進入 critical section 的問題嗎 ?
  2. 1:19:58
  3. 1:29:07
  4. 1:56:53
  5. 2:35:13