# Netherlands Evacuation Planning
## Usage
You can follow the guide below step by step (code by code) to run our application. The meaning of each command is explained when necessary. Note that we assume you are using Ubuntu 22.04, as that is recommended in the manual to be the best linux distro for this assignment.
### 1. Clone our repository.
Launch a new terminal and navigate to the Desktop directory:
```
ctrl + alt + t
cd Desktop/
```
Git clone our repository and access the corresponding directory:
```
git clone git@github.com:abs-tudelft-sbd/lab-1-2023-group-02.git
cd lab-1-2023-group-02/
```
### 2. Prepare .orc and parquet files
We assume that the .orc the parquet files are already generated and is ready for use.
**Please move these files to the content root directory of our application.**
The structrue of our app directory should be looking as below:
(Note that you do not have to follow this strictly, however, placing the files in the content root directory make the file path of data to be used shorter, and hence makes our run cmd shorter)
```
- π parquet
- π netherlands.orc
- π .bsp
- π .github
- π project
- π src
- π main
- π scala
- π Lab1.scala
- π .gitignore
- π README.md
- π .scalafmt.conf
- π Dockerfile
- π build.sbt
- π scalastyle-config.xml
```
### 3. Ways to excecute the application
We have provided three ways to run our application. Please feel free to use any of them, they should all result in a similar performance with the exact same output.
In all three ways, the command to run consists of 6 parts, and we will first explain them to you in detail:
```
To run the program, use the following command:
run <sea_level> <adequate_filePath> <good_filePath> <excellent_filePath> [orc_filePath] [alos_filePath]
## Command Parameters
- `<sea_level>`: (Required) The sea level rise in meters. Example: `0`
- `<adequate_filePath>`: (Required) The file path for the 'adequate' output files.
- `<good_filePath>`: (Required) The file path for the 'good' output files.
- `<excellent_filePath>`: (Required) The file path for the 'excellent' output files.
- `[orc_filePath]`: (Optional) The file path for the .orc file.
- Default: `netherlands.orc`
- `[alos_filePath]`: (Optional) The file path for the parquet files.
- Default: `parquet/`
```
#### 3.1 Run with sbt
Your terminal should be at the content root directory of our application right now, assuming that you have all the docker images the manual requires.
Lanuch sbt using:
```
docker run -it --rm -v "`pwd`":/root sbt sbt
```
Then in the sbt terminal, do the followings in three seperate steps.
```
reload
compile
# exit the sbt
exit
~run 0 newdump/adequate/ newdump/good/ newdump/excellent/ netherlands.orc parquet/
```
#### 3.2 Run with spark-submit with thin .jar file
Your terminal should be at the content root directory of our application right now, assuming that you have all the docker images the manual requires.
Lanuch sbt using:
```
docker run -it --rm -v "`pwd`":/root sbt sbt
```
Then in the sbt terminal, do the followings in three seperate steps.
```
reload
compile
package
# exit the sbt
exit
```
Then submit the thin .jar with uber h3 dependency specified:
```
docker run -it --rm -v "`pwd`":/io -v "`pwd`"/spark-events:/spark-events spark-submit --packages com.uber:h3:3.7.0 target/scala-2.12/lab-1_2.12-1.0.jar 0 newdump/adequate/ newdump/good/ newdump/excellent/ netherlands.orc parquet/
```
#### 3.3 Run with spark-submit with fat .jar file
Your terminal should be at the content root directory of our application right now, assuming that you have all the docker images the manual requires.
Lanuch sbt using:
```
docker run -it --rm -v "`pwd`":/root sbt sbt
```
Then in the sbt terminal, do the followings in three seperate steps.
```
reload
compile
# assembly command requires additional code in the build.sbt file (already added)
assembly
# exit the sbt
exit
```
Then submit the fat .jar with app class specified:
```
docker run -it --rm -v "`pwd`":/io -v "`pwd`"/spark-events:/spark-events spark-submit --class "Lab1" "target/scala-2.12/Lab 1-assembly-1.0.jar" 0 newdump/adequate/ newdump/good/ newdump/excellent/ netherlands.orc parquet/
```
## Functional overview
### Step 1: Process and check user input
* The application processes the user's input and records the specified file paths for later output.
* If the provided input does not meet the required criteria, the application terminates, displaying a user-friendly message indicating the erroneous input.
* We've designed the app to offer users flexibility in choosing their preferred locations for data input and output.
### Step 2: Register UDF (User Defined Functions)
* Two functions sourced from uber.h3 are registered:
* geoToH3: Calculates the h3 index from coordinates.
* h3Distance: Computes the distance between two h3 indices.
* The adoption of UDF serves two primary purposes:
* The functions h3.geoToH3 and h3.h3Distance are non-serializable.
* It enhances code clarity and readability.
```scala
def registerUDFs(spark: SparkSession): Unit = {
// Register UDFs
val h3_index_resolution = 9
val geoToH3UDF = udf((lat: Double, lon: Double) => {
val h3 = H3Core.newInstance()
h3.geoToH3(lat, lon, h3_index_resolution): Long
})
val h3DistanceUDF = udf((origin: Long, dest: Long) => {
val h3 = H3Core.newInstance()
h3.h3Distance(origin, dest)
})
spark.udf.register("geoToH3", geoToH3UDF)
spark.udf.register("h3Distance", h3DistanceUDF)
}
```
### Step 3: Process netherlands.orc file to obtain information of interest
* We selectively read from the netherlands.orc file, focusing solely on pertinent information. To streamline this, filters are applied to identify common elements between places and harbours. This selective filtering aids in caching a minimal dataset, especially since the extraction of information for places and harbours necessitates reading the file twice.
* **To ensure that answers are as accurate as possible, we filter out places with no ID. As IDs are the one and only unique indentification for each location. Moreover, we also filter out locations without latitude and longitude specified. As the h3 index is another critical factor of this application.**
```scala
// read the .orc file and preprocess, make it small enough to cache
val osm_common_filter = spark.read.orc(orc_filePath) // apply common filters
.select("id", "tags", "lat", "lon")
.filter(col("tags").getItem("place").isNotNull || col("tags").getItem("harbour").isNotNull)
.filter(col("lat").isNotNull)
.filter(col("lon").isNotNull)
.filter(col("id").isNotNull)
```
* Further, we employ specialized filters to isolate details about places and harbours. During this process, the h3 index for both entities is computed (with geoToH3UDF), ensuring the dataframe remains compact.
### Step 4: Process parquet files to obtain average elevation of places of interest
* We start by loading the data from the parquet files, subsequently determining the h3 index for each given coordinate.
* By grouping the dataset by the "h3 index", we employ the aggregate function to compute the mean elevation for each index. Essentially, this translates to acquiring the average elevation for every distinct location across the Netherlands.
* Moreover, we associate this average elevation with the "places" and "harbours" dataframes derived in Step 3. This yields the average elevation for all recognized places and harbours.
* It's pertinent to mention that we've employed the broadcast join technique. This decision was informed by the fact that the datasets for places and harbours are considerably more compact compared to the comprehensive dataset capturing elevation details for every specific location in the Netherlands. This technique potentially results in a better performance as the more compact dataframes are copied and sent to each node for parallelism.
### Step 5: Obtain places to be evacuated (Adequate)
* Leveraging the previously constructed "places" dataframe, which provides the average elevation for each location, we introduce a filter. This filter isolates locations with elevations below the sea level specified by the user.
* Subsequent to filtering, we extract the requisite data columns and persist them to the designated directory, as determined by the user's input.
### Step 6: Obtain destinations of evacuation (Good)
* Drawing from the places with average elevation established in Step 3, we employ filters to:
* Isolate locations classified as "city."
* From this subset, further distill cities that sit above the current sea level. These cities serve as potential evacuation destinations.
### Step 7: Determine the closest destination of each place to be evacuated (Good)
* A cross join is executed between the locations identified for evacuation in Step 5 and the potential destinations pinpointed in Step 6. This forms a N x M dataframe, where N and M represent the respective entry counts of the two joined dataframes.
* An additional "distance" column is instituted to determine the distance between each evacuation point and its potential destination, using the h3DistanceUDF. Note that subsequent to calculating the distance, we eliminate columns containing redundant or obsolete information.
* **To make results as accurate as possible, during the minimum distance derivation process for each evacuation site to its destination, the dataframe is grouped by evacuation site IDs. Aggregating with the min() function ensures only entries corresponding to the shortest distance are retained.**
* In the last step to achieve the good requirements, we again leveraged the broadcast join technique. Merging the comprehensive distance matrix with the minimal distance dataframe gives a holistic view, linking each evacuation site to its closest safe destination. **To ensure accuracy of the result, joining was executed based on place IDs rather than names together with the distance to potential destinations. This potentially avoids mismatch in identically named cities.**
### Step 8: Obtain harbours of evacuation & closest harbour of each place to be evacuated (Excellent)
* Essentially, this phase mirrors Steps 6 and 7. However, the focus shifts from identifying the nearest city (destination) to pinpointing the closest harbour, forming the evacuation plan towards these harbours. Moreover, we do not consider whether a harbour is under sea level or not, as it is an indication of evacuating by boat to the open water.
* **To enhance accuracy, it's paramount that each evacuation site links to a maximum of just one harbour. Harbours, in this context, are not viewed as independent entities but as a collective unit. For example, if two cities possess harbours nearer than a secure city, a portion of their inhabitants would be directed towards a "waterworld" as opposed to a specific harbour.**
### Step 9: Determine the places evacuated to harbours and their new number of evacuees (Excellent)
* Having defined the harbour-oriented evacuation strategy, we employ the broadcast join technique once more. This involves merging the harbour-focused evacuation layout, formulated above, with the closest destination evacuation plan from Step 7. Both dataframes are paired based on evacuation site IDs.
* Furthermore, we integrate three new columns: "to_harbour?", "num_evacuees_to_water", and "num_evacuees". The logic:
* "to_harbour?" marks 'true' if the evacuation site is closer to a harbour than a safe city.
* "num_evacuees_to_water" takes either ceil(num_evacuees * 0.25) or 0 based on "to_harbour?".
* "num_evacuees" adjusts with floor(num_evacuees * 0.75) or remains consistent, contingent on "to_harbour?".
* Note: Using one ceil() and one floor() ensures the population's total remains consistent after multiplication. Using two round() functions could yield an inconsistent sum. E.g.:
```scala
622 * 0.25 = 155.5, ceil(155.5) = 156
622 * 0.75 = 466.5, floor(466.5) = 466
156 + 466 = 622
# two round()/ceil()/floor() function would lead to a different sum
```
### Step 10: Calculate total evacuees to waterworld & format structures for output (Excellent)
* We generate two output structures: "final destination plan" and "final population plan", each elucidating distinct features.
* Initially, we calculate evacuees heading to Waterworld by summing "num_evacuees_to_water" from the Step 9 dataframe, and save it when formating final population plan.
* We first calculate the total amount of evacuees to waterworld by summing up all values in column "num_evacuees_to_water" in the result dataframe in Step 9. This is pre-calculated for later use.
* For the "final destination plan":
* We extract "place", "num_evacuees", and "destination" columns, representing evacuations to safe cities.
* Then, for entries marked 'true' in "to_harbour?", we pull "place", "num_evacuees_to_water", and "harbour_destination".
* These datasets are then merged to finalize the destination plan.
```
+--------------+----------------+--------------+-------------+---------------------------+-------------------+------------+-------------------+----------------------------+-----------+---------------------+
|destination_id|destination |old_population|evacuation_id|min_distance_to_safety_city|place |num_evacuees|harbour_destination|min_distance_to_safe_harbour|to_harbour?|num_evacuees_to_water|
+--------------+----------------+--------------+-------------+---------------------------+-------------------+------------+-------------------+----------------------------+-----------+---------------------+
|235278210 |Leeuwarden |91992 |47679176 |104 |Nijehaske |41 |Waterworld |60 |true |14 |
|235861650 |Utrecht |295591 |1926871191 |71 |Woerdense Verlaat |621 |Waterworld |28 |true |207 |
|3474142743 |Alkmaar |96460 |4795596815 |194 |Zuid-Eierland |75 |Waterworld |42 |true |25 |
|.......... |....... |..... |.......... |... |............. |.. |.......... |.. |.... |.. |
```
* For the "final population plan":
* Group by "destination" and "old_population" (to differentiate cities with identical names).
* Aggregate using sum() to combine evacuees directed to identical safe cities.
* Add a singular entry for Waterworld evacuees, then amalgamate with the previous data (sum of "num_evacuees_to_water") to complete the population plan.
<!-- > Introduce how you have approached the problem, and the global steps of your
> solution.
>
> Example:
>
> ### Step 1: Preparing the data
>
> * We want to be as type-safe as possible so we've declared a case class X
> with the fields y and z.
> * We are now able to load the ORC file into the DataSet\[X\].
> Take into consideration your robustness level (see Rubric), and what you had
> to do to make your answer as accurate as possible. Explain what information
> was missing and how you have mitigated that problem. -->
## Result
### Experiment setup:
```
Laptop: Lenovo Legion Y9000P (2021)
Memory: 16GB
CPU: i7-11800H (8 core, 16 threads)
OS: Ubuntu Ubuntu 22.04.3 LTS (Performance mode)
```
### Result 1: When sea level rises to 0 meters
#### Run time: 77 seconds
#### Total Evacuees: 862158
#### Number of places required to be evacuated: 438
#### Total Evacuees to Waterworld: 85219
#### List of final destination with old and new population:
```
+----------------+--------------+--------------+
|destination |old_population|new_population|
+----------------+--------------+--------------+
|'s-Hertogenbosch|115903 |116231 |
|Dordrecht |118703 |155153 |
|Groningen |201242 |260568 |
|Amsterdam |881933 |929183 |
|Rotterdam |572392 |692657 |
|Middelburg |40345 |53867 |
|Leeuwarden |91992 |126696 |
|Zoetermeer |124780 |162732 |
|Alkmaar |96460 |213500 |
|Haarlem |158593 |265525 |
|Utrecht |295591 |315524 |
|Waterworld |0 |85219 |
|Leiden |123753 |193533 |
|Zwolle |124505 |146093 |
|Almere |207904 |265630 |
|Assen |69039 |71597 |
|Delft |101386 |132000 |
|Breda |150008 |151289 |
|Emmen |56113 |57173 |
+----------------+--------------+--------------+
```
### Result 2: When sea level rises to 10 meters
#### Run time: 74 seconds
#### Total Evacuees: 9219291
#### Number of places required to be evacuated: 1819
#### Total Evacuees to Waterworld: 1520354
#### List of final destination with old and new population:
```
+----------------+--------------+--------------+
|destination |old_population|new_population|
+----------------+--------------+--------------+
|'s-Hertogenbosch|115903 |411035 |
|Amsterdam |881933 |2553195 |
|Utrecht |295591 |1226913 |
|Apeldoorn |141107 |258651 |
|Groningen |201242 |878512 |
|Den Haag |525745 |2903260 |
|Waterworld |0 |1520354 |
|Tilburg |199128 |257230 |
|Nijmegen |176731 |199897 |
|Deventer |81505 |396649 |
|Enschede |148874 |151765 |
|Arnhem |155694 |166641 |
|Assen |69039 |344598 |
|Breda |150008 |994771 |
|Emmen |56113 |128413 |
|Ede |72460 |167302 |
+----------------+--------------+--------------+
```
### Result 3: When sea level rises to 30 meters
#### Run time: 82 seconds
#### Total Evacuees: 15732984
#### Number of places required to be evacuated: 2499
#### Total Evacuees to Waterworld: 3560081
#### List of final destination with old and new population:
```
+-----------+--------------+--------------+
|destination|old_population|new_population|
+-----------+--------------+--------------+
|Nijmegen |176731 |9564073 |
|Enschede |148874 |2289894 |
|Maastricht |120105 |150623 |
|Waterworld |0 |3560081 |
|Roermond |41225 |659640 |
+-----------+--------------+--------------+
```
### Result 4: When sea level rises to 50 meters
#### Run time: 76 seconds
#### Total Evacuees: 16680160
#### Number of places required to be evacuated: 2657
#### Total Evacuees to Waterworld: 7120669
#### List of final destination with old and new population:
```
+-----------+--------------+--------------+
|destination|old_population|new_population|
+-----------+--------------+--------------+
|Maastricht |120105 |12091544 |
|Heerlen |67943 |9924729 |
|Waterworld |0 |7120669 |
+-----------+--------------+--------------+
```
### Result 5: When sea level rises to 100 meters
#### Run time: 74 seconds
#### Total Evacuees: 17068757
#### Number of places required to be evacuated: 2722
#### Total Evacuees to Waterworld: 4101259
#### List of final destination with old and new population:
```
+-----------+--------------+--------------+
|destination|old_population|new_population|
+-----------+--------------+--------------+
|Heerlen |67943 |13035441 |
|Waterworld |0 |4101259 |
+-----------+--------------+--------------+
```
<!-- > Present the output of your program. Explain what could be improved (if
> applicable). -->
## Scalability
In this segment, we elucidate the measures undertaken to ensure our solution scales seamlessly. Here's a dive into the strategies and code snippets illuminating the scalable design decisions.
#### 1. Optimizing H3 Instance Creation for Efficiency
In the "Functional Overview" section, we highlighted the use of UDFs to calculate H3 indices and measure the distance between them. However, a closer look at our initial code reveals a potential inefficiency: we instantiate a new H3 object for each individual calculation. This approach, especially when dealing with large datasets, would introduce significant computational overhead, impacting performance.
To overcome this, we've refactored the UDFs. Instead of creating a new H3 instance for every calculation, we now initialize a single H3 object within a Scala object, ensuring that only one instance is ever in play, regardless of the number of calculations performed.
Here's a comparison of the two approaches:
```scala
// Below is the old code of creating the UDF
def registerUDFs(spark: SparkSession): Unit = {
// Register UDFs
val h3_index_resolution = 9
val geoToH3UDF = udf((lat: Double, lon: Double) => {
val h3 = H3Core.newInstance()
h3.geoToH3(lat, lon, h3_index_resolution): Long
})
val h3DistanceUDF = udf((origin: Long, dest: Long) => {
val h3 = H3Core.newInstance()
h3.h3Distance(origin, dest)
})
spark.udf.register("geoToH3", geoToH3UDF)
spark.udf.register("h3Distance", h3DistanceUDF)
}
// Below is the new code of creating the UDF
object H3Utils {
private val h3 = H3Core.newInstance()
private val h3_index_resolution = 9
def geoToH3(lat: Double, lon: Double): Long = {
h3.geoToH3(lat, lon, h3_index_resolution)
}
def h3Distance(origin: Long, dest: Long): Int = {
h3.h3Distance(origin, dest)
}
}
def registerUDFs(spark: SparkSession): Unit = {
val geoToH3UDF = udf((lat: Double, lon: Double) => H3Utils.geoToH3(lat, lon))
val h3DistanceUDF = udf((origin: Long, dest: Long) => H3Utils.h3Distance(origin, dest))
spark.udf.register("geoToH3", geoToH3UDF)
spark.udf.register("h3Distance", h3DistanceUDF)
}
```
This optimization not only boosts the code's performance but also aligns with best practices, ensuring maintainability and scalability for future implementations.
#### 2. Efficient Data Filtering
The "netherlands.orc" file houses a colossal dataset, encapsulating information about diverse locations across the Netherlands. To optimize performance, we initially refine this dataset by:
* Selecting only pertinent columns, significantly reducing data volume.
* Implementing filters to extract non-corrupted, application-relevant information.
With this, the osm_common_filter dataframe shrinks enough to be cached, averting redundant reads and enhancing performance.
Notably, while this process yields marked improvement on smaller systems, the savings on resources are exponential for larger datasets such as "world.orc" when executed on powerful, distributed servers.
```scala
// read the .orc file and preprocess, make it small enough to cache
val osm_common_filter = spark.read.orc(orc_filePath) // apply common filters to the whole .orc file to extract information we care about
.select("id", "tags", "lat", "lon")
.filter(col("tags").getItem("place").isNotNull || col("tags").getItem("harbour").isNotNull)
.filter(col("lat").isNotNull)
.filter(col("lon").isNotNull)
.filter(col("id").isNotNull)
```
#### 3. Strategic Data Caching
For dataframes processed multiple times, caching proves instrumental. This not only trims processing times but also bolsters scalability. By way of example, the places and harbours dataframes are reused extensively, making caching them vital.
In the example below, dataframes places and harbours are used multiple times, as they are used to determine the average elevation of each location and harbour, the population of safety cities/locations to be evacuated and the distance between evacuated locations and safety cities/harbours.
```scala
val places = osm_common_filter
.filter(col("tags").getItem("place").isin("city", "town", "village", "hamlet"))
.filter(col("tags").getItem("population").isNotNull)
.select(
col("id"),
col("tags").getItem("name").as("name"),
col("tags").getItem("place").as("type"),
col("tags").getItem("population").as("population"),
expr("geoToH3(lat, lon)").as("h3_index")
)
places.cache()
val harbours = osm_common_filter
.filter(col("tags").getItem("harbour").isin("yes"))
.select(
lit("Waterworld").as("name"),
lit("harbour").as("type"),
expr("geoToH3(lat, lon)").as("h3_index")
)
harbours.cache()
```
#### 4. Leveraging Broadcast Joins
In order to increase scalability, normal join was avoided as it potentially does not provide as much scalability as broadcast join.
With broadcast join, Spark broadcast the smaller dataframe to all executors and the executor keeps it in memory and the larger dataframe is split and distributed across all executors so that Spark can perform a join without shuffling any data from the larger one as the data required for join colocated on every executor.
The performance gain of this technique might not be obvious when running the application on small and restricted devices such as our laptops, however, it is expected to gain huge performance when executing on large distributed servers when there are many executors.
An example of the use of broadcast join in our application is as below. Please note that we avoided using normal join throughout the application, all join action are performed as broadcast join.
In the example below, distance_calculation is the distance of each evacuated location to all possible safety cities, and closest_distance is a small dataframe which only contains the id of each evacuated locations and it corresponding distance to the closest safety city. It is obvious that closest_distance is much smaller than distance_calulation, and hence closest_distance is being broadcast to each executor to perform the join action.
```scala
val evacuation_plan = distance_calulation //join() can cause shuffle
.join(broadcast(closest_distance), closest_distance("evacuation_id") === distance_calulation("evacuation_id") && closest_distance("min_distance_to_safety_city") === distance_calulation("distance"), "right") // broadcast join on the smaller dataframe
.drop(distance_calulation("evacuation_id"))
.drop(col("distance"))
```
#### 5. Prudent Column Management
In our whole process, some columns are added just to temporarily hold data. These columns are dropped immediately after becoming useless.
Two examples are shown below. In the evcuation_places dataframe, there is a column of "type" which indicates whether the location is a town or city, etc. However, this information is useless when cross joining with the dataframe of safe_harbours which is preparing to calculate the distance of an evacuated location to the closeset harbour.
In the second example, right after the h3Distance function is used to calculate the distance between an evacuated location and a harbour, the columns containing the h3 index are no longer useful, and hence they are dropped.
```scala
// cross join two dataframes to get ready for distance calculation
val evacuation_cross_harbour = evacuation_places.drop("type").crossJoin(safe_harbours) //crossJoin() can cause shuffle
val distance_to_harbour = evacuation_cross_harbour
.withColumn("distance", expr("h3Distance(evacuation_h3_index, safe_harbour_h3_index)"))
.drop(col("evacuation_h3_index"))
.drop(col("safe_harbour_h3_index"))
```
#### 6. Multifunctional Dataframes
For efficiency, certain dataframes serve dual purposes. The ensuing dataframe illustrates this, catering to two distinct requirements of the 'Excellent' grade.
The dataframe is as below:
```
+--------------+----------------+--------------+-------------+---------------------------+-------------------+------------+-------------------+----------------------------+-----------+---------------------+
|destination_id|destination |old_population|evacuation_id|min_distance_to_safety_city|place |num_evacuees|harbour_destination|min_distance_to_safe_harbour|to_harbour?|num_evacuees_to_water|
+--------------+----------------+--------------+-------------+---------------------------+-------------------+------------+-------------------+----------------------------+-----------+---------------------+
|235278210 |Leeuwarden |91992 |47679176 |104 |Nijehaske |41 |Waterworld |60 |true |14 |
|235861650 |Utrecht |295591 |1926871191 |71 |Woerdense Verlaat |621 |Waterworld |28 |true |207 |
|3474142743 |Alkmaar |96460 |4795596815 |194 |Zuid-Eierland |75 |Waterworld |42 |true |25 |
|.......... |....... |..... |.......... |... |............. |.. |.......... |.. |.... |.. |
```
This dataframe is achieved with the code below.
After calculating the distance of an evacuated location to both the closest safety city and closest harbour, we created an additional column "to_harbour?" which states whether the location could also be evacuated to a harbour. With this flag set, we immediately first update the number of evcuaees to safety city and to harbours. Then as shown in the code below, by simple concatenating all information which already exists in the same dataframe post_evacuation_plan, we achieve the first required output of the excellent level. After this, simply by group the destination with the same name, we can achieve the second requirement.
```scala
val final_destination_plan = post_evacuation_plan
.select(
col("place"),
col("num_evacuees").cast(LongType),
col("destination")
)
.union(
post_evacuation_plan
.filter(col("to_harbour?") === true)
.select(
col("place"),
col("num_evacuees_to_water").as("num_evacuees").cast(LongType),
col("harbour_destination").as("destination")
)
)
val waterworld = Seq(("Waterworld", 0L, total_evacuees_to_water)).toDF("destination", "old_population", "new_population")
val final_population_plan = post_evacuation_plan
.groupBy("destination", "old_population") //groupBy() can cause shuffle
.agg(
sum("num_evacuees").as("num_evacuees")
)
.withColumn("new_population", col("old_population") + col("num_evacuees"))
.select(
col("destination"),
col("old_population"),
col("new_population")
)
.union(waterworld)
```
## Performance
To have a general view of which parts take the most significant time, we both checked and analyzed information from the Spark history server and logs of the running applications.
According to the statistics from Spark history server, the whole process of running our application can be divided into 47 stages in total. It can be intuitively observed that most time is spent on stages 1 and 2.

Although stage 1 ends earlier than stage 2 and doesn't contribute to the most time cost, we still looked into the details of tasks executed in both stages. All those time-consuming tasks can be roughly divided into two types. The tasks shown in the following figure can be considered as the computation-intense type where βExecutor Computing Time" takes up the most time of tasks.

Both of the preceding two stages only contain this type of task. Stages 1 and 2 are triggered after the function "**output.write.option("compression", "snappy").orc(filePath)**" is called which is about writing data to an ORC file (referred to as write-to-file command). Because of the concept of "lazy evaluation" in Spark, these two stages are actually executing codes before this write-to-file command. With comparisons between info from the history server and terminal log, it can be concluded these tasks are all related to reading Parquet and ORC files, and structuring data with the specific DataFrame (executing "**spark.read.parquet(alos_filePath)**" in function "**calculate_avg_elevation_places_harbours_from_parquet**" and "**spark.read.orc(orc_filePath)**" in function "**get_places_harbours_from_orc**"). Especially for reading ORC files, due to the big size of it, several tasks are created to read the different parts of the file.

In the case, time costs on the following operations can be counted as executor computing time:
- Parsing and Decoding: When reading Parquet or ORC files, Spark needs to perform the task of decoding the columnar storage format and convert it into a format compatible with Spark's internal representation.
- Data transforming: Because transformations like filtering are applied immediately after reading from the file. With a large amount of data, time spent on transforming takes up a significant part of consumed time.
- User Defined Functions: Before the write-to-file command is executed, there are some UDFs are executed in advance, such as calculating H3 index and average elevations. These functions have a high calculation intensity.
- Schema Inference: Spark may perform schema inference to determine the structure of the data and construcure data into the specific DataFrame.
Another type of task is concluded as the shuffling-intense task. The time cost by shuffle write or read is not negligible anymore. Although this type of task is not that important (consuming 10% of the total time) compared with the first type of task (consuming 70% of the total time), its high frequency still attracts our attention. The following figures show two different types of shuffling-intense tasks. Those tasks at stage 3 will be analyzed according to the code and log history.
Shuffling intense type tasks at stage 3:

Shuffling intense type tasks at stage 14:

According to the DAG in Spark history server, stage three is triggered by two different dependencies with stages 1 and 2. The dependencies with stage 1 appear when "**join()**" is called in function "**calculate_avg_elevation_places_harbours_from_parquet**" to get the elevation of harbors and places to be evacuated. The dependencies with stage 2 appear when "**groupBy()**" is called in the same function to process the coordinate information from parquet files.

Wide dependencies caused by functions like "**join()**", "**groupBy()**" always result shuffling. The terminal log information provides evidence of the occurrence of shuffling, as shown in the following figure. It's good to note that deserializations always happen during the shuffling when the data blocks are processed by tasks on the receiving nodes. It explains why "Task Deserialization Time" takes up an obvious amount of time of the total task time at stage 3.

The shuffling-intense tasks in stage 3 are accompanied by a certain amount of computing intensity. The main reason is that functions like agg(avg()), agg(min()) are called, thereby increasing the computing intensity, which increases the "Executor Computing Time".