# Lab 3 Report
## Usage
1. Clone our repository `abs-tudelft-sbd20/lab-3-group-28`
2. cd into the repository `.../lab-3-group-28/`
3. Start docker using `systemctl start docker`
4. Start the docker images using `sudo docker-compose up`
5. Run the Lab 3 sbt shell script with `./transformer/run-sbt.sh`
6. In the sbt shell, enter `compile` and then `run`
7. Enter the window size in seconds.
8. Start the [**visualizer**](http://localhost:1234/).
9. Optionally, change the beer styles to filter on by editing the file `.../lab-3-group-28/transformer/beer.styles`
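As an illustration of how such a styles file could be turned into a lookup set, here is a minimal sketch. It assumes the file holds one style per line, which this report does not state explicitly; the class `BeerStyles` and its `parse` method are hypothetical names and not part of our transformer code.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not taken from the lab code.
final class BeerStyles {
    // Assumption: one style per line; surrounding whitespace and blank lines are ignored.
    static Set<String> parse(List<String> lines) {
        Set<String> styles = new LinkedHashSet<>();
        for (String line : lines) {
            String style = line.trim();
            if (!style.isEmpty()) {
                styles.add(style);
            }
        }
        return styles;
    }
}
```

The lines could be obtained with `java.nio.file.Files.readAllLines`. Lookups in the resulting set are exact and case-sensitive, so `Gose` matches while `gose` does not.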
## Functional overview
The entire processing pipeline can be summarized in the following way:
#### What we maintain
We maintain a `KeyValueStore<Long, ArrayList<Long>>` as the application state, with `city_id` as the key and an ArrayList of timestamps as the value. Whenever a record of interest arrives, we append its timestamp to the city's ArrayList. Once a timestamp falls outside the window, we remove it from the city's ArrayList.
#### ArrayListSerde
By default, serdes are only provided for basic data types, e.g. Int, Long, String, etc. We wanted to store an ArrayList and found [this](https://stackoverflow.com/a/57308543) Stack Overflow post, whose code turned out to be buggy: *the author was not serializing null data or empty arrays properly*. We corrected the code while maintaining the author's intended logic. The result is a custom Serde, for which we wrote a custom Serializer and Deserializer.
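To illustrate the two edge cases mentioned above (null data and empty lists), here is a minimal sketch of a byte layout that handles both. It is not our actual `ArrayListSerde` and does not use the Kafka `Serializer`/`Deserializer` interfaces; the class `LongListCodec` and the length-prefixed layout are assumptions made for illustration only.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;

// Hypothetical stand-in for the byte layout of a list serde.
final class LongListCodec {
    // null list -> null bytes; otherwise a 4-byte element count followed by 8 bytes per element.
    static byte[] serialize(ArrayList<Long> list) {
        if (list == null) {
            return null;
        }
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + list.size() * Long.BYTES);
        buf.putInt(list.size());
        for (Long value : list) {
            buf.putLong(value);
        }
        return buf.array();
    }

    // null bytes -> null list; zero-length bytes -> empty list.
    static ArrayList<Long> deserialize(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        if (bytes.length == 0) {
            return new ArrayList<>();
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int count = buf.getInt();
        ArrayList<Long> out = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            out.add(buf.getLong());
        }
        return out;
    }
}
```

With this layout an empty list serializes to a 4-byte count of zero, so it stays distinguishable from a null value after a round trip.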
#### Flow
1. We declare a new Serde type `arrListSerde` using our custom Serde Java class `ArrayListSerde` through:
```{scala}
implicit def arrListSerde: ArrayListSerde[java.lang.Long] = new ArrayListSerde[java.lang.Long](Serdes.JavaLong)
```
2. We take user input for window size in seconds.
3. We read the `beer.styles` file and save its contents in an array `beer_styles`.
4. We declare a new persistent `keyValueStore` named `Counts`, with `Long` keys and values serialized by `arrListSerde`:
```{scala}
val keyValueStore = Stores.keyValueStoreBuilder(
  Stores.persistentKeyValueStore("Counts"),
  Long,
  arrListSerde
)
```
5. We decode the JSON string into a Scala case class with the matching schema.
6. We check whether the JSON string was decoded properly and whether the beer style is in `beer_styles`. If *true*, we forward `(sale.city_id, sale.timestamp)`. If *false*, we forward negative values that are to be ignored.
7. We filter out the invalid data.
8. We apply a Transformer. In its `transform()` function we append the timestamp to the city's ArrayList and forward `(city_id, (long) alist.size())`, where the latter is the updated list size.
9. We map each `(city_id, count)` pair to a JSON string.
10. We publish this string to the `updates` topic.
11. In the punctuation (the Transformer's scheduled, repeating callback), which is scheduled every 3 ms, we check whether any timestamp in a city's ArrayList has fallen outside the window, i.e. `x < (timestamp - windowLength)`, with all times in milliseconds. If anything was removed, we update the `keyValueStore` and forward the new count, which then continues from **Step 9**.
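The state handling in steps 8 and 11 can be sketched without Kafka as a plain in-memory map. This is a simplified stand-in, not the actual `BeerCountTransformer`: the class `WindowedCounts` and its methods `onSale` and `evict` are hypothetical names, and a `HashMap` takes the place of the persistent store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of the per-city timestamp lists.
final class WindowedCounts {
    private final long windowLength; // window size in milliseconds
    private final Map<Long, ArrayList<Long>> store = new HashMap<>();

    WindowedCounts(long windowLengthMillis) {
        this.windowLength = windowLengthMillis;
    }

    // Step 8: append the timestamp and return the city's updated count.
    long onSale(long cityId, long timestamp) {
        ArrayList<Long> alist = store.computeIfAbsent(cityId, k -> new ArrayList<>());
        alist.add(timestamp);
        return alist.size();
    }

    // Step 11: drop timestamps older than the window and report only the cities that changed.
    Map<Long, Long> evict(long now) {
        Map<Long, Long> updates = new LinkedHashMap<>();
        Iterator<Map.Entry<Long, ArrayList<Long>>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, ArrayList<Long>> entry = it.next();
            ArrayList<Long> alist = entry.getValue();
            if (alist.removeIf(x -> x < now - windowLength)) {
                updates.put(entry.getKey(), (long) alist.size());
                if (alist.isEmpty()) {
                    it.remove(); // a city with no sales left in the window is dropped from the state
                }
            }
        }
        return updates;
    }
}
```

For example, with a 30 s window and sales for one city at 1000 ms and 2000 ms, `evict(31000)` reports nothing because `1000 < 1000` is false, while `evict(31500)` removes the first timestamp and reports a count of 1 for that city.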
## Kafka topology and setup
Our stream processing topology is as follows:

### KSTREAM-SOURCE
Source for the stream: we receive events by subscribing to the `events` topic.
```{scala}
val events: KStream[String, String] = builder.stream[String, String]("events")
```
### MAP 1
We decode the JSON string that comes in as the value.
```{scala}
case class BeerSale(timestamp: Long, city_id: Long, city_name: String, style: String)
...
events.map((key: String, json: String) => {
  val sale = decode[BeerSale](json)
  (key, sale)
})
```
### MAP 2
We check whether the JSON string was decoded properly; if not, we forward `(-1, -1)`. If it was decoded properly and matches one of our beer styles of interest, we forward `(sale.city_id, sale.timestamp)`, otherwise `(-1, -1)`.
```{scala}
.map((key: String, sale: Either[Error, BeerSale]) => {
  sale match {
    case Left(_) =>
      // Improper JSON: forward the sentinel pair so that it gets filtered out
      (-1L, -1L)
    case Right(sale: BeerSale) =>
      if (beer_styles contains sale.style) {
        println(sale)
        (sale.city_id, sale.timestamp)
      } else {
        // Style not of interest: forward the sentinel pair
        (-1L, -1L)
      }
  }
})
```
### FILTER 1
We remove the records we are not concerned with, i.e. those carrying the `-1` sentinel key. As far as we understand, Kafka's implicit optimization pushes this filter after the transformer, so we sandwich the transformer between two filters.
```{scala}
.filter((k, v) => k != -1L)
```
### TRANSFORM
We use a Transformer and not a Processor because we want a non-terminal processing step which can forward the updates [[1]](https://kafka.apache.org/26/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration).
*Transformer.scala*
```{scala}
.transform(new TransformerSupplier[Long, Long, KeyValue[Long, Long]] {
  override def get(): Transformer[Long, Long, KeyValue[Long, Long]] =
    new BeerCountTransformer(WINDOW_LEN_IN_MILLIS)
      .asInstanceOf[Transformer[Long, Long, KeyValue[Long, Long]]]
}, "Counts")
```
**The `transform()` Function:**
Here we get the ArrayList for the particular city, append the timestamp to it and forward the city_id and updated ArrayList size.
*BeerCountTransformer.java*
```{java}
@Override
public KeyValue<Long, Long> transform(Long city_id, Long timestamp) {
    // Fetch the city's timestamps, starting a new list the first time we see the city
    ArrayList<Long> alist = keyValueStore.get(city_id);
    if (alist == null) {
        alist = new ArrayList<Long>();
    }
    // Record this sale and persist the updated list
    alist.add(timestamp);
    keyValueStore.put(city_id, alist);
    // Forward the city's updated list size
    return new KeyValue<Long, Long>(city_id, (long) alist.size());
}
```
**The Punctuation (scheduled every 3 ms, `STREAM_TIME`):**
Here we scan the entire `keyValueStore` for timestamps that have fallen outside the window. If we removed any timestamps for a city, we forward the updated list size and update the `keyValueStore`. If the list size becomes zero, we remove the entry for that `city_id` from the `keyValueStore`.
*BeerCountTransformer.java*
```{java}
this.context.schedule(Duration.ofMillis(3), PunctuationType.STREAM_TIME, (timestamp) -> {
    ArrayList<Long> city_id_to_remove = new ArrayList<>();
    KeyValueIterator<Long, ArrayList<Long>> iter = this.keyValueStore.all();
    while (iter.hasNext()) {
        KeyValue<Long, ArrayList<Long>> entry = iter.next();
        ArrayList<Long> alist = entry.value;
        // forward only if there is a change (something was removed)
        boolean removed = alist.removeIf(x -> x < (timestamp - windowLength));
        if (removed) {
            int size = alist.size();
            context.forward(entry.key, (long) size);
            if (size == 0)
                city_id_to_remove.add(entry.key);
            // make sure the new data is put in
            keyValueStore.put(entry.key, alist);
        }
    }
    iter.close();
    // Start removing records (after already sending that we have 0) :)
    for (Long city_id : city_id_to_remove)
        keyValueStore.delete(city_id);
    // commit the current processing progress
    context.commit();
});
```
### FILTER 2
We remove the data we are not concerned with (*again*).
```{scala}
.filter((k, v) => k != -1L)
```
### MAP 3
We put values in our desired schema and encode it in a JSON string.
```{scala}
case class OutputSchema(city_id: Long, count: Long)
...
.map((k, v) => {
  (k.toString, OutputSchema(k, v).asJson.noSpaces)
})
```
### SINK
We send the key (`city_id`) and the JSON string to the `updates` topic.
```{scala}
.to("updates")
```
### What is stateful and stateless?
#### Stateless
All the steps leading up to the Transformer are stateless: decoding the event JSON string, filtering by beer style and passing on the `city_id` and timestamp. The steps after the Transformer are stateless as well: filtering, encoding the output as a JSON string and passing it to the sink.
#### Stateful
The `transform()` function in the Transformer is stateful because, to update the count, we need to know how many of the city's timestamps currently lie inside the window. The punctuation is stateful as well: it has to access the state to find out which timestamps have become obsolete.
## Result
### Visualization

The visualization of the result can be seen in the above GIF. It is a 55 s screen capture of filtering on the 10 beer *styles* listed below with a time window of 60 s. The GIF is played at 8x speed and thus runs for about 7 s.
```
Gueuze
Gose
Hefeweizen
Helles
India pale ale
Kölsch
Lambic
Light ale
Maibock/Helles bock
Malt liquor
```
We see the updates appear and points grow as time goes on. We also see points shrink in count (changing colour as they do) and even disappear once the window has passed over them. All of this happens in real time.
### Updates in Terminal
Below is an example of the updates we produce along with the events that are coming in: note that it is a random snapshot in time. The below output is for filtering only `Gueuze` and `Gose` beer styles in a **30s** window. The output is reduced and where data is removed we have put `## Data Omitted ##` as a marker for transparency.
#### Notes for output
* The application also reads events that were produced before the transformer app started, so there will be many updates setting a count to 0 for which you will not see a preceding count of 1.
* We can verify that the intended functionality is indeed achieved. In the first 'row', a relevant event occurred and the city_id 1205733 is updated. After 30 seconds, on 'row' 14, the city_id 1205733 moved out of the window, indicated by a count equal to 0.
```
events_1 | 1205733 {"timestamp":1603789844924,"city_id":1205733,"city_name":"Chattogram","style":"Gose"}
updates_1 | 1254361 {"city_id":1254361,"count":1} // update from event that was seen before the transformer application started.
updates_1 | 1205733 {"city_id":1205733,"count":1} // new update
-------------------------------------------------------------------------------------- ## Data Omitted ##
events_1 | 524901 {"timestamp":1603789850291,"city_id":524901,"city_name":"Moscow","style":"Gueuze"}
updates_1 | 524901 {"city_id":524901,"count":1}
-------------------------------------------------------------------------------------- ## Data Omitted ##
events_1 | 2293538 {"timestamp":1603789854367,"city_id":2293538,"city_name":"Abidjan","style":"Gose"}
updates_1 | 2293538 {"city_id":2293538,"count":1}
-------------------------------------------------------------------------------------- ## Data Omitted ##
updates_1 | 1529660 {"city_id":1529660,"count":0}
-------------------------------------------------------------------------------------- ## Data Omitted ##
updates_1 | 3456160 {"city_id":3456160,"count":0}
-------------------------------------------------------------------------------------- ## Data Omitted ##
updates_1 | 3566356 {"city_id":3566356,"count":0}
-------------------------------------------------------------------------------------- ## Data Omitted ##
updates_1 | 8348417 {"city_id":8348417,"count":0}
events_1 | 3453643 {"timestamp":1603789860995,"city_id":3453643,"city_name":"Piracicaba","style":"Bock"}
events_1 | 745044 {"timestamp":1603789861390,"city_id":745044,"city_name":"Istanbul","style":"Gose"}
updates_1 | 745044 {"city_id":745044,"count":1}
-------------------------------------------------------------------------------------- ## Data Omitted ##
updates_1 | 3394682 {"city_id":3394682,"count":0}
events_1 | 1260716 {"timestamp":1603789862475,"city_id":1260716,"city_name":"Pāli","style":"Schwarzbier"}
updates_1 | 3687238 {"city_id":3687238,"count":0}
events_1 | 1566083 {"timestamp":1603789862752,"city_id":1566083,"city_name":"Ho Chi Minh City","style":"Amber ale"}
updates_1 | 1796236 {"city_id":1796236,"count":0}
-------------------------------------------------------------------------------------- ## Data Omitted ##
events_1 | 2335204 {"timestamp":1603789863871,"city_id":2335204,"city_name":"Kano","style":"Amber ale"}
updates_1 | 2655168 {"city_id":2655168,"count":0}
-------------------------------------------------------------------------------------- ## Data Omitted ##
events_1 | 3530597 {"timestamp":1603789866019,"city_id":3530597,"city_name":"Mexico City","style":"Gose"}
updates_1 | 3530597 {"city_id":3530597,"count":1}
-------------------------------------------------------------------------------------- ## Data Omitted ##
events_1 | 3646738 {"timestamp":1603789866673,"city_id":3646738,"city_name":"Caracas","style":"Gose"}
updates_1 | 3646738 {"city_id":3646738,"count":1}
-------------------------------------------------------------------------------------- ## Data Omitted ##
events_1 | 1529102 {"timestamp":1603789868846,"city_id":1529102,"city_name":"Ürümqi","style":"Gueuze"}
updates_1 | 1529102 {"city_id":1529102,"count":1}
-------------------------------------------------------------------------------------- ## Data Omitted ##
updates_1 | 1254361 {"city_id":1254361,"count":0}
-------------------------------------------------------------------------------------- ## Data Omitted ##
updates_1 | 1205733 {"city_id":1205733,"count":0}
-------------------------------------------------------------------------------------- ## Data Omitted ##
updates_1 | 524901 {"city_id":524901,"count":0}
-------------------------------------------------------------------------------------- ## Data Omitted ##
updates_1 | 2293538 {"city_id":2293538,"count":0}
-------------------------------------------------------------------------------------- ## Data Omitted ##
events_1 | 1283240 {"timestamp":1603789887739,"city_id":1283240,"city_name":"Kathmandu","style":"Gose"}
updates_1 | 1283240 {"city_id":1283240,"count":1}
-------------------------------------------------------------------------------------- ## Data Omitted ##
updates_1 | 745044 {"city_id":745044,"count":0}
-------------------------------------------------------------------------------------- ## Data Omitted ##
updates_1 | 3530597 {"city_id":3530597,"count":0}
-------------------------------------------------------------------------------------- ## Data Omitted ##
updates_1 | 3646738 {"city_id":3646738,"count":0}
-------------------------------------------------------------------------------------- ## Data Omitted ##
updates_1 | 1529102 {"city_id":1529102,"count":0}
-------------------------------------------------------------------------------------- ## Data Omitted ##
events_1 | 1172451 {"timestamp":1603789905306,"city_id":1172451,"city_name":"Lahore","style":"Gose"}
updates_1 | 1172451 {"city_id":1172451,"count":1}
```
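The window behaviour in the output above can also be checked with a little arithmetic on the timestamps. The sketch below uses the same predicate as our punctuation, `x < (timestamp - windowLength)`; the class `WindowCheck` and its method names are hypothetical and only serve this illustration.

```java
// Hypothetical helper for checking the log by hand; not part of the transformer.
final class WindowCheck {
    // Stream time after which an event no longer counts: it is evicted once stream time exceeds this value.
    static long expiry(long eventTimestamp, long windowMillis) {
        return eventTimestamp + windowMillis;
    }

    // Same predicate as in the punctuation: x < (timestamp - windowLength).
    static boolean isExpired(long eventTimestamp, long windowMillis, long streamTime) {
        return eventTimestamp < streamTime - windowMillis;
    }
}
```

For the first event (city_id 1205733, timestamp 1603789844924) and a 30 s window, `expiry` gives 1603789874924. The `count: 0` update for 1205733 appears between the visible events with timestamps 1603789868846 (Ürümqi) and 1603789887739 (Kathmandu): `isExpired` is false at the former and true at the latter, which is consistent with the position of the update in the log.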