# Ćwiczenia 6, grupa cz. 16-18, 23 listopada 2023
###### tags: `PRW23` `ćwiczenia` `pwit`
## Deklaracje
Gotowość rozwiązania zadania należy wyrazić poprzez postawienie X w odpowiedniej kolumnie! Jeśli pożądasz zreferować dane zadanie (co najwyżej jedno!) w trakcie dyskusji oznacz je znakiem ==X== na żółtym tle.
**UWAGA: Tabelkę wolno edytować tylko wtedy, gdy jest na zielonym tle!**
:::danger
| | 1 | 2 | 3 | 4 | 5 | 6 |
| ----------------------:| ----- | --- | --- | --- | --- | --- |
Dominik Baziuk | | X | X | X | | |
Paweł Borek | | X | X | | | |
Arti Brzozowski | | X | X | X | | |
Mateusz Golisz | | X | X | X | | |
Kacper Jóźwiak | | X | X | X | | |
Julia Konefał | | | | | | |
Adam Korta | | X | X | X | | |
Jakub Mikołajczyk | | X | X | X | X | ==X== |
Bartosz Morasz | X | X | X | | | |
Andrzej Morawski | | x | x | x | | |
Aleksandra Nicpoń | | x | x | x | | |
Mateusz Reis | | X | | | X | X |
Tomasz Stachurski | | x | x | | | |
Marcel Szelwiga | X | X | X | X | ==X== | X |
Volha Tsekalo | | x | x | x | | |
Martyna Wybraniec | | x | x | x | | |
Denys Zinoviev | | | | | | |
Dominik Olejarz | | | | | | |
:::
:::info
**Uwaga:** Po rozwiązaniu zadania należy zmienić kolor nagłówka na zielony.
:::
## Zadanie 1
:::success
Autor: Bartosz Morasz
:::

> [name=Piotr Witkowski] Uwaga: w zadaniu potrzebujemy dodatkowego założenia: zapisy mają numery rosnące, tzn. jeśli zapis Wi jest wcześniejszy od Wj to i < j.
Rejestr jest atomowy, jeśli jest linearyzowalny. Jeśli wątek A zrobi write w punkcie x, to jeśli wątek B zrobi read przed tym punktem, to przeczyta starą wartość, a jeśli po tym punkcie, to przeczyta nową.
- dla każdego 𝑖 nie jest prawdą, że $𝑅^i → 𝑊^i$ (*)
Nie uda nam się znaleźć sekwencyjnej historii, która zawierałaby $𝑅^i → 𝑊^i$. Nie możemy przeczytać czegoś co nie zostało zapisane, więc nie byłaby poprawana. Sekwencja wywołań nie byłaby linearyzowalna i stąd rejestr nie byłby atomowy.
Natomiast gdyby rejestr nie był atomowy, ale bezpieczny lub regularny, to taka sekwencja mogłaby być już poprawna, ponieważ w przypadku jeśli $𝑅^i$ nakłada się z $𝑊^i$, to mimo wykonania się $𝑊^i$ później, $𝑅^i$ i tak może zwrócić nową wartość.
- dla każdych 𝑖 oraz 𝑗 nie jest prawdą, że $W^i → W^j → R^i$ (**)
Nie uda nam się znaleźć sekwencyjnej historii, która zawierałaby $W^i → W^j → R^i$. Wartość, którą chcemy przeczytać zostanie nadpisana, stąd nie byłaby poprawna. Sekwencja wywołań nie byłaby linearyzowalna i stąd rejestr nie byłby atomowy.
Gdyby rejestr nie był atomowy, ale bezpieczny lub regularny, to taka sekwencja mogłaby być już poprawna, ponieważ w przypadku jeśli $𝑅^i$ nakłada się z $𝑊^j$, to mimo wykonania się $𝑅^i$ później i tak może zwrócić starą wartość zapisaną przez $𝑊^i$.
- dla dla każdych 𝑖 oraz 𝑗 jeśli $R^i → R^j$ to $i ≤ j$ (***).
Z takiej sekwencji wykonań jesteśmy w stanie utworzyć poprawną sekwencyjną historię, czytamy w odpowiedniej kolejności.
Gdyby $i > j$, oznaczałoby to niepoprawną kolejność wywołań, a na to nie możemy sobie pozwolić. Sekwencja wywołań nie byłaby linearyzowalna i stąd rejestr nie byłby atomowy.
Gdyby rejestr nie był atomowy, ale bezpieczny lub regularny, to $i > j$ dla tej sekwencji wywołań mogłoby być dopuszczalne, bo możemy mieć sytuacje, w której te dwa wywołania read nakładają się z $W^i$ i najpierw pierwszy wątek zwraca nową wartość, a po nim drugi wątek zwraca starą wartość.
> [name=Piotr Witkowski] Udowodniliśmy, że: jeśli rejestr jest atomowy to spełnia (*), (**) i (***).
> Chcemy pokazać, że jeśli rejestr spełnia (*), (**), (***) to jest rejestrem atomowym.
## Zadanie 2
:::success
Autor: Arti Brzozowski
:::

Z rejestrów boolowskich przejdziemy do m-wartościowych poprzez skorzystanie z tablicy tych pierwszych. Będziemy w niej zapamiętywać określoną wartość poprzez ustawienie true na indeksie równym naszej wartości.

**Dowód poprawności**
Lemat
Wywołanie metody czytającej zawsze zwraca wartość odpowiadającą bitowi w tablicy ustawionemu przez jakiś zapis.
Zawsze będzie spełnione:
jeśli wątek czyta ```r_bit[j]``` to jakiś bit o indeksie j lub wyższym, zapisany w wyniku wywołania ```write()```, jest ustawiany na wartość true
W trakcie inicjalizacji ustawiamy bit ```r_bit[0]``` na true (co traktujemy jako pierwszy zapis). Załóżmy, że czytamy ```r_bit[j]``` i ```r_bit[k]``` jest ustawiony na ```true``` dla $k\geq j$.
- Jeżeli wątek czytający przejdzie z j na j+1 to wtedy ```r_bit[j]``` musi być ustawione na false, więc k jest większe od j
- Wątek zapisujący czyści r_bit[k] tylko gdy ustawił wyższy bit na true
Zatem lemat zachodzi.
Niech dla dowolnego odczytu x będzie wartością zapisaną w ostatnim nienakładjacym się zapisie. W momencie gdy zapis zostanie zakończony ```r_bit[x]``` będzie ustawione na true a wszystkie ```r_bit[i]``` dla i<x będą ustawione na false. Z wcześniejszego lematu wiemy, że jeżeli wątek odczytujący zwróci wartość, która nie jest x to musiał zwrócić inną wartość, która musiała być ustawiona przez obecny zapis. Zatem rejestr ten jest regularny.
## Zadanie 3
:::success
Autor: Andrzej Morawski
:::
```java=
public class AtomicMRSWRegister<T> implements Register<T> {
ThreadLocal<Long> lastStamp;
private StampedValue<T>[][] a_table; // each entry is an atomic SRSW register
public AtomicMRSWRegister(T init, int readers) {
lastStamp = new ThreadLocal<Long>() {
protected Long initialValue() { return 0; };
};
a_table = (StampedValue<T>[][]) new StampedValue[readers][readers];
StampedValue<T> value = new StampedValue<T>(init);
for (int i = 0; i < readers; i++) {
for (int j = 0; j < readers; j++) {
a_table[i][j] = value;
}
}
}
public T read() {
int me = ThreadID.get();
StampedValue<T> value = a_table[me][me];
for (int i = 0; i < a_table.length; i++) {
value = StampedValue.max(value, a_table[i][me]);
}
for (int i = 0; i < a_table.length; i++) {
if (i == me) continue;
a_table[me][i] = value;
}
return value;
}
public void write(T v) {
long stamp = lastStamp.get() + 1;
lastStamp.set(stamp);
StampedValue<T> value = new StampedValue<T>(stamp, v);
for (int i = 0; i < a_table.length; i++) {
a_table[i][i] = value;
}!
}
}
```

Chcemy pokazać, że powyższy rejestr jest atomowym rejestrem MRSW
**4.1.1** Nigdy nie jest tak, że $R^i \rightarrow W^i$
Prawda
**4.1.2** Nigdy nie jest tak, że dla jakiegoś j $W^i \rightarrow W^j \rightarrow R^i$
Metoda $write()$ zapisuje tylko rosnące wskaźniki.
Z tego wynika, że gdy metoda $write()$ zapisze jakąś wartosć $v$ ze znacznikiem $t$ to następnie rodzielne wykonanie metody $read()$ przeczyta wartość ze znacznikiem czasu $\ge t$
**4.1.3** Jeżeli $R^i \rightarrow R^j$, to $i \le j$
Jeżeli wywołanie odczytu przez wątek $A$ w całości poprzedza wywołanie odczytu wykonane przez wątek $B$ to wątek $A$ zapisuje swoją wartość ze znacznikiem czasu w wierszu wątku $B$. Zatem $B$ wybierze wartość z odpowiedniem $(\ge t)$ znacznikiem czasu.
## Zadanie 4
:::success
Autor: Adam Korta
:::



Tworzymy tablicę gdzie każda komórka to atomowy rejestr MRSW. Aby zapisać wątek czyta wszystkie elementy tablicy i wybiera największą wartość i zapisuje ją do do swojej komórki.
Aby przeczytać wątek czyta wszystkie komórki tablicy i zwraca tą z najwyższą wartością. Jeżeli jest remis rozwiązujemy to na korzyść wątku z mniejszym indeksem.
* Lemat 1
Rejestr jest atomowym MRMW
Warunek (*) jest spełniony ponieważ wątek zawsze zapisuje większą wartość do komórki a zatem poprzedzający go read() nie mógł go przeczytać wcześniej.
Niech wywołanie metody write() przez A poprzedza wywołanie metody przez B które z kolei poprzedza wywołanie read() przez C.
Jeżeli A != B to C odczyta wartość zapisaną przez B bo B ma większą wartość.
Jeżeli A = B to wtedy wartość A zostaje nadpisana przez B a zatem znowu read() nie zwróci wartości zapisanej przez A. To spełnia warunek (**).
Załóżmy że wywołanie read() przez A jest przed wywołaniem read() przez B oraz wywołanie write() przez C jest przed wywołaniem write() przez D.
Jeżeli A zwróci wartość D to B nie może zwrócić wartości C.
Jeżeli wartość C jest mniejsza od wartości D to A zwróci wartość D a zatem B nie zwróci wartości C.
Jeżeli watość C jest równa wartości D to wątki musiały wykonać zapis współbieżnie. Jeżeli A odczyta wartość D to B również ją odczyta.(to dowodzi ***)
## Zadanie 5
:::success
Autor: Marcel Szelwiga
:::
```java=
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
class Task{
/*
T == 1 -> Sort -> Splits and add sorts
T == 2 -> Merge -> Merges to arrays and adds another merges
*/
public int T, a, b, m;
public int parentIndex;
Task(int T, int a, int b, int m, int parentIndex){
this.T = T;
this.a = a;
this.b = b;
this.m = m;
this.parentIndex = parentIndex;
}
}
public class MS{
static int[] arr, h;
public static void main(String[] args) {
int A[] = {7, 3, 2, 5, 1, 6, 8, 4};
arr = A;
h = new int[arr.length];
for (int i=0; i<arr.length; i++)
System.out.printf("%d ", arr[i]);
System.out.println();
AtomicInteger sorted = new AtomicInteger(0);
AtomicInteger idx = new AtomicInteger(0);
AtomicInteger[] checkers = new AtomicInteger[arr.length];
for (int i=0; i<arr.length; i++)
checkers[i] = new AtomicInteger(0);
Task[] tasks = new Task[arr.length];
ConcurrentLinkedQueue<Task> Q = new ConcurrentLinkedQueue<Task>();
Q.offer(new Task(1, 0, arr.length-1, -1, -1));
int workers_cnt = 3;
Thread[] workers = new Thread[workers_cnt];
for (int i=0; i<workers_cnt; i++){
workers[i] = new Thread(new Runnable() {
@Override
public void run() {
while (sorted.get() == 0){
Task T = Q.poll();
if (T == null)
continue;
//System.out.printf("T: %d {a: %d, b: %d} I: %d\n", T.T, T.a, T.b, T.parentIndex);
if (T.T == 1){
/* Sort */
if (T.a != T.b){
int I = idx.getAndIncrement();
int mid = (T.a+T.b)/2;
tasks[I] = new Task(2, T.a, T.b, mid, T.parentIndex);
Q.offer(new Task(1, T.a, mid, -1, I));
Q.offer(new Task(1, mid+1, T.b, -1, I));
} else {
if (checkers[T.parentIndex].getAndIncrement() == 1){
Q.offer(tasks[T.parentIndex]);
}
}
} else {
/* Merge */
int i = T.a, N = T.m;
int j = T.m+1, M = T.b;
int g = T.a;
while (i<=N || j<=M){
if (i>N) {
h[g] = arr[j]; g++; j++;
continue;
}
if (j>M) {
h[g] = arr[i]; g++; i++;
continue;
}
if (arr[i] > arr[j]){
h[g] = arr[j]; g++; j++;
} else {
h[g] = arr[i]; g++; i++;
}
}
for (i=T.a; i<=T.b; i++)
arr[i] = h[i];
if (T.parentIndex == -1){
sorted.getAndIncrement();
} else {
if (checkers[T.parentIndex].getAndIncrement() == 1){
Q.offer(tasks[T.parentIndex]);
}
}
}
}
}
});
}
for (int i=0; i<workers_cnt; i++){
workers[i].start();
}
/* Sort is running */
for (int i=0; i<workers_cnt; i++){
try {
workers[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int i = 0; i < arr.length; i++)
System.out.printf("%d ", arr[i]);
System.out.printf("\n");
}
}
```
## Zadanie 6
:::success
Autor: Jakub Mikołajczyk
:::
```java=
package z6;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
abstract class Task{
protected BlockingQueue<Task> queue;
abstract public void execute() throws InterruptedException;
}
class TerminateTask extends Task{
public TerminateTask(BlockingQueue<Task> queue) {
this.queue = queue;
}
@Override
public void execute() throws InterruptedException {
queue.put(this);
Thread.currentThread().interrupt();
}
}
class MergeTask extends Task{
private final int l, m, r;
private final int[] arr, buf;
private final MergeTask parent;
private int childCompleted;
public MergeTask(int[] arr, int[] buf, BlockingQueue<Task> queue, int l, int m, int r, MergeTask parent){
this.arr = arr;
this.buf = buf;
this.queue = queue;
this.childCompleted = 0;
this.l = l;
this.m = m;
this.r = r;
this.parent = parent;
}
private boolean isReady(){
return childCompleted == 2;
}
public synchronized void completeChild(){
childCompleted++;
}
private void completeParent() throws InterruptedException {
if (parent == null){
queue.put(new TerminateTask(queue));
}
else {
parent.completeChild();
}
}
@Override
public void execute() throws InterruptedException {
if (isReady()){
System.arraycopy(arr, l, buf, l, r - l + 1);
int leftIndex = l;
int leftEnd = m + 1;
int rightIndex = m + 1;
int rightEnd = r + 1;
int arrIndex = l;
while (leftIndex < leftEnd && rightIndex < rightEnd){
if (buf[leftIndex] <= buf[rightIndex]){
arr[arrIndex] = buf[leftIndex];
leftIndex++;
}
else {
arr[arrIndex] = buf[rightIndex];
rightIndex++;
}
arrIndex++;
}
System.arraycopy(buf, leftIndex, arr, arrIndex, leftEnd - leftIndex);
System.arraycopy(buf, rightIndex, arr, arrIndex, rightEnd - rightIndex);
completeParent();
}
else {
queue.put(this);
}
}
}
class SplitTask extends Task{
private final int l, r;
private final int[] arr, buf;
private final MergeTask parent;
public SplitTask(int[] arr, int[] buf, BlockingQueue<Task> queue, int l, int r, MergeTask parent) {
this.arr = arr;
this.buf = buf;
this.queue = queue;
this.l = l;
this.r = r;
this.parent = parent;
}
@Override
public void execute() throws InterruptedException {
if (l < r){
int m = (l + r)/2;
MergeTask mergeTask = new MergeTask(arr, buf, queue, l, m, r, parent);
queue.put(new SplitTask(arr, buf, queue, l, m, mergeTask));
queue.put(new SplitTask(arr, buf, queue, m + 1, r, mergeTask));
queue.put(mergeTask);
}
else{
if(parent == null){
queue.put(new TerminateTask(queue));
}
else {
parent.completeChild();
}
}
}
}
class Worker implements Runnable{
private final BlockingQueue<Task> queue;
public Worker(BlockingQueue<Task> queue) {
this.queue = queue;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()){
Task task = null;
try {
task = queue.take();
task.execute();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
class PoolManager{
static int THREATS = 5;
public PoolManager(int[] arr) throws InterruptedException {
int [] buf = new int[arr.length];
BlockingQueue<Task> queue = new LinkedBlockingQueue<Task>();
queue.put(new SplitTask(arr, buf, queue, 0, arr.length - 1, null));
for (int i = 0; i < THREATS; i++){
Thread thread = new Thread(new Worker(queue));
thread.start();
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class z6 {
public static void main(String[] args) throws InterruptedException {
int []arr = {4,5,2,6,1,654,675,21,3,54,73,21,654,865,4,36,2,8,0};
new PoolManager(arr);
System.out.println(Arrays.toString(arr));
}
}
```