# Big Data Assignment 1

Kamil Kamaliev, DS-02

Amina Miftahova, DS-01

## Introduction

In this assignment we were supposed to implement a naive search engine under the MapReduce paradigm. There are two main components of the search engine:

1. **Indexer** - the part that is responsible for document indexing. In our case, indexing is the conversion of each Wikipedia page in the dump into its TF-IDF vector representation.
2. **Query resolver** - the part that is responsible for obtaining the most relevant Wikipedia pages from the dump in response to a given query.

Here is a diagram representing the structure of our search engine components. Later on we will explain the implementation of each component in detail.

![](https://i.imgur.com/D536KCQ.png)

In the assignment description we were presented with two different ways of computing the relevance score. Our team was interested in comparing these two methods in terms of the accuracy of the returned results and in terms of execution speed.

# Version with no Okapi

## Formulas

$$ IDF(t) = n(t) $$

$$ TF/IDF(t_i, D_i) = \frac{f(t_i, D_i)}{n(t_i)}$$

$$ r(q,D) = \sum_{i: i\in d, i\in q} q_i \cdot d_i $$

where
- $n(t)$ is the number of documents containing term $t$
- $f(t_i, D_i)$ is the number of times that term $t_i$ occurs in document $D_i$
- $d_i$ is the TF/IDF weight of the $i^{th}$ term in the document $D$
- $q_i$ is the TF/IDF weight of the $i^{th}$ term in the query.

## Indexing

Our Indexer consists of 2 MapReduce tasks executed one after the other.

**Input**: path to the directory with the Wikipedia dump, path to the output folder

**Output**: a document with the IDF score for each word in the `output_idf` folder, and a document with the vectorized representation of each doc in the specified output folder

### 1. Calculating IDF

The first MapReduce task goes through all documents and returns the IDF of each word. For a given word, we decided to calculate IDF as the number of documents in which the word occurs.
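As a rough illustration of this document count, here is a minimal plain-Python sketch that simulates one map step and one reduce step in memory. It is not the Hadoop code from the repository: the function names, the toy dump and the `id` field are assumptions made for the example, and only the `text` field is taken from the description.

```python
import json
from collections import defaultdict


def map_document(json_line):
    """Map step: emit (word, 1) once per distinct word of one page."""
    page = json.loads(json_line)
    words = set()
    for token in page["text"].split():
        # Keep only the letters of each token, so "swim." becomes "swim".
        word = "".join(ch for ch in token if ch.isalpha())
        if word:
            words.add(word)
    return [(word, 1) for word in words]


def reduce_counts(pairs):
    """Reduce step: sum the ones per word, which gives n(t)."""
    totals = defaultdict(int)
    for word, one in pairs:
        totals[word] += one
    return dict(totals)


# Toy dump (hypothetical): one JSON page per line.
dump = [
    '{"id": "1", "text": "penguins swim. penguins dive!"}',
    '{"id": "2", "text": "cats do not swim"}',
]
pairs = [pair for line in dump for pair in map_document(line)]
doc_freq = reduce_counts(pairs)
print(doc_freq["swim"], doc_freq["penguins"])  # prints "2 1"
```

Because the map step works on a set, a word repeated inside one page still contributes a single `1`, so the sum is a document count rather than a total occurrence count.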
#### Mapping Stage

**Input**: line in JSON format representing a Wikipedia page

**Output**: key=`word`, value=`IDF score`

Every document is passed as a single line to the MapReduce task. **Mapper** extracts the set of words from the `text` portion of the document and passes a `<word, 1>` key-value pair to the reducer for each `word` in that set.

#### Reducing Stage

**Reducer** then simply adds up the ones in the obtained `<word, [1, 1, 1, ..., 1]>` pair. The results `<word, IDF>` are stored in the folder `output_idf`.

### 2. Vectorizer and TF-IDF representation of the document

Before the execution of this MapReduce task, the output of the previous stage from the `output_idf` folder is added to the distributed cache. This speeds up reading by copying the files to the local file systems of the workers, so that they do not have to access HDFS every time.

#### Mapping Stage

**Input**: line in JSON format representing a Wikipedia page

**Output**: key=`doc id`, value=`mapping from word hash codes to their tf-idf scores`

**Mapper** reads the document and counts how many times each word occurs. This information is stored in a hashmap. Then, the `<doc id, hashmap>` pair is passed to the reducer.

#### Reducing Stage

In the **reducer**, the cached files are read and stored in a hashmap. It now has all the data needed to calculate the tf-idf of each word:
- `tf` is acquired from the mapper
- `idf` is acquired from the cached files

Now, it is also possible to calculate the length of the document by summing the `tf`'s of all words. For vectorization, we use hash codes of words. Reducer writes the doc id, the doc length and its sparse vector representation.

```
<doc id> length: <num of words in doc> {<word hash>=<word tf-idf>, ..., <word hash>=<word tf-idf>}
```

Example:
```
1000 length: 6725 {106669658=0.030303031, 3194931=0.002244669, -1224599801=0.02, -1883283526=7.122507E-4, ...}
```

### !!! VERY IMPORTANT !!!

DO NOT delete the `output_idf` folder; it is needed for the next step.
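To make the record format of the Indexer output more concrete, the sketch below builds one such record in plain Python. It is only an approximation under stated assumptions: `java_hash` re-implements Java's `String.hashCode()` because the hash codes in the example record look like Java hash codes, the helper names and the `id` field are hypothetical, and the `doc_freq` values are made up.

```python
import json
from collections import Counter


def java_hash(word):
    """Java's String.hashCode() as a signed 32-bit integer.

    Matches Java for characters in the Basic Multilingual Plane.
    """
    h = 0
    for ch in word:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h


def clean_tokens(text):
    """Split on whitespace and keep only the letters of each token."""
    cleaned = ("".join(ch for ch in tok if ch.isalpha()) for tok in text.split())
    return [word for word in cleaned if word]


def vectorize(json_line, doc_freq):
    """Return (doc id, length, {word hash: tf / n(t)}) for one page."""
    page = json.loads(json_line)
    tf = Counter(clean_tokens(page["text"]))
    length = sum(tf.values())  # document length = sum of all term counts
    vector = {java_hash(w): count / doc_freq[w] for w, count in tf.items()}
    return page["id"], length, vector


def format_record(doc_id, length, vector):
    """Render '<doc id> length: <n> {<hash>=<weight>, ...}'."""
    body = ", ".join(f"{h}={weight}" for h, weight in vector.items())
    return f"{doc_id} length: {length} {{{body}}}"


doc_freq = {"half": 4, "a": 2, "loaf": 1}  # made-up n(t) values
line = '{"id": "1000", "text": "half a loaf, half a loaf"}'
print(format_record(*vectorize(line, doc_freq)))
```

Keying the vector by hash code means no global word-to-index dictionary is needed, at the cost of possible collisions between different words.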
## Query Resolution

The resolver consists of two main parts: **Query to TF-IDF Converter** and **MapReduce Relevance Score Counter**.

**Input**: query, number of results to return, path to the original files with wiki docs, path to the Indexer step output, path to the output folder

**Output**: titles and links of the top N relevant Wikipedia pages with their relevance scores

### 0. Preparation

In order to create the query TF-IDF representation we need the IDFs obtained during the indexer step. Luckily, we have not deleted our `output_idf` folder, which contains the IDF of each word that occurred in the documents. For faster further processing, before starting the actual task, we read the file and store the `<word, word_idf>` pairs from it in a `HashMap`.

### 1. Query to TF-IDF

This component is responsible for creating the TF-IDF vector representation of the query. For that it needs the query itself and the outputs of the **Indexer** step.

The algorithm of query vectorization:
1. Tokenize the query and remove all non-letter characters from each token in order to obtain clean words.
2. Count the TF of each word in the query and store it in a `HashMap`, that is, as `<word, number of occurrences>` pairs.
3. For each word that occurred, divide its TF by the IDF score stored in the `HashMap` from the *preparation* step.
4. For each word that occurred, if it exists in the Wiki documents, obtain its `idf` and save the resulting tf-idf value to `configuration` for later use in MapReduce tasks. The format saved is: `<!!query!!<word hash code>, tf-idf value>`. If it did not occur in the Wiki documents, we ignore it as it is not needed for further steps.

### 2. Relevance score counter

This component is a MapReduce task that counts the relevance score for each document.

#### Mapping stage

Iterate through all output records from the **Indexer** stage - that is where the vectorized representations for each doc are stored.
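The query vectorization steps and the scoring of a single Indexer record can be sketched together in plain Python. This is a simplified illustration, not the repository's Java code: it keeps the query vector in an ordinary dictionary instead of the job `configuration` with its `!!query!!` key prefix, all function names are hypothetical, and the IDF values are made up.

```python
import re
from collections import Counter


def java_hash(word):
    """Java's String.hashCode() as a signed 32-bit integer (BMP characters)."""
    h = 0
    for ch in word:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h


def query_vector(query, idf):
    """Clean the tokens, count TF, divide by IDF and key by hash code."""
    words = ["".join(ch for ch in tok if ch.isalpha()) for tok in query.split()]
    tf = Counter(w for w in words if w)
    # Words that never occurred in the dump have no IDF entry and are skipped.
    return {java_hash(w): count / idf[w] for w, count in tf.items() if w in idf}


RECORD = re.compile(r"^(\S+) length: (\d+) \{(.*)\}$")


def parse_record(line):
    """Parse '<doc id> length: <n> {<hash>=<weight>, ...}' into its parts."""
    doc_id, length, body = RECORD.match(line.strip()).groups()
    vector = {}
    for entry in filter(None, body.split(", ")):
        word_hash, weight = entry.split("=")
        vector[int(word_hash)] = float(weight)
    return doc_id, int(length), vector


def relevance(query_vec, doc_vec):
    """r(q, D): sum of q_i * d_i over the terms present in both vectors."""
    return sum(q * doc_vec[h] for h, q in query_vec.items() if h in doc_vec)


idf = {"half": 4, "loaf": 1}  # made-up n(t) values read in the preparation step
q = query_vector("half a loaf?", idf)
doc_id, _, d = parse_record("1000 length: 6 {3194931=0.5, 97=1.0, 3327208=2.0}")
print(doc_id, relevance(q, d))  # prints "1000 2.125"
```

In the MapReduce task, the mapper would emit the computed score as the key and the doc id as the value, so that the framework's sort can order documents by relevance.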
**Input**: line from the Indexer output

**Output**: key=`relevance score`, value=`doc id`

Each line of the output is of the following form:

`<doc id> length: <num of words in doc> {<word hash>=<word tf-idf>, ..., <word hash>=<word tf-idf>}`

For each doc we obtain its TF-IDF representation and calculate the relevance score with respect to the vectorized representation of the query.

#### Reducing stage

Receive the output of the previous stage (sorted by relevance score in descending order using a custom comparator) and write it to the output directory.

**Input**: key=`relevance score`, value=`doc id`

**Output**: key=`relevance score`, value=`doc id`

### 3. Output the results

As the output of the MapReduce stage is already sorted by relevance score in descending order, the only thing left is to take the first N document ids from the file. Then, for each document id, we obtain the link and the title from the original Wikipedia dump files and print the results to the standard output.

# Version with Okapi

## Formulas

$$IDF(t) = \ln \frac{N - n(t) + 0.5}{n(t) + 0.5} + 1$$

$$r(q, d) = \sum_{i: i\in d, i\in q}IDF(i) \frac{tf(i, d) * (k_1 + 1)}{tf(i, d) + k_1 * (1 - b + b * \frac{|D|}{avg\_len})}$$

where
* $N$ is the total number of documents
* $n(t)$ is the number of documents containing the term $t$
* $i$ is a term that occurs in both the query and the document
* $tf(i, d)$ is the number of times term $i$ occurs in document $d$
* $|D|$ is the length of document $d$ in words
* $avg\_len$ is the average document length
* $k_1=2$
* $b=0.75$

## Indexing

The Indexer is pretty much the same as the one we used in the `No Okapi` version, with minor differences:
1. The intermediate folder `output_idf` was renamed to `output_docsww`. DocsWW stands for Documents With Word and represents the `n(t)` from the formulas section.
2. The output of the Indexer now stores the vectorized form of documents with their length and term frequencies:

```
# In no Okapi version
<doc id> length: <num of words in doc> {<word hash>=<word tf-idf>, ..., <word hash>=<word tf-idf>}
```

Changed to

```
# In version with Okapi
<doc id> length: <num of words in doc> {<word hash>=<word tf>, ..., <word hash>=<word tf>}
```

The execution steps are almost identical, except that before the 2nd MapReduce job we no longer need to cache files, as the reducer needs only the term frequencies, which are passed to it from the mapper. After the last MapReduce job, the average length and the number of documents are calculated and stored in the file `avg_len` inside the output directory. They are used in Query resolution for obtaining the relevance score.

### !!! VERY IMPORTANT !!!

DO NOT delete the `output_docsww` folder; it is needed for the next step.

## Query Resolution

The resolver consists of two main parts: **Query to TF-IDF Converter** and **MapReduce Relevance Score Counter**.

**Input**: query, number of results to return, path to the original files with wiki docs, path to the Indexer step output, path to the output folder

**Output**: titles and links of the top N relevant Wikipedia pages with their relevance scores

### 0. Preparation

This step is the same as in the previous version of the search engine, except that we now read from the folder `output_docsww`.

### 1. Query to TF-IDF

The algorithm of query vectorization changed slightly:
1. Tokenize the query and remove all non-letter characters from each token in order to obtain clean words.
2. Obtain the set of unique words in the query.
3. For each word that occurred, if it exists in the Wiki documents, obtain its `docsww`.
4. Calculate the IDFs of the words and save them to `configuration` for later use in MapReduce tasks. The format saved is: `<!!query_idf!!<word hash code>, idf value>`. If a word did not occur in the Wiki documents, we ignore it as it is not needed for further steps.

### 2. Relevance score counter

This component is a MapReduce task that counts the relevance score for each document.

#### Mapping stage

Iterate through all output records from the **Indexer** stage - that is where the vectorized representations for each doc are stored.

**Input**: line from the Indexer output

**Output**: key=`relevance score`, value=`doc id`

Now the output of the previous stage contains only the `tf` score of each word. We obtain these scores and calculate the Okapi BM25 score for the given query.

#### Reducing stage

Receive the output of the previous stage (sorted by relevance score in descending order using a custom comparator) and write it to the output directory.

**Input**: key=`relevance score`, value=`doc id`

**Output**: key=`relevance score`, value=`doc id`

### 3. Output the results

This stage is unchanged: it simply takes the first N lines and outputs them.

## Design choices

Several crucial choices greatly affect the performance of our search engine, and below we explain why we made them.

For the `tf-idf` encodings, instead of assigning a unique ID to each word, we decided to use hashing. Firstly, that speeds up both the indexing and the query resolution stages, as we do not need to look up the index of each word. Secondly, it lends itself to a sparse representation of document vectors, which saves memory.

When vectorizing documents and queries we tokenize them and then remove all non-letter symbols from each token. That allows us to handle words in any language and to ignore punctuation marks. For example, this allows us to treat the tokens `penguin` and `penguin,` as the same word and to ignore special symbols such as `\t` or `\n`.

In the second version of the search engine implementation, when calculating the Okapi BM25 relevance score, we speed up the program by obtaining the needed IDF scores in the preparation step. We can do that because we need these scores only for the words that occurred in the query.
That minimizes the number of disk read operations needed.

## How to execute

```
hadoop jar Indexer.jar Indexer /EnWikiSmall IndexerOutput
hadoop jar Query.jar Query "<query text>" <number of relevant results> /EnWikiSmall IndexerOutput QueryOutput
```

## Conclusion

Hadoop MapReduce provides a convenient and intuitive way of processing large amounts of data. However, it may not be the best tool for implementing a search engine. The given way of implementing the search engine involves multiple chained MapReduce jobs. The MapReduce paradigm requires each job to write its output to HDFS, so every chained job has to read its predecessor's output back from disk, which significantly increases the execution time. Nonetheless, we have a RAM constraint that makes these disk writes necessary.

We did not find significant differences in the accuracy of the suggested results between the simple *relevance score* and *Okapi BM25*. However, the second implementation using MapReduce is considerably faster. The reason for that is the decrease in the number of disk reads needed.

Our implementation is 🙂 But it could be 😎

#### Possible improvements:

* Lemmatizing tokens. As our search engine is simple and performs well without it, we did not make the implementation too complicated. However, introducing lemmatization might improve the accuracy of the suggestions.
* Come up with some other way of storing the IDF scores when calculating the Okapi BM25 score, as the current one might cause memory problems if the query is very long.

### Team members contribution

The design of the components and the way of creating the TF-IDF representation of the documents were worked out collaboratively, as was the main work on the code. However, here are the parts to which each team member contributed the most.

Kamil Kamaliev:
* Implementation of IDF and Vectorizer MapReduce tasks
* Implementation of query resolver output
* Work on running time minimization while keeping used memory low

Amina Miftahova:
* Implementation of Query resolver
* Combining IDF and Vectorizer into one Indexer class
* Argument correctness checking

## References

1. [Distributed cache usage](https://www.geeksforgeeks.org/distributed-cache-in-hadoop-mapreduce/)
2. [Apache MapReduce guide](https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html)
3. [Finding top-K using MapReduce](https://acadgild.com/blog/mapreduce-design-pattern-finding-top-k-records)
4. [Job configuration](https://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/conf/Configuration.html)
5. [Provide directory as input to MapReduce](https://stackoverflow.com/questions/20094366/hadoop-provide-directory-as-input-to-mapreduce-job)

## Link to Git repo

https://github.com/noteisenheim/bd_assignment1