# Move Fast and Meet Deadlines: Fine-grained Real-time Stream Processing with Cameo

###### tags: `GPUs`
###### origin: NSDI '21
###### paper: [link](https://arxiv.org/abs/2010.03035)

### Research problem
Resource provisioning in multi-tenant stream processing systems faces the dual challenges of keeping resource utilization high (without over-provisioning), and ensuring performance isolation.

![](https://i.imgur.com/W4OeViX.png)

### Method
* Fine-grained stream processing

## Introduction
* Current System
    * parallelize operators across machines
    * use multi-tenancy, wherein operators are collocated on shared resources.
* challenges
    * High workload variability
    * Latency targets vary across applications.

### Workload Characteristics
* Long-tail streams drive resource over-provisioning
    * Users rarely have any means of accurately gauging how many nodes are required, and end up over-provisioning for their job.
* Temporal variation makes resource prediction difficult.
    * A single stream can have spikes lasting one to a few seconds, as well as periods of idleness
* Users already try to do fine-grained scheduling.
    * users prefer to provision a cluster using external resource managers (e.g., YARN, Mesos), and then run periodic micro-batch jobs.
* Latency requirements vary across jobs
    * The job completion time for the micro-aggregation jobs ranges from less than 10 seconds up to 1000 seconds

### Prior Approaches
#### Dynamic resource provisioning for stream processing.
![](https://i.imgur.com/7vQySXo.png)
* Diagnosis And Policies: Mechanisms for when and how resource re-allocation is performed;
* Elasticity Mechanisms: Mechanisms for efficient query reconfiguration;
* Resource Sharing: Mechanisms for dynamic performance isolation among streaming queries.

These techniques make changes to the dataflows in reaction to a performance metric (e.g., latency) deteriorating.

#### The promise of event-driven systems
Operators here are not mapped to specific CPUs.
This provides an opportunity to develop systems that can manage a unified queue of messages across query boundaries, and combat the over-provisioning of slot-based approaches.

## Design Overview
### Assumptions, System Model
* Deal with streaming queries on clusters shared by **cooperative users**.
* The user specifies a latency target at query submission time, e.g., from an SLA.

### Architecture
* A scheduling strategy which determines message priority by interpreting the semantics of query and data streams given a latency target.
* A scheduling framework
    1. enables message priority to be generated using a pluggable strategy
    2. schedules operators dynamically based on their current pending messages’ priorities.

### Context converters
* A context converter modifies and propagates scheduling contexts attached to a message
    * per-job static information: latency target, topology
    * dynamic information such as the timestamps of tuples being processed (stream progress)

### Stateless scheduler
Determines the target operator’s priority by interpreting the scheduling context attached to the message

### Example
![](https://i.imgur.com/W4OeViX.png)

## Scheduling Policies in Cameo
We model our setting as a non-preemptive, non-uniform task time, multi-processor, real-time scheduling problem

* Least-Laxity-First
    * Default
* Earliest-Deadline-First
* Shortest-Job-First

### Definitions and Underpinnings
* Event
    * Input data arrives as events, associated with a logical time that indicates the stream progress of these events in the input stream.
* Dataflow job and operators.
    * A dataflow job consists of a DAG of stages. Each stage executes a user-defined function. A stage can be parallelized and executed by a set of dataflow operators
    * Regular operator
    * Windowed operator
* Message timestamps
    * We denote a message M as a tuple (oM,(pM,tM)), where: a) oM is the operator executing the message; b) pM and tM record the logical and physical time of the input stream that is associated with M, respectively.
      Intuitively, M is influenced by the input stream with logical time ≤ pM. Physical time tM marks the system time when pM is observed at a source operator.
* Latency
    * For an output message M, let E be the set of input events that influenced it. We define latency as the difference between the last arrival time of any event in E and the time when M is generated.

### Topology Awareness
![](https://i.imgur.com/43acqSg.png)

#### Calculating Message Deadline
![](https://i.imgur.com/W4OeViX.png)
* Single-operator dataflow, Regular operator
![](https://i.imgur.com/Aqlx2RZ.png)
* Multiple-operator dataflow, Regular operator
![](https://i.imgur.com/kZJkKC0.png)
* Multiple-operator dataflow, Windowed operator.
![](https://i.imgur.com/uuyZKGY.png)

### Mapping Stream Progress

## Scheduling Mechanisms in Cameo
1. How to make static and dynamic information from both upstream and downstream processing available during priority assignment?
    * by propagating scheduling contexts with messages
2. How can we efficiently perform fine-grained priority assignment and scheduling that scales with message volume?
    * Cameo uses a two-layer scheduler architecture
    * A context converter is embedded into each operator and handles scheduling contexts whenever the operator sends or receives a message
    * This avoids the bottleneck of a centralized scheduler thread that calculates priorities
3. How can we admit pluggable scheduling policies without modifying the scheduler mechanism?
    * Cameo allows the priority generation process to be implemented through the context handling API. A context converter invokes the API with each operator.
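
The three answers above (contexts travel with messages, each operator has its own context converter, and the policy is pluggable) can be sketched in a few lines of Python. This is only an illustrative sketch, not Cameo's actual API: the class and field names (`PriorityContext`, `ContextConverter`, `StatelessScheduler`, `laxity_policy`) are hypothetical. The start-deadline formula `t_M + L - C_oM - C_path` is also an assumption made for illustration, since these notes show the deadline formulas only as images.

```python
import heapq
import itertools
from dataclasses import dataclass
from typing import Callable


@dataclass
class PriorityContext:
    """Hypothetical scheduling context carried by every message."""
    latency_target: float  # L: per-job static information
    t_m: float             # t_M: physical time tied to the message's stream progress
    c_op: float = 0.0      # assumed execution cost of the target operator
    c_path: float = 0.0    # assumed cost of the downstream critical path
    priority: float = 0.0  # filled in by the policy; smaller values run first


# A policy is any function from a context to a priority value.
Policy = Callable[[PriorityContext], float]


def laxity_policy(pc: PriorityContext) -> float:
    # Assumed start deadline: the latest start time that still meets the target.
    return pc.t_m + pc.latency_target - pc.c_op - pc.c_path


class ContextConverter:
    """Lives inside an operator and stamps outgoing messages via a pluggable policy."""

    def __init__(self, policy: Policy):
        self.policy = policy

    def on_send(self, pc: PriorityContext) -> PriorityContext:
        pc.priority = self.policy(pc)
        return pc


class StatelessScheduler:
    """Orders pending work purely by the priority found in each context."""

    def __init__(self):
        self._heap = []
        self._tie = itertools.count()  # keeps insertion order for equal priorities

    def submit(self, operator: str, pc: PriorityContext) -> None:
        heapq.heappush(self._heap, (pc.priority, next(self._tie), operator))

    def next_operator(self) -> str:
        return heapq.heappop(self._heap)[2]


converter = ContextConverter(laxity_policy)
scheduler = StatelessScheduler()
scheduler.submit("bulk_join", converter.on_send(
    PriorityContext(latency_target=60.0, t_m=0.0, c_op=1.0, c_path=2.0)))
scheduler.submit("alert_filter", converter.on_send(
    PriorityContext(latency_target=1.0, t_m=0.5, c_op=0.1, c_path=0.2)))
order = [scheduler.next_operator(), scheduler.next_operator()]
print(order)
```

Swapping `laxity_policy` for another function changes the ordering without touching the scheduler, which is the point of separating the converter from the scheduling mechanism.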
### Scheduling Contexts
Scheduling contexts are data structures attached to messages, capturing message priority and the information required to perform priority-based scheduling.

* Priority Context (PC)
    * A PC is attached to a message before the message is sent
* Reply Context (RC)
    * An RC is attached to an acknowledgement message, sent by the target operator to its upstream operator after a message is received

### System Architecture
![](https://i.imgur.com/DeG5XYg.png)

### Implementing the Cameo Policy
* PC
    * ![](https://i.imgur.com/sUVfgEQ.png)
* RC
    * Cameo utilizes RC to track the critical path execution cost Cpath and the execution cost CoM

### Customizing Cameo: Proportional Fair Scheduling

## Experimental Evaluation
* Study Cameo’s effects when
    * Varying environmental parameters
        * workload
        * available resources
        * workload bursts.
    * Tuning internal parameters and optimization
        * effect of scheduling granularity
        * frontier prediction for event time windows
        * starvation prevention.
* Machine configuration. We use DS12-v2 Azure virtual machines (4 vCPUs/56GB memory/112G SSD) as server machines, and DS11-v2 Azure virtual machines (2 vCPUs/14GB memory/28G SSD) as client machines [12]. Single-tenant scenarios are evaluated on a single server machine. Unless otherwise specified, all multi-tenant experiments are evaluated using a 32-node Azure cluster with 16 client machines.
* Evaluation workload.
    * Latency Sensitive Jobs (Group 1): with SLAs
    * Bulk Analytic Jobs (Group 2): longer-term analytics
* Latency constraints

### Single-tenant Scenario
![](https://i.imgur.com/PsEcDzL.png)
![](https://i.imgur.com/BYkpBXN.png)

Messages that contribute to the first result (colored dots) and messages that contribute to the second result (grey dots) do not overlap on the timeline.

### Multi-tenant Scenario
* Cameo under increasing data volume
![](https://i.imgur.com/YwUjBvR.png)
* Effect of limited resources.
![](https://i.imgur.com/9FgXQNV.png)
* Effect of temporal variation of workload.
![](https://i.imgur.com/hPu7X57.png)
* Ingestion pattern from production trace.
![](https://i.imgur.com/waf1vH1.png)

### Cameo: Internal Evaluation
* LLF vs. EDF vs. SJF
![](https://i.imgur.com/vPjxqsj.png)
* Scheduling Overhead
![](https://i.imgur.com/NFWyrMB.png)
* Varying Scope of Scheduler Knowledge.
![](https://i.imgur.com/gjYI5eL.png)
* Effect of Measurement Inaccuracies.
![](https://i.imgur.com/i7Q368f.png)
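
To make the LLF vs. EDF vs. SJF comparison concrete, the toy example below orders the same three pending messages under each policy. It is a minimal sketch using the textbook definitions of the three policies, not Cameo's implementation. The `Pending` type, the key functions, and the numbers are all hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Pending:
    name: str
    deadline: float  # absolute time by which the result is due
    cost: float      # estimated remaining execution time


def llf_key(m: Pending, now: float) -> float:
    # Least-Laxity-First: slack left if the message started right now.
    return m.deadline - now - m.cost


def edf_key(m: Pending, now: float) -> float:
    # Earliest-Deadline-First: ignores execution cost.
    return m.deadline


def sjf_key(m: Pending, now: float) -> float:
    # Shortest-Job-First: ignores the deadline.
    return m.cost


def order(messages, key, now=0.0):
    """Return message names sorted by the given policy key (smallest first)."""
    return [m.name for m in sorted(messages, key=lambda m: key(m, now))]


pending = [
    Pending("a", deadline=10.0, cost=1.0),  # laxity 9.0
    Pending("b", deadline=12.0, cost=8.0),  # laxity 4.0
    Pending("c", deadline=20.0, cost=0.5),  # laxity 19.5
]

print(order(pending, llf_key))  # ['b', 'a', 'c']
print(order(pending, edf_key))  # ['a', 'b', 'c']
print(order(pending, sjf_key))  # ['c', 'a', 'b']
```

Message `b` has a later deadline than `a` but much less slack, so only the laxity-based key runs it first.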