---
tags: BigData,IntroToBigData
title: Stage I - Data collection and Ingestion
---
# Stage I - Data collection and Ingestion
**Course:** Big Data - IU S23
**Author:** Firas Jolha
# Dataset
- [Some emps and depts](http://www.cems.uwe.ac.uk/~pchatter/resources/html/emp_dept_data+schema.html)
# Agenda
[toc]
# Prerequisites
- HDP 2.6.5 is installed
# Objectives
- Build a relational database in PostgreSQL
- Import the database into HDFS via Sqoop
# Description
Big data is changing the way companies do business and creating a need for data engineers who can collect and manage large quantities of data. Data engineering is the practice of designing and building systems for collecting, storing, and analyzing data at scale.
<center>
<img src="https://i.imgur.com/oIxLrv7.png" width="600" />
<p>This is just a typical big data pipeline</p>
</center>
In the final project of this course, you need to build an end-to-end big data pipeline that accepts input data files, analyzes the data in the cluster, and displays the results in a web dashboard. The pipeline is divided into four stages, and this tutorial covers the first one. Before building the pipeline, it is always good to inspect the data and preprocess it if needed. For simplicity, we deal with batch data here, since the data in the files is fixed and there are no real-time updates. The first stage consists of two steps:
1. Build a relational database via PostgreSQL
2. Import the database into HDFS via Sqoop
# Dataset Description
The dataset is about the departments and employees in a company. It consists of 2 `.csv` files.
The file `emps.csv` contains information about employees:
- EMPNO is a unique employee number; it is the primary key of the employee table.
- ENAME stores the employee's name.
- The JOB attribute stores the name of the job the employee does.
- The MGR attribute contains the employee number of the employee who manages that employee. If the employee has no manager, then the MGR column for that employee is set to null.
- The HIREDATE column stores the date on which the employee joined the company.
- The SAL column contains the details of employee salaries.
- The COMM attribute stores values of commission paid to employees. Not all employees receive commission, in which case the COMM field is set to null.
- The DEPTNO column stores the department number of the department in which each employee is based. This data item acts as a foreign key, linking the employee details stored in the EMP table with the details of the departments in which employees work, which are stored in the DEPT table.
The file `depts.csv` contains information about departments:
- DEPTNO: The primary key containing the department numbers used to identify each department.
- DNAME: The name of each department.
- LOC: The location where each department is based.
<!-- The file `salgrade.csv` contains information about salary categories:
- GRADE: A numeric identifier for the category of the salary.
- LOSAL: The lowest salary in this category.
- HISAL: The highest salary in this category.
-->
:::info
I created these `csv` files from the tables provided in the link.
:::
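Before loading the files, a quick check that every row has the expected number of fields can catch malformed lines early. The sketch below is only an illustration: the function name `check_csv_columns` is made up, the `data/` paths are taken from later in this tutorial, and it assumes that no field contains a quoted comma. According to the column lists above, `emps.csv` should have 8 fields per line and `depts.csv` should have 3.

```shell
# Report lines whose field count differs from the expected one.
# Assumption: fields are comma-separated and contain no embedded commas.
check_csv_columns() {
  file="$1"      # path to the csv file
  expected="$2"  # expected number of columns
  awk -F',' -v n="$expected" -v f="$file" '
    NF != n { printf "%s: line %d has %d fields, expected %d\n", f, NR, NF, n; bad = 1 }
    END { exit bad }
  ' "$file"
}

# Usage (paths assumed from the rest of the tutorial):
#   check_csv_columns data/emps.csv 8
#   check_csv_columns data/depts.csv 3
```

The function exits with a non-zero status when at least one line is off, so it can be used as a guard at the top of a pipeline script.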
# Build a relational Database via PostgreSQL
In this step, you write the SQL statements that create the database (let's call it `project`) and import the `csv` files into it. You can test the statements interactively, but for the project you need to store them in a `.sql` file and execute it with the `psql` command, so that all the steps can be replicated.
Here are the basic steps to create the database `project`; feel free to extend them for your project.
1. Drop the database if it exists.
```sh!
psql -U postgres -c 'DROP DATABASE IF EXISTS project;'
```
2. Create a database `project`
```sh!
psql -U postgres -c 'CREATE DATABASE project;'
```
:::info
You can merge the two previous commands into a single call of the `psql` command.
:::
We will store the SQL statements in a file `db.sql`.
3. Create tables `emps` and `depts`
```sql!
-- switch to the database
\c project;
-- Optional
START TRANSACTION;
-- Add tables
-- emps table
CREATE TABLE emps (
empno integer NOT NULL PRIMARY KEY,
ename VARCHAR ( 50 ) NOT NULL,
job VARCHAR ( 50 ) NOT NULL,
mgr integer,
hiredate date,
sal decimal(10, 2),
comm decimal(10, 2),
deptno integer NOT NULL
);
-- dept table
CREATE TABLE depts(
deptno integer NOT NULL PRIMARY KEY,
dname varchar(50) NOT NULL,
location varchar(50) NOT NULL
);
```
:::info
**Note:** In the standard, it is not necessary to issue START TRANSACTION to start a transaction block: any SQL command implicitly begins a block. PostgreSQL's behavior can be seen as implicitly issuing a COMMIT after each command that does not follow START TRANSACTION (or BEGIN), and it is therefore often called “autocommit”. Other relational database systems might offer an *autocommit* feature as a convenience.
:::
4. Add the constraints.
```sql!
-- Add constraints
-- FKs
ALTER TABLE emps ADD CONSTRAINT fk_emps_mgr_empno FOREIGN KEY(mgr) REFERENCES emps (empno);
ALTER TABLE emps ADD CONSTRAINT fk_emps_deptno_deptno FOREIGN KEY(deptno) REFERENCES depts (deptno);
```
5. Load data from `csv` files.
```sql!
SET datestyle TO iso, ymd;
-- Load depts first: emps.deptno references depts.deptno
\COPY depts FROM 'data/depts.csv' DELIMITER ',' CSV HEADER NULL AS 'null';
\COPY emps FROM 'data/emps.csv' DELIMITER ',' CSV HEADER NULL AS 'null';
```
6. Perform any additional CRUD operations needed on the database. At the end, commit the transaction block.
```sql!
-- optional
COMMIT;
-- For checking the content of tables
SELECT * from emps;
SELECT * from depts;
```
You can run the file `db.sql` via the command `psql` as follows:
```sh!
psql -U postgres -d project -f sql/db.sql
```
:::warning
**Note:** The `.sql` file should not return errors when you run it a second time, so clear or drop existing objects before creating new database objects.
<!-- This is true for all scripts in the pipeline. -->
:::
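One way to keep the whole step repeatable is to wrap the three `psql` calls from this section in a small script that always starts from an empty database. This is a minimal sketch rather than a required layout: the function name `rebuild_db` and the `PSQL` override variable are hypothetical, and it assumes the SQL file is stored at `sql/db.sql` as in the command above.

```shell
PSQL="${PSQL:-psql}"   # can be overridden, e.g. with a stub, to preview the calls

rebuild_db() {
  db="$1"        # database name, e.g. project
  sql_file="$2"  # SQL script to run, e.g. sql/db.sql
  # Drop first so that a second run starts from a clean state.
  "$PSQL" -U postgres -c "DROP DATABASE IF EXISTS ${db};" &&
  "$PSQL" -U postgres -c "CREATE DATABASE ${db};" &&
  "$PSQL" -U postgres -d "${db}" -f "${sql_file}"
}

# Usage on the cluster node:
#   rebuild_db project sql/db.sql
```

Because the database is dropped and recreated on every run, the `CREATE TABLE` statements in the SQL file never meet leftovers from a previous run.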
<!-- # HDFS -->
# AVRO & Snappy
**Avro** is a row oriented semi-structured data format for storing Big Data files, actively used in the Apache Hadoop ecosystem and widely used as a serialization platform. In addition to the semi-structured nature of the Avro format, the files are also splittable, which means that the Hadoop platform can separate the file into individual sections which increases the processing efficiency during data analysis.

Avro stores the data definition as schemas in JSON format `.avsc`, making it easy to read and interpret; the data itself is stored in binary format `.avro` making it compact and efficient. Avro files include markers that can be used to split large data sets into subsets suitable for Apache MapReduce processing. When Avro data is read, the schema used when writing it is always present. This also facilitates use with dynamic, scripting languages, since data, together with its schema, is fully self-describing.

**Snappy** is a compression/decompression library. It optimizes for very high-speed compression and decompression. It does not aim for maximum compression, or compatibility with any other compression library; instead, it aims for very high speeds and reasonable compression.

Avro modestly increases storage space and write time while significantly reducing read time. Adding Snappy compression increases write time minimally, while significantly decreasing storage space and keeping read time minimal. The resulting combination is optimized for a single archival write followed by multiple reads.
<!-- Practically, when we import the dataset as AVRO file you will get a schema file `` -->
# Import the database into HDFS via Sqoop
Sqoop is a command-line tool used for importing data from relational databases to HDFS. It is a data ingestion tool that enables you to bulk import and export data from a database. You can use Sqoop to import data into HDFS or directly into Hive. Sqoop is installed in HDP as a client and we will use the command `sqoop` for that purpose. We will import the tables created in PostgreSQL to HDFS and store them in `/project` folder.
First of all, we need to add some configuration for PostgreSQL as follows:
`host all all 0.0.0.0/0 trust`
You need to add it to the file `/var/lib/pgsql/data/pg_hba.conf`.
:::warning
**Note:** Do not forget to restart the PostgreSQL service with `systemctl restart postgresql` after updating the config file.
:::
Then we need to download the JDBC driver for PostgreSQL and put it in the lib path of Sqoop as follows:
```powershell=
wget https://jdbc.postgresql.org/download/postgresql-42.6.0.jar --no-check-certificate
cp postgresql-42.6.0.jar /usr/hdp/current/sqoop-client/lib/
```
Now we are ready to run Sqoop commands against PostgreSQL databases. You run these commands on the cluster node. Here are some examples.
1. Sqoop help
```powershell!
sqoop help
```

2. Sqoop version
```powershell!
sqoop-version
```
OR
```powershell!
sqoop version # the same holds for all other Sqoop tools, e.g. sqoop-import and sqoop import
```

3. List all databases for user `postgres`
```powershell!
sqoop list-databases --connect jdbc:postgresql://localhost/postgres --username postgres
```
:::info
The JDBC connection URL `jdbc:postgresql://localhost/postgres` consists of the scheme `jdbc:postgresql:`, the hostname `//localhost`, and the database name `postgres`, which is the default database.
Here `postgres` in the URL only serves as some database to connect to. The result does not change if you replace it with any other existing database.
:::
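The parts of the URL can be made explicit with a small helper. This is only an illustration: the function name `jdbc_url` is hypothetical, and the optional port argument relies on the `jdbc:postgresql://host:port/database` form accepted by the PostgreSQL JDBC driver.

```shell
# Compose a PostgreSQL JDBC URL from its parts.
jdbc_url() {
  host="$1"       # e.g. localhost
  database="$2"   # e.g. postgres or project
  port="${3:-}"   # optional; the driver uses its default port when omitted
  if [ -n "$port" ]; then
    echo "jdbc:postgresql://${host}:${port}/${database}"
  else
    echo "jdbc:postgresql://${host}/${database}"
  fi
}

jdbc_url localhost postgres   # prints jdbc:postgresql://localhost/postgres
```

The result can be passed directly to `--connect`, for example `--connect "$(jdbc_url localhost project)"`.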

4. List all tables of database `project` for user `postgres`
```powershell!
sqoop list-tables \
--connect jdbc:postgresql://localhost/project \
--username postgres
```

## Sqoop eval
The `sqoop eval` tool allows users to quickly run simple SQL queries against a database; results are printed to the console. This allows users to preview their import queries to ensure they import the data they expect.
5. Evaluate a query result of database `project` for user `postgres`
```powershell!
sqoop eval \
--connect jdbc:postgresql://localhost/project \
--username postgres \
--query "SELECT * FROM emps WHERE deptno=10"
```
:::info
`sqoop eval` accepts only the `--query` option for evaluating a query. You cannot use the `--where` option here, but you can write a WHERE predicate in the query itself.
:::
## Sqoop import
The `sqoop import` tool imports an individual table from an RDBMS to HDFS. Each row from a table is represented as a separate record in HDFS. Records can be stored as text files (one record per line), or in binary representation as Avro or SequenceFiles.
6. Import a specific table `emps` from the database `project` with specific columns `empno, sal, deptno` and specific predicate `deptno>10`, and store it in HDFS at the default target folder, which is a folder named after the table inside the HDFS home directory of the current user (for example `/user/root/emps` for the user `root`).
```powershell!
sqoop import \
--connect jdbc:postgresql://localhost/project \
--username postgres \
--table emps \
--columns "empno, sal, deptno" \
--where "deptno>10"
```
:::info
**Note:** If you run the import a second time and the target directory already exists in HDFS, the job will **fail**. You can pass the `--delete-target-dir` option to remove the old files, or specify a new target directory in HDFS via the option `--target-dir`.
:::
7. Import the result of a query from the database `project` with specific columns `empno, sal, deptno, dname` and specific predicate `deptno>10` and store it in HDFS at a specific target folder `/project/data`.
```powershell
sqoop import \
--connect jdbc:postgresql://localhost/project \
--username postgres \
--query "SELECT empno, sal, e.deptno, dname FROM emps AS e JOIN depts AS d ON (e.deptno=d.deptno) WHERE e.deptno>10 AND \$CONDITIONS " \
--target-dir /project/data \
--split-by "e.deptno"
```
:::info
**Note:** If you use `--query`, you should specify the target directory with `--target-dir` and the column to split by with `--split-by`, and add the token `$CONDITIONS` to the WHERE clause.
For a table import, Sqoop splits the work on the primary key by default when `--split-by` is not given. If the table has no primary key, you can ask `sqoop import` to fall back to one mapper with the option `--autoreset-to-one-mapper`.
:::
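To see why `$CONDITIONS` and `--split-by` are needed, it helps to picture what each mapper receives. Sqoop finds the minimum and maximum of the split column and runs the query once per mapper, with `$CONDITIONS` replaced by that mapper's own range predicate. The sketch below only imitates that idea for an integer column. It is not Sqoop's actual code, the function name `split_conditions` is made up, and the bounds 20 and 30 are assumed values for `e.deptno` after the `deptno>10` filter.

```shell
# Print one WHERE fragment per mapper for an integer split column.
# Illustration only: this mimics the idea, not Sqoop's implementation.
split_conditions() {
  col="$1"; min="$2"; max="$3"; mappers="$4"
  step=$(( (max - min) / mappers ))
  if [ "$step" -lt 1 ]; then step=1; fi
  lo=$min
  i=1
  while [ "$i" -le "$mappers" ] && [ "$lo" -le "$max" ]; do
    hi=$(( lo + step ))
    if [ "$i" -eq "$mappers" ] || [ "$hi" -gt "$max" ]; then
      # The last range is closed so that the maximum value is included.
      echo "$col >= $lo AND $col <= $max"
      break
    fi
    echo "$col >= $lo AND $col < $hi"
    lo=$hi
    i=$(( i + 1 ))
  done
}

split_conditions e.deptno 20 30 2
# e.deptno >= 20 AND e.deptno < 25
# e.deptno >= 25 AND e.deptno <= 30
```

Each printed line stands for the text that would take the place of `$CONDITIONS` in one mapper's copy of the query, which is why the token has to appear in the WHERE clause.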
8. Import all tables of the database `project` and store them in HDFS at `/project` folder.
```powershell!
sqoop import-all-tables \
--connect jdbc:postgresql://localhost/project \
--username postgres \
--warehouse-dir /project
```
:::info
**Note:** Here you cannot use `--target-dir` but you have `--warehouse-dir` to specify the folder where the imported tables will be stored. To delete the previous warehouse folder, you need to do it manually via `hdfs dfs -rm -r` command as follows:
```powershell!
hdfs dfs -rm -r /project
```
:::
:::info
In Sqoop import commands, you can use the option `--direct` to speed up the import. In direct mode, Sqoop uses the database's own bulk data movement tools instead of the generic JDBC path, but this mode has limitations.
:::
9. Import all tables of the database `project` and store them in HDFS at `/project` folder as AVRO data files.
:::danger
```powershell!
sqoop import-all-tables \
--connect jdbc:postgresql://localhost/project \
--username postgres \
--warehouse-dir /project \
--as-avrodatafile
```
:::
:::warning
**Note:** Importing as an AVRO data file may fail the MapReduce job with an error like the following:
```!
Error: org.apache.avro.reflect.ReflectData.addLogicalTypeConversion(Lorg/apache/avro/Conversion;)V
23/03/26 01:03:36 INFO mapreduce.Job: Task Id : attempt_1679726770006_0016_m_000000_1, Status : FAILED
```
The solution is to add the option `-Dmapreduce.job.user.classpath.first=true` after the tool name immediately, as follows.
<!--  -->
```powershell!
sqoop import-all-tables \
-Dmapreduce.job.user.classpath.first=true \
--connect jdbc:postgresql://localhost/project \
--username postgres \
--warehouse-dir /project \
--as-avrodatafile
```
Source: https://community.cloudera.com/t5/Support-Questions/Sqoop-import-to-avro-failing-which-jars-to-be-used/m-p/124080#M86824
By default, the Hadoop framework jars appear before the user's jars in the classpath, so without this option the classes from the 2.x.x jar are still picked up.
:::
:::info
**Note:** You cannot use the option `--direct` when importing AVRO files, since direct mode does not support AVRO.
Importing in AVRO format generates a schema file `.avsc` for each table in the current directory of the local file system. These schemas need to be placed in HDFS manually via the command `hdfs dfs -put`. You can specify the local path where the `.avsc` files are stored via the option `--outdir`.
:::
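A minimal sketch of that manual step is given below. The local directory, the HDFS destination `/project/avsc`, the function name `put_schemas` and the `HDFS` override variable are all assumptions made for illustration.

```shell
HDFS="${HDFS:-hdfs}"   # can be overridden with a stub to preview the calls

put_schemas() {
  local_dir="$1"   # local directory containing the generated .avsc files
  hdfs_dir="$2"    # destination directory in HDFS
  "$HDFS" dfs -mkdir -p "$hdfs_dir" || return 1
  for schema in "$local_dir"/*.avsc; do
    [ -e "$schema" ] || continue   # nothing matched the pattern
    # -f overwrites a schema left over from a previous import
    "$HDFS" dfs -put -f "$schema" "$hdfs_dir" || return 1
  done
}

# Usage on the cluster node, after the import has finished:
#   put_schemas . /project/avsc
```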
10. Import all tables of the database `project` and store them in HDFS at `/project` folder as AVRO data files and compressed using Snappy compression method in HDFS.
```powershell!
sqoop import-all-tables \
-Dmapreduce.job.user.classpath.first=true \
--connect jdbc:postgresql://localhost/project \
--username postgres \
--warehouse-dir /project \
--as-avrodatafile \
--compression-codec=snappy \
--outdir /project/avsc \
--m 1
```
:::info
You can limit the number of mappers, and therefore the number of output file partitions, via the option `--m` (documented in the Sqoop User Guide as `-m` or `--num-mappers`), for instance `--m 1` for one mapper.
:::
## Sqoop job
The `sqoop job` tool allows you to create and work with saved jobs stored in Sqoop metastore. Saved jobs remember the parameters used to specify a job, so they can be re-executed by invoking the job by its handle.
If a saved job is configured to perform an incremental import, state regarding the most recently imported rows is updated in the saved job to allow the job to continually import only the newest rows.
11. Create a job to import all tables of the database `project` and store it in HDFS at `/project` folder as AVRO data files and compressed using Snappy compression method in HDFS and using only one mapper.
```powershell!
sqoop job \
--create myprojectjob \
-- import-all-tables \
-Dmapreduce.job.user.classpath.first=true \
--connect jdbc:postgresql://localhost/project \
--username postgres \
--warehouse-dir /project \
--as-avrodatafile \
--compression-codec=snappy \
--m 1
```
:::warning
If these `sqoop job` commands ask for the password of the database user, just skip it by pressing Enter. The Sqoop metastore is not a secure resource. Multiple users can access its contents. For this reason, Sqoop does not store passwords in the metastore. If you create a job that requires a password, you will be prompted for that password each time you execute the job.
:::
12. List all jobs.
```powershell!
sqoop job --list
```
13. Inspect your saved job.
```powershell!
sqoop job --show myprojectjob
```
14. Execute the saved job.
```powershell!
sqoop job --exec myprojectjob -- --username postgres
```
## [self-study] Incremental import
Sqoop provides an incremental import mode which can be used to retrieve only rows newer than some previously-imported set of rows. Here we have three options:
```powershell!
--check-column <col> # Specifies the column to be examined when determining which rows to import. The column should not be of string type.
--incremental <mode> # Specifies how Sqoop determines which rows are new. Legal values for mode include "append" and "lastmodified".
--last-value <value> # Specifies the maximum value of the check column from the previous import.
```
You should specify `append` mode when importing a table where new rows are continually being added with increasing row id values. You specify the column containing the row’s id with `--check-column`. Sqoop imports rows where the check column has a value greater than the one specified with `--last-value`.
An alternative table update strategy supported by Sqoop is called `lastmodified` mode. You should use this when rows of the source table may be updated, and each such update sets the value of a last-modified column to the current timestamp. Rows where the check column holds a timestamp more recent than the timestamp specified with `--last-value` are imported.
At the end of an incremental import, the value which should be specified as `--last-value` for a subsequent import is printed to the screen. When running a subsequent import, you should specify `--last-value` in this way to ensure you import only the new or updated data. This is handled automatically by creating an incremental import as a saved job, which is the preferred mechanism for performing a recurring incremental import.
:::warning
Incremental import works only with the `sqoop import` tool.
:::
You can see an example in this [tutorial](https://medium.com/@nitingupta.bciit/apache-sqoop-incremental-import-with-sqoop-job-949b60e1101a) for incremental import.
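As a rough sketch of the options described in this section, an append-mode import of `emps` keyed on `empno` could look like the following. The target directory `/project/emps_incremental`, the function name `incremental_import_emps` and the `SQOOP` override variable are assumptions for illustration. The first run would pass `0` as the last value, assuming all employee numbers are positive.

```shell
SQOOP="${SQOOP:-sqoop}"   # can be overridden with a stub to preview the call

incremental_import_emps() {
  last_value="$1"   # highest empno imported so far (0 for the first run)
  "$SQOOP" import \
    --connect jdbc:postgresql://localhost/project \
    --username postgres \
    --table emps \
    --target-dir /project/emps_incremental \
    --incremental append \
    --check-column empno \
    --last-value "$last_value"
}

# Usage on the cluster node:
#   incremental_import_emps 0
```

For later runs, pass the value that Sqoop printed at the end of the previous import, or create the import as a saved job so that the value is tracked for you.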
<!--
15. Import the data from table `emps` from the database `project` and store
```powershell!
sqoop import \
--connect jdbc:postgresql://localhost/project \
--username postgres \
--query "SELECT empno, sal, e.deptno, dname FROM emps AS e JOIN depts AS d ON (e.deptno=d.deptno) WHERE e.deptno>10 AND \$CONDITIONS " \
--target-dir /project/data \
--split-by "e.deptno" \
--incremental append \
--check-column "e.deptno"
``` -->
<!-- ## Sqoop export
-->
<!-- :::warning
Do not forget to clear the folder `/project` if exists before importing the data.
:::
-->
# References
- [toy-dataset](https://www.cs.uct.ac.za/mit_notes/database/htmls/chp03.html)
- [Sqoop User Guide](https://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#_introduction)
- [PostgreSQL documentation](https://www.postgresql.org/docs/current/sql-copy.html)
- [Tutorial on Apache Sqoop](https://towardsdatascience.com/apache-sqoop-1113ce453639)