# Distributed File Systems
We look at very large-scale data processing techniques with the following characteristics:
- massive amounts of data
- mostly read (OLAP) workload
- efficiency (data processed per second) over consistency
- separating storage and computation
Let's say we want to process a data file of size 1 TB. Reading it sequentially takes roughly half an hour from an SSD (~500 MB/s) and about two hours from an HDD (~150 MB/s). Since the storage medium is the bottleneck, we might want to use multiple disks to support parallel reads, but a single machine is limited by its I/O bus. So we might instead use *N* machines with *K* disks each. This is cost-efficient with cheap commodity machines, but hard to program.
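As a quick sanity check on those numbers (taking 1 TB ≈ $10^6$ MB):
$$
\frac{10^6\ \text{MB}}{500\ \text{MB/s}} \approx 2000\ \text{s} \approx 33\ \text{min},
\qquad
\frac{10^6\ \text{MB}}{150\ \text{MB/s}} \approx 6700\ \text{s} \approx 1.9\ \text{h}
$$
With $N$ machines $\times$ $K$ disks reading in parallel, the ideal sequential read time shrinks by roughly a factor of $NK$ (ignoring network and coordination overhead).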
## File System
A file system stores files (data), along with their metadata (name, access permission, timestamps, file size, location of data blocks) in a data structure called *inodes* (index nodes)
One machine can have many disks. How should files be stored across disks?
### RAID: redundant array of independent disks
RAID is a data storage virtualization technique that combines multiple physical disk drive components into one or more logical units for the purposes of data redundancy and/or performance improvement. Different RAID levels provide different balance among the key goals of availability, fault tolerance, performance and capacity.
**RAID 0 (striping)**
- Data is striped across multiple disks --> better read/write performance
- No redundancy or fault tolerance (data loss will occur if a single disk fails)
**RAID 1 (Mirroring)**
- Data is duplicated across two or more disks, providing fault tolerance through redundancy
- Improves read performance but needs more storage and slower writes
**RAID 5 (Striping with parity)**
- Data and parity information (the XOR of data chunks across disks) are striped across at least three disks
- Improves read performance and fault tolerance (can handle one disk failure)
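A minimal sketch of the XOR-parity idea behind RAID 5 (plain Java, not tied to any real RAID implementation): the parity block is the XOR of the data blocks in a stripe, and a single lost block can be rebuilt by XOR-ing the surviving blocks with the parity.

```java
import java.util.Arrays;

public class Raid5Parity {
    // Parity block = XOR of all data blocks in the stripe.
    static byte[] parity(byte[][] dataBlocks) {
        byte[] p = new byte[dataBlocks[0].length];
        for (byte[] block : dataBlocks)
            for (int i = 0; i < p.length; i++) p[i] ^= block[i];
        return p;
    }

    // Rebuild one missing block by XOR-ing the parity with the surviving blocks.
    static byte[] recover(byte[][] survivingBlocks, byte[] parity) {
        byte[] lost = parity.clone();
        for (byte[] block : survivingBlocks)
            for (int i = 0; i < lost.length; i++) lost[i] ^= block[i];
        return lost;
    }

    public static void main(String[] args) {
        byte[][] stripe = { "disk0".getBytes(), "disk1".getBytes(), "disk2".getBytes() };
        byte[] p = parity(stripe);
        // Simulate losing disk 1 and rebuilding it from disks 0, 2 and the parity.
        byte[] rebuilt = recover(new byte[][] { stripe[0], stripe[2] }, p);
        System.out.println(Arrays.equals(rebuilt, stripe[1])); // true
    }
}
```

With $N$ data disks plus parity, any single disk failure is recoverable, at the cost of updating parity on every write.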
## Parallel/Distributed File Systems
### Features
1. **Distributed architecture**
- multiple servers work together to store/manage/access data
2. **Data chunking/striping**
- data is divided into smaller chunks and distributed across multiple storage devices
- enables concurrent reads and writes for improved performance
3. **Metadata management**
- metadata can be stored and managed separately from data (stored on a different node)
- allows for efficient searching, indexing, and data organization
4. **Scalability**
- designed to handle large-scale storage and computing
- can easily expand storage and performance as needed
5. **Fault tolerance & data redundancy**
- replication, RAID, erasure coding (e.g. Reed–Solomon: with *N* data blocks and *K* parity blocks, any block can be reconstructed from any *N* of the surviving blocks, withstanding up to *K* concurrent failures)
6. **Load balancing**
- ensures optimal distribution of data and workload across available resources
- minimizes bottlenecks and maximizes performance
### Design Considerations
1. Disk failure is the norm given the number of storage machines
2. Most files are large --> use large block sizes and block-level I/O operations
3. Different programming models fit different physical architectures
a) shared memory: multi-threading with locks on data structures
b) message passing: master-slave / producer-consumer
### Examples
Google File System (GFS), Apache Hadoop Distributed File System (HDFS)
## Google File System (GFS)
### Features
- Central themes in distributed systems: parallel performance, fault tolerance, replication, (weak) consistency
- MapReduce and BigTable are built on top of GFS
- Single master architecture
- Original paper: https://static.googleusercontent.com/media/research.google.com/en//archive/gfs-sosp2003.pdf
### Motivation
- Commodity hardware: error is frequent in large setups
- Large files: 100 MB+
- File operations: read and append, mostly sequential reads
### Design Principles
1. **Fault tolerance**
- GFS is designed to handle hardware failures and network issues gracefully through replication, checksums, and automatic recovery mechanisms
2. **Data replication**
- To ensure durability and availability, the system replicates each chunk across multiple (default=3) nodes (chunkservers)
3. **High throughput**
- GFS is optimized for high sustained throughput over low latency (workloads typically involve large data processing tasks)
4. **Consistency and atomicity**
- GFS provides a consistent view of the file system using write serializability, chunk versioning, atomic record appends, etc
5. **Chunk-based storage**
- GFS stores files as fixed-size chunks and uses a single master server to manage the metadata associated with these chunks
- Actual chunk data are stored on chunk servers distributed across the cluster
6. **Scalability**
- Architecture designed to handle large numbers of files and a vast amount of storage
- Runs on commodity hardware
7. **Master-slave architecture**
- A single master server (SPOF) manages the file system metadata and handles garbage collection and chunk migration for load balancing and failure recovery
### Master
1. Stores the full file system metadata in memory for fast operations and background scans
- namespace (file system tree): hierarchical information about files and directories
- filename --> array of chunk handles (the IDs of the chunks that make up the file)
- chunk handle --> list of chunkservers holding a replica + chunk version number (see the sketch after this list)
- access control information
- current locations of chunks
2. Manages chunk leases (locks), garbage collection (freeing unused chunks) and chunk migration (copying/moving chunks)
3. Fault tolerance achieved using an operation log (oplog)
- namespace and filename metadata are persisted by logging mutations to oplog on the master's local disk
- oplog is replicated periodically to another machine (shadow master)
- new master can be started if the master fails by replaying the oplog
4. Periodically communicates with all chunkservers via heartbeat messages to get states and send commands
- heartbeat: short message ensuring that the server is alive and collecting statistics on the server
5. Master does not store chunk location information persistently (only in memory)
- needs heartbeats from chunkservers for chunk location and primary status info upon startup of (new) master
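As a rough illustration of the metadata described above (all names here are hypothetical; the real GFS implementation is proprietary), the master's in-memory state can be pictured as a couple of maps keyed by file path and chunk handle:

```java
import java.util.*;

// Hypothetical sketch of the GFS master's in-memory metadata; not the real implementation.
class MasterMetadata {
    // Namespace: full file path --> ordered list of chunk handles making up the file.
    final Map<String, List<Long>> fileToChunks = new HashMap<>();

    static class ChunkInfo {
        long version;                                     // used to detect stale replicas
        final Set<String> chunkservers = new HashSet<>(); // replica locations (learned via heartbeats, never persisted)
        String primary;                                   // chunkserver currently holding the lease, if any
        long leaseExpirationMs;                           // when the primary's lease runs out
    }
    // Chunk handle --> per-chunk state.
    final Map<Long, ChunkInfo> chunks = new HashMap<>();
}
```

Only the namespace and version numbers would be persisted via the oplog; replica locations are rebuilt from chunkserver heartbeats after a master restart.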
### Chunkservers (slaves)
1. Stores file chunks on local disks as Linux files
2. Chunk size: default 64 MB, analogous to page size in RDBMS
3. Each chunk has a chunk handle: global unique identifier assigned by the master
4. Stores a 32-bit checksum (to detect data corruption) in memory and logs it to disk
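As an illustration of that 32-bit checksum (the GFS paper checksums each 64 KB block within a chunk; the JDK's `java.util.zip.CRC32` is used here just to show the detection idea):

```java
import java.util.zip.CRC32;

public class ChunkChecksum {
    // Compute a 32-bit checksum over a chunk's bytes.
    static long checksum(byte[] chunk) {
        CRC32 crc = new CRC32();
        crc.update(chunk);
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] chunk = "some chunk data".getBytes();
        long stored = checksum(chunk);                 // persisted alongside the chunk
        chunk[0] ^= 1;                                 // simulate on-disk corruption (bit flip)
        System.out.println(stored == checksum(chunk)); // false --> corruption detected
    }
}
```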
### Consistency
Operations are atomic:
- Namespace mutations are atomic (managed by master using namespace locking)
- Record append operations are atomic
- Master's operation log defines a global order
Ensuring consistency after mutations in the presence of chunk replication:
- Applies mutations in order by designating a primary replica
- Uses chunk version numbers to detect stale replicas (managed by the master and garbage-collected when appropriate)
### Write Operation
1. Lease acquisition [client --> master]: chunk version increment
2. Primary selection [master --> client]: need to identify a primary replica which contains the latest version of the data being modified
3. Data forwarding [client --> chunkservers]: send data to the primary replica node and secondary replica nodes
4. Data buffering and acknowledgement [client --> primary replica]
5. Write command: primary mutation (write on primary replica node) --> secondary mutation
6. Primary acknowledgement [secondary replica nodes --> primary replica node]: secondary mutations are done
7. Client acknowledgement [primary replica --> client]: write operation done
If any step fails, the client is informed and retries the write
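A toy, in-memory simulation of the data flow above (plain Java; all classes and method names here are hypothetical illustrations, not GFS client code):

```java
import java.util.*;

public class GfsWriteSketch {
    // Hypothetical stand-in for a chunkserver: it buffers pushed data, then applies it on command.
    static class Chunkserver {
        final Map<Long, byte[]> storage = new HashMap<>(); // chunk handle --> committed data
        private byte[] buffered;                           // data pushed by the client but not yet committed
        void pushData(byte[] data) { buffered = data; }    // steps 3-4: buffer the data and acknowledge
        void apply(long handle) { storage.put(handle, buffered); } // apply the mutation
    }

    public static void main(String[] args) {
        long handle = 42L;                                  // chunk handle obtained from the "master" (steps 1-2)
        Chunkserver primary = new Chunkserver();
        List<Chunkserver> secondaries = List.of(new Chunkserver(), new Chunkserver());

        byte[] record = "appended record".getBytes();
        primary.pushData(record);                           // step 3: client pushes data to every replica
        secondaries.forEach(s -> s.pushData(record));

        primary.apply(handle);                              // step 5: primary applies the mutation first,
        secondaries.forEach(s -> s.apply(handle));          //         then tells the secondaries to apply it

        // Steps 6-7: secondaries ack the primary, which acks the client once all replicas match.
        boolean ok = secondaries.stream()
                .allMatch(s -> Arrays.equals(s.storage.get(handle), primary.storage.get(handle)));
        System.out.println(ok ? "write acknowledged" : "client retries the write");
    }
}
```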
### Colossus: next generation GFS
- Better scalability and availability. Instead of using a single master node to manage metadata, the Colossus architecture contains
- Curators: a distributed NoSQL database used to store metadata
- Custodians: responsible for storage management (space balancing, RAID, etc.)
- Blog post: https://cloud.google.com/blog/products/storage-data-transfer/a-peek-behind-colossus-googles-file-system
## Apache Hadoop Distributed File System (HDFS)
### What is Apache Hadoop?
- A platform/ecosystem for big data processing, started at Yahoo! and inspired by Google's MapReduce and GFS, written mostly in Java
- Core components include
- Hadoop Distributed File System (HDFS)
- A MapReduce implementation
- Other management services such as YARN
### HDFS Motivations
- Lots of cheap PCs but high failure rate
- Write-once and read many times data (logs, archives; processing is mostly read-oriented)
- Large streaming reads over random access (high sustained throughput over low latency)
### HDFS Features
- A stand-alone general purpose distributed file system, used as a part of a Hadoop cluster
- Highly scalable (to Petabytes of data)
- Fault-tolerant
- Offers simplified data access
- User sees their files as if they were on a single machine
- Tuning is required for large deployments
- Limited to file-level permissions (Linux-like security)
### HDFS Basic Operations
1. When an input file is added to HDFS:
- File is split into smaller blocks of fixed size
- Block: single unit of storage, transparent to the end-user
- Block size traditionally 64/128/256 MB: large enough that the sequential read dominates the cost of the seek
- Want to minimize $t_\text{seek}/t_\text{transfer}$, where $t_\text{seek} + t_\text{transfer}$ is the time to read a block (see the worked example after this list)
- Each block is replicated to multiple hosts (machines)
- Replication level (default=3) is configurable
2. When a host crashes or is decommissioned, all of its blocks are re-replicated to other hosts
3. When a new host is added, blocks are rebalanced to avoid data skew
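A rough worked example of the block-size tradeoff above, assuming ~10 ms seek time and ~100 MB/s sequential transfer (illustrative numbers, not HDFS defaults):
$$
\text{128 MB block: } \frac{t_\text{seek}}{t_\text{transfer}} = \frac{0.01\ \text{s}}{1.28\ \text{s}} \approx 0.8\%,
\qquad
\text{4 KB block: } \frac{0.01\ \text{s}}{0.00004\ \text{s}} = 250
$$
With large blocks nearly all of the read time is useful sequential transfer; with small blocks seek time dominates.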
### HDFS Architecture
1. **NameNode** (master)
- Manages the file system namespace by storing
- metadata: file-to-blocks and block-to-DataNodes mappings
- file/directory structure, access permission, etc.
- Coordinates file operations
- Direct clients to DataNodes for reads and writes
- No data is moved through the NameNode
- Maintains overall health
- Periodic communication (heartbeat) with DataNodes for block re-replication, block rebalancing and garbage collection
- Communicates with Secondary NameNode: checkpointing
- Executes file system namespace operations
- opening, closing, renaming file/directories, etc.
2. **DataNode** (slaves)
- Actual storage and management of data block on a single host
- Provides client with access to data
- Perform block creation, deletion and replication upon instruction from NameNode
- Sends heartbeat (every 3 seconds) and blockreport (every 6 hours) to NameNode
3. **Secondary NameNode**: Checkpointing, NOT a failover NameNode
- The NameNode keeps the current state of the HDFS in an `fsimage` (file system image) file, the last checkpoint of the namespace
- All modifications to the HDFS are written to a log (`edits`)
- During startup, NameNode merges `fsimage` and `edits` to get to the latest state
- Secondary NameNode merges `fsimage` and `edits` periodically (default is every 1 hour or 1 million transactions)
### Replication
- Maximize reliability, availability and bandwidth through opportunistic execution
- NameNode determines replica placement
- First replica on the local node (where the writing process happens) or a random node
- Second replica is usually on the same rack as the first replica to reduce cross-rack network I/O
- Third replica on a different node in a different rack
- Consider geo-replication for disaster recovery
### Rebalancer & Block Placement Policy
Considerations
- Non-uniform distribution of blocks across hosts (e.g. when hosts are added/removed)
- Different hardware generations (different performance)
- Current hardware utilization across hosts
- HDFS operates on block level and not on file level
The system administrator can perform periodic cluster-wide rebalancing of blocks:
`hadoop balancer <threshold>` (`threshold` X%, default=10)
- Each node's utilization ratio is within X% of the overall cluster utilization ratio (utilized_space/overall_capacity)
- A smaller threshold leads to a more balanced distribution but may take much longer
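A minimal sketch of that threshold check (illustrative only, not the actual HDFS balancer code):

```java
public class BalancerCheck {
    // Hypothetical check: a DataNode needs rebalancing if its utilization differs
    // from the cluster-wide utilization by more than the configured threshold.
    static boolean needsRebalancing(double nodeUsed, double nodeCapacity,
                                    double clusterUsed, double clusterCapacity,
                                    double thresholdPercent) {
        double nodeUtil = 100.0 * nodeUsed / nodeCapacity;          // this node's utilization (%)
        double clusterUtil = 100.0 * clusterUsed / clusterCapacity; // cluster-wide utilization (%)
        return Math.abs(nodeUtil - clusterUtil) > thresholdPercent;
    }

    public static void main(String[] args) {
        // Node at 85% utilization, cluster average 60%, threshold 10% --> needs rebalancing.
        System.out.println(needsRebalancing(85, 100, 600, 1000, 10)); // true
    }
}
```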
HDFS allows for custom block placement policies, such as
- using servers with lower utilization first
- operating on file level
- assigning weights to DataNodes
- moving more blocks to newer generation hardware
# Distributed Computation
## Hadoop v1.0 (MapReduce)
### What is MapReduce?
MapReduce can refer to
- the programming paradigm/model using map (group by) and reduce (aggregate) functions
- the execution framework or specific implementation of the paradigm: the data processing layer of Hadoop
In Hadoop MapReduce, a *job* is the unit of work: it bundles the MapReduce .jar with its configuration, and the framework schedules, monitors, and manages its execution. A job splits the input file(s) into chunks which are processed by multiple (map/reduce) tasks.
What can M/R do? For example, estimate $\pi$ using (quasi-)Monte Carlo methods, as sketched below.
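A plain-Java sketch of how this fits the map/reduce shape (not Hadoop code): each "map" task samples points in the unit square and counts hits inside the quarter circle, and the "reduce" step sums the per-task counts.

```java
import java.util.Random;
import java.util.stream.IntStream;

public class PiEstimate {
    // One "map" task: sample n random points, return how many land inside x^2 + y^2 <= 1.
    static long mapTask(long seed, int n) {
        Random rng = new Random(seed);
        return IntStream.range(0, n)
                .filter(i -> {
                    double x = rng.nextDouble(), y = rng.nextDouble();
                    return x * x + y * y <= 1.0;
                })
                .count();
    }

    public static void main(String[] args) {
        int tasks = 8, samplesPerTask = 1_000_000;
        // "Reduce": sum the hit counts produced by the independent map tasks.
        long hits = IntStream.range(0, tasks).mapToLong(t -> mapTask(t, samplesPerTask)).sum();
        System.out.println(4.0 * hits / ((long) tasks * samplesPerTask)); // ~3.14
    }
}
```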
### Hadoop 1.0 Architecture
1. **Job Tracker**
- one Job Tracker per Hadoop instance
- manages job life cycle
- receives and accepts job submissions from clients
- manages resources and scheduling for each task
- communicates with task trackers to deploy and run tasks
- attempts to assign tasks to support data locality
2. **Task Tracker**
- one task tracker per node
- runs and manages individual (map/reduce) tasks locally
- a mapper/reducer is a node running a map/reduce task
- communicates progress of tasks back to Job Tracker
3. MapReduce (data processing) is laid **on top of HDFS** (data storage/accessing)
- One NameNode, one Secondary NameNode, one Job Tracker node
- Each slave node contains a TaskTracker process and a DataNode process
### MapReduce Implementation
Client provides Map and Reduce functions; everything else is handled by Hadoop M/R.
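For concreteness, here is the canonical word-count job, closely following the standard Hadoop MapReduce tutorial example (`org.apache.hadoop.mapreduce` API):

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
  // Map: for each input line, emit (word, 1) for every token.
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, ONE);
      }
    }
  }

  // Reduce: sum the counts for each word (also usable as a combiner).
  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) sum += val.get();
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class); // optional combiner: local reduce before the shuffle
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```

Here the same `IntSumReducer` is registered as the combiner, which performs the local pre-aggregation described in the shuffle step below.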
#### Steps
1. **Input Reader**
- divide input into splits (data chunks) and send each split to a map task
2. **Map Task**
- apply the Map function to each split
- each Map returns a list of key-value pairs
- Map tasks run in parallel
- usually set to 1 mapper per block (at most #DataNode tasks)
3. **Shuffle**
- the output of each mapper is sorted by key on the map instance and sent to different reducers accordingly
**Shuffle and Sort on the Map Instance**
- Map outputs are buffered in memory using a circular buffer (default size 100 MB), and contents are spilled to disk when it is 80% full
- After the mapper finishes, all spill files on the DataNode are merged into a single file, which is then sorted by key and partitioned based on the number of reducers
- An optional **combiner** (a local reducer that runs in memory after the map phase) can be executed by the spilling thread before writing tuples to disk, reducing I/O and network traffic
- records from across Map tasks with the same key are sent to the same reducer for aggregation
**Shuffle and Sort on the Reduce Instance**
- Map outputs are copied over to the reducer nodes
- "Sort" is done by a multi-pass merge of map outputs: once a mapper finishes and sends the data to the reducer node, the chunk is combined with existing data in a sorted manner
- The final merge pass goes directly into the reduce task
4. **Reduce Task**
- apply the Reduce function to each key and output a list of key-value pairs
- need to wait for all mappers to finish (bottleneck) before starting
- set the number of reducers depending on the expected number of keys: too few create a bottleneck while too many create excessive network traffic
- can set to one reducer if we just want sorted output
#### Things to handle
- Scheduling: assign nodes to map/reduce tasks
- Data distribution: move processes to data (locality)
- Synchronization: gather, sort and shuffle intermediate data
- Errors and faults
- Detect worker failures and restarts
- Everything happens on top of HDFS (intermediate results, shuffling partitions, etc.)
## Hadoop v2
### Changes over v1
1. YARN replacing old architecture
2. HDFS 2: NameNode improvements
- High availability: add an additional passive (failover) NameNode which is updated periodically
- Federation: multiple NameNodes responsible for subnamespaces
3. Scheduling improvements (FIFO, capacity scheduling, fair scheduling, delay scheduling)
### Problems with Hadoop v1.0
- Only one JobTracker
- limited horizontal scalability (of Task Trackers)
- fault tolerance and maintenance: if the JobTracker dies/stops, all jobs must restart
- Rigid programming model: only supports MapReduce
### YARN: Yet Another Resource Negotiator
- Generalized cluster management
- Responsible for resources in the cluster
- resource: CPU/memory/etc. $\times$ time required by a program
- Any node can become a job tracker for a given job
- Tasks are now "containers" (packages of software that contain all of the necessary elements to run in any environment)
#### YARN Components
- **Resource Manager**
- one node per cluster (can configure a failover node)
- manages job scheduling and execution (but NO job tracking)
- manages global resource allocation
- **Application Master**
- one per job (runs in a container on some node), replaces the JobTracker of Hadoop v1
- manages task scheduling and execution for a given job
- **Node Manager**
- One process per node, similar to TaskTracker in Hadoop v1
- manages the lifecycle of task containers
- reports to Resource Manager on health and resource usage
#### Execution flow of M/R on YARN
See p.10, lecture9_hadoops
Client submits application -> ResourceManager starts MRAppMaster (via a NodeManager) -> MRAppMaster starts task JVMs (via NodeManagers) to run M/R tasks
### Job Scheduling
Job scheduling should follow these resource management design principles:
- provide fast response times to small jobs in a multitenant Hadoop cluster (many users)
- improve utilization: proper load balancing, hotspot avoidance
- improve data locality: running task closer to data (same node/rack)
Hadoop favors simplicity. YARN is a centralized resource negotiator with fairly basic scheduling and decision techniques:
1. FIFO: suffers from poor parallelism and starvation of small jobs
2. Capacity scheduler
- Organize jobs into (hierarchical) queues
- One queue for production (heavier) jobs (80% overall capacity)
- One queue for test (lighter) jobs (20% overall capacity)
- FIFO scheduling within each queue
- Supports preemption: the system can stop a job/task to maintain the promised capacity
- Issue: underutilization
3. Fair scheduler
- Conceptually similar to Capacity Scheduler
- Group jobs into "pools" (implemented as queues) and assign to each pool a guaranteed minimum share of resources
- On average, every job in a pool gets an equal share over time
- Extra capacity is divided evenly between pools
- Implementation: maintain a priority queue of jobs (not tasks) ordered by the number of running tasks (fewer is higher priority). When a resource is free, assign it to the head of the queue (the job with the fewest running tasks); see the sketch after this list
However, these scheduling policies hurt data locality: the job picked by the policy may not have data on the free node
4. Delay Scheduling: trade time for locality
- force jobs to wait for a short time if they cannot launch local tasks
- Empirically, waiting 1-5 seconds achieves almost 100% map locality
- It works well under two conditions
- many tasks are short relative to jobs
- there are many locations where a task can run efficiently (block replication)
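A minimal sketch of the fair-scheduler rule from item 3 above (illustrative only, not the actual Hadoop scheduler code):

```java
import java.util.PriorityQueue;

public class FairSchedulerSketch {
    static class Job {
        final String name;
        int runningTasks;
        Job(String name, int runningTasks) { this.name = name; this.runningTasks = runningTasks; }
    }

    public static void main(String[] args) {
        // Jobs ordered by number of running tasks: fewer tasks = higher priority.
        PriorityQueue<Job> queue =
                new PriorityQueue<>((a, b) -> Integer.compare(a.runningTasks, b.runningTasks));
        queue.add(new Job("etl-pipeline", 40));
        queue.add(new Job("ad-hoc-query", 2));

        // A slot frees up: give it to the job with the fewest running tasks,
        // then re-insert the job so its priority reflects the new task count.
        Job next = queue.poll();
        next.runningTasks++;
        queue.add(next);
        System.out.println("launched a task for " + next.name); // ad-hoc-query
    }
}
```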
## Hadoop v3
Long-term resource planning (cloud computing)
- Resources: cumulative CPU and memory over time
- Static capacity allocation managed by the system admin is not feasible in a cloud setup (millions of users)
- resource underutilization: fluctuating max cluster capacity (hardware failure/maintenance) and fluctuating demands
- when reaching max capacity: jobs can be moved/re-prioritized to the extent that the SLA (service-level agreement) allows
### Improvements over v2
1. Better YARN timeline service which improves scalability and reliability
2. Intra DataNode balancing: alleviates skew within DataNode
3. Erasure coding replacing the replication factor of 3: storage overhead (extra storage needed) is reduced to 50% as opposed to 200% (see the arithmetic after this list)
4. NameNode HA (high availability): supports 2 or more standby NameNodes as opposed to only one passive NameNode
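The storage-overhead arithmetic behind item 3, using a common Reed–Solomon $(6, 3)$ layout (6 data blocks + 3 parity blocks) as an example:
$$
\text{3-way replication: } \frac{3 - 1}{1} = 200\%\ \text{extra storage},
\qquad
\text{RS}(6,3): \frac{9 - 6}{6} = 50\%\ \text{extra storage}
$$
Fault tolerance remains comparable: 3-way replication survives the loss of 2 of a block's 3 copies, while RS$(6,3)$ survives the loss of any 3 of the 9 blocks in a stripe.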