# SYT-GEK1041 `Distributed Data Structures`
Jan R. Borensky, Tim Degold
## Einfaches Beispiel
### Java
Hier erkläre ich das Programm MainRunner [2], dabei werden Zahlen gerundet. Um sich mit Spark zu Verbinden wird eine Konfiguration und ein Kontext erstellt.
```java=
SparkConf sparkConf = new SparkConf()
.setAppName("spark_first")
.setMaster("local[*]");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
```
Nun müssen wir eine RDD erstellen, also das verteilte Objekt, welches von den Nodes abgefragt werden kann.
```java
JavaRDD<Double> javaRDD = sparkContext.parallelize(doubleList);
```
`doubleList` ist dabei eine gewöhnliche Liste mit Zahlen.
```java=
List<Double> doubleList = new ArrayList<>();
doubleList.add(23.44);
doubleList.add(26.43);
doubleList.add(75.35);
doubleList.add(245.767);
doubleList.add(398.445);
doubleList.add(94.72);
```
Nun kann die `round`-Operation ausgeführt werden, dies geschieht über die `map`-Funktion.
```java
JavaRDD<Integer> mappedRDD = javaRDD.map(val -> (int)Math.round(val));
```
Das Ergebnis kann dann ausgegeben werden.
Die gerundeten Werte:

Die Summe:

### Python
Folgendes Beispiel kann für die Verwendung von Spark in Python verwendet werden.
**Code**
```python=
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext('local[*]')
data = [1.1, 2.1, 3.1, 4.1, 5.1]
distData = sc.parallelize(data)
mapped = distData.map(lambda s: round(s))
print(mapped.collect())
```
Zuerst wird die `findspark`-Library installiert, die dazu verwendet wird, um die Spark-Installation bzw. den Server zu finden. Danach wird mithilfe der `pyspark`-Library ein `SparkContext` erstellt, der in diesem Fall eigentlich als Master dient.
Danach erstellen wir ein RDD, mit dem anschließend das Mappen einer Funktion sowie das "Einsammeln" der Daten aus diesem Speicher getestet wird.
**Deplyoment**
```=
cd python
virtualenv venv
pip install -r requirments.txt
spark-submit --master local --deploy-mode client --conf spark.pyspark.virtualenv.enabled=true round.py
```
Wenn man dann genau hinsieht, sieht man das Ergebnis zwischen den Log-Meldungen:

## Beispiel mit Workern
Für die jeweiligen Beispiele mit mehreren Workern wurde das Skript `remote_test.py` verwendet, das im Ordner `Python` zu finden ist.
### Starten des Masters und der Worker
Wenn man Spark installiert hat, kann der Master mit folgenden Befehlen ausgeführt werden.
```shell=
cd /opr/apache-spark/sbin
sh start-master.sh --host localhost
```
Danach ist die Spark-Konsole unter localhost:8080 erreichbar.

Diesen Output sollte der Master geben
```cmd=
2021-03-03 16:19:05,679 INFO master.Master: I have been elected leader! New state: ALIVE
2021-03-03 16:28:52,656 INFO master.Master: Registering worker 127.0.0.1:44103 with 4 cores, 14.5 GiB RAM
```
Nun können beliebig viele Worker mit `sh start-worker.sh spark://localhost:7077` gestartet werden.
Neues Skript:
```
sudo sh startworkersnew.sh spark://localhost:7077
```

Diese sind nun in der Konsole ersichtlich.
**Master und Worker**

### Ausfall eines Workers
Wir haben eine Berechnung mit 10^9 Samples auf 3 Worker aufgeteilt, und dann einen Worker abgedreht. Wie man sieht, erkennt Spark das.


Spark bricht die Berechnung bei einem Ausfall nicht ab, sondern verteilt die verbleibenden Operationen auf die beiden anderen Worker auf.
Dabei wird folgende Error-Message angezeigt:
```
2021-03-10 12:18:46,296 ERROR scheduler.TaskSchedulerImpl: Lost executor 2 on 127.0.0.1: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
2021-03-10 12:18:46,302 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 127.0.0.1, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
```

## Benchmarks
Wir haben uns für eine sehr einfache Lösung entschieden. Wir messen die Zeit bevor sich Python mit dem Master verbindet und sobald die Berechnung beendet wurde. Dann geben wir die Differenz aus. Für die Zeit ergibt sich in Zusammenhang mit der Nummer an Samples und der Zahl der Worker folgendes.
Für 1 Worker:
| Nummer an Samples | Zeit (Python) [s] | Zeit (Spark) [s] |
|-------------------|-------------------|------------------|
| 10^9 | 425.77 | 420 |
| 10^6 | 9.2 | 6 |
| 10^3 | 8.5 | 5 |
Für 2 Worker:
| Nummer an Samples | Zeit (Python) [s] | Zeit (Spark) [s] |
|-------------------|-------------------|------------------|
| 10^9 | 226.23 | 222 |
| 10^6 | 11.83 | 8 |
| 10^3 | 11.05 | 8 |
Wie man sieht, sind die Zeiten für eine kleine Sample-Zahl bei zwei Workern größer, als bei einem. Dies liegt vermutlich daran, dass der Overhead zur Lastenverteilung im Vergleich zur benötigten Rechenzeit sehr groß ist. Erst bei einer großen Zahl an Samples merkt man den Unterschied. Für 10^9 Samples haben 2 Worker ca halb so viel Zeit benötigt.
## Quellen
[1] “Starting the Spark | Learning Apache Spark in Java.” https://telegraphhillsoftware.com/starting-the-spark/ (accessed Mar. 03, 2021).
[2] “Your First Java RDD With Apache Spark - DZone Java.” https://dzone.com/articles/your-first-java-rdd-with-apache-spark (accessed Mar. 03, 2021).
[3] “scala - Spark - Error ‘A master URL must be set in your configuration’ when submitting an app - Stack Overflow.” https://stackoverflow.com/questions/38008330/spark-error-a-master-url-must-be-set-in-your-configuration-when-submitting-a (accessed Mar. 03, 2021).
[4] “Using Gradle to create a simple Java Spark application | by Yang Gao | Medium.” https://medium.com/@yanggao1119/using-gradle-to-create-a-simple-java-spark-application-f2a2b6460e8c (accessed Mar. 03, 2021).
[5] “What is an RDD in Spark? - Learn Spark RDD - Intellipaat.” https://intellipaat.com/blog/tutorial/spark-tutorial/programming-with-rdds/ (accessed Mar. 03, 2021).
[6] “Apache Spark - RDD - Tutorialspoint.” https://www.tutorialspoint.com/apache_spark/apache_spark_rdd.htm (accessed Mar. 03, 2021).
[7] “Apache Spark: Vorteile und Anwendungsbereiche.” https://datasolut.com/was-ist-spark/ (accessed Mar. 03, 2021).
[8] “Distributed Data Processing with Apache Spark | by Munish Goyal | DataDrivenInvestor.” https://medium.datadriveninvestor.com/distributed-data-processing-with-apache-spark-2a5e473b0cb1 (accessed Mar. 03, 2021).
[9] “apache/spark: Apache Spark - A unified analytics engine for large-scale data processing.” https://github.com/apache/spark (accessed Mar. 03, 2021).
[10] “Overview - Spark 3.1.1 Documentation.” http://spark.apache.org/docs/latest/index.html (accessed Mar. 03, 2021).