<style>
.reveal {
font-family: Palatino Linotype, Book Antiqua, Palatino, FreeSerif, serif;
font-size: 24px;
}
.reveal strong {
font-weight: 1000;
}
.reveal h1 {
font-size: 3em;
}
.reveal img {
filter: brightness(90%);
}
pre.graphviz {
background-color: transparent;
}
.small {
font-size: 0.5em;
}
.reveal .slides {
text-align: left;
}
</style>
# Dynamic Provisioning of Large-scale Data Processing in Cloud Environments
## B07902134
:link: <https://hackmd.io/@willypillow/hpc>
---
## Agenda
- Introduction
- Heterogeneous computing / Specialized hardware
- Cloud computing
- Goals
- Software stack
- Apache Spark
- Kubernetes
- Infrastructure-as-code
- Workflow and challenges
- Proof of Concept
- Conclusion
---
## Introduction
---
## Heterogeneous Computing
David Patterson:
A New Golden Age for Computer Architecture
> Only performance path left is Domain Specific Architectures
> Renaissance of hardware design
---
## Cloud Computing
![](https://i.imgur.com/4qoIIyR.png)
Affords us vast amounts of capacity and flexibility.
---
![](https://i.imgur.com/LPkgrmB.png =x250)
![](https://i.imgur.com/pJNZ8Ke.png =x250)
Increasingly complex, spawning entire industries devoted to cost optimization.
---
![](https://i.imgur.com/0SXLrKi.png)
Not including accelerators such as FPGAs, GPUs, and TPUs!
<span class="small">
Source: [Trends and Challenges in Big Data](http://www.pdsw.org/pdsw-discs16/slides/Keynote-Stoica.pdf)
</span>
---
### More and More Choices
- Single-core perf / Number of cores / Memory size
- FPGAs (e.g., Azure PBS w/ Intel Arria 10)
- GPUs
- ML chips: TPUs / AWS Inferentia
- CPU instruction sets (e.g., AVX512)
- Local NVMe storage
- High-speed networking (e.g., RDMA, InfiniBand)
---
![](https://i.imgur.com/j2BnEE3.png)
Not viable for complicated multi-stage tasks?
<span class="small">
Source: [Trends and Challenges in Big Data](http://www.pdsw.org/pdsw-discs16/slides/Keynote-Stoica.pdf)
</span>
---
### Spot Instances
![](https://i.imgur.com/52zHVTP.png)
Big cloud providers now sell unused capacity at a lower price (up to 90% savings).
(<https://azureprice.net/>: a VM with 120 vCPUs and 456 GB of RAM can cost as little as $0.36$ USD/hr.)
---
### Issues?
- Preemption (no availability guarantees)
- Price volatility
- Locality
---
### Other Billing Complexities
- Sustained usage discounts
- Savings plans
- Reserved instances / Committed usage discounts
- RI marketplace
---
### With great flexibility comes great responsibility!
To optimize performance/cost, one needs to consider:
- Number of instances
- Instance families & accelerators
- On-demand / spot
- On-demand / reserved
- Application workload & requirements
---
## Goals
### High performance at low cost
- Select instance types for each stage
- Spin up a sufficient number of nodes to remove bottlenecks
- Schedule tasks on the nodes
This is unlike traditional scheduling problems, where the amount of resources is fixed.
Unlike general-purpose cloud cost-optimization solutions, we focus solely on data analytics workloads.
---
## Software Stack
---
## Apache Spark
- Framework for large-scale data processing
- Both batch and real-time streams
- Based on *Resilient Distributed Datasets* (RDDs)
- Read-only dataset partitioned over multiple nodes
- RDDs can be cached and recreated for reuse and fault-tolerance
---
![](https://i.imgur.com/d7HbaLE.png =x500)
A Spark workflow can be represented by a DAG,
in which vertices are RDDs and edges are operations
<span class="small">
Source: [Understanding your Apache Spark Application Through Visualization](https://databricks.com/blog/2015/06/22/understanding-your-spark-application-through-visualization.html)
</span>
---
### Spark Example
``` scala
val conf = new SparkConf().setAppName("wiki_test") // create a spark config object
val sc = new SparkContext(conf) // Create a spark context
val data = sc.textFile("/path/to/somedir") // Read files from "somedir" into an RDD of (filename, content) pairs.
val tokens = data.flatMap(_.split(" ")) // Split each file into a list of tokens (words).
val wordFreq = tokens.map((_, 1)).reduceByKey(_ + _) // Add a count of one to each token, then sum the counts per word type.
wordFreq.sortBy(s => -s._2).map(x => (x._2, x._1)).top(10) // Get the top 10 words. Swap word and count to sort by count.
```
<span class="small">
Source: https://wikipedia.org/wiki/Apache_Spark
</span>
---
### Nodes in Spark
![](https://i.imgur.com/dJ9JGAT.png)
- Driver for coordination; executor for running tasks
- Needs a *cluster manager* to manage nodes
<span class="small">
Source: [Understanding the working of Spark Driver and Executor](https://blog.knoldus.com/understanding-the-working-of-spark-driver-and-executor/)
</span>
---
## Kubernetes
> Kubernetes, also known as K8s, is an open-source system for automating deployment, scaling, and management of containerized applications.
Can be used as a cluster manager for Spark.
![](https://i.imgur.com/OUnuzpg.png =x300)
---
## Accelerators with Spark-on-Kubernetes
Spark 3+ supports accelerators such as GPUs as custom resources (i.e., accelerator-aware scheduling).
**ResourceProfile:** allows the user to specify executor and task requirements for an RDD that will get applied during a stage.
**Dynamic allocation:** dynamically scales cluster resources based on workload.
<div class="small">
Previous work on dynamic scaling:
- *Dynamic memory-aware scheduling in Spark computing environment*
- *Sparker: Optimizing Spark for Heterogeneous Clusters*
</div>
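As a concrete illustration, GPU executors and dynamic allocation can be requested through Spark configuration. A sketch of the relevant `spark-submit` flags; the API server address, discovery-script path, and all values are illustrative placeholders:

``` shell
spark-submit \
  --master k8s://https://<api-server> \
  # Each executor gets one GPU, located via a discovery script
  --conf spark.executor.resource.gpu.amount=1 \
  --conf spark.executor.resource.gpu.discoveryScript=/opt/getGpus.sh \
  # Up to four tasks share one GPU
  --conf spark.task.resource.gpu.amount=0.25 \
  # Scale executors up and down with the workload
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  my-job.jar
```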
---
## Spark: Graceful Executor Decommissioning
![](https://i.imgur.com/VXiRPsX.png =x400)
Moves data to a different node before preemption.
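Graceful decommissioning is opt-in via Spark configuration (available since Spark 3.1); a sketch of the relevant settings, e.g. in `spark-defaults.conf`:

``` text
spark.decommission.enabled                        true
spark.storage.decommission.enabled                true
spark.storage.decommission.rddBlocks.enabled      true
spark.storage.decommission.shuffleBlocks.enabled  true
```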
---
## Spark: Locality
![](https://i.imgur.com/YDAAQdK.png)
<span class="small">
Source: https://stackoverflow.com/a/59152772
</span>
---
## Infrastructure-as-code (IaC)
- Manage and provision infrastructure through code and declarative config
- Deploy complex components in a fast and consistent manner
- Key role in DevOps workflow and CI/CD
- Examples: AWS CloudFormation, Terraform
``` hcl
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 4.16"
    }
  }
  required_version = ">= 1.2.0"
}

provider "aws" {
  region = "us-west-2"
}

resource "aws_instance" "app_server" {
  ami           = "ami-830c94e3"
  instance_type = "t2.micro"
  tags = {
    Name = "ExampleAppServerInstance"
  }
}
```
---
## Workflow and Challenges
- Monitor pricing & preemption information for each instance type
- Estimate the time to complete a task under a given configuration
- Multi-objective optimization b/w performance and cost
- Intelligently specify instance/resource requirements for each stage
- Integration:
- Sits between Spark and cluster manager
- Interacts with the cloud service
---
``` graphviz
digraph G {
compound = true
fontname = "Palatino Linotype"
bgcolor = "#00000000"
node [ fontname = "Palatino Linotype" ]
subgraph cluster_cloud {
label = "Cloud service / k8s cluster"
n11, n12 [ label = "highcpu" ]
{n11, n12} -> {n21, n22, n23}
n21, n22, n23 [ label = "highmem" ]
{n21, n22, n23} -> {n31, n32}
n31, n32 [ label = "gpu" ]
{n31, n32} -> n41
n41 [ label = "driver" ]
}
monitor [label = "Pricing monitor"]
monitor -> model
model [ label = "Estimator" ]
n22 -> model [ ltail = cluster_cloud ]
opt [ label = "Optimizer" ]
instance [ label = "Instance selection"]
sched [ label = "Spark / Cluster manager" ]
model -> opt -> instance -> sched
sched -> n22 [ lhead = cluster_cloud ]
n22 -> sched [ ltail = cluster_cloud ]
}
```
---
### Pricing & Preemption Monitor
- Cloud providers usually provide APIs for these data
- Spot.io: ML prediction of spot instance interruptions
![](https://i.imgur.com/VzAHsHf.png)
---
### Time Estimation
- Some literature on the topic, e.g., *On Exploring the Optimum Configuration of Apache Spark Framework in Heterogeneous Clusters*
- Difficult and lots of possibilities
- Possible approaches:
- Automated experiments
- Historical data
- Predictive modeling
- Things to consider:
- Data locality & Time for sending data
- Varying workloads over time for a routine job?
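As a toy illustration of the predictive-modeling approach, one could fit stage runtime as a linear function of input size from historical runs. A minimal pure-Python sketch with made-up numbers:

``` py
# Fit runtime = a * size + b via plain least squares on past observations.
def fit_linear(xs, ys):
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    slope = (sum((x - mx) * (y - my) for x, y in zip(xs, ys))
             / sum((x - mx) ** 2 for x in xs))
    return slope, my - slope * mx

sizes = [1, 2, 4, 8]      # input size (GB) of past runs (made up)
times = [12, 22, 41, 82]  # observed stage runtimes (s) (made up)
a, b = fit_linear(sizes, times)
print(f"predicted runtime for 16 GB: {a * 16 + b:.1f} s")
```

A real estimator would of course need to account for instance type, parallelism, and data locality, not just input size.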
---
### Intelligent Requirements
- Fully-automated: Analyze stage to select best instance type & count
- Semi-automated:
- User-specified task type (e.g., GPU-intensive)
- Select most cost-effective instance of that class
---
## Proof of Concept
---
Since estimation and integration are complex,
we mostly deal with the optimization part here.
---
Recall that a Spark workflow can be represented as a DAG $(V, E)$.
Let $I$ be the set of possible instance types.
Assume that we have the following estimators:
- $f_c: E \times I \to \mathbb{R}^+$, giving the time needed to perform an operation on a given instance type
- $g_c: E \times I \to \mathbb{R}^+$, giving the cost per unit of time
We then want to find a mapping $\theta: E \to I$ that minimizes both
- $\sum_{e \in E} f_c(e, \theta(e))$
- $\sum_{e \in E} f_c(e, \theta(e)) \cdot g_c(e, \theta(e))$
---
### Network Costs?
At first glance, it seems the objectives above could be optimized greedily, picking the best instance type for each edge independently.
This is only because we have yet to consider the network costs $f_n$ and $g_n$:
- $\sum_{e \in E} f_c(e, \theta(e)) + \sum_{((u, v), (v, w)) \in E \times E} f_n(\theta((u, v)), \theta((v, w)))$
- The objective for pricing is defined similarly
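A minimal pure-Python sketch of evaluating these objectives for a fixed mapping $\theta$; all numbers are illustrative, and only the time objective includes the network term here (the pricing objective would add a $g_n$ term analogously):

``` py
def objectives(edges, theta, f_c, g_c, f_n):
    # Base compute time and monetary cost over all edges.
    time = sum(f_c[e, theta[e]] for e in edges)
    cost = sum(f_c[e, theta[e]] * g_c[e, theta[e]] for e in edges)
    # Network time between each pair of consecutive edges (u, v) -> (v, w).
    for (u, v) in edges:
        for (v2, w) in edges:
            if v2 == v:
                time += f_n[theta[u, v], theta[v2, w]]
    return time, cost

edges = [(0, 1), (1, 2)]
theta = {(0, 1): 0, (1, 2): 1}           # edge -> instance type
f_c = {((0, 1), 0): 10, ((1, 2), 1): 5}  # compute time
g_c = {((0, 1), 0): 1, ((1, 2), 1): 3}   # cost per unit time
f_n = {(0, 1): 2}                        # transfer time between types 0 and 1
print(objectives(edges, theta, f_c, g_c, f_n))  # (17, 25)
```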
---
### A Simplified Model
- Does not explicitly consider the number of instances per type
- Omits the possibility of using multiple instance types within a stage
- Availability (and thus cost) of spot instances may depend on your usage
---
We can then proceed to optimize this via
the multi-objective genetic algorithm *NSGA-II*.
---
``` py
problem = Problem(
    n_rdds=3,       # number of vertices (RDDs) in the DAG
    n_instances=3,  # number of candidate instance types
    # Each edge: [src RDD, dst RDD, [time, cost rate] per instance type]
    edges=[
        [0, 1, [[10, 1], [5, 3], [100, 1]]],
        [1, 2, [[100, 1], [500, 3], [10, 5]]],
    ],
    # Pairwise [time, cost] of transfers between instance types
    network_cost=[
        [[0, 0], [1, 1], [1, 1]],
        [[1, 1], [0, 0], [1, 1]],
        [[1, 1], [1, 1], [0, 0]],
    ],
)
```
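Since this instance is tiny, the Pareto front can also be verified by exhaustive enumeration. A pure-Python sketch, assuming each edge entry lists a [time, cost rate] pair per instance type and ignoring network costs for simplicity:

``` py
from itertools import product

edge_opts = [
    [(10, 1), (5, 3), (100, 1)],    # edge 0 -> 1
    [(100, 1), (500, 3), (10, 5)],  # edge 1 -> 2
]

def pareto_front(edge_opts):
    # Evaluate (total time, total cost) for every instance-type assignment.
    points = {}
    for assign in product(range(len(edge_opts[0])), repeat=len(edge_opts)):
        t = sum(edge_opts[e][i][0] for e, i in enumerate(assign))
        c = sum(edge_opts[e][i][0] * edge_opts[e][i][1]
                for e, i in enumerate(assign))
        points[assign] = (t, c)
    # Keep only non-dominated points (no other point is <= in both objectives).
    return {a: p for a, p in points.items()
            if not any(q[0] <= p[0] and q[1] <= p[1] and q != p
                       for q in points.values())}

print(pareto_front(edge_opts))  # {(0, 2): (20.0, 60.0)-like pairs}
```

Under these assumptions, enumeration yields the same two non-dominated assignments as the NSGA-II run: $(20, 60)$ and $(15, 65)$.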
---
``` text
~/work/hpc via 🐍 v3.9.12 took 25s
✦ ❯ python -i main.py
=======================================================
n_gen | n_eval | n_nds | eps | indicator
=======================================================
1 | 9 | 2 | - | -
2 | 9 | 2 | 0.00000E+00 | f
[0 2] [20. 60.]
[1 2] [15. 65.]
```
![](https://i.imgur.com/AuYKMRC.png =x400)
---
## Tl;dr
**Automatically provision** workers on the cloud w/ **accelerators** and **spot instances** to ensure fast large-scale data processing at a low cost.
- - -
## Q/A?
---
## Further Reading
- [Creating an 86,000 Hour Speech Dataset with Apache Spark and TPUs](https://www.youtube.com/watch?v=AndC8ESF8UE)
- [Performance Tuning Tips on Real-World Customer Spark-on-Kubernetes pipelines](https://www.youtube.com/watch?v=X6VOR2XEhjQ)
- [Getting Started with the RAPIDS Accelerator for Apache Spark](https://nvidia.github.io/spark-rapids/Getting-Started/)