<style> .reveal { font-family: Palatino Linotype, Book Antiqua, Palatino, FreeSerif, serif; font-size: 24px; } .reveal strong { font-weight: 1000; } .reveal h1 { font-size: 3em; } .reveal img { filter: brightness(90%); } pre.graphviz { background-color: transparent; } .small { font-size: 0.5em; } .reveal .slides { text-align: left; } </style>

# Dynamic Provisioning of Large-scale Data Processing in Cloud Environments

## B07902134

:link: <https://hackmd.io/@willypillow/hpc>

---

## Agenda

- Introduction
    - Heterogeneous computing / Specialized hardware
    - Cloud computing
    - Goals
- Software stack
    - Apache Spark
    - Kubernetes
    - Infrastructure-as-code
- Workflow and challenges
- Proof of Concept
- Conclusion

:link: <https://hackmd.io/@willypillow/hpc>

---

## Introduction

---

## Heterogeneous Computing

David Patterson: A New Golden Age for Computer Architecture

> Only performance path left is Domain Specific Architectures
>
> Renaissance of hardware design

---

## Cloud Computing

![](https://i.imgur.com/4qoIIyR.png)

Affords us extreme amounts of capacity and flexibility.

---

![](https://i.imgur.com/LPkgrmB.png =x250)
![](https://i.imgur.com/pJNZ8Ke.png =x250)

Increasingly complex, spawning industries to optimize cost.

---

![](https://i.imgur.com/0SXLrKi.png)

Not including accelerators such as FPGAs, GPUs, and TPUs!

<span class="small">
Source: [Trends and Challenges in Big Data](http://www.pdsw.org/pdsw-discs16/slides/Keynote-Stoica.pdf)
</span>

---

### More and More Choices

- Single-core perf / Number of cores / Memory size
- FPGAs (e.g., Azure PBS w/ Intel Arria 10)
- GPUs
- ML chips: TPUs / AWS Inferentia
- CPU instruction sets (e.g., AVX-512)
- Local NVMe storage
- High-speed networking (e.g., RDMA, InfiniBand)

---

![](https://i.imgur.com/j2BnEE3.png)

Not viable for complicated multi-stage tasks?

<span class="small">
Source: [Trends and Challenges in Big Data](http://www.pdsw.org/pdsw-discs16/slides/Keynote-Stoica.pdf)
</span>

---

### Spot Instances

![](https://i.imgur.com/52zHVTP.png)

Big cloud providers now sell unused capacity at a lower price (up to 90% savings).

(<https://azureprice.net/>: A VM with $120$ cores and $456$ GB of RAM can cost as little as $0.36$ USD/hr.)

---

### Issues?

- Preemption (no availability guarantees)
- Price volatility
- Locality

---

### Other Billing Complexities

- Sustained usage discounts
- Savings plans
- Reserved instances / Committed usage discounts
- RI marketplace

---

### With great flexibility comes great responsibility!

To optimize performance/cost, one needs to consider:

- Number of instances
- Instance families & accelerators
- On-demand / spot
- On-demand / reserved
- Application workload & requirements

---

## Goals

### High performance at low cost

- Select instance types for each stage
- Spin up a sufficient number of nodes to remove bottlenecks
- Schedule tasks on the nodes

Unlike traditional scheduling problems, the amount of resources here is not fixed.

And unlike general cloud cost-optimization solutions, we focus only on data analytics workloads.


---

## Software Stack

---

## Apache Spark

- Framework for large-scale data processing
    - Both batch and real-time streams
- Based on *Resilient Distributed Datasets* (RDDs)
    - Read-only dataset partitioned over multiple nodes
    - RDDs can be cached and recreated for reuse and fault-tolerance

---

![](https://i.imgur.com/d7HbaLE.png =x500)

A Spark workflow can be represented by a DAG, in which vertices are RDDs and edges are operations.

<span class="small">
Source: [Understanding your Apache Spark Application Through Visualization](https://databricks.com/blog/2015/06/22/understanding-your-spark-application-through-visualization.html)
</span>

---

### Spark Example

``` scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("wiki_test") // Create a Spark config object.
val sc = new SparkContext(conf) // Create a Spark context.
val data = sc.textFile("/path/to/somedir") // Read the files in "somedir" into an RDD of lines.
val tokens = data.flatMap(_.split(" ")) // Split each line into a list of tokens (words).
val wordFreq = tokens.map((_, 1)).reduceByKey(_ + _) // Add a count of one to each token, then sum the counts per word type.
wordFreq.sortBy(s => -s._2).map(x => (x._2, x._1)).top(10) // Get the top 10 words. Swap word and count to sort by count.
```

<span class="small">
Source: https://wikipedia.org/wiki/Apache_Spark
</span>

---

### Nodes in Spark

![](https://i.imgur.com/dJ9JGAT.png)

- Driver for coordination; executors for running tasks
- Needs a *cluster manager* to manage nodes

<span class="small">
Source: [Understanding the working of Spark Driver and Executor](https://blog.knoldus.com/understanding-the-working-of-spark-driver-and-executor/)
</span>

---

## Kubernetes

> Kubernetes, also known as K8s, is an open-source system for automating deployment, scaling, and management of containerized applications.

Can be used as a cluster manager for Spark.

![](https://i.imgur.com/OUnuzpg.png =x300)

---

## Accelerators with Spark-on-Kubernetes

Spark 3+ supports accelerators such as GPUs as custom resources (i.e., accelerator-aware scheduling).

**ResourceProfile:** allows the user to specify executor and task requirements for an RDD that will get applied during a stage.

**Dynamic allocation:** dynamically scales cluster resources based on workload.

<div class="small">

Previous work on dynamic scaling:

- *Dynamic memory-aware scheduling in spark computing environment*
- *Sparker: Optimizing Spark for Heterogeneous Clusters*

</div>

---

## Spark: Graceful Executor Decommissioning

![](https://i.imgur.com/VXiRPsX.png =x400)

Move data to a different node before preemption.

---

## Spark: Locality

![](https://i.imgur.com/YDAAQdK.png)

<span class="small">
Source: https://stackoverflow.com/a/59152772
</span>

---

## Infrastructure-as-code (IaC)

- Manage and provision infrastructure through code and declarative config
- Deploy complex components in a fast and consistent manner
- Key role in DevOps workflow and CI/CD
- Examples: AWS CloudFormation, Terraform

```
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 4.16"
    }
  }

  required_version = ">= 1.2.0"
}

provider "aws" {
  region = "us-west-2"
}

resource "aws_instance" "app_server" {
  ami           = "ami-830c94e3"
  instance_type = "t2.micro"

  tags = {
    Name = "ExampleAppServerInstance"
  }
}
```

---

## Workflow and Challenges

- Monitor pricing & preemption information of instance types
- Estimate the time to compute a task under a given configuration
- Multi-objective optimization between performance and cost
- Intelligently specify instance/resource requirements for each stage
- Integration:
    - Sits between Spark and cluster manager
    - Interacts with the cloud service

---

``` graphviz
digraph G {
  compound = true
  fontname = "Palatino Linotype"
  bgcolor = "#00000000"
  node [ fontname = "Palatino Linotype" ]

  subgraph cluster_cloud {
    label = "Cloud service / k8s cluster"
    n11, n12 [ label = "highcpu" ]
    {n11, n12} -> {n21, n22, n23}
    n21, n22, n23 [ label = "highmem" ]
    {n21, n22, n23} -> {n31, n32}
    n31, n32 [ label = "gpu" ]
    {n31, n32} -> n41
    n41 [ label = "driver" ]
  }

  monitor [label = "Pricing monitor"]
  monitor -> model
  model [ label = "Estimator" ]
  n22 -> model [ ltail = cluster_cloud ]
  opt [ label = "Optimizer" ]
  instance [ label = "Instance selection"]
  sched [ label = "Spark / Cluster manager" ]
  model -> opt -> instance -> sched
  sched -> n22 [ lhead = cluster_cloud ]
  n22 -> sched [ ltail = cluster_cloud ]
}
```

---

### Pricing & Preemption Monitor

- Cloud providers usually provide APIs for this data
- Spot.io: ML prediction of spot instance interruptions

![](https://i.imgur.com/VzAHsHf.png)

---

### Time Estimation

- Some literature on the topic, e.g., *On Exploring the Optimum Configuration of Apache Spark Framework in Heterogeneous Clusters*
- Difficult, with a large space of possible configurations
- Possible approaches:
    - Automated experiments
    - Historical data
    - Predictive modeling
- Things to consider:
    - Data locality & time for sending data
    - Varying workloads over time for a routine job?

---

### Intelligent Requirements

- Fully-automated: Analyze stage to select best instance type & count
- Semi-automated:
    - User-specified task type (e.g., GPU-intensive)
    - Select most cost-effective instance of that class

---

## Proof of Concept

---

Since estimation and integration are complex, we mostly deal with the optimization part here.

---

Recall that a Spark workflow can be represented as a DAG $(V, E)$.

Let $I$ be the set of possible instance types.

Assume that we have the following estimators:

- $f_c: E \times I \to \mathbb{R}^+$ giving us the time needed to perform an operation on an instance type
- $g_c: E \times I \to \mathbb{R}^+$ giving us the cost per unit of time

We then want to find a mapping $\theta: E \to I$ that minimizes both

- the total time $\sum_{e \in E} f_c(e, \theta(e))$
- the total cost $\sum_{e \in E} f_c(e, \theta(e)) \cdot g_c(e, \theta(e))$

---

### Network Costs?
Without network costs, the objectives above decompose per edge, so a greedy algorithm could optimize them. Once we add the network costs $f_n$ and $g_n$ for moving data between instance types, the choices for consecutive operations become coupled:

- $\sum_{e \in E} f_c(e, \theta(e)) + \sum_{(u, v), (v, w) \in E} f_n(\theta(u, v), \theta(v, w))$
- The objective for pricing is defined similarly

---

### A Simplified Model

- Does not explicitly consider the number of instances per type
- Omits the possibility of multiple instance types for a stage
- Ignores that the availability (and thus cost) of spot instances may depend on your own usage

---

We can then proceed to optimize this via the multi-objective genetic algorithm *NSGA-II*.

---

``` py
problem = Problem(
    n_rdds=3,
    n_instances=3,
    edges=[
        [0, 1, [[10, 1], [5, 3], [100, 1]]],
        [1, 2, [[100, 1], [500, 3], [10, 5]]],
    ],
    network_cost=[
        [[0, 0], [1, 1], [1, 1]],
        [[1, 1], [0, 0], [1, 1]],
        [[1, 1], [1, 1], [0, 0]],
    ],
)
```

---

``` text
~/work/hpc via 🐍 v3.9.12 took 25s
✦ ❯ python -i main.py
=======================================================
n_gen |  n_eval |  n_nds  |     eps      |  indicator
=======================================================
    1 |       9 |       2 |            - |            -
    2 |       9 |       2 |  0.00000E+00 |            f
[0 2] [20. 60.]
[1 2] [15. 65.]
```

![](https://i.imgur.com/AuYKMRC.png =x400)

---

## Tl;dr

**Automatically provision** workers on the cloud w/ **accelerators** and **spot instances** to ensure fast large-scale data processing at a low cost.

- - -

## Q/A?

---

## Further Reading

- [Creating an 86,000 Hour Speech Dataset with Apache Spark and TPUs](https://www.youtube.com/watch?v=AndC8ESF8UE)
- [Performance Tuning Tips on Real-World Customer Spark-on-Kubernetes pipelines](https://www.youtube.com/watch?v=X6VOR2XEhjQ)
- [Getting Started with the RAPIDS Accelerator for Apache Spark](https://nvidia.github.io/spark-rapids/Getting-Started/)
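
---

## Appendix: Brute-force Check of the Example

The toy instance from the proof of concept is small enough ($3^2 = 9$ mappings) to enumerate. The following is a minimal sketch, not the code behind the slides. It assumes each `[a, b]` pair in `edges` means `[time, cost per unit of time]` for one instance type, and it evaluates only the two compute objectives, leaving the network terms out. The names `EDGES`, `objectives`, `dominates`, and `pareto_front` are illustrative.

```python
from itertools import product

# Edge data copied from the Problem(...) slide: (source RDD, target RDD, options).
# Assumption: options[i] is (time, cost per unit of time) on instance type i.
EDGES = [
    (0, 1, [(10, 1), (5, 3), (100, 1)]),
    (1, 2, [(100, 1), (500, 3), (10, 5)]),
]
N_INSTANCES = 3


def objectives(theta):
    """Return (total time, total cost) for theta, a tuple mapping edge index -> instance type."""
    total_time = 0
    total_cost = 0
    for (_, _, options), instance in zip(EDGES, theta):
        time, rate = options[instance]
        total_time += time
        total_cost += time * rate
    return total_time, total_cost


def dominates(a, b):
    """True if score a is no worse than score b in both objectives and not identical to it."""
    return a[0] <= b[0] and a[1] <= b[1] and a != b


def pareto_front():
    """Enumerate every mapping and keep those that no other mapping dominates."""
    scored = {
        theta: objectives(theta)
        for theta in product(range(N_INSTANCES), repeat=len(EDGES))
    }
    return {
        theta: score
        for theta, score in scored.items()
        if not any(dominates(other, score) for other in scored.values())
    }


front = pareto_front()
for theta, (time, cost) in sorted(front.items()):
    print(theta, time, cost)
```

Under these assumptions the non-dominated mappings are `(0, 2)` with (time, cost) = (20, 60) and `(1, 2)` with (15, 65), which agrees with the two solutions in the NSGA-II output. The search space grows as $|I|^{|E|}$, so enumeration stops being practical for larger DAGs, where a heuristic such as NSGA-II is the more realistic choice.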