# Assignment 3: MapReduce
![](https://i.imgur.com/YfMc6Qs.jpg)
**Out: February 28, 2024
Due: March 14, 2024**
In this assignment, you will design and implement MapReduce algorithms for a variety of common data processing tasks. Note that you may not use Spark functions such as `distinct` or `join`, as these would allow you to bypass much of the assignment. The purpose of this assignment is for you to gain a better understanding of how these functions work under the hood. Unless otherwise indicated, please only use `map`, `flatMap`, `reduceByKey`, `sortByKey`, and `filter`.
In part 1 of this assignment, you will solve two simple problems on small datasets. You will build the MapReduce pipelines and implement your mappers and reducers.
In part 2 of this assignment, you will implement a movie recommendation system. Part of the MapReduce pipeline is provided. You will design the remaining part. And you will also need to implement the mappers and reducers. There are two datasets in part 2: small and big. For both datasets, you can directly run your program on the department machine or on your own device with PySpark set up.
## Part 0: Set Up
### Getting the Stencil
You can [click here](https://classroom.github.com/a/Hnoxhm2U) to get the stencil code for Homework 3. Reference [this guide](https://docs.google.com/document/d/1v3IQrC_0pFxsRBXsvCEzKBDAmYjzuSJCvXhkg8ewDn0/edit) for more information about Github and Github Classroom.
The data is located in the `data` folder. To ensure compatibility with the autograder, you should not modify the stencil unless instructed otherwise. For this assignment, please write your solutions in the respective .py files. Failing to do so may interfere with the autograder and result in a low grade.
### Running MapReduce
As before, you can choose to either work on the department machine or set up your own environment. **If you are on a Windows machine, we strongly recommend working on the department machine, as setting up PySpark locally is pretty tricky.**
- **Option 1: Department Machine**
**Execute Code**: First, ssh into the department machine by running `ssh [cs login]@ssh.cs.brown.edu` and typing your password when prompted. Then, navigate to the assignment directory and activate the course virtual environment by running `source /course/cs1951a/venv/bin/activate`. You can now run your code for the assignment. To deactivate this virtual environment, simply type `deactivate`.
If you want to use VS Code to edit your files on the department machine, this [guide](https://cs.brown.edu/about/system/connecting/) by the CS department gives instructions on how to do so.
- **Option 2: Your Own Device**
**Requirements**: This assignment assumes that you have already set up your Python environment during the [first assignment](https://hackmd.io/@cs1951a/HyBytvPoi#Python-Virtual-Environment). **In addition**, this also requires that you install Java locally, which you can set up using [this guide](http://cs.brown.edu/courses/csci0160/local.html) (thanks to the TAs from CS0160 for putting this together!). You need Java 8 (aka Java 1.8) or later. The relevant Java install information is found in the section **2) Local Java Install**. Note that you will ***not*** be writing Java code for this assignment. It is just required by PySpark.
**If you are running on a Windows machine, follow [this tutorial](https://www.guru99.com/install-java.html) instead to install Java.**
**Execute Code**: Activate the virtual environment and run your program in the command line.
> Note: You need to set `JAVA_HOME` correctly, on top of installing Java. If any of the Java setup is not done correctly, creating the PySpark `SparkContext` will fail with an error:
> ![](https://i.imgur.com/ZOWjNYU.png)
### Testing
1. To test that you have set up everything correctly, we have provided a simple example that uses a PySpark MapReduce pipeline to count the number of occurrences of each word in an input text. The code is in the `wordcount.py` file. Make sure that you can run this program, and feel free to play around with / examine this file to understand how PySpark works; a minimal sketch of such a pipeline also follows this list. **Make sure to activate the virtual environment (and install Java locally!) before running the `wordcount.py` file!** If you run into issues running this locally, we **heavily suggest** running this project on the department machine.
2. When you hand in your assignment on Gradescope, be sure to check if the autograder runs into any issues running your code. The autograder should finish running your code in around 10-15 minutes.
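For orientation, here is a minimal sketch of what such a PySpark word-count pipeline generally looks like. This is not a replacement for the provided `wordcount.py` (which may be structured differently); the input path below is just a placeholder.
```python
from operator import add
from pyspark import SparkContext

# A minimal word-count sketch; wordcount.py may differ in details.
sc = SparkContext("local", "wordcount-sketch")
lines = sc.textFile("../data/some_text.txt")  # placeholder path

counts = (
    lines.flatMap(lambda line: line.split())   # one word at a time
         .map(lambda word: (word.lower(), 1))  # (word, 1) pairs
         .reduceByKey(add)                     # sum counts per word
)

print(counts.collect()[:10])  # a few (word, count) pairs
sc.stop()
```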
### Running Files
The assignment code is written with the assumption that your current directory is in the `/code` folder. Before running any file, ensure that you're `cd`'d into the folder. Otherwise, you'll get an error about not being able to find the data file.
## Part 1: Intro to MapReduce
In this part of the assignment you will solve two simple problems by making use of the `PySpark` library.
For each problem, you will turn in a Python script (stencil provided) similar to `wordcount.py` that solves the problem using the supplied MapReduce framework, `PySpark`.
### Problem 1: Inverted Index
Fill in the code for `inverted_index.py`, which creates an inverted index of a given set of documents. Given a set of documents, an inverted index is a **mapping from each word to a list of document IDs of documents in which that word appears.**
Your task is to design a MapReduce pipeline that would generate inverted indices for words in the given documents. You will have to think about how your data will move between the various stages of the pipeline and implement the following accordingly:
1. `def mapper1(record)`
2. `def reducer1(a, b)`
3. `def mapper2(record)`
4. `def reducer2(a, b)`
As a note, feel free to use more mapper/reducer functions than those stated above, but you shouldn't need to - our solution manages with only those 4 functions. In general, you have total control over the number of mapper and reducer functions that you use.
Your final task is to create such an inverted index mapping with a MapReduce pipeline, using the mapper and reducer functions you just implemented. This query should return the inverted index of the given documents. You should use the variable `inverted_index_result` to store the result of your query.
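As a rough illustration (not the required structure, and not a solution), a pipeline built from these functions would typically just chain PySpark operations; the RDD name `documents` below is hypothetical, and your stencil may already set this up differently:
```python
# A sketch of one possible pipeline shape; `documents` is a hypothetical
# input RDD, and the choice/order of operations is only illustrative.
inverted_index_result = (
    documents.flatMap(mapper1)        # emit intermediate (key, value) pairs
             .reduceByKey(reducer1)   # combine values sharing a key
             .map(mapper2)            # reshape records for the next stage
             .reduceByKey(reducer2)   # final per-word combination
             .collect()
)
```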
For this problem, use the `books.json` and `books_small.json` datasets as the input to your pipeline. To run the file, execute the following command:
`$ python inverted_index.py -d PATH/TO/data.json `
where `PATH/TO/data.json` is the path to the json file with the data (so either ending in `books_small.json` or `books.json`). By default, without the `-d` flag, the data file path is `../data/books_small.json`.
Successfully running the script will create a file named `output_inverted_index.json` in a directory called `output`, which will contain the data that was collected by the pipeline in `inverted_index_result`. The format of the answer should look something like this:
```
[
[
"Answer",
[
"shakespeare-caesar.txt"
]
],
...
]
```
We provide you with a script to help you check the format of your json files:
`$ ./check_format /PATH/TO/output_inverted_index.json`
You can also verify the output of your pipeline on `books_small.json` using the provided `check_outputs_equal` script, passing it the path to your generated file and the TA solution's generated file, which can be found at `../data/ta_output/output_inverted_index.json` (or at `/course/cs1951a/pub/mapreduce/data/ta_output/output_inverted_index.json`):
```
./check_outputs_equal output/output_inverted_index.json ../data/ta_output/output_inverted_index.json
```
### Problem 2: Relational Join
Fill in the code for `join.py`, where your task is to implement a SQL join query using a MapReduce pipeline. You will work with the data provided in `records.json` which contains tuples belonging to both 'Release' and 'Disposal' tables.
Consider the following SQL query:
```
SELECT *
FROM Release, Disposal
WHERE Release.CompanyID = Disposal.CompanyID
```
Your MapReduce query should produce the same output as the SQL query above, with each row formatted as a list. You can consider the two input tables, `'rele'` and `'disp'`, as one big concatenated bag of records which gets fed into the map function record by record. For each line/record in `records.json`, `record[0]` is the name of the table (either "rele" or "disp") and `record[2]` is CompanyID. Feel free to open the json file to see examples.
As before, the stencil gives you a suggested pipeline, but you are welcome to change it as necessary:
1. `def mapper1()`
1. `def reducer1()`
1. `def mapper2()`
1. `def filter()` (You may or may not need this, depending on how you do `mapper2`)
**Hint:** Consider what key might be necessary to organize all the given data. Once you have all the data for a key in one place, you can then figure out how to generate the output you want. **Consider what else you may need to add to your pipeline (e.g. a filter step) to get the result you need.**
You should use the variable `join_result` to store the result of your pipeline. Like above, you have total control over the number and order of functions in your pipeline; the above is just the order that our solution uses.
For this problem, use `records.json` as the input to your pipeline. Similar to Part 1, to run the file, activate the virtual environment and then execute the following command:
`$ python3 join.py -d PATH/TO/records.json `
This will create a file named `output_join.json` in your output directory, which will contain the data that was collected by the pipeline in `join_result`. It will look like:
```
[
[
"rele",
"1995",
"4836",
...,
"disp",
"2003",
"4836",
...
],
...
]
```
You can also use this script to check the format:
`$ ./check_format /PATH/TO/output_join.json`
## Part 2: Movie Recommendation System
In Edwin Chen's [blog article](http://blog.echen.me/2012/02/09/movie-recommendations-and-more-via-mapreduce-and-scalding/) on movie similarities, he describes how he used the [Scalding](https://github.com/twitter/scalding) MapReduce framework to find similarities between movies. You will do the same by calculating the similarity of pairs of movies so that if someone watched Frozen (2013), you can recommend other movies they might like, such as Monsters University (2013).
### Data
You are provided with a dataset of movie ratings:
**Source**: [MovieTweetings](https://github.com/sidooms/MovieTweetings) by Simon Dooms.
**Overview:** Ratings are extracted from tweets, and the dataset contains up-to-date movie ratings (the earliest rating in this dataset is from Feb 28, 2013). The full dataset contains 906,727 ratings of 37,338 movies; we will only be using a fraction of it.
**Runtime:** Our implementation of `similarities.py` finishes executing in ~30 seconds on the small dataset and in ~45 seconds on the big one.
This dataset includes:
**ratings.dat**
* Format: user_id::movie_id::rating
* Ratings are from 1 to 10 (whole-number ratings only)
* 34,123 ratings (small)
* 190,750 ratings (big)
**movies.dat**
* Format: movie_id::movie_title
* Titles are identical to titles provided by IMDB (including year of release)
### Algorithm
As described in Edwin Chen's blog article, we will compute several similarity metrics between pairs of movies to determine how similar they are:
```
* For every pair of movies A and B, find all the people who rated both A and B.
* Use these ratings to form a Movie A vector and a Movie B vector.
* Calculate the similarity metrics between these two vectors.
* Whenever someone watches a movie, then you can recommend the most similar movies.
```
If you aren't familiar with this definition of a vector, all we mean by it is a list of numbers that we can imagine as `n` coordinates of an `n`-dimensional space.
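For example, if exactly three users rated both Movie A and Movie B, the two vectors could look like this (the ratings below are made up):
```python
# Hypothetical ratings from the three users who rated both movies,
# listed in the same user order so the two vectors are aligned.
movie_a_vector = [8, 6, 9]   # user1, user2, user3 ratings of Movie A
movie_b_vector = [7, 5, 10]  # user1, user2, user3 ratings of Movie B
```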
Like Edwin did in his article, you will also experiment with four similarity metrics. The implementations for these metrics are provided in `similarities.py`; see Appendix A below for more details about how these similarity metrics are used in your program.
### Implementation
In `similarities.py`, you will implement a series of mappers and reducers. You will pass two input files, `ratings.dat` and `movies.dat`, to `similarities.py`, which will then output a list of movie pairs along with the similarity metrics between them, like below (pretty-printed JSON):
```
[
[
[
"movie_title1",
"movie_title2"
],
[
correlation_value,
regularized_correlation_value,
cosine_similarity_value,
jaccard_similarity_value,
n,
n1,
n2
]
],
...
]
```
For every pair of movies A and B, find all the people who rated both A and B and compute the number of raters for every movie. Then you can calculate 4 similarity metrics for every movie pair.
We have provided the first part of the pipeline for you in the stencil code; you will implement the remaining mappers and reducers yourself. Your MapReduce pipeline can have as many mappers and reducers as you like, as long as your outputs match the two checkpoints and the final requirement below.
### Checkpoint 1
For part 2, we are giving you full control over how your pipeline looks! In the stencil, we are giving you the structure of how our solution's pipeline looks, but feel free to do whatever you want.
The output of your pipeline at this stage (after mapper1) should be of the following format:
```
[[key, value], [key, value], ...]
where -
key: movie_title
value: [ [user1_ID, user1_rating], [user2_ID, user2_rating], [user3_ID, user3_rating], ...]
```
The output at this stage should be stored in the variable `stage1_result` and will be written to the file `netflix_stage1_output.json`. This will serve as a checkpoint into your pipeline for the purposes of grading, so please make sure you implement this correctly. Its format will look like:
```
[
[
"$5 a Day (2008)",
[
[
"22136",
7
],
...
]
],
...
]
```
**Note:** the json file is very compact. If you want to pretty-print it like above, you can use the following command. Don't worry about the order of entries: the `collect()` action runs in parallel and the results are assembled afterwards, so ordering may vary. We will sort your results when grading.
```
$ cat PATH/TO/netflix_stage1_output.json | python -m json.tool
```
You can use our script to check the format:
```
$ ./check_format /PATH/TO/netflix_stage1_output.json
```
You can also use our script to check the contents of your output on the small dataset:
```
$ ./check_outputs_equal PATH/TO/YOUR/netflix_stage1_output.json PATH/TO/TA/netflix_stage1_output.json
```
### Checkpoint 2
As before, you are free to design your own MapReduce pipeline! Just don't forget to satisfy the requirement of the second checkpoint before the final output.
You are provided with implementations of 4 similarity metrics. You should refer back to the Algorithm section above or the beginning of `similarities.py` to determine the input values for each of these metric functions. You will need to find the dot product between two vectors, the sum of each vector, the norm of each vector, and so on. In addition, you should ignore (do not include values for) movie pairs whose regularized correlation values are less than a threshold (0.5), in order to keep only high-value movie pairs.
The output of your pipeline at the second checkpoint should be of the following format:
```
[[key, value], [key, value], ...]
where -
key: movie_title1
value: [[movie_title2, correlation_value, regularized_correlation_value, cosine_similarity_value, jaccard_similarity_value, n, n1, n2], [movie_title3, ...]]
```
**IMPORTANT:** Only include `movie_title2`s for a `movie_title1` when `movie_title1` < `movie_title2` (i.e. `movie_title1` comes alphabetically before `movie_title2`). The output at this stage should be stored in the variable `stage2_result` and will be written to the file `netflix_stage2_output.json`. This will serve as the second checkpoint into your pipeline for the purposes of grading, so please make sure you implement this correctly. Its format will look like:
```
[
[
"12 Years a Slave (2013)",
[
[
"Jagten (2012)",
0.6671378907298551,
0.5221079144842344,
0.9937391441268904,
0.04265402843601896,
36,
617,
263
],
...
]
],
...
]
```
You can use our script to check the format:
```
$ ./check_format /PATH/TO/netflix_stage2_output.json
```
You can also use our script to check the contents of your output on the small dataset:
```
$ ./check_outputs_equal PATH/TO/YOUR/netflix_stage2_output.json PATH/TO/TA/netflix_stage2_output.json
```
::: spoiler **Appendix A: Similarity Metrics (Click to expand) 😃**
In this part, we provide more details about how the similarity metrics are being used in the assignment.
In the below equations, $n$ is the number of users who rated both movie X and movie Y, $n_1$ is the number of users who rated movie X, and $n_2$ is the number of users who rated movie Y.
1. Correlation
$Correlation(X,Y)=\frac{n\sum xy-\sum x\sum y}{\sqrt{(n\sum x^2-(\sum x)^2)(n\sum y^2-(\sum y)^2)}}$
2. Regularized Correlation
$Weight(X,Y)=\frac{n}{n+VirtualCount}$
$RegularizedCorrelation(X,Y)=Weight(X,Y)\cdot Correlation(X,Y)+(1-Weight(X,Y))\cdot PriorCorrelation$
As Edwin states, "we can also add a regularized correlation, by (say) adding N virtual movie pairs that have zero correlation. This helps avoid noise if some movie pairs have very few raters in common (for example, The Great Gatsby had an unlikely raw correlation of 1 with many other books, due simply to the fact that those book pairs had very few ratings)."
The stencil code uses `VIRTUAL_COUNT = 10` and `PRIOR_CORRELATION = 0.0`, and you are welcome to experiment with different values (but don't forget to change them back before you submit!)
3. Cosine Similarity
$Cosine(X,Y)=\frac{\sum xy}{\sqrt{\sum x^2\sum y^2}}$
4. Jaccard Similarity
$Jaccard(X,Y)=\frac{n}{n_1+n_2-n}$
As Edwin states, "recall that one of the lessons of the Netflix prize was that implicit data can be quite useful - the mere fact that you rate a James Bond movie, even if you rate it quite horribly, suggests that you'd probably be interested in similar action films. So we can also ignore the value itself of each rating and use a set-based similarity measure like Jaccard similarity."
:::
### Final Outputs
After the second checkpoint, you may add any additional mappers/reducers that you need in order to produce the final output.
The output of the last stage should have the following format.
```
[[key, value], [key, value], ...]
where -
key: [movie_title1, movie_title2]
value: [correlation_value, regularized_correlation_value, cosine_similarity_value, jaccard_similarity_value, n, n1, n2]
```
The output of the last stage should be stored in the variable `final_result`, which will be written to the file `netflix_final_output.json`. Then the format of the output file will look like:
```
[
[
[
"Captain America: The First Avenger (2011)",
"Iron Man 3 (2013)"
],
[
0.7280290128482472,
0.5824232102785978,
0.9886495309825268,
0.018682858477347034,
40,
82,
2099
]
],
[
[
"Captain America: The First Avenger (2011)",
"The Avengers (2012)"
],
[
0.8188595535772019,
0.603370197372675,
0.990419871882812,
0.0979020979020979,
28,
82,
232
]
],
...
]
```
You can use our script to check the format:
```
$ ./check_format /PATH/TO/netflix_final_output.json
```
You can also use our script to check the contents of your output on the small dataset:
```
$ ./check_outputs_equal PATH/TO/YOUR/netflix_final_output.json PATH/TO/TA/netflix_final_output.json
```
We have provided a skeleton MapReduce query based on the TA solution; however, you are free to choose the internal implementation of your query. Just please ensure that you adhere to the format of the data that you store in these three files: `netflix_stage1_output.json`, `netflix_stage2_output.json` and `netflix_final_output.json`.
### How to Run
To test your program, first activate the virtual environment, and then enter:
```
$ python3 similarities.py -d PATH/TO/data
```
where `PATH/TO/data` is a path to the folder containing `movies.dat` and `ratings.dat`.
The default data path is `../data/recommendations/small/`. It will generate three json files in the `output` folder in your working directory.
### Grading Criteria
To get full credit for this assignment, your pipeline must in general be "MapReducy" - that is, you need to apply some of the common MapReduce techniques discussed in class. One very important technique is using `yield` when writing `flatMap` mappers (this allows data to be pushed down the pipeline without manually building a list and waiting for the function to finish).
Another (even more important) technique is using reduce functions, rather than sets, to guarantee uniqueness whenever possible (since sets cannot be parallelized the same way that reducers can be). Otherwise, your computation could be skewed in favor of certain keys that appear more frequently than others, which could drastically slow down your pipeline (consider language, where common words such as "the" appear far more frequently than others).
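As a loose illustration of these two techniques, using word counting again (the names below are hypothetical and not part of the stencil):
```python
# Generator-style mapper for flatMap: records stream down the pipeline
# as they are yielded, instead of being accumulated in a list first.
def tokenize(line):
    for word in line.split():
        yield (word.lower(), 1)

# Aggregation via a reducer rather than a set(): reduceByKey combines
# values pairwise and can run in parallel across partitions.
def count_reducer(a, b):
    return a + b

# word_counts = lines.flatMap(tokenize).reduceByKey(count_reducer)
```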
In general, so long as your pipeline adheres to these rules, you will receive full credit. Also, it's important to note that the majority of your grade will come from simply having a pipeline that produces a correct output, even if it is inefficient.
## Part 3: Written Questions
Fill out `writeup.md` with your answers to the following questions.
1. (10 points) Consider that you are given a dataset containing the details of babies born in the US in 2018. Each record is of the form `recordID :: year :: month :: state :: city`, and there are 3,978,497 (around 4 million) records. In order to find the number of babies born during each month of the year, you come up with the following mapper and reducer (refer to `wordcount.py`):
```
Mapper: record -> (record.month, 1)
For each record map the month to count 1.
Reducer: k,[v] -> k, sum(list[v])
For each key sum all values associated.
```
![](https://i.imgur.com/xEmaNDZ.png)
The MapReduce cluster provided to you consists of N mappers but only 2 reducers, as shown in the figure above. Reducer1 receives all (key, value) pairs with keys between A and M inclusive, and Reducer2 receives (key, value) pairs with keys between N and Z inclusive.
Given that the mapper and reducer functions produce the correct output, what possible issue(s) could you face while processing a job consisting of about 4 million records? Suggest a workaround for that issue (without changing the number of mappers or reducers).
**Hint:** If you're stuck, think about how the load will be distributed across the pipeline!
2. (5 points) You are given the following MapReduce pipeline which finds the 10 most frequent words beginning with each letter, in a large English text corpus.
```
from operator import add

def mapper1(sentence):
    for word in sentence.split(' '):
        yield (word.lower(), 1)

def mapper2(pair):
    word, count = pair[0], pair[1]
    return (word[0], [(word, count)])

def mapper3(letter_pair):
    letter, word_pairs = letter_pair[0], letter_pair[1]
    for word_pair in word_pairs[:10]:
        yield (letter, word_pair[0])

output = sentences.flatMap(mapper1).reduceByKey(add).map(mapper2).reduceByKey(add).flatMap(mapper3)
```
After testing on a small text file, it was noted that the pipeline does not produce correct output. Explain why this pipeline does not produce the correct output.
## Handing In
After finishing the assignment (and any assignment in the future), run `python3 zip_assignment.py` in the command line from your assignment directory, and fix any issues brought up by the script.
After the script has been run successfully, you should find the file `sql-submission-1951A.zip` in your assignment directory. Please submit this zip file on Gradescope under the respective assignment.
## Credits
Made by Shunjia Zhu, Solomon Zitter, and Nam Do in Spring 2020 with past contributions from Neel Virdy, Colby Tresness, Haomo Ni, and Ashish Rawat. Updated in Spring 2021 by Suhye Park, Daniel Civita Ramirez, Matteo Lunghi, Mary Dong, and Nam Do, and again in Summer 2021 by Julia Windham, Evan Dong, and Nam Do.
Part 1 adapted from a previous assignment which was developed by Karthik Harihar Reddy Battula, Ishan Bansal, Samuel Crisanto, Yufeng Zhou, Lezhi Qu, and John Ribbans with suggestions from Tim Kraska and Alex Galakatos. The movie recommendation problem is based on Edwin Chen's blog article.