Stage II - Data Storage/Preparation & EDA

Course: Big Data - IU S23
Author: Firas Jolha

Dataset

Prerequisites

  • HDP 2.6.5 is installed
  • Stage I is done
    • The relational database is built.
    • The database is imported to HDFS via Sqoop.
    • The tables are stored in HDFS as Avro data files compressed with Snappy.
    • The schemas of the Avro files are stored in HDFS.

Objectives

  • Create Hive tables
  • Perform EDA

Description

Apache Hive is a distributed, fault-tolerant data warehouse system that enables analytics at a massive scale. It is an Apache Hadoop ecosystem component, originally developed by Facebook, for querying data stored in the Hadoop Distributed File System (HDFS). HDFS is the data storage layer of Hadoop; at a very high level, it divides the data into blocks (128 MB by default) and stores these blocks on different nodes. Hive is also described as the data warehousing framework of Hadoop and provides various analytical features, such as windowing and partitioning.

Hive is built on top of Apache Hadoop. As a result, Hive is closely integrated with Hadoop, and is designed to work quickly on petabytes of data. What makes Hive unique is the ability to query large datasets, leveraging Apache Tez or MapReduce, with a SQL-like interface called HiveQL.

Traditional relational databases are designed for interactive queries on small to medium datasets and do not process huge datasets well. Hive instead uses batch processing so that it works quickly across a very large distributed database.

Hive transforms HiveQL queries into MapReduce or Tez jobs that run on Apache Hadoop’s distributed job scheduling framework, Yet Another Resource Negotiator (YARN). It queries data stored in a distributed storage solution, like the Hadoop Distributed File System (HDFS) or Amazon S3.

Hive stores the metadata of the databases and tables in a metastore, which is a database (for instance, MySQL) or a file-backed store that enables easy data abstraction and discovery. Hive includes HCatalog, which is a table and storage management layer that reads data from the Hive metastore to facilitate seamless integration between Hive and MapReduce. By using the metastore, HCatalog allows MapReduce to use the same data structures as Hive, so that the metadata does not have to be redefined for each engine. Custom applications or third-party integrations can use WebHCat, which is a RESTful API for HCatalog, to access and reuse Hive metadata.

Data in Apache Hive is organized into tables, partitions, and buckets. A table in Hive is the logical container of the stored data. There are two types of tables: internal (managed) tables and external tables.

In this stage, we will create external Hive tables for the PostgreSQL tables imported by Sqoop. Then, we will perform Exploratory Data Analysis using HiveQL.

Dataset Description

The dataset is about the departments and employees in a company as well as their salary categories. It consists of two .csv files.

The file emps.csv contains information about employees:

  • EMPNO is a unique employee number; it is the primary key of the employee table.
  • ENAME stores the employee's name.
  • The JOB attribute stores the name of the job the employee does.
  • The MGR attribute contains the employee number of the employee's manager. If the employee has no manager, the MGR column for that employee is set to null.
  • The HIREDATE column stores the date on which the employee joined the company.
  • The SAL column contains the details of employee salaries.
  • The COMM attribute stores values of commission paid to employees. Not all employees receive commission, in which case the COMM field is set to null.
  • The DEPTNO column stores the number of the department in which each employee is based. This data item acts as a foreign key, linking the employee details stored in the EMP table with the details of the departments in which employees work, which are stored in the DEPT table.

The file depts.csv contains information about departments:

  • DEPTNO: The primary key containing the department numbers used to identify each department.
  • DNAME: The name of each department.
  • LOC: The location where each department is based.

I created these csv files from the tables provided in the link.

Preparation

Before starting with Hive tables, make sure that you have imported the PostgreSQL tables to HDFS (the /project warehouse folder, for instance) as Avro files compressed with the Snappy codec. You also need to copy the .avsc schema files to a folder in HDFS, for instance /project/avsc, as follows (assuming that the .avsc files are in the current working directory of the local file system):

hdfs dfs -mkdir -p /project/avsc
hdfs dfs -put *.avsc /project/avsc

Build Hive Tables

In this step, you need to write HiveQL statements for creating the Hive database (let's call it projectdb) and importing the Snappy-compressed Avro data files. You can test the statements in interactive mode, but for project purposes you need to write them in a .hql file and execute them by passing the file to the command hive -f <file.hql>, which should replicate all the steps. Here, we will store the HiveQL statements in a file db.hql.

I will give here some of the basic steps to create the Hive database projectdb but you need to extend it for your project purposes.

  1. Drop the database if it exists.
DROP DATABASE IF EXISTS projectdb;

If you get the following error:

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. InvalidOperationException(message:Database projectdb is not empty. One or more tables exist.)

then add CASCADE as follows:

DROP DATABASE IF EXISTS projectdb CASCADE;
  1. Create a database projectdb
CREATE DATABASE projectdb;
USE projectdb;

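Since the data files are compressed with the Snappy codec, we set the following compression configurations before loading the data from HDFS. They enable Snappy compression of the intermediate map output of the jobs that Hive runs: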

SET mapreduce.map.output.compress = true;
SET mapreduce.map.output.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
  1. Create tables employees and departments
-- Create tables

-- emps table
CREATE EXTERNAL TABLE employees STORED AS AVRO LOCATION '/project/emps' TBLPROPERTIES ('avro.schema.url'='/project/avsc/emps.avsc');


-- dept table
CREATE EXTERNAL TABLE departments STORED AS AVRO LOCATION '/project/depts' TBLPROPERTIES ('avro.schema.url'='/project/avsc/depts.avsc');

  1. We can check that tables are created by running some select queries.
-- For checking the content of tables
SELECT * FROM employees;
SELECT * FROM departments;

You can run the file db.hql via the command hive -f as follows:

hive -f db.hql

The query results are printed to the standard output. You can use the redirection operator to store the output in a file, for instance hive_results.txt, as follows:

hive -f db.hql > hive_results.txt

Note: The .hql file should not return errors when you run it a second time, so you should drop the existing database objects before creating new ones.

Info: If the ssh connection drops quickly, you can keep it alive by passing the option ServerAliveInterval with a value such as 60. This option sets a timeout interval in seconds (here, 60) after which, if no data has been received from the server, ssh sends a message through the encrypted channel to request a response from the server.

ssh -o ServerAliveInterval=60 -p 2222 root@localhost

Hive Optimizations

As noted in the description, data in Apache Hive is organized into tables, partitions, and buckets. Partitions and buckets are the two mechanisms used here to optimize queries.

Partitioning – Apache Hive organizes tables into partitions, grouping together the rows that share the same value of a column or partition key. Each table in Hive can have one or more partition keys to identify a particular partition. Partitions make it faster to run queries on slices of the data.

Bucketing – Hive tables or partitions are subdivided into buckets based on the hash of a column in the table, which gives extra structure to the data that may be used for more efficient queries.

Partitioning in Hive

Partitioning in Hive is used to increase query performance. Hive is a very good tool for running queries on large datasets, especially queries that require a scan of an entire table. Generally, users know their data domain, and in most cases they want to search for a particular type of data. For such cases, a simple query takes a long time to return the result because it requires a scan of the entire dataset. Partitioning can be used to reduce the cost of querying the data. Partitions are like horizontal slices of the data that split a large dataset into more manageable chunks. Table partitioning means dividing the table data into parts based on the distinct values of particular columns (for example, city and country) and segregating the input data records into different files or directories.

Partitioning in Hive is done using the PARTITIONED BY clause in the create table statement. A table can be partitioned on the basis of one or more columns, and it can have one or more partitions. The columns on which partitioning is done cannot be included in the column list of the table. For example, if you have the four fields id, name, age, and city, and you want to partition the data on the basis of the city field, then the city field is not included in the columns of the create table statement and is only used in the PARTITIONED BY clause. You can still query the data in the normal way using WHERE city = 'xyz'. The result is retrieved from the respective partition only, because the data of each city is stored in a different directory named after that city.

Hive has two main types of tables: managed tables and external tables. Both types support partitioning.

Partitioning can be done in one of the following two ways:

  • Static partitioning
  • Dynamic partitioning

In static partitioning, you need to manually insert data into the different partitions of a table. Let's use a table partitioned on the departments of the company. For each department, you need to manually insert the data from the data source into the partition of that department in the partitioned table. So for four departments, you need to write four Hive queries, one for each partition.
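The repetition that static partitioning requires can be sketched as follows. This is only an illustration in Python that prints one HiveQL statement per department. The helper name static_partition_inserts is hypothetical, the column list is assumed to match the employees_part table created later in this section, and the department numbers are those of the sample dataset.

```python
# Hypothetical helper: builds one static-partition INSERT per department.
# The column list is an assumption taken from the employees_part definition.
COLUMNS = ["empno", "ename", "job", "mgr", "hiredate", "sal", "comm"]


def static_partition_inserts(deptnos, source="employees", target="employees_part"):
    """Return a list of HiveQL statements, one per partition value."""
    cols = ", ".join(COLUMNS)
    statements = []
    for deptno in deptnos:
        # The partition value is written literally in the PARTITION clause,
        # and the same value filters the rows copied from the source table.
        statements.append(
            f"INSERT INTO {target} PARTITION (deptno={deptno}) "
            f"SELECT {cols} FROM {source} WHERE deptno = {deptno};"
        )
    return statements


if __name__ == "__main__":
    for stmt in static_partition_inserts([10, 20, 30, 40]):
        print(stmt)
```

The printed statements could be pasted into a .hql file. With dynamic partitioning, a single INSERT statement replaces all of them.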

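Depending on its configuration, Hive may not allow dynamic partitioning; in strict mode, for instance, at least one partition column must be given a static value. We can enable fully dynamic partitioning by setting the following properties on the CLI or in hive-site.xml.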

SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;

Create a Partitioned Table in Hive

For example, suppose we have a table Employees containing the employee information of some company, such as empno, ename, job, deptno, etc. If we partition it on the deptno column, then the information of all the employees belonging to a particular department is stored together in a separate partition. Physically, a partition in Hive is just a sub-directory of the table directory. For example, we have data for four departments in our Employees table: Accounting (deptno=10), Research (deptno=20), Sales (deptno=30), and Operations (deptno=40). Thus we will have four partitions in total, one for each department.

We can run the following statement to create the partitioned table.

CREATE EXTERNAL TABLE employees_part(empno int, ename varchar(50), job varchar(50), mgr int, hiredate date, sal decimal(10,2), comm decimal(10,2)) PARTITIONED BY (deptno int) STORED AS AVRO LOCATION '/project/employees_part' TBLPROPERTIES ('AVRO.COMPRESS'='SNAPPY');


-- insert some data
-- dates are written as yyyy-MM-dd; '93/6/13' in the source data is read here as 1993-06-13 (an assumption)
insert into employees_part values (7369,'SMITH','CLERK',7902,'1993-06-13',800,0.00,20);

Running the previous insert statement fails with an error when dynamic partitioning is not active, because the statement does not name the partition to write to.

We can instead insert the row into an explicitly named partition as follows:

insert into employees_part partition (deptno=20) values (7369,'SMITH','CLERK',7902,'1993-06-13',800,0.00);

Once dynamic partitioning is enabled, Hive creates the partitions itself, one for each distinct value of the partition column (deptno for the employees_part table). The value of the partition column comes last:

insert into employees_part partition (deptno) values (7369,'SMITH','CLERK',7902,'1993-06-13',800,0.00,20);

We can also insert the data from our unpartitioned table. This relies on deptno being the last column of employees, since the partition column must be the last column of the SELECT.

INSERT INTO employees_part partition (deptno) SELECT * FROM employees;

For each department, all the data regarding that department resides in a separate sub-directory under the table directory as follows:

  • /project/employees_part/deptno=10
  • /project/employees_part/deptno=20
  • /project/employees_part/deptno=30
  • /project/employees_part/deptno=40

For instance, the queries regarding Sales employees would only have to look through the data present in the Sales partition (deptno=30).

This example shows why partitioning is useful: it reduces the query latency by scanning only the relevant partitions instead of the whole dataset.
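The effect of this directory layout can be imitated on a local file system. The following Python sketch is not how Hive is implemented; it only mimics the deptno=<value> sub-directories and shows that a lookup for one department opens a single directory. The function names, the file name part-0.csv, and the sample rows are made up for the illustration.

```python
import os
import tempfile


def write_partitioned(base_dir, rows, key):
    """Write each row to <base_dir>/<key>=<value>/part-0.csv, dropping the key column."""
    for row in rows:
        part_dir = os.path.join(base_dir, f"{key}={row[key]}")
        os.makedirs(part_dir, exist_ok=True)
        # The partition column is stored in the directory name, not in the file.
        values = [str(v) for k, v in row.items() if k != key]
        with open(os.path.join(part_dir, "part-0.csv"), "a") as f:
            f.write(",".join(values) + "\n")


def read_partition(base_dir, key, value):
    """Read only the directory of one partition; other partitions are never opened."""
    path = os.path.join(base_dir, f"{key}={value}", "part-0.csv")
    with open(path) as f:
        return [line.rstrip("\n").split(",") for line in f]


# Illustrative sample rows (not the real dataset).
rows = [
    {"empno": 7369, "ename": "SMITH", "deptno": 20},
    {"empno": 7499, "ename": "ALLEN", "deptno": 30},
    {"empno": 7521, "ename": "WARD", "deptno": 30},
]
base = tempfile.mkdtemp()
write_partitioned(base, rows, "deptno")
print(sorted(os.listdir(base)))            # one sub-directory per department
print(read_partition(base, "deptno", 30))  # reads the deptno=30 directory only
```

A filter such as WHERE deptno = 30 plays the role of read_partition here: only one sub-directory has to be scanned.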

Bucketing in Hive

When we query on a column with mostly unique values, partitioning is not a good fit. If we partition on a column with many distinct values, like an ID, it creates a large number of small files in HDFS and a large number of partition entries in the metastore, thus increasing the load on the NameNode and the metastore service. To optimize queries on such a dataset, we use bucketing instead: the data is divided into a fixed number of buckets.

We can create buckets on the empno column of a partitioned employees table as follows:


SET hive.enforce.bucketing=true;


CREATE EXTERNAL TABLE employees_part_buck(
    empno int, 
    ename varchar(50), 
    job varchar(50), 
    mgr int, 
    hiredate date, 
    sal decimal(10,2), 
    comm decimal(10,2)
) 
    PARTITIONED BY (deptno int) 
    CLUSTERED BY (empno) into 7 buckets
    STORED AS AVRO LOCATION '/project/employees_part_buck' 
    TBLPROPERTIES ('AVRO.COMPRESS'='SNAPPY');
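A rough sketch of how rows could be assigned to the 7 buckets. It assumes the classic Hive rule, in which the bucket number is the non-negative hash of the bucketing column modulo the number of buckets and the hash of an int is the integer itself. Newer Hive versions may use a different hash function, so treat the exact bucket numbers as illustrative. The function name bucket_for is hypothetical.

```python
NUM_BUCKETS = 7  # matches "into 7 buckets" in the table definition


def bucket_for(empno, num_buckets=NUM_BUCKETS):
    """Assumed rule: (hash & 0x7FFFFFFF) % num_buckets, with hash(int) == int."""
    return (empno & 0x7FFFFFFF) % num_buckets


# Employee numbers that appear in the insert examples of this section.
for empno in (7369, 7902):
    print(empno, "-> bucket", bucket_for(empno))
```

Since the number of buckets is fixed, the number of files per partition stays bounded no matter how many distinct empno values exist.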

Hive in Hortonworks Data Platform supports two engines for running HiveQL queries, and the engine can be changed from the Optimization configs menu.

The default engine is Apache Tez, but if it does not work for some reason, you can use the traditional MapReduce engine.

Apache Tez

Tez is an application framework built on Hadoop YARN that can execute complex directed acyclic graphs (DAGs) of general data processing tasks. In many ways, it can be thought of as a more flexible and powerful successor to the MapReduce framework.

It generalizes map and reduce tasks by exposing interfaces for generic data processing tasks, which consist of a triplet of interfaces: input, output, and processor. These tasks are the vertices in the execution graph. Edges (i.e., data connections between tasks) are first-class citizens in Tez and, together with the input/output interfaces, greatly increase the flexibility of how data is transferred between tasks.

Tez also greatly extends the ways in which individual tasks can be linked together; in fact, any arbitrary DAG can be executed directly in Tez.

In Tez, a MapReduce job is a simple DAG consisting of a single map vertex and a single reduce vertex connected by a “bipartite” edge (i.e., the edge connects every map task to every reduce task). Map inputs and reduce outputs are HDFS inputs and outputs, respectively. The map output class locally sorts and partitions the data by a certain key, while the reduce input class merge-sorts its data on the same key.

Perform Exploratory Data Analysis (EDA)

EDA is a data analytics process to understand the data in depth and learn the different data characteristics, often with visual means. This allows you to get a better feel of your data and find useful patterns in it.

We will use HiveQL to analyze the data. We need to provide insights about the data. Here I will give some examples:

  1. Show the distribution of employees in departments.
INSERT OVERWRITE LOCAL DIRECTORY '/root/q1'
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ','
SELECT dname, COUNT(*) AS emp_count
FROM departments AS d JOIN employees AS e
ON d.deptno = e.deptno
GROUP BY dname 
ORDER BY dname;

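Hive writes each exported result as one or more part files in the target directory. After exporting the results of each query, you need to merge the parts into a single file as follows:

mkdir -p output
echo "dname,emp_count" > output/q1.csv
cat /root/q1/* >> output/q1.csv

The mkdir command makes sure that the output folder exists, the echo command writes only the column names, and the cat command appends the data.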
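The same merge step can also be sketched in Python, which may be handy when several query results have to be merged in one script. The function name merge_parts is hypothetical, and the demo uses made-up part files in a temporary directory as a stand-in for /root/q1; the file names and counts are not real query output.

```python
import glob
import os
import tempfile


def merge_parts(parts_dir, out_path, header):
    """Write the header line, then append every file in parts_dir in name order."""
    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
    with open(out_path, "w") as out:
        out.write(header + "\n")
        # Like the shell pattern *, glob skips hidden files (names starting with a dot).
        for part in sorted(glob.glob(os.path.join(parts_dir, "*"))):
            with open(part) as f:
                out.write(f.read())


# Demo with made-up part files (stand-in for the directory written by Hive).
work = tempfile.mkdtemp()
parts = os.path.join(work, "q1")
os.makedirs(parts)
with open(os.path.join(parts, "000000_0"), "w") as f:
    f.write("ACCOUNTING,3\n")
with open(os.path.join(parts, "000001_0"), "w") as f:
    f.write("SALES,6\n")

out_csv = os.path.join(work, "output", "q1.csv")
merge_parts(parts, out_csv, "dname,emp_count")
with open(out_csv) as f:
    print(f.read())
```

For the real results, the call would look like merge_parts("/root/q1", "output/q1.csv", "dname,emp_count").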

  1. Show the distribution of salaries in departments.
INSERT OVERWRITE LOCAL DIRECTORY '/root/q2' 
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ','
SELECT dname, AVG(CAST(sal AS DOUBLE)) AS avg_sal
FROM departments AS d 
JOIN employees AS e ON d.deptno = e.deptno
GROUP BY dname
ORDER BY avg_sal DESC;
  1. Show the manager(s) who work in each department.
-- To export the result in a csv-like file
WITH mgrs AS 
(
    -- employees who are referenced as a manager by at least one employee
    SELECT DISTINCT e2.ename AS mgr_name, e2.empno AS mgr_no, e2.deptno AS mgr_dept
    FROM employees AS e1
    JOIN employees AS e2
    ON e2.empno = e1.mgr
)
INSERT OVERWRITE LOCAL DIRECTORY '/root/q3' 
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ','
SELECT DISTINCT dname, mgrs.mgr_name AS manager 
FROM departments AS d 
JOIN mgrs ON mgrs.mgr_dept = d.deptno
ORDER BY dname;
  1. Show distribution of employees with respect to hiredate.
INSERT OVERWRITE LOCAL DIRECTORY '/root/q4' 
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ','
SELECT year(hiredate) AS dt, COUNT(empno) AS count_empno
FROM employees
GROUP BY year(hiredate)
ORDER BY dt;
  1. Show the min, max, and average salary of employees in each department.
INSERT OVERWRITE LOCAL DIRECTORY '/root/q5' 
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ','
SELECT dname, MIN(sal), MAX(sal), AVG(sal)
FROM employees e
JOIN departments d ON e.deptno = d.deptno
GROUP BY dname
ORDER BY dname;
  1. Show the departments which have higher rates of commission.
INSERT OVERWRITE LOCAL DIRECTORY '/root/q6' 
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ','
SELECT dname, SUM(comm) AS total_comm
FROM employees e
JOIN departments d ON (e.deptno = d.deptno)
WHERE comm IS NOT NULL
GROUP BY dname
HAVING total_comm > 0.0
ORDER BY total_comm DESC;

References