# Ćwiczenia 6, grupa śr. 10-12, 29 listopada 2023
###### tags: `PRW23` `ćwiczenia` `pwit`
## Deklaracje
Gotowość rozwiązania zadania należy wyrazić poprzez postawienie X w odpowiedniej kolumnie! Jeśli pożądasz zreferować dane zadanie (co najwyżej jedno!) w trakcie dyskusji oznacz je znakiem ==X== na żółtym tle.
**UWAGA: Tabelkę wolno edytować tylko wtedy, gdy jest na zielonym tle!**
:::danger
| | 1 | 2 | 3 | 4 | 5 | 6 |
| ----------------------:| ----- | --- | --- | --- | --- | --- |
Mikołaj Balwicki | | | | | | |
Konrad Kowalczyk | | X | X | X | | |
Jakub Krzyżowski | | X | X | | | |
Łukasz Magnuszewski | | | | | | |
Marcin Majchrzyk | | X | X | X | X | X |
Jan Marek | | X | X | | | |
Marcin Mierzejewski | | | | | | |
Juan Jose Nieto Ruiz | | | | | | |
Konrad Oleksy | | | | | | |
Javier Rábago Montero | X | X | X | X | X | X |
Michał Mękarski | X | X | | | | |
:::
Tutaj można zadeklarować zaległe zadanie 8 z listy 5:
:::danger
| | 5.8 |
| ----------------------:| ----- |
| Javier Rábago Montero | X |
| | |
:::
:::info
**Uwaga:** Po rozwiązaniu zadania należy zmienić kolor nagłówka na zielony.
:::
## Zadanie 1
:::success
Autor: Michał Mękarski
:::


Bez straty ogólności załóżmy, że nasza unikalna numeracja to indeksowanie kolejnymi liczbami naturalnymi.
#### Definicja atomowości
Rejestr jest uznawany za atomowy, jeśli spełnia zasadę linearyzowalności. Oznacza to, że jeśli wątek A wykonuje operację zapisu w punkcie x, to wątek B, wykonując operację odczytu przed tym punktem, uzyska dostęp do starej wartości. Natomiast wykonując operację odczytu po tym punkcie, otrzyma dostęp do nowej wartości.
#### ( * ) $\forall i \; \; \neg (𝑅^i \rightarrow 𝑊^i)$
Po ludzku: Nie mamy żadnych odczytów wartości dotychczas niezapisanych.
#### ( ** ) $\forall i, j \; \; \neg (𝑊^i \rightarrow W^j \rightarrow 𝑅^i)$
Po ludzku: Nie znaleźliśmy sekwencji zapisów w której wartość czytana w punkcie $i$ mogła być nadpisana w innym punkcie $j$
#### ( *** ) $\forall i, j \; \; (𝑅^i \rightarrow R^j) \Rightarrow (i \leq j)$
Gdyby ten warunek nie zachodził to znaczyło by, że mamy do czynienia z błędną kolejnością odczytów, tzn. odczyt "późniejszy" wykonany jest przed oczytem "wcześniejszym" co jest sprzeczne z naszą definicją
### Równoważność
### $$\Rightarrow$$
Przez sprzeczność z każdym z warunków.
#### ( * ) $\forall i \; \; \neg (𝑅^i \rightarrow 𝑊^i)$
Gdyby tak nie było to znaczyło by, że istnieje taka para zapisu i odczytu, że odczyt aktualnej "wersji rejestru" odbywa się przed jej zapisaniem, co jest sprzeczne z definicją atomowości.
#### ( ** ) $\forall i, j \; \; \neg (𝑊^i \rightarrow W^j \rightarrow 𝑅^i)$
Gdyby istniała taka sytuacja to operacje W i R nie były by atomowe ponieważ mielibyśmy odczyt "przykrytej" już wartości rejestru.
#### ( *** ) $\forall i, j \; \; (𝑅^i \rightarrow R^j) \Rightarrow (i \leq j)$
Ciąg indeksów operacji jest niemalejący, zatem taka sytuacja nie może zajść.
### $$\Leftarrow$$
Z ( * ) oraz ( ** ) wprost wynika, atomowość rejestru.
## Zadanie 2
:::success
Autor: Jan Marek
:::

Regularny M-wartościowy rejestr implementujemy jako tablicę M rejestrów boolowskich.
Początkowo rejestr jest ustawiony na wartość zero, o czym świadczy wartość true bitu "zerowego".
Metoda write(x) zapisuje true w lokacji x, a następnie w malejącym porządku indeksu tablicy ustawia wszystkie niższe lokacje na false.
Metoda read() odczytuje wartości w rosnącym porządku indeksu do momentu, kiedy po raz pierwszy odczyta wartość true pod jakimś indeksem i. Następnie zwraca i.


Aby wykazać, że implementacja rejestru jest regularna, musimy wykazać, że jego historie spełniają poniższe warunki.
(*) Żadne wywołanie operacji odczytu nie zwraca wartości z przyszłości:
dla każdego i nie jest prawdą, że $R^i \rightarrow W^i$
(**) Żadne wywołanie operacji odczytu nie zwraca wartości z odległej przeszłości, tzn. takiej wartości, która poprzedza ostatnio zapisaną, nienakładającą się wartość:
dla każdych i, j nie jest prawdą, że $W^i \rightarrow W^j \rightarrow R^i$
**Lemat 1.**
Wywołanie metody read() zawsze zwraca wartość odpowiadającą bitowi ze zbioru 0...M-1, ustawioną przez jakieś wywołanie metody write().
**Dowód.**
Następująca własność jest niezmienna:
jeżeli wątek odczytujący przeczyta r_bit[j], to jakiś bit z indeksem j lub wyższym, zapisany w wyniku wywołania metody write(), jest ustawiony na true.
W momencie inicjalizacji rejestru nie ma wątków odczytujących, a konstruktor (wywołanie metody write(0)) ustawia r_bit[0] na true.
Załóżmy, że czytamy r_bit[j] oraz że r_bit[k] ma wartość true dla k >= j.
- jeśli wątek czytający przejdzie z j do j+1, to wówczas r_bit[j] ma wartość false, więc k > j (to znaczy bit większy lub równy j+1 ma wartość true).
- wątek zapisujący zeruje r_bit[k] tylko wtedy, gdy ustawił wyższy r_bit[l] na true, dla l > k.
**Lemat 2.**
Klasa `RegMRSWRegister` poprawnie implementuje regularny M-wartościowy rejestr MRSW.
**Dowód.**
Dla dowolnej operacji odczytu, niech x będzie wartością zapisaną przez ostatnią nienakładającą się operację write(). W momencie zakończenia write(), pole r_bit[x] było ustawione na true, a wszystkie pola r_bit[i] (dla i < x) mają wartość false. Z Lematu 1 wiemy, że jeśli wątek odczytujący zwróci wartość różną od x, to musiał on odczytać w jakimś polu r_bit[j] (j != x) wartość true, a bit ten musiał zostać ustawiony przez współbieżną operację zapisu, co dowodzi warunków (*) oraz (**).
## Zadanie 3
:::success
Autor: Jakub Krzyżowski
:::





Żaden reader nie zwraca wartości z przyszłości, więc pierwszy warunek jest spełniony.
Z definicji `write()` zapisuje rosnące timestampy, a więc max timestamp wzdłuż rzędów, lub kolumn też jest rosnący.
Jeśli wątek A zapisuje wartość v z timestampem t, to dowolny `read()` z wątku B następujący po `write()` z A odczytuje z przekątnej maksymalny timestamp większy-równy t, a więc spełniony warunek 2.
Jeśli `A: read()` $\rightarrow$ `B: read()`, to A zapisuje wartość z timestampem t do rzędu B i B odczytuje wartość z timestampem większym-równym t, spełniając 3. warunek.
## Zadanie 4
:::success
Autor: Konrad Kowalczyk
:::

```java
public class AtomicMRMWRegister<T> implements Register<T>{
private StampedValue<T>[] a_table; // array of atomic MRSW registers
public AtomicMRMWRegister(int capacity, T init) {
a_table = (StampedValue<T>[]) new StampedValue[capacity];
StampedValue<T> value = new StampedValue<T>(init);
for (int j = 0; j < a_table.length; j++) {
a_table[j] = value;
}
}
public void write(T value) {
int me = ThreadID.get();
StampedValue<T> max = StampedValue.MIN_VALUE;
for (int i = 0; i < a_table.length; i++) {
max = StampedValue.max(max, a_table[i]);
}
a_table[me] = new StampedValue(max.stamp + 1, value);
}
public T read() {
StampedValue<T> max = StampedValue.MIN_VALUE;
for (int i = 0; i < a_table.length; i++) {
max = StampedValue.max(max, a_table[i]);
}
return max.value;
}
}
```
Aby rejestr był atomowy, potrzebne jest, aby spełniał 3 własności:
1. nigdy nie jest tak, że `read(i)` → `write(i)` (`read()` nie może zwrócić wartości, która jeszcze nie została zapisana)
2. dla żadnego `j` nie występuje sytuacja, że `write(i)` → `write(j)` → `read(i)` (w przypadku niewspółbieżnych odczytów i zapisów, `read()` zawsze zwraca wartość zapisaną jako ostatnia)
3. jeśli `read(i)` → `read(j)` to `i` ≤ `j` (wcześniejszy `read()` nie może odczytać wartości późniejszej niż późniejszy `read()`)
---
Własność 1 jest spełniona, ponieważ każdy `write()` następujący po `read()` będzie miał sygnaturę czasową wyższą niż jakakolwiek, która była zapisana w momencie działania `read()`. W takim razie `read()` nie mógł odczytać `write()` następującego po nim.
Własność 2 jest również spełniona. Załóżmy, że mamy `write()`<sub>A</sub> → `write()`<sub>B</sub> → `read()`<sub>C</sub>, gdzie A, B i C są wątkami. Jeżeli A = B, to `write()`<sub>B</sub> nadpisuje komórkę, do której zapisał `write()`<sub>A</sub>, a więc C nie odczyta wartości zapisanej przez A. Jeśli zaś A ≠ B, to sygnatura czasowa A jest mniejsza niż sygnatura czasowa B, a C zawsze odczyta wartość o wyższej sygnaturze czasowej, zatem C nie odczyta wartości zapisanej przez A.
Własność 3 również jest spełniona. Załóżmy, że mamy `read()`<sub>A</sub> → `read()`<sub>B</sub> oraz `write()`<sub>C</sub> → `write()`<sub>D</sub>. Jeżeli odczyt w A zwrócił wartość zapisaną przez D, to odczyt w B nie może zwrócić wartości zapisanej przez C. Jeżeli sygnatura czasowa C jest mniejsza niż sygnatura czasowa D (`t`<sub>C</sub> < `t`<sub>D</sub>), to skoro A odczytał sygnaturę czasową D, to B mógł odczytać tylko sygnaturę czasową D lub większą, a zatem nie mógł odczytać wartości zapisanej w C.
```java
public class StampedValue<T> {
public long stamp;
public T value;
// initial value with zero timestamp
public StampedValue(T init) {
stamp = 0;
value = init;
}
// later values with timestamp provided
public StampedValue(long stamp, T value) {
stamp = stamp;
value = value;
}
public static StampedValue max(StampedValue x, StampedValue y) {
if (x.stamp > y.stamp) {
return x;
} else {
return y;
}
}
public static StampedValue MIN_VALUE =
new StampedValue(null);
}
```
Z kolei jeśli `t`<sub>C</sub> = `t`<sub>D</sub>, to zauważmy, że metoda `StampedValue.max(x, y)` zwraca `x` tylko i wyłącznie wtedy, kiedy `x` > `y`. Kiedy `y` > `x` lub kiedy `x` = `y`, to zwraca `y`. W kodzie rejestru atomowego MRMW kolejne wartości z tabeli są na pozycji `y`, a dotychczasowy `max` na pozycji `x`. To oznacza, że w przypadku takich samych sygnatur czasowych zarówno `read()` jak i `write()` za wykonanie "późniejsze" uzna to, które ma wyższy indeks. Możemy zatem powiedzieć, że jeśli `write()`<sub>C</sub> → `write()`<sub>D</sub> oraz `t`<sub>C</sub> = `t`<sub>D</sub>, to C < D. Zatem skoro `read()`<sub>A</sub> odczytał wartość zapisaną przez D, to zrobi to również `read()`<sub>B</sub> (lub odczyta jakąś o wyższej sygnaturze czasowej lub indeksie, ale nie tę zapisaną wcześniej przez C).
## Zadanie 5
:::success
Autor: Marcin Majchrzyk
:::
```code=
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
class Task {
public int l, r, m, parent;
public boolean sort;
public Task(boolean sort, int l, int r, int m, int parent) {
this.sort = sort;
this.l = l;
this.r = r;
this.m = m;
this.parent = parent;
}
}
public class Main {
public static void main(String[] args) {
int[] array = {9, 8, 7, 6, 5, 4, 3, 2, 1, 9, 8, 7, 6, 5, 4, 3, 2, 1, 9, 8, 7, 6, 5, 4, 3, 2, 1, 9, 8, 7, 6, 5, 4, 3, 2, 1};
int[] helper = new int[array.length];
AtomicInteger sorted = new AtomicInteger(0);
AtomicInteger[] sort_done = new AtomicInteger[array.length];
for (int i=0; i < array.length; i++) {
sort_done[i] = new AtomicInteger(0);
}
AtomicInteger merge_index = new AtomicInteger(0);
Task[] merges = new Task[array.length];
ConcurrentLinkedQueue<Task> tasks = new ConcurrentLinkedQueue<>();
int MAX_TASKS = 8;
Thread[] workers = new Thread[MAX_TASKS];
for (int i=0; i < MAX_TASKS; i++) {
workers[i] = new Thread(() -> {
while (sorted.get() == 0) {
Task t = tasks.poll();
if (t != null) {
if (t.sort) {
if (t.l == t.r) {
if (sort_done[t.parent].getAndIncrement() == 1) {
tasks.offer(merges[t.parent]);
}
} else {
int idx = merge_index.getAndIncrement();
t.m = (t.l + t.r) / 2;
merges[idx] = new Task(false, t.l, t.r, t.m, t.parent);
tasks.offer(new Task(true, t.l, t.m, -1, idx));
tasks.offer(new Task(true, t.m + 1, t.r, -1, idx));
}
} else {
int leftSize = t.m - t.l + 1;
int rightSize = t.r - t.m;
int arrPosition = t.l;
int leftPosition = t.l, rightPosition = t.l + leftSize;
int leftEnd = leftPosition + leftSize, rightEnd = rightPosition + rightSize;
System.arraycopy(array, leftPosition, helper, leftPosition, leftSize);
System.arraycopy(array, rightPosition, helper, rightPosition, rightSize);
while (leftPosition < leftEnd && rightPosition < rightEnd) {
if (helper[leftPosition] <= helper[rightPosition]) {
array[arrPosition] = helper[leftPosition];
leftPosition++;
} else {
array[arrPosition] = helper[rightPosition];
rightPosition++;
}
arrPosition++;
}
System.arraycopy(helper, leftPosition, array, arrPosition, leftEnd - leftPosition);
System.arraycopy(helper, rightPosition, array, arrPosition, rightEnd - rightPosition);
if (t.parent == -1) {
sorted.getAndIncrement();
} else {
if (sort_done[t.parent].getAndIncrement() == 1) {
tasks.offer(merges[t.parent]);
}
}
}
}
}
});
workers[i].start();
}
tasks.offer(new Task(true, 0, array.length-1, -1, -1));
for (Thread worker : workers) {
try {
worker.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int i : array) {
System.out.printf("%d ", i);
}
}
}
```
## Zadanie 6
:::success
Autor: Javier Rábago Montero
:::
```java
import java.util.concurrent.LinkedBlockingQueue;
class Task {
public int l, r, m;
public int parent;
public Task(int l, int r, int m, int parent) {
this.l = l;
this.r = r;
this.m = m;
this.parent = parent;
}
}
public class MergeSort {
private static final int M = 6;
private static final LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
private static final Runnable END = () -> {};
private static void callMergeSort(int[] arr, int l, int r){
Task[] merges = new Task[arr.length*arr.length];
int parent = 0;
mergeSort(arr, l, r, parent, merges);
StartThreads();
waitThreads();
for (int i = (merges.length-1); i >= 0; i--) {
if (merges[i] != null) {
final int indice = i;
try {
taskQueue.put(() -> {
merge(arr, merges[indice].l, merges[indice].m, merges[indice].r);
});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
waitThreads();
for (int k = 0; k < M; k++) { // This ends the threads, it enters M times END in the queue
try {
taskQueue.put(END);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static void mergeSort(int[] arr, int l, int r, int parent, Task[] merges) {
int m = (l + r) / 2;
int i=1;
if (parent == 0) {
merges[0] = new Task(l, r, m, parent);
} else{
if (merges [parent*parent-1] == null) {
merges[parent*parent-1] = new Task(l, r, m, parent);
} else {
while (merges[parent*parent-1+i] != null) {
i++;
}
merges[parent*parent-1+i] = new Task(l, r, m, parent);
}
}
if (l < r) {
try {
taskQueue.put(() -> mergeSort(arr, l, m, parent + 1, merges));
taskQueue.put(() -> mergeSort(arr, m + 1, r, parent + 1, merges));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static void waitThreads() {
while (true) {
if (taskQueue.size() == 0) {
break;
}
}
}
private static void StartThreads() {
for (int i = 0; i < M; i++) {
new Thread(() -> {
while (true) {
try {
Runnable task = taskQueue.take();
if (task == END) {
break;
}
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}).start();
}
}
private static void merge(int[] arr, int l, int m, int r) {
int n1 = m - l + 1;
int n2 = r - m;
int[] left = new int[n1];
int[] right = new int[n2];
for (int i = 0; i < n1; i++) {
left[i] = arr[l + i];
}
for (int j = 0; j < n2; j++) {
right[j] = arr[m + 1 + j];
}
int i = 0, j = 0, k = l;
while (i < n1 && j < n2) {
if (left[i] <= right[j]) {
arr[k] = left[i];
i++;
} else {
arr[k] = right[j];
j++;
}
k++;
}
while (i < n1) {
arr[k] = left[i];
i++;
k++;
}
while (j < n2) {
arr[k] = right[j];
j++;
k++;
}
}
public static void main(String[] args) {
// int[] arr = { 12, 11, 13, 5, 6, 7 , 1, 2, 3, 4, 5, 4, 6, 1, 2, 3, 4, 5, 6, 7, 8};
int[] arr = { 12, 11, 13, 5, 6, 7 , 1};
callMergeSort(arr, 0, arr.length - 1);
for (int i : arr) {
System.out.print(i + " ");
}
}
}
```
The linked queue is more efficient, as you don´t have to insert a stop condition or “sentinel” to the end of the queue to end the threads, or else they will be waiting for new tasks while main is waiting for threads to end, resulting in a never-ending program.1