# Global-Scale Evacuation System
In this lab, we build upon the foundation set by the 'final' application from Lab1. It's crucial to note that while the application was efficient for previous tasks, adjustments may be necessary to handle more extensive datasets, such as planning a planetary evacuation due to rising sea levels.
While our current application processes the dataset of The Netherlands swiftly, clocking in at approximately 1 minute, our goal is to measure its efficiency with progressively larger datasets. We've mapped our benchmarking sequence as follows:
1. France
2. North America
3. Europe
4. Planet
Our primary metric for evaluation is **Cost**. Specifically, we're interested in the product of **Time** and **Price**. We consider this a sound criterion, inspired by the billing calculator Amazon provides (an example is given below).

**However, it's worth highlighting that this metric will only come into play once our application can successfully process the planetary evacuation plan dataset.** For configurations that meet this criterion, we'll conduct an in-depth evaluation based on the cost metric to determine the optimal configuration.
Before executing the planetary evacuation plan, the priority is to ensure the application and cluster operate robustly and efficiently.
**As a spoiler, we managed to get the planetary evacuation plan to run in 5 minutes at a cost of less than 0.1$, which is very impressive.**
## Usage
To use our application, follow the command structure below:
```
<sea_level> <adequate_filePath> <good_filePath> <excellent_filePath> [orc_filePath] [alos_filePath] [use_precompute] [precompute]
sea_level: Integer. Represents the rise in sea level in meters.
adequate_filePath: Path to the directory or file for storing 'adequate' results.
good_filePath: Path to the directory or file for storing 'good' results.
excellent_filePath: Path to the directory or file for storing 'excellent' results.
orc_filePath (Optional): Path to the ORC file. Default: "netherlands.orc".
alos_filePath (Optional): Path to the directory for ALOS data in Parquet format. Default: "parquet/".
use_precompute (Optional): Boolean. Determines whether to use precomputed data. Accepts "true" or "false". Default: false.
precompute (Optional): Boolean. Indicates if the application should perform the precomputation of parquet files instead of the evacuation plan. Accepts "true" or "false". Default: false.
```
### Local application
Although it's advised against running this application locally due to extensive dataset sizes, instructions are provided for those who wish to proceed:
1. Clone the repository:
```
// Clone our repository first
cd Desktop
mkdir group_02
git clone {ssh of the latest repository}
cd lab-2-2023-group-02
```
2. Run the SBT Docker Image:
```
// Run the SBT docker image
docker run -it --rm -v "`pwd`":/root sbt sbt
```
3. Execute Within SBT:
```
// Now we are in the SBT
reload
compile
// The cmd below assumes that you have put the .orc and parquet files under the same root directory
~run 0 newdump/adequate/ newdump/good/ newdump/excellent/ netherlands.orc parquet/ false false
```
### AWS application
We've uploaded the FAT .jar file to AWS S3. If you wish to verify its integrity or recreate it, follow these steps:
1. Step: Generating FAT `.jar` file
```
# Clone our repository
cd Desktop
mkdir group_02
git clone {ssh of the latest repository}
cd lab-2-2023-group-02
# Run the SBT docker image
docker run -it --rm -v "`pwd`":/root sbt sbt
# Inside SBT
reload
compile
assembly
exit
# Rename the FAT .jar (ensure no spaces in the name)
mv "target/scala-2.12/Lab 1-assembly-1.0.jar" Lab_1-assembly-1.0.jar
```
With the FAT `.jar` file generated, upload it to AWS S3. We assume you uploaded it to `s3://group-02`.
2. Step: AWS Configuration
Navigate to AWS EMR and click Create Cluster.
We assume you know how to choose the `Application Bundles` and set up the `Firewall`, `Networking`, `Service Role`, and so on.

Remove the `Task Node Instance Group`, and choose the configuration as below:

Adjust the number of core nodes and their instance sizes as required: 5 core nodes (c5.2xlarge) for the fastest run with precomputed features, or 12/16 core nodes (c5.4xlarge) otherwise.

Finally, double-check the other configurations not mentioned here. If everything is set, create the cluster.
3. Step: Add Steps
Navigate to the Summary page of the cluster just created.
Navigate to the Steps section, and click on Add Step.
If you want to take advantage of the precomputed features, follow the figure below:

```
s3://group-02/Lab_1-assembly-1.0.jar
--class "Lab1"
0 s3://group-02/with_precompute/adequate/ s3://group-02/with_precompute/good/ s3://group-02/with_precompute/excellent/ s3://osm-pds/planet/planet-latest.orc s3://group-02/planet_precompute_parquet/ true false
```
If you want to compute everything from scratch, follow the figure below:

```
s3://group-02/Lab_1-assembly-1.0.jar
--class "Lab1"
0 s3://group-02/from_scratch/adequate/ s3://group-02/from_scratch/good/ s3://group-02/from_scratch/excellent/ s3://osm-pds/planet/planet-latest.orc s3://abs-tudelft-sbd-2022/ALPSMLC30.parquet/ false false
```
Click "Add Step" and wait for the completion.
## Approach
### Pre-optimisations
**A Crucial Reminder**:
Before diving into the optimizations for this lab, it's essential to understand the foundational improvements made during Lab1. These adjustments were specifically aimed at enhancing performance as the system scales up. For a comprehensive understanding, we strongly recommend revisiting our Lab1 report.
While we will highlight some key scalability improvements below, it's important to note that each iteration builds upon the optimizations from Lab1. These foundational optimizations will not be mentioned again in subsequent sections.
#### Pre-optimisation 1: Strategic Data Caching
For dataframes processed multiple times, caching proves instrumental. This not only trims processing times but also bolsters scalability. By way of example, the places and harbours dataframes are reused extensively, making caching them vital.
In the example below, the `places` and `harbours` dataframes are used multiple times: to determine the average elevation of each location and harbour, the population of safety cities and of the locations to be evacuated, and the distance between evacuated locations and safety cities/harbours.
```scala
val places = osm_common_filter
  .filter(col("tags").getItem("place").isin("city", "town", "village", "hamlet"))
  .filter(col("tags").getItem("population").isNotNull)
  .select(
    col("id"),
    col("tags").getItem("name").as("name"),
    col("tags").getItem("place").as("type"),
    col("tags").getItem("population").as("population"),
    expr("geoToH3(lat, lon)").as("h3_index")
  )
places.cache()

val harbours = osm_common_filter
  .filter(col("tags").getItem("harbour").isin("yes"))
  .select(
    lit("Waterworld").as("name"),
    lit("harbour").as("type"),
    expr("geoToH3(lat, lon)").as("h3_index")
  )
harbours.cache()
```
#### Pre-optimisation 2: Leveraging Broadcast Joins
To increase scalability, we avoided regular (shuffle-based) joins wherever possible in favour of broadcast joins.
With a broadcast join, Spark broadcasts the smaller dataframe to all executors, where it is kept in memory, while the larger dataframe remains partitioned across the executors. Spark can then perform the join without shuffling the larger dataframe, since the data required for the join is co-located on every executor.
The performance gain of this technique may not be obvious when running the application on small, resource-constrained devices such as our laptops; however, large gains are expected when executing on a large distributed cluster with many executors.
An example of a broadcast join in our application is shown below. Note that we avoided regular joins throughout the application; all joins are performed as broadcast joins.
In the example below, `distance_calculation` holds the distance from each evacuated location to every possible safety city, while `closest_distance` is a small dataframe containing only the id of each evacuated location and its distance to the closest safety city. Since `closest_distance` is much smaller than `distance_calculation`, it is broadcast to each executor to perform the join.
```scala
val evacuation_plan = distance_calculation // a plain join() here could cause a shuffle
  .join(broadcast(closest_distance), // broadcast join on the smaller dataframe
    closest_distance("evacuation_id") === distance_calculation("evacuation_id") &&
      closest_distance("min_distance_to_safety_city") === distance_calculation("distance"), "right")
  .drop(distance_calculation("evacuation_id"))
  .drop(col("distance"))
```
### Iteration 0: Dataset France
#### Baseline
| | |
|---|---|
| System | Lenovo Y9000P (16GB RAM, i7-11800H, 8 Cores)|
| Workers | 8 |
| Dataset | France |
| Run time <br>(hh:mm:ss) | 00:05:38 |
| Output | Adequate, Good & Excellent |
Only limited information could be obtained locally, where the Spark History Server was our primary performance-analysis tool. To further our analysis, we subsequently executed the same `France` dataset on AWS, adhering to the suggested settings: `1 master node (c5.xlarge)` and `5 core nodes (c5.xlarge)`.
| | |
|---|---|
| System | AWS: c5.xlarge (1 Primary Node, 5 Core Node)|
| Workers | 12 (24 vCore*) |
| Dataset | France |
| Run time <br>(hh:mm:ss) | 00:06:12 |
| Price | 6 * 0.087 $/hour |
| Cost | 0.054 $|
| Output | Adequate, Good & Excellent |
(*vCore: each vCore is a hardware thread of a CPU core; a simple way to think about this is that an AWS vCPU equals half a physical core.)
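For reference, the Cost rows in these tables are simply the total hourly price multiplied by the runtime; for this run, (6 × 0.087 $/h) × (6 min 12 s ≈ 0.103 h) ≈ 0.054 $.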
The Ganglia profiles are depicted below:

Further, in both figures below, insights from the Spark History Server reveal that the upper-rightmost plot represents the driver, while the other plots represent executors. Notably, the upper-leftmost plot isn't recognized by the Server, so its performance isn't considered.


A deep dive into the Ganglia profiles indicates a dual-intensive nature of the application: both computationally and memory-wise. Specifically, the executors push both CPU and memory to their limits. Consequently, to enhance performance, the core nodes are substituted with alternatives that boast greater processing power and memory capacity.
#### Cluster Improvement 1: Enhancing CPU and Memory (c5.xlarge -> c5.2xlarge)
| | |
|---|---|
| System | AWS: 1 Primary c5.xlarge, 5 Core c5.2xlarge|
| Workers | 22 (44 vCore) |
| Dataset | France |
| Run time <br>(hh:mm:ss) | 00:03:32 |
| Price | 0.087 + 5 * 0.172 $/hour |
| Cost | 0.055 $|
| Output | Adequate, Good & Excellent |
The Ganglia profiles are illustrated below:

Further annotations from the Spark History Server specify that in both figures below, the second from the top-left represents the driver, while all other figures are executors. The figure at the bottom left isn't recognized by the Server, so its performance isn't incorporated in our evaluation.


Drawing insights from the Ganglia profiles, it's evident that both CPU and memory no longer reach their maximum capacities. This has culminated in a considerable acceleration in the application's performance. Meanwhile, this improvement barely increased the cost.
### Iteration 1: Dataset North America
Before executing our application on the North America dataset, a crucial adjustment was made. Given the exponential growth in dataset sizes for North America and subsequent regions, we transitioned to using the provided ORC and Parquet files from AWS S3.
However, it's noteworthy that the Parquet files encompass data for the entire planet. To optimize performance and relevance, we instituted a filtering mechanism to extract only the relevant latitude and longitude data corresponding to our area of interest, in this case, North America.

#### Application Improvement 1: Extracting Area of Interest
The application improvement focuses on efficiently loading and processing data from Parquet files. By leveraging the inherent statistics stored within Parquet, we can significantly reduce the time and resources needed to extract a specific geographical range.
When extracting specific geographical ranges from a dataset, these statistics can be leveraged to reduce the amount of data loaded into memory. Specifically, we can instruct Apache Spark to only load row groups that fall within our specified latitude and longitude range. According to the documentation, this saves a significant amount of time.
The provided Scala code to achieve this is divided into two parts. The first part involves loading the .orc file and filtering for the columns of interest (id, tags, lat, lon). After applying the necessary filters, we then determine the minimum and maximum latitude and longitude boundaries for the area of interest.
The second part is where the magic happens. Here, the native Parquet reader is activated. This reader is optimized for performance and is a marked improvement over the older Hive-based one. With the native reader activated, we then read the Parquet file, but instead of loading everything, we only load rows that fall within the latitude and longitude range determined earlier.
```scala
// First part
val osm_common_filter = spark.read.orc(orc_filePath) // apply common filters to the whole .orc file to extract the information we care about
  .select("id", "tags", "lat", "lon")
  .filter(col("tags").getItem("place").isNotNull || col("tags").getItem("harbour").isNotNull)
  .filter(col("lat").isNotNull)
  .filter(col("lon").isNotNull)
  .filter(col("id").isNotNull)
osm_common_filter.cache()

// Aggregate to find min and max of lat and lon
val get_area_range = osm_common_filter.agg(
  max("lat").as("lat_max"),
  min("lat").as("lat_min"),
  max("lon").as("lon_max"),
  min("lon").as("lon_min")
)

// Collect the aggregated results
val results = get_area_range.collect()(0)

// Extract values as Double
val lat_max = results.getAs[BigDecimal]("lat_max").doubleValue()
val lat_min = results.getAs[BigDecimal]("lat_min").doubleValue()
val lon_max = results.getAs[BigDecimal]("lon_max").doubleValue()
val lon_min = results.getAs[BigDecimal]("lon_min").doubleValue()

// Second part
spark.conf.set("spark.sql.parquet.useDataSourceApi", "true")

// Read the parquet and process the coordinate information & cache
val alos_df = spark.read.parquet(alos_filePath)
val alos_df_filtered = alos_df
  .filter(col("lat").between(lat_min, lat_max))
  .filter(col("lon").between(lon_min, lon_max))
```
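As a quick sanity check (not part of the pipeline itself), the physical plan can be printed to confirm that the latitude/longitude conditions are pushed down to the Parquet scan, which is what lets the reader skip row groups whose min/max statistics fall outside the range:
```scala
// The FileScan parquet node in the plan should list the lat/lon range
// conditions under PushedFilters, enabling row-group skipping.
alos_df_filtered.explain()
```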
#### Application Improvement 2: Robustness Against Corrupted Parquet Files
During the execution of the first application improvement on our cluster, we discovered that some parquet files were corrupted, while others hadn't downloaded correctly. Such issues caused the application to terminate prematurely.
We addressed this by modifying our Spark configurations. The configurations were set to ignore any corrupted files and skip over missing ones. Here's the revised implementation:
```scala
def calculate_avg_elevation_places_harbours_from_parquet(spark: SparkSession, alos_filePath: String, places: DataFrame, harbours: DataFrame, lat_max: Double, lat_min: Double, lon_max: Double, lon_min: Double): (DataFrame, DataFrame) = {
  spark.conf.set("spark.sql.parquet.useDataSourceApi", "true")
  // The configuration in the line below ignores corrupted files
  spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
  // The configuration in the line below ignores files that failed to download
  spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
  // Read the parquet and process the coordinate information & cache
  val alos_df = spark.read.parquet(alos_filePath)
  val alos_df_filtered = alos_df
    .filter(col("lat").between(lat_min, lat_max))
    .filter(col("lon").between(lon_min, lon_max))
  .....
```
#### Application Improvement 3: Robustness Against Uncomputable H3 Distances
During the application's run, we encountered instances where calculating the distance between certain H3 indices led to failures. This was evident from the DistanceUndefinedException being thrown.
```bash
... (error message truncated for brevity)
Caused by: com.uber.h3core.exceptions.DistanceUndefinedException: Distance not defined between the two indexes.
```
We fortified our distance computation logic by embedding it within a try-catch construct. This ensures that if a distance calculation between two indices is unfeasible, we'll assign a distinctive value, `-1`, as the result. As part of our data processing, rows with a distance value of `-1` are subsequently filtered out, ensuring their non-inclusion in any downstream operations. Additionally, a similar approach was also employed for converting geographical coordinates to H3 indices to further enhance our application's robustness:
```scala
object H3Utils {
  private val h3 = H3Core.newInstance()
  private val h3_index_resolution = 9

  def geoToH3(lat: Double, lon: Double): Long = {
    try {
      h3.geoToH3(lat, lon, h3_index_resolution)
    } catch {
      case _: Throwable => -1L
    }
  }

  def h3Distance(origin: Long, dest: Long): Int = {
    try {
      h3.h3Distance(origin, dest)
    } catch {
      case _: Throwable => -1
    }
  }
}
```
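For completeness, the sketch below shows how these helpers can be applied as UDFs and how the `-1` sentinel is removed downstream, as described above; the dataframe and column names are illustrative rather than the exact ones from our code:
```scala
// Register the fallible helper as a UDF, compute the H3 distance per row,
// and drop rows whose distance could not be computed so the -1 sentinel
// never reaches later stages of the evacuation plan.
val h3DistanceUdf = udf((origin: Long, dest: Long) => H3Utils.h3Distance(origin, dest))

val distance_calculation = evacuation_to_safety // hypothetical pairing of evacuated locations and safety cities
  .withColumn("distance", h3DistanceUdf(col("evacuation_h3_index"), col("safety_h3_index")))
  .filter(col("distance") =!= -1) // discard pairs with undefined H3 distance
```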
#### Baseline
Building upon the configuration set for the France dataset, we derived the following performance metrics:
| | |
|---|---|
| System | AWS: 1 Primary c5.xlarge, 5 Core c5.2xlarge|
| Workers | 22 (44 vCore) |
| Dataset | North America |
| Run time <br>(hh:mm:ss) | 01:50:00 |
| Price | 0.087 + 5 * 0.172 $/hour |
| Cost | 1.74$|
| Output | Adequate, Good & Excellent |
The noticeable increase in runtime stems from a methodological shift. Instead of relying on specialized parquet files tailored for specific regions, we now extract data points of interest directly from a comprehensive parquet folder which contains the whole planet's information.
The Ganglia profiles below provide a visual breakdown:



Insights from the Ganglia profiles above and the Spark History Server (not presented due to a sudden, unexpected breakdown of the SSH connection):
- The application’s behavior underscores its computational and memory demands.
- Data from the Spark History Server indicates that the majority of the runtime is expended on reading global parquet files and distilling data for regions of interest.
- Despite Spark’s significant optimizations for reading parquet files, this task remains a primary time-consuming operation.
We therefore continue increasing the number of core nodes to raise computational parallelism, aiming to speed the application up at minimal extra cost.
#### Cluster Improvement 1: Enhancing CPU
Increasing the number of core nodes from 5 to 10, we obtain the performance shown in the table below:
| | |
|---|---|
| System | AWS: 1 Primary c5.xlarge, 10 Core c5.2xlarge|
| Workers | 42 (84 vCore) |
| Dataset | North America |
| Run time <br>(hh:mm:ss) | 00:56:29 |
| Price | 0.071 + 10 * 0.181 $/hour |
| Cost | 1.77$ |
| Output | Adequate, Good & Excellent |
The Ganglia profiles are as below:

As indicated in the Spark History Server, the rightmost plot in the second row of both figures below is the driver node, and the rest are executor nodes (except the second-to-last plot in both figures, which is not shown in the Spark History Server).


From the profiles, it's evident that our application remains both memory and computation-intensive.
An interesting observation is that as we allocate more resources, the cost doesn't necessarily increase proportionally. It would be intriguing to determine the precise threshold where the balance between resources and cost becomes suboptimal.
#### Cluster Improvement 2: Enhancing CPU Further
We further increase the number of core nodes to 15; the result is shown in the table below:
| | |
|---|---|
| System | AWS: 1 Primary c5.xlarge, 15 Core c5.2xlarge|
| Workers | 62 (124 vCore) |
| Dataset | North America |
| Run time <br>(hh:mm:ss) | 00:54:58 |
| Price | 0.071 + 15 * 0.181 $/hour |
| Cost | 2.55$ |
| Output | Adequate, Good & Excellent |
Below is the Ganglia utilization profile:

Interpreting the Spark History Server: in both figures below, the second-to-last plot in the first row represents the driver node. All other plots denote executor nodes. However, one node (the first in the second row) unexpectedly ceased functioning. Surprisingly, not all nodes are fully utilized.


A detailed view from the Spark History Server reveals high IO-related input to each executor node, with an average data read of over 7GB per node.

Considering the profiles and data insights, further improvements should head in the following directions:
- IO Speed Boost: There's a necessity to elevate IO speed, aiming to minimize data read durations.
- Refined Node Configuration: While maintaining the same node count, it would be beneficial to equip each node with a more powerful processor.
#### Cluster Improvement 3: Enhancing IO Throughput
While reviewing system configurations, we unexpectedly identified a potential bottleneck – the default settings for IOPS and IO throughput. Initially, both were set to the minimum values of 3,000 for IOPS and 125MiB/sec per volume for throughput, respectively.

To harness improved performance, we adjusted these settings:
- IOPS: kept at 3,000 (frequent IO operations are not required)
- Throughput: boosted to 750MiB/sec per volume (loading huge datasets quickly is required)
Subsequently, we executed the application with consistent node configurations. The resulting performance metrics are detailed below:
| | |
|---|---|
| System | AWS: 1 Primary c5.xlarge, 10 Core c5.2xlarge, Throughput 750MiB/sec|
| Workers | 42 (84 vCore) |
| Dataset | North America |
| Run time <br>(hh:mm:ss) | 01:04:00 |
| Price | 0.071 + 10 * 0.181 $/hour |
| Cost | 2.01$ |
| Output | Adequate, Good & Excellent |
Despite our efforts to enhance IO throughput, the application's performance didn't improve as anticipated. Thus, for the North America dataset, the optimal configuration remains: `1 Primary c5.xlarge and 10 Core c5.2xlarge` without any throughput enhancement.
### Iteration 2: Dataset Europe
#### Baseline
Leveraging the best configuration optimized for the North America dataset, we assessed its efficacy on the Europe dataset. The results were as follows:
| | |
|---|---|
| System | AWS: 1 Primary c5.xlarge, 10 Core c5.2xlarge|
| Workers | 42 (84 vCore) |
| Dataset | Europe |
| Run time <br>(hh:mm:ss) | 00:39:44 |
| Price | 0.071 + 10 * 0.181 $/hour |
| Cost | 1.25$ |
| Output | Adequate, Good & Excellent |
The profiles from Ganglia are as below:


Given the high utilization of the current cluster on the Europe dataset, we considered augmenting the core node count. Since the Europe dataset is more extensive than its North American counterpart, there is potential for better job distribution across more nodes. The profiles above also show clearly that memory is far from being a limiting factor for the application.
Interestingly, processing the Europe dataset was markedly faster than handling North America. This disparity is attributed to North America's broader latitude and longitude coverage compared to Europe. However, Europe, with its dense concentration of cities and towns, provides richer data in the OpenStreetMap and a heftier `.orc` file. Notably, the process of reading the parquet files and subsequently computing the average elevation for each location was the most time-intensive.
Below, you'll find a side-by-side comparison of the latitude and longitude ranges for Europe and North America. This visual representation provides insights into the geographic spread of both continents.
<figure style="display: flex; justify-content: space-between;">
<img src="https://hackmd.io/_uploads/ry6x-Hi-T.png" alt="Europe Latitude and Longitude Range" style="width: 45%; margin-right: 5%;">
<img src="https://hackmd.io/_uploads/SkjW-BjZ6.png" alt="North America Latitude and Longitude Range" style="width: 45%;">
</figure>
**This observation suggests a pivotal enhancement for the planetary evacuation plan: pre-compute the average elevation of each location. Given the geological stability of Earth's topography over centuries, this pre-processing is both logical and efficient.**
#### Cluster Improvement 1: Enhancing CPU
We further increase the number of core nodes to 15; the result is shown in the table below:
| | |
|---|---|
| System | AWS: 1 Primary c5.xlarge, 15 Core c5.2xlarge|
| Workers | 62 (124 vCore) |
| Dataset | Europe |
| Run time <br>(hh:mm:ss) | 00:24:14 |
| Price | 0.071 + 15 * 0.181 $/hour |
| Cost | 1.13$ |
| Output | Adequate, Good & Excellent |
The Ganglia profiles are as shown below:

In both figures below, the second plot in the third row is the driver node, as indicated by the Spark History Server.


By increasing the number of executor nodes, the application gained a significant speedup while the overall cost was lowered. As indicated in the profiles above, all resources are well utilized. Hence we decided to increase the number of executor nodes further.
#### Cluster Improvement 2: Enhancing CPU Further
We further increase the number of core nodes to 30; the result is shown in the table below:
| | |
|---|---|
| System | AWS: 1 Primary c5.xlarge, 30 Core c5.2xlarge|
| Workers | 122 (244 vCore) |
| Dataset | Europe |
| Run time <br>(hh:mm:ss) | 00:14:20 |
| Price | 0.071 + 30 * 0.181 $/hour |
| Cost | 1.31$ |
| Output | Adequate, Good & Excellent |
The profiles in Ganglia are shown below:


A dive into the Ganglia profiles suggests robust resource utilization with 30 executor nodes, although two nodes remained idle throughout the execution. We opted not to show memory statistics this time since memory is no longer a bottleneck for the application.
Weighing the cost-performance ratio, even though there's a 16% cost increment, the application performance leapfrogged by an impressive 42%. Given its robust handling of intense computation and memory demands, we've chosen this configuration as the optimum setup for the planetary evacuation plan.
### Iteration 3: Dataset Planet
#### Baseline
Seeking further performance enhancements, we started with 30 core nodes, the same configuration as in the previous section. The outcome is detailed in the table below:
| | |
|---|---|
| System | AWS: 1 Primary c5.xlarge, 30 Core c5.2xlarge|
| Workers | 122 (244 vCore) |
| Dataset | Planet |
| Run time <br>(hh:mm:ss) | 01:22:00 |
| Price | 0.071 + 30 * 0.181 $/hour |
| Cost | 7.52$ |
| Output | Adequate, Good & Excellent |
The Ganglia profiles are as shown below:


The profiles indicate high resource utilization, though not all cores are maximized.
We revisited the configuration, reducing to 15 core nodes, with the following outcomes:
| | |
|---|---|
| System | AWS: 1 Primary c5.xlarge, 15 Core c5.2xlarge|
| Workers | 62 (124 vCore) |
| Dataset | Planet |
| Run time <br>(hh:mm:ss) | 02:15:00 |
| Price | 0.071 + 15 * 0.181 $/hour |
| Cost | 6.27$ |
| Output | Adequate, Good & Excellent |


Although the run is noticeably slower, each core is optimally utilized. The lower operational cost justifies adopting this configuration as our base for subsequent improvements.
#### Application Improvement 1: Planetary Elevation Precomputation in Parquet Files
Reflecting upon earlier observations, we highlighted a significant enhancement strategy for the planetary evacuation plan: pre-computation of the average elevation for each location. Considering Earth's topography exhibits stable geological patterns over extended periods, this pre-processing step appears both prudent and beneficial.
Implementing this approach provides several advantages:
- Efficiency: Substantially minimizes the reliance on H3 functions.
- Reduced Shuffling: Eliminates specific grouping operations, especially ones leading to considerable shuffles. For instance, grouping by H3 index for average elevation calculations becomes redundant.
- Versatility: The pre-computation can be scaled to accommodate all H3 resolutions, reinforcing the belief that Earth's topographic characteristics remain consistent over centuries.
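A minimal sketch of this precomputation step is given below; it reuses the `geoToH3` helper from `H3Utils`, while the column names, aggregation and output path are illustrative rather than our exact implementation:
```scala
// Index every ALOS elevation sample by its H3 cell and store one average
// elevation per cell, so the evacuation run can read this table directly
// instead of re-deriving elevations from the raw planetary Parquet files.
val geoToH3Udf = udf((lat: Double, lon: Double) => H3Utils.geoToH3(lat, lon))

val precomputed_elevation = spark.read.parquet(alos_filePath)
  .withColumn("h3_index", geoToH3Udf(col("lat"), col("lon")))
  .filter(col("h3_index") =!= -1L) // skip points that could not be indexed
  .groupBy("h3_index")
  .agg(avg(col("elevation")).as("avg_elevation"))

precomputed_elevation.write.mode("overwrite").parquet(precompute_output_path) // hypothetical output path
```
Running the evacuation plan against these precomputed files on the same 15-node cluster yields: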
| | |
|---|---|
| System | AWS: 1 Primary c5.xlarge, 15 Core c5.2xlarge|
| Workers | 62 (124 vCore) |
| Dataset | Planet |
| Run time <br>(hh:mm:ss) | 00:02:59 |
| Price | 0.071 + 15 * 0.181 $/hour |
| Cost | 0.139$ |
| Output | Adequate, Good & Excellent |
The Ganglia profiles are as below:


**The results are remarkable! Computation times have been reduced to just a matter of minutes.**
For subsequent improvements, we'll fine-tune the cluster settings to achieve an optimal configuration, with a particular focus on cost-efficiency.
#### Cluster Improvement 1: Cost-Efficient Optimization Through Core Node Variation (With Precomputation)
We embarked on a series of tests, adjusting the number of c5.2xlarge core nodes to analyze their impact on cost. Specifically, we tested with core node counts of 4, 5, 10, 15, 20, and 25. It's worth noting that with only 1, 2, or 3 core nodes, the application failed because the broadcast join could not operate correctly.
Below is the Cost vs. Core Node Count graph:

Drawing conclusions from the graph, the optimal configuration for cost efficiency is achieved when deploying 5 c5.2xlarge core nodes. This observation holds especially true for applications utilizing precomputed parquet files for planetary elevation data.
#### Cluster Improvement 2: Maximizing Core Node Utilization (Without Precomputation)
<!-- Observing underutilization in our previous configurations, we trimmed the core node count from 30 down to 24. This adjustment was informed by the realization that six of the core nodes were largely underused.
| | |
|---|---|
| System | AWS: 1 Primary c5.xlarge, 24 Core c5.2xlarge|
| Workers | 88 (196 vCore) |
| Dataset | Planet |
| Run time <br>(hh:mm:ss) | 01:32:00 |
| Price | 0.071 + 24 * 0.181 $/hour |
| Cost | 6.76$|
| Output | Adequate, Good & Excellent |
Despite the reduction in core nodes, the cost remains higher than configurations with only 15 nodes. Even though there's a trade-off in speed, the configuration presents a more cost-effective choice given the acceptable processing time. -->
Similar to the design-space exploration done with precomputation, we explored cluster configurations without it, using different core-node types with a similar total number of CPUs. The results are shown in the table and graph below:
| | | | | | | | |
|---|---|---|---|---|---|---|---|
| System | 24 c5.2xlarge | 12 c5.4xlarge | 12 c5.9xlarge | 14 c5.4xlarge | 10 c5.9xlarge | 16 c5.4xlarge | 8 c5.9xlarge |
| Workers | 88 (196 vCore) | 88 (196 vCore) | 218 (436 vCore) | 114 (228 vCore) | 182 (364 vCore) | 130 (260 vCore) | 146 (292 vCore) |
| Run time <br>(hh:mm:ss) | 01:32:00 | 01:16:00 | 00:37:16 | 01:14:00 | 00:44:19 | 00:57:00 | 00:53:38 |
| Cost | 6.76$| 4.12$ | 4.58$ | 4.79$ | 4.53$ | 4.09$ | 4.42$ |

The graph above suggests that with a similar total core count, nodes with more cores tend to enhance performance. Our optimal configuration, when not leveraging precomputed features, consists of:
- 1 Primary Node using c5.xlarge
- 12/16 Core Nodes, each using c5.4xlarge.
The total cost to compute the planetary evacuation plan with this setup is 4.12/4.09$.
## Summary of application-level improvements
1. Data Caching (Pre-optimisation from Lab 1):
Caching is employed as a key strategy to enhance the efficiency of dataframes processed multiple times. One notable instance is the caching of the `places` and `harbours` dataframes, which are frequently reused in operations. These dataframes, for example, help determine the average elevation of various locations and harbours, the population size of safety cities designated for evacuations, and the distances between evacuated locations and their corresponding safety cities or harbours.
2. Leveraging Broadcast Joins (Pre-optimisation from Lab 1):
Broadcast joins present an optimized solution over normal joins to enhance scalability in Spark. In this method, the smaller dataframe is broadcast to every executor, where it's kept in memory. Meanwhile, the larger dataframe is partitioned and dispersed across all executors. This design allows Spark to execute a join without needing to shuffle the larger dataframe, as the necessary join data resides on every executor.
Within our application, an exemplary use of this is seen in joining `distance_calculation` and `closest_distance`. Here, `closest_distance` — a notably smaller dataframe containing IDs of evacuated locations and their nearest safety city distances — is broadcast to each executor for the join, underscoring our consistent preference for broadcast joins over normal ones.
3. Extracting Area of Interest (For datasets besides Planet):
The ALOS data is stored in Parquet files, and we often only need the portion covering a certain area. Using the statistics stored in Parquet, Spark can skip row groups whose latitude and longitude fall outside our boundaries instead of reading everything, which is considerably quicker.
In our Scala code, we first read the .orc file and select the relevant columns (id, tags, lat, lon) to determine the boundaries of the area of interest. We then use the native Parquet reader to load only the rows that fall within those boundaries, so instead of scanning all the data we only read what we need.
4. Robustness Against Corrupted Parquet Files:
We discovered that some parquet files were corrupted, while others hadn't downloaded correctly. Such issues caused the application to terminate prematurely. We addressed this by modifying our Spark configurations which are set to ignore any corrupted files and skip over missing ones.
5. Robustness Against Uncomputable H3 Distances:
We encountered instances where calculating the distance between certain H3 indices led to failures.
We wrapped the computation in a try-catch block. If the application runs into such an error, it assigns a placeholder value of `-1`. Later, any rows with this `-1` value are filtered out so they do not disturb our final results.
6. Planetary Elevation Precomputation in Parquet Files:
We identified a key enhancement for the planetary evacuation plan. Since Earth's topographic characteristics remain stable over centuries, we precompute the average elevation of each H3 cell in advance; this can be done for every required H3 resolution. This improvement led to a dramatic speedup, reducing the runtime from hours to minutes.
## Summary of cluster-level improvements
**Kindly note that improvements at the cluster-level differ from those at the application-level. Each iteration involves multiple enhancements. For a comprehensive understanding of the logic behind each modification, we strongly encourage a detailed review of every iteration.**
1. Enhancing CPU of the Clusters:
We switched our core node type from c5.xlarge to c5.2xlarge and adjusted the number of core nodes. Using Ganglia and Spark History Server profiles, we assessed each setup's performance. By focusing on cost-effectiveness as our main criterion, we determined the optimal configuration. Consequently, we managed to execute the planetary evacuation plan for under 0.1$.
2. Enhancing IO of the Clusters:
We experimented with IOPS settings, volume counts, and node throughput speeds. While these changes didn't ultimately enhance performance, they were worth exploring. Interestingly, some overly ambitious configurations that appeared promising actually resulted in a performance drop.
3. Changing the type of nodes:
While not highlighted in every section, we ventured into using a range of node types, each tailored for specific tasks. For instance, we tested M family nodes for batch tasks, C family for parallel workloads, R family for memory-heavy analyses, and D & I families for IO-centric jobs. Surprisingly, the performance outcomes were quite consistent across the board. A potential explanation, as hinted at in AWS's documentation, is that these node types are optimal for processes spanning over 50 core nodes, a threshold we didn't approach.
## Conclusion
Our exploration into optimizing the cluster revealed the configurations below that stood out, depending on the usage of precomputation.
1. With Precomputation:
The use of precomputed parquet files for planetary elevation data significantly influences the cost-effectiveness. From the series of tests conducted with varying core node counts, the most cost-efficient setup involves deploying 5 c5.2xlarge core nodes. The Cost vs. Core Node Count graph clearly indicated this optimal point, emphasizing the advantage of integrating precomputation in the application.
The best configuration, as shown below, is:
- 1 Primary c5.xlarge, 5 Core c5.2xlarge, 00:05:22, 0.087$

2. Without Precomputation:
Without relying on precomputed data, we have two configurations that run the application in a reasonable amount of time at a reasonable cost:
- 1 Primary c5.xlarge, 12 Core c5.4xlarge, 01:16:00, 4.12$
- 1 Primary c5.xlarge, 16 Core c5.4xlarge, 00:57:00, 4.09$
While all configurations have their merits, the stark cost and efficiency benefits of integrating precomputation in the application become evident.
**You can find all the results under two folders in `s3://group-02`: `planet_results` and `useless_results`. The first folder contains all the output of different configurations of the planetary versions as indicated by each their corresponding name. The second folder contains all the other results besides the planetary ones.**
In this lab, where the task was to refine both application- and cluster-level configurations, three key insights stood out:
1. Prioritizing Application Robustness:
As we navigated through our planetary evacuation plan, ensuring the robustness of the application was paramount. We encountered challenges like corrupted parquet files and errors during distance calculations. Adopting a proactive approach, we integrated mechanisms to bypass corrupted or undownloaded files and implemented try-catch blocks to handle unforeseen errors during calculations. These measures not only ensured smoother application runs but also minimized disruptions due to unexpected data anomalies or calculation challenges.
2. Balancing Speed with Cost:
Throughout our optimization efforts, a recurring theme was the balance between speed and cost. While a rapid execution might be appealing, it often came with substantial cost implications. For instance, ambitious memory configurations and CPU enhancements sometimes led to only marginal performance gains but at a steeper price. We learned that blindly chasing speed isn't always the best strategy. Instead, a more holistic approach, where we weigh the benefits of faster processing against its associated costs, proved to be more beneficial in the long run. Seeking this balance was not only cost-effective but also aligned with our overarching goal of efficient and economical planetary evacuation.
3. Benefiting from fewer nodes, each with a more powerful CPU:
Our observations indicate that with a roughly equal core count, performance enhancements are more pronounced when individual nodes possess more cores. Essentially, a stronger individual core processing power can mitigate the longest computation time experienced by the slowest node. We surmise that for computation-intensive applications, the limiting factor isn't the task distribution, but rather the extended computation time of the slowest node. By reducing the number of nodes while outfitting each with a robust CPU, we achieved significant cost savings without compromising on speed.
In conclusion, our experience underscored the importance of designing resilient applications. It also highlights the imperative of balancing between speed and cost to optimize performance.