
Dynamic Provisioning of Large-scale Data Processing in Cloud Environments

B07902134

https://hackmd.io/@willypillow/hpc


Agenda

  • Introduction
    • Heterogeneous computing / Specialized hardware
    • Cloud computing
  • Goals
  • Software stack
    • Apache Spark
    • Kubernetes
    • Infrastructure-as-code
  • Workflow and challenges
  • Proof of Concept
  • Conclusion



Introduction


Heterogeneous Computing

David Patterson:
A New Golden Age for Computer Architecture

The only performance path left is Domain-Specific Architectures

Renaissance of hardware design


Cloud Computing

Affords us extreme amounts of capacity and flexibility.



Increasingly complex, spawning entire industries dedicated to cost optimization.


Not including accelerators such as FPGAs, GPUs, and TPUs!

Source: Trends and Challenges in Big Data


More and More Choices

  • Single-core perf / Number of cores / Memory size
  • FPGAs (e.g., Azure PBS w/ Intel Arria 10)
  • GPUs
  • ML chips: TPUs / AWS Inferentia
  • CPU instruction sets (e.g., AVX512)
  • Local NVMe storage
  • High-speed networking (e.g., RDMA, InfiniBand)

Manually picking a configuration: not viable for complicated multi-stage tasks?

Source: Trends and Challenges in Big Data


Spot Instances

Big cloud providers now sell unused capacity at a lower price (up to 90% savings).

(https://azureprice.net/: a VM with \(120\) cores / \(456\) GB of RAM can cost as little as \(0.36\) USD/hr.)


Issues?

  • Preemption (no availability guarantees)
  • Price volatility
  • Locality

Other Billing Complexities

  • Sustained usage discounts
  • Savings plans
  • Reserved instances / Committed usage discounts
  • RI marketplace

With great flexibility comes great responsibility!

To optimize performance/cost, one needs to consider:

  • Number of instances
  • Instance families & accelerators
  • On-demand / spot
  • On-demand / reserved
  • Application workload & requirements

Goals

High performance at low cost

  • Select instance types for each stage
  • Spin up a sufficient number of nodes to remove bottlenecks
  • Schedule tasks on the nodes

This is unlike traditional scheduling problems, where the amount of resources is fixed.

And unlike general cloud-cost optimization solutions, we focus solely on data-analytics workloads.


Software Stack


Apache Spark

  • Framework for large-scale data processing
    • Both batch and real-time streams
  • Based on Resilient Distributed Datasets (RDDs)
    • Read-only dataset partitioned over multiple nodes
  • RDDs can be cached and recreated for reuse and fault-tolerance

A Spark workflow can be represented by a DAG,
in which vertices are RDDs and edges are operations

Source: Understanding your Apache Spark Application Through Visualization


Spark Example

val conf = new SparkConf().setAppName("wiki_test") // create a spark config object
val sc = new SparkContext(conf) // Create a spark context
val data = sc.textFile("/path/to/somedir") // Read files from "somedir" into an RDD of (filename, content) pairs.
val tokens = data.flatMap(_.split(" ")) // Split each file into a list of tokens (words).
val wordFreq = tokens.map((_, 1)).reduceByKey(_ + _) // Add a count of one to each token, then sum the counts per word type.
wordFreq.sortBy(s => -s._2).map(x => (x._2, x._1)).top(10) // Get the top 10 words. Swap word and count to sort by count.

Source: https://wikipedia.org/wiki/Apache_Spark


Nodes in Spark

  • Driver for coordination; executor for running tasks
  • Needs a cluster manager to manage nodes

Source: Understanding the working of Spark Driver and Executor


Kubernetes

Kubernetes, also known as K8s, is an open-source system for automating deployment, scaling, and management of containerized applications.

Can be used as cluster manager for Spark.


Accelerators with Spark-on-Kubernetes

Spark 3+ supports accelerators such as GPUs as custom resources (i.e., accelerator-aware scheduling).

ResourceProfile: allows the user to specify executor and task requirements for an RDD that will get applied during a stage.

Dynamic allocation: dynamically scales cluster resources based on workload.
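As a sketch, accelerator-aware scheduling is typically enabled through Spark configuration like the following (the discovery-script path is illustrative; Spark ships an example `getGpusResources.sh`):

```properties
# Request one GPU per executor and per task (Spark 3.0+)
spark.executor.resource.gpu.amount=1
spark.task.resource.gpu.amount=1
# Script that reports the addresses of the GPUs visible to each executor
spark.executor.resource.gpu.discoveryScript=/opt/spark/examples/src/main/scripts/getGpusResources.sh
```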

Previous work on dynamic scaling:

  • Dynamic memory-aware scheduling in spark computing environment
  • Sparker: Optimizing Spark for Heterogeneous Clusters

Spark: Graceful Executor Decommissioning

Moves data to a different node before preemption.


Spark: Locality

Source: https://stackoverflow.com/a/59152772


Infrastructure-as-code (IaC)

  • Manage and provision infrastructure through code and declarative config
  • Deploy complex components in a fast and consistent manner
  • Key role in DevOps workflow and CI/CD
  • Examples: AWS CloudFormation, Terraform
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 4.16"
    }
  }

  required_version = ">= 1.2.0"
}

provider "aws" {
  region  = "us-west-2"
}

resource "aws_instance" "app_server" {
  ami           = "ami-830c94e3"
  instance_type = "t2.micro"

  tags = {
    Name = "ExampleAppServerInstance"
  }
}

Workflow and Challenges

  • Monitor pricing & preemption information of instance types
  • Estimate time to compute task for configuration
  • Multi-objective optimization between performance and cost
  • Intelligently specify instance/resource requirements for each stage
  • Integration:
    • Sits between Spark and cluster manager
    • Interacts with the cloud service

digraph G {
    compound = true
    fontname = "Palatino Linotype"
    bgcolor = "#00000000"
    node [ fontname = "Palatino Linotype" ]

    subgraph cluster_cloud {
        label = "Cloud service / k8s cluster"
        n11 [ label = "highcpu" ]
        n12 [ label = "highcpu" ]
        n21 [ label = "highmem" ]
        n22 [ label = "highmem" ]
        n23 [ label = "highmem" ]
        n31 [ label = "gpu" ]
        n32 [ label = "gpu" ]
        n41 [ label = "driver" ]
        { n11 n12 } -> { n21 n22 n23 }
        { n21 n22 n23 } -> { n31 n32 }
        { n31 n32 } -> n41
    }

    monitor [ label = "Pricing monitor" ]
    model [ label = "Estimator" ]
    opt [ label = "Optimizer" ]
    instance [ label = "Instance selection" ]
    sched [ label = "Spark / Cluster manager" ]

    monitor -> model
    n22 -> model [ ltail = cluster_cloud ]
    model -> opt -> instance -> sched
    sched -> n22 [ lhead = cluster_cloud ]
    n22 -> sched [ ltail = cluster_cloud ]
}

Pricing & Preemption Monitor

  • Cloud providers usually provide APIs for these data
  • Spot.io: ML prediction of spot instance interruptions


Time Estimation

  • Some literature on the topic, e.g., On Exploring the Optimum Configuration of Apache Spark Framework in Heterogeneous Clusters
  • Difficult and lots of possibilities
  • Possible approaches:
    • Automated experiments
    • Historical data
    • Predictive modeling
  • Things to consider:
    • Data locality & Time for sending data
    • Varying workloads over time for a routine job?
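As a sketch of the historical-data approach, one could fit an Amdahl-style scaling model \(t(n) = a + b/n\) to past runs of a routine job and extrapolate to untried cluster sizes. `fit_scaling_model` and the sample timings below are hypothetical:

```python
def fit_scaling_model(samples):
    """Least-squares fit of t(n) = a + b / n to (node_count, seconds) pairs.

    'a' captures the serial/overhead portion of the job,
    'b' the perfectly parallelizable work.
    """
    xs = [1.0 / n for n, _ in samples]
    ys = [t for _, t in samples]
    mx, my = sum(xs) / len(xs), sum(ys) / len(ys)
    b = sum((x - mx) * (y - my) for x, y in zip(xs, ys)) / \
        sum((x - mx) ** 2 for x in xs)
    a = my - b * mx
    return a, b

# Hypothetical timings of the same job at different cluster sizes
history = [(1, 630.0), (2, 330.0), (4, 180.0), (8, 105.0)]
a, b = fit_scaling_model(history)
print(round(a), round(b))  # the data above follows t(n) = 30 + 600 / n exactly
print(a + b / 16)          # predicted time on 16 nodes
```

Real workloads will not fit such a curve exactly (stragglers, shuffle costs, data skew), but even a crude predictor narrows the search space for the optimizer.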

Intelligent Requirements

  • Fully-automated: Analyze stage to select best instance type & count
  • Semi-automated:
    • User-specified task type (e.g., GPU-intensive)
    • Select most cost-effective instance of that class
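The semi-automated path can be as simple as ranking the instances in the user-declared class by price per unit of performance; the catalog below is made up for illustration:

```python
def most_cost_effective(instances, task_class):
    """Pick the instance with the lowest price per unit of relative performance.

    instances: dicts with 'name', 'class', 'price' (USD/hr), and 'perf'
    (relative throughput for this task class, e.g. from benchmark runs).
    """
    pool = [i for i in instances if i["class"] == task_class]
    return min(pool, key=lambda i: i["price"] / i["perf"])

# Hypothetical spot-price snapshot
catalog = [
    {"name": "gpu-small",  "class": "gpu", "price": 0.90, "perf": 1.0},
    {"name": "gpu-large",  "class": "gpu", "price": 2.40, "perf": 3.0},
    {"name": "cpu-xlarge", "class": "cpu", "price": 0.30, "perf": 1.0},
]
print(most_cost_effective(catalog, "gpu")["name"])  # gpu-large (0.80 vs 0.90 per perf unit)
```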

Proof of Concept


Since estimation and integration are complex,
we mostly deal with the optimization part here.


Recall that a Spark workflow can be represented as a DAG \((V, E)\).

Let \(I\) be the set of possible instance types.

Assume that we have the following estimators:

  • \(f_c: E \times I \to \mathbb{R}^+\), giving us the time needed to perform an operation on an instance type
  • \(g_c: E \times I \to \mathbb{R}^+\), giving us the cost per unit of time on that instance type

We then want to find a mapping \(\theta: E \to I\) that minimizes both

  • \(\sum_{e \in E} f_c(e, \theta(e))\)
  • \(\sum_{e \in E} f_c(e, \theta(e)) \cdot g_c(e, \theta(e))\)

Network Costs?

It seems that the objectives above can be optimized edge by edge with a greedy algorithm.

That is only because we have yet to consider the network costs \(f_n\) and \(g_n\):

  • \(\sum_{e \in E} f_c(e, \theta(e)) + \sum_{((u, v), (v, w)) \in E \times E} f_n(\theta((u, v)), \theta((v, w)))\)
  • The objective for pricing is defined similarly
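A direct transcription of these objectives, with hypothetical table-based estimators, might look like:

```python
def evaluate(theta, edges, f_c, g_c, f_n, g_n):
    """Total time and cost of assigning instance type theta[i] to edge i.

    edges: list of (u, v) vertex pairs of the DAG.
    f_c[i][t]: compute time of edge i on instance type t;
    g_c[i][t]: its cost per unit of time.
    f_n[s][t], g_n[s][t]: transfer time / cost between instance types s and t.
    """
    time = sum(f_c[i][theta[i]] for i in range(len(edges)))
    cost = sum(f_c[i][theta[i]] * g_c[i][theta[i]] for i in range(len(edges)))
    # Network terms: edge j consumes the RDD that edge i produces
    for i, (_, v) in enumerate(edges):
        for j, (u, _) in enumerate(edges):
            if v == u:
                time += f_n[theta[i]][theta[j]]
                cost += g_n[theta[i]][theta[j]]
    return time, cost

f_c = [[10, 5, 100], [100, 500, 10]]   # per-edge compute times
g_c = [[1, 3, 1], [1, 3, 5]]           # per-edge cost rates
net = [[0, 1, 1], [1, 0, 1], [1, 1, 0]]
print(evaluate([0, 2], [(0, 1), (1, 2)], f_c, g_c, net, net))  # (21, 61)
```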

A Simplified Model

  • Does not explicitly consider the number of instances per type
  • Omits possibility of multiple instance types for a stage
  • Availability (and thus cost) of spot instances may depend on your usage

We can then proceed to optimize this via
the multi-objective genetic algorithm NSGA-II.


problem = Problem(
    n_rdds=3,
    n_instances=3,
    edges=[
        [0, 1, [[10, 1], [5, 3], [100, 1]]],
        [1, 2, [[100, 1], [500, 3], [10, 5]]],
    ],
    network_cost=[
        [[0, 0], [1, 1], [1, 1]],
        [[1, 1], [0, 0], [1, 1]],
        [[1, 1], [1, 1], [0, 0]],
    ],
)

$ python -i main.py
=======================================================
n_gen |  n_eval |  n_nds  |     eps      |  indicator  
=======================================================
    1 |       9 |       2 |            - |            -
    2 |       9 |       2 |  0.00000E+00 |            f
[0 2] [20. 60.]
[1 2] [15. 65.]
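Since this toy instance has only \(3^2\) assignments, the NSGA-II result can be cross-checked by exhaustive enumeration (compute terms only, matching the run above):

```python
from itertools import product

# Per-edge (time, cost-rate) options, as in the Problem definition above
edges = [
    [(10, 1), (5, 3), (100, 1)],
    [(100, 1), (500, 3), (10, 5)],
]

def pareto_front(points):
    """Keep the assignments whose (time, cost) pair is not dominated."""
    return [(a, fa) for a, fa in points
            if not any(fb[0] <= fa[0] and fb[1] <= fa[1] and fb != fa
                       for _, fb in points)]

candidates = []
for assign in product(range(3), repeat=len(edges)):
    t = sum(edges[i][a][0] for i, a in enumerate(assign))
    c = sum(edges[i][a][0] * edges[i][a][1] for i, a in enumerate(assign))
    candidates.append((assign, (t, c)))

print(sorted(pareto_front(candidates)))
# [((0, 2), (20, 60)), ((1, 2), (15, 65))]
```

Both non-dominated points agree with the NSGA-II output, which suggests the encoding of the objectives is consistent.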


TL;DR

Automatically provision workers on the cloud w/ accelerators and spot instances to ensure fast large-scale data processing at a low cost.


Q/A?


Further Reading
