# Spark Benchmark & Best Practices
This note focus on spark version 2.3.4 and koalas version 1.8.0.
---
## I. Spark architecture
First of all, we need check that spark runs smoothly, and that tasks are correctly distributed.
The big picture:

See this article on [Spark design](https://medium.com/@fengliplatform/spark-tasks-from-driver-to-executors-and-more-8e8be5ad28bf).
Also the [APHP wiki](https://gitlab.eds.aphp.fr/public-resources/eds-doc/-/wikis/spark-fonctionnement) (in french), explaining clearly the building blocks of Spark:

<br>
## II. Spark Config & sanity check
### II.1 Configs
[Spark Config Guide](https://spark.apache.org/docs/2.4.3/configuration.html#dynamic-allocation)
• [Spark job scheduling](https://spark.apache.org/docs/2.4.3/job-scheduling.html#dynamic-resource-allocation)
Here we explore various way to parallize tasks and understand how to beneficiate from speedup, using DataFrame or functions.
We can begin by accessing all configurations via:
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sc.getConf().getAll()
```
Or alternatively, take a look at your `kernel.json`:
`/export/home/<your_cse>/.local/share/jupyter/kernels/<your_kernel>/kernel.json`
Interestingly, in our jupyter kernel we find:
- `spark.dynamicAllocation.maxExecutors: 4`
- `spark.dynamicAllocation.enabled: true`
Meaning that we're probably capped to 4 cores.
### II.2 `sparkContext.parallelize`
We can parallelize functions as follow:
```python
import os
from time import sleep, time
N_TASKS = 800
def bench(*args):
"""Sleep 1 seconds and return current timestamp and process id"""
sleep(1)
return (time(), os.getpid())
out = sc.parallelize(range(N_TASKS)).map(bench).collect()
```
We compute the speedup yielded by the parallelization when varying `N_TASKS` from 800 up to 3200 (i.e. the total amount of time it would have taken for a single thread, divided by the observed time of parallelization).
<details><summary>code</summary>
```python
import seaborn as sns
all_n_tasks = [80, 160, 400, 800, 1600, 3200]
speedups = []
for n_tasks in all_n_tasks:
tic = time()
out = sc.parallelize(list(range(n_tasks))).map(bench).collect()
duration = time() - tic
print(n_tasks, duration)
total_time = n_tasks
speedups.append(total_time / duration)
sns.lineplot(x=all_n_tasks, y=speedups)
```
</details>
<br>

We see that it converges to the max theoretical speedup of 20 (4 executors $\times$ 5 cores), minus some task scheduling overhead cost.
We can easily ask for more executors or more cores per executors:
```python
config = sc.getConf()
config.set("spark.dynamicAllocation.maxExecutors", 8) # instead of 4
sc.stop()
spark.stop()
spark = SparkSession.builder.config(conf=config).getOrCreate()
sc = spark.sparkContext
```
### II.3 `spark-submit`
Alternatively, one can launch an entire PySpark script from terminal with [`spark-submit`](https://spark.apache.org/docs/2.4.3/submitting-applications.html). This is what [eds-toolbox](https://datasciencetools-pages.eds.aphp.fr/edstoolbox/cli/spark/) uses.
We try to measure the speed-up and hence the number of core efficiently used with this very basic script, using eds-toolbox:
<details><summary>script</summary>
```python
import time
import os
from pprint import pprint
from collections import Counter
from edstoolbox import SparkApp
app = SparkApp("test-script")
T_SLEEP = 10
def func(*args):
time.sleep(T_SLEEP)
return os.getpid()
@app.submit
def run(spark, sql, config):
sc = spark.sparkContext
pprint(sorted(sc.getConf().getAll(), key=lambda x: x[0]))
n_loops = 30
n_partitions = 30
tic = time.time()
pids = sc.parallelize(range(1, n_loops+1), n_partitions).map(func).collect()
toc = time.time()
duration = toc - tic
print(f"duration: {duration}")
total_duration = T_SLEEP * n_loops
speed_up = total_duration / duration
print(f"speed_up: {speed_up}")
counter = Counter(pids)
print(f"unique pids: {len(counter)}")
print(counter)
if __name__ == "__main__":
app.run()
```
</details>
<br>
And run it with the following (we let the default config for spark-submit, i.e. 2 executors and 2 cores by executors):
```shell
eds-toolbox spark submit spark_sanitycheck.py
```
This doesn't yield any print, therefore we can't evaluate the speedup.
<br>
## III. `Koalas.options`
https://koalas.readthedocs.io/en/latest/user_guide/options.html
We list below the options of Koalas that can be set during the runtime.
| option | default value | description |
|----------------------------|---------------|----------------------------------------------------------------------------------------------------------------------------|
| compute.default_index_type | "sequence" | "sequence", "distributed-sequence" or "distributed" (see table below) |
| compute.max_row | 1000 | |
| compute.ops_on_diff_frame | False | Allow operations on different frames by joining them, which is expensive. |
| compute.ordered_head | False | Koalas does not guarantee the row ordering so head could return some rows from distributed partitions, with an extra-cost. |
| compute.shortcut_limit | 1000 | When the dataframe length is larger than this limit, Koalas uses PySpark to compute. |
| display.max_rows | 1000 | |
| plotting.backend | plotly | matplotlib or plotly |
| plotting.max_rows | 1000 | |
| plotting.sample_ratio | None | |
<br>
The most important config to set are `compute.default_index_type` (need to be set to distributed) and `compute.ops_on_diff_frame` (need to be set to True). These two are already well configured in `eds_scikit`.
The table below gives more details about `compute.default_index_type` options:

### Actions
- `display.max_rows` to be set to 50, to replicate pandas API.
<br>
## IV. See the execution plan with `explain()`
```python
import databricks.koalas as ks
kdf = ks.DataFrame({'id': range(10)})
kdf = kdf.sort_values("id")
kdf.spark.explain()
```
```scala
== Physical Plan ==
*(2) Project [__index_level_0__#0L, id#1L]
+- *(2) Sort [id#1L ASC NULLS LAST, __natural_order__#4L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(id#1L ASC NULLS LAST, __natural_order__#4L ASC NULLS FIRST, 8)
+- *(1) Project [__index_level_0__#0L, id#1L, monotonically_increasing_id() AS __natural_order__#4L]
+- Scan ExistingRDD[__index_level_0__#0L,id#1L]
```
- The plan is read bottom-up: we begin by loading the table `Scan ExistingRDD` before adding an index.
- `Exchange` means partition shuffling or data exchange between partition, and it is costly. Here the data is collected in a single partition to perform sorting.
Let's see a more complex example:
```python
kdf = ks.DataFrame({'id': range(10), 'val': range(10)})
kdf = kdf.groupby("id").sum()
kdf.spark.explain()
```
```scala
== Physical Plan ==
*(2) HashAggregate(keys=[__index_level_0__#33L], functions=[sum(val#24L)])
+- Exchange hashpartitioning(__index_level_0__#33L, 8)
+- *(1) HashAggregate(keys=[__index_level_0__#33L], functions=[partial_sum(val#24L)])
+- *(1) Project [id#23L AS __index_level_0__#33L, val#24L]
+- *(1) Filter AtLeastNNulls(n, id#23L)
+- Scan ExistingRDD[__index_level_0__#22L,id#23L,val#24L]
```
- We see a first HashAggregate with a `partial_sum` function that first perform a sum accross all partition, before shuffling the result (`Exchange`) and performing a final sum (`HashAggregate`) on the previous results
### Actions
- When a query become long and complex, try to understand it using `df.spark.explain()`.
- Let's use `cache` over `persist`.
<br>
## V. When to use `cache()`?
`df.spark.cache()` vs `df.spark.local_checkpoint()`
https://stackoverflow.com/a/40002609
- `df.spark.cache()` doesn't alter lineage while `local_checkpoint()` does
- checkpointing causes double computation
Why using cache?
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
> If, unfortunately, some errors or exceptions occur during the execution of a task, the whole computing chain needs to be re-executed, which is considerably expensive.
What kind of RDD needs to be cached ?
> Those which will be repeatedly computed and are not too large.
We can trigger `cache()` at the end of a function by using `.count()`
https://stackoverflow.com/questions/44002128/when-are-cache-and-persist-executed-since-they-dont-seem-like-actions
```python
_ = kdf.spark.cache().count()
```
`count()` is an inexpensive actions that allow triggering the eager mode.
### Actions
- Add `count()` to `eds_scikit` caching mechanism, to limit the recomputing when an error occurs. <br>
Cons: do not beneficiate from the query plan optim but with the massive number of operation in each of our functions that shouldn't be an issue.
<br>
## VI. Optimizing common operations
### `apply()` & `transform()`
https://koalas.readthedocs.io/en/latest/user_guide/typehints.html
>Koalas, by default, infers the schema by taking some top records from the output, in particular, when you use APIs that allow users to apply a function against Koalas DataFrame such as DataFrame.transform(), DataFrame.apply(), DataFrame.koalas.apply_batch(), DataFrame.koalas.apply_batch(), Series.koalas.apply_batch(), etc.
>
> However, this is potentially expensive. If there are several expensive operations such as a shuffle in the upstream of the execution plan, Koalas will end up with executing the Spark job twice, once for schema inference, and once for processing actual data with the schema.
### Setting index
> However, note that it requires to create new default index in case Koalas DataFrame is created from Spark DataFrame.
### Avoid shuffling
https://koalas.readthedocs.io/en/latest/user_guide/best_practices.html#avoid-shuffling
- Common ops with `Exchange` are `sort_values()`, `merge()`
```python
import databricks.koalas as ks
kdf = ks.DataFrame({'id': range(10)}).sort_values(by="id")
kdf.spark.explain()
```
```scala
== Physical Plan ==
*(2) Sort [id#9L ASC NULLS LAST], true, 0
+- Exchange rangepartitioning(id#9L ASC NULLS LAST, 200), true, [id=#18]
+- *(1) Scan ExistingRDD[__index_level_0__#8L,id#9L]
```
## VII. Parallelize functions with Joblib
Can we leverage the cluster to run parallelize tasks like `GridSearchCV` / `RandomSearchCV` or `RandomForest` efficiently?
- We can run scripts on the cluster via `eds-toolbox` and `spark-submit`, however we don't have access to sdout, sderr or returns for now
- We can use `sparkContext.parallelize` to run functions with a max speedup of $n_{executor} \times n_{cores\;per\;executor}$ (by default $4 \times 5 = 20$)
- [joblib-spark](https://github.com/joblib/joblib-spark) relies on `sc.parallelize`, but for reason that are hard to identify, it is super inefficient for complex tasks. See this [benchmark](https://github.com/Vincent-Maladiere/joblib-distributed-benchmark/blob/main/joblib_distributed.ipynb).
- We can use [sparkML](https://spark.apache.org/docs/2.3.4/api/python/pyspark.ml.html) though but it looks a bit weird
---