Distributed File Systems
We look at very large scale data processing techniques with the following characteristics
- massive amounts of data
- mostly read (OLAP) workload
- efficiency (data processed per second) over consistency
- separating storage and computation
Let's say we want to process a data file of size 1 TB. Reading it sequentially takes roughly 35 minutes from SSD (~500MB/s) and roughly 2 hours from HDD (~150MB/s). Since the storage medium is the bottleneck, we might want to use multiple disks to support parallel reads, but a single machine is limited by its bus bandwidth. So we might want to use N machines with K disks each. This is cost-efficient using cheap machines but hard to program
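A quick back-of-the-envelope check of these numbers (a minimal sketch; assumes decimal units and ideal parallel striping with no coordination overhead):

```python
def read_time_hours(size_tb: float, bandwidth_mb_s: float, n_disks: int = 1) -> float:
    """Time to scan size_tb of data at bandwidth_mb_s per disk, across n_disks."""
    size_mb = size_tb * 1_000_000                      # 1 TB ~ 10^6 MB (decimal units)
    return size_mb / (bandwidth_mb_s * n_disks) / 3600

print(f"SSD, 1 disk:   {read_time_hours(1, 500):.2f} h")      # ~0.56 h (~33 min)
print(f"HDD, 1 disk:   {read_time_hours(1, 150):.2f} h")      # ~1.85 h
print(f"HDD, 10 disks: {read_time_hours(1, 150, 10):.2f} h")  # ~0.19 h
```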
File System
A file system stores files (data) along with their metadata (name, access permissions, timestamps, file size, location of data blocks) in data structures called inodes (index nodes)
One machine can have many disks. How should files be stored across disks?
RAID: redundant array of independent disks
RAID is a data storage virtualization technique that combines multiple physical disk drive components into one or more logical units for the purposes of data redundancy and/or performance improvement. Different RAID levels provide different balance among the key goals of availability, fault tolerance, performance and capacity.
RAID 0 (striping)
- Data is striped across multiple disks –> better read/write performance
- No redundancy or fault tolerance (data loss occurs if a single disk fails)
RAID 1 (Mirroring)
- Data is duplicated across two or more disks, providing fault tolerance through redundancy
- Improves read performance but requires more storage and has slower writes
RAID 5 (Striping with parity)
- Data and parity information (the XOR of data chunks across disks) are striped across at least three disks
- Improves read performance and fault tolerance (can handle one disk failure; see the sketch below)
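A tiny illustration of the RAID 5 parity idea (a sketch, not a real RAID implementation; the block contents are made up):

```python
def xor_blocks(blocks):
    """XOR equally-sized byte blocks together."""
    out = bytearray(len(blocks[0]))
    for block in blocks:
        for i, byte in enumerate(block):
            out[i] ^= byte
    return bytes(out)

data = [b"AAAA", b"BBBB", b"CCCC"]   # data blocks striped across three disks
parity = xor_blocks(data)            # parity block stored on a fourth disk

# Disk 2 fails: its block is recoverable from the surviving blocks plus parity.
recovered = xor_blocks([data[0], data[2], parity])
assert recovered == data[1]
```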
Parallel/Distributed File Systems
Features
- Distributed architecture
- multiple servers work together to store/manage/access data
- Data chunking/striping
- data is divided into smaller chunks and distributed across multiple storage devices
- enables concurrent reads and writes for improved performance
- Metadata management
- metadata can be stored and managed separately from data (stored on a different node)
- allows for efficient searching, indexing, and data organization
- Scalability
- designed to handle large-scale storage and computing
- can easily expand storage and performance as needed
- Fault tolerance & data redundancy
- replication, RAID, erasure coding (e.g. Reed–Solomon(N, K): encode N data blocks into N + K coded blocks so the data can be reconstructed from any N surviving blocks, withstanding up to K concurrent failures)
- Load balancing
- ensures optimal distribution of data and workload across available resources
- minimizes bottleneck and maximizes performance
Design Considerations
- Disk failure is the norm given the number of storage machines
- Most files are large –> use large block sizes for I/O operations
- Different programming models with different physical architecture
a) shared memory: multi-thread with locks on data structures
b) message passing: master-slave/producer-consumer
Examples
Google File System (GFS), Apache Hadoop Distributed File System (HDFS)
Google File System (GFS)
Features
Motivation
- Commodity hardware: error is frequent in large setups
- Large files: 100 MB+
- File operations: read and append, mostly sequential reads
Design Principles
- Fault tolerance
- GFS is designed to handle hardware failures and network issues gracefully through replication, checksums, and automatic recovery mechanisms
- Data replication
- To ensure durability and availability, the system replicates each chunk across multiple (default=3) nodes (chunkservers)
- High throughput
- GFS is optimized for high sustained throughput over low latency (workloads typically involve large data processing tasks)
- Consistency and atomicity
- GFS provides a consistent view of the file system using write serializability, chunk versioning, atomic record appends, etc
- Chunk-based storage
- GFS stores files as fixed-size chunks and uses a single master server to manage the metadata associated with these chunks
- Actual chunk data are stored on chunk servers distributed across the cluster
- Scalability
- Architecture designed to handle large numbers of files and a vast amount of storage
- Runs on commodity hardware
- Master-slave architecture
- A single master server (SPOF) manages the file system metadata and handles garbage collection and chunk migration for load balancing and failure recovery
Master
- Stores the full file system metadata in memory for fast operations and background scans
- namespace (file system tree): hierarchical information about files and directories
- filename –> an array of chunk handles (see the metadata sketch after this list)
- chunk handle –> list of chunkservers holding replicas + chunk version number
- access control information
- current locations of chunks
- Manages chunk leases (locks), garbage collection (freeing unused chunks) and chunk migration (copying/moving chunks)
- Fault tolerance is achieved using an operation log (oplog)
- namespace and filename metadata are persisted by logging mutations to the oplog on the master's local disk
- the oplog is replicated periodically to another machine (shadow master)
- a new master can be started by replaying the oplog if the master fails
- Periodically communicates with all chunkservers via heartbeat messages to get states and send commands
- heartbeat: short message ensuring that the server is alive and collecting statistics on the server
- Master does not store chunk location information persistently (only in memory)
- a (new) master obtains chunk locations and primary/lease status from chunkserver heartbeats at startup
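A rough sketch of what the master keeps in memory, per the bullets above (file names, chunk handles, and server names are hypothetical):

```python
# namespace: path -> ordered list of chunk handles (persisted via the oplog)
namespace = {
    "/logs/2024-01-01.log": ["chunk-0001", "chunk-0002"],
}

# chunk handle -> version number and current replica locations
# (locations are NOT persisted; they are rebuilt from chunkserver heartbeats)
chunks = {
    "chunk-0001": {"version": 7, "locations": ["cs-a", "cs-b", "cs-c"]},
    "chunk-0002": {"version": 3, "locations": ["cs-b", "cs-d", "cs-e"]},
}

def locate(path: str, chunk_index: int):
    """Return the handle and replica locations for one chunk of a file."""
    handle = namespace[path][chunk_index]
    return handle, chunks[handle]["locations"]

print(locate("/logs/2024-01-01.log", 1))   # ('chunk-0002', ['cs-b', 'cs-d', 'cs-e'])
```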
Chunkservers (slaves)
- Stores file chunks on local disks as Linux files
- Chunk size: default 64 MB, analogous to page size in RDBMS
- Each chunk has a chunk handle: global unique identifier assigned by the master
- Stores a 32-bit checksum (to detect data corruption) in memory and logs it to disk
Consistency
Operations are atomic:
- Namespace mutations are atomic (managed by master using namespace locking)
- Record append operations are atomic
- Master's operation log defines a global order
Ensuring consistency after mutation in the presence of chunk replication
- Applies mutations in order by designating a primary replica
- Uses chunk version numbers to detect stale replicas (managed by the master and garbage-collected when appropriate)
Write Operation
- Lease acquisition [client –> master]: the master grants a lease to one replica (the primary) and increments the chunk version (see the sketch after these steps)
- Primary selection [master –> client]: the client learns which replica is the primary (holding the latest version of the chunk being modified) and where the secondary replicas are
- Data forwarding [client –> chunkservers]: the client sends the data to the primary and secondary replica nodes
- Data buffering and acknowledgement [client –> primary replica]
- Write command: primary mutation (write on primary replica node) –> secondary mutation
- Primary acknowledgement [secondary replica nodes –> primary replica node]: secondary mutations are done
- Client acknowledgement [primary replica –> client]: write operation done
If any step fails the client is informed and retries the write
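A conceptual sketch of the steps above (not the real GFS protocol or API; all classes and names are hypothetical, and failure handling/retries are omitted):

```python
class Replica:
    def __init__(self, name):
        self.name, self.buffer, self.data = name, None, []

    def push(self, data):          # data forwarding: buffer the bytes, no write yet
        self.buffer = data

    def apply(self):               # mutation: append the buffered data in order
        self.data.append(self.buffer)
        self.buffer = None
        return "ack"

class Master:
    def __init__(self, replicas):
        self.replicas, self.version = replicas, 0

    def grant_lease(self):         # lease acquisition + primary selection
        self.version += 1          # bump the chunk version
        primary, *secondaries = self.replicas
        return primary, secondaries

def client_write(master, data):
    primary, secondaries = master.grant_lease()
    for replica in [primary, *secondaries]:     # push data to all replicas
        replica.push(data)
    primary.apply()                             # primary applies the mutation first...
    acks = [s.apply() for s in secondaries]     # ...then the secondaries apply it
    return all(a == "ack" for a in acks)        # acks flow back to the client

replicas = [Replica("cs-a"), Replica("cs-b"), Replica("cs-c")]
print(client_write(Master(replicas), b"record-1"))   # True
```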
Colossus: next generation GFS
Apache Hadoop Distributed File System (HDFS)
What is Apache Hadoop?
- A platform/ecosystem for big data processing, started by Yahoo!, inspired by Google's MapReduce and GFS papers, and written mostly in Java
- Core components include
- Hadoop Distributed File System (HDFS)
- A MapReduce implementation
- Other management services such as YARN
HDFS Motivations
- Lots of cheap PCs but high failure rate
- Write-once and read many times data (logs, archives; processing is mostly read-oriented)
- Large streaming reads over random access (high sustained throughput over low latency)
HDFS Features
- A stand-alone general purpose distributed file system, used as a part of a Hadoop cluster
- Highly scalable (to Petabytes of data)
- Fault-tolerant
- Offers simplified data access
- User sees their files as if they were on a single machine
- Tuning is required for large deployments
- Limited to file-level permissions (Linux-like security)
HDFS Basic Operations
- When an input file is added to HDFS:
- File is split into smaller blocks of fixed size
- Block: single unit of storage, transparent to the end-user
- Block size is traditionally 64/128/256 MB: large enough that the sequential transfer time dominates the seek time
- Want to minimize seek_time / transfer_time, where seek_time + transfer_time is the time to read a block (see the sketch after this list)
- Each block is replicated to multiple hosts (machines)
- Replication level (default=3) is configurable
- When a host crashes or is decommissioned, its blocks are re-replicated to other hosts
- when a new host is added, blocks are rebalanced to avoid data skew
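A rough calculation behind the block-size choice (assumed figures: 10 ms per seek, 100 MB/s sequential transfer), showing the fraction of a block read that is wasted on seeking:

```python
SEEK_S = 0.010          # one disk seek (assumed)
TRANSFER_MB_S = 100.0   # sequential read bandwidth (assumed)

def seek_overhead(block_mb: float) -> float:
    transfer_s = block_mb / TRANSFER_MB_S
    return SEEK_S / (SEEK_S + transfer_s)   # seek_time / (seek_time + transfer_time)

for block_mb in (0.004, 1, 64, 128):        # 4 KB, 1 MB, 64 MB, 128 MB
    print(f"{block_mb:>8} MB block: {seek_overhead(block_mb):.1%} of read time is seek")
# 4 KB blocks waste ~99.6% of the read time on seeks; 128 MB blocks waste ~0.8%
```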
HDFS Architecture
- NameNode (master)
- Manages the file system namespace by storing
- metadata: file-to-blocks and block-to-DataNodes mappings
- file/directory structure, access permission, etc.
- Coordinates file operations
- Direct clients to DataNodes for reads and writes
- No data is moved through the NameNode
- Maintains overall health
- Periodic communication (heartbeat) with DataNodes for block re-replication, block rebalancing and garbage collection
- Communicates with Secondary NameNode: checkpointing
- Executes file system namespace operations
- opening, closing, renaming file/directories, etc.
- DataNode (slaves)
- Actual storage and management of data blocks on a single host
- Provides clients with access to data
- Performs block creation, deletion and replication upon instruction from the NameNode
- Sends a heartbeat (every 3 seconds) and a block report (every 6 hours) to the NameNode
- Secondary NameNode: Checkpointing, NOT a failover NameNode
- The NameNode keeps the current state of the HDFS in an fsimage (file system image) file, the last checkpoint of the namespace
- All modifications to the HDFS are written to a log (edits)
- During startup, the NameNode merges fsimage and edits to get to the latest state
- The Secondary NameNode merges fsimage and edits periodically (default is every 1 hour or 1 million transactions)
Replication
- Maximize reliability, availability and bandwidth through opportunistic execution
- NameNode determines replica placement (a sketch of the policy follows this list)
- First replica on the local node (where the writing process happens) or a random node
- Second replica is usually on the same rack as the first replica to reduce cross-rack network I/O
- Third replica on a different node in a different rack
- Consider geo-replication for disaster recovery
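A sketch of the placement rule described above (node and rack names are hypothetical; the real NameNode policy also considers free space and load):

```python
import random

def place_replicas(writer: str, racks: dict[str, list[str]]) -> list[str]:
    """Replica 1 on the writer's node, replica 2 on another node in the same rack,
    replica 3 on a node in a different rack."""
    writer_rack = next(r for r, nodes in racks.items() if writer in nodes)
    same_rack = [n for n in racks[writer_rack] if n != writer]
    other_racks = [n for r, nodes in racks.items() if r != writer_rack for n in nodes]
    return [writer, random.choice(same_rack), random.choice(other_racks)]

racks = {"rack1": ["n1", "n2", "n3"], "rack2": ["n4", "n5"]}
print(place_replicas("n2", racks))   # e.g. ['n2', 'n3', 'n4']
```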
Rebalancer & Block Placement Policy
Considerations
- Non-uniform distribution of blocks across hosts (e.g. when hosts are added/removed)
- Different hardware generations (different performance)
- Current hardware utilization across hosts
- HDFS operates on block level and not on file level
The system administrator can perform periodic cluster-wide rebalancing of blocks with hadoop balancer <threshold> (threshold = X%, default 10):
- Each node's utilization ratio (utilized_space/overall_capacity) should end up within X% of the overall cluster utilization ratio
- A smaller threshold leads to a more balanced distribution but might take a lot of time (see the sketch below)
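A sketch of the balancer's threshold check (the utilization figures are made up):

```python
def unbalanced_nodes(nodes: dict, threshold_pct: float = 10.0):
    """nodes: name -> (used_gb, capacity_gb). Returns (over-, under-utilized) nodes whose
    utilization ratio is more than threshold_pct away from the cluster-wide ratio."""
    cluster_ratio = sum(u for u, _ in nodes.values()) / sum(c for _, c in nodes.values())
    over, under = [], []
    for name, (used, cap) in nodes.items():
        diff_pct = (used / cap - cluster_ratio) * 100
        if diff_pct > threshold_pct:
            over.append(name)        # candidate source of block moves
        elif diff_pct < -threshold_pct:
            under.append(name)       # candidate target of block moves
    return over, under

nodes = {"n1": (900, 1000), "n2": (500, 1000), "n3": (100, 1000)}
print(unbalanced_nodes(nodes))       # (['n1'], ['n3']); cluster ratio is 0.5
```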
HDFS allows for custom block placement policies, such as
- using servers with lower utilization first
- operating on file level
- assigning weights to DataNodes
- moving more blocks to newer generation hardware
Distributed Computation
Hadoop v1.0 (MapReduce)
What is MapReduce?
MapReduce can refer to
- the programming paradigm/model using map (emit key-value pairs, i.e. the group-by key) and reduce (aggregate per key) functions
- the execution framework or specific implementation of the paradigm: the data processing layer of Hadoop
In MapReduce, a job is the unit of work submitted as a Hadoop MapReduce .jar file; the framework defines, schedules, monitors, and manages its execution. A job's input file(s) are split into chunks which are processed by multiple (map/reduce) tasks.
What can M/R do? e.g. estimate π using (quasi-)Monte Carlo methods (see the sketch below).
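A minimal local sketch of that example expressed as map and reduce functions (plain Python, not the Hadoop API; the Hadoop example job uses a quasi-random Halton sequence rather than pseudo-random points):

```python
import random

def map_task(n_samples: int, seed: int):
    """One mapper: sample points in the unit square, count those inside the quarter circle."""
    rng = random.Random(seed)
    inside = sum(rng.random() ** 2 + rng.random() ** 2 <= 1.0 for _ in range(n_samples))
    return [("inside", inside), ("total", n_samples)]

def reduce_task(key, values):
    return key, sum(values)

pairs = [kv for seed in range(8) for kv in map_task(100_000, seed)]   # 8 "mappers"
counts = dict(reduce_task(k, [v for key, v in pairs if key == k]) for k in ("inside", "total"))
print(4 * counts["inside"] / counts["total"])   # ~3.14
```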
Hadoop 1.0 Architecture
- Job Tracker
- one Job Tracker per Hadoop instance
- manages job life cycle
- receives and accepts job submissions from clients
- manages resources and scheduling for each task
- communicates with task trackers to deploy and run tasks
- attempts to assign tasks to support data locality
- Task Tracker
- one task tracker per node
- runs and manages individual (map/reduce) tasks locally
- a mapper/reducer is a node running a map/reduce task
- communicates progress of tasks back to Job Tracker
- MapReduce (data processing) is laid on top of HDFS (data storage/accessing)
- One NameNode, one Secondary NameNode, one Job Tracker node
- Each slave node contains a TaskTracker process and a DataNode process
MapReduce Implementation
Client provides the Map and Reduce functions; everything else is handled by Hadoop M/R (a minimal local sketch of the whole pipeline follows the steps below).
Steps
- Input Reader
- divide input into splits (data chunks) and send each split to a map task
- Map Task
- apply the Map function to each split
- each Map returns a list of key-value pairs
- Map tasks run in parallel
- usually set to 1 mapper per block (at most #DataNode tasks)
- Shuffle
- the output of each mapper is sorted by key on the map instance and sent to different reducers accordingly
Shuffle and Sort on the Map Instance
- Map outputs are buffered in memory in a circular buffer (default 100 MB), and contents are spilled to disk when it is 80% full
- After the mapper finishes, all spill files on the DataNode are merged into a single file, which is then sorted by key and partitioned based on the number of reducers
- An optional combiner (local reducer that runs in memory after map phase) can be executed by the spilling thread before writing tuples to disk, reducing I/O and network traffic
- records from across Map tasks with the same key are sent to the same reducer for aggregation
Shuffle and Sort on the Reduce Instance
- Map outputs are copied over to the reducer nodes
- "Sort" is done by a multi-pass merge of map outputs: once a mapper finishes and sends the data to the reducer node, the chunk is combined with existing data in a sorted manner
- The final merge pass goes directly into the reduce task
- Reduce Task
- apply the Reduce function to each key and output a list of key-value pairs
- need to wait for all mappers to finish (bottleneck) before starting
- set the number of reducers depending on the expected number of keys: too few create a bottleneck while too many create excessive network traffic
- can set to one reducer if we just want sorted output
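A minimal local simulation of the map –> shuffle –> reduce pipeline described in the steps above, using the canonical word-count example (plain Python, not the Hadoop API):

```python
from collections import defaultdict

def map_fn(line: str):                      # Map: emit (key, value) pairs
    return [(word, 1) for word in line.split()]

def reduce_fn(key, values):                 # Reduce: aggregate all values of one key
    return key, sum(values)

def mapreduce(lines, map_fn, reduce_fn):
    groups = defaultdict(list)              # shuffle/sort: group pairs by key, so all
    for line in lines:                      # records with the same key reach one reducer
        for key, value in map_fn(line):
            groups[key].append(value)
    return dict(reduce_fn(k, vs) for k, vs in sorted(groups.items()))

print(mapreduce(["to be or not to be"], map_fn, reduce_fn))
# {'be': 2, 'not': 1, 'or': 1, 'to': 2}
```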
Things to handle
- Scheduling: assign nodes to map/reduce tasks
- Data distribution: move processes to data (locality)
- Synchronization: gather, sort and shuffle intermediate data
- Errors and faults
- Detect worker failures and restarts
- Everything happens on top of HDFS (intermediate results, shuffling partitions, etc.)
Hadoop v2
Changes over v1
- YARN replacing old architecture
- HDFS 2: NameNode improvements
- High availability: add an additional passive (failover) NameNode which is updated periodically
- Federation: multiple NameNodes responsible for subnamespaces
- scheduling (FIFO, capacity scheduling, fair scheduling, delay scheduling)
Problems with Hadoop v1.0
- Only one JobTracker
- limited horizontal scalability (of Task Trackers)
- fault tolerance and maintenance: if JT dies/stops all jobs must restart
- Rigid programming model: only supports MapReduce
YARN: Yet Another Resource Negotiator
- Generalized cluster management
- Responsible for resources in the cluster
- resource: CPU/memory/etc. required by a program over time
- Any node can become a job tracker for a given job
- Tasks are now "containers" (packages of software that contain all of the necessary elements to run in any environment)
YARN Components
- Resource Manager
- one node per cluster (can configure a failover node)
- manages job scheduling and execution (but NO job tracking)
- manages global resource allocation
- Application Master
- one node per job, replaces JobTracker in Hadoop v1
- manages task scheduling and execution for a given job
- Node Manager
- One process per node, similar to TaskTracker in Hadoop v1
- manages the lifecycle of task containers
- reports to Resource Manager on health and resource usage
Execution flow of M/R on YARN
See p.10, lecture9_hadoops
Client submits an application -> ResourceManager starts the MRAppMaster (via a NodeManager) -> MRAppMaster starts task JVMs (via NodeManagers) to run the M/R tasks
Job Scheduling
Job scheduling should follow these resource management design principles:
- provide fast response times to small jobs in a multitenant Hadoop cluster (many users)
- improve utilization: proper load balancing, hotspot avoidance
- improve data locality: running task closer to data (same node/rack)
Hadoop favors simplicity: YARN is a centralized resource negotiator with fairly basic scheduling and decision techniques:
- FIFO: suffers from poor parallelism and starvation of small jobs
- Capacity scheduler
- Organize jobs into (hierarchical) queues
- One queue for production (heavier) jobs (80% overall capacity)
- One queue for test (lighter) jobs (20% overall capacity)
- FIFO scheduling within each queue
- Supports preemption: the system can stop a job/task to maintain the promised capacity
- Issue: underutilization
- Fair scheduler
- Conceptually similar to Capacity Scheduler
- Group jobs into "pools" (implemented as queues) and assign to each pool a guaranteed minimum share of resources
- On average, every job in a pool will get an equal share over time
- Extra capacity is divided evenly between pools
- Implementation: maintain a priority queue of jobs (not tasks) ordered by the number of running tasks (fewer running tasks = higher priority). When a resource frees up, assign it to the head of the queue, i.e. the job with the fewest running tasks (see the sketch below)
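A sketch of that fair-scheduler bookkeeping (job names are made up): a min-heap keyed by the number of running tasks, so each freed slot goes to the job currently running the fewest tasks.

```python
import heapq

jobs = [(0, "job-A"), (0, "job-B"), (0, "job-C")]   # (running_tasks, job_id)
heapq.heapify(jobs)

def assign_free_slot():
    running, job = heapq.heappop(jobs)              # job with the fewest running tasks
    heapq.heappush(jobs, (running + 1, job))        # it now has one more task running
    return job

print([assign_free_slot() for _ in range(6)])
# ['job-A', 'job-B', 'job-C', 'job-A', 'job-B', 'job-C'] -> equal shares over time
```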
However, these schedulers can hurt data locality: the job picked by the policy may not have its data on the free nodes
- Delay Scheduling: trade time for locality
- force jobs to wait for a short time if they cannot launch local tasks
- Empirically, waiting 1-5 seconds achieves almost 100% map locality (see the sketch after this list)
- It works well under two conditions
- many tasks are short relative to jobs
- there are many locations where a task can run efficiently (block replication)
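A sketch of the delay-scheduling decision rule (the 5-second wait threshold is an assumed parameter):

```python
def schedule(job: dict, free_node: str, max_wait_s: float = 5.0) -> str:
    """job: {'local_nodes': nodes holding its input blocks, 'waited_s': time already skipped}."""
    if free_node in job["local_nodes"]:
        return "launch local task"                 # locality achieved
    if job["waited_s"] >= max_wait_s:
        return "launch non-local task"             # give up on locality after the delay
    return "skip (keep waiting for a local slot)"

job = {"local_nodes": {"n1", "n4"}, "waited_s": 2.0}
print(schedule(job, "n4"))   # launch local task
print(schedule(job, "n7"))   # skip (keep waiting for a local slot)
```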
Hadoop v3
Long-term resource planning (cloud computing)
- Resources: cumulative CPU and memory over time
- Static capacity allocation managed by the system admin is not feasible in a cloud setup (millions of users)
- resource underutilization: fluctuating maximum cluster capacity (hardware failure/maintenance) and fluctuating demands
- when reaching maximum capacity: jobs can be moved/reprioritized to the extent that the SLA (service-level agreement) allows
Improvements over v2
- Better YARN timeline service which improves scalability and reliability
- Intra-DataNode balancing: alleviates skew across the disks within a DataNode
- Erasure coding replacing the replication factor of 3: storage overhead (extra storage needed) is reduced to 50% as opposed to 200% (see the sketch below)
- NameNode HA (high availability): supports 2 or more standby NameNodes as opposed to only one passive NameNode
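A quick check of the storage-overhead claim (assuming the common Reed–Solomon(6, 3) policy: 6 data blocks plus 3 parity blocks):

```python
def overhead(data_blocks: int, extra_blocks: int) -> float:
    """Extra storage as a fraction of the raw data size."""
    return extra_blocks / data_blocks

print(f"3x replication: {overhead(1, 2):.0%}")   # 2 extra copies  -> 200%
print(f"RS(6,3) coding: {overhead(6, 3):.0%}")   # 3 parity blocks -> 50%
```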