<style>
.reveal {
  font-size: 28px;
}
</style>

# Some Concurrency Introduction

---

### Agenda
* Atomic
* Lock-free API
* Compare and swap (CAS)
* Thread local storage
* concurrent_vector
* Case studies

---

### Atomic [cppreference](https://en.cppreference.com/w/cpp/atomic/atomic)
* If one thread writes to an atomic object while another thread reads from it, the behavior is well-defined.
```cpp=
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

std::atomic_int acnt;  // atomic counter: increments never get lost
int cnt;               // plain counter: concurrent ++ is a data race

void f()
{
    for (int n = 0; n < 10000; ++n)
    {
        ++acnt;
        ++cnt;  // undefined behavior, kept only to show lost updates
    }
}

int main()
{
    {
        std::vector<std::jthread> pool;
        for (int n = 0; n < 10; ++n)
            pool.emplace_back(f);
    }  // every jthread joins when the pool is destroyed

    std::cout << "The atomic counter is " << acnt << '\n'
              << "The non-atomic counter is " << cnt << '\n';
}
/* output (the non-atomic value varies between runs):
The atomic counter is 100000
The non-atomic counter is 85185
*/
```

---

### Lock Free API
* Use `std::atomic<T>::is_always_lock_free` to check at compile time whether `std::atomic<T>` is always lock-free.
* What is lock free? [Reference](https://hackmd.io/@sysprog/concurrency-lockfree#Lock-Free-%E5%92%8C-Lock-Less-%E7%9A%84%E5%88%86%E9%87%8E)
  * Lock-free is not lock-less.
  * An algorithm is lock-free if, when the program threads are run `for a sufficiently long time, at least one of the threads makes progress`.
  * A lock-free algorithm rules out deadlock and livelock. An individual thread can still starve; ruling that out takes a wait-free algorithm.
    * [Reference](https://www.baeldung.com/cs/deadlock-livelock-starvation)

---

### Lock-free is not lock-less
```
while (x == 0)
{
    x = 1 - x;
}
```
![image](https://hackmd.io/_uploads/SyKYIOZ_C.png)

---

### Atomic API
* The platform (x86, Arm, ...) provides the atomic instructions.
  * load: Atomically returns the current atomic value.
  * store: Atomically replaces the current value.
  * Atomic read-modify-write operations:
    * fetch_add(): `++, +=` are implemented by fetch_add
    * fetch_sub(): `--, -=` are implemented by fetch_sub
    * fetch_or(): `|=`
    * fetch_and(): `&=`
    * fetch_xor(): `^=`

---

### Atomic API
* compare_and_swap: CAS [cppreference](https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange)
  * If the current value equals `expected`, store `desired` into the atomic variable and return `true`.
  * Otherwise, load the current value into `expected` and return `false`.
```cpp=
bool compare_exchange_weak( T& expected, T desired,
                            std::memory_order success,
                            std::memory_order failure ) noexcept;
bool compare_exchange_strong( T& expected, T desired,
                              std::memory_order success,
                              std::memory_order failure ) noexcept;
```

---

### Atomic API
* compare_exchange_weak:
  * compare_exchange_weak is allowed to `fail spuriously`, that is, to act as if `*this != expected` even if they are equal.
  * When a compare-and-exchange is in a loop, compare_exchange_weak will yield better performance on some platforms.
```cpp=
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

std::atomic<int> m{0};

// Atomically raise m to n if n is larger (an atomic "max").
void f(int n)
{
    int cur = m;
    do {
        if (cur >= n) {
            break;  // m is already at least n, nothing to do
        }
        // On failure, cur is refreshed with the current value of m.
    } while (!m.compare_exchange_weak(cur, n));
}

int main()
{
    {
        std::vector<std::jthread> pool;
        for (int n = 0; n < 10; ++n)
            pool.emplace_back(f, n);
    }
    std::cout << "max is " << m.load() << "\n";
}
```

---

### lock free linked-list [Reference](http://15418.courses.cs.cmu.edu/spring2013/article/46)
* Making a whole program lock-free is very hard. Usually, lock-free techniques are applied to specific data structures.
* [GodBolt](https://godbolt.org/z/GrjzMhh9q)
```cpp=
struct Node {
    int val = 0;
    std::atomic<Node*> next = nullptr;
};

struct Stack {
    Node dummy;  // dummy.next is the top of the stack

    void push(int v)
    {
        Node* exp = dummy.next.load();
        Node* n = new Node {v, nullptr};
        do {
            // Link the new node in front of the top we last observed.
            n->next.store(exp);
            // On failure, exp is refreshed with the current top and we retry.
        } while (!dummy.next.compare_exchange_weak(exp, n));
    }
};
```

---

### Thread local Storage
* The `thread_local` keyword, available since C++11 [cppreference](https://en.cppreference.com/w/cpp/language/storage_duration)
* tbb::enumerable_thread_specific
* tbb::combinable

---

### thread_local
* At block scope, `thread_local` implies `static`.
* Hard to use in general usage scenarios.
* Hard to collect the result from every thread.
```cpp=
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

thread_local unsigned int rage = 1;
std::mutex cout_mutex;

void increase_rage(const std::string& thread_name)
{
    ++rage; // modifying outside a lock is okay; this is a thread-local variable
    std::lock_guard<std::mutex> lock(cout_mutex);
    std::cout << "Rage counter for " << thread_name << ": " << rage << '\n';
}

int main()
{
    std::thread a(increase_rage, "a"), b(increase_rage, "b");
    {
        std::lock_guard<std::mutex> lock(cout_mutex);
        std::cout << "Rage counter for main: " << rage << '\n';
    }
    a.join();
    b.join();
}
/* output (line order may vary between runs)
Rage counter for a: 2
Rage counter for main: 1
Rage counter for b: 2
*/
```

---

### tbb::enumerable_thread_specific, [Spec](https://oneapi-spec.uxlfoundation.org/specifications/oneapi/v1.3-rev-1/elements/onetbb/source/thread_local_storage/enumerable_thread_specific_cls)
* Thread local storage implemented by tbb
* Can be declared as an ordinary variable.
* Provides combine functions to collect the per-thread results.
* API:
  * local()
  * combine(), combine_each()
  * begin(), end()

---

### tbb::enumerable_thread_specific Example
* Finding the maximum as an example
```cpp=
#include <algorithm>
#include <iostream>
#include <thread>
#include <vector>
#include <tbb/enumerable_thread_specific.h>

int main()
{
    tbb::enumerable_thread_specific<int> tls{0};
    {
        std::vector<std::jthread> pool;
        for (int n = 0; n < 10; ++n) {
            pool.emplace_back([&](int v){
                int& m = tls.local();  // this thread's own copy
                m = std::max(m, v);
            }, n);
        }
    }
    // use combine to collect result
    int max = tls.combine([](int lhs, int rhs){ return std::max(lhs, rhs); });
    // use combine_each to collect result
    tls.combine_each([&](int elem){ max = std::max(max, elem); });
    std::cout << "max is " << max << "\n";
}
```

---

### tbb::combinable
* It's just a wrapper of tbb::enumerable_thread_specific [Reference](https://github.com/oneapi-src/oneTBB/blob/master/include/oneapi/tbb/combinable.h)
```cpp=
template <typename T>
class combinable {
    using my_alloc = typename tbb::cache_aligned_allocator<T>;
    using my_ets_type = typename tbb::enumerable_thread_specific<T, my_alloc, ets_no_key>;
    my_ets_type my_ets;
    // ...
};
```

---

### Note: local() is slow [StackOverflow](https://stackoverflow.com/questions/30407691/tbbcombinablelocal-is-too-slow)
* The per-thread storage is looked up in an open-addressing hash table with linear probing, so every `local()` call pays for a lookup.
```cpp=
tbb::enumerable_thread_specific<int> tls;

// Calls local() twice per invocation: two hash lookups.
auto slow = [&](int v){
    tls.local() = std::max(tls.local(), v);
};

// Cache the local value as a reference instead: one lookup.
auto fast = [&](int v){
    int& m = tls.local();
    m = std::max(m, v);
};
```

---

### concurrent_vector [Spec](https://oneapi-spec.uxlfoundation.org/specifications/oneapi/v1.3-rev-1/elements/onetbb/source/containers/concurrent_vector_cls)
* concurrent_vector is a class template that represents a sequence container with the following features:
  * Multiple threads can concurrently grow the container and append new elements.
  * Random access by index. The index of the first element is zero.
  * Growing the container does not invalidate any existing iterators or indices.
* `reserve`, `resize`, `shrink_to_fit`, `clear`, `swap` are not thread safe.
[Reference](https://oneapi-spec.uxlfoundation.org/specifications/oneapi/v1.3-rev-1/elements/onetbb/source/containers/concurrent_vector_cls/unsafe_operations)
* A concurrent container is not always thread-safe or deterministic.
  * Be careful with your usage model.

---

### concurrent_vector
* Data structure of concurrent_vector
  * Double addressing: a segment table points to the segments that hold the elements.
  * The capacity doubles each time the vector grows.
* Indexing:
  * Use log2 to find the segment index and pointer. [godbolt](https://godbolt.org/z/8Mca44hTn)
  * Use the index to get the value.

![image](https://hackmd.io/_uploads/S1GGdvGOR.png)

---

### concurrent_vector
* Growth functions that can be called concurrently:
  * push_back
  * emplace_back
  * grow_by(size_t delta)
  * grow_to_at_least(size_t n)

---

### grow_by
1. Reserve the index range by atomically increasing the size (`fetch_add`).
2. Extend the capacity in segment_table if needed (the new segment is published with CAS).
3. Construct the objects in a loop from start to end.
4. Return the iterator to the beginning of the appended sequence.

---

### grow_by pseudo code
```cpp=
// Pseudo code: simplified from the library source, not compilable as-is.
iterator grow_by(size_t delta)
{
    // std::atomic<size_t> my_size;
    /* decide the range for the current thread to handle */
    size_t start_index = this->my_size.fetch_add(delta);
    size_t end_index = start_index + delta;
    size_t seg_index = this->segment_index_of(end_index - 1);

    /* extend the capacity in table if needed */
    if (/* the segment at seg_index does not exist yet */) {
        segment_type new_segment = self()->create_segment(table, seg_index, index);
        if (!table[seg_index].compare_exchange_strong(disabled_segment, new_segment - segment_base(seg_index))) {
            // compare_exchange failed => some other thread has already enabled this segment
            // Deallocate the memory
            self()->deallocate_segment(new_segment, seg_index);
        }
    }

    /* construct the elements in the new place */
    internal_loop_construct(table, start_index, end_index, /*default construct*/);

    /* return the start iterator */
    return iterator(*this, start_index);
}
```

---

### grow_by
![image](https://hackmd.io/_uploads/HyzPTwM_R.png)

---

### grow_by
![image](https://hackmd.io/_uploads/SyivaPG_C.png)

---
### Case Studies [GodBolt](https://godbolt.org/z/E1TP7ah4M)
* concurrent_vector
```cpp=
#include <cstddef>
#include <iostream>
#include <tbb/blocked_range.h>
#include <tbb/concurrent_vector.h>
#include <tbb/parallel_for.h>

struct Container {
    tbb::concurrent_vector<int> storage;

    // Racy on purpose: growing and writing are two separate steps.
    void setValue(size_t idx, int v)
    {
        if (storage.size() <= idx) {
            storage.grow_to_at_least(idx + 1);
        }
        storage[idx] = v;
    }

    int getValue(size_t idx)
    {
        return storage[idx];
    }
};

int main()
{
    Container c;
    tbb::blocked_range<int> range{0, 10000};
    auto fun = [&](auto r){
        for (auto idx = r.begin(); idx != r.end(); ++idx) {
            c.setValue(static_cast<size_t>(idx), idx);
        }
    };
    // fun(range);  // single-threaded run for comparison
    tbb::parallel_for(range, fun);

    // Report every index whose value was lost.
    for (int idx = 0; idx < 10000; ++idx) {
        if (idx != c.getValue(idx)) {
            std::cout << "Value for idx " << idx << " is " << c.getValue(idx) << "\n";
        }
    }
}
```

---

### Case Studies
![image](https://hackmd.io/_uploads/r1GS7iZx-g.png)

---

### Case Studies
![image](https://hackmd.io/_uploads/SJj7mo-lWl.png)

---

### Case Studies
![image](https://hackmd.io/_uploads/rkaZ7i-l-l.png)

---

### Case Studies
```
* Thread 1: setValue(100, 5);
* Thread 2: setValue(200, 5);
* Thread 2: calls grow_to_at_least(201) and updates the concurrent_vector size to 201.
* Thread 1: sees size 201 and writes concurrent_vector[100] = 5;
* Thread 2: initializes the new elements, index 0 to 200, to 0.
*           ^--- this overwrites vector[100]
* Thread 2: writes concurrent_vector[200] = 5;
```

---

### Case Studies 2 [GodBolt](https://godbolt.org/z/94G5Yr85P)
```cpp=
#include <algorithm>
#include <iostream>
#include <string>
#include <utility>
#include <vector>
#include <tbb/blocked_range.h>
#include <tbb/concurrent_vector.h>
#include <tbb/parallel_for.h>

struct Node {
    std::vector<int> data;
    explicit Node(int n) : data(n, n) {}  // n copies of the value n
};

auto flatten(const std::vector<Node>& arr, bool mt)
{
    int n = arr.size();
    tbb::concurrent_vector<int> allData;
    std::vector<std::pair<size_t, size_t>> scope;
    scope.resize(n);
    auto fun = [&](auto Range) {
        for (auto idx = Range.begin(); idx != Range.end(); ++idx) {
            // Racy on purpose: other threads can append between the two
            // size() calls, so [start, end) may contain their elements.
            auto start = allData.size();
            for (auto d : arr[idx].data) {
                allData.emplace_back(d);
            }
            scope[idx] = std::make_pair(start, allData.size());
        }
    };
    tbb::blocked_range<int> range{0, n};
    if (mt) {
        tbb::parallel_for(range, fun);
    } else {
        fun(range);
    }
    return std::make_pair(allData, scope);
}

// Every element inside scope[i] must equal i.
auto check(const auto& allData, const auto& scope)
{
    int n = scope.size();
    auto pass = true;
    for (int i = 0; i < n; ++i) {
        auto [start, end] = scope[i];
        auto allCorrect = std::all_of(allData.begin() + start, allData.begin() + end,
                                      [=](auto v) { return v == i; });
        pass &= allCorrect;
        if (!pass) {
            std::cout << "idx: " << i << ", scope: (" << start << ", " << end << ")\n";
            for (auto s = start; s < end; ++s) {
                std::cout << allData[s] << ", ";
            }
            std::cout << "\n";
            break;
        }
    }
    return pass;
}

int main(int argc, char** argv)
{
    if (argc < 2) {
        std::cerr << "usage: " << argv[0] << " <n>\n";
        return 1;
    }
    int n = std::stoi(argv[1]);
    std::vector<Node> arr;
    arr.reserve(n);
    for (int i = 0; i < n; ++i) {
        arr.emplace_back(i);
    }
    {
        auto [d, scope] = flatten(arr, false);
        auto pass = check(d, scope);
        std::cout << "single thread " << (pass ? "pass" : "fail") << std::endl;
    }
    {
        auto [d, scope] = flatten(arr, true);
        auto pass = check(d, scope);
        std::cout << "multi thread " << (pass ? "pass" : "fail") << std::endl;
    }
}
```

---

### Case studies 2
```cpp
tbb::concurrent_vector<int> v;

void Func(int value)
{
    auto start = v.size();  // may be stale as soon as it is read
    v.push_back(value);
    auto end = v.size();    // may already include another thread's element
}
```

| Thread1 | Thread2 | Status |
| ---- | ---- | ---- |
| size() | | start1 == 0 |
| | size() | start1 == start2 == 0 |
| push_back() | | v: {1} |
| | push_back() | v: {1,2} |
| size() | | end1 == 2 |
| | size() | end1 == end2 == 2 |

---

### Case studies 2 [GodBolt](https://godbolt.org/z/7hsEhEeE7)
```cpp=
auto threadSafeFlatten(const std::vector<Node>& arr, bool mt)
{
    int n = arr.size();
    tbb::concurrent_vector<int> allData;
    std::vector<std::pair<size_t, size_t>> scope;
    scope.resize(n);
    auto fun = [&](auto Range) {
        for (auto idx = Range.begin(); idx != Range.end(); ++idx) {
            const auto& nodeData = arr[idx].data;
            // grow_by reserves the whole index range for this thread in one call
            auto beginIt = allData.grow_by(nodeData.size());
            for (size_t i = 0; i < nodeData.size(); ++i) {
                *(beginIt + i) = nodeData[i];
            }
            auto start = std::distance(allData.begin(), beginIt);
            scope[idx] = std::make_pair(start, start + nodeData.size());
        }
    };
    tbb::blocked_range<int> range{0, n};
    if (mt) {
        tbb::parallel_for(range, fun);
    } else {
        fun(range);
    }
    return std::make_pair(allData, scope);
}
```

---

### Conclusion
* Concurrent programming is very hard.
* Concurrent containers are not always safe.
  * Mostly, it is only safe to call the same API concurrently.
* Know the limitations of the concurrent containers.
  * Read the documentation and dig into the source code.
  * There are many atomic/CAS operations in tbb.
* Be careful with your usage model and choose the best container.

---

### Reference
* [lock-free linkedlist/stack](http://15418.courses.cs.cmu.edu/spring2013/article/46)
* [concurrency lockfree](https://hackmd.io/@sysprog/concurrency-lockfree)
* [cppreference](https://en.cppreference.com/w/cpp/atomic/atomic)
* [oneTBB spec](https://oneapi-spec.uxlfoundation.org/specifications/oneapi/v1.3-rev-1/elements/onetbb/source/nested-index)
* [oneTBB source code](https://github.com/oneapi-src/oneTBB)
* [An introduction for lock free](https://preshing.com/20120612/an-introduction-to-lock-free-programming/)
* [Deadlock, Livelock and Starvation](https://www.baeldung.com/cs/deadlock-livelock-starvation)

---

# Q&A