---
tags: BigData,IntroToBigData
title: Stage II - Data Storage/Preparation & EDA
---
# Stage II - Data Storage/Preparation & EDA
**Course:** Big Data - IU S23
**Author:** Firas Jolha
# Dataset
- [Some emps and depts](http://www.cems.uwe.ac.uk/~pchatter/resources/html/emp_dept_data+schema.html)
# Agenda
[toc]
# Prerequisites
- HDP 2.6.5 is installed
- Stage I is done
- The relational database is built.
- The database is imported to HDFS via Sqoop.
- The tables are stored in HDFS as Avro data files compressed with Snappy.
- The schemas of the Avro files are stored in HDFS.
# Objectives
- Create Hive tables
- Perform EDA
# Description
Apache Hive is a distributed, fault-tolerant data warehouse system that enables analytics at a massive scale. It is an Apache Hadoop ecosystem component, originally developed by Facebook, for querying the data stored in the Hadoop Distributed File System (HDFS). HDFS is the data storage layer of Hadoop which, at a very high level, divides the data into small blocks (128 MB by default) and stores these blocks on different nodes. Hive is also termed the data warehousing framework of Hadoop and provides various analytical features, such as windowing and partitioning.
Hive is built on top of Apache Hadoop. As a result, Hive is closely integrated with Hadoop, and is designed to work quickly on petabytes of data. What makes Hive unique is the ability to query large datasets, leveraging Apache Tez or MapReduce, with a SQL-like interface called HiveQL.
Traditional relational databases are designed for interactive queries on small to medium datasets and do not process huge datasets well. Hive instead uses batch processing so that it works quickly across a very large distributed database.
Hive transforms HiveQL queries into MapReduce or Tez jobs that run on Apache Hadoop’s distributed job scheduling framework, Yet Another Resource Negotiator (YARN). It queries data stored in a distributed storage solution, like the Hadoop Distributed File System (HDFS) or Amazon S3.

Hive stores the metadata of the databases and tables in a **metastore**, which is a database (for instance, MySQL) or file-backed store that enables easy data abstraction and discovery. Hive includes **HCatalog**, a table and storage management layer that reads data from the Hive metastore to facilitate seamless integration between Hive and MapReduce. By using the metastore, HCatalog allows MapReduce to use the same data structures as Hive, so that the metadata doesn’t have to be redefined for each engine. Custom applications or third-party integrations can use **WebHCat**, a RESTful API for HCatalog, to access and reuse Hive metadata.
Data in Apache Hive can be categorized into Table, Partition, and Bucket. A table in Hive is logically made up of the data being stored and comes in two types: internal (managed) tables and external tables.
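As a hedged sketch of that difference (the table names and path below are hypothetical, not part of the project): dropping a managed table removes its data, while dropping an external table only removes the metadata.
```sql!
-- Managed (internal) table: Hive owns the data; DROP TABLE also deletes the files.
CREATE TABLE demo_managed (id INT, name STRING);

-- External table: Hive only tracks metadata; DROP TABLE leaves the files in HDFS.
CREATE EXTERNAL TABLE demo_external (id INT, name STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/tmp/demo_external';
```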
<!-- # Some data types in Hive
<!-- 
-->
<!--
| Data Type | Size | Example |
| -------- | -------- | -------- |
| TINYINT | 1-byte signed integer | 50Y |
| SMALLINT | 2-byte signed integer | 20,000S |
| INT | 4-byte signed integer | 1,000 |
| BIGINT | 8-byte signed integer | 50,000L |
|FLOAT | 4-byte single-precision floating point | 400.50 |
| DOUBLE | 8-byte double-precision floating point | 20,000.50 |
| DECIMAL | 17-byte precision up to 38 digits | DECIMAL(20,2) |
| STRING | 2GB at maximum | 'msg' |
| CHAR(num) | 255 chars at maximum |"name" |
|VARCHAR(num) | 65355 chars at maximum | "var_name"|
| Date | YYYY-MM-DD | 2023-04-04|
|Timestamp | YYYY-MM-DD HH:MM:SS.fffffffff | 2023-04-04 14:51:31.12|
| BOOLEAN | 1 bit | true, false|
| STRUCT | address STRUCT<City:STRING, State:STRING> | address.City| -->
<!--
Hive supports the relational operators: (A=B, A!=B, -->
In this stage, we will create external Hive tables for the PostgreSQL tables imported by Sqoop. Then, we will perform Exploratory Data Analysis using HiveQL.
# Dataset Description
The dataset is about the departments and employees in a company as well as their salary categories. It consists of two `.csv` files.
:::spoiler
The file `emps.csv` contains information about employees:
- EMPNO is a unique employee number; it is the primary key of the employee table.
- ENAME stores the employee's name.
- The JOB attribute stores the name of the job the employee does.
- The MGR attribute contains the employee number of the employee who manages that employee. If the employee has no manager, then the MGR column for that employee is left set to null.
- The HIREDATE column stores the date on which the employee joined the company.
- The SAL column contains the details of employee salaries.
- The COMM attribute stores values of commission paid to employees. Not all employees receive a commission; in that case the COMM field is set to null.
- The DEPTNO column stores the department number of the department in which each employee is based. This data item acts as a foreign key, linking the employee details stored in the EMP table with the details of the departments in which employees work, which are stored in the DEPT table.
The file `depts.csv` contains information about departments:
- DEPTNO: The primary key containing the department numbers used to identify each department.
- DNAME: The name of each department.
- LOC: The location where each department is based.
<!-- The file `salgrade.csv` contains information about salary categories:
- GRADE: A numeric identifier for the category of the salary.
- LOSAL: The lowest salary in this category.
- HISAL: The highest salary in this category.
-->
:::
:::info
I created these `csv` files from the tables provided in the link.
:::
# Preparation
Before starting with Hive tables, make sure that you imported the PostgreSQL tables to HDFS (a `/project` warehouse folder, for instance) as Avro files compressed with the Snappy codec. You also need to copy the `.avsc` schema files to an HDFS folder, for instance `/project/avsc`, as follows (assuming that the `.avsc` files are in the current working directory of the local file system):
```powershell!
hdfs dfs -mkdir -p /project/avsc
hdfs dfs -put *.avsc /project/avsc
```
# Build Hive Tables
In this step, you need to write HiveQL statements for creating the Hive database (let's call it `projectdb`) and importing the Snappy-compressed Avro data files. You can test the statements in interactive mode, but for project purposes you need to write them in a `.hql` file and then execute them by passing the file to the command `hive -f <file.hql>`, which should replicate all the steps. Here, we will store the HiveQL statements in a file `db.hql`.
I will give here some of the basic steps to create the Hive database `projectdb`, but you need to extend them for your project purposes.
1. Drop the database if it exists.
```sql!
DROP DATABASE IF EXISTS projectdb;
```
:::danger
If you get the following error:
```!
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. InvalidOperationException(message:Database projectdb is not empty. One or more tables exist.)
```
Then add CASCADE as follows:
```sql!
DROP DATABASE IF EXISTS projectdb CASCADE;
```
:::
2. Create a database `projectdb`
```sql!
CREATE DATABASE projectdb;
USE projectdb;
```
:::info
Since the data files are compressed with the Snappy codec, we need to set some configurations for loading the data from HDFS as follows:
```sql
SET mapreduce.map.output.compress = true;
SET mapreduce.map.output.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
```
:::
3. Create tables `employees` and `departments`
```sql!
-- Create tables
-- emps table
CREATE EXTERNAL TABLE employees STORED AS AVRO LOCATION '/project/emps' TBLPROPERTIES ('avro.schema.url'='/project/avsc/emps.avsc');
-- dept table
CREATE EXTERNAL TABLE departments STORED AS AVRO LOCATION '/project/depts' TBLPROPERTIES ('avro.schema.url'='/project/avsc/depts.avsc');
```
4. We can check that the tables were created by running some `select` queries.
```sql!
-- For checking the content of tables
SELECT * FROM employees;
SELECT * FROM departments;
```
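Optionally, you can also inspect the metadata Hive registered for the tables with standard HiveQL commands:
```sql!
-- List the tables of the current database and show the full definition of one of them
SHOW TABLES;
DESCRIBE FORMATTED employees;
```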
You can run the file `db.hql` via the command `hive -f` as follows:
```console!
hive -f db.hql
```
You can use the redirection operator to store the output in a file; the query results will then be written to the `hive_results.txt` file:
```shell!
hive -f db.hql > hive_results.txt
```

:::warning
**Note:** Do not forget that the `.hql` file should not return errors when you run it a second time, so you should clear/drop existing objects before creating new database objects.
<!-- This is true for all scripts in the pipeline. -->
:::
:::info
**Info:** If the ssh connection drops quickly, you can keep it alive by passing the option `ServerAliveInterval` a value like 60 seconds. This option sets a timeout interval of 60 seconds after which, if no data has been received from the server, `ssh` will send a message through the encrypted channel to request a response from the server.
```powershell!
ssh -o ServerAliveInterval=60 -p 2222 root@localhost
```
:::
# Hive Optimizations
Data in Apache Hive can be categorized into **Table**, **Partition**, and **Bucket**. The table in Hive is logically made up of the data being stored.
**Partitioning** – Apache Hive organizes tables into **partitions** for grouping the same type of data together based on a column or partition key. Each table in Hive can have one or more partition keys to identify a particular partition. Using partitions, we can *make it faster to run queries on slices of the data*.
**Bucketing** – Hive tables or partitions are subdivided into **buckets** based on the hash of a column in the table, to give extra structure to the data that may be used for *more efficient queries*.
## Partitioning in Hive
Partitioning in Hive is used to increase query performance. Hive is a very good tool for performing queries on large datasets, especially datasets that require a scan of an entire table. Generally, users are aware of their data domain, and in most cases they want to search for a particular type of data. For such cases, a simple query takes a long time to return the result because it requires a **scan of the entire dataset**. The concept of partitioning can be used to reduce the cost of querying the data. Partitions are like horizontal slices of data that allow large sets of data to be handled as more manageable chunks. Table partitioning means dividing the table data into parts based on the unique values of particular columns (for example, city and country) and segregating the input data records into different files or directories.
Partitioning in Hive is done using the **PARTITIONED BY** clause in the CREATE TABLE statement. A table can have one or more partitions and can be partitioned on the basis of one or more columns. The columns on which partitioning is done are not included in the regular column list of the table. For example, if you have the four fields id, name, age, and city and you want to partition the data on the basis of the city field, then the city field will not be included in the column list of the CREATE TABLE statement and will only be used in the PARTITIONED BY clause. You can still query the data in the normal way using `WHERE city = 'xyz'`; the result will be retrieved from the respective partition, because the data of each city is stored in a different directory named after that city.
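A minimal, hypothetical sketch of this idea (the `people_part` table and its columns are illustrative only, not part of the project):
```sql!
-- The partition column `city` appears only in PARTITIONED BY,
-- not in the regular column list.
CREATE TABLE people_part (
    id INT,
    name STRING,
    age INT
)
PARTITIONED BY (city STRING);

-- Querying one partition still looks like a normal filter:
SELECT * FROM people_part WHERE city = 'xyz';
```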
There are two main types of table in Hive: **Managed** tables and **External** tables. Both table types support partitioning.
Partitioning can be done in one of the following two ways:
- Static partitioning
- Dynamic partitioning
In static partitioning, you need to manually insert data into the different partitions of a table. Let's use a table partitioned on the departments of the company. For each department, you need to manually insert the data from the data source into that department's partition of the partitioned table. So for 4 departments, you need to write 4 Hive queries, one inserting data into each partition (see the sketch below).
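A hedged sketch of what this looks like for two of the departments, assuming a partitioned table such as the `employees_part` table created in the next subsection:
```sql!
-- Static partitioning: the target partition is hard-coded in each statement,
-- and the partition column (deptno) is excluded from the SELECT list.
INSERT INTO TABLE employees_part PARTITION (deptno=10)
SELECT empno, ename, job, mgr, hiredate, sal, comm FROM employees WHERE deptno = 10;

INSERT INTO TABLE employees_part PARTITION (deptno=20)
SELECT empno, ename, job, mgr, hiredate, sal, comm FROM employees WHERE deptno = 20;
```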
By default, Hive does not allow dynamic partitioning. We need to enable it by setting the following
properties on the CLI or in `hive-site.xml`.
```sql!
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
```
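You can verify the current values from the Hive CLI; running `SET` with only a property name prints its value:
```sql!
-- Print the current values of the dynamic partitioning properties
SET hive.exec.dynamic.partition;
SET hive.exec.dynamic.partition.mode;
```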
### Create a Partitioned Table in Hive
For example, we have a table `employees` containing employee information such as `empno, ename, job, deptno`, etc. Now, if we want to partition on the basis of the `deptno` column, then the information of all the employees belonging to a particular department will be stored together in a separate partition. Physically, a partition in Hive is nothing but a sub-directory of the table directory. For example, we have data for four departments in our `employees` table – Accounting (deptno=10), Research (deptno=20), Sales (deptno=30) and Operations (deptno=40). Thus we will have four partitions in total, one for each department.

We can run the following statement to create the partitioned table.
```sql!
CREATE EXTERNAL TABLE employees_part(empno int, ename varchar(50), job varchar(50), mgr int, hiredate date, sal decimal(10,2), comm decimal(10,2)) PARTITIONED BY (deptno int) STORED AS AVRO LOCATION '/project/employees_part' TBLPROPERTIES ('AVRO.COMPRESS'='SNAPPY');
-- insert some data
insert into employees_part values (7369,'SMITH','CLERK',7902,'93/6/13',800,0.00,20);
```
When you run the previous `insert` statement, you will get an error, because dynamic partitioning is not active.

We can, however, insert the data by specifying the partition statically:
```sql!
insert into employees_part partition (deptno=20) values (7369,'SMITH','CLERK',7902,'93/6/13',800,0.00);
```
Once dynamic partitioning is enabled, we can create partitions for all unique values of any column, say `deptno` of the `employees_part` table, as follows:
```sql!
insert into employees_part partition (deptno) values (7369,'SMITH','CLERK',7902,'93/6/13',800,0.00,20);
```
<!--
```sql!
insert into employees_part partition (deptno) values (7499,'ALLEN','SALESMAN',7698,'98/8/15',1600,300,30);
insert into employees_part partition (deptno) values (7782,'CLARK','MANAGER',7839,'93/5/14',2450,null,10);
insert into employees_part partition (deptno) values (7839,'KING','PRESIDENT',null,'90/6/9',5000,0,10);
```
-->
We can also insert the data from our unpartitioned `employees` table:
<!-- -- You need to activate ACID from configs of Hive to update/delete
UPDATE employees SET HIREDATE = from_unixtime(hiredate DIV 1000);
-- we did this update since hiredate was sotred as bigint in unixtime format.
-->
```sql!
INSERT INTO employees_part partition (deptno) SELECT * FROM employees;
```
For each department, we will have all the data regarding that department residing in a separate sub-directory under the table directory as follows:
- ..../employees_part/deptno=10
- ..../employees_part/deptno=20
- ..../employees_part/deptno=30
- ..../employees_part/deptno=40
For instance, queries regarding Sales employees would only have to look through the data present in the Sales partition (deptno=30).
Therefore, from the above example, we can conclude that partitioning is very useful. It reduces the query latency by scanning only the relevant partitioned data instead of the whole dataset.
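For instance, a small sketch to list the partitions and run a query that scans only one of them:
```sql!
-- List the partitions Hive created for the table
SHOW PARTITIONS employees_part;

-- This query only scans the deptno=30 sub-directory (partition pruning)
SELECT * FROM employees_part WHERE deptno = 30;
```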
## Bucketing in Hive
In the scenario where we query on a column with many unique values, partitioning is not a good fit. If we partition on a column with a high number of unique values, like `ID`, it would create a large number of small datasets in HDFS and many partition entries in the metastore, thus increasing the load on the NameNode and the metastore service. To optimize queries on such a dataset, we instead group the data into a fixed number of **buckets**: rows are distributed among the buckets by a hash function of the bucketing column.
We can create buckets on `empno` column of `employees_part` as follows:
```sql!
SET hive.enforce.bucketing=true;
CREATE EXTERNAL TABLE employees_part_buck(
empno int,
ename varchar(50),
job varchar(50),
mgr int,
hiredate date,
sal decimal(10,2),
comm decimal(10,2)
)
PARTITIONED BY (deptno int)
CLUSTERED BY (empno) into 7 buckets
STORED AS AVRO LOCATION '/project/employees_part_buck'
TBLPROPERTIES ('AVRO.COMPRESS'='SNAPPY');
```
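As with the partitioned table, you can populate it from the unpartitioned `employees` table and then, for example, read a single bucket with `TABLESAMPLE`; this is a hedged sketch, not a required project step:
```sql!
-- Dynamic-partition insert into the bucketed table
INSERT INTO employees_part_buck PARTITION (deptno)
SELECT * FROM employees;

-- Read roughly 1/7 of the rows by sampling one bucket on empno
SELECT * FROM employees_part_buck TABLESAMPLE(BUCKET 1 OUT OF 7 ON empno);
```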
<!-- # Analytics functions in Hive
# Windowing in Hive
Windowing in Hive allows an analyst to create a window of data to operate aggregation and other analytical functions, such as LEAD and LAG. -->
Hive in the Hortonworks Data Platform supports two engines for running HiveQL queries, and the engine can be changed from the *Optimization* configs menu.

The default engine is Apache Tez, but if it is not working for some reason, you can use traditional MapReduce.
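If you prefer to switch the engine per session instead of changing it in Ambari, the standard Hive property can be set from the CLI:
```sql!
-- Run HiveQL queries on Tez (the default in HDP)
SET hive.execution.engine=tez;
-- Fall back to classic MapReduce if Tez is not working
SET hive.execution.engine=mr;
```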
# Apache Tez
**Tez** is an application framework built on Hadoop YARN that can execute complex directed acyclic graphs (DAGs) of general data processing tasks. In many ways it can be thought of as a more flexible and powerful successor of the MapReduce framework.

It generalizes map and reduce tasks by exposing interfaces for generic data processing tasks, which consist of a triplet of interfaces: input, output and processor. These tasks are the **vertices** in the execution graph. **Edges** (i.e.: data connections between tasks) are first class citizens in Tez and together with the input/output interfaces greatly increase the flexibility of how data is transferred between tasks.
Tez also greatly extends the possible ways in which individual tasks can be linked together; in fact, any arbitrary DAG can be executed directly in Tez.

In Tez, a MapReduce job is basically a simple DAG consisting of a single map vertex and a single reduce vertex connected by a “bipartite” edge (i.e., the edge connects every map task to every reduce task). Map inputs and reduce outputs are HDFS inputs and outputs respectively. The map output class locally sorts and partitions the data by a certain key, while the reduce input class merge-sorts its data on the same key.
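To see the DAG that Tez builds for a HiveQL query, you can run `EXPLAIN` on one of the aggregation queries from this stage; the exact plan output depends on the engine and Hive version:
```sql!
-- Show the execution plan (a Tez DAG when hive.execution.engine=tez)
EXPLAIN
SELECT d.dname, COUNT(*) AS emp_count
FROM departments d JOIN employees e ON d.deptno = e.deptno
GROUP BY d.dname;
```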
<!-- You can query data on the partitioned table as follows:
```sql!
SELECT * FROM employees WHERE deptno = 30;
```
This query will return the result only from a particular partition. -->
<!--
Predicates added to the WHERE clauses that filter on partition values are named **partition filters**. When you have large data with high number of partitions, executing query without any partition filters might trigger an enormous MapReduce job. To avoid such cases, there is the **map-reduce mode** configuration hive.mapred.mode, which prevents running risky queries on Hive. The default value of **hive.mapred.mode** is set to **nonstrict**. This mode specifies how Hive operations are being performed. By setting the value of hive.mapred.mode to `strict`, it will prevent running risky queries. For example, in strict mode, you cannot run a full table scan query:
```sql!
-- strict mode
SET hive.tez.mode=strict;
SELECT * FROM employees c;
-- nonstrict mode
set hive.mapred.mode=nonstrict;
SELECT * FROM employees c;
```
-->
# Perform Exploratory Data Analysis (EDA)
**EDA** is a data analytics process to understand the data in depth and learn the different data characteristics, often with visual means. This allows you to get a better feel of your data and find useful patterns in it.
We will use HiveQL to analyze the data and provide insights about it. Here are some examples:
1. Show the distribution of employees in departments.
```sql!
INSERT OVERWRITE LOCAL DIRECTORY '/root/q1'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
SELECT dname, COUNT(*)
FROM departments AS d JOIN employees AS e
ON d.deptno = e.deptno
GROUP BY dname
ORDER BY d.dname;
```
:::warning
After exporting results of each query, you need to merge the parts into a single file as follows:
```powershell!
echo "dname,emp_count" > output/q1.csv
cat /root/q1/* >> output/q1.csv
```
The first command will add only the column names and the second command will append the data.
:::
2. Show the distribution of salaries in departments.
```sql!
INSERT OVERWRITE LOCAL DIRECTORY 'output/q2'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
SELECT dname, AVG(CAST(sal AS DOUBLE)) AS avg_sal
FROM departments AS d
JOIN employees AS e ON d.deptno = e.deptno
GROUP BY dname
ORDER BY avg_sal DESC;
```
3. Show the manager(s) who work in each department.
```sql!
-- To export the result in csv-like file
WITH mgrs AS
(
SELECT DISTINCT e2.ename AS mgr_name, e2.empno AS mgr_no, e2.deptno AS mgr_dept
FROM employees AS e1
JOIN employees AS e2
ON e2.empno = e1.mgr
ORDER BY e2.ename
)
INSERT OVERWRITE LOCAL DIRECTORY '/root/q3'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
SELECT DISTINCT dname, mgrs.mgr_name AS manager
FROM departments AS d
JOIN employees AS e ON d.deptno = e.deptno
JOIN mgrs ON mgrs.mgr_dept = d.deptno
ORDER BY dname;
```
4. Show the distribution of employees with respect to hire date.
```sql!
INSERT OVERWRITE LOCAL DIRECTORY '/root/q4'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
SELECT year(hiredate) AS dt, COUNT(empno) AS count_empno
FROM employees
GROUP BY year(hiredate)
ORDER BY dt;
```
5. Show the min, max, and average salary of employees in each department.
```sql!
INSERT OVERWRITE LOCAL DIRECTORY '/root/q5'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
SELECT dname, MIN(sal), MAX(sal), AVG(sal)
FROM employees e
JOIN departments d ON e.deptno = d.deptno
GROUP BY dname
ORDER BY dname;
```
6. Show the departments which have the highest total commission.
```sql!
INSERT OVERWRITE LOCAL DIRECTORY '/root/q6'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
SELECT dname, SUM(comm) AS total_comm
FROM employees e
JOIN departments d ON (e.deptno = d.deptno)
WHERE comm IS NOT NULL
GROUP BY dname
HAVING total_comm > 0.0
ORDER BY total_comm DESC;
```
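The Description above also mentions Hive's windowing functions. As one more hedged example (the output path `/root/q7` is just an illustration), we can rank employees by salary within each department:
```sql!
INSERT OVERWRITE LOCAL DIRECTORY '/root/q7'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
SELECT dname, ename, sal,
       RANK() OVER (PARTITION BY d.deptno ORDER BY sal DESC) AS sal_rank
FROM employees e
JOIN departments d ON e.deptno = d.deptno;
```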
<!--
# Upcoming lab
- Optimizations in Hive.
- Partitions and buckets in Hive.
- More EDA queries.
- Spark ML lib. -->
<!--
:::info
If the HDF write procedure to local file system or HDFS is slow, you can check the following optimizations.

:::
-->
<!--
## How to enable Hive LLAP
1. Enable pre-emption from Yarn configs. Save the change and click OK then Proceed even if you some alerts.
<center>
<img src="https://i.imgur.com/Rmui0O0.png" width="200" />
</center>
2. Restart Yarn (Restart all affected is enough).
3. Go to Hive configs and enable interactive query option.
<center>
<img src="https://i.imgur.com/ltnxznd.png" width="200" />
</center>
It will show you a window to select the HiveServer2. Just click select since you have only one host in the cluster.
<center>
<img src="https://i.imgur.com/z1TiabT.png" width="600" />
</center>
4. Keep the default setting of Hive LLAP and save the change.
<center>
<img src="https://i.imgur.com/QriLDOD.png" width="200" />
</center>
5. Restart all Hive components and check the Hive interactive UI as shown below.

-->
<!-- :::warning
Do not forget to clear the folder `/project` if exists before importing the data.
::: -->
# References
- [toy-dataset](https://www.cs.uct.ac.za/mit_notes/database/htmls/chp03.html)
- [PostgreSQL documentation](https://www.postgresql.org/docs/current/sql-copy.html)
- [Hive documentation](https://cwiki.apache.org/confluence/display/Hive/Home)
- [HiveQL Cheat Sheet](https://hortonworks.com/wp-content/uploads/2013/05/hql_cheat_sheet.pdf)
- https://hashdork.com/apache-hive-tutorial/