# Move Fast and Meet Deadlines: Fine-grained Real-time Stream Processing with Cameo
###### tags: `Stream Processing`
###### origin: NSDI '21
###### paper: [link](https://arxiv.org/abs/2010.03035)
### Research problem
Resource provisioning in multi-tenant stream processing systems faces the dual challenges of keeping resource utilization high (without over-provisioning), and ensuring performance isolation.

### Method
* Fine-grained, per-message priority-based scheduling for stream processing
## Introduction
* Current systems
    * parallelize operators across machines
    * use multi-tenancy, wherein operators are collocated on shared resources.
* Challenges
    * High workload variability.
    * Latency targets vary across applications.
### Workload Characteristics
* Long-tail streams drive resource over-provisioning.
    * Users rarely have any means of accurately gauging how many nodes are required, and end up over-provisioning their jobs.
* Temporal variation makes resource prediction difficult.
    * A single stream can have spikes lasting one to a few seconds, as well as periods of idleness.
* Users already try to do fine-grained scheduling.
    * Users prefer to provision a cluster using external resource managers (e.g., YARN, Mesos), and then run periodic micro-batch jobs.
* Latency requirements vary across jobs.
    * The job completion time for micro-aggregation jobs ranges from less than 10 seconds up to 1000 seconds.
### Prior Approaches
#### Dynamic resource provisioning for stream processing.

* Diagnosis And Policies: Mechanisms for when and how resource re-allocation is performed;
* Elasticity Mechanisms: Mechanisms for efficient query reconfiguration;
* Resource Sharing: Mechanisms for dynamic performance isolation among streaming queries.
These techniques make changes to the dataflows in reaction to a performance metric (e.g., latency) deteriorating.
#### The promise of event-driven systems
Operators here are not mapped to specific CPUs. This provides an opportunity to develop systems that manage a unified queue of messages across query boundaries, combating the over-provisioning of slot-based approaches.
## Design Overview
### Assumptions, System Model
* Deals with streaming queries on clusters shared by **cooperative users**.
* The user specifies a latency target at query submission time (e.g., an SLA).
### Architecture
* A scheduling strategy that determines message priority by interpreting the semantics of the query and data streams, given a latency target.
* A scheduling framework that
    1. enables message priority to be generated using a pluggable strategy, and
    2. schedules operators dynamically based on their current pending messages' priorities.
### Context converters
* A context converter modifies and propagates the scheduling contexts attached to a message:
    * per-job static information: latency target, topology
    * dynamic information, such as the timestamps of tuples being processed (stream progress)
### Stateless scheduler
Determines the target operator's priority by interpreting the scheduling context attached to the message.
### Example

## Scheduling Policies in Cameo
We model our setting as a non-preemptive, non-uniform task time, multi-processor, real-time scheduling problem. Candidate policies (sketched below):
* Least-Laxity-First (Cameo's default)
* Earliest-Deadline-First
* Shortest-Job-First
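
To make the three candidates concrete, here is a minimal sketch (my own illustration, not the paper's code) of how each policy would rank pending messages, assuming every message carries an absolute `deadline` and an estimated remaining execution `cost`; smaller priority values are served first.

```python
import heapq
import time
from dataclasses import dataclass, field

@dataclass(order=True)
class PendingMessage:
    priority: float
    payload: object = field(compare=False)

# Hypothetical per-message attributes: `deadline` is the absolute time the
# result is due, `cost` the estimated remaining execution cost.
def llf_priority(deadline: float, cost: float, now: float) -> float:
    # Least-Laxity-First (Cameo's default): laxity = time until the
    # deadline minus remaining work; less slack means more urgent.
    return (deadline - now) - cost

def edf_priority(deadline: float, cost: float, now: float) -> float:
    # Earliest-Deadline-First: urgency depends on the deadline alone.
    return deadline

def sjf_priority(deadline: float, cost: float, now: float) -> float:
    # Shortest-Job-First: the cheapest message runs first.
    return cost

queue: list[PendingMessage] = []
now = time.time()
heapq.heappush(queue, PendingMessage(llf_priority(now + 2.0, 1.5, now), "msg A"))
heapq.heappush(queue, PendingMessage(llf_priority(now + 5.0, 0.1, now), "msg B"))
print(heapq.heappop(queue).payload)  # "msg A": only 0.5s of slack
```

Under LLF, "msg A" wins despite its later-looking cost because its slack (0.5s) is smaller than "msg B"'s (4.9s); under SJF the order would flip.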
### Definitions and Underpinnings
* Event
    * Input data arrives as events, each associated with a logical time that indicates the stream progress of that event in the input stream.
* Dataflow job and operators
    * A dataflow job consists of a DAG of stages. Each stage executes a user-defined function. A stage can be parallelized and executed by a set of dataflow operators.
* Regular operator
* Windowed operator
* Message timestamps
    * We denote a message $M$ as a tuple $(o_M, (p_M, t_M))$, where: a) $o_M$ is the operator executing the message; b) $p_M$ and $t_M$ record the logical and physical time of the input stream associated with $M$, respectively. Intuitively, $M$ is influenced by the input stream with logical time $\le p_M$. Physical time $t_M$ marks the system time when $p_M$ is observed at a source operator.
* Latency
    * We define the latency of a message $M$ as the difference between the last arrival time of any event in $E$ (the set of input events that influence $M$) and the time when $M$ is generated.
### Topology Awareness

#### Calculating Message Deadline

* Single-operator dataflow, regular operator: the deadline is $DL_M = t_M + L$, where $L$ is the query's latency target.
* Multiple-operator dataflow, regular operator: latency is measured end-to-end, so a message at a downstream operator inherits the deadline $t_M + L$ derived from the source timestamp.
* Multiple-operator dataflow, windowed operator: the output of a window closing at logical time $p_W$ is due $L$ after the window's last event arrives, i.e., $DL_M = t(p_W) + L$; since $t(p_W)$ may lie in the future, it is estimated from stream progress (sketched below).
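
A hedged sketch of the deadline cases above, using the notation from the definitions ($t_M$, latency target $L$); `estimated_t_pW` is a stand-in for Cameo's stream-progress estimate of when the window's last event arrives.

```python
def deadline_regular(t_M: float, L: float) -> float:
    # Regular operator (single- or multi-operator dataflow): the result is
    # due L after the source observed the message's stream progress p_M.
    return t_M + L

def deadline_windowed(estimated_t_pW: float, L: float) -> float:
    # Windowed operator: the window output is due L after the window's
    # last event (logical time p_W) arrives at the source; that arrival
    # may lie in the future and must be estimated from stream progress.
    return estimated_t_pW + L
```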

### Mapping Stream Progress
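The notes leave this section empty. As one plausible reading, mapping stream progress means estimating the physical arrival time of a future logical time from recent observations at the source; the linear extrapolation below is an assumption for illustration, not necessarily the paper's exact estimator.

```python
from collections import deque

class StreamProgressEstimator:
    """Estimates when a future logical time p will be observed at the source."""

    def __init__(self, window: int = 64):
        # Recent (logical time, physical time) observations at the source.
        self.samples: deque[tuple[float, float]] = deque(maxlen=window)

    def observe(self, p: float, t: float) -> None:
        # Record that logical time p was observed at physical time t.
        self.samples.append((p, t))

    def estimate_arrival(self, p_future: float) -> float:
        # Extrapolate using the average rate of progress over the window;
        # assumes at least one observation has been recorded.
        (p0, t0), (p1, t1) = self.samples[0], self.samples[-1]
        if p1 == p0:
            return t1  # no measurable progress yet; best guess is "now"
        rate = (t1 - t0) / (p1 - p0)  # wall-clock seconds per unit of logical time
        return t1 + (p_future - p1) * rate
```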
## Scheduling Mechanisms in Cameo
1. How can static and dynamic information from both upstream and downstream processing be made available during priority assignment?
    * By propagating scheduling contexts with messages.
2. How can we efficiently perform fine-grained priority assignment and scheduling that scales with message volume?
    * Cameo uses a two-layer scheduler architecture:
        * A context converter is embedded into each operator and handles scheduling contexts whenever the operator sends or receives a message,
        * avoiding the bottleneck of having a centralized scheduler thread calculate priorities for all messages.
3. How can we admit pluggable scheduling policies without modifying the scheduler mechanism?
    * Cameo allows the priority generation process to be implemented through the context handling API, which the context converter invokes within each operator. (See the sketch below.)
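
A minimal sketch of this two-layer split, with illustrative names (not Cameo's actual API): the per-operator context converter invokes a pluggable strategy to stamp each message with a priority, and the scheduler merely orders messages by that precomputed priority.

```python
import heapq
from typing import Protocol

class SchedulingStrategy(Protocol):
    def assign_priority(self, message: dict, context: dict) -> float:
        """Pluggable hook invoked by the context converter."""

class ContextConverter:
    # Layer 1: embedded in each operator; runs on send/receive, so no
    # centralized thread has to compute priorities for all messages.
    def __init__(self, strategy: SchedulingStrategy):
        self.strategy = strategy

    def on_send(self, message: dict, context: dict) -> dict:
        message["priority"] = self.strategy.assign_priority(message, context)
        message["context"] = context  # propagate the context with the message
        return message

class PriorityScheduler:
    # Layer 2: a simple priority queue over already-prioritized messages.
    def __init__(self):
        self.queue: list[tuple[float, int, dict]] = []
        self._seq = 0  # tie-breaker so dict payloads are never compared

    def enqueue(self, message: dict) -> None:
        heapq.heappush(self.queue, (message["priority"], self._seq, message))
        self._seq += 1

    def next_message(self) -> dict:
        return heapq.heappop(self.queue)[2]
```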
### Scheduling Contexts
Scheduling contexts are data structures attached to messages, capturing message priority and the information required to perform priority-based scheduling.
* Priority Context (PC)
    * A PC is attached to a message before the message is sent.
* Reply Context (RC)
    * An RC is attached to an acknowledgement message, sent by the target operator to its upstream operator after a message is received.
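
A sketch of what the two contexts might carry, inferred from the fields named here and in "Implementing the Cameo Policy" below; the field names are my guesses, not Cameo's.

```python
from dataclasses import dataclass

@dataclass
class PriorityContext:
    # Attached to a message before it is sent (flows downstream).
    deadline: float         # DL_M, derived from t_M (or estimated t(p_W)) plus L
    downstream_cost: float  # remaining critical-path cost below this operator

@dataclass
class ReplyContext:
    # Attached to the acknowledgement sent back to the upstream operator.
    operator_cost: float    # measured execution cost C_{o_M} at the target
    path_cost: float        # critical-path execution cost C_path observed downstream
```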
### System Architecture

### Implementing the Cameo Policy
* PC
    * Cameo uses the PC to carry the deadline ($DL_M$) and related information needed to derive a message's priority.
* RC
    * Cameo utilizes the RC to track the critical-path execution cost $C_{path}$ and the operator execution cost $C_{o_M}$.
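
Putting the pieces together, a sketch of the default least-laxity priority as described above: a message must start by its deadline minus the downstream critical-path cost, so that start budget serves as its priority (smaller is more urgent). Function names are illustrative.

```python
def cameo_priority(deadline: float, c_path: float) -> float:
    # Least-laxity style: the message must start by (deadline - C_path)
    # for the downstream critical path to finish on time.
    return deadline - c_path

def update_path_cost(c_o_m: float, downstream_c_path: float) -> float:
    # On receiving a ReplyContext, fold the target operator's measured
    # cost C_{o_M} into the critical-path estimate used for future messages.
    return c_o_m + downstream_c_path
```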
### Customizing Cameo: Proportional Fair Scheduling
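These notes leave this section as a stub. As an illustration of the pluggable-strategy claim, here is a sketch (my own, not the paper's policy) of proportional fair sharing via virtual time, matching the strategy interface sketched earlier:

```python
class ProportionalFairStrategy:
    # Illustrative custom strategy: each query accrues virtual time at a
    # rate inversely proportional to its weight; the query with the
    # smallest virtual time is served first (start-time fair queueing).
    def __init__(self, weights: dict[str, float]):
        self.weights = weights
        self.vtime = {q: 0.0 for q in weights}

    def assign_priority(self, message: dict, context: dict) -> float:
        return self.vtime[message["query"]]

    def on_complete(self, query: str, cost: float) -> None:
        # Charge the query for the work it consumed, scaled by weight.
        self.vtime[query] += cost / self.weights[query]
```

A query with weight 2 accrues virtual time at half the rate, so it receives roughly twice the scheduler's service.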
## Experimental Evaluation
* Study Cameo's effects when:
    * varying environmental parameters
        * workload
        * available resources
        * workload bursts
    * tuning internal parameters and optimizations
        * effect of scheduling granularity
        * frontier prediction for event-time windows
        * starvation prevention
* Machine configuration.
    * We use DS12-v2 Azure virtual machines (4 vCPUs/56GB memory/112GB SSD) as server machines, and DS11-v2 Azure virtual machines (2 vCPUs/14GB memory/28GB SSD) as client machines [12]. Single-tenant scenarios are evaluated on a single server machine. Unless otherwise specified, all multi-tenant experiments are evaluated using a 32-node Azure cluster with 16 client machines.
* Evaluation workload.
    * Latency-sensitive jobs (Group 1): with SLAs
    * Bulk analytic jobs (Group 2): longer-term analytics
    * Latency constraints
### Single-tenant Scenario


* Messages that contribute to the first result (colored dots) and messages that contribute to the second result (grey dots) do not overlap on the timeline.
### Multi-tenant Scenario
* Cameo under increasing data volume

* Effect of limited resources.

* Effect of temporal variation of workload.

* Ingestion pattern from production trace.

### Cameo: Internal Evaluation
* LLF vs. EDF vs. SJF

* Scheduling Overhead

* Varying Scope of Scheduler Knowledge.

* Effect of Measurement Inaccuracies.
