# Distributed File Systems

We look at very large-scale data processing techniques with the following characteristics:

- massive amounts of data
- mostly-read (OLAP) workload
- efficiency (data processed per second) over consistency
- separating storage and computation

Let's say we want to process a data file of size 1 TB. At the stated throughputs, reading it sequentially takes roughly half an hour from an SSD (~500 MB/s) and nearly two hours from an HDD (~150 MB/s). Since the storage medium is the bottleneck, we might want to use multiple disks to support parallel reads. But a single machine is limited by its I/O bus, so we might instead use *N* machines with *K* disks each. This is cost-efficient using cheap machines, but hard to program.

## File System

A file system stores files (data) along with their metadata (name, access permissions, timestamps, file size, location of data blocks) in a data structure called *inodes* (index nodes).

One machine can have many disks. How should files be stored across disks?

### RAID: redundant array of independent disks

RAID is a data storage virtualization technique that combines multiple physical disk drives into one or more logical units for the purposes of data redundancy and/or performance improvement. Different RAID levels strike different balances among the key goals of availability, fault tolerance, performance, and capacity.

**RAID 0 (striping)**
- Data is striped across multiple disks --> better read/write performance
- No redundancy or fault tolerance (data loss occurs if a single disk fails)

**RAID 1 (mirroring)**
- Data is duplicated across two or more disks, providing fault tolerance through redundancy
- Improves read performance, but needs more storage and has slower writes

**RAID 5 (striping with parity)**
- Data and parity information (the XOR of the data chunks across disks) are striped across at least three disks
- Improves read performance and fault tolerance (can survive one disk failure; the lost disk is reconstructed by XOR-ing the surviving ones)
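To make the parity idea concrete, here is a minimal sketch in plain Python (not tied to any real RAID implementation; `xor_stripes` is just an illustrative helper): the parity stripe is the byte-wise XOR of the data stripes, and a lost stripe is rebuilt by XOR-ing the parity with the survivors.

```python
# Minimal illustration of RAID 5-style parity: the parity stripe is the
# byte-wise XOR of the data stripes, so any single lost stripe can be
# rebuilt by XOR-ing the parity with the surviving stripes.

def xor_stripes(*stripes: bytes) -> bytes:
    out = bytearray(len(stripes[0]))
    for stripe in stripes:
        for i, b in enumerate(stripe):
            out[i] ^= b
    return bytes(out)

# Three equally sized data stripes spread over three disks.
d0, d1, d2 = b"AAAA", b"BBBB", b"CCCC"
parity = xor_stripes(d0, d1, d2)          # stored on a fourth disk

# The disk holding d1 fails: recover it from the parity and the survivors.
recovered = xor_stripes(parity, d0, d2)
assert recovered == d1
```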
## Parallel/Distributed File Systems

### Features

1. **Distributed architecture**
   - multiple servers work together to store/manage/access data
2. **Data chunking/striping**
   - data is divided into smaller chunks and distributed across multiple storage devices
   - enables concurrent reads and writes for improved performance
3. **Metadata management**
   - metadata can be stored and managed separately from data (on a different node)
   - allows for efficient searching, indexing, and data organization
4. **Scalability**
   - designed to handle large-scale storage and computing
   - can easily expand storage and performance as needed
5. **Fault tolerance & data redundancy**
   - replication, RAID, erasure coding (Reed–Solomon: *N* data blocks are encoded into *N + K* blocks such that the data can be reconstructed from any *N* of them, withstanding up to *K* concurrent failures)
6. **Load balancing**
   - ensures optimal distribution of data and workload across available resources
   - minimizes bottlenecks and maximizes performance

### Design Considerations

1. Disk failure is the norm given the number of storage machines
2. Most files are large --> I/O is performed in large, block-sized units
3. Different programming models come with different physical architectures:
   a) shared memory: multi-threading with locks on shared data structures
   b) message passing: master-slave / producer-consumer

### Examples

Google File System (GFS), Apache Hadoop Distributed File System (HDFS)

## Google File System (GFS)

### Features

- Central themes in distributed systems: parallel performance, fault tolerance, replication, (weak) consistency
- MapReduce and BigTable are built on top of GFS
- Single-master architecture
- Original paper: https://static.googleusercontent.com/media/research.google.com/en//archive/gfs-sosp2003.pdf

### Motivation

- Commodity hardware: failures are frequent in large setups
- Large files: 100 MB+
- File operations: reads and appends, mostly sequential reads

### Design Principles

1. **Fault tolerance**
   - GFS is designed to handle hardware failures and network issues gracefully through replication, checksums, and automatic recovery mechanisms
2. **Data replication**
   - To ensure durability and availability, the system replicates each chunk across multiple (default = 3) nodes (chunkservers)
3. **High throughput**
   - GFS is optimized for high sustained throughput over low latency (workloads typically involve large data processing tasks)
4. **Consistency and atomicity**
   - GFS provides a consistent view of the file system using write serializability, chunk versioning, atomic record appends, etc.
5. **Chunk-based storage**
   - GFS stores files as fixed-size chunks and uses a single master server to manage the metadata associated with these chunks
   - Actual chunk data are stored on chunkservers distributed across the cluster
6. **Scalability**
   - Architecture designed to handle large numbers of files and a vast amount of storage
   - Runs on commodity hardware
7. **Master-slave architecture**
   - A single master server (a single point of failure) manages the file system metadata and handles garbage collection and chunk migration for load balancing and failure recovery

### Master

1. Stores the full file system metadata in memory for fast operations and background scans
   - namespace (file system tree): hierarchical information about files and directories
   - filename --> an array of chunk handles
   - chunk handle --> list of chunkservers holding that chunk + chunk version number
   - access control information
   - current locations of chunks
2. Manages chunk leases (locks), garbage collection (freeing unused chunks), and chunk migration (copying/moving chunks)
3. Fault tolerance is achieved using an operation log (oplog)
   - namespace and filename metadata are persisted by logging mutations to the oplog on the master's local disk
   - the oplog is replicated periodically to another machine (shadow master)
   - a new master can be started if the master fails, by replaying the oplog
4. Periodically communicates with all chunkservers via heartbeat messages to collect state and send commands
   - heartbeat: short message ensuring that the server is alive and collecting statistics about it
5. Does not store chunk location information persistently (only in memory)
   - a (new) master needs heartbeats from the chunkservers upon startup to learn chunk locations and primary status

### Chunkservers (slaves)

1. Stores file chunks on local disks as Linux files
2. Chunk size: 64 MB by default, analogous to the page size in an RDBMS
3. Each chunk has a chunk handle: a globally unique identifier assigned by the master
4. Stores a 32-bit checksum (to detect data corruption) in memory, logged to disk
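As a rough sketch of the checksumming idea (the GFS paper actually keeps a 32-bit checksum per 64 KB block inside each chunk; this toy version works at whole-chunk granularity, and `chunk_and_checksum`/`verify` are made-up helpers, not GFS APIs):

```python
import zlib

CHUNK_SIZE = 64 * 2**20  # 64 MB, the default GFS chunk size

def chunk_and_checksum(data: bytes, chunk_size: int = CHUNK_SIZE):
    """Split a file's bytes into fixed-size chunks and attach a 32-bit CRC
    to each, so corruption can be detected on later reads."""
    chunks = []
    for offset in range(0, len(data), chunk_size):
        chunk = data[offset:offset + chunk_size]
        chunks.append({"offset": offset,
                       "data": chunk,
                       "crc32": zlib.crc32(chunk)})
    return chunks

def verify(chunk) -> bool:
    # Re-compute the CRC and compare with the stored value.
    return zlib.crc32(chunk["data"]) == chunk["crc32"]

# Small chunk size here only so the demo produces several chunks.
chunks = chunk_and_checksum(b"some file contents" * 1000, chunk_size=4096)
assert all(verify(c) for c in chunks)
```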
### Consistency

Operations are atomic:
- Namespace mutations are atomic (managed by the master using namespace locking)
- Record append operations are atomic
- The master's operation log defines a global order

Ensuring consistency after mutations in the presence of chunk replication:
- Mutations are applied in the same order on all replicas by designating a primary replica
- Chunk version numbers are used to detect stale replicas (managed by the master and garbage-collected when appropriate)

### Write Operation

1. Lease acquisition [client --> master]: the master grants a chunk lease to one replica and increments the chunk version number
2. Primary selection [master --> client]: the master identifies the primary replica (the lease holder, which has the latest version of the chunk being modified) and the secondary replicas
3. Data forwarding [client --> chunkservers]: the client pushes the data to the primary and secondary replica nodes
4. Data buffering and acknowledgement [client --> primary replica]: the replicas buffer the data and acknowledge receipt; the client then sends the write request to the primary
5. Write command: primary mutation (write on the primary replica node) --> secondary mutations
6. Primary acknowledgement [secondary replica nodes --> primary replica node]: secondary mutations are done
7. Client acknowledgement [primary replica --> client]: write operation complete

If any step fails, the client is informed and retries the write.

### Colossus: next-generation GFS

- Better scalability and availability. Instead of using a single master node to manage metadata, the Colossus architecture contains
  - Curators: a distributed metadata service backed by a NoSQL database
  - Custodians: responsible for storage management (space balancing, RAID, etc.)
- Blog post: https://cloud.google.com/blog/products/storage-data-transfer/a-peek-behind-colossus-googles-file-system

## Apache Hadoop Distributed File System (HDFS)

### What is Apache Hadoop?

- A platform/ecosystem for big data processing, started at Yahoo! and inspired by Google's MapReduce and GFS, written mostly in Java
- Core components include
  - the Hadoop Distributed File System (HDFS)
  - a MapReduce implementation
  - other management services such as YARN

### HDFS Motivations

- Lots of cheap PCs with a high failure rate
- Write-once, read-many data (logs, archives; processing is mostly read-oriented)
- Large streaming reads over random access (high sustained throughput over low latency)

### HDFS Features

- A stand-alone, general-purpose distributed file system, used as part of a Hadoop cluster
- Highly scalable (to petabytes of data)
- Fault-tolerant
- Offers simplified data access
  - Users see their files as if they were on a single machine
- Tuning is required for large deployments
- Limited to file-level permissions (Linux-like security)

### HDFS Basic Operations

1. When an input file is added to HDFS:
   - The file is split into smaller blocks of fixed size
   - Block: the single unit of storage, transparent to the end user
   - Block size is traditionally 64/128/256 MB: large enough to force a choice/tradeoff between a seek and a sequential disk read
     - We want to minimize seek_time / transfer_time, where seek_time + transfer_time is the time to read a block (see the back-of-the-envelope calculation below)
   - Each block is replicated to multiple hosts (machines)
     - The replication level (default = 3) is configurable
2. When a host crashes or is decommissioned, all of its blocks are re-replicated to other hosts
3. When a new host is added, blocks are rebalanced to avoid data skew
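A back-of-the-envelope calculation shows why large blocks amortize seek cost; the 10 ms seek time and 100 MB/s transfer rate below are illustrative assumptions, not HDFS defaults.

```python
# Fraction of a block read that is wasted on the initial seek, assuming a
# 10 ms seek and a 100 MB/s sequential transfer rate (illustrative numbers).
SEEK_TIME = 0.010          # seconds
TRANSFER_RATE = 100e6      # bytes per second

for block_mb in (1, 64, 128, 256):
    transfer_time = block_mb * 1e6 / TRANSFER_RATE
    overhead = SEEK_TIME / (SEEK_TIME + transfer_time)
    print(f"{block_mb:4d} MB block: seek is {overhead:.1%} of the read time")

# 1 MB blocks lose ~50% of the time to seeking; 128 MB blocks lose under 1%.
```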
### HDFS Architecture

1. **NameNode** (master)
   - Manages the file system namespace by storing
     - metadata: file-to-blocks and block-to-DataNodes mappings
     - file/directory structure, access permissions, etc.
   - Coordinates file operations
     - Directs clients to DataNodes for reads and writes
     - No data is moved through the NameNode
   - Maintains overall health
     - Periodic communication (heartbeats) with DataNodes for block re-replication, block rebalancing, and garbage collection
     - Communicates with the Secondary NameNode: checkpointing
   - Executes file system namespace operations
     - opening, closing, renaming files/directories, etc.
2. **DataNode** (slaves)
   - Actual storage and management of data blocks on a single host
   - Provides clients with access to data
   - Performs block creation, deletion, and replication upon instruction from the NameNode
   - Sends a heartbeat (every 3 seconds) and a block report (every 6 hours) to the NameNode
3. **Secondary NameNode**: checkpointing, NOT a failover NameNode
   - The NameNode keeps the current state of HDFS in an `fsimage` (file system image) file, the last checkpoint of the namespace
   - All modifications to HDFS are written to a log (`edits`)
   - During startup, the NameNode merges `fsimage` and `edits` to get to the latest state
   - The Secondary NameNode merges `fsimage` and `edits` periodically (default: every 1 hour or 1 million transactions)

### Replication

- Maximize reliability, availability, and bandwidth through opportunistic execution
- The NameNode determines replica placement
  - First replica on the local node (where the writing process runs) or a random node
  - Second replica on a node in a different (remote) rack
  - Third replica on a different node in the same remote rack as the second, which limits cross-rack network I/O while still covering two racks
- Consider geo-replication for disaster recovery

### Rebalancer & Block Placement Policy

Considerations:
- Non-uniform distribution of blocks across hosts (e.g., when hosts are added/removed)
- Different hardware generations (different performance)
- Current hardware utilization across hosts
- HDFS operates on the block level, not on the file level

The system administrator can perform periodic cluster-wide rebalancing of blocks: `hadoop balancer <threshold>` (`threshold` in percent, default = 10)
- Each node's utilization ratio (utilized_space / overall_capacity) ends up within X% of the overall cluster utilization ratio
- A smaller threshold leads to a more balanced distribution but might take a lot of time

HDFS allows for custom block placement policies, such as
- using servers with lower utilization first
- operating on the file level
- assigning weights to DataNodes
- moving more blocks to newer-generation hardware

# Distributed Computation

## Hadoop v1.0 (MapReduce)

### What is MapReduce?

MapReduce can refer to
- the programming paradigm/model using map (group by) and reduce (aggregate) functions
- the execution framework or specific implementation of the paradigm: the data processing layer of Hadoop

In MapReduce, a job defines, schedules, monitors, and manages the execution of Hadoop MapReduce .jar files. A job splits the input file(s) into chunks which are processed by multiple (map/reduce) tasks.

What can M/R do? e.g., estimate $\pi$ using (quasi-)Monte Carlo methods, or count word occurrences across a huge corpus (see the sketch below).
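As a toy illustration of the programming model (plain Python, not the Hadoop Java API; `map_fn`, `reduce_fn`, and `run_mapreduce` are made-up names), here is word count: map emits (word, 1) pairs, a simulated in-memory shuffle groups them by key, and reduce sums the counts per word.

```python
from collections import defaultdict

# Word count in the MapReduce style: map emits (word, 1) pairs, the shuffle
# groups pairs by key, and reduce sums the counts for each word.

def map_fn(line: str):
    for word in line.split():
        yield word, 1

def reduce_fn(word: str, counts):
    return word, sum(counts)

def run_mapreduce(lines, map_fn, reduce_fn):
    groups = defaultdict(list)           # simulated shuffle: group by key
    for line in lines:
        for key, value in map_fn(line):
            groups[key].append(value)
    return dict(reduce_fn(k, vs) for k, vs in groups.items())

print(run_mapreduce(["to be or not to be"], map_fn, reduce_fn))
# {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```

In a real cluster the map calls run on many nodes in parallel and the shuffle moves data over the network; only the two user-provided functions stay the same.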
### Hadoop 1.0 Architecture

1. **Job Tracker**
   - one Job Tracker per Hadoop instance
   - manages the job life cycle
   - receives and accepts job submissions from clients
   - manages resources and scheduling for each task
   - communicates with Task Trackers to deploy and run tasks
   - attempts to assign tasks so as to preserve data locality
2. **Task Tracker**
   - one Task Tracker per node
   - runs and manages individual (map/reduce) tasks locally
   - a mapper/reducer is a node running a map/reduce task
   - communicates the progress of tasks back to the Job Tracker
3. MapReduce (data processing) is layered **on top of HDFS** (data storage/access)
   - One NameNode, one Secondary NameNode, one Job Tracker node
   - Each slave node contains a TaskTracker process and a DataNode process

### MapReduce Implementation

The client provides the Map and Reduce functions; everything else is handled by Hadoop M/R.

#### Steps

1. **Input Reader**
   - divides the input into splits (data chunks) and sends each split to a map task
2. **Map Task**
   - applies the Map function to each split
   - each Map returns a list of key-value pairs
   - Map tasks run in parallel
   - usually set to 1 mapper per block (the number of mappers running in parallel is bounded by the available DataNodes/slots)
3. **Shuffle**
   - the output of each mapper is sorted by key on the map instance and sent to different reducers accordingly

   **Shuffle and Sort on the Map Instance**
   - Map outputs are buffered in memory in a circular buffer (100 MB by default), and its contents are spilled to disk when it is 80% full
   - After the mapper finishes, all spill files on the DataNode are merged into a single file, which is sorted by key and partitioned based on the number of reducers (see the partitioner/combiner sketch after these steps)
   - An optional **combiner** (a local reducer that runs in memory after the map phase) can be executed by the spilling thread before writing tuples to disk, reducing I/O and network traffic
   - records from across Map tasks with the same key are sent to the same reducer for aggregation

   **Shuffle and Sort on the Reduce Instance**
   - Map outputs are copied over to the reducer nodes
   - "Sort" is done by a multi-pass merge of map outputs: once a mapper finishes and sends its data to the reducer node, the chunk is merged with the existing data in sorted order
   - The final merge pass feeds directly into the reduce task
4. **Reduce Task**
   - applies the Reduce function to each key and outputs a list of key-value pairs
   - must wait for all mappers to finish (a bottleneck) before starting
   - set the number of reducers depending on the expected number of keys: too few create a bottleneck, while too many create excessive network traffic
   - can be set to one reducer if we just want globally sorted output
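A small sketch of the partitioning and combining described above (plain Python with hypothetical helpers, not Hadoop's `Partitioner`/`Combiner` interfaces): records are routed to reducers by hashing the key modulo the number of reducers, and a local combiner pre-aggregates values before anything crosses the network.

```python
from collections import defaultdict

NUM_REDUCERS = 4

def partition(key: str, num_reducers: int = NUM_REDUCERS) -> int:
    # Hash partitioning: all records with the same key land on the same reducer.
    return hash(key) % num_reducers

def combine(map_output):
    """Local combiner: pre-sum (word, 1) pairs on the map side so fewer
    records cross the network during the shuffle."""
    local = defaultdict(int)
    for key, value in map_output:
        local[key] += value
    return list(local.items())

map_output = [("be", 1), ("to", 1), ("be", 1), ("or", 1), ("to", 1)]
combined = combine(map_output)            # [('be', 2), ('to', 2), ('or', 1)]

# Group the combined records into per-reducer partitions.
partitions = defaultdict(list)
for key, value in combined:
    partitions[partition(key)].append((key, value))
print(dict(partitions))
```

Hadoop's default `HashPartitioner` follows essentially the same rule using the key's `hashCode()` modulo the number of reduce tasks.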
#### Things to handle

- Scheduling: assign nodes to map/reduce tasks
- Data distribution: move processes to the data (locality)
- Synchronization: gather, sort, and shuffle intermediate data
- Errors and faults: detect worker failures and restart them
- Everything happens on top of HDFS (intermediate results, shuffling partitions, etc.)

## Hadoop v2

### Changes over v1

1. YARN replaces the old architecture
2. HDFS 2: NameNode improvements
   - High availability: an additional passive (failover) NameNode which is updated periodically
   - Federation: multiple NameNodes responsible for sub-namespaces
3. Scheduling (FIFO, capacity scheduling, fair scheduling, delay scheduling)

### Problems with Hadoop v1.0

- Only one JobTracker
  - limited horizontal scalability (of Task Trackers)
  - fault tolerance and maintenance: if the JT dies or stops, all jobs must restart
- Rigid programming model: only supports MapReduce

### YARN: Yet Another Resource Negotiator

- Generalized cluster management
- Responsible for the resources in the cluster
  - resource: CPU/memory/etc. $\times$ time required by a program
- Any node can become a job tracker for a given job
- Tasks are now "containers" (packages of software that contain all of the necessary elements to run in any environment)

#### YARN Components

- **Resource Manager**
  - one node per cluster (a failover node can be configured)
  - manages job scheduling and execution (but NO job tracking)
  - manages global resource allocation
- **Application Master**
  - one per job, replaces the JobTracker of Hadoop v1
  - manages task scheduling and execution for a given job
- **Node Manager**
  - one process per node, similar to the TaskTracker in Hadoop v1
  - manages the lifecycle of task containers
  - reports to the Resource Manager on health and resource usage

#### Execution flow of M/R on YARN

See p. 10, lecture9_hadoops.

Client submits an application -> ResourceManager starts the MRAppMaster (via a NodeManager) -> MRAppMaster starts task JVMs (via NodeManagers) to run the M/R tasks

### Job Scheduling

Job scheduling should follow these resource management design principles:
- provide fast response times to small jobs in a multitenant Hadoop cluster (many users)
- improve utilization: proper load balancing, hotspot avoidance
- improve data locality: run tasks close to their data (same node/rack)

Hadoop has a taste for simplicity: YARN is a centralized resource negotiator with fairly basic scheduling and decision techniques.

1. FIFO scheduler
   - suffers from poor parallelism and starvation of small jobs
2. Capacity scheduler
   - Organizes jobs into (hierarchical) queues
     - one queue for production (heavier) jobs (e.g., 80% of overall capacity)
     - one queue for test (lighter) jobs (e.g., 20% of overall capacity)
   - FIFO scheduling within each queue
   - Supports preemption: the system can stop a job/task to maintain the promised capacity
   - Issue: underutilization
3. Fair scheduler
   - Conceptually similar to the capacity scheduler
   - Groups jobs into "pools" (implemented as queues) and assigns each pool a guaranteed minimum share of resources
   - On average, every job in a pool gets an equal share over time
   - Extra capacity is divided evenly between pools
   - Implementation: maintain a priority queue of jobs (not tasks) ordered by the number of running tasks (fewer is higher priority); when a resource frees up, assign it to the head of the queue, i.e. the job with the fewest running tasks (see the sketch after this list)
   - However, these policies can hurt data locality: the job picked by the policy may not have data on the free node
4. Delay scheduling: trade time for locality
   - forces jobs to wait for a short time if they cannot launch node-local tasks
   - empirically, waiting 1-5 seconds gets close to 100% map locality
   - works well under two conditions
     - most tasks are short relative to jobs
     - there are many locations where a task can run efficiently (block replication)
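A minimal sketch of the fair-scheduler bookkeeping described in item 3 (plain Python with hypothetical names, not the actual YARN FairScheduler): jobs live in a heap keyed by their number of running tasks, and every freed slot goes to the job currently running the fewest tasks.

```python
import heapq

# Fair-scheduler-style slot assignment: always give a freed slot to the job
# with the fewest running tasks (ties broken by job id for determinism).

jobs = {"job_a": 0, "job_b": 0, "job_c": 0}   # job -> number of running tasks
heap = [(count, job) for job, count in jobs.items()]
heapq.heapify(heap)

def assign_free_slot():
    count, job = heapq.heappop(heap)      # job with the fewest running tasks
    jobs[job] += 1                        # it now runs one more task
    heapq.heappush(heap, (jobs[job], job))
    return job

# Ten slots free up one by one; tasks end up spread evenly across the jobs.
print([assign_free_slot() for _ in range(10)])
```

A real scheduler would also decrement counts when tasks finish and enforce per-pool minimum shares; the heap ordering is only the core idea.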
## Hadoop v3

Long-term resource planning (cloud computing):
- Resources: cumulative CPU and memory over time
- Static capacity allocation managed by the system admin is not feasible in a cloud setup (millions of users)
  - resource underutilization: fluctuating maximum cluster capacity (hardware failures/maintenance) and fluctuating demands
  - when reaching maximum capacity: jobs can be moved/prioritized to the extent that the SLA (service-level agreement) allows

### Improvements over v2

1. Better YARN timeline service, which improves scalability and reliability
2. Intra-DataNode balancing: alleviates skew across the disks within a DataNode
3. Erasure coding replacing the replication factor of 3: the storage overhead (extra storage needed) is reduced to 50% as opposed to 200% (see the calculation below)
4. NameNode HA (high availability): supports 2 or more standby NameNodes, as opposed to only one passive NameNode
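To see where the 50% vs. 200% figures come from, here is the arithmetic, assuming a Reed–Solomon RS(6,3) layout (6 data blocks + 3 parity blocks, a common HDFS erasure coding policy); the block counts are illustrative, not a statement about any particular cluster.

```python
# Storage overhead = extra storage / original data size.

data_blocks = 6

# 3x replication: every block is stored three times.
replication_total = 3 * data_blocks
replication_overhead = (replication_total - data_blocks) / data_blocks   # 2.0 -> 200%

# RS(6,3) erasure coding: 3 parity blocks protect 6 data blocks and
# tolerate any 3 concurrent block losses.
parity_blocks = 3
ec_total = data_blocks + parity_blocks
ec_overhead = (ec_total - data_blocks) / data_blocks                     # 0.5 -> 50%

print(f"replication: {replication_overhead:.0%}, erasure coding: {ec_overhead:.0%}")
# replication: 200%, erasure coding: 50%
```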