# Lab 1 Report
> This is the template for your lab report. Some guidelines and rules apply.
> You can only change the *contents* of the sections, but not the section
> headers above the second level of hierarchy! This means you can only add
> headers starting with `###`.
> You must remove the text fields prefixed with `>`, such as this one, resulting
> in a README.md without this template text in between.
> Any report violating these rules will be immediately rejected for corrections
> (i.e. as if there is no report).
## Usage
> Describe how to use your program. You can assume the TAs who will be grading
> this know how to do everything that is in the lab manual. You do not have to
> repeat how to use e.g. Docker etc.
> Please do explain how to provide the correct inputs to your program.
In this section, we will explain how to use our program. We assume that you already have a file in the right format. As explained in the manual, you need the OpenStreetMap data file from a province in the Netherlands. This file should be in a `.orc` format.
The following steps will outline how to use our program:
* Clone our GitHub repository
* Ensure that the data file is located in the `lab-1-group-28` repository.
* Build the images with the command `./build-images.sh`. Note that this file requires execute permission
* We presume the TAs have the `.orc` file which they can place in `..../lab-1-group-28/zuid-holland.orc`
* Run sbt using `./run-sbt.sh`. In the sbt shell compile and package the scala files using `compile; package;`
* Run the script with the command `./run-lab1.sh`
The output can be found in the directory named `breweries_per_city.orc`. The final result yields a dataframe with the schema as requested by the lab assignment. Note that the files in this directory are distributed.
## Functional overview
> Introduce how you have approached the problem, and the global steps of your
> solution.
>
> Example:
>
> ### Step 1: Preparing the data
>
> * We want to be as type-safe as possible so we've declared a case class X
> with the fields y and z.
> * We are now able to load the ORC file into the DataSet\[X\].
> Take into consideration your robustness level (see Rubric), and what you had
> to do to make your answer as accurate as possible. Explain what information
> was missing and how you have mitigated that problem.
### Step 1: Reading the data
The first step is to read the data. Note that our data is semi-structured. Therefore, reading the data as type DataFrame and using the Spark SQL API will improve the performance with respect to using the type RDD. This is because under the hood, amongst others, optimised queries will take place and data will be serialised/deserialised in a more efficient manner [[1]](https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html).
Besides, a custom schema is made to ensure we address the columns with the right names. The following code block with exception handling is used for this purpose:
```{scala}
var df: DataFrame = try{
spark.read.schema(customSchema).format("orc").load("file:/io/zuid-holland.orc")
} catch{
case e: Exception => {
println(e)
println("ERROR: File not found. Please make sure the file is located in the /lab-1-group-28 directory and named 'zuid-holland.orc'")
sys.exit(-1)
}
}
```
### Step 2: Filtering out brewery nodes
The next step is to filter out nodes that are breweries. Note that the objective is to find a place where we can drink a beverage (beer). Therefore, we assumed that an industrial beer plantation would also qualify as brewery. The relevant tags are found [here](wiki.openstreetmap.org/wiki/Brewery).
The code block below shows how we performed this. We check if the following conditions are true for the tags present in each row.
```{scala}
val breweries = df.select("*").filter(
df("tags").getItem("brewery").isNotNull ||
df("tags").getItem("craft") === "brewery" ||
df("tags").getItem("building") === "brewery" ||
df("tags").getItem("industrial") === "brewery" ||
df("tags").getItem("microbrewery") === "yes" )
```
#### Choice of tags
The [OpenStreetMap wiki](wiki.openstreetmap.org/wiki/Brewery) lists some as relevent tags for breweries but we decided to ignore some:
* craft=brewery for a craft brewery.
* building=brewery for a brewery building.
* <s>man_made=works</s> or industrial=brewery for an industrial brewery
* microbrewery=yes to indicate that a restaurant or pub is a brewpub.
* brewery=* to indicate the source of beer(s) at a restaurant, bar, pub, etc.
* <s>shop=alcohol</s> for a government-owned or licensed store.
* <s>shop=beverages</s> for an unlicensed store.
We removed `man_made=works` because it encompasses a lot of other (industrial) buildings that are not breweries. We decided not to go with `shop=alcohol` and `shop=beverages` because with shops selling alcohol the problem would become much larger compared to limiting the scope to 'bars/places' where we can find breweries. For shops selling beverages, note that beverages could include milkshakes, softdrinks or pig's blood and hence the accuracy of the query would suffer. To illustrate our point, filtering the dataset with the limited query scope resulted in 23 places where we could find a beverage. Filtering only the data based on `shop=alcohol` and `shop=beverages` tags resulted in 247 and 16 items respectively. Thus, for scaling and accuracy purposes mentioned above, leaving these two tags out of the filtering condition is preferred.
### Step 3: Filtering out cities
By inspecting the wikipedia of OpenStreetMap, we found that filtering city can be done by inspecting merely one key named `"place"` in the tags. In order to retrieve a DataFrame that contains the cities. It is important to note that these are *cities* only in the *political* sense and not the ground reality. In the Netherlands there are many cities that do not have the title of a city in the OSM data. The following code snippet filters out the nodes that are cities:
```{scala}
val cities = df.filter(df("tags").getItem("place")==="city")
```
Notice that while time passes, cities can be renamed. We found that some breweries would be tagged to a city's 'old' name. For example, several breweries in the dataset had an address tag that matched them to **'s-Gravenhage**, which is an 'alternative' name for **Den Haag**. Therefore, we also created an alternative names DataFrame to account for this. This DataFrame will be used later on to identify breweries that are tagged with 'old/alternative' city names and corrected for. This DataFrame contains two new columns that contain the original name of the city and the corresponding alternative name (if it exists).
```{scala}
val cities_alt_names = cities
.withColumn("original_name", col("tags").getItem("name"))
.withColumn("alternative_name", cities("tags").getItem("alt_name"))
.withColumnRenamed("lat","lat_city")
.withColumnRenamed("lon", "lon_city")
```
### Step 4: Matching breweries to cities
As we now have 2 DataFrames that contains the breweries and the cities, we now have to match each brewery to a city. There are 2 cases which are presented and 3 ways to assign a city. Therefore, our initial DataFrame that contains all breweries will be subdivided into three DataFrames according to the following 3 cases:
#### Case 1: tags have `addr:city`
If the brewery `tags` have the key `addr:city`, then we use it and assign it as the city. These nodes do not require any computation. Note further that in this way we capture cities which are **not legally cities**, but are big enough to build a brewery nearby. Thus, we have two different cases that can arise:
##### Case 1.1 `addr:city` is present in `cities`
In this scenario, the value with key `addr:city` matches with the nodes that we have identified as cities, which was explained in Step 3. Note that we join the two DataFrames based on whether the value of key `addr:city` matches either the original or alternative name of the identified cities. After the join operation, we can simply select only the column with the original names to hold our final result.
```{scala}
val breweries_cities_match = breweries_cities.as("brewery_table").join(cities_alt_names.as("city_table"),
col("brewery_table.city") === col("city_table.original_name") || col("brewery_table.city") === col("city_table.alternative_name"), "inner")
.select(
col("brewery_table.lat_brewery"),
col("brewery_table.lon_brewery"),
col("city_table.original_name"),
col("city_table.lat_city"),
col("city_table.lon_city"))
```
##### Case 1.2 `addr:city` is NOT present in `cities`
In this scenario, the value with key `addr:city` is not present in our list of cities. Therefore, we create a new DataFrame that excludes breweries that have: 1) no key `addr:city`, 2) a key `addr:city` that holds the value corresponding to one of the identified original city names, and 3) a key `addr:city` that holds the value corresponding to one of the identified alternative city names. This is done with the following code snippet:
```{scala}
val breweries_with_unknown_cities = breweries_cities.join(cities_alt_names,
(breweries_cities("city") === cities_alt_names("original_name")) || (breweries_cities("city") === cities_alt_names("alternative_name") || breweries_cities("city").isNull) ,
"leftanti")
```
#### Cases 2: tags do NOT have `addr:city`
In the second case, we consider breweries that do not have the `addr:city` key. Note that for these breweries, we have to do some computation to figure out to which city they belong.
We have done this by finding the distance of the brewey to each city. We then select the city with the shortest distance from the brewery. We decided to go with the distance metric as *Great Circle Distance* to account for the curvature of the earth's surface.
The equation for this is:
$d = 6371000 * arccos[(sin(x_1) * sin(x_2)) + cos(x_1) * cos(x_2) * cos(y_2 – y_1)]$
where $x_1, y_1$ and $x_2, y_2$ are the latitudes and longitudes of the two points respectively.
In Spark terminology, we achieved this in the following way. First, we crossjoin the DataFrame that contain the breweries without `addr:city` key and the DataFrame that contains the identified cities:
```{scala}
val brew_wo_city_cjoin = breweries_without_city.crossJoin(cities_table)
```
We then create a new column for which we calculate the distance for each brewery to each city
```{scala}
val brew_wo_city_dist = brew_wo_city_cjoin.withColumn("distance",
acos( sin(col("lat_brewery")) * sin(col("lat_city")) + cos(col("lat_brewery")) * cos(col("lat_city")) * cos(col("lon_brewery")-col("lon_city")))
)
```
Lastly, we select the city with the shortest distance by using the groupBy method and the `min` function. This newly obtained DataFrame is used to extract the brewery/city pairs that have the shortest distance from the initial DataFrame that was obtained with the crossjoin operator. The following code snippet illustrates this:
```{scala}
println("Selecting the city with shortest distance")
val brew_min_dists = brew_wo_city_dist.groupBy("brewery_table.id").agg(min("distance").as("distance"))
val brew_wo_city_joined = brew_wo_city_dist.join(brew_min_dists, Seq("distance", "id"), "right")
```
Note that using DataFrame significantly eased writing this functionality. Besides, by using the Spark SQL API, we expect that this code will have higher performance than using equivalent map functions and user defined fucntions on RDDs.
### Step 5: Taking the union of the cases
At this point, we have three separate DataFrames that have a column which indicates which city the brewery belongs to. To perform the GroupBy method, we must have a single DataFrame. Thus, we group the three cases (1.1, 1.2 and 2) together. This gives us a single DataFrame with a city for every brewery. This is illustrated with the following code snippet:
```{scala}
val r1 = brew_wo_city_joined.select("original_name").withColumnRenamed("original_name", "city")
val r2 = breweries_with_unknown_cities.select("city")
val r3 = breweries_cities_match.select("original_name").withColumnRenamed("original_name", "city")
var result = r1.union(r2.union(r3))
```
### Step 6: Counting the breweries
Lastly, we aggregate the result by grouping the cities and counting them. We then sort the results in descendig order. This is done as follows:
```{scala}
result = result.groupBy("city").agg(count("city").cast(types.IntegerType).as("breweries"))
result = result.orderBy(col("breweries").desc)
```
### Step 7: Saving the output
We now wish to save the output in an `.orc` format. We chose to override the previous output because we are not on the AWS yet. Note that each worker will write a file, meaning that we end up with multiple distributed files. For scalability and performance purposes, we decided not to write to a single file.
```{scala}
result.write.mode(SaveMode.Overwrite).format("orc").save("file:/io/breweries_zuidholland.orc")
```
## Result
> Present the output of your program. Explain what could be improved (if
> applicable).
The final result is shown in the table below:
| city|breweries|
|-------------------|---------|
| Rotterdam| 7|
| Delft| 5|
| Den Haag| 5|
| Bodegraven| 1|
| Hillegom| 1|
| Den Hoorn| 1|
|Alphen aan den Rijn| 1|
| Leiden| 1|
#### Notes
Our implementation accomodates the semi-structured data well and is robust against the inconsistancies that OpenStreetMaps presents. The output does not need every brewey to have the city tag. We also are not limited to what OpenStreetMaps considers as cities (in the technical/political sense). We have accomodated for a lot of missing data. We have also accomodated for cities having alternative names and have normalised the output to enforce consistency in the output.
#### Improvements
* We have the boundaries of cities. We can more accurately determine which brewery belongs to which city with point in polygon test.
* Enforcing the type of the DataFrame, i.e. making it strongly-typed, is an improvement that would lead to detecting analysis errors during compile time instead of runtime. For the second lab, this improvement would lead us to be more type-safe, meaning that we would be more efficient at detecting bugs.
## Scalability
> Give a concise analysis of the scalability of your solution. Identify in
> the various steps described previously how this affects the scalability.
>
> Example:
> Steps A and B were initially performed the other way around, but
> since Step B performs a sort which involves a shuffle in the physical plan
> of Spark, it was better to first apply the filter in Step A, and then sort.
-In an intial iteration, we solved with RDDs...
-Mention wrong steps made...
## Performance
- Put in obtained DAG and discuss what you see
> Present performance measurements. You can obtain these from the Spark history
> server.
> Relate the measurements to your code and describe why specific steps take a
> relatively longer or shorter time w.r.t. the other steps.
Notes:
* See the history server for how long our code executes
* For each stage, present an average+std (sample size 5 is enough I guess) of how long each stage takes
* Present the impact on performance on the amount of partitions (maybe too much?)