# C++ thread ###### tags: `cpp` 要在C++使用thread,要記得加編譯器參數 `-std=c++11 -pthread` ## 參考 [thispointer.com](https://thispointer.com/c11-multithreading-tutorial-series/) [cplusplus.com](http://www.cplusplus.com/reference/multithreading/) [cppreference.com](https://en.cppreference.com/w/cpp/thread) [jyt0532 multithread](https://www.jyt0532.com/2016/12/23/c++-multi-thread-p1/) 以下都是std::的,不是boost:: --- ## thread 就像一個指向執行緒的指標,在建構時就會順便建構該執行緒,可以用該指標間接控制該執行緒,若detach()該執行緒也只是無法繼續控制該執行緒,該執行緒還會繼續存在,但和指標不一樣的是,new出來的物件如果沒有delete就會一直存在,而創造出來的執行緒只要執行完畢就會自動消失。 > 上面說的指標只是一個概念 ```cpp // 建立一個執行緒,此執行緒會執行 Fn(Args...) // 如果要傳參考的話需要加ref() thread(Fn, Args...) // 主執行緒會在此一直等待,直到此執行緒執行完 join() // 檢查此執行緒是否可以 join (是否還和主執行緒連接) joinable() // 把此執行緒和主執行緒的連接切斷 detach() // 如果此執行緒存在,會返回唯一的識別碼 // 回傳型態為 thread::id get_id() ``` > 創造者(主執行緒)和被創造者(此執行緒)的關係就像function call一樣,被創造者執行完會回到創造者的執行緒,但是可以detach讓被創造者獨立出去。 --- 使用範例 ```cpp // 此執行緒(thr)會執行一個跑n次的迴圈,n=100 auto thr = thread([](int n){ for(int i=0; i<n; ++i); }, 100); // 切斷連結 thr.detach(); // 因為thr已經和主執行緒分開了,所以回傳false if(thr.joinable()) thr.join(); ``` > 主執行緒 : 呼叫 thread() 的執行緒 > 此執行緒 : thread() 建構出的執行緒 --- ### this_thread:: ```cpp // 回傳此執行緒的唯一識別碼 get_id() // 暫時中斷此執行緒,讓他去外面排隊等著再次被執行 yield() // 暫時中斷此執行緒,讓他去外面等一段時間再去排隊等著被執行 sleep_for(chrono::duration<>) // 暫時中斷此執行緒,讓他去外面等到某個時間點再去排隊等著被執行 sleep_until(chrono::time_point<>) ``` > CPU同時能跑的執行緒是有上限的,如果你創造很多條執行緒,實際上只會有幾條再跑,其他的都在排隊等著被執行,但執行緒可能執行一段時間就會自動yield出去換別人跑(分時多工),並不會等先到的執行緒整個結束才讓後面的執行。 --- #### chrono::duration duration<>可以用這些定義的型別取代,比較直覺 ```cpp std::chrono::nanoseconds() std::chrono::microseconds() std::chrono::milliseconds() std::chrono::seconds() std::chrono::minutes() std::chrono::hours() ``` --- 使用範例 ```cpp // main cout << "this thread is " << this_thread::get_id() << endl; for (const char c : "Hello world"){ cout << c; this_thread::sleep_for(chrono::seconds(1)); } ``` > 先印出主執行緒的id,在每隔一秒輸出一個字。 --- ### call_once 如果有多個執行緒一起執行一個function,但只想讓這個function被執行一次,可以用call_once()。 `std::call_once(once_flag, Fn, Args...)` - once_flag : 用來判斷是否已被執行,所以需要共用同一個flag - Fn(Args...) : 與一般thread呼叫方式一樣 範例 ```cpp void test(){ cout << "once"; } void tester(){ static once_flag f; call_once(f, test); // 由他呼叫的test()只會被執行一次 } int main() { for (int i = 0; i < 10; ++i){ thread(tester).detach(); } return 0; } ``` > call_once執行時,會先判斷flag,如果還沒執行過就設定flag並執行,原理就像macro的引入保護一樣。 --- ### ! race condition ! 因為執行緒會共用資料,所以可能發生同一份資料同時被多條執行緒寫入,結果有可能會和預想的不一樣,要解決這個問題就要用到lock。 --- ## lock C++11的lock有分為: - 基本型(較不安全): - mutex - timed_mutex - recursive_mutex - recursive_timed_mutex - 包裝型(把基本型包起來使用,解構時會自動解鎖,較安全): - lock_guard<> - unique_lock<> --- ### mutex ```cpp // 互斥鎖鎖定,當其他執行緒也執行到該互斥鎖的lock()時,會等待直到該互斥鎖unlock() lock() // 解除鎖定 unlock() // 若該互斥鎖被鎖定,會回傳false並繼續執行,沒被鎖定就回傳true繼續執行,最好搭配if使用 try_lock() ``` > 當一執行緒成功lock(),取得該mutex所有權,則只有該執行緒能對該mutex進行unlock()。 --- 範例一 lock ```cpp void test(){ static mutex m; m.lock(); // maybe waiting /* * atomic section */ m.unlock(); /* * maybe occur race condition */ } ``` --- 範例二 try_lock ```cpp void test(){ static mutex m; if(m.try_lock()){ // no waiting /* * atomic section */ m.unlock(); } /* * maybe occur data condition */ } ``` --- ### special mutex 除了基本的mutex,還有幾個特殊功能的mutex - `timed_mutex` 比基本mutex多了兩個時間功能。 - `try_lock_for(chrono::duration<>)` - 在一段時間內會一直try_lock(阻塞一段時間),時間內成功lock回傳true並繼續,超時還沒lock回傳false並繼續。 - `try_lock_until(chrono::time_point<>)` - 同上,但會一直持續到指定的時間點。 - `recursive_mutex` 可用在遞迴的mutex。 - 最好搭配unique_lock使用,不要自行呼叫unlock() - `recursive_timed_mutex` 結合以上兩種mutex --- ### *C++14 up : SharedMutex* 為了更方便的製作讀寫鎖,而多了可以共用所有權的mutex - `shared_timed_mutex` (C++14) - `shared_mutex` (C++17) 他們的使用方式分成兩種級別,unique、shared - unique : lock後,只有自己擁有所有權 - lock() - try_lock() - unlock() - shared : lock後,其他人只能用shared方式取得所有權,但所有權會共享 - lock_shared() - try_lock_shared() - unlock_shared() --- ### lock_guard mutex::lock() 有一個問題是**若還沒執行 unlock() 就發生例外跳出去,導致沒人執行 unlock() ,導致一直都是 lock 狀態**。 解決這個問題的就是 lock_guard,原理是當發生例外往外跳時,因為離開scope,所以會解構在這個scope內創造的東西。 ```cpp mutex m; { lock_guard<mutex> lg(m); // 建構子會執行m.lock() /* * atomic */ } // 解構子會執行m.unlock() ``` --- ### unique_lock lock_guard 保護的範圍是整個 scope ,如果要縮小範圍的話,就用 unique_lock 解決。 和 lock_guard一樣有保護功能,還能使用mutex的功能。 他就像一個指向mutex的智慧指標,需要指向一個mutex,可以使用該mutex的功能,也可用release取消指向該mutex,還會自動使用unlock。 除了可使用mutex的功能以外,還有以下功能 ```cpp owns_lock() // 是否擁有該mutex的所有權 release() // 釋放所指向的mutex,並回傳指向他的指標 mutex() // 回傳指向mutex的指標,但不釋放 ``` --- ### constructor flag unique_lock<>還有另外三種建構模式 ```cpp // 可用在 unique_lock的建構子上 // 在建構時執行try_lock() std::try_to_lock // 可用在 unique_lock的建構子上 // 在建構時"先"不執行lock() std::defer_lock // 可用在 unique_lock、lock_guard的建構子上 // 假設在建構前,已經把mutex lock住了 // 所以在建構時"不會再"執行lock(),之後也不會執行lock() std::adopt_lock ``` --- 範例 ```cpp mutex m; { unique_lock<mutex> ul(m, try_to_lock); // 建構子會執行m.try_lock() if(ul.owns_lock()){ /* * atomic section. */ ul.unlock(); } /* * may occur race condition. */ } // 若還沒unlock()會執行unlock() ``` --- ### *C++14 up : shared_lock<>* 可以使用指向ShareMutex,並用shared方式使用他,配合unipue_lock<>可實作讀寫鎖。 ```cpp shared_mutex m; void read(){ shared_lock<share_mutex> lock(m); // 同時擁多個執行緒擁有所有權 } void write(){ unique_lock<shared_mutex> lock(m); // 同時只有一個執行緒擁有所有權 } ``` --- ### ! dead lock ! 當有一執行緒tA 占用資源rA 並等待資源rB 另有一執行緒tB 占用資源rB 並等待資源rA 雙方都沒辦法執行,沒執行完就不會釋放自己的資源,於是一直互相等待。 當這情況發生時,就只能結束整個程式,所以要盡量避免。 --- ### multiple lock 為解決這個問題,有以下兩個function,可以一次拿到所有需要的資源。 不過當你要求的資源太多,可能一直沒機會全部拿到,然後就一直等待到所有人都執行結束釋放資源。 ```cpp std::lock(mutex1, mutex2, ...) std::try_lock(mutex1, mutex2, ...) // 這裡參數的mutex可以是一般的mutex或各種特殊功能的mutex,像lock_guard、unique_lock... ``` --- ### *C++17 up : scoped_lock* 如上述multiple lock,在C++17以後可以使用scoped_lock,有同樣的效果,並且和lock_guard<>一樣,會自動unlock。 ```cpp scoped_lock lock(mutex1, mutex2, ...) scoped_lock lock(adopt_lock, mutex1, mutex2, ...) ``` --- ## condition_variable 若要讓一個thread準備好資料後通知另一個thread使用,可以用condition_variable 可以依照條件從當前的thread跳出到 condition_variable 內部的一個 queue等待,之後需要時再喚醒他,這樣就不用浪費一個thread一直等待。 --- ### wait() ```cpp // 會跳出當前執行緒 // 並把此執行緒加入condition_varible內部的queue // 再把該互斥鎖unlock() wait(unique_lock<mutex>) // 若f回傳false: 繼續wait // 若f回傳true: 不會做事,繼續執行 wait(unique_lock<mutex>, function<bool()>f) ``` --- ### notify ```cpp // 喚醒queue內第一個執行緒繼續執行 notify_one() // 喚醒queue內所有執行緒繼續執行 notify_all() ``` --- 範例 ```cpp condition_variable cv; mutex m; bool ready = false; void output_data(string* message){ unique_lock<mutex> ul(m); while(!ready) cv.wait(ul); cout << message->c_str() << endl; } void prepare_data(string* message){ cout << "prepare message ...\n"; this_thread::sleep_for(chrono::seconds(5)); *message = "hello world"; ready = true; cout << "message ready, continue thread.\n"; cv.notify_one(); } // main string message; thread(prepare_data, &message).detach(); thread(output_data, &message).detach(); ``` --- ### condition_variable_any condition_variable<>只能使用unique_lock<>,而多了any就是可以使用任何lockable的東西。 --- ## future 若要讓一個thread準備好資料後通知另一個thread使用,也可以用future和promise。 ```cpp // promise<dataType> get_future() // 回傳一個與之對應的future物件 set_value() // 設定共用資料,並通知future物件繼續執行 // future<dataType> get() // 會等待直到promise設定好資料,並回傳該資料 wait()// 會等待直到promise設定好資料,但不會回傳該資料 wait_for(chrono::duration) // 會等待一段時間 // 若在時間內得到資料 回傳future_status::ready 並結束等待 // 時間超過了還沒拿到資料 回傳future_status::timeout 並結束等待 ``` --- 範例 ```cpp void output_data(future<string>& message){ cout << message.get().c_str() << endl; } void prepare_data(promise<string>& message){ cout << "prepare message ..."; this_thread::sleep_for(chrono::seconds(5)); message.set_value("hello world"); cout << "message ready, continue thread. \n"; } // main promise<string> message; future<string> futureObj = message.get_future(); thread(prepare_data, ref(message)).detach(); thread(output_data, ref(futureObj)).detach(); ``` --- ### async() 建立一個有回傳值的thread,將它的回傳值保存再future物件中,需要時再拿出來。 ```cpp future<T> async(launch_policy, function, args...) // 創造一個thread執行function(args...) // 當function執行完,回傳值會被的future接收 // launch_policy有兩種 // launch::async 馬上執行function // launch::deferred 先不執行function,在future::get時才會執行 // 如果不指定launch_policy,系統會自動選擇 ``` --- 範例 ```cpp string prepareMessage(){ this_thread::sleep_for(chrono::seconds(3)); return "Hello world"; } // main future<string> message = async(prepareMessage); /* * do something ... */ cout << message.get(); ``` > 如果function還沒執行完就要get他的值,那會等待他執行完。(阻塞) --- ### packaged_task<> 可以算是async()的底層,把function和promise包成一個物件,此物件可像function object一樣使用,若要get回傳值一定要等到function執行完,不然會一直阻塞,可用move()傳給thread()來達成async()的功能。 ```cpp packaged_task<returnType(Args...)>(function) // 建構式和function class差不多 // 也像function一樣可以用()運算子 .get_future() // 跟promise一樣,回傳一個future<returnType>物件 ``` --- 範例一(單執行緒) ```cpp string prepareMessage(){ this_thread::sleep_for(chrono::seconds(3)); cout << "prepare complete\n"; return "Hello world"; } // main packaged_task<string()> task(prepareMessage); auto result = task.get_future(); task(); cout << result.get().c_str(); ``` --- 範例二(多執行緒) ```cpp string prepareMessage(){ this_thread::sleep_for(chrono::seconds(3)); cout << "prepare complete\n"; return "Hello world"; } // main packaged_task<string()> task(prepareMessage); auto result = task.get_future(); thread(move(task)).detach(); /* * 可以做其他事,不用一直等 */ cout << result.get().c_str(); ``` --- # multi thread programs ## 1 100個資料,101人同時要求資源 比較有lock沒lock的差別 > 共有4種要求資料方式,每按一次 Enter 換下一個,4個後重複。 > 會輸出每種方式所花的時間、更改後的資料。 --- ### include ```cpp= // stdafx.h #include "targetver.h" #include <stdio.h> #include <tchar.h> #include <iostream> #include <array> #include <thread> #include <mutex> #include <ctime> #include <functional> ``` --- ### Data ```cpp= class Data{ static int delayTime; // 增加 data race 的機率. mutex m; int val; public: Data(int val=0) :val(val){} int Get(){ return val; } void Reset(){ val = 0; } void Increase(int v=1){ // non atomic int tmp = val; for (int i = 0; i < delayTime; ++i); // delay tmp += v; val = tmp; } void IncreaseWithLock(int v=1){ unique_lock<mutex> lock(m); int tmp = val; for (int i = 0; i < delayTime; ++i); // delay tmp += v; val = tmp; } }; int Data::delayTime = 50000; ``` --- ### Tester ```cpp= struct Tester{ array<Data, 100> data; array<thread, 101> people; clock_t clockTime; // to account consume time typedef function<void(array<Data, 100>&)> Fn; void test(const char* message, Fn action){ cout << message << endl; ResetData(); clockTime = clock(); for (auto&& th : people){ th = thread(action, ref(data)); } for (auto&& th : people){ th.join(); } cout << "clock:" << clock() - clockTime << endl; OutputData(); cin.get(); } void ResetData(){ for (auto& e : data) e.Reset(); } void OutputData(){ for (auto& e : data) cout << e.Get() << " "; } }; ``` --- ### functions ```cpp= void requireDataWithLock(array<Data, 100>& data){ static mutex m; unique_lock<mutex> lock(m); for (auto&& e : data){ e.Increase(); } } void requireDataWithoutLock(array<Data, 100>& data){ for (auto&& e : data){ e.Increase(); } } void requireAtomicDataWithLock(array<Data, 100>& data){ static mutex m; unique_lock<mutex> lock(m); for (auto&& e : data){ e.IncreaseWithLock(); } } void requireAtomicDataWithoutLock(array<Data, 100>& data){ for (auto&& e : data){ e.IncreaseWithLock(); } } ``` --- ### main ```cpp= int main(){ Tester tester; while (true){ tester.test("Require Data With Lock", requireDataWithLock); tester.test("Require Data Without Lock", requireDataWithoutLock); tester.test("Require Atomic Data With Lock", requireAtomicDataWithLock); tester.test("Require Atomic Data Without Lock", requireAtomicDataWithoutLock); } return 0; } ``` --- ## 2 10個攤位,每個攤位都有一個服務客人的時間,一次服務一個客人 多組客人去吃飯,每個客人都有不同的等待耐心當耐心耗完時就會離開不繼續排隊 固定時間會增加客人去排隊 印出每個客人的等待時間, 與是否離開等待 > 這是單執行緒程式,用來模擬多執行緒發生的事 > 因為是一步一步執行的,可以看到每一步發生什麼事 > 按 Enter 下一步 --- ### include ```cpp= // stdafx.h #include "targetver.h" #include <stdio.h> #include <tchar.h> #include <iostream> #include <array> #include <list> #include <memory> ``` --- ### Client ```cpp= class Client{ static int count; static int minPatience; static int rangePatience; int waitTime; bool waiting; public: const int id; const int patienceTime; Client(int patienceTime) :patienceTime(patienceTime), id(count++), waitTime(0), waiting(true){ cout << "client[" << id << "] join , patience : " << patienceTime << endl; } Client() :Client(rand() % rangePatience + minPatience){} void Clock(){ if (waiting) ++waitTime; } void BeServe(int storeId){ waiting = false; cout << "client[" << id << "] be served by store [" << storeId << "], wait for : " << waitTime << endl; } void NoPatienceExit(){ cout << "client[" << id << "], no patience exit, wait for : " << waitTime << endl; } bool IsNoPatience()const{ return waitTime == patienceTime; } }; int Client::count = 0; int Client::minPatience = 3; int Client::rangePatience = 2; ``` --- ### Store ```cpp= class Store{ static int count; static int minServiceTime; static int rangeServiceTime; unique_ptr<Client> client; int passTime; public: const int id; const int serviceTime; Store(int serviceTime) :serviceTime(serviceTime), id(count++), client(nullptr), passTime(0){ cout << "store [" << id << "]: service time : " << serviceTime << endl; } Store() :Store(rand() % rangeServiceTime + minServiceTime){} void Clock(){ if (client) ++passTime; } void Serve(unique_ptr<Client>&& client){ client->BeServe(id); this->client = move(client); passTime = 0; } void Pop(){ if (client){ cout << "store[" << id << "] service finish, client[" << client->id << "] exit" << endl; client.release(); } } bool IsFinish()const{ return passTime == serviceTime; } bool IsEmpty()const{ return !(bool)(client); } }; int Store::count = 0; int Store::minServiceTime = 30; int Store::rangeServiceTime = 20; ``` --- ### main (scheduler) ```cpp= int main() { array<Store, 10> stores; list< unique_ptr<Client> > clients; const int addTime = 2; int passTime = 0; while (true){ // add client if (passTime%addTime == 0){ clients.push_back(make_unique<Client>()); } // store clock for (auto& store : stores) { if (store.IsFinish()) store.Pop(); if (store.IsEmpty() && !clients.empty()){ store.Serve(move(clients.front())); clients.pop_front(); } store.Clock(); } // client clock for (auto it = clients.begin(); it != clients.end(); ++it) { unique_ptr<Client>& client = *it; client->Clock(); if (client->IsNoPatience()){ client->NoPatienceExit(); it = clients.erase(it); } } ++passTime; cout << "--Clock-- "<<passTime; cin.get(); } return 0; } ``` --- ## 3 哲學家用餐問題 把Philosopher::Action裡的 TryToEat() 改成 TryToEat_MultipleLock() 就可解決 deadlock Philosopher::delay 越大越容易發生deadlock --- ### include ```cpp= // stdafx.h #include "targetver.h" #include <stdio.h> #include <tchar.h> #include <ctime> #include <iostream> #include <thread> #include <mutex> #include <chrono> #include <vector> ``` --- ### Philosopher ```cpp= class Philosopher{ enum Status{think, hungry, eat}; static int count; static int delay; static int minEatTime; static int minThinkTime; static int rangeEatTime; static int rangeThinkTime; mutex& leftFork; mutex& rightFork; Status status; int forkNum; public: const int id; Philosopher(mutex& left, mutex& right) :id(count++), leftFork(left), rightFork(right), status(Status::think), forkNum(0) { thread(&Philosopher::Action,this).detach(); } void ShowStatus(){ cout << "Philosopher[" << id << "] : " << StatusStr() << "\thas forks : " << forkNum << endl; } private: void Action(){ while (true){ Think(); TryToEat(); forkNum = 0; } } void TryToEat(){ // may occur deadlock status = Status::hungry; unique_lock<mutex> l_fork(leftFork); ++forkNum; for (int i = 0; i < delay; ++i); // 增加發生機率. unique_lock<mutex> r_fork(rightFork); ++forkNum; status = Status::eat; this_thread::sleep_for(chrono::milliseconds(rand() % rangeEatTime + minEatTime)); cout << "eat finish" << endl; } void TryToEat_multipleLock(){ // can solve deadlock status = Status::hungry; unique_lock<mutex> l_fork(leftFork, std::defer_lock); unique_lock<mutex> r_fork(rightFork, std::defer_lock); lock(l_fork, r_fork); forkNum = 2; status = Status::eat; this_thread::sleep_for(chrono::milliseconds(rand() % rangeEatTime + minEatTime)); cout << "eat finish" << endl; } void Think(){ status = Status::think; this_thread::sleep_for(chrono::milliseconds(rand() % rangeThinkTime + minThinkTime)); } const char* StatusStr(){ switch (status) { case Philosopher::think: return "think"; case Philosopher::hungry: return "hungry"; case Philosopher::eat: return "eat"; default: break; } } }; int Philosopher::count = 0; int Philosopher::delay = 200000; int Philosopher::minEatTime = 1000; int Philosopher::minThinkTime = 10000; int Philosopher::rangeEatTime = 4000; int Philosopher::rangeThinkTime = 2000; ``` --- ### Table ```cpp= class Table{ vector<mutex> forks; vector<Philosopher> philosophers; public: Table(int number):forks(number){ philosophers.reserve(number); for (int i = 0; i < number-1; ++i){ philosophers.emplace_back(forks[i], forks[i + 1]); } philosophers.emplace_back(forks[number-1], forks[0]); } void Action(){ while (true){ for (auto&& p : philosophers){ p.ShowStatus(); } system("cls"); } } }; ``` --- ### main ```cpp= int main() { srand(time(nullptr)); Table table(8); table.Action(); return 0; } ``` ---