# Scalable (DATA420)

## Week 1

### Lecture 1 (Welcome to DATA420)

- Processing (Apache Spark, Hadoop MapReduce)
- Resource management (YARN)
- Storage (HDFS, HBase, Kudu, cloud)

### Lecture 2 (Introduction to distributed computing)

How the course is going - tell me anytime through the link on Learn.

Course overview - better not to use social media for analysis.

> No extension for Assignment 2 - make sure you try to do everything on time.

Review Python, shell, and Linux before week 2.

#### What makes big data (e.g. anything that cannot be processed on your machine)

The 4 Vs:

- Volume
- Velocity (how fast data is generated)
- Variety (complex structure, unstructured data, variety of formats)
- Veracity (quality, consistency, and reliability - the one where you can't trust the data)

#### Data generation

- Retail - consumer-level data; it is sold and used for customer profiling
- Social - user-to-user engagement: Facebook, Instagram, Twitter
- Media - Netflix
- Finance
- Industry
- Medical

#### Why do we need to scale beyond one machine?

- Physical limitations on how powerful one "computer" can be
- Cost/performance: $/core? $/GB RAM? $/TB HDD?
- Reliability: one machine is one set of hardware (a single point of failure), but if you have thousands and one dies, the rest keep working
- Volume (follows from the first two points)

#### Computer diagram

- Intel Xeon 28 cores vs AMD Ryzen 64 cores
- RAM and storage (HDD over SATA ~200 MB/s, SSD over SATA ~600 MB/s, NVMe SSD ~3200 MB/s)
- The bottleneck is the components that link everything together

After the break we discuss HOW to scale, now that we have covered the WHY.

We should be agnostic to:

- number of machines
- performance capability of machines (hardware)
- operating systems
- physical location, methods of installation

#### Challenges that we will face along the way

- How to distribute the data: avoid issues like concurrent loss of data under hardware failure, maximise parallelism, etc. (This means we are looking at how to STORE data, which is later processed.)
- Think about synchronisation, scheduling, and communication overhead (we have to design so that our machines/nodes don't sit idle and are used effectively), e.g. LAN, WAN, DNS.
- Hardware failure isn't a rare event. It occurs at a specific rate, and neither storage nor computation should be (permanently) affected when hardware fails. (For Amazon it's hundreds of nodes, whereas it's less likely for a small retailer - as a company they should know the failure rate so their scheduler can run effectively.)
- Software failure.

#### Cluster diagram

The master node orchestrates the jobs. E.g. if we host a 32-node cluster on AWS, each node is assigned a unique IP so that it can be reached from anywhere in the world. Limitations: computation, storage, concurrency, replication.

#### Goals we want to satisfy

1. Reliability (hardware failure is tolerated; work is redistributed [if a worker node dies, the master node should know and redistribute its work to other nodes] or restarted [if the master node dies])
2. Scalability (more nodes give more capacity, with no change to the method required)
3. Concurrency (distributed data can be accessed and updated consistently)
4. Distribution (data and computation are distributed)
5. Replication (hardware failure doesn't result in permanent loss of data) [a replication factor of 2 is the bare minimum to reduce data loss; 4 is a good default under most circumstances; you can go higher, which doesn't guarantee you won't lose data but makes it less likely]
6. Heterogeneity (hardware type and operating system can vary - it doesn't matter which)
7. Transparency (the cluster appears as a whole, a single unit; its size and configuration can vary, but your code doesn't have to)

### Lab 1

Project Euler problems 1, 5, 10.
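Lab 1 is a plain-Python warm-up. As a minimal sketch (Project Euler Problem 1 asks for the sum of all multiples of 3 or 5 below 1000; the function name here is my own, not part of the lab handout):

```python
# Lab 1 warm-up sketch: Project Euler Problem 1.
# Sum of all natural numbers below 1000 that are multiples of 3 or 5.

def euler_1(limit: int = 1000) -> int:
    """Sum the multiples of 3 or 5 below `limit`."""
    return sum(n for n in range(limit) if n % 3 == 0 or n % 5 == 0)

if __name__ == "__main__":
    print(euler_1())  # 233168
```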
## Week 2

### Lecture 3 (Hadoop, MapReduce, and HDFS)

#### Hadoop

#### HDFS (distributed storage)

### Lecture 4 (Hadoop, MapReduce, and HDFS)

#### MapReduce programming model

- Refresher (how HDFS works): data is split into regular chunks (small fixed-size blocks, replicated), tracked by the NameNode, with a secondary NameNode as backup. HDFS is all about storing data. Why 128 MB blocks? The default was 64 MB a few years ago. What about media files?
- What about computation? MapReduce is the (distributed) programming model.
- Theory (Map and Reduce, applied in that order):
  - In the map phase we process our data into an intermediate result. This is done locally on each node where the chunk of data is stored, to maximise parallelism and minimise network overhead. The output of the map phase is always structured as (key, value) pairs:
    - Key = used to group, "shuffle", and collect intermediate results during the reduce phase
    - Value = a quantity relating to what we want to compute
  - Examples:
    - Word count: map each word in our dataset to ("word", 1)
    - Line count: map each record in our dataset to ("line", 1)
    - Count of each unique word: map each word to (word, 1)
  - In the reduce phase, these (key, value) pairs are first grouped/combined locally, then they are shuffled (copied/moved) between the nodes to collect the pairs for each unique key in the same place. Each unique key is then finally reduced to give the final output.
- What do we mean by combine/reduce? Examples for count:
  - the reduce function is f(x, y) = x + y
  - x, y -> x + y
  - (key, x), (key, y) -> (key, x + y)
- There may be scenarios where the map phase cannot reduce the size of the data, so the intermediate result is the same size as the input, or even larger.
- The reduce function takes two (key, value) pairs and outputs one (key, value) pair; applied iteratively, it leaves one pair per unique key.

How is this orchestrated?

- Job Tracker: runs on the master node, one process for each separate job that is running.
- Task Tracker: runs on each worker node, one for each overall job that is running (e.g. multiple jobs, or multiple students).
- Master/workers (Job Tracker / Task Trackers): each task can load data locally from disk by interacting with the local DataNode. Keeping things local minimises the network overhead.

#### Break

Example: suppose we want to take a dataset in HDFS and compute a line count. The blocks are exactly 128 MB each, plus whatever is left over.

- Input splits: this is a preliminary step rather than part of MapReduce itself. In practice we have thousands of lines in one chunk, and possibly a few thousand lines in the left-over split.
- Map functions -> mapped results (ours are pretty simple, but for more complex ones you can use the API).
- Now we reduce our intermediate results: the mapped results are (key, value) pairs, one set per input split.
- Local reduction = "combine"
- Shuffle (copy/sort)
- Reduce: ("line", x), ("line", y) -> ("line", x + y)
- The final result is written back to HDFS. That is MapReduce with (key, value) pairs.

How is reduce applied recursively? In roughly log(n) rounds of pairwise reduction.

In the word-count example ("hello", "world", "hello", "something", ...), the different keys should end up on separate nodes so that the reduction and the save to HDFS happen in parallel. Matching words - to be done.

Real-world examples from Amazon machine learning: we have to make sure all the nodes are used efficiently to save a lot of money. *BE A GOOD DATA SCIENTIST.*

"Exploding" one record into many is what flatMap does (we will see it again in Spark). See the sketch below for the whole pipeline on a toy example.
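To make the phases concrete, here is a minimal pure-Python sketch of the word-count pipeline described above. It is a toy, single-process simulation of map, local combine, shuffle, and reduce; the chunk contents and function names are illustrative, not how Hadoop is actually invoked.

```python
from collections import defaultdict

# Two "chunks" of a dataset, as if stored as separate HDFS blocks on two nodes.
chunks = [
    ["hello world", "hello data"],
    ["world of data", "hello again"],
]

def map_phase(chunk):
    """Map: emit (word, 1) for every word in every record of a chunk."""
    return [(word, 1) for record in chunk for word in record.split()]

def combine(pairs):
    """Local combine: sum counts per key within one chunk before the shuffle."""
    local = defaultdict(int)
    for key, value in pairs:
        local[key] += value
    return list(local.items())

def shuffle(all_pairs):
    """Shuffle: collect every value for the same key in the same place."""
    grouped = defaultdict(list)
    for pairs in all_pairs:
        for key, value in pairs:
            grouped[key].append(value)
    return grouped

def reduce_phase(grouped):
    """Reduce: apply f(x, y) = x + y across the values for each key."""
    return {key: sum(values) for key, values in grouped.items()}

mapped = [combine(map_phase(chunk)) for chunk in chunks]   # runs "locally" per chunk
counts = reduce_phase(shuffle(mapped))                     # shuffle, then reduce
print(counts)  # {'hello': 3, 'world': 2, 'data': 2, 'of': 1, 'again': 1}
```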
## Week 3

### Lecture 5 (Hadoop, Pig, and Hive - examples)

### Lecture 6 (Heading to Spark)

Problems with MapReduce (how Hadoop could be improved):

1. You end up writing thousands of lines of Java to do anything.
2. During MapReduce, temporary data is written back to /tmp in HDFS between jobs. This has an input/output (I/O) cost.
3. MapReduce is batch only; it can't be used for streaming data. (There are a lot of applications, especially e-commerce, with a constant stream of data.) We can't process a high velocity of data in a streaming way.

STREAMING DATA = data that is generated continuously and processed immediately. Processing streaming data in batches introduces a delay, and we don't want a delay (or at least we want to reduce it); batch is inefficient here.

In this course we go for batch rather than streaming, but streaming is not difficult. Today we go through three incremental improvements: Pig, Hive, and Spark.

##### Pig

Pig is a scripting language intended to make writing MapReduce jobs easier. It is a different framework with its own language ("Pig Latin"), whose syntax is similar to SQL.

- You write a small script; Pig processes it, checks that the code is correct, and tries to determine the fewest MapReduce jobs required to run it.
- You write 5 lines, it generates the equivalent of thousands of lines of Java code, and you get the same output as a hand-written Hadoop MapReduce job.
- Example on mathmadsfarm. For Pig, use Google and the Pig API documentation to write the script.

![](https://i.imgur.com/AiZZAaE.png)

In the lecture I search Google for an example and run the Pig script given as part of it. We run the Pig script on Hadoop, using the Pig command line, which reads from HDFS.

##### HDFS files/directories

In HDFS, data is just data, and the concepts of files and directories are vague. We use HDFS to put data on the DataNodes (tracked by the NameNode). E.g. /data/helloworld might be a directory - it could be a path, or at least the start of a path (a prefix).

![](https://i.imgur.com/2UchgEk.png)

With distributed data, a single dataset is your input, but that dataset could be a single file (of many blocks), a directory of 32 part files, or, more commonly, a collection of timestamped files, e.g. one file per hour/day/month/year. You may have a massive directory structure, but it is all the same dataset. It works like a prefix: if you refer to the prefix, you are loading everything under that directory. AWS has tons of data like this in S3.

Running the script: copy the script lines and run them on the Linux machine. The Pig script gives the same job name and reports the job statistics.

![](https://i.imgur.com/lEdEWkC.png)

The output format is the same as yesterday, when we ran the MapReduce job directly.

##### Hive

Hive solves a slightly different problem: how do we do distributed relational data queries (analysis) in an efficient and convenient way?
And how do we store distributed relational data? Most BI tools are built on SQL queries; how do we distribute them, i.e. how do we get an SQL-like layer on Hadoop? Hive does this.

Hive is a layer on top of (primarily) HDFS, to handle how the structured data is stored, and MapReduce, to handle the queries; together these implement a relational database. To the end user it looks identical to every other SQL database.

![](https://i.imgur.com/gaC5SNx.png)

There is a Metastore, an Execution Engine, and a Driver; the other (Hadoop) side is the same as before. Everything happens on the left-hand side:

1. Run a query
2. Get plan
3. Get metadata
4. Send metadata
5. Send plan (the efficient plan)
6. Execute plan (back and forth until you get your final output)

The Hive Metastore keeps track of everything needed to represent a Hive table (it tells you the structure, just like a NameNode does for files).

![](https://i.imgur.com/4GbvC1I.png)

Hive is just an improvement layered on Hadoop. A Hive SQL query (HiveQL) is similar in spirit to Pig (note e.g. the explode keyword).

![](https://i.imgur.com/TEQJh8K.png)

A bash script runs the Hive query:

![](https://i.imgur.com/TnT8Im3.png)

Hive is just running Hadoop underneath: same log output, same info, same job structure. Total time was about 1 minute, so the overhead has to be weighed up - more ease of use means more overhead. Just be aware of the price you are paying to process your data. Hive is a convenience tool; it makes programmers' lives easier.

##### Pig is not as useful anymore (as it was 4 or 5 years ago) - Hive is still in the market

#### Spark

- Resilient Distributed Dataset (RDD)
- Directed Acyclic Graph (DAG)

Spark is a general-purpose *cluster computing system* for *large-scale data processing*, *machine learning and modelling*. It runs on the JVM and has interfaces in Python, SQL, Scala, and R. It is not just a layer on top of Hadoop, but it does the same kind of mapping and reducing; it has a similar programming/computation model.

Language notes: Scala is the most efficient and capable way of writing for the JVM (Scala compiles to a Java JAR file), and Scala works best with Spark, but we are going to use Python. SQL is fine too - no problem. R support is weaker (Python has almost all the features; R is a statistician's language). Python has huge support, everyone knows it, and it is more powerful, especially for building pipelines. I highly encourage you to learn Python.

How does Spark improve on Hadoop? We are going to solve the second and third problems we talked about earlier. With MapReduce there is an additional I/O cost.

This is what Spark does:

![](https://i.imgur.com/jTg2CNQ.png)

The old model with HDFS was:

![](https://i.imgur.com/cIJBSvv.png)

Spark loads the data into memory once, does the transformations, and then finally saves the output back to HDFS once. Spark solves most of the problems with Hadoop and gives you operations like map, reduce, and count (aggregate). We will talk about performance later; for now, the Spark DAG (Directed Acyclic Graph).

A Spark job is represented as a DAG where you load an input into a Resilient Distributed Dataset (RDD), transform the RDD to give another RDD, repeat as many times as you have transformations, and then finally output to e.g. HDFS or some other database.

![](https://i.imgur.com/ihz2fpX.png)

Every time you transform, the data moves forward to the next RDD - the graph is acyclic. E.g. RDD1 -> transform -> RDD2 (cache) -> RDD3 or RDD4.

![](https://i.imgur.com/4gLRNfj.png)

The quickest way to crash the master node is to collect() a large dataset into the master's memory - don't do it. An RDD is, in effect, a kind of distributed memory. See the sketch below.
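As a minimal PySpark sketch of such a DAG (the output path and SparkContext setup are illustrative assumptions, not the course configuration; the input path /data/helloworld is the example prefix mentioned above): load an input into an RDD, chain transformations, and only write out at the end.

```python
from pyspark import SparkContext

sc = SparkContext(appName="wordcount-sketch")  # on a cluster, master/config come from spark-submit

# Build the DAG: each step defines a new RDD from the previous one.
lines  = sc.textFile("hdfs:///data/helloworld/")           # load input (prefix loads everything under it)
words  = lines.flatMap(lambda line: line.split())          # "explode" each line into words
pairs  = words.map(lambda word: (word, 1))                 # map to (key, value)
counts = pairs.reduceByKey(lambda x, y: x + y)             # reduce with f(x, y) = x + y

# This action triggers the whole DAG and writes the result back to HDFS.
counts.saveAsTextFile("hdfs:///user/abc123/wordcount-output/")  # output path is illustrative

# For a quick look at a small sample, prefer take() over collect() on large data.
print(counts.take(5))
```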
## Week 4

### Lecture 7 (still with Spark)

- RDD, DAG (Directed Acyclic Graph)
- You should be on top of your assignment each week, so make sure you follow the course overview - the assignment is mapped to each week's activity.
- Office hours are on Monday; see me for any help, or ask questions about the assignment in the Assignment Help section.
- Use the forum and ask questions - it's good for learning.
- DHCN data for Assignment 1. Read the assignment in detail.
- DO NOT COPY any code/documents etc. You can Google, but add the reference and don't just copy and paste.

Recap:

- Recall that Hadoop suffers from a disk I/O limitation (every step reads from and writes to disk).
- Spark solves both the I/O and the programming overhead issues. It keeps the data in memory during computation (as long as necessary) and gives you a much better API, with Python, Scala, SQL, and R options.

Comments:

- Spark jobs are ephemeral; they only exist while you are actively running a job.
- Spark has similar management components, e.g. a Java process on the master to track what's going on and where data is located across nodes, and a Java process on each worker to store data in memory.

Spark abstraction:

We need something to manage the distribution of in-memory data. Previously HDFS was all we needed for distributed storage, but this is too slow. We want a resilient distributed dataset, or RDD.

1. HDFS will always provide input splits based on the blocks available; these will always contain complete records.
2. The number and size of the partitions is not necessarily the same as the input splits.
3. How are partitions different from the input splits?
4. Partitions are numbered across all nodes.

An RDD is a collection of partitions of data stored in worker memory across the cluster. RDDs are never "modified"; they are immutable. If you do a transformation on an RDD, you always create a new RDD (and if the old RDD is no longer needed, it is discarded).

Computation is always done on these partitions, which gives us parallel or distributed computation. A transformation on a single partition is a "task", and the overall set of tasks that make up a transformation is a "stage". Any Spark job is a sequence of transformations.

### Lecture 8 (Spark, Spark Python, Spark API)

- There are different interfaces/structures that we can use in Spark.
- RDD: data is stored as records in partitions.
- None of the RDD transformations is executed until we ask for OUTPUT via some action, e.g. saveAsTextFile writes back to HDFS.

> - Load data into Spark.
> - Cache / keep it in memory until we say so.
> - Transformations (RDD to RDD) happen in memory.
> - When we take one RDD and want to do more than one thing with the same intermediate result, caching that intermediate result can be very powerful.

Spark will only execute a 'job' of input -> transformations -> output when you ask for an action or output. All the intermediate code, e.g. the transformations along the way, only defines what will happen when the overall DAG is executed. So the Python interface is used to define the DAG and store 'references' to the RDDs (or dataframes) throughout the DAG; when you ask for an output, Spark goes and does the work on the worker nodes (Java processes). See the sketch below.
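A minimal PySpark sketch of lazy evaluation and caching (the input path, the contents of the numbers file, and the threshold are illustrative assumptions): nothing runs until an action is called, and caching pays off when the same intermediate RDD feeds more than one action.

```python
from pyspark import SparkContext

sc = SparkContext(appName="lazy-and-cache-sketch")  # master/config come from spark-submit on a cluster

# These lines only define the DAG; no data is read yet.
raw      = sc.textFile("hdfs:///data/numbers/")          # illustrative input: one integer per line
parsed   = raw.map(lambda line: int(line.strip()))       # transformation (lazy)
filtered = parsed.filter(lambda x: x > 100).cache()      # mark for caching (still lazy)

# First action: the whole DAG runs and `filtered` is materialised in worker memory.
print(filtered.count())

# Second action: reuses the cached partitions instead of re-reading and re-parsing the input.
print(filtered.sum())
```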
#### Some comments

1. We have a Python interface to Spark (Java), and some of our actual logic, e.g. mapper/reducer functions, is written in Python as well. Spark workers will start a local Python subprocess on the worker nodes and execute your logic there.
2. This comes with an additional serialization cost, e.g. converting Java objects to Python objects and back (PySpark only). This is not a problem if you are using Spark from Scala.
3. It also matters whether we use "off the shelf" API methods, e.g. join, groupBy:
   - Python library: dynamically typed - variables can change type.
   - Scala (the Spark library itself): on the JVM, strongly typed - every object has a type.

Different interfaces/structures we can use in Spark:

- RDD: data is stored as records in partitions which are distributed, but everything you do is a custom map or reduce.
- If we assume e.g. a tabular structure, records = rows, and we have columns, where columns have types and every row has the same number of entries (even if some of them are blank), then we have more structure and more off-the-shelf methods, even if we have less flexibility.

![](https://i.imgur.com/IgpK8Rs.png)
![](https://i.imgur.com/YiMrbkD.png)
![](https://i.imgur.com/c9HUjMG.png)

## Week 5

### Lecture 9 (RDDs)

Key points - metadata stored by the Spark driver:

- Partitioner: the logic representing how data is split into partitions
- Partitions: the actual set of data splits associated with the RDD
- Dependencies: list of upstream RDD(s)
- Compute: how the RDD is transformed from the upstream RDD(s)
- Preferred locations: where the partitions should be stored

Fault tolerance: partitions are recomputed on failure or cache eviction (e.g. if you don't have enough memory).

![](https://i.imgur.com/eKNXvkI.png)

##### Spark application components

![](https://i.imgur.com/ORYs8tP.png)

Application components:

1. Application = a single instance of a SparkContext in a Spark shell
2. Job = the complete set of transformations leading to an action on an RDD
3. Stage = a set of transformations that can be pipelined and executed by a single worker independently
4. Task = the execution of a stage on a single partition of an RDD

Controllers:

1. Driver
   - Entry point of the Spark shell
   - The SparkContext is used to construct and execute the DAG
2. Executor
   - Reads and writes all external data sources and stores the partitions of the RDD
   - Performs all data processing

##### DAG

The DAG represents a sequence of transformations and actions performed on data in a distributed way. Key points:

- Node = RDD
- Edge = transformation or action on an RDD
- Acyclic = no circular dependencies
- Directed = transformations only go forward

### Lecture 10 - 12 August (pySpark API)

- pyspark (RDD)
- pyspark.sql (DataFrames)

RDD: the building block of Spark (all of the following eventually do work on the RDD).

- This is still how we distribute data even when we use the DataFrame API etc.
- A DataFrame is backed by an RDD of class RDD[Row], where Row is the class of each record in the RDD; for a plain RDD, each record could simply be a string, an int, etc.

### Lecture 11

- Recall, we are looking at a tabular view of our data, with columns and a schema. However, we can have more than native types in columns, e.g. we can nest "StructType" objects and have Rows within Rows.
- Nested column StructTypes (see the sketch below).

#### Transformation
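As a minimal PySpark sketch of the nested-schema idea from Lecture 11 (the schema, field names, and data below are made-up illustrations), together with a simple transformation that selects a field out of the inner Row:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("nested-schema-sketch").getOrCreate()

# A nested schema: the "address" column is itself a StructType (a Row within a Row).
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("address", StructType([
        StructField("city", StringType(), True),
        StructField("postcode", StringType(), True),
    ]), True),
])

rows = [
    ("Alice", 30, ("Christchurch", "8041")),
    ("Bob", 25, ("Wellington", "6011")),
]
df = spark.createDataFrame(rows, schema=schema)
df.printSchema()

# A transformation on the nested column: select a field from the inner Row by dotted name.
df.select("name", "address.city").show()
```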