# C++ thread
###### tags: `cpp`
要在C++使用thread,要記得加編譯器參數
`-std=c++11 -pthread`
## 參考
[thispointer.com](https://thispointer.com/c11-multithreading-tutorial-series/)
[cplusplus.com](http://www.cplusplus.com/reference/multithreading/)
[cppreference.com](https://en.cppreference.com/w/cpp/thread)
[jyt0532 multithread](https://www.jyt0532.com/2016/12/23/c++-multi-thread-p1/)
以下都是std::的,不是boost::
---
## thread
就像一個指向執行緒的指標,在建構時就會順便建構該執行緒,可以用該指標間接控制該執行緒,若detach()該執行緒也只是無法繼續控制該執行緒,該執行緒還會繼續存在,但和指標不一樣的是,new出來的物件如果沒有delete就會一直存在,而創造出來的執行緒只要執行完畢就會自動消失。
> 上面說的指標只是一個概念
```cpp
// 建立一個執行緒,此執行緒會執行 Fn(Args...)
// 如果要傳參考的話需要加ref()
thread(Fn, Args...)
// 主執行緒會在此一直等待,直到此執行緒執行完
join()
// 檢查此執行緒是否可以 join (是否還和主執行緒連接)
joinable()
// 把此執行緒和主執行緒的連接切斷
detach()
// 如果此執行緒存在,會返回唯一的識別碼
// 回傳型態為 thread::id
get_id()
```
> 創造者(主執行緒)和被創造者(此執行緒)的關係就像function call一樣,被創造者執行完會回到創造者的執行緒,但是可以detach讓被創造者獨立出去。
---
使用範例
```cpp
// 此執行緒(thr)會執行一個跑n次的迴圈,n=100
auto thr = thread([](int n){
for(int i=0; i<n; ++i);
}, 100);
// 切斷連結
thr.detach();
// 因為thr已經和主執行緒分開了,所以回傳false
if(thr.joinable())
thr.join();
```
> 主執行緒 : 呼叫 thread() 的執行緒
> 此執行緒 : thread() 建構出的執行緒
---
### this_thread::
```cpp
// 回傳此執行緒的唯一識別碼
get_id()
// 暫時中斷此執行緒,讓他去外面排隊等著再次被執行
yield()
// 暫時中斷此執行緒,讓他去外面等一段時間再去排隊等著被執行
sleep_for(chrono::duration<>)
// 暫時中斷此執行緒,讓他去外面等到某個時間點再去排隊等著被執行
sleep_until(chrono::time_point<>)
```
> CPU同時能跑的執行緒是有上限的,如果你創造很多條執行緒,實際上只會有幾條再跑,其他的都在排隊等著被執行,但執行緒可能執行一段時間就會自動yield出去換別人跑(分時多工),並不會等先到的執行緒整個結束才讓後面的執行。
---
#### chrono::duration
duration<>可以用這些定義的型別取代,比較直覺
```cpp
std::chrono::nanoseconds()
std::chrono::microseconds()
std::chrono::milliseconds()
std::chrono::seconds()
std::chrono::minutes()
std::chrono::hours()
```
---
使用範例
```cpp
// main
cout << "this thread is " << this_thread::get_id() << endl;
for (const char c : "Hello world"){
cout << c;
this_thread::sleep_for(chrono::seconds(1));
}
```
> 先印出主執行緒的id,在每隔一秒輸出一個字。
---
### call_once
如果有多個執行緒一起執行一個function,但只想讓這個function被執行一次,可以用call_once()。
`std::call_once(once_flag, Fn, Args...)`
- once_flag : 用來判斷是否已被執行,所以需要共用同一個flag
- Fn(Args...) : 與一般thread呼叫方式一樣
範例
```cpp
void test(){
cout << "once";
}
void tester(){
static once_flag f;
call_once(f, test); // 由他呼叫的test()只會被執行一次
}
int main()
{
for (int i = 0; i < 10; ++i){
thread(tester).detach();
}
return 0;
}
```
> call_once執行時,會先判斷flag,如果還沒執行過就設定flag並執行,原理就像macro的引入保護一樣。
---
### ! race condition !
因為執行緒會共用資料,所以可能發生同一份資料同時被多條執行緒寫入,結果有可能會和預想的不一樣,要解決這個問題就要用到lock。
---
## lock
C++11的lock有分為:
- 基本型(較不安全):
- mutex
- timed_mutex
- recursive_mutex
- recursive_timed_mutex
- 包裝型(把基本型包起來使用,解構時會自動解鎖,較安全):
- lock_guard<>
- unique_lock<>
---
### mutex
```cpp
// 互斥鎖鎖定,當其他執行緒也執行到該互斥鎖的lock()時,會等待直到該互斥鎖unlock()
lock()
// 解除鎖定
unlock()
// 若該互斥鎖被鎖定,會回傳false並繼續執行,沒被鎖定就回傳true繼續執行,最好搭配if使用
try_lock()
```
> 當一執行緒成功lock(),取得該mutex所有權,則只有該執行緒能對該mutex進行unlock()。
---
範例一 lock
```cpp
void test(){
static mutex m;
m.lock(); // maybe waiting
/*
* atomic section
*/
m.unlock();
/*
* maybe occur race condition
*/
}
```
---
範例二 try_lock
```cpp
void test(){
static mutex m;
if(m.try_lock()){ // no waiting
/*
* atomic section
*/
m.unlock();
}
/*
* maybe occur data condition
*/
}
```
---
### special mutex
除了基本的mutex,還有幾個特殊功能的mutex
- `timed_mutex` 比基本mutex多了兩個時間功能。
- `try_lock_for(chrono::duration<>)`
- 在一段時間內會一直try_lock(阻塞一段時間),時間內成功lock回傳true並繼續,超時還沒lock回傳false並繼續。
- `try_lock_until(chrono::time_point<>)`
- 同上,但會一直持續到指定的時間點。
- `recursive_mutex` 可用在遞迴的mutex。
- 最好搭配unique_lock使用,不要自行呼叫unlock()
- `recursive_timed_mutex` 結合以上兩種mutex
---
### *C++14 up : SharedMutex*
為了更方便的製作讀寫鎖,而多了可以共用所有權的mutex
- `shared_timed_mutex` (C++14)
- `shared_mutex` (C++17)
他們的使用方式分成兩種級別,unique、shared
- unique : lock後,只有自己擁有所有權
- lock()
- try_lock()
- unlock()
- shared : lock後,其他人只能用shared方式取得所有權,但所有權會共享
- lock_shared()
- try_lock_shared()
- unlock_shared()
---
### lock_guard
mutex::lock() 有一個問題是**若還沒執行 unlock() 就發生例外跳出去,導致沒人執行 unlock() ,導致一直都是 lock 狀態**。
解決這個問題的就是 lock_guard,原理是當發生例外往外跳時,因為離開scope,所以會解構在這個scope內創造的東西。
```cpp
mutex m;
{
lock_guard<mutex> lg(m); // 建構子會執行m.lock()
/*
* atomic
*/
} // 解構子會執行m.unlock()
```
---
### unique_lock
lock_guard 保護的範圍是整個 scope ,如果要縮小範圍的話,就用 unique_lock 解決。
和 lock_guard一樣有保護功能,還能使用mutex的功能。
他就像一個指向mutex的智慧指標,需要指向一個mutex,可以使用該mutex的功能,也可用release取消指向該mutex,還會自動使用unlock。
除了可使用mutex的功能以外,還有以下功能
```cpp
owns_lock() // 是否擁有該mutex的所有權
release() // 釋放所指向的mutex,並回傳指向他的指標
mutex() // 回傳指向mutex的指標,但不釋放
```
---
### constructor flag
unique_lock<>還有另外三種建構模式
```cpp
// 可用在 unique_lock的建構子上
// 在建構時執行try_lock()
std::try_to_lock
// 可用在 unique_lock的建構子上
// 在建構時"先"不執行lock()
std::defer_lock
// 可用在 unique_lock、lock_guard的建構子上
// 假設在建構前,已經把mutex lock住了
// 所以在建構時"不會再"執行lock(),之後也不會執行lock()
std::adopt_lock
```
---
範例
```cpp
mutex m;
{
unique_lock<mutex> ul(m, try_to_lock); // 建構子會執行m.try_lock()
if(ul.owns_lock()){
/*
* atomic section.
*/
ul.unlock();
}
/*
* may occur race condition.
*/
} // 若還沒unlock()會執行unlock()
```
---
### *C++14 up : shared_lock<>*
可以使用指向ShareMutex,並用shared方式使用他,配合unipue_lock<>可實作讀寫鎖。
```cpp
shared_mutex m;
void read(){
shared_lock<share_mutex> lock(m);
// 同時擁多個執行緒擁有所有權
}
void write(){
unique_lock<shared_mutex> lock(m);
// 同時只有一個執行緒擁有所有權
}
```
---
### ! dead lock !
當有一執行緒tA 占用資源rA 並等待資源rB
另有一執行緒tB 占用資源rB 並等待資源rA
雙方都沒辦法執行,沒執行完就不會釋放自己的資源,於是一直互相等待。
當這情況發生時,就只能結束整個程式,所以要盡量避免。
---
### multiple lock
為解決這個問題,有以下兩個function,可以一次拿到所有需要的資源。
不過當你要求的資源太多,可能一直沒機會全部拿到,然後就一直等待到所有人都執行結束釋放資源。
```cpp
std::lock(mutex1, mutex2, ...)
std::try_lock(mutex1, mutex2, ...)
// 這裡參數的mutex可以是一般的mutex或各種特殊功能的mutex,像lock_guard、unique_lock...
```
---
### *C++17 up : scoped_lock*
如上述multiple lock,在C++17以後可以使用scoped_lock,有同樣的效果,並且和lock_guard<>一樣,會自動unlock。
```cpp
scoped_lock lock(mutex1, mutex2, ...)
scoped_lock lock(adopt_lock, mutex1, mutex2, ...)
```
---
## condition_variable
若要讓一個thread準備好資料後通知另一個thread使用,可以用condition_variable
可以依照條件從當前的thread跳出到 condition_variable 內部的一個 queue等待,之後需要時再喚醒他,這樣就不用浪費一個thread一直等待。
---
### wait()
```cpp
// 會跳出當前執行緒
// 並把此執行緒加入condition_varible內部的queue
// 再把該互斥鎖unlock()
wait(unique_lock<mutex>)
// 若f回傳false: 繼續wait
// 若f回傳true: 不會做事,繼續執行
wait(unique_lock<mutex>, function<bool()>f)
```
---
### notify
```cpp
// 喚醒queue內第一個執行緒繼續執行
notify_one()
// 喚醒queue內所有執行緒繼續執行
notify_all()
```
---
範例
```cpp
condition_variable cv;
mutex m;
bool ready = false;
void output_data(string* message){
unique_lock<mutex> ul(m);
while(!ready)
cv.wait(ul);
cout << message->c_str() << endl;
}
void prepare_data(string* message){
cout << "prepare message ...\n";
this_thread::sleep_for(chrono::seconds(5));
*message = "hello world";
ready = true;
cout << "message ready, continue thread.\n";
cv.notify_one();
}
// main
string message;
thread(prepare_data, &message).detach();
thread(output_data, &message).detach();
```
---
### condition_variable_any
condition_variable<>只能使用unique_lock<>,而多了any就是可以使用任何lockable的東西。
---
## future
若要讓一個thread準備好資料後通知另一個thread使用,也可以用future和promise。
```cpp
// promise<dataType>
get_future() // 回傳一個與之對應的future物件
set_value() // 設定共用資料,並通知future物件繼續執行
// future<dataType>
get() // 會等待直到promise設定好資料,並回傳該資料
wait()// 會等待直到promise設定好資料,但不會回傳該資料
wait_for(chrono::duration)
// 會等待一段時間
// 若在時間內得到資料 回傳future_status::ready 並結束等待
// 時間超過了還沒拿到資料 回傳future_status::timeout 並結束等待
```
---
範例
```cpp
void output_data(future<string>& message){
cout << message.get().c_str() << endl;
}
void prepare_data(promise<string>& message){
cout << "prepare message ...";
this_thread::sleep_for(chrono::seconds(5));
message.set_value("hello world");
cout << "message ready, continue thread. \n";
}
// main
promise<string> message;
future<string> futureObj = message.get_future();
thread(prepare_data, ref(message)).detach();
thread(output_data, ref(futureObj)).detach();
```
---
### async()
建立一個有回傳值的thread,將它的回傳值保存再future物件中,需要時再拿出來。
```cpp
future<T> async(launch_policy, function, args...)
// 創造一個thread執行function(args...)
// 當function執行完,回傳值會被的future接收
// launch_policy有兩種
// launch::async 馬上執行function
// launch::deferred 先不執行function,在future::get時才會執行
// 如果不指定launch_policy,系統會自動選擇
```
---
範例
```cpp
string prepareMessage(){
this_thread::sleep_for(chrono::seconds(3));
return "Hello world";
}
// main
future<string> message = async(prepareMessage);
/*
* do something ...
*/
cout << message.get();
```
> 如果function還沒執行完就要get他的值,那會等待他執行完。(阻塞)
---
### packaged_task<>
可以算是async()的底層,把function和promise包成一個物件,此物件可像function object一樣使用,若要get回傳值一定要等到function執行完,不然會一直阻塞,可用move()傳給thread()來達成async()的功能。
```cpp
packaged_task<returnType(Args...)>(function)
// 建構式和function class差不多
// 也像function一樣可以用()運算子
.get_future()
// 跟promise一樣,回傳一個future<returnType>物件
```
---
範例一(單執行緒)
```cpp
string prepareMessage(){
this_thread::sleep_for(chrono::seconds(3));
cout << "prepare complete\n";
return "Hello world";
}
// main
packaged_task<string()> task(prepareMessage);
auto result = task.get_future();
task();
cout << result.get().c_str();
```
---
範例二(多執行緒)
```cpp
string prepareMessage(){
this_thread::sleep_for(chrono::seconds(3));
cout << "prepare complete\n";
return "Hello world";
}
// main
packaged_task<string()> task(prepareMessage);
auto result = task.get_future();
thread(move(task)).detach();
/*
* 可以做其他事,不用一直等
*/
cout << result.get().c_str();
```
---
# multi thread programs
## 1
100個資料,101人同時要求資源
比較有lock沒lock的差別
> 共有4種要求資料方式,每按一次 Enter 換下一個,4個後重複。
> 會輸出每種方式所花的時間、更改後的資料。
---
### include
```cpp=
// stdafx.h
#include "targetver.h"
#include <stdio.h>
#include <tchar.h>
#include <iostream>
#include <array>
#include <thread>
#include <mutex>
#include <ctime>
#include <functional>
```
---
### Data
```cpp=
class Data{
static int delayTime; // 增加 data race 的機率.
mutex m;
int val;
public:
Data(int val=0) :val(val){}
int Get(){
return val;
}
void Reset(){
val = 0;
}
void Increase(int v=1){ // non atomic
int tmp = val;
for (int i = 0; i < delayTime; ++i); // delay
tmp += v;
val = tmp;
}
void IncreaseWithLock(int v=1){
unique_lock<mutex> lock(m);
int tmp = val;
for (int i = 0; i < delayTime; ++i); // delay
tmp += v;
val = tmp;
}
};
int Data::delayTime = 50000;
```
---
### Tester
```cpp=
struct Tester{
array<Data, 100> data;
array<thread, 101> people;
clock_t clockTime; // to account consume time
typedef function<void(array<Data, 100>&)> Fn;
void test(const char* message, Fn action){
cout << message << endl;
ResetData();
clockTime = clock();
for (auto&& th : people){
th = thread(action, ref(data));
}
for (auto&& th : people){
th.join();
}
cout << "clock:" << clock() - clockTime << endl;
OutputData();
cin.get();
}
void ResetData(){
for (auto& e : data)
e.Reset();
}
void OutputData(){
for (auto& e : data)
cout << e.Get() << " ";
}
};
```
---
### functions
```cpp=
void requireDataWithLock(array<Data, 100>& data){
static mutex m;
unique_lock<mutex> lock(m);
for (auto&& e : data){
e.Increase();
}
}
void requireDataWithoutLock(array<Data, 100>& data){
for (auto&& e : data){
e.Increase();
}
}
void requireAtomicDataWithLock(array<Data, 100>& data){
static mutex m;
unique_lock<mutex> lock(m);
for (auto&& e : data){
e.IncreaseWithLock();
}
}
void requireAtomicDataWithoutLock(array<Data, 100>& data){
for (auto&& e : data){
e.IncreaseWithLock();
}
}
```
---
### main
```cpp=
int main(){
Tester tester;
while (true){
tester.test("Require Data With Lock", requireDataWithLock);
tester.test("Require Data Without Lock", requireDataWithoutLock);
tester.test("Require Atomic Data With Lock", requireAtomicDataWithLock);
tester.test("Require Atomic Data Without Lock", requireAtomicDataWithoutLock);
}
return 0;
}
```
---
## 2
10個攤位,每個攤位都有一個服務客人的時間,一次服務一個客人
多組客人去吃飯,每個客人都有不同的等待耐心當耐心耗完時就會離開不繼續排隊
固定時間會增加客人去排隊
印出每個客人的等待時間, 與是否離開等待
> 這是單執行緒程式,用來模擬多執行緒發生的事
> 因為是一步一步執行的,可以看到每一步發生什麼事
> 按 Enter 下一步
---
### include
```cpp=
// stdafx.h
#include "targetver.h"
#include <stdio.h>
#include <tchar.h>
#include <iostream>
#include <array>
#include <list>
#include <memory>
```
---
### Client
```cpp=
class Client{
static int count;
static int minPatience;
static int rangePatience;
int waitTime;
bool waiting;
public:
const int id;
const int patienceTime;
Client(int patienceTime)
:patienceTime(patienceTime), id(count++), waitTime(0), waiting(true){
cout << "client[" << id << "] join , patience : " << patienceTime << endl;
}
Client() :Client(rand() % rangePatience + minPatience){}
void Clock(){
if (waiting)
++waitTime;
}
void BeServe(int storeId){
waiting = false;
cout << "client[" << id << "] be served by store [" << storeId << "], wait for : " << waitTime << endl;
}
void NoPatienceExit(){
cout << "client[" << id << "], no patience exit, wait for : " << waitTime << endl;
}
bool IsNoPatience()const{
return waitTime == patienceTime;
}
};
int Client::count = 0;
int Client::minPatience = 3;
int Client::rangePatience = 2;
```
---
### Store
```cpp=
class Store{
static int count;
static int minServiceTime;
static int rangeServiceTime;
unique_ptr<Client> client;
int passTime;
public:
const int id;
const int serviceTime;
Store(int serviceTime)
:serviceTime(serviceTime), id(count++), client(nullptr), passTime(0){
cout << "store [" << id << "]: service time : " << serviceTime << endl;
}
Store() :Store(rand() % rangeServiceTime + minServiceTime){}
void Clock(){
if (client)
++passTime;
}
void Serve(unique_ptr<Client>&& client){
client->BeServe(id);
this->client = move(client);
passTime = 0;
}
void Pop(){
if (client){
cout << "store[" << id << "] service finish, client[" << client->id << "] exit" << endl;
client.release();
}
}
bool IsFinish()const{
return passTime == serviceTime;
}
bool IsEmpty()const{
return !(bool)(client);
}
};
int Store::count = 0;
int Store::minServiceTime = 30;
int Store::rangeServiceTime = 20;
```
---
### main (scheduler)
```cpp=
int main()
{
array<Store, 10> stores;
list< unique_ptr<Client> > clients;
const int addTime = 2;
int passTime = 0;
while (true){
// add client
if (passTime%addTime == 0){
clients.push_back(make_unique<Client>());
}
// store clock
for (auto& store : stores)
{
if (store.IsFinish())
store.Pop();
if (store.IsEmpty() && !clients.empty()){
store.Serve(move(clients.front()));
clients.pop_front();
}
store.Clock();
}
// client clock
for (auto it = clients.begin(); it != clients.end(); ++it)
{
unique_ptr<Client>& client = *it;
client->Clock();
if (client->IsNoPatience()){
client->NoPatienceExit();
it = clients.erase(it);
}
}
++passTime;
cout << "--Clock-- "<<passTime;
cin.get();
}
return 0;
}
```
---
## 3
哲學家用餐問題
把Philosopher::Action裡的 TryToEat() 改成 TryToEat_MultipleLock() 就可解決 deadlock
Philosopher::delay 越大越容易發生deadlock
---
### include
```cpp=
// stdafx.h
#include "targetver.h"
#include <stdio.h>
#include <tchar.h>
#include <ctime>
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
#include <vector>
```
---
### Philosopher
```cpp=
class Philosopher{
enum Status{think, hungry, eat};
static int count;
static int delay;
static int minEatTime;
static int minThinkTime;
static int rangeEatTime;
static int rangeThinkTime;
mutex& leftFork;
mutex& rightFork;
Status status;
int forkNum;
public:
const int id;
Philosopher(mutex& left, mutex& right)
:id(count++), leftFork(left), rightFork(right), status(Status::think), forkNum(0)
{
thread(&Philosopher::Action,this).detach();
}
void ShowStatus(){
cout << "Philosopher[" << id << "] : " << StatusStr() << "\thas forks : " << forkNum << endl;
}
private:
void Action(){
while (true){
Think();
TryToEat();
forkNum = 0;
}
}
void TryToEat(){ // may occur deadlock
status = Status::hungry;
unique_lock<mutex> l_fork(leftFork);
++forkNum;
for (int i = 0; i < delay; ++i); // 增加發生機率.
unique_lock<mutex> r_fork(rightFork);
++forkNum;
status = Status::eat;
this_thread::sleep_for(chrono::milliseconds(rand() % rangeEatTime + minEatTime));
cout << "eat finish" << endl;
}
void TryToEat_multipleLock(){ // can solve deadlock
status = Status::hungry;
unique_lock<mutex> l_fork(leftFork, std::defer_lock);
unique_lock<mutex> r_fork(rightFork, std::defer_lock);
lock(l_fork, r_fork);
forkNum = 2;
status = Status::eat;
this_thread::sleep_for(chrono::milliseconds(rand() % rangeEatTime + minEatTime));
cout << "eat finish" << endl;
}
void Think(){
status = Status::think;
this_thread::sleep_for(chrono::milliseconds(rand() % rangeThinkTime + minThinkTime));
}
const char* StatusStr(){
switch (status)
{
case Philosopher::think:
return "think";
case Philosopher::hungry:
return "hungry";
case Philosopher::eat:
return "eat";
default:
break;
}
}
};
int Philosopher::count = 0;
int Philosopher::delay = 200000;
int Philosopher::minEatTime = 1000;
int Philosopher::minThinkTime = 10000;
int Philosopher::rangeEatTime = 4000;
int Philosopher::rangeThinkTime = 2000;
```
---
### Table
```cpp=
class Table{
vector<mutex> forks;
vector<Philosopher> philosophers;
public:
Table(int number):forks(number){
philosophers.reserve(number);
for (int i = 0; i < number-1; ++i){
philosophers.emplace_back(forks[i], forks[i + 1]);
}
philosophers.emplace_back(forks[number-1], forks[0]);
}
void Action(){
while (true){
for (auto&& p : philosophers){
p.ShowStatus();
}
system("cls");
}
}
};
```
---
### main
```cpp=
int main()
{
srand(time(nullptr));
Table table(8);
table.Action();
return 0;
}
```
---