# DSPBench: A Suite of Benchmark Applications for Distributed Data Stream Processing Systems
###### tags: `Benchmarking`
###### paper origin: IEEE Access, 2020
###### papers: [link](https://ieeexplore.ieee.org/document/9290133)
###### github: [link](https://github.com/GMAP/DSPBench)
## 1. INTRODUCTION
### Research Problems
* The existing applications commonly used in the research literature mainly focus on the domain of relational algebra queries and streaming analytics. However, they do not exhibit a high variety of features in terms of their workload characterization and complexity of the data-flow graphs.
### Proposed Solution
* Provide a new benchmark suite (called DSPBench) of 15 applications coming from different domains, all requiring the processing features offered by modern Data Stream Processing Systems (DSPSs).
## 2. RELATED WORK
### Existing Benchmarks
* Linear Road Benchmark
    * Simulates a tolling system in a fictitious city, with the purpose of calculating toll values based on traffic jams and accident proximity.
    * Consists of five queries, only one of which is a continuous query; the others are historical queries.
* Yahoo Streaming Benchmark
* Relational algebra query simulating an advertisement campaign.
* BigDataBench
    * Includes applications beyond the set of relational algebra queries (search engines, e-commerce, and social networks).
* Most of the applications are not for live streaming tasks, but they model offline workloads that belong to the batch processing domain.
* StreamBench
* Workload characterization in three dimensions: data type, complexity, and use of historical data.
    * The workloads have a very simple form and perform simple computations, which is the main downside of this suite.
* RIoTBench
* Defines basic Internet of Things (IoT) tasks that can be combined to create micro-benchmarks.
    * Its API has been designed so that it can be implemented on any DSPS.
* HiBench
* Extended to cover DSPSs with four micro-benchmarks (wordcount, fixed window, identity, and re-partition).
    * Its streaming suite is quite small and does not contain complex applications involving the interconnection of several streaming tasks.
### Comparison of Benchmark suites

## 3. DSPBENCH ARCHITECTURE

* Adapters
    * Translate the application-agnostic code into the DSPS-specific code.
    * Call the API of the specific DSPS and adjust the application logic accordingly (see the sketch after this list).
* Probers
    * Insert timestamps into the operators and tuples for monitoring purposes.
    * Collect throughput, latency, and resource-usage metrics, and verify the correctness of the results.
* Input
* Feeding the applications with data streams
* Output
* Storing the results so that they can be verified.
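A minimal sketch of the adapter idea, with hypothetical names (`StreamOperator`, `EngineAdapter`) that are not the actual DSPBench API: the application logic is written against an engine-agnostic interface, and one adapter per DSPS translates records and emissions to that engine's API.

```java
// Hypothetical names (StreamOperator, EngineAdapter), not the actual DSPBench API.
import java.util.Map;
import java.util.function.Consumer;

/** Application-agnostic operator: consumes one tuple, emits zero or more tuples. */
interface StreamOperator {
    void process(Map<String, Object> tuple, Consumer<Map<String, Object>> emitter);
}

/** One adapter per DSPS (e.g., wrapping a Storm bolt or a Flink function) translates
 *  between the engine's record type and the application-agnostic tuple. */
abstract class EngineAdapter<T> {
    protected final StreamOperator operator;

    EngineAdapter(StreamOperator operator) { this.operator = operator; }

    /** Convert an engine-specific record into an application-agnostic tuple. */
    protected abstract Map<String, Object> toTuple(T engineRecord);

    /** Send an application-agnostic tuple downstream through the engine's API. */
    protected abstract void emit(Map<String, Object> tuple);

    /** Called by the engine for every incoming record. */
    public final void onRecord(T engineRecord) {
        operator.process(toTuple(engineRecord), this::emit);
    }
}
```

With this split, the application logic stays identical across engines, and only the thin adapter layer has to be rewritten for each DSPS.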
## 4. DSPBENCH APPLICATIONS
### Application Domains

### The Benchmark suite
* Word Count (WC)

    * Splits the sentences of a long text into words and counts the number of occurrences of each word in the whole corpus.
* Operators
        * Splitter: Splits sentences into words; stateless.
        * WordCounter: Counts the occurrences of each word.
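A minimal, engine-agnostic sketch of the two WC operators (class and method names are illustrative, not taken from DSPBench):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Illustrative sketch of the Word Count operators, independent of any DSPS API. */
class WordCountSketch {

    /** Splitter: stateless flat-map, one sentence in, many words out. */
    static void split(String sentence, Consumer<String> emit) {
        for (String word : sentence.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) emit.accept(word);
        }
    }

    /** WordCounter: stateful, keeps one counter per word and emits the updated count. */
    static class WordCounter {
        private final Map<String, Long> counts = new HashMap<>();

        void count(String word, BiConsumer<String, Long> emit) {
            emit.accept(word, counts.merge(word, 1L, Long::sum));
        }
    }
}
```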
* Machine Outlier (MO)

    * Receives resource-usage readings from computers in a network, calculates the Euclidean distance of each reading from the cluster center of a set of readings in a given time period, and applies the BFPRT (median-of-medians) algorithm to detect abnormal readings.
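A rough sketch of the outlier check under simplifying assumptions: the cluster center is taken as the mean of the readings in the window, and the median distance (which the real application would obtain with BFPRT selection rather than a full sort) serves as the reference; all names and the flagging rule are illustrative.

```java
import java.util.Arrays;

/** Illustrative outlier check: distance of each reading from the mean of the window,
 *  flagged when it exceeds a multiple of the median distance. */
class MachineOutlierSketch {
    static boolean[] flagOutliers(double[][] readings, double factor) {
        int dims = readings[0].length;
        double[] center = new double[dims];
        for (double[] r : readings)
            for (int d = 0; d < dims; d++) center[d] += r[d] / readings.length;

        double[] dist = new double[readings.length];
        for (int i = 0; i < readings.length; i++) {
            double s = 0;
            for (int d = 0; d < dims; d++) s += (readings[i][d] - center[d]) * (readings[i][d] - center[d]);
            dist[i] = Math.sqrt(s);
        }

        double[] sorted = dist.clone();
        Arrays.sort(sorted);                          // stand-in for BFPRT selection
        double median = sorted[sorted.length / 2];

        boolean[] outlier = new boolean[readings.length];
        for (int i = 0; i < readings.length; i++) outlier[i] = dist[i] > factor * median;
        return outlier;
    }
}
```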
* Log Processing (LP)

    * Receives a stream of logs from HTTP web servers; each log entry has to be parsed in order to extract the relevant data fields.
* Operators
* VolumeCounter: Counts the number of visits per minute.
* StatusCounter: Stores the number of occurrences of each status code.
        * GeoFinder: Finds the location of the user from the user's IP address and an IP-location database.
* GeoStats: Updates the counter per country.
* Sentiment Analysis (SA)

    * Uses an NLP technique to calculate the sentiment of sentences: it counts positive and negative words and uses the difference to indicate the polarity of the sentence.
    * Receives a stream of tweets, where each tweet first has to be parsed.
* Operators
        * Classifier: Removes words that usually do not carry sentiment, then counts the positive and negative words in the message.
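A minimal sketch of the word-counting polarity idea; the word sets below are tiny placeholders for the lexicon actually used, and stop-word removal is omitted:

```java
import java.util.Set;

/** Illustrative polarity score: positive minus negative word occurrences. */
class SentimentSketch {
    private static final Set<String> POSITIVE = Set.of("good", "great", "happy");
    private static final Set<String> NEGATIVE = Set.of("bad", "awful", "sad");

    /** Returns > 0 for a positive tweet, < 0 for a negative one, 0 for neutral. */
    static int polarity(String tweet) {
        int score = 0;
        for (String word : tweet.toLowerCase().split("\\W+")) {
            if (POSITIVE.contains(word)) score++;
            else if (NEGATIVE.contains(word)) score--;
        }
        return score;
    }
}
```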
* Traffic Monitoring (TM)

* Receives events from vehicles containing their identifier, location, direction, current speed, and timestamp.
* Operators
        * MapMatcher: Identifies the road on which each vehicle is driving.
* SpeedCalculator: Calculates the average speed of the vehicles for each road.
* Spam Filter (SF)

    * Uses Naive Bayes to determine whether email messages are spam.
* Supports both real-time and offline training.
* Processes two logically separated streams (for online training and for the analysis).
* For offline training, the probabilities of words stored in a static read-only table are pre-loaded by the Word-Probability operator.
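A minimal sketch of the Naive Bayes scoring step, assuming per-word spam probabilities are already available (pre-loaded for offline training or maintained online); the log-space combination shown here is one common formulation, not necessarily the exact one used in DSPBench:

```java
import java.util.Map;

/** Illustrative Naive Bayes spam score: combine per-word spam probabilities. */
class SpamFilterSketch {
    /** wordSpamProb maps a word to P(spam | word); unknown words fall back to 0.4. */
    static double spamProbability(String email, Map<String, Double> wordSpamProb) {
        double logSpam = 0, logHam = 0;
        for (String word : email.toLowerCase().split("\\W+")) {
            double p = wordSpamProb.getOrDefault(word, 0.4);
            logSpam += Math.log(p);
            logHam  += Math.log(1 - p);
        }
        // Combine the independent per-word evidence into one probability.
        return 1.0 / (1.0 + Math.exp(logHam - logSpam));
    }
}
```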
* Trending Topics (TT)

    * Extracts topics from a stream of tweets, counts the occurrences of each topic in a window of events, and emits only the popular topics.
* Operators
        * RollingCounter: A sliding-window operator that advances by a fixed interval of time and tracks the occurrences of topics.
        * IntermediateRanking: Ranks a subset of topics; these intermediate scores are sent to the TotalRanker operator at a fixed interval of time.
        * TotalRanker: Merges the intermediate scores and emits the final ranking of topics.
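A minimal sketch of the RollingCounter idea, assuming the sliding window is divided into fixed-size slots and advances by dropping the oldest slot at each interval (names and slot mechanics are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sliding-window counter: per-topic counts are kept in slots;
 *  the window advances by reusing the oldest slot at a fixed interval. */
class RollingCounterSketch {
    private final Map<String, long[]> slots = new HashMap<>();
    private final int numSlots;
    private int head = 0;

    RollingCounterSketch(int numSlots) { this.numSlots = numSlots; }

    void increment(String topic) {
        slots.computeIfAbsent(topic, t -> new long[numSlots])[head]++;
    }

    /** Total occurrences of a topic over the whole window. */
    long windowCount(String topic) {
        long total = 0;
        for (long c : slots.getOrDefault(topic, new long[numSlots])) total += c;
        return total;
    }

    /** Called once per advance interval: the oldest slot is cleared and reused. */
    void advance() {
        head = (head + 1) % numSlots;
        for (long[] s : slots.values()) s[head] = 0;
    }
}
```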
* Click Analytics (CA)

* Receives events from users accessing a website. The inputs are logs from the web server.
* Operators
        * RepeatVisit: Verifies whether the user has already visited the URL.
* VisitStats: Counts the total number of visits and unique visits.
* GeoFinder: Queries the database with the user IP address and receives as a result the location of the user.
        * GeoStats: Updates the counter of visits per country and the counters per city within each country.
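A minimal sketch of the RepeatVisit/VisitStats logic, assuming a visit counts as unique the first time a given user reaches a given URL (all names are illustrative):

```java
import java.util.HashSet;
import java.util.Set;

/** Illustrative unique-visit tracking for Click Analytics. */
class VisitStatsSketch {
    private final Set<String> seen = new HashSet<>();   // user/URL pairs already seen
    private long totalVisits = 0, uniqueVisits = 0;

    void onVisit(String userId, String url) {
        totalVisits++;
        if (seen.add(userId + "\u0000" + url)) uniqueVisits++;  // first visit of this user to this URL
    }

    long totalVisits()  { return totalVisits; }
    long uniqueVisits() { return uniqueVisits; }
}
```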
* Fraud Detection (FD)

    * Uses a Markov model to calculate the probability that a credit card transaction is fraudulent.
* Operators
        * Source: Cleans the raw input stream of credit card transactions.
        * Predictor: Uses the model to emit transactions that are considered fraudulent, given a minimum probability threshold.
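A rough sketch of the Markov-model scoring, assuming each transaction is mapped to a discrete state and a card's recent state sequence is scored by the product of transition probabilities; the actual state encoding and scoring metric in the application may differ.

```java
/** Illustrative Markov-chain score for a sequence of transaction states. */
class FraudDetectionSketch {
    /** transition[i][j] = probability of moving from state i to state j
     *  (assumed non-zero so the logarithm stays finite). */
    static boolean isSuspicious(int[] states, double[][] transition, double threshold) {
        double logProb = 0;
        for (int i = 1; i < states.length; i++) {
            logProb += Math.log(transition[states[i - 1]][states[i]]);
        }
        // Low-probability sequences are flagged as potential fraud.
        return logProb < Math.log(threshold);
    }
}
```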
* Spike Detection (SD)

    * Receives a stream of sensor readings and monitors them for spikes.
* Operators
* MovingAverage: Maintains a moving window for each identifier. When a new event is received, the operator adds the new value to the corresponding window.
* SpikeDetector: Checks whether the current event is a spike or not based on a threshold value specified at initialization.
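A minimal sketch of the two SD operators for a single sensor, assuming a spike is reported when the new value deviates from the moving average by more than a configured fraction (the threshold semantics and names are assumptions):

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative moving average plus spike check for a single sensor. */
class SpikeDetectionSketch {
    private final Deque<Double> window = new ArrayDeque<>();
    private final int windowSize;
    private final double threshold;   // e.g., 0.03 = 3% deviation from the average
    private double sum = 0;

    SpikeDetectionSketch(int windowSize, double threshold) {
        this.windowSize = windowSize;
        this.threshold = threshold;
    }

    /** Adds the new reading to the window and reports whether it is a spike. */
    boolean onReading(double value) {
        window.addLast(value);
        sum += value;
        if (window.size() > windowSize) sum -= window.removeFirst();
        double avg = sum / window.size();
        return Math.abs(value - avg) > threshold * avg;
    }
}
```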
* Bargain Index (BI)

    * Analyzes stocks offered for sale in a quantity and at a price below the mean observed in the last time window, and computes a bargain index for them.
* A trade is a completed transaction while a quote is a selling/buying proposal.
* Operators
* VWAP: Computes the volume-weighted average price of trades for the same stock symbol over the last 15 trades received.
        * BargainIndexer: Calculates the bargain index, which requires comparing the price and volume of the quote against the current VWAP.
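A rough sketch of the two BI operators, assuming the bargain index is the price gap times the quoted volume whenever a quote sells below the current VWAP; the exact formula in the paper may differ, and all names are illustrative.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative VWAP over the last N trades plus a simple bargain index. */
class BargainIndexSketch {
    private final Deque<double[]> trades = new ArrayDeque<>();  // {price, volume}
    private final int maxTrades;

    BargainIndexSketch(int maxTrades) { this.maxTrades = maxTrades; }

    /** VWAP: sum(price * volume) / sum(volume) over the last maxTrades trades. */
    double onTrade(double price, double volume) {
        trades.addLast(new double[]{price, volume});
        if (trades.size() > maxTrades) trades.removeFirst();
        double pv = 0, v = 0;
        for (double[] t : trades) { pv += t[0] * t[1]; v += t[1]; }
        return pv / v;
    }

    /** Bargain index: positive when the quote sells below the current VWAP. */
    static double bargainIndex(double vwap, double quotePrice, double quoteVolume) {
        return quotePrice < vwap ? (vwap - quotePrice) * quoteVolume : 0;
    }
}
```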
* Reinforcement Learner (RL)

* The ReinforcementLearner operator decides which page to display from a pre-configured list and sends a new event with the session and page identifiers.
* The Click Through Rates for each one of the pre-configured pages are also fed into the ReinforcementLearner to improve its decision making.
* Smart Grid Monitoring (SM)

* Monitors the energy consumption of a smart electricity grid.
* Produces two results: outliers per house and house/plug load predictions.
* Telecom Spam Detection (VS)

* Detects telemarketing users by analyzing Call Detail Records.
* Operators
        * Filters (CT24, ECR, CR24, ENCR, RCR): Implement on-demand time-decaying Bloom filters to keep track of the number of unique incoming participants.
* Scorers: Receive rates from filters and emit a properly weighted value once they have received at least one rate from each input.
* Ads Analytics (AD)

    * Calculates the click-through rate (CTR) of advertisements.
* Operators
* RollingCTR: The total sum of impressions and clicks per query/ads pair is stored in memory and updated based on a time window. The CTR is emitted at fixed intervals of time for each query/ads pair.
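A minimal sketch of the RollingCTR counters per query/ad pair; window eviction and the fixed-interval emission timer are omitted, and all names are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative click-through-rate counters per (query, ad) pair. */
class RollingCtrSketch {
    private final Map<String, long[]> stats = new HashMap<>();  // {impressions, clicks}

    void onImpression(String query, String ad) { get(query, ad)[0]++; }
    void onClick(String query, String ad)      { get(query, ad)[1]++; }

    /** CTR = clicks / impressions for one query/ad pair, emitted at fixed intervals. */
    double ctr(String query, String ad) {
        long[] s = get(query, ad);
        return s[0] == 0 ? 0 : (double) s[1] / s[0];
    }

    private long[] get(String query, String ad) {
        return stats.computeIfAbsent(query + "\u0000" + ad, k -> new long[2]);
    }
}
```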
## 5. WORKLOAD CHARACTERIZATION

* Parameters affecting the Operation Cost:
* Input size
* Selectivity
* The average time to execute the user-function on each input
* The time to send outputs to destination operators
* Operator Selectivity
    * The ratio between the total number of tuples emitted and the total number of tuples received by the same operator.
    * For most applications, selectivity is small, as is typical of filtering, map, and projection operators.
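As a minimal illustration of the definition (not code from the paper), selectivity can be read off two operator counters, and its value distinguishes the operation types discussed below:

```java
/** Illustrative selectivity computation from operator counters. */
class SelectivitySketch {
    /** < 1 for filters, == 1 for maps/projections, > 1 for flat-maps (e.g., the WC Splitter). */
    static double selectivity(long tuplesEmitted, long tuplesReceived) {
        return (double) tuplesEmitted / tuplesReceived;
    }
}
```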

* Processing time
* The time spent by an operator to process a single input tuple.
    * Causes of high processing time:
        * Operators with higher selectivity (e.g., the Tokenizer in SF).
        * Broadcasting each input tuple to several distinct operators (e.g., the ValidationDetector in VS).
        * Calls to external library functions (e.g., the MapMatcher in TM calling GeoTools functions).
* Tuple size
* The size of the tuples handled by the operators of the applications.
* The tuple size and the selectivity are often not correlated.
    * In some applications the tuple size increases along the path, because the original tuple is carried through the steps of the graph while additional information fields are appended to it at each step.

* Memory usage
    * Distribution of memory usage, measured at fixed time intervals during the execution.
* Bigger tuple sizes do not necessarily mean high and variable memory usage.

* Operation type
    * Operators are classified based on their selectivity (filter, map, and flat-map).
* Communication pattern
* The form of the data-flow graph
* Linear chains (pipelines) of operators
* Complex acyclic graphs, where an operator can apply several distribution strategies to different subscribers.
* Distribution strategies
    * Shuffle: Outputs of an operator are delivered to the next operator and, internally, to one of its replicas chosen at random.
* Group-by field: All the inputs with the same key attribute(s) are delivered to the same replica of the next operator.
* Broadcast: Each input is delivered to all the replicas of the next operator.
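A minimal sketch of how the three strategies could pick destination replicas, assuming `numReplicas` replicas of the downstream operator; real engines implement this internally, so the code is purely illustrative:

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;

/** Illustrative replica selection for the three distribution strategies. */
class DistributionSketch {
    /** Shuffle: any replica, chosen at random. */
    static int shuffle(int numReplicas) {
        return ThreadLocalRandom.current().nextInt(numReplicas);
    }

    /** Group-by field: the same key always maps to the same replica. */
    static int groupByField(Object key, int numReplicas) {
        return Math.floorMod(key.hashCode(), numReplicas);
    }

    /** Broadcast: every replica receives the tuple. */
    static int[] broadcast(int numReplicas) {
        return IntStream.range(0, numReplicas).toArray();
    }
}
```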
## 6. EXPERIMENTS
### Metrics
* Throughput
* Instant throughput is measured as the ratio between the number of outputs produced and the current running time.
* Latency
* Instant latency is measured as the difference between the current time at the sink and the creation timestamp recorded when generated by the source outside the DSPS.
* Resource consumption
    * Consumption metrics related to CPU, memory, and network utilization are collected in order to study the behavior of the DSPS as the application configuration changes.
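A minimal sketch of how the two instant metrics could be computed at a sink, assuming the source attaches a creation timestamp to every tuple (all names are illustrative):

```java
/** Illustrative instant throughput and latency measured at a sink. */
class MetricsSketch {
    private final long startMillis = System.currentTimeMillis();
    private long outputs = 0;

    /** creationMillis is the timestamp recorded by the source, outside the DSPS. */
    void onOutput(long creationMillis) {
        outputs++;
        long now = System.currentTimeMillis();
        double throughput = outputs / ((now - startMillis) / 1000.0);  // outputs per second so far
        long latency = now - creationMillis;                           // end-to-end latency in ms
        System.out.printf("throughput=%.1f tuples/s latency=%d ms%n", throughput, latency);
    }
}
```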
### Latency analysis

### Throughput analysis

### Resource consumption analysis

