# Ćwiczenia 13, grupa cz. 12-14, 27. stycznia 2022
###### tags: `PRW21` `ćwiczenia` `pwit`
## Deklaracje
Gotowość rozwiązania zadania należy wyrazić poprzez postawienie X w odpowiedniej kolumnie! Jeśli pożądasz zreferować dane zadanie (co najwyżej jedno!) w trakcie dyskusji oznacz je znakiem ==X== na żółtym tle.
**UWAGA: Tabelkę wolno edytować tylko wtedy, gdy jest na zielonym tle!**
:::danger
| | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
| ----------------------:| ----- | --- | --- | --- | --- | --- | --- |
Jacek Bizub | X | X | X | X | | | |
Michał Błaszczyk | | | | | | | |
Dawid Dudek | X | X | | | | | |
Krzysztof Juszczyk | | | | | | | |
Kamil Kasprzak | X | X | X | X | X | X | X |
Kacper Komenda | X | X |==X==| X | | | |
Aleksandra Kosińska | ==X== | X | X | | | | |
Łukasz Orawiec | X | X | X | | | | |
Kamil Puchacz | | | | | | | |
Paweł Sikora | | | | | | | |
Michał Sobecki | X | X | X | | | | |
Cezary Stajszczyk | X | X | X | | | | |
Piotr Stokłosa | X | X | X | | | | |
Cezary Troska | X | X | X | | | | |
:::
:::info
**Uwaga:** Po rozwiązaniu zadania należy zmienić kolor nagłówka na zielony.
:::
## Zadanie 1
:::success
Autor: Aleksandra Kosińska
:::

## Zadanie 2
:::success
Autor: Dawid Dudek
:::




Klasa RangePolicy określa zakres do użycia dla EliminationArray. Jest to polityka, która może się zmieniać w trakcie, np. jeśli dostajemy faile, to możemy zmniejszyć zakres.
Każdy zakończony sukcesem `push()` lub `pop()`, który kończy się dostępem do LockFreeStack może być zlinearyzowany w momencie zdobycia tego dostępu (poprawny CAS).
Każda para wyeliminowanych `push()` i `pop()` może być zlinearyzowana, kiedy się spotkają (push przed pop).
## Zadanie 3
:::success
Autor: Kacper Komenda
:::

$getAndIncrement()$ składa się z czterech faz: precombining, combining, op oraz distribution
precombining - sprawdza czy wątek zajął liść jako pierwszy
combining - w przypadku gdy do liścia starają się dotrzeć dwa wątki, to dodaje ich liczniki do siebie i tak dla tych liści ustalonych w fazie precombining
operation - sprawdza ostatni liść, do którego doszedł w fazie precombining, jeśli ten liść jest odwiedzony przez inny wątek, to znaczy, że wątek w fazie operation musi powiadomić ten pierwszy wątek, że zapisał wartość i drugi wątek może ją dodać.
distribution - w pewnym momencie wątki dotarły do korzenia i teraz należy rozesłać informację o wyniku do pozostałych wątków czekających w liściach
## Zadanie 4
:::success
Autor: Jacek Bizub
:::





- **locked** służy do synchronizacji między dwoma wątkami, które mają dostęp do danego węzla
1.
Jeżeli robiąc precombine napotkamy zapaloną flagę **locked** to znaczy, że wątek aktywny już na nas nie czeka. Ten "obieg" algorytmu nas nie dotyczy i musimy zaczekać do następnego.
Jeżeli robiąc combine napotkamy zapaloną flagę **locked** to znaczy, że do węzła przybył drugi wątek ale jeszcze nie skończył swojej pracy. Musimy na niego zaczekać żeby połączyć wartości
2.
Ma to związek z "protokołem" tego algorytmu. Kiedy znajduje się w stanie kiedy wykonuje op/distribute to drugi wątek już mu nie wejdzie w drogę.
W szczególności to my zapaliliśmy wcześniej locked, więc my musimy to zgasić.
3.
- precombine: "drogi wątku aktywny, przybył wątek pasywny i zaczekaj proszę żeby połączyć z nim wartości"
- combine: "jestem wątkiem aktywnym i już przeprocesowałem ten węzeł, musisz zaczekać do następnej tury"
- op: "jestem wątkiem pasywnym i zrobiłem już swoje, wątek aktywny może kontynuować"
- distribute: wątek aktywny - który nie miał kompana - informuje o skończeniu pracy i pozwala kontynuować drugiemu wątkowi
## Zadanie 5
:::success
Autor: Kamil Kasprzak
:::
```java=
import java.util.Stack;
public class CombiningTree {
Node leaf[];
public static ThreadLocal<Integer> ThreadID = new ThreadLocal<>();
public CombiningTree(int width) {
int len = (int) ((3.0 / 2.0) * width + 1);
Node[] nodes = new Node[len];
nodes[0] = new Node();
for (int i = 1; i < nodes.length; i++) {
nodes[i] = new Node(nodes[(i - 1) / 3]);
}
leaf = new Node[width];
for (int i = 0; i < leaf.length; i++) {
leaf[i] = nodes[nodes.length - i - 1];
}
}
public int getAndIncrement() throws InterruptedException, Exception {
Stack<Node> stack = new Stack<Node>();
Node myLeaf = leaf[ThreadID.get() / 3];
Node node = myLeaf;
Node.CStatus status;
// precombining phase
while ((status =node.precombine()) == Node.CStatus.FIRST)
node = node.parent;
Node stop = node;
// combining phase
int combined = 1;
for (node = myLeaf; node != stop; node = node.parent) {
combined = node.combine(combined);
stack.push(node);
}
// operation phase
int prior = stop.op(combined,status);
// distribution phase
while (!stack.empty()) {
node = stack.pop();
node.distribute(prior);
}
return prior;
}
}
```
```java=
public class Node extends Thread {
public enum CStatus {
IDLE,
FIRST,
SECOND,
THIRD,
RESULT,
ROOT
};
boolean locked;
int guests;
CStatus cStatus;
int firstValue, secondValue, thirdValue;
int result, prior;
Node parent;
public Node() {
cStatus = CStatus.ROOT;
locked = false;
guests = 0;
}
public Node(Node myParent) {
parent = myParent;
cStatus = CStatus.IDLE;
locked = false;
guests = 0;
}
synchronized CStatus precombine() throws InterruptedException, Exception {
while (locked)
wait();
switch (cStatus) {
case IDLE:
cStatus = CStatus.FIRST;
guests = 0;
return CStatus.FIRST;
case FIRST:
guests++;
cStatus = CStatus.SECOND;
return CStatus.SECOND;
case SECOND:
guests++;
locked = true;
cStatus = CStatus.THIRD;
return CStatus.THIRD;
case ROOT:
return CStatus.ROOT;
default:
throw new Exception("1) unexpected Node state" + cStatus);
}
}
synchronized int combine(int combined) throws InterruptedException, Exception {
while (guests > 0)
wait();
locked = true;
firstValue = combined;
switch (cStatus) {
case FIRST:
return firstValue;
case SECOND:
return firstValue + secondValue;
case THIRD:
return firstValue + secondValue + thirdValue;
default:
throw new Exception("2) unexpected Node state" + cStatus);
}
}
synchronized int op(int combined, CStatus status) throws InterruptedException, Exception {
int localResult;
switch (status) {
case ROOT:
int prior = result;
result += combined;
return prior;
case SECOND:
secondValue = combined;
guests--;
notifyAll(); // wake up waiting threads
while (cStatus != CStatus.RESULT)
wait();
localResult = this.prior + firstValue;
if (--guests <= 0) {
locked = false;
cStatus = CStatus.IDLE;
}
notifyAll();
return localResult;
case THIRD:
thirdValue = combined;
guests--;
notifyAll(); // wake up waiting threads
while (cStatus != CStatus.RESULT)
wait();
localResult = this.prior + firstValue + secondValue;
if (--guests <= 0) {
locked = false;
cStatus = CStatus.IDLE;
}
notifyAll();
return localResult;
default:
throw new Exception("3) unexpected Node state" + cStatus);
}
}
synchronized void distribute(int prior) throws Exception {
switch (cStatus) {
case FIRST:
cStatus = CStatus.IDLE;
locked = false;
break;
case SECOND:
this.prior = prior;
cStatus = CStatus.RESULT;
guests++;
break;
case THIRD:
this.prior = prior;
cStatus = CStatus.RESULT;
guests += 2;
break;
default:
throw new Exception("4) unexpected Node state" + cStatus);
}
notifyAll();
}
}
```
## Zadanie 6
:::success
Autor: Kamil Kasprzak
:::

```java=
import java.util.Stack;
public class CombiningTree {
private Node leaf[];
public static ThreadLocal<Integer> ThreadID = new ThreadLocal<>();
public CombiningTree(int width) {
Node[] nodes = new Node[2 * width - 1];
nodes[0] = new Node();
for (int i = 1; i < nodes.length; i++) {
nodes[i] = new Node(nodes[(i - 1) / 2]);
}
leaf = new Node[width];
for (int i = 0; i < leaf.length; i++) {
leaf[i] = nodes[nodes.length - i - 1];
}
}
public int getAndIncrement() throws InterruptedException, Exception {
Stack<Node> stack = new Stack<Node>();
Node myLeaf = leaf[ThreadID.get() / 2];
Node node = myLeaf;
int[] combine = new int[] { 1 };
while (node.UP(combine)) {
stack.push(node);
node = node.parent;
}
combine[0] = node.DOWN(combine, 2);
while (!stack.empty()) {
node = stack.pop();
node.DOWN(combine, 1);
}
return combine[0];
}
}
```
```java=
import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.TimeUnit;
import java.lang.Exception;
import java.util.concurrent.TimeoutException;
public class LockFreeExchanger<T> {
static final int EMPTY = 0, WAITING = 1, BUSY = 2;
AtomicStampedReference<T> slot = new AtomicStampedReference<T>(null, 0);
public T exchange(T myItem, long timeout, TimeUnit unit)
throws TimeoutException, Exception {
long nanos = unit.toNanos(timeout);
long timeBound = System.nanoTime() + nanos;
int[] stampHolder = {
EMPTY
};
while (true) {
if (System.nanoTime() > timeBound)
throw new TimeoutException();
T yrItem = slot.get(stampHolder);
int stamp = stampHolder[0];
switch (stamp) {
case EMPTY:
if (slot.compareAndSet(yrItem, myItem, EMPTY, WAITING)) {
while (System.nanoTime() < timeBound) {
yrItem = slot.get(stampHolder);
if (stampHolder[0] == BUSY) {
slot.set(null, EMPTY);
return yrItem;
}
}
if (slot.compareAndSet(myItem, null, WAITING, EMPTY)) {
throw new TimeoutException();
} else {
yrItem = slot.get(stampHolder);
slot.set(null, EMPTY);
return yrItem;
}
}
break;
case WAITING:
if (slot.compareAndSet(yrItem, myItem, WAITING, BUSY))
return yrItem;
break;
case BUSY:
break;
default: // impossible
throw new Exception("<LockFreeExchanger> Co to za stan? " + stamp);
}
}
}
}
```
```java=
public class Main {
public static void main(String args[]) // static method
{
System.out.println("Main start");
Test[] t = new Test[Test.threadsAmount];
for (int i = 0; i < t.length; i++) {
t[i] = new Test(i);
}
for (int i = 0; i < t.length; i++) {
t[i].start();
}
try {
for (int i = 0; i < t.length; i++) {
t[i].join();
}
} catch (InterruptedException e) {
System.out.println(e);
}
System.out.println("Wątki zakończyły działanie");
for (int i = 0; i < Test.array.length; i++) {
if (Test.array[i] != 1) {
System.out.println("Error: Test.array[" + i + "]!=" + Test.array[i]);
}
}
System.out.println("Main end");
}
}
```
```java=
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Node extends Thread {
enum CStatus {
IDLE,
FIRST,
SECOND,
RESULT,
ROOT,
NORMAL,
SOLO,
DUO
};
int firstValue, secondValue;
int result;
Node parent;
CStatus nodeType;
CStatus nodeState;
int guests;
LockFreeExchanger<Integer> exchangerUP;
LockFreeExchanger<Integer> exchangerDown;
public Node() {
nodeState = CStatus.IDLE;
nodeType = CStatus.ROOT;
guests = 0;
exchangerUP = new LockFreeExchanger<Integer>();
exchangerDown = new LockFreeExchanger<Integer>();
}
public Node(Node myParent) {
parent = myParent;
nodeState = CStatus.IDLE;
nodeType = CStatus.NORMAL;
guests = 0;
exchangerUP = new LockFreeExchanger<Integer>();
exchangerDown = new LockFreeExchanger<Integer>();
}
public boolean UP(int[] combined) throws Exception {
int id;
while (true) {
synchronized (this) {
while (nodeState == CStatus.DUO || nodeState == CStatus.SOLO || nodeState == CStatus.RESULT)
wait();
}
switch (nodeType) {
case ROOT:
return false;
case NORMAL:
synchronized (this) {
id = ++guests;
if (id > 2) {
guests--;
break;
}
}
try {
int exchange = exchangerUP.exchange(combined[0], 5, TimeUnit.MILLISECONDS);
nodeState = CStatus.DUO;
if (id == 1) {
combined[0] += exchange;
secondValue = exchange;
return true;
}
firstValue = exchange;
return false;
} catch (TimeoutException e) {
if (id == 1) {
nodeState = CStatus.SOLO;
return true;
}
break;
}
default:
throw new Exception("Unexpected type in <UP>: " + nodeType);
}
}
}
public int DOWN(int[] combined, int id) throws Exception {
int exchange;
switch (nodeType) {
case ROOT:
synchronized (this) {
int prior = result;
result += combined[0];
return prior;
}
case NORMAL:
switch (id) {
case 1:
synchronized (this) {
if (nodeState == CStatus.SOLO) {
nodeState = CStatus.IDLE;
notifyAll();
guests = 0;
return combined[0];
}
}
while (true) {
try {
exchangerDown.exchange(combined[0], 200, TimeUnit.MILLISECONDS);
return combined[0];
} catch (TimeoutException e) {
continue;
}
}
case 2:
while (true) {
try {
exchange = exchangerDown.exchange(combined[0], 200, TimeUnit.MILLISECONDS);
int val = exchange + firstValue;
synchronized (this) {
guests = 0;
nodeState = CStatus.IDLE;
notifyAll();
return val;
}
} catch (TimeoutException e) {
continue;
}
}
}
default:
throw new Exception("<DOWN> niespodziewany stan: " + nodeType + " | " + id);
}
}
}
```
```java=
import java.util.concurrent.locks.ReentrantLock;
public class Test extends Thread {
public static volatile int[] array = new int[3000];
public static int threadsAmount = 10;
public static volatile CombiningTree tree = new CombiningTree(threadsAmount);
private static volatile ReentrantLock lock = new ReentrantLock();
private int id;
public Test(int id) {
this.id=id;
}
public void run() {
CombiningTree.ThreadID.set(id);
System.out.println("Wątek rozpoczął pracę: " + CombiningTree.ThreadID.get());
try {
while (true) {
int index = tree.getAndIncrement();
if (index < array.length && index >= 0) {
lock.lock();
array[index]++;
lock.unlock();
} else {
break;
}
}
} catch (Exception e) {
System.out.println(e + "| z wątkiem " + CombiningTree.ThreadID.get());
} finally {
System.out.println("Wątek zakończył pracę: " + CombiningTree.ThreadID.get());
}
}
}
```
## Zadanie 7
:::success
Autor: Jacek Bizub
:::
```scala=
import java.util.concurrent.atomic.AtomicInteger
class Pool[T](size: Int) {
private val buffer = Array.fill[Option[T]](size)(None)
private val locks = Vector.tabulate(size)(_ => new Object)
private val headCounter = new AtomicInteger
private val tailCounter = new AtomicInteger
def put(t: T): Unit = {
val index = normalizeIndex(headCounter.getAndIncrement())
val lock = locks(index)
lock.synchronized {
while (buffer(index).isDefined) lock.wait()
buffer(index) = Some(t)
lock.notifyAll()
}
}
def get(): T = {
val index = normalizeIndex(tailCounter.getAndIncrement())
val lock = locks(index)
lock.synchronized {
while (buffer(index).isEmpty) lock.wait()
val result = buffer(index).get
buffer(index) = None
lock.notifyAll()
result
}
}
private def normalizeIndex(read: Int) = (
if (read >= 0) read else read - Int.MinValue
) % size
}
```
```scala=
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.util.Random
object Main extends App {
val writes = new ConcurrentHashMap[Int, AtomicInteger]()
val reads = new ConcurrentHashMap[Int, AtomicInteger]()
for (i <- 0 to 1000) {
writes.put(i, new AtomicInteger(0))
reads.put(i, new AtomicInteger(0))
}
val pool = new Pool[Int](10)
val jobs = List.tabulate(50) { id =>
val job = if (id % 2 == 0) new PoolWriter(id, pool) else new PoolReader[Int](id, pool)
new Thread(job).start()
}
Thread.sleep(1000 * 10)
for (i <- 0 to 1000) {
val r = writes.get(i).get()
val w = reads.get(i).get()
println(s"[Main:$i] r:$r w:$w")
if (r != w) {
throw new IllegalStateException
}
}
println("fine")
}
class PoolReader[T](id: Int, pool: Pool[T]) extends Runnable {
override def run(): Unit = {
while (true) {
val value = pool.get()
println(s"[Reader:$id] got $value")
Main.reads.get(value).incrementAndGet()
}
}
}
class PoolWriter(id: Int, pool: Pool[Int]) extends Runnable {
override def run(): Unit = {
for (_ <- 1 to 50) {
val value = Random.nextInt(1001)
println(s"[Writer:$id] writes $value")
pool.put(value)
Main.writes.get(value).incrementAndGet()
}
}
}
```