
Distributed File Systems

We look at very large-scale data processing techniques with the following characteristics:

  • massive amounts of data
  • mostly read (OLAP) workload
  • efficiency (data processed per second) over consistency
  • separating storage and computation

Let's say we want to process a data file of size 1TB. This will take ~1 hour to read from an SSD (~500MB/s) and ~2.5 hours to read from an HDD (~150MB/s). Since the storage medium is the bottleneck, we might want to use multiple disks to support parallel reads, but a single machine is limited by its bus bandwidth. So we might instead use N machines with K disks each. This is cost-efficient using cheap machines but hard to program

File System

A file system stores files (data) along with their metadata (name, access permissions, timestamps, file size, location of data blocks) in data structures called inodes (index nodes)

One machine can have many disks. How should files be stored across disks?

RAID: redundant array of independent disks

RAID is a data storage virtualization technique that combines multiple physical disk drives into one or more logical units for data redundancy and/or performance improvement. Different RAID levels provide different balances among the key goals of availability, fault tolerance, performance, and capacity.

RAID 0 (striping)

  • Data is striped across multiple disks > better read/write performance
  • No redundancy or fault tolerance (data loss will occur if a single disk fails)

RAID 1 (Mirroring)

  • Data is duplicated across two or more disks, providing fault tolerance through redundancy
  • Improves read performance but requires more storage and slows down writes

RAID 5 (Striping with parity)

  • Data and parity information (the XOR of data chunks across disks) are striped across at least three disks
  • Improves read performance and fault tolerance (can tolerate one disk failure)
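
A minimal sketch of the parity idea, using tiny illustrative byte arrays rather than real disk blocks: the parity block is the byte-wise XOR of the data blocks, so any one lost block can be rebuilt by XOR-ing the surviving blocks with the parity.

```java
// Hedged sketch of RAID 5-style parity; block contents are made-up byte arrays.
public class Raid5Parity {
    // parity = XOR of all blocks, byte by byte
    static byte[] xorParity(byte[]... blocks) {
        byte[] parity = new byte[blocks[0].length];
        for (byte[] block : blocks)
            for (int i = 0; i < parity.length; i++)
                parity[i] ^= block[i];
        return parity;
    }

    public static void main(String[] args) {
        byte[] d0 = {1, 2, 3, 4}, d1 = {5, 6, 7, 8}, d2 = {9, 10, 11, 12};
        byte[] parity = xorParity(d0, d1, d2);

        // Disk holding d1 fails: rebuild it from the surviving blocks and the parity
        byte[] rebuilt = xorParity(d0, d2, parity);
        System.out.println(java.util.Arrays.equals(rebuilt, d1)); // true
    }
}
```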

Parallel/Distributed File Systems

Features

  1. Distributed architecture
    • multiple servers work together to store/manage/access data
  2. Data chunking/striping
    • data is divided into smaller chunks and distributed across multiple storage devices
    • enables concurrent reads and writes for improved performance
  3. Metadata management
    • metadata can be stored and managed separately from data (stored on a different node)
    • allows for efficient searching, indexing, and data organization
  4. Scalability
    • designed to handle large-scale storage and computing
    • can easily expand storage and performance as needed
  5. Fault tolerance & data redundancy
    • replication, RAID, erasure coding (e.g. Reed–Solomon: K parity blocks computed from N data blocks; any block can be rebuilt from any N of the N+K blocks, tolerating up to K concurrent failures)
  6. Load balancing
    • ensures optimal distribution of data and workload across available resources
    • minimizes bottlenecks and maximizes performance

Design Considerations

  1. Disk failure is the norm given the number of storage machines
  2. Most files are large > use large, block-sized I/O operations
  3. Different programming models fit different physical architectures
    a) shared memory: multi-threading with locks on data structures
    b) message passing: master-slave / producer-consumer

Examples

Google File System (GFS), Apache Hadoop Distributed File System (HDFS)

Google File System (GFS)

Features

Motivation

  • Commodity hardware: failures are frequent in large setups
  • Large files: 100 MB+
  • File operations: reads and appends, mostly sequential reads

Design Principles

  1. Fault tolerance
    • GFS is designed to handle hardware failures and network issues gracefully through replication, checksums, and automatic recovery mechanisms
  2. Data replication
    • To ensure durability and availability, the system replicates each chunk across multiple (default=3) nodes (chunkservers)
  3. High throughput
    • GFS is optimized for high sustained throughput over low latency (workloads typically involve large data processing tasks)
  4. Consistency and atomicity
    • GFS provides a consistent view of the file system using write serializability, chunk versioning, atomic record appends, etc
  5. Chunk-based storage
    • GFS stores files as fixed-size chunks and uses a single master server to manage the metadata associated with these chunks
    • Actual chunk data are stored on chunk servers distributed across the cluster
  6. Scalability
    • Architecture designed to handle large numbers of files and a vast amount of storage
    • Runs on commodity hardware
  7. Master-slave architecture
    • A single master server (SPOF) manages the file system metadata and handles garbage collection and chunk migration for load balancing and failure recovery

Master

  1. Stores the full file system metadata in memory for fast operations and background scans
    • namespace (file system tree): hierarchical information about files and directories
    • filename: an array of chunk handles
    • chunk handle: list of chunkservers holding replicas + chunk version number info (see the sketch after this list)
    • access control information
    • current locations of chunks
  2. Manages chunk leases (locks), garbage collection (freeing unused chunks) and chunk migration (copying/moving chunks)
  3. Fault tolerance achieved using an operation log (oplog)
    • namespace and filename metadata are persisted by logging mutations to the oplog on the master's local disk
    • the oplog is replicated periodically to another machine (shadow master)
    • a new master can be started if the master fails by replaying the oplog
  4. Periodically communicates with all chunkservers via heartbeat messages to get states and send commands
    • heartbeat: short message ensuring that the server is alive and collecting statistics on the server
  5. Master does not store chunk location information persistently (only in memory)
    • needs heartbeats from chunkservers for chunk location and primary status info when a (new) master starts up
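
A toy, in-memory picture of the metadata maps from item 1; the filenames, chunk handles, versions, and chunkserver names below are made up purely for illustration.

```java
// Hedged sketch of the master's metadata: filename -> chunk handles -> locations.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MasterMetadataSketch {
    static class ChunkInfo {
        final int version;                 // chunk version number (detects stale replicas)
        final List<String> chunkservers;   // replica locations (kept only in memory)
        ChunkInfo(int version, List<String> chunkservers) {
            this.version = version;
            this.chunkservers = chunkservers;
        }
    }

    public static void main(String[] args) {
        Map<String, List<Long>> fileToChunks = new HashMap<>();  // filename -> chunk handles
        Map<Long, ChunkInfo> chunks = new HashMap<>();           // chunk handle -> info

        fileToChunks.put("/logs/2024-01-01", List.of(1001L, 1002L));
        chunks.put(1001L, new ChunkInfo(3, List.of("cs1", "cs2", "cs3")));
        chunks.put(1002L, new ChunkInfo(1, List.of("cs2", "cs4", "cs5")));

        // Resolving a read: filename -> chunk handles -> chunkserver locations
        for (long handle : fileToChunks.get("/logs/2024-01-01")) {
            ChunkInfo info = chunks.get(handle);
            System.out.println(handle + " v" + info.version + " @ " + info.chunkservers);
        }
    }
}
```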

Chunkservers (slaves)

  1. Stores file chunks on local disks as Linux files
  2. Chunk size: default 64 MB, analogous to page size in RDBMS
  3. Each chunk has a chunk handle: global unique identifier assigned by the master
  4. Stores a 32-bit checksum (to detect data corruption) in memory and logs it to disk
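
A minimal sketch of checksum-based corruption detection; Java's CRC32 is used here as a stand-in 32-bit checksum, not necessarily the algorithm GFS itself uses.

```java
// Hedged sketch: compute a 32-bit checksum on write, re-verify it on read.
import java.util.zip.CRC32;

public class ChunkChecksum {
    public static void main(String[] args) {
        byte[] chunkData = "some chunk bytes".getBytes();

        CRC32 crc = new CRC32();
        crc.update(chunkData);
        long stored = crc.getValue();      // checksum kept in memory / logged to disk

        // On a later read, recompute and compare to detect corruption
        CRC32 verify = new CRC32();
        verify.update(chunkData);
        System.out.println(verify.getValue() == stored ? "chunk OK" : "corruption detected");
    }
}
```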

Consistency

Operations are atomic:

  • Namespace mutations are atomic (managed by master using namespace locking)
  • Record append operations are atomic
  • Master's operation log defines a global order

Ensuring consistency after mutation in the presence of chunk replication

  • Applies mutations in order by designating a primary replica
  • Uses chunk version numbers to detect stale replicas (managed by the master and garbage-collected when appropriate)

Write Operation

  1. Lease acquisition [client > master]: chunk version increment
  2. Primary selection [master > client]: need to identify a primary replica which contains the latest version of the data being modified
  3. Data forwarding [client > chunkservers]: send data to the primary replica node and secondary replica nodes
  4. Data buffering and acknowledgement [client > primary replica]
  5. Write command: primary mutation (write on primary replica node) > secondary mutation
  6. Primary acknowledgement [secondary replica nodes > primary replica node]: secondary mutations are done
  7. Client acknowledgement [primary replica > client]: write operation done

If any step fails, the client is informed and retries the write
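
The flow above can be pictured with a toy, single-process simulation; all classes and the data id below are illustrative stand-ins (no networking, leases, or failure handling), meant only to show that data is pushed to every replica first and the primary then fixes the mutation order.

```java
// Hedged, in-memory sketch of the GFS write flow described above.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GfsWriteSketch {
    static class Chunkserver {
        final String name;
        final Map<Long, byte[]> buffered = new HashMap<>(); // data pushed but not yet applied
        final List<Long> appliedOrder = new ArrayList<>();  // mutation order actually applied
        Chunkserver(String name) { this.name = name; }
        void push(long dataId, byte[] payload) { buffered.put(dataId, payload); } // steps 3-4
        void apply(long dataId) { appliedOrder.add(dataId); buffered.remove(dataId); } // steps 5-6
    }

    public static void main(String[] args) {
        // Steps 1-2: client learns from the master which replica holds the lease (primary)
        Chunkserver primary = new Chunkserver("cs1");
        List<Chunkserver> secondaries = List.of(new Chunkserver("cs2"), new Chunkserver("cs3"));

        // Steps 3-4: client pushes the data to all replicas; each buffers it and acks
        long dataId = 42L;
        byte[] payload = "record".getBytes();
        primary.push(dataId, payload);
        for (Chunkserver s : secondaries) s.push(dataId, payload);

        // Step 5: write command to the primary, which applies it and forwards the order;
        // step 6: secondaries ack the primary; step 7: primary acks the client
        primary.apply(dataId);
        for (Chunkserver s : secondaries) s.apply(dataId);
        System.out.println("mutation applied in the same order on all replicas");
    }
}
```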

Colossus: next generation GFS

Apache Hadoop Distributed File System (HDFS)

What is Apache Hadoop?

  • A platform/ecosystem for big data processing, started at Yahoo! and inspired by Google's MapReduce and GFS, written mostly in Java
  • Core components include
    • Hadoop Distributed File System (HDFS)
    • A MapReduce implementation
    • Other management services such as YARN

HDFS Motivations

  • Lots of cheap PCs but high failure rate
  • Write-once and read many times data (logs, archives; processing is mostly read-oriented)
  • Large streaming reads over random access (high sustained throughput over low latency)

HDFS Features

  • A stand-alone general purpose distributed file system, used as a part of a Hadoop cluster
  • Highly scalable (to Petabytes of data)
  • Fault-tolerant
  • Offers simplified data access
    • User sees their files as if they were on a single machine
    • Tuning is required for large deployments
  • Limited to file-level permissions (Linux-like security)

HDFS Basic Operations

  1. When an input file is added to HDFS:
    • File is split into smaller blocks of fixed size
      • Block: a single unit of storage, transparent to the end user
      • Block size is traditionally 64/128/256 MB: large enough that the sequential transfer of a block dominates the cost of the seek
        • Want to minimize seek_time/transfer_time, where seek_time + transfer_time is the time to read a block (see the sketch after this list)
    • Each block is replicated to multiple hosts (machines)
    • Replication level (default=3) is configurable
  2. When a host crashes or is decommissioned, all its blocks are re-replicated to other hosts
  3. When a new host is added, blocks are rebalanced to avoid data skew
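
A small sketch of the seek-vs-transfer tradeoff behind large block sizes, assuming an illustrative 10 ms average seek time and 100 MB/s sequential transfer rate (not HDFS defaults, just plausible numbers).

```java
// Hedged sketch: how the seek overhead fraction shrinks as the block size grows.
public class BlockSizeTradeoff {
    public static void main(String[] args) {
        double seekMs = 10;               // assumed time to position the disk head
        double transferMBperS = 100;      // assumed sequential read throughput
        int[] blockSizesMB = {4, 64, 128, 256};
        for (int mb : blockSizesMB) {
            double transferMs = mb / transferMBperS * 1000;
            double overhead = seekMs / transferMs; // the ratio we want to keep small
            System.out.printf("%4d MB block: seek/transfer = %.3f%n", mb, overhead);
        }
    }
}
```

With 64 MB and larger blocks the seek overhead falls below a few percent, which is why HDFS favors such large blocks over the kilobyte-sized blocks of local file systems.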

HDFS Architecture

  1. NameNode (master)
    • Manages the file system namespace by storing
      • metadata: file-to-blocks and block-to-DataNodes mappings
      • file/directory structure, access permission, etc.
    • Coordinates file operations
      • Direct clients to DataNodes for reads and writes
      • No data is moved through the NameNode
    • Maintains overall health
      • Periodic communication (heartbeat) with DataNodes for block re-replication, block rebalancing and garbage collection
    • Communicates with Secondary NameNode: checkpointing
    • Executes file system namespace operations
      • opening, closing, renaming file/directories, etc.
  2. DataNode (slaves)
    • Actual storage and management of data blocks on a single host
    • Provides clients with access to data
    • Performs block creation, deletion and replication upon instruction from the NameNode
    • Sends heartbeat (every 3 seconds) and blockreport (every 6 hours) to NameNode
  3. Secondary NameNode: Checkpointing, NOT a failover NameNode
    • The NameNode keeps the current state of the HDFS in an fsimage (file system image) file, the last checkpoint of the namespace
    • All modifications to the HDFS are written to a log (edits)
    • During startup, NameNode merges fsimage and edits to get to the latest state
    • Secondary NameNode merges fsimage and edits periodically (default is every 1 hour or 1 million transactions)

Replication

  • Maximize reliability, availability and bandwidth through opportunistic execution
  • NameNode determines replica placement
    • First replica on the local node (where the writing process happens) or a random node
    • Second replica is usually on the same rack as the first replica to reduce cross-rack network I/O
    • Third replica on a different node in a different rack
    • Consider geo-replication for disaster recovery

Rebalancer & Block Placement Policy

Considerations

  • Non-uniform distribution of blocks across hosts (e.g. when hosts are added/removed)
  • Different hardware generations (different performance)
  • Current hardware utilization across hosts
  • HDFS operates on block level and not on file level

The system administrator can perform periodic cluster-wide rebalancing of blocks:
hadoop balancer <threshold> (threshold X%, default=10)

  • Each node's utilization ratio (utilized_space/overall_capacity) should end up within X% of the overall cluster utilization ratio (see the sketch below)
  • A smaller threshold leads to a more balanced distribution but may take much longer
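
A sketch of what the threshold check means, with made-up utilization numbers: a DataNode is considered balanced when its utilization is within the threshold of the cluster-wide ratio.

```java
// Hedged sketch of the balancer's per-node threshold check (illustrative numbers only).
public class BalancerCheck {
    public static void main(String[] args) {
        double clusterUtilization = 0.60;   // utilized_space / overall_capacity, cluster-wide
        double threshold = 0.10;            // e.g. hadoop balancer 10
        double[] nodeUtilization = {0.55, 0.72, 0.48};
        for (double u : nodeUtilization) {
            boolean balanced = Math.abs(u - clusterUtilization) <= threshold;
            System.out.printf("node at %.0f%% -> %s%n", u * 100, balanced ? "ok" : "needs rebalancing");
        }
    }
}
```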

HDFS allows for custom block placement policies, such as

  • using servers with lower utilization first
  • operating on file level
  • assigning weights to DataNodes
  • moving more blocks to newer generation hardware

Distributed Computation

Hadoop v1.0 (MapReduce)

What is MapReduce?

MapReduce can refer to

  • the programming paradigm/model using map (group by) and reduce (aggregate) functions
  • the execution framework or specific implementation of the paradigm: the data processing layer of Hadoop

In MapReduce, a job defines, schedules, monitors, and manages the execution of a packaged Hadoop MapReduce program (a .jar file). A job splits the input file(s) into chunks, which are processed by multiple (map/reduce) tasks.
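
As a concrete illustration of the paradigm, here is the classic word-count job written against Hadoop's MapReduce API (class names and input/output paths are illustrative, and the Hadoop libraries are assumed to be on the classpath): map emits (word, 1) pairs (the group-by), and reduce sums the counts per word (the aggregate).

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private final Text word = new Text();
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);          // emit (word, 1) for every token
      }
    }
  }

  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) sum += val.get();   // aggregate counts per key
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);   // optional local (map-side) reducer
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```

Note that the same reducer class is also registered as a combiner, so partial sums are computed on the map side before the shuffle.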

What can M/R do? e.g. estimate π using (quasi-)Monte Carlo methods.

Hadoop 1.0 Architecture

  1. Job Tracker
    • one Job Tracker per Hadoop instance
    • manages job life cycle
    • receives and accepts job submissions from clients
    • manages resources and scheduling for each task
    • communicates with task trackers to deploy and run tasks
    • attempts to assign tasks to support data locality
  2. Task Tracker
    • one task tracker per node
    • runs and manages individual (map/reduce) tasks locally
      • a mapper/reducer is a node running a map/reduce task
    • communicates progress of tasks back to Job Tracker
  3. MapReduce (data processing) is laid on top of HDFS (data storage/accessing)
    • One NameNode, one Secondary NameNode, one Job Tracker node
    • Each slave node contains a TaskTracker process and a DataNode process

MapReduce Implementation

Client provides Map and Reduce functions; everything else is handled by Hadoop M/R.

Steps

  1. Input Reader
    • divide input into splits (data chunks) and send each split to a map task
  2. Map Task
    • apply the Map function to each split
    • each Map returns a list of key-value pairs
    • Map tasks run in parallel
    • usually set to 1 mapper per block (at most #DataNodes tasks)
  3. Shuffle
    • the output of each mapper is sorted by key on the map instance and sent to different reducers accordingly
      Shuffle and Sort on the Map Instance
      • Map outputs are buffered in memory in a circular buffer (default 100 MB), and contents are spilled to disk when the buffer is 80% full
      • After the mapper finishes, all spill files on the DataNode are merged into a single file, which is then sorted by key and partitioned based on the number of reducers
      • An optional combiner (local reducer that runs in memory after map phase) can be executed by the spilling thread before writing tuples to disk, reducing I/O and network traffic
    • records from across Map tasks with the same key are sent to the same reducer for aggregation
      Shuffle and Sort on the Reduce Instance
      • Map outputs are copied over to the reducer nodes
      • "Sort" is done by a multi-pass merge of map outputs: once a mapper finishes and sends the data to the reducer node, the chunk is combined with existing data in a sorted manner
      • The final merge pass goes directly into the reduce task
  4. Reduce Task
    • apply the Reduce function to each key and output a list of key-value pairs
    • need to wait for all mappers to finish (bottleneck) before starting
    • set the number of reducers depending on the expected number of keys: too few creates a bottleneck, while too many create excessive network traffic
      • can set to one reducer if we just want sorted output

Things to handle

  • Scheduling: assign nodes to map/reduce tasks
  • Data distribution: move processes to data (locality)
  • Synchronization: gather, sort and shuffle intermediate data
  • Errors and faults: detect worker failures and restart their tasks
  • Everything happens on top of HDFS (intermediate results, shuffled partitions, etc.)

Hadoop v2

Changes over v1

  1. YARN replacing old architecture
  2. HDFS 2: NameNode improvements
    • High availability: add an additional passive (failover) NameNode which is updated periodically
    • Federation: multiple NameNodes responsible for subnamespaces
  3. scheduling (FIFO, capacity scheduling, fair scheduling, delay scheduling)

Problems with Hadoop v1.0

  • Only one JobTracker
    • limited horizontal scalability (of Task Trackers)
    • fault tolerance and maintenance: if JT dies/stops all jobs must restart
  • Rigid programming model: only supports MapReduce

YARN: Yet Another Resource Negotiator

  • Generalized cluster management
  • Responsible for resources in the cluster
    • resource: (CPU/memory/etc.) × time required by a program
  • Any node can become a job tracker for a given job
  • Tasks are now "containers" (packages of software that contain all of the necessary elements to run in any environment)

YARN Components

  • Resource Manager
    • one node per cluster (can configure a failover node)
    • manages job scheduling and execution (but NO job tracking)
    • manages global resource allocation
  • Application Master
    • one per job, replaces the JobTracker of Hadoop v1
    • manages task scheduling and execution for a given job
  • Node Manager
    • One process per node, similar to TaskTracker in Hadoop v1
    • manages the lifecycle of task containers
    • reports to Resource Manager on health and resource usage

Execution flow of M/R on YARN

See p.10, lecture9_hadoops
Client submits an application -> ResourceManager starts the MRAppMaster (via a NodeManager) -> MRAppMaster starts task JVMs (via NodeManagers) to run the Map/Reduce tasks

Job Scheduling

Job scheduling should follow these resource management design principles:

  • provide fast response times to small jobs in a multitenant Hadoop cluster (many users)
  • improve utilization: proper load balancing, hotspot avoidance
  • improve data locality: run tasks closer to their data (same node/rack)

Hadoop has a taste for simplicity. YARN is a centralized resource negotiator with very basic scheduling and decision techniques:

  1. FIFO: suffers from poor parallelism and starvation of small jobs
  2. Capacity Scheduler
    • Organize jobs into (hierarchical) queues
      • One queue for production (heavier) jobs (80% overall capacity)
      • One queue for test (lighter) jobs (20% overall capacity)
    • FIFO scheduling within each queue
    • Supports preemption: the system can stop a job/task to maintain the promised capacity
    • Issue: underutilization
  3. Fair scheduler
    • Conceptually similar to Capacity Scheduler
    • Group jobs into "pools" (implemented as queues) and assign to each pool a guaranteed minimum share of resources
    • On average, every job in a pool will get an equal share over time
    • Extra capacity is divided evenly between pools
    • Implementation: maintain a priority queue of jobs (not tasks) ordered by the number of running tasks (fewer is higher priority). When a resource is free, assign it to the head of the queue, i.e. the job with the fewest running tasks (see the sketch after this list)
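
A minimal sketch of that picking rule, with made-up job names and task counts (these are stand-in classes, not Hadoop's actual scheduler implementation).

```java
// Hedged sketch of the Fair Scheduler's "fewest running tasks first" rule.
import java.util.Comparator;
import java.util.PriorityQueue;

public class FairSchedulerSketch {
    static class PooledJob {
        final String name;
        int runningTasks;       // number of tasks currently running for this job
        PooledJob(String name, int runningTasks) { this.name = name; this.runningTasks = runningTasks; }
    }

    public static void main(String[] args) {
        // Jobs with fewer running tasks sit at the head of the queue
        PriorityQueue<PooledJob> queue =
            new PriorityQueue<>(Comparator.comparingInt((PooledJob j) -> j.runningTasks));
        queue.add(new PooledJob("etl-nightly", 40));
        queue.add(new PooledJob("ad-hoc-query", 2));
        queue.add(new PooledJob("report", 10));

        // A slot has just freed up: hand it to the job with the fewest running tasks
        PooledJob next = queue.poll();
        next.runningTasks++;          // it now runs one more task
        queue.add(next);              // re-insert with the updated count
        System.out.println("Assigned free slot to " + next.name);
    }
}
```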

However, these scheduling policies can hurt data locality: the job picked by the policy may not have data on the free node

  1. Delay Scheduling: trade time for locality
    • force jobs to wait for a short time if they cannot launch local tasks
    • Empirically, waiting 1-5 seconds achieves close to 100% map locality
    • It works well under two conditions
      • many tasks are short relative to jobs
      • there are many locations where a task can run efficiently (block replication)

Hadoop v3

Long-term resource planning (cloud computing)

  • Resources: cumulative CPU and memory over time
    • Static capacity allocation managed by the system admin is not feasible in a cloud setup (millions of users)
    • resource underutilization: fluctuating maximum cluster capacity (hardware failure/maintenance) and fluctuating demands
    • when reaching max capacity: jobs can be moved/reprioritized to the extent that the SLA (service-level agreement) allows

Improvements over v2

  1. Better YARN timeline service which improves scalability and reliability
  2. Intra DataNode balancing: alleviates skew within DataNode
  3. Erasure coding replacing the default 3-way replication: storage overhead (extra storage needed) is reduced to 50% as opposed to 200% (see the arithmetic sketch below)
  4. NameNode HA (high availability): supports 2 or more standby NameNodes as opposed to only one passive NameNode
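
The arithmetic behind item 3, assuming a Reed–Solomon layout with 6 data and 3 parity blocks (the common RS(6,3) configuration); the numbers are overhead relative to the original data size.

```java
// Hedged sketch: storage overhead of 3-way replication vs. RS(6,3) erasure coding.
public class StorageOverhead {
    public static void main(String[] args) {
        double replication = 3;                        // 3 copies of every block
        double replicationOverhead = replication - 1;  // extra copies / original = 200%
        double dataBlocks = 6, parityBlocks = 3;       // Reed-Solomon (6,3)
        double ecOverhead = parityBlocks / dataBlocks; // extra blocks / original = 50%
        System.out.printf("replication x3: %.0f%% overhead%n", replicationOverhead * 100);
        System.out.printf("erasure coding RS(6,3): %.0f%% overhead%n", ecOverhead * 100);
    }
}
```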