Course: Big Data - IU S23
Author: Firas Jolha
Apache Hive is a distributed, fault-tolerant data warehouse system that enables analytics at a massive scale. It is an Apache Hadoop ecosystem component developed by Facebook to query the data stored in the Hadoop Distributed File System (HDFS). Here, HDFS is the data storage layer of Hadoop that, at a very high level, divides the data into small blocks (default 128 MB) and stores these blocks on different nodes. Hive is also termed the data warehousing framework of Hadoop and provides various analytical features, such as windowing and partitioning.
Hive is built on top of Apache Hadoop. As a result, Hive is closely integrated with Hadoop, and is designed to work quickly on petabytes of data. What makes Hive unique is the ability to query large datasets, leveraging Apache Tez or MapReduce, with a SQL-like interface called HiveQL.
Traditional relational databases are designed for interactive queries on small to medium datasets and do not process huge datasets well. Hive instead uses batch processing so that it works quickly across a very large distributed database.
Hive transforms HiveQL queries into MapReduce or Tez jobs that run on Apache Hadoop’s distributed job scheduling framework, Yet Another Resource Negotiator (YARN). It queries data stored in a distributed storage solution, like the Hadoop Distributed File System (HDFS) or Amazon S3.
Hive stores the metadata of the databases and tables in a metastore, which is a database (for instance, MySQL) or file-backed store that enables easy data abstraction and discovery. Hive includes HCatalog, which is a table and storage management layer that reads data from the Hive metastore to facilitate seamless integration between Hive and MapReduce. By using the metastore, HCatalog allows MapReduce to use the same data structures as Hive, so that the metadata doesn't have to be redefined for each engine. Custom applications or third-party integrations can use WebHCat, which is a RESTful API for HCatalog to access and reuse Hive metadata.
Data in Apache Hive can be categorized into tables, partitions, and buckets. A table in Hive is logically made up of the data being stored. It comes in two types: an internal (managed) table and an external table.
In this stage, we will create external Hive tables for the PostgreSQL tables imported by Sqoop. Then, we will perform Exploratory Data Analysis using HiveQL.
The dataset is about the departments and employees in a company as well as their salary categories. It consists of two .csv files.
The file emps.csv contains information about employees:
The file depts.csv contains information about departments:
I created these csv files from the tables provided in the link.
Before starting with Hive tables, make sure that you imported the PostgreSQL tables to HDFS (the /project warehouse folder, for instance) as AVRO files compressed with the Snappy HDFS codec. You also need to move the .avsc schemas to a folder in HDFS, for instance /project/avsc, as follows (assuming that the .avsc files are in the current working directory of the local file system):
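A minimal sketch of the commands; the schema file names depend on your own import, so adjust them accordingly:

```bash
# create the target folder in HDFS
hdfs dfs -mkdir -p /project/avsc

# copy all local .avsc schema files to HDFS
hdfs dfs -put *.avsc /project/avsc/

# verify the upload
hdfs dfs -ls /project/avsc
```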
In this step, you need to write HiveQL statements for creating the Hive database (let's call it projectdb) and importing the Snappy-compressed avro data files. You can test the statements in interactive mode, but for project purposes you need to write them in a .hql file and then execute the statements by passing the file via the command hive -f <file.hql>, which should replicate all the steps. Here, we will store the HiveQL statements in a file db.hql.
I will give here some of the basic steps to create the Hive database projectdb, but you need to extend them for your project purposes.
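A minimal sketch of how db.hql might start:

```sql
-- db.hql: create the project database and switch to it
DROP DATABASE IF EXISTS projectdb;
CREATE DATABASE projectdb;
USE projectdb;
```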
If dropping the projectdb database returns an error because it is not empty (it already contains tables), then add CASCADE as follows:
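For example:

```sql
-- drop the database together with all of the objects it contains
DROP DATABASE IF EXISTS projectdb CASCADE;
```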
Since the data files are compressed with the Snappy codec, we need to set some configurations for loading the data from HDFS as follows:
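A sketch of the settings commonly used for Snappy-compressed output; adjust them if your cluster is configured differently:

```sql
-- enable Snappy compression for intermediate and final outputs
SET hive.exec.compress.output=true;
SET mapreduce.map.output.compress=true;
SET mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapreduce.output.fileoutputformat.compress=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
```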
Then, create the external tables employees and departments on top of the imported AVRO files and test them with some select queries; a sketch of such a table definition is given below. Afterwards, you can run the file db.hql via the command hive -f.
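A sketch of one external table definition, assuming the employee AVRO files were imported under /project/employees and the schema file sits at /project/avsc/employees.avsc (the departments table is created in the same way):

```sql
-- external table on top of the AVRO files imported by Sqoop;
-- the column list is taken from the Avro schema referenced below
CREATE EXTERNAL TABLE employees
STORED AS AVRO
LOCATION '/project/employees'
TBLPROPERTIES ('avro.schema.url'='/project/avsc/employees.avsc');

-- quick sanity check
SELECT * FROM employees LIMIT 5;
```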
The query results will be redirected to the hive_results.txt file. You can use the redirection operator to store the output in a file as follows:
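For example, using the file names from above:

```bash
# run the whole script and store the query output in hive_results.txt
hive -f db.hql > hive_results.txt
```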
Note: Do not forget that the .hql file should not return errors when you run it a second time, so you should clear/drop existing objects before creating new database objects.
Info: If the ssh connection drops quickly, you can increase the keep-alive interval by passing the option ServerAliveInterval a value such as 60 seconds. This option sets a timeout interval of 60 seconds after which, if no data has been received from the server, ssh will send a message through the encrypted channel to request a response from the server.
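For example (the user and host names are placeholders for your own cluster):

```bash
# ask ssh to send a keep-alive probe every 60 seconds
ssh -o ServerAliveInterval=60 user@cluster-host
```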
Recall that data in Apache Hive is organized into tables, partitions, and buckets.
Partitioning – Apache Hive organizes tables into partitions for grouping the same type of data together based on a column or partition key. Each table in Hive can have one or more partition keys to identify a particular partition. Using partitions, we can make it faster to run queries on slices of the data.
Bucketing – Hive tables or partitions are subdivided into buckets based on the hash of a column in the table, to give extra structure to the data that may be used for more efficient queries.
Partitioning in Hive is used to increase query performance. Hive is a very good tool for performing queries on large datasets, especially datasets that require a scan of an entire table. Generally, users are aware of their data domain, and in most cases they want to search for a particular type of data. For such cases, a simple query takes a long time to return the result because it requires a scan of the entire dataset. The concept of partitioning can be used to reduce the cost of querying the data. Partitions are like horizontal slices of data that allow large sets of data to be handled as more manageable chunks. Table partitioning means dividing the table data into parts based on the unique values of particular columns (for example, city and country) and segregating the input data records into different files or directories.
Partitioning in Hive is done using the PARTITIONED BY clause in the CREATE TABLE statement. A table can have one or more partitions and can be partitioned on the basis of one or more columns. The columns on which partitioning is done are not included in the table data itself. For example, if you have the four fields id, name, age, and city, and you want to partition the data on the basis of the city field, then the city field will not be included in the columns of the CREATE TABLE statement and will only be used in the PARTITIONED BY clause. You can still query the data in the normal way using where city=xyz; the result will be retrieved from the respective partition because the data of each city is stored in a separate directory named after that city.
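A small sketch of this example (the table name persons is hypothetical):

```sql
-- the partition column city appears only in PARTITIONED BY,
-- not in the regular column list
CREATE TABLE persons (
    id   INT,
    name STRING,
    age  INT
)
PARTITIONED BY (city STRING);

-- queries can still filter on the partition column as usual
SELECT * FROM persons WHERE city = 'xyz';
```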
There are two main types of tables in Hive: managed tables and external tables. Both types support the partitioning mechanism.
Partitioning can be done in one of the following two ways:
In static partitioning, you need to manually insert data into the different partitions of a table. Let's use a table partitioned on the departments of the company. For each department, you need to manually insert the data from the data source into that department's partition of the partitioned table. So for 4 departments, you need to write 4 Hive insert statements, one per partition.
By default, Hive does not allow dynamic partitioning. We need to enable it by setting the following properties on the CLI or in hive-site.xml.
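The properties are typically set as follows:

```sql
-- allow dynamic partitioning and lift the requirement of having
-- at least one static partition column per insert
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
```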
For example, we have a table Employees containing employee information of some company, such as empno, ename, job, deptno, etc. Now, suppose we want to perform partitioning on the basis of the deptno column. Then the information of all the employees belonging to a particular department will be stored together in a separate partition. Physically, a partition in Hive is nothing but a sub-directory in the table directory. For example, we have data for four departments in our Employees table – Accounting (deptno=10), Research (deptno=20), Operations (deptno=40) and Sales (deptno=30). Thus we will have four partitions in total, one for each department, as we can see clearly in the diagram below.
We can run the following statements to create the partitioned table and to try inserting into it.
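A sketch of the partitioned table and of an insert that relies on dynamic partitioning; the column types are assumptions based on the columns named above, and the insert will fail while dynamic partitioning is still disabled:

```sql
-- managed table partitioned by department number
CREATE TABLE employees_part (
    empno INT,
    ename STRING,
    job   STRING
)
PARTITIONED BY (deptno INT);

-- dynamic insert: Hive derives the target partition from the deptno value
INSERT INTO TABLE employees_part PARTITION (deptno)
SELECT empno, ename, job, deptno FROM employees;
```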
When you run the previous insert statement, you will get an error. This is due to the fact that dynamic partitioning is not active: in strict mode, at least one static partition column is required.
But we can insert the data as follows:
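A sketch of a static insert into a single partition; one such statement is needed per department:

```sql
-- static partitioning: the target partition is fixed in the statement,
-- so the partition column is not part of the SELECT list
INSERT INTO TABLE employees_part PARTITION (deptno=10)
SELECT empno, ename, job FROM employees WHERE deptno = 10;
```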
Once dynamic partitioning is enabled, we can create partitions for all unique values of any column, say deptno of the employees_part table, by inserting the data from our unpartitioned table as follows:
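With the two properties above enabled, a single statement is enough (a sketch, reusing the column names assumed earlier):

```sql
-- dynamic partitioning: one partition is created per distinct deptno value
INSERT OVERWRITE TABLE employees_part PARTITION (deptno)
SELECT empno, ename, job, deptno FROM employees;
```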
For each department, we will have all the data regarding that department residing in a separate sub-directory under the table directory, as follows:
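For example, assuming the default warehouse location (the path is an assumption and depends on your setup):

```bash
# each partition appears as a sub-directory such as deptno=10, deptno=20, ...
hdfs dfs -ls /user/hive/warehouse/projectdb.db/employees_part
```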
For instance, the queries regarding Sales employees would only have to look through the data present in the Sales partition (deptno=30).
Therefore, from the above example, we can conclude that partitioning is very useful. It reduces query latency by scanning only the relevant partitioned data instead of the whole dataset.
In the scenario where we query on a column with mostly unique values, partitioning is not a good fit. If we partition on a column with many unique values, like ID, it would create a large number of small files in HDFS and of partition entries in the metastore, thus increasing the load on the NameNode and the metastore service. To optimize queries on such a dataset, we instead group the data into a fixed number of buckets, so the data is divided into at most that number of parts.
We can create buckets on the empno column of employees_part as follows:
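A sketch of a bucketed (and partitioned) version of the table; the table name employees_buck and the number of buckets are arbitrary choices:

```sql
-- on older Hive versions bucketing must be enforced explicitly;
-- newer versions always enforce it
SET hive.enforce.bucketing=true;

-- bucket rows by the hash of empno into a fixed number of files per partition
CREATE TABLE employees_buck (
    empno INT,
    ename STRING,
    job   STRING
)
PARTITIONED BY (deptno INT)
CLUSTERED BY (empno) INTO 4 BUCKETS;

-- populate it from the partitioned table
INSERT OVERWRITE TABLE employees_buck PARTITION (deptno)
SELECT empno, ename, job, deptno FROM employees_part;
```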
Hive in the Hortonworks Data Platform supports two engines for running HiveQL queries, and the engine can be changed from the Optimization configs menu.
The default engine is Apache Tez, but if it is not working for some reason, you can use the traditional MapReduce engine.
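The engine can also be switched per session from the Hive CLI:

```sql
-- fall back to classic MapReduce if Tez is unavailable
SET hive.execution.engine=mr;

-- switch back to Tez
SET hive.execution.engine=tez;
```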
Tez is a new application framework built on Hadoop Yarn that can execute complex directed acyclic graphs of general data processing tasks. In many ways it can be thought of as a more flexible and powerful successor of the map-reduce framework.
It generalizes map and reduce tasks by exposing interfaces for generic data processing tasks, which consist of a triplet of interfaces: input, output and processor. These tasks are the vertices in the execution graph. Edges (i.e.: data connections between tasks) are first class citizens in Tez and together with the input/output interfaces greatly increase the flexibility of how data is transferred between tasks.
Tez also greatly extends the possible ways in which individual tasks can be linked together; in fact, any arbitrary DAG can be executed directly in Tez.
In Tez, a map-reduce job is basically a simple DAG consisting of a single map vertex and a single reduce vertex connected by a “bipartite” edge (i.e., the edge connects every map task to every reduce task). Map inputs and reduce outputs are HDFS inputs and outputs, respectively. The map output class locally sorts and partitions the data by a certain key, while the reduce input class merge-sorts its data on the same key.
EDA is a data analytics process to understand the data in depth and learn the different data characteristics, often with visual means. This allows you to get a better feel of your data and find useful patterns in it.
We will use HiveQL to analyze the data. We need to provide insights about the data. Here I will give some examples:
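A couple of hypothetical example queries; the column names follow the ones assumed earlier (dname for the department name is an assumption), and the output directories are placeholders. INSERT OVERWRITE DIRECTORY exports each result to HDFS as comma-separated files so they can be merged later:

```sql
-- Q1: number of employees per department
INSERT OVERWRITE DIRECTORY '/project/output/q1'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
SELECT d.dname, COUNT(e.empno) AS emp_count
FROM employees e
JOIN departments d ON e.deptno = d.deptno
GROUP BY d.dname;

-- Q2: number of employees per job title
INSERT OVERWRITE DIRECTORY '/project/output/q2'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
SELECT job, COUNT(*) AS cnt
FROM employees
GROUP BY job;
```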
After exporting results of each query, you need to merge the parts into a single file as follows:
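A sketch for one query's output; the paths and the header line are placeholders matching the first example query above:

```bash
# 1) write the header (column names) to the local csv file
echo "dname,emp_count" > output/q1.csv

# 2) append the merged contents of all part files produced by the query
hdfs dfs -cat /project/output/q1/* >> output/q1.csv
```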
The first command will add only the column names and the second command will append the data.