> **Developed By:** Jaffar Ahamed
> **Technology:** Big Data
> **Tools:** HDFS, Hive, Spark, Cron, MySQL, Zeppelin

---

**Table of Contents**

[Toc]

# Feature Definition

Files associated with UK jobs are sent to Data Engineering's drop box daily. These files should be ingested into the Enterprise Data Lake daily, and their data should be available for the various BI and analytical needs of the Job Markets Product team.

## Feature acceptance criteria

1. As a Job Market analyst, I need the ability to explore the data extracts on UK jobs.
2. As a Job Market analyst, I need the ability to slice and dice using a BI tool with the following metrics and dimensions.

| Metric | Country | City | Company Name | Posted Date | Title |
| ------------ | ----- | ------- | ------------ | ------------ | ------------ |
| Count | X | X | X | X | X |
| Rank based on mid point salary | X | X | | | X |
| Determine salary variance | | | X | | X |

## Data Flow

- Build an on-demand data engineering pipeline to execute the following steps.
- You can trigger each of these steps using a series of shell commands.
- You need to create two Hive DBs.
    1. job_market_enriched
    2. job_market_curated

<div class="center">

```mermaid
graph TD
    A[fa:fa-file Job Market files will be pushed to drop box folder daily] -->|copy to landing zone - HDFS| B[/Landing zone - HDFS/]
    B --HDFS to HDFS file move--> C[(Enriched zone - Hive DB with external tables)]
    C --Curate data using pyspark--> D[(Curation zone - Hive DB with managed tables)]
    D-->E[/BI tool/]
```

</div>

> **Drop box** : This is a Linux local directory. The upstream source process pushes files into this directory daily. Once the files are copied from the drop box to the landing zone, they should be deleted from the drop box.

> **Landing Zone** : This is the HDFS folder into which the files are copied from the drop box. Once the files are moved from the landing zone to the enriched zone, they should be deleted from the landing zone.

> **Enriched Zone** : This is an HDFS directory which will store the files in ORC format.
> Hive external tables use the data stored in this HDFS directory. This zone is partitioned daily and stores the history. Files in this directory should never be deleted.

> **Curation Zone** : This is a Hive database consisting of managed tables. Data from the enriched zone (called the core zone or core directory elsewhere in this document) is transformed according to business rules and stored in these tables.

## Story Details

| Story Name | Story Description | Definition of Done | Story Points | Actual Hours |
| ------------ | ------------ | ------------ | ------------ | ------------ |
| Pre work for data pipeline construction | Directory structure creation, DDL creation | Completion of directory structure for landing zone, enriched zone, dropbox and DDL creation | 1 | 1h |
| Create script for moving files from dropbox to landing zone | Copy files from dropbox to landing zone | Completion of shell script to copy files from dropbox to landing directory and delete files from dropbox <ul><li>[copy_local_to_landing.sh](https://sharedby.blomp.com/GMKOU4)</li></ul> | 1 | 30m |
| Create script for moving files from landing zone to enriched zone | Copy files from landing to core directory | Completion of shell script to copy files from landing to core directory and delete files from landing directory <ul><li>[copy_landing_to_core.sh](https://sharedby.blomp.com/CezFk0)</li></ul> | 1 | 15m |
| Create a script for table creation in Hive | Execute DDL commands to create Hive external table | Completion of shell script to create Hive external table with data from core directory <ul><li>[create_DB_and_load_data_from_core_zone.sh](https://sharedby.blomp.com/5g19Gj)</li></ul> | 1 | 2h |
| Create script for MSCK repair on the Hive table | Add metadata about partitions | Completion of shell script to perform MSCK repair on tables after ingestion of data <ul><li>[DDL_Command_Core.sql](https://sharedby.blomp.com/e21apI)</li></ul> | 1 | 30m |
| Create a script for data transformation | Execute Spark program to transform data based on requirements | Completion of shell script to submit Spark program to transform data and store result data in target directory <ul><li>[data_transform.sh](https://sharedby.blomp.com/4HWkeQ)</li><li>[pyspark_program_to_transform_data.py](https://sharedby.blomp.com/Ydmieq)</li></ul> | 5 | 8h |
| Create a script for Hive managed table | Execute DDL commands to create Hive internal table | Completion of shell script to create Hive managed table with data from target directory <ul><li>[load_data_from_target.sh](https://sharedby.blomp.com/2XD1Oc)</li><li>[DDL_Command_Target.sql](https://sharedby.blomp.com/ySiK1m)</li></ul> | 1 | 2h |
| Spike: Research on how to schedule this job to run daily | Research an efficient scheduler to run this job daily | Learned about <ul><li>CRONTAB: the default Linux-based scheduler for shell scripts</li><li>Oozie</li></ul> | 3 | 4h |
| Create a job scheduler to run this job daily | Schedule this job to run daily | Completion of cron setup to run shell scripts daily <ul><li>Cron pattern to run daily at 8 PM (`0 20 * * *`)</li></ul> | 3 | 4h |
| Visualize target data | Use BI tool to visualize target data | Visualization is done through Zeppelin with the help of the Hive interpreter | 1 | 1h |

## Pyspark Program to Transform Data

```python=3
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, col, when, avg, count, dense_rank
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window

# Create the Spark session
spark = SparkSession.builder.appName('citywise_avg_salary').getOrCreate()

# Read the source data (CSV with a header row) from the core directory
df = spark.read.csv('hdfs://localhost:50000/jobs/core', header=True)

# Extract the high and low figures from Salary and derive High_Salary, Low_Salary and Mid_Salary.
# 'per hour' salaries are multiplied by 2500; a missing low figure defaults to 10000.
df = df.withColumn('High', regexp_extract(col('Salary'), r'\D+\D(.*?)\s.*', 1))\
    .withColumn('High_Salary', when(col('Salary').contains('per hour'), col('High')*2500).otherwise(when(col('High') == '', None).otherwise(col('High'))))\
    .withColumn('Low', regexp_extract(col('Salary'), r'\D(.*)\s-.*', 1))\
    .withColumn('Low_Salary', when(col('Salary').contains('per hour'), col('Low')*2500).otherwise(when(col('Low') == '', 10000).otherwise(col('Low'))))\
    .withColumn('Mid_Salary', (col('High_Salary') + col('Low_Salary'))/2)\
    .withColumn('Mid_Salary', col('Mid_Salary').cast(IntegerType())).drop('High', 'Low')

# Drop rows where Mid_Salary is NULL (data quality issue in the source)
df = df.na.drop(subset=['Mid_Salary'])

# Aggregate to get the city-wise average salary and the number of rows per city
df = df.groupBy('City').agg(avg('Mid_Salary').cast(IntegerType()).alias('Average_Salary'), count('City').alias('Count_of_city')).orderBy(col('Average_Salary').desc())

# Rank cities by average salary
df = df.withColumn('Rank_based_on_avg_salary', dense_rank().over(Window.orderBy(col('Average_Salary').desc())))

# Rename columns to lower case because Hive uses lower-case column names; otherwise the table shows only NULL
# Issue link: https://issues.apache.org/jira/browse/ORC-502
# (the loop variable is not called `col`, so the imported col() function is not shadowed)
for name in df.columns:
    df = df.withColumnRenamed(name, name.lower())

# Write the transformed data in ORC format
df.write.mode('overwrite').orc('hdfs://localhost:50000/jobs/target')
```

## Screenshots

:::info
**Source Data**
![](https://i.imgur.com/kORXS26.png)
:::

:::info
**Hive External Table**
![](https://i.imgur.com/ViVrCNR.png)
:::

:::info
**Hive Managed Table** - ORC Format
![](https://i.imgur.com/zUJMuZr.png)
:::

:::info
**Data Visualization**
![](https://i.imgur.com/fXQhypA.png)
:::

---
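
The salary parsing step of the PySpark program can be tried without a cluster. The block below is a minimal plain-Python sketch of that logic, not the pipeline's own code: it reuses the two regular expressions, the `2500` multiplier for `per hour` salaries and the `10000` fallback for a missing low figure. The helper names (`_extract`, `_to_number`, `mid_salary`) are hypothetical, and the sample strings such as `£30000 - £40000 per annum` are an assumed input format that does not come from the source files. Python's `re` module and Spark's Java regex engine may also differ on edge cases.

```python
import re

HOURLY_FACTOR = 2500   # multiplier the pipeline applies to 'per hour' salaries
DEFAULT_LOW = 10000    # fallback the pipeline uses when no low figure is found

HIGH_RE = re.compile(r'\D+\D(.*?)\s.*')  # same pattern as the 'High' column
LOW_RE = re.compile(r'\D(.*)\s-.*')      # same pattern as the 'Low' column


def _extract(pattern, text):
    """Approximate regexp_extract(..., 1): group 1 of the first match, or ''."""
    match = pattern.search(text)
    return match.group(1) if match else ''


def _to_number(text):
    """Return text as a float, or None if it is empty or non-numeric (NULL in Spark)."""
    try:
        return float(text)
    except ValueError:
        return None


def mid_salary(salary):
    """Return the integer mid-point salary, or None where the pipeline would drop the row."""
    hourly = 'per hour' in salary
    high = _to_number(_extract(HIGH_RE, salary))
    low_text = _extract(LOW_RE, salary)
    if not hourly and low_text == '':
        low = float(DEFAULT_LOW)   # missing low figure on a non-hourly salary
    else:
        low = _to_number(low_text)
    if high is None or low is None:
        return None                # corresponds to a NULL Mid_Salary
    factor = HOURLY_FACTOR if hourly else 1
    return int((high * factor + low * factor) / 2)


if __name__ == '__main__':
    # Assumed sample formats, for illustration only
    for sample in ('£30000 - £40000 per annum', '£15 - £20 per hour', 'Competitive'):
        print(sample, '->', mid_salary(sample))
```

Under these assumptions, a value with no recognisable high figure returns `None`, which matches the rows the pipeline removes with `df.na.drop` before aggregating.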