# L13 - GRUPA 2 ## Zadanie 1 :::success Autor: Patryk Mazur ::: ![](https://i.imgur.com/Vd7ncFr.png) ![](https://i.imgur.com/Nm3qllQ.png) CAS Linijka 16. Jeżeli nie uda się go wykonac, to znaczy, że jakiś inny wątek zdeponował swój przedmiot i zmienił stan z EMPTY na WAITING. Nasz wątek wtedy wychodzi z case'a i próbuje się dopasować na nowo Linijka 24. Jeżeli nie uda się go wykonać to znaczy, że jednak znalazł się jakiś drugi wątek, który zmienił stan z WAITING na BUSY. Nasz wątek może wtedy dokończyć wymianę Linijka 34. Jeżeli nie uda się go wykonać, to znaczy, że przyszedł inny wątek(do pary), który "sprzątnął" nam przedmiot sprzed nosa i zmienił stan z WAITING na BUSY Nasz wątek musi wtedy wejść do switcha na nowo. set zamiast CAS w 20. i 28. linijce Wątek B zabrał przedmiot, zdeponował swój własny i ustawił status na BUSY. To znaczy, że pozostał tylko wątek A (Wątki chętne na wymiane będą się rozbijały o case BUSY). Zatem nie musimy korzystać z CASa, ponieważ żaden wątek nas nie ubiegnie. ## Zadanie 2 :::success Autor: Daniel Wiczołek ::: ![](https://i.imgur.com/yVDqKZu.png) ![](https://i.imgur.com/vdx75p5.png) Wątek wybiera losowy slot w tablicy wymian z przedziału [0, range). I dokonuje tam wymiany, w jakiś sposób dobraliśmy rozsądny czas oczekiwania.. ![](https://i.imgur.com/HV7RhMZ.png) ![](https://i.imgur.com/Jtc56e2.png) Próbujemy dokonać pop/push jak na zwykłym lockFreeStack, jeśli się uda to konniec, wpp próbujemy dokonać wymiany z innym wątkiem, jeśli się uda to koniec wpp powtarzamy całą operację. RangePolicy odpowiada za dynamiczne wyznaczanie wartości 'range' - parametru, który decyzuje z jakiego zakresu losujemy miejsce wymiany. Jeśli wymiana często kończy się timeoutem, to nie ma dużo (dużo w porównaniu do capacity tablicy wymian) dużo wątków próbujących dokonać wymiany więc może warto zmniejszyć range etc.. - może być tak, że pop/push spotka pop/push wtedy poprostu ponawiamy całość. Punklty linearyzacji: - dla tych kończących się dostępem do metod LockFreeStack, moment wywołania CAS jest punktem lin. - dla eliminujących się - moment wymiany jest tym punktem (push przed pop) ## Zadanie 3 :::success Autor: Jan Jankowicz ::: ![](https://i.imgur.com/LSCHvbD.png) ![](https://i.imgur.com/tWLxUAE.png) Klasa CombiningTree pozwala na efektywnie współbieżnie wykorzystanie licznika. Struktura: CombiningTree to drzewo binarne, gdzie w korzeniu znajduje się aktualny stan licznika, a w liściach - punkty dostępu dla wątków chcących zwiększyć licznik. Liści jest dwa razy mniej niż przewidzianych wątków, tak że każdy wątek otrzymuje dokładnie jeden liść, który współdzieli z innym wątkiem, i z którego będzie zgłaszać chęć podbicia licznika. Pozostałe węzły drzewa posłużą jako kanał komunikacji pomiędzy wątkami. Algorytm inkrementacji: Wątek, który chciałby podwyższyć wartość licznika, używa w tym celu metody getAndIncrement(). Ponieważ struktura jest wielowątkowa, w tym samym czasie może nastąpić więcej takich żądań do struktury. Modyfikacja licznika odbywa się w czterech etapach: - przygotowania/przedzbierania/prekombinacji (ang. precombination phase): Wątki, wywołujące getAndIncrement(), ustalają w tym momencie podział obowiązków na drodze od liścia do korzenia (licznika). Każdy wątek porusza się od swojego liścia do korzenia. W każdym napotkanym węźle sprawdza, czy jest pierwszym wątkiem, który do niego dotarł. Jeśli tak, to kontynuuje swoją podróż w stronę korzenia do momentu, w którym dotrze do wierzchołka jako drugi lub dotrze do korzenia. Taki wierzchołek nazwijmy węzłem R, do którego w późniejszych etapach będzie musiał przekazać wartości z węzłów, w których z kolei to on był pierwszy. Każdy wątek zaczyna w liściu z taką wartością równą 1, bo o 1 chce zwiększyć licznik. - zbierania (ang. combination phase): Teraz wątki wiedzą, z których węzłów muszą zebrać wartości, a któremu przekazać. Wątek ponownie rozpoczyna od liścia i jeśli był w nim pierwszy w fazie poprzedniej, to zbiera wartość z węzła i idzie węzeł wyżej. Gdyby do wierzchołka zgłosił się drugi wątek, to ten pierwszy oczekuje, aż drugi zbierze wszystkie wartości ze swojego poddrzewa i przekaże je pierwszemu. Po przekazaniu pierwszy wątek rusza dalej. Gdy dotrze do węzła R, do którego miał przekazać wartość, robi to i usypia się w oczekiwaniu na powrót wątku, któremu przekazał wartość, a który poinformuje go (na etapie dystrybucji), co powinien zwrócić w wyniku metody getAndIncrement() na podstawie aktualnej wartości licznika. - przekazania (ang. operation phase): Łączy się to z etapem poprzednim. Wątki przekazują swoje wartości zebrane na etapie zbierania do węzłów R. Szczególna sytuacja zachodzi, gdy R to korzeń drzewa. Wówczas wątek podbija licznik o wartość, którą zebrał, ale nie zasypia, tylko od razu wraca i powiadamia wątki, które śpią, o ich wartości licznika. Rozpoczyna się czas dystrybucji. - dystrybucji (ang. distribution phase): Wątki, które zostały obudzone, transmitują wartości licznika do węzłów, z których zbierały wartości. ## Zadanie 4 :::success Autor: Joanna Stachowicz ::: ![](https://i.imgur.com/y1MOhR5.png) ![](https://i.imgur.com/PzXidwA.png) ![](https://i.imgur.com/TUED8Vc.png) ![](https://i.imgur.com/ijjBUcg.png) ![](https://i.imgur.com/SkBZpJm.png) ![](https://i.imgur.com/KP1bAIG.jpg) ![](https://i.imgur.com/MUSmavs.jpg) ![](https://i.imgur.com/T4T7LB4.jpg) :::info Punkty uzyskane za poniższe zadania są bonusowe. ::: ## Zadanie 5 :::success Autor: Daniel Boguszewski ::: ```java= import java.util.Stack; public class TriCombiningTree { TriNode[] leaf; TriNode root; public TriCombiningTree(int width) { TriNode[] nodes = new TriNode[width + (width / 2)]; nodes[0] = new TriNode(); root = nodes[0]; for (int i = 1; i < nodes.length; i++) { nodes[i] = new TriNode(nodes[(i - 1) / 3]); } leaf = new TriNode[width]; for (int i = 0; i < leaf.length; i++) { leaf[i] = nodes[nodes.length - i - 1]; } } public int getAndIncrement(int id) throws InterruptedException, PanicException { Stack<TriNode> stack = new Stack<>(); TriNode myLeaf = leaf[id/3]; TriNode node = myLeaf; // precombining phase while (node.precombine()) { node = node.parent; } TriNode stop = node; // combining phase int combined = 1; for (node = myLeaf; node != stop; node = node.parent) { combined = node.combine(combined); stack.push(node); } // operation phase int prior = stop.op(combined); // distribution phase while (!stack.empty()) { node = stack.pop(); node.distribute(prior); } return prior; } } public class TriNode { enum CStatus{IDLE, FIRST, SECOND, THIRD, RESULT, ROOT}; boolean[] locked = {false, false}; int num_nodes, ready; CStatus cStatus; int firstValue; int[] secondValue = new int[2]; int[] result = new int[2]; TriNode parent; public TriNode() { cStatus = CStatus.ROOT; num_nodes = 0; ready = 0; } public TriNode(TriNode parent) { this.parent = parent; cStatus = CStatus.IDLE; locked[0] = false; locked[1] = false; num_nodes = 0; ready = 0; } synchronized boolean precombine() throws InterruptedException, PanicException { while (locked[1] || num_nodes >= 2) wait(); switch (cStatus) { case IDLE: cStatus = CStatus.FIRST; return true; case FIRST: locked[0] = true; num_nodes = 1; ready = 0; cStatus = CStatus.SECOND; return false; case SECOND: num_nodes = 2; cStatus = CStatus.THIRD; return false; case ROOT: return false; default: throw new PanicException("unexpected Node state " + cStatus); } } synchronized int combine(int combined) throws InterruptedException, PanicException { while (locked[0] || ready < num_nodes) wait(); locked[1] = true; firstValue = combined; switch (cStatus) { case FIRST: return firstValue; case SECOND: return firstValue + secondValue[0]; case THIRD: return firstValue + secondValue[0] + secondValue[1]; default: throw new PanicException("unexpected Node state " + cStatus); } } synchronized int op(int combined) throws InterruptedException, PanicException { int prior; switch (cStatus) { case ROOT: int oldValue = result[0]; result[0] += combined; return oldValue; case SECOND: case THIRD: secondValue[ready] = combined; // secondValue = combined; // locked = false; // notifyAll(); ready++; if (ready >= num_nodes) { locked[0] = false; notifyAll(); } while (cStatus != CStatus.RESULT) wait(); ready--; prior = result[ready]; if (ready == 0) { locked[1] = false; cStatus = CStatus.IDLE; num_nodes = 0; notifyAll(); } return prior; default: throw new PanicException("unexpected Node state " + cStatus); } } synchronized void distribute(int prior) throws PanicException { switch (cStatus) { case FIRST: cStatus = CStatus.IDLE; locked[1] = false; break; case SECOND: result[0] = prior + firstValue; cStatus = CStatus.RESULT; break; case THIRD: result[1] = prior + firstValue; result[0] = result[1] + secondValue[0]; cStatus = CStatus.RESULT; break; default: throw new PanicException("unexpected Node state " + cStatus); } notifyAll(); } } ``` ## Zadanie 6 :::success Autor: Jan Jankowicz ::: ![](https://i.imgur.com/acEa9l4.png) ```java= import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import exchanger.LockFreeExchanger; public class ExchangerNode { enum CStatus { IDLE, FIRST, SECOND, ROOT }; AtomicBoolean locked; CStatus cStatus; int firstValue; // for root this is counter value ExchangerNode parent; LockFreeExchanger<Integer> exchanger; public ExchangerNode() { cStatus = CStatus.ROOT; locked = new AtomicBoolean(false); } public ExchangerNode(ExchangerNode myParent) { parent = myParent; cStatus = CStatus.IDLE; locked = new AtomicBoolean(false); exchanger = new LockFreeExchanger<>(); } boolean precombine() throws InterruptedException { while (!locked.compareAndSet(false, true)) ; switch (cStatus) { case IDLE: cStatus = CStatus.FIRST; locked.set(false); return true; case FIRST: cStatus = CStatus.SECOND; return false; case ROOT: locked.set(false); return false; default: throw new RuntimeException(ThreadID.get() + " unexpected Node state" + cStatus); } } int combine(int combined) throws InterruptedException, TimeoutException { /* wait until node become locked */ while (cStatus != CStatus.SECOND && !locked.compareAndSet(false, true)) ; firstValue = combined; switch (cStatus) { case FIRST: return firstValue; case SECOND: /* wait for combined value of second thread */ return firstValue + exchanger.exchange(0, 10, TimeUnit.SECONDS); default: throw new RuntimeException("unexpected Node state " + cStatus); } } int op(int combined) throws InterruptedException, TimeoutException { switch (cStatus) { case ROOT: while (!locked.compareAndSet(false, true)) ; int prior = firstValue; firstValue += combined; locked.set(false); return prior; case SECOND: /* pass combined value */ exchanger.exchange(combined, 10, TimeUnit.SECONDS); /* wait for result */ int result = exchanger.exchange(0, 10, TimeUnit.SECONDS); cStatus = CStatus.IDLE; locked.set(false); return result; default: throw new RuntimeException("unexpected Node state"); } } void distribute(int prior) throws TimeoutException { switch (cStatus) { case FIRST: cStatus = CStatus.IDLE; locked.set(false); break; case SECOND: /* pass result to waiting thread */ exchanger.exchange(prior + firstValue, 10, TimeUnit.SECONDS); break; default: throw new RuntimeException("unexpected Node state"); } } } ``` ## Zadanie 7 :::success Autor: Daniel Boguszewski ::: ```java import java.util.ArrayList; import java.util.concurrent.locks.ReentrantLock; public class Pool<T> { CombiningTree in, out; ArrayList<T> pool; ReentrantLock[] lock; public Pool(int poolSize, int threadsNum) { pool = new ArrayList<>(poolSize); lock = new ReentrantLock[poolSize]; for (int i = 0; i < poolSize; i++) { pool.add(null); lock[i] = new ReentrantLock(); } in = new CombiningTree(threadsNum); out = new CombiningTree(threadsNum); } void put (T item, int id) throws PanicException, InterruptedException { int i = in.getAndIncrement(id) % pool.size(); while (true) { while (!lock[i].tryLock()) continue; try { if (pool.get(i) == null) { pool.set(i, item); return; } System.out.println(i); } finally { lock[i].unlock(); } } } T get (int id) throws PanicException, InterruptedException { int i = out.getAndIncrement(id) % pool.size(); while (true) { while (!lock[i].tryLock()) continue; try { if (pool.get(i) != null) { return pool.set(i, null); } System.out.println(i); } finally { lock[i].unlock(); } } } } ```