# Real-Time Evacuation Updates

## Usage

You can follow the guide below step by step (command by command) to run our application. The meaning of each command is explained where necessary. Note that we assume you are using Ubuntu 22.04, as the manual recommends it as the best Linux distribution for this assignment.

### 1. Clone the repository

To run our code locally, use the following commands to clone the repository and enter the folder:

```
git clone git@github.com:abs-tudelft-sbd/lab-3-2023-group-02.git
cd lab-3-2023-group-02
```

The solutions for the different levels of requirements are pushed to separate branches.

- Branch 'main' has the solution for the excellent requirement:
  ```
  git checkout main
  ```
- Branch 'good-t' has the solution for the good requirement:
  ```
  git checkout good-t
  ```
- Branch 'ade-t' has the solution for the adequate requirement:
  ```
  git checkout ade-t
  ```

### 2. Ways to execute the application

After building and starting all the **needed containers**, start an interactive sbt shell:

```
docker compose exec transformer sbt
```

To run the application in the sbt shell, **reload** and **compile** the code first. For the adequate and good solutions, you then simply need the command

```
run
```

For the excellent solution, the following command should be executed:

```
run <time_window>
```

#### Command Parameter

- `<time_window>` (Optional): Integer. The duration of the time windows in seconds. Default: 2 (2 seconds).

## Functional overview

### 1. General view of solutions

Our solutions for all requirements follow the same execution logic:

**consume data from topic 'events'** -> **process data** -> **output data to topic 'updates'**

Reflected in the structure of the code, that is:

**create class (data structure) for output data** -> **set input source** -> **create state store and add to topology** -> **set context and data processing logic** -> **build the stream and start**

To keep track of the number of evacuees, we employ the *Transformer* API to manage the **state store** during the "process data" stage. It is worth noting that we initially experimented with the *Processor* API. While the *Processor* API is convenient for straightforward, record-level transformations, it shows its limitations in scenarios where extensive state information needs to be maintained. This is especially evident when enriching or aggregating data, and when context and state must be retained across multiple records. Additionally, we found no convenient way to output data to a topic with the *Processor* API. These disparities between the *Processor* and *Transformer* APIs led us to opt for the latter.

The *circe* library is used to process data in a type-safe manner. The predefined case classes (data structures) are used to validate whether the transformer is processing null data after decoding the input values with circe's *decode* function. All decoded data is then processed and written to the state store.

### 2. Difference between solutions

The implementations of the three solutions differ mainly in three aspects: the output data structure, the field types of the state store, and the transformer's data processing logic. We give a general overview below.

#### 2.1 Adequate

For the adequate requirement, only a total value needs to be output.
So a case class called **RefugeeCount** is created, with the structure

```
refugees: Long
```

Because only the number of refugees needs to be accumulated as input data arrives, the state store has the following field types:

```
key: String ("total")
value: Long (stores the total number of refugees)
```

The data processing logic simply updates the total according to the input and the data in the state store, and stores the updated number under the same key 'total'.

#### 2.2 Good

For the good requirement, both city names and per-city refugee numbers need to be output. So a case class called **RefugeeUpdate** is defined with the structure

```
city_id: Long,
city_name: String,
refugees: Long
```

Because more data needs to be relayed than a single number, several records exist simultaneously. We changed the key type to Long to store city IDs (which makes it easier to look up the corresponding city), and the value type to String to store city names and refugee numbers. The state store has the following field types:

```
key: Long (stores city IDs)
value: String (stores strings parsed from the city name and the total number of refugees)
```

Regarding the data processing logic, while the input data is still accumulated, we update the corresponding record in the state store based on the key of the input data rather than merely piling all data together.

#### 2.3 Excellent

For the excellent requirement, a time window limit is added. To ensure data within the time limit can be filtered, the case class **RefugeeUpdateWithTimestamp** is defined to store data temporarily for intermediate computation.
It has the structure

```
timestamp: Long,
city_id: Long,
city_name: String,
refugees: Long,
change: Long (change in the number of refugees within a time window)
```

For the output data, we have the case class **RefugeeUpdate**:

```
city_id: Long,
city_name: String,
refugees: Long,
change: Long (change in the number of refugees within a time window)
```

The state store keeps the same field types as in the solution for the good requirement:

```
key: Long (stores city IDs)
value: String (stores strings parsed from the city ID, city name, number of refugees, and change in the number of refugees)
```

In the data processing logic, we define a ListBuffer called **holder** to store all data from the input stream within the designated time window. Whenever new data arrives, we check the timestamps and filter the **holder**: records that have become outdated are removed. Subsequently, for records sharing the same city ID, we identify the maximum and minimum refugee numbers to calculate the change within the time window. The relevant keys and values are then updated in the state store. Records that have fallen out of scope, but whose city names appeared in the last window, are still output, with a change of zero.

## Kafka topology and setup

### 1. Topology

The topologies of the three solutions are highly similar, as depicted in the diagrams below. They all feature straightforward structures. KStream sources retrieve data from the topic. Subsequently, the data undergoes transformations, filters, and other logical operations within the transformer. In the end, the data is passed to the KStream sink and output to the 'updates' topic. On the right side of each diagram, the state store is separated from the primary data processing flow, serving solely as an external structure that holds the data processed by the transformer that requires storage. The main difference in topology among the three solutions lies in the naming and contents of the state stores.
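To make the source -> transformer -> sink shape concrete, here is a library-free sketch in plain Java (no Kafka dependency; all names are hypothetical): a list stands in for the 'events' source, a HashMap stands in for the key-value state store of the good solution, and a list of strings stands in for the 'updates' sink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopologySketch {
    // Transformer stage: accumulate per-city totals via the state store
    // and emit one update record per input, mirroring the topology above.
    static List<String> process(List<long[]> source) {
        // State store standing in for the Kafka Streams key-value store
        // (key: city id, value: running total), as in the good solution.
        Map<Long, Long> stateStore = new HashMap<>();
        // Sink standing in for the 'updates' topic.
        List<String> sink = new ArrayList<>();
        for (long[] record : source) {          // record = {cityId, newRefugees}
            long cityId = record[0];
            long total = stateStore.getOrDefault(cityId, 0L) + record[1];
            stateStore.put(cityId, total);      // stateful update
            sink.add(cityId + ":" + total);     // pass result on to the sink
        }
        return sink;
    }

    public static void main(String[] args) {
        // Hypothetical 'events' source: (cityId, newRefugees) records.
        List<long[]> source = List.of(
                new long[]{1, 100}, new long[]{2, 50}, new long[]{1, 25});
        System.out.println(process(source)); // [1:100, 2:50, 1:125]
    }
}
```

In the real application the same roles are played by the KStream source, the transformer with its registered state store, and the KStream sink; only the wiring through the Kafka Streams API differs.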
The topology of the solution for the adequate requirement:

![](https://hackmd.io/_uploads/S1gLYhPGa.png)

The topology of the solutions for the good and excellent requirements:

![](https://hackmd.io/_uploads/HyPHK3vMT.png)

### 2. Stateless and stateful operations

As previously mentioned, in each solution the input data must be decoded according to a custom case class. Additionally, in the excellent solution, the content of the **holder** is filtered based on the time window. These are typical stateless operations: they involve no modifications to existing data and focus primarily on streamlining the input data. In every solution, however, updating the data in the state store based on input data is a stateful operation. In the excellent solution, removing a record from the **holder** is also stateful. These operations build on pre-existing data and revolve around the superposition and replacement of data.

## Result

### 1. Adequate

The table and image below illustrate the progression of increasing refugee numbers with new inputs. The first column shows the number of new refugees, the second column the total number of refugees, and the third column the change in the total compared to the previous update.

| new refugees | total num | variation |
| -------- | -------- | -------- |
| ignore | 191282136 (start) | ignore |
| 116828 | 191398964 | +116828 |
| 12490 | 191411454 | +12490 |
| 31 | 191411485 | +31 |
| 6 | 191411491 | +6 |
| 5 | 191411496 | +5 |

![](https://hackmd.io/_uploads/rJLSupPzp.png)

### 2. Good

The tables below provide the same kind of data as the adequate results table, but broken down per city (Medellin, Beijing, Johannesburg), each of which appears multiple times in the figure below. This allows a detailed view of the changes in the total number of refugees for each city.
Medellin:

| new refugees | total num | variation |
| -------- | -------- | -------- |
| ignore | 936329 | ignore |
| 29136 | 965465 | +29136 |
| 25648 | 991113 | +25648 |

Beijing:

| new refugees | total num | variation |
| -------- | -------- | -------- |
| ignore | 18951739 | ignore |
| 7886 | 18959624 | +7886 |

Johannesburg:

| new refugees | total num | variation |
| -------- | -------- | -------- |
| ignore | 3781276 | ignore |
| 185784 | 3967060 | +185784 |

![](https://hackmd.io/_uploads/HJLZqawMa.png)

### 3. Excellent

The figure below shows the result of the solution for the excellent requirement over a duration of more than 2 seconds (the time window is set to 2 seconds). To make the result easier to follow, let's focus only on the records for the city of Lagos. From the first input onward, until the fourth record emerges in the graph, each increment of the 'change' value (the 'variation' column) equals the number of refugees in the corresponding input. After the first three inputs from the 'events' topic with updates on Lagos, the remaining inputs visible in the figure all concern other cities, so the 'change' for Lagos stays the same. However, when the last input arrives, the elapsed time exceeds 2 seconds, so the first input is discarded, resulting in a significant reduction of the 'change' value. At this point, the 'variation' shown in the table below equals minus the number of refugees in the first input.

Lagos:

| timestamp (last 6 digits, in ms) | new refugees | change | variation |
| -------- | -------- | -------- | -------- |
| 898190 (start state) | 32 | 32 | +32 |
| 898683 | 37 | 69 | +37 |
| 898931 | 3 | 72 | +3 |
| 899090 | none | 72 | 0 |
| 899341 | none | 72 | 0 |
| 899534 | none | 72 | 0 |
| 899937 | none | 72 | 0 |
| 900218 (more than 2s) | none | 40 | -32 |

![](https://hackmd.io/_uploads/HyUMgSOf6.png)
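The Lagos numbers above can be reproduced with a minimal, library-free sketch (plain Java, hypothetical names). The report derives the change from the maximum and minimum refugee totals per city; an equivalent way to arrive at the same values is to keep the in-window (timestamp, newRefugees) increments in the holder, evict entries older than the 2-second window on every update, and sum what remains.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowedChange {
    static final long WINDOW_MS = 2000;
    // Holder standing in for the ListBuffer: {timestamp, newRefugees} pairs.
    static final List<long[]> holder = new ArrayList<>();

    // Record a Lagos input (if any) at time 'now', evict outdated
    // entries, and return the current 'change' for the window.
    static long update(long now, long newRefugees) {
        if (newRefugees > 0) holder.add(new long[]{now, newRefugees});
        holder.removeIf(e -> e[0] < now - WINDOW_MS);      // filter the holder
        return holder.stream().mapToLong(e -> e[1]).sum(); // change in window
    }

    public static void main(String[] args) {
        System.out.println(update(898190, 32)); // 32
        System.out.println(update(898683, 37)); // 69
        System.out.println(update(898931, 3));  // 72
        System.out.println(update(899937, 0));  // 72 (no Lagos input)
        System.out.println(update(900218, 0));  // 40 (the 898190 entry falls out of the window)
    }
}
```

Running the updates in table order yields exactly the 'change' column: the value stays at 72 while only other cities produce input, and drops to 40 once the first entry ages out of the 2-second window.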