<style>
.reveal {
font-size: 28px;
}
</style>
# Some Concurrency Introduction
---
### Agenda
* Atomics
* Lock-free API
* Compare-and-swap (CAS)
* Thread-local storage
* concurrent_vector
* Case studies
---
### Atomic [cppreference](https://en.cppreference.com/w/cpp/atomic/atomic)
* If one thread writes to an atomic object while another thread reads from it, the behavior is well-defined.
```cpp=
std::atomic_int acnt;
int cnt;

void f()
{
    for (int n = 0; n < 10000; ++n)
    {
        ++acnt;
        ++cnt;
    }
}

int main()
{
    {
        std::vector<std::jthread> pool;
        for (int n = 0; n < 10; ++n)
            pool.emplace_back(f);
    }
    std::cout << "The atomic counter is " << acnt << '\n'
              << "The non-atomic counter is " << cnt << '\n';
}
/* possible output (the non-atomic counter varies from run to run):
The atomic counter is 100000
The non-atomic counter is 85185
*/
```
---
### Lock Free API
* Use `std::atomic<T>::is_always_lock_free` to check whether `std::atomic<T>` is guaranteed to be lock-free on the target platform.
* What is lock free? [Reference](https://hackmd.io/@sysprog/concurrency-lockfree#Lock-Free-%E5%92%8C-Lock-Less-%E7%9A%84%E5%88%86%E9%87%8E)
* Lock-free is not lock-less.
* An algorithm is lock-free if, when the program's threads are run `for a sufficiently long time, at least one of the threads makes progress`.
* Lock-freedom rules out deadlock and livelock; individual threads may still starve (preventing that requires wait-freedom).
* [Reference](https://www.baeldung.com/cs/deadlock-livelock-starvation)
---
### Lock-free is not lock-less
```
while (x == 0)
{
    x = 1 - x;
}
```
* No lock is used (lock-less), but two threads toggling `x` in lockstep can spin forever without progress, so the code is not lock-free.

---
### Atomic API
* The platform (x86, Arm, ...) provides the atomic instructions.
* load(): atomically returns the current value.
* store(): atomically replaces the current value.
* Atomic read-modify-write operations:
* fetch_add(): `++, +=` are implemented by fetch_add
* fetch_sub(): `--, -=` are implemented by fetch_sub
* fetch_or(): `|=`
* fetch_and(): `&=`
* fetch_xor(): `^=`
---
### Atomic API
* compare_and_swap: CAS [cppreference](https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange)
* If the current value equals `expected`, store `desired` into the atomic variable and return `true`.
* Otherwise, load the current value into `expected` and return `false`.
```cpp=
bool compare_exchange_weak( T& expected, T desired,
                            std::memory_order success,
                            std::memory_order failure ) noexcept;
bool compare_exchange_strong( T& expected, T desired,
                              std::memory_order success,
                              std::memory_order failure ) noexcept;
```
---
### Atomic API
* compare_exchange_weak:
* compare_exchange_weak is allowed to `fail spuriously`; that is, it may act as if `*this != expected` even when they are equal.
* When the compare-and-exchange is in a loop anyway, compare_exchange_weak yields better performance on some platforms.
```cpp=
std::atomic<int> m = 0;

void f(int n)
{
    int cur = m;
    do {
        if (cur >= n) {
            break;
        }
    } while (!m.compare_exchange_weak(cur, n));
}

int main()
{
    {
        std::vector<std::jthread> pool;
        for (int n = 0; n < 10; ++n)
            pool.emplace_back(f, n);
    }
    std::cout << "max is " << m.load() << "\n";
}
```
---
### lock free linked-list [Reference](http://15418.courses.cs.cmu.edu/spring2013/article/46)
* Making a whole program lock-free is very hard; in practice, lock-free techniques are applied to specific data structures.
* [GodBolt](https://godbolt.org/z/GrjzMhh9q)
```cpp=
struct Node {
    int val = 0;
    std::atomic<Node*> next = nullptr;
};

struct Stack {
    Node dummy;
    void push(int v) {
        Node* exp = dummy.next.load();
        Node* n = new Node{v, nullptr};
        do {
            n->next.store(exp);
        } while (!dummy.next.compare_exchange_weak(exp, n));
    }
};
```
---
### Thread Local Storage
* The `thread_local` keyword, since C++11 [cppreference](https://en.cppreference.com/w/cpp/language/storage_duration)
* tbb::enumerable_thread_specific
* tbb::combinable
---
### thread_local
* `static` storage is implied for a `thread_local` variable declared at block scope.
* Hard to use in general scenarios.
* Hard to collect the results from every thread.
```cpp=
thread_local unsigned int rage = 1;
std::mutex cout_mutex;

void increase_rage(const std::string& thread_name)
{
    ++rage; // modifying outside a lock is okay; this is a thread-local variable
    std::lock_guard<std::mutex> lock(cout_mutex);
    std::cout << "Rage counter for " << thread_name << ": " << rage << '\n';
}

int main()
{
    std::thread a(increase_rage, "a"), b(increase_rage, "b");
    {
        std::lock_guard<std::mutex> lock(cout_mutex);
        std::cout << "Rage counter for main: " << rage << '\n';
    }
    a.join();
    b.join();
}
/* possible output:
Rage counter for a: 2
Rage counter for main: 1
Rage counter for b: 2
*/
```
---
### tbb::enumerable_thread_specific, [Spec](https://oneapi-spec.uxlfoundation.org/specifications/oneapi/v1.3-rev-1/elements/onetbb/source/thread_local_storage/enumerable_thread_specific_cls)
* Thread-local storage implemented by TBB.
* Can be declared like an ordinary variable.
* Provides combine functions to collect the per-thread results.
* API:
* local()
* combine(), combine_each()
* begin(), end()
---
### tbb::enumerable_thread_specific Example
* Using find-the-maximum as an example
```cpp=
int main()
{
    tbb::enumerable_thread_specific<int> tls{0};
    {
        std::vector<std::jthread> pool;
        for (int n = 0; n < 10; ++n) {
            pool.emplace_back([&](int v){
                int& m = tls.local();
                m = std::max(m, v);
            }, n);
        }
    }
    // use combine to collect the result
    int max = tls.combine([](int lhs, int rhs){ return std::max(lhs, rhs); });
    // use combine_each to collect the result
    tls.combine_each([&](int elem){ max = std::max(max, elem); });
    std::cout << "max is " << max << "\n";
}
```
---
### tbb::combinable
* It's just a wrapper of tbb::enumerable_thread_specific [Reference](https://github.com/oneapi-src/oneTBB/blob/master/include/oneapi/tbb/combinable.h)
```cpp=
template <typename T>
class combinable {
    using my_alloc = typename tbb::cache_aligned_allocator<T>;
    using my_ets_type = typename tbb::enumerable_thread_specific<T, my_alloc, ets_no_key>;
    my_ets_type my_ets;
    // ...
};
```
---
### Note: local() is slow [StackOverflow](https://stackoverflow.com/questions/30407691/tbbcombinablelocal-is-too-slow)
* The local storage is maintained in a linear-probing, open-addressing hash table.
```cpp=
tbb::enumerable_thread_specific<int> tls;
// Calling local() on every access is slow.
[&](int v){
    tls.local() = std::max(tls.local(), v);
}
// Cache the local value as a reference instead.
[&](int v){
    int& m = tls.local();
    m = std::max(m, v);
}
```
---
### concurrent_vector [Spec](https://oneapi-spec.uxlfoundation.org/specifications/oneapi/v1.3-rev-1/elements/onetbb/source/containers/concurrent_vector_cls)
* concurrent_vector is a class template that represents a sequence container with the following features
* Multiple threads can concurrently grow the container and append new elements.
* Random access by index. The index of the first element is zero.
* Growing the container does not invalidate any existing iterators or indices.
* reserve, resize, shrink_to_fit, clear, swap are not thread safe. [Reference](https://oneapi-spec.uxlfoundation.org/specifications/oneapi/v1.3-rev-1/elements/onetbb/source/containers/concurrent_vector_cls/unsafe_operations)
* A concurrent container is not automatically thread-safe for every operation, nor deterministic.
* Be careful with your usage model.
---
### concurrent_vector
* Data structure of concurrent_vector
* Two-level (segmented) addressing.
* Each new segment doubles the total capacity.
* Indexing:
* Use log2 to find the segment index and the segment pointer. [godbolt](https://godbolt.org/z/8Mca44hTn)
* Use the in-segment offset to get the value.

---
### concurrent_vector
* Growing functions that may be called concurrently:
* push_back
* emplace_back
* grow_by(size_t delta)
* grow_to_at_least(size_t n)
---
### grow_by
1. Atomically reserve the new range by bumping the size (fetch_add/CAS).
2. Extend the capacity in the segment table if needed.
3. Construct the objects from start to end in a loop.
4. Return the iterator to the beginning of the appended sequence.
---
### grow_by pseudo code
```cpp=
iterator grow_by(size_t delta) {
    // std::atomic<size_t> my_size;
    /* decide the range for the current thread to handle */
    size_t start_index = this->my_size.fetch_add(delta);
    size_t end_index = start_index + delta;
    size_t seg_index = this->segment_index_of(end_index - 1);
    /* extend the capacity in the table if needed */
    if (/* segment at seg_index does not exist */) {
        segment_type new_segment = self()->create_segment(table, seg_index, index);
        if (!table[seg_index].compare_exchange_strong(disabled_segment, new_segment - segment_base(seg_index))) {
            // compare_exchange failed => another thread has already enabled this segment
            // Deallocate the memory
            self()->deallocate_segment(new_segment, seg_index);
        }
    }
    /* construct the elements in the new range */
    internal_loop_construct(table, start_index, end_index, /*default construct*/);
    /* return the iterator to the start of the range */
    return iterator(*this, start_index);
}
```
---
### Case Studies [GodBolt](https://godbolt.org/z/E1TP7ah4M)
* concurrent_vector
```cpp=
struct Container {
    tbb::concurrent_vector<int> storage;
    void setValue(size_t idx, int v) {
        if (storage.size() <= idx) {
            storage.grow_to_at_least(idx + 1);
        }
        storage[idx] = v;
    }
    int getValue(size_t idx) {
        return storage[idx];
    }
};

int main()
{
    Container c;
    tbb::blocked_range<int> range{0, 10000};
    auto fun = [&](auto r){
        for (auto idx = r.begin(); idx != r.end(); ++idx) {
            c.setValue(static_cast<size_t>(idx), idx);
        }
    };
    // fun(range);
    tbb::parallel_for(range, fun);
    for (int idx = 0; idx < 10000; ++idx) {
        if (idx != c.getValue(idx)) {
            std::cout << "Value for idx " << idx << " is " << c.getValue(idx) << "\n";
        }
    }
}
```
---
### Case Studies
```
* Thread 1: setValue(100, 5);
* Thread 2: setValue(200, 5);
* Thread 2: calls grow_to_at_least and bumps the concurrent_vector size to 200.
* Thread 1: sees size 200 and writes concurrent_vector[100] = 5;
* Thread 2: default-constructs the elements from index 0 to 200 as 0.
*           ^--- this overwrites vector[100]
* Thread 2: writes concurrent_vector[200] = 5;
```
---
### Case Studies 2 [GodBolt](https://godbolt.org/z/94G5Yr85P)
```cpp=
struct Node {
    std::vector<int> data;
    explicit Node(int n) : data(n, n) {}
};

auto flatten(const std::vector<Node>& arr, bool mt) {
    int n = arr.size();
    tbb::concurrent_vector<int> allData;
    std::vector<std::pair<size_t, size_t>> scope;
    scope.resize(n);
    auto fun = [&](auto Range) {
        for (auto idx = Range.begin(); idx != Range.end(); ++idx) {
            auto start = allData.size();
            for (auto d : arr[idx].data) {
                allData.emplace_back(d);
            }
            scope[idx] = std::make_pair(start, allData.size());
        }
    };
    tbb::blocked_range<int> range{0, n};
    if (mt) {
        tbb::parallel_for(range, fun);
    } else {
        fun(range);
    }
    return std::make_pair(allData, scope);
}

auto check(const auto& allData, const auto& scope) {
    int n = scope.size();
    auto pass = true;
    for (int i = 0; i < n; ++i) {
        auto [start, end] = scope[i];
        auto allCorrect =
            std::all_of(allData.begin() + start, allData.begin() + end,
                        [=](auto v) { return v == i; });
        pass &= allCorrect;
        if (!pass) {
            std::cout << "idx: " << i << ", scope: (" << start << ", " << end
                      << ")\n";
            for (auto s = start; s < end; ++s) {
                std::cout << allData[s] << ", ";
            }
            std::cout << "\n";
            break;
        }
    }
    return pass;
}

int main(int /*argc*/, char** argv) {
    int n = std::stoi(argv[1]);
    std::vector<Node> arr;
    arr.reserve(n);
    for (int i = 0; i < n; ++i) {
        arr.emplace_back(i);
    }
    {
        auto [d, scope] = flatten(arr, false);
        auto pass = check(d, scope);
        std::cout << "single thread " << (pass ? "pass" : "fail") << std::endl;
    }
    {
        auto [d, scope] = flatten(arr, true);
        auto pass = check(d, scope);
        std::cout << "multi thread " << (pass ? "pass" : "fail") << std::endl;
    }
}
```
---
### Case studies 2
```cpp
tbb::concurrent_vector<int> v;
auto Func(int val) {
    auto start = v.size();  // another thread may push_back between these calls
    v.push_back(val);
    auto end = v.size();
}
```
| Thread1 | Thread2 | Status |
| ---- | ---- | ---- |
| size() | | start1 == 0 |
| | size() | start1 == start2 == 0|
| push_back() | | v: {1} |
| | push_back() | v: {1,2} |
| size() | | end1 == 2 |
| | size() | end1 == end2 == 2|
---
### Case studies 2 [GodBolt](https://godbolt.org/z/7hsEhEeE7)
```cpp=
auto threadSafeFlatten(const std::vector<Node>& arr, bool mt) {
    int n = arr.size();
    tbb::concurrent_vector<int> allData;
    std::vector<std::pair<size_t, size_t>> scope;
    scope.resize(n);
    auto fun = [&](auto Range) {
        for (auto idx = Range.begin(); idx != Range.end(); ++idx) {
            auto& nodeData = arr[idx].data;
            // use grow_by to reserve this node's segment of the memory
            auto beginIt = allData.grow_by(nodeData.size());
            for (size_t i = 0; i < nodeData.size(); ++i) {
                *(beginIt + i) = nodeData[i];
            }
            auto start = std::distance(allData.begin(), beginIt);
            scope[idx] = std::make_pair(start, start + nodeData.size());
        }
    };
    tbb::blocked_range<int> range{0, n};
    if (mt) {
        tbb::parallel_for(range, fun);
    } else {
        fun(range);
    }
    return std::make_pair(allData, scope);
}
```
---
### Conclusion
* Concurrency programming is very hard.
* Concurrent containers are not always safe.
* Usually, only the documented concurrency-safe operations may be called from multiple threads at once.
* Know the limitations of the concurrent containers.
* Read the documentation and dig into the source code.
* There are many atomic/CAS operations in TBB.
* Be careful with your usage model and choose the best container.
---
### Reference
* [lock-free linkedlist/stack](http://15418.courses.cs.cmu.edu/spring2013/article/46)
* [concurrency lockfree](https://hackmd.io/@sysprog/concurrency-lockfree)
* [cppreference](https://en.cppreference.com/w/cpp/atomic/atomic)
* [oneTBB spec](https://oneapi-spec.uxlfoundation.org/specifications/oneapi/v1.3-rev-1/elements/onetbb/source/nested-index)
* [oneTBB source code](https://github.com/oneapi-src/oneTBB)
* [An introduction for lock free](https://preshing.com/20120612/an-introduction-to-lock-free-programming/)
* [Deadlock, Livelock and Starvation](https://www.baeldung.com/cs/deadlock-livelock-starvation)
---
# Q&A