# 5-3. Hive
- Hadoop is a popular open-source map-reduce implementation to store and process extremely large data sets on commodity hardware.
- However, the map-reduce programming model is very low level and requires developers to write custom programs which are hard to maintain and reuse.
- Hive is an open-source data warehousing solution built on top of Hadoop.
- Hive supports queries expressed in a SQL-like declarative language - HiveQL, which are compiled into map-reduce jobs that are executed using Hadoop.
## Introduction
Core: Scalable analysis on large data sets
- a flexible infrastructure that caters to the needs of diverse applications and users, and that scales in a cost-effective manner with ever-increasing amounts of data
- infrastructure that could scale along with our data -> **Hadoop**
However, Hadoop lacked the expressiveness of popular query languages like SQL, so Hive was built to improve Hadoop's query capabilities:
- bring the familiar concepts of tables, columns, partitions and a subset of SQL to the unstructured world of Hadoop
## Data Model, Type System And Query Language
### Data Model and Type System
- primitive types
- Integers – bigint(8 bytes), int(4 bytes), smallint(2 bytes), tinyint(1 byte). All integer types are signed.
- Floating point numbers – float(single precision), double(double precision)
- String
- complex types (natively)
- Associative arrays – map\<key-type, value-type>
- Lists – list\<element-type>
    - Structs – struct\<field-name: field-type, … >
These complex types are templated and can be composed to generate types of arbitrary complexity.
- Ex: list<map<string, struct<p1: int, p2:int>>>
- can all be put together in a create table statement to create tables with the desired schema
```sql
CREATE TABLE t1(st string, fl float, li list<map<string,
struct<p1:int, p2:int>>>);
```
Query expressions can access fields within the structs using a ‘.’ operator.
Values in the associative arrays and lists can be accessed using ‘[]’ operator.
Ex:`t1.li[0]['key'].p2`
With these constructs Hive is able to support structures of arbitrary complexity.
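As a quick illustration, a query against the t1 table defined above might project a primitive column alongside a nested field (a minimal sketch, not taken from the paper):
```sql
-- st is a primitive string column; li is a list whose first element is a
-- map, and the struct stored under 'key' exposes its field p2 via '.'.
SELECT t1.st, t1.li[0]['key'].p2
FROM t1;
```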
A table created with a statement like the one above is serialized and deserialized using Hive's default serializer and deserializer. However, Hive also provides the flexibility to handle data produced by other programs or legacy data, so that the data can be placed into a table without a conversion step; for large data sets this can save a significant amount of time.
:::spoiler providing a jar that implements the SerDe java interface to Hive
the type information can also be provided by that jar by
- providing a corresponding implementation of the ObjectInspector java interface
- exposing that implementation through the getObjectInspector method present in the SerDe interface
:::
<br/>
Any arbitrary data format and types encoded therein can be plugged into Hive by providing a jar that contains the implementations for the SerDe and ObjectInspector interfaces
Once the proper associations have been made between the table and the jar, the query layer treats these on par with the native types and formats.
Ex: Add a jar containing the SerDe and ObjectInspector implementations to the distributed cache
```sql
add jar /jars/myformat.jar;
CREATE TABLE t2
ROW FORMAT SERDE 'com.myformat.MySerDe';
```
The add jar command makes the jar available to Hadoop through the distributed cache; the CREATE TABLE statement then creates the table with the custom SerDe.
### Query Language
Hive query language (HiveQL)
- a subset of SQL
- some extensions that the authors found useful in their environment
SQL-like
- sub-queries in the FROM clause
- various types of joins – inner, left outer, right outer and full outer joins
- cartesian products
- group bys and aggregations
- union all
- create table as select
- many useful functions on primitive and complex types
This lets anyone familiar with SQL get started easily with the Hive CLI and begin querying.
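As a small sketch of how these constructs combine, the following statement builds a summary table from a FROM-clause sub-query, a join and an aggregation (the tables users and clicks and their columns are hypothetical):
```sql
-- Create-table-as-select over a join of a sub-query with another table,
-- aggregated per country. Table and column names are made up for illustration.
CREATE TABLE click_summary AS
SELECT u.country, count(1) AS clicks
FROM (SELECT userid, country FROM users) u
JOIN clicks c ON (u.userid = c.userid)
GROUP BY u.country;
```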
Useful metadata browsing capabilities
- show tables and describe
- explain plan capabilities to inspect query plans
- though the plans look very different from what you would see in a traditional RDBMS
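A sketch of what these browsing commands look like in the CLI (output formats vary by Hive version; t1 is the table from the earlier example):
```sql
-- List tables and inspect schemas and metadata.
SHOW TABLES;
DESCRIBE t1;
DESCRIBE EXTENDED t1;   -- also shows location, SerDe and other table metadata
-- Inspect the compiled plan of a query (the plan is a DAG of map-reduce
-- stages, not a traditional RDBMS plan).
EXPLAIN
SELECT t1.st FROM t1 WHERE t1.fl > 1.0;
```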
Some limitations:
- e.g. only equality predicates are supported in join predicates, and joins have to be specified using the ANSI join syntax such as
```sql
SELECT t1.a1 as c1, t2.b1 as c2
FROM t1 JOIN t2 ON (t1.a2 = t2.b2);
```
rather than the traditional
```sql
SELECT t1.a1 as c1, t2.b1 as c2
FROM t1, t2
WHERE t1.a2 = t2.b2;
```
- Hive currently does not support inserting into an existing table or data partition and all inserts overwrite the existing data.
```sql
INSERT OVERWRITE TABLE t1
SELECT * FROM t2;
```
With more frequent loads the number of partitions can become very large, which may eventually require implementing INSERT INTO semantics.
On the other hand, the lack of INSERT INTO, UPDATE and DELETE in Hive allows very simple mechanisms for handling reader and writer concurrency without implementing complex locking protocols.
Beyond these limitations, HiveQL has extensions that support analyses expressed as map-reduce programs written by users in the programming language of their choice.
- enables advanced users to express complex logic in terms of map-reduce programs that are plugged into HiveQL queries seamlessly
```sql
FROM (
MAP doctext USING 'python wc_mapper.py' AS (word, cnt)
FROM docs
CLUSTER BY word
) a
REDUCE word, cnt USING 'python wc_reduce.py';
```
- MAP: how the input columns (doctext) are transformed by a user program (python wc_mapper.py) into output columns (word and cnt)
- CLUSTER BY: specifies the output columns that are hashed on to distribute the data to the reducers
- REDUCE: specifies the user program to invoke (wc_reduce.py) on the output columns of the sub query
To distribute rows to the reducers on one set of columns while sorting them within each reducer on another, Hive provides the DISTRIBUTE BY and SORT BY clauses:
```sql
FROM (
FROM session_table
SELECT sessionid, tstamp, data
DISTRIBUTE BY sessionid SORT BY tstamp
) a
REDUCE sessionid, tstamp, data USING 'session_reducer.sh';
```
Hive allows users to interchange the order of the FROM and SELECT/MAP/REDUCE clauses within a given sub-query.
-> useful and intuitive when dealing with multi-table inserts
HiveQL supports inserting different transformation results into different tables, partitions, hdfs or local directories as part of the same query.
```sql
FROM t1
INSERT OVERWRITE TABLE t2
SELECT t1.c2, count(1)
WHERE t1.c1 <= 20
GROUP BY t1.c2
INSERT OVERWRITE DIRECTORY '/output_dir'
SELECT t1.c2, avg(t1.c1)
WHERE t1.c1 > 20 AND t1.c1 <= 30
GROUP BY t1.c2
INSERT OVERWRITE LOCAL DIRECTORY '/home/dir'
SELECT t1.c2, sum(t1.c1)
WHERE t1.c1 > 30
GROUP BY t1.c2;
```
Different portions of table t1 are aggregated and used to populate a table t2, an hdfs directory (/output_dir) and a local directory (/home/dir) on the user's machine.
## Data Storage, SerDe And File Formats
### Data Storage
Tables are logical data units in Hive.
The primary data units and their mappings in the hdfs name space:
- Tables – A table is stored in a directory in hdfs.
- Partitions – A partition of the table is stored in a subdirectory within a table's directory.
- Buckets – A bucket is stored in a file within the partition's or table's directory depending on whether the table is a partitioned table or not.
The warehouse root directory (warehouse_root_directory) defaults to /user/hive/warehouse.
A table may be partitioned or non-partitioned; partitioning is specified with the PARTITIONED BY clause in the CREATE TABLE statement.
```sql
CREATE TABLE test_part(c1 string, c2 int)
PARTITIONED BY (ds string, hr int);
```
In the example above, the table's partitions are stored under the /user/hive/warehouse/test_part directory in hdfs, with one partition for every distinct combination of ds and hr values. The partitioning columns are not part of the table data; instead, the partition column values are encoded in the directory path of that partition (and are also stored in the table metadata).
A new partition can be created through
- INSERT
- ALTER
```sql
INSERT OVERWRITE TABLE
test_part PARTITION(ds='2009-01-01', hr=12)
SELECT * FROM t;
```
-> populates the partition with data from table t
corresponding directory: /user/hive/warehouse/test_part/ds=2009-01-01/hr=12
```sql
ALTER TABLE test_part
ADD PARTITION(ds='2009-02-02', hr=11);
```
-> creates an empty partition
corresponding directory: /user/hive/warehouse/test_part/ds=2009-02-02/hr=11
The Hive compiler can use this information to prune the data that has to be scanned to evaluate a query.
```sql
SELECT * FROM test_part WHERE ds='2009-01-01'
```
only scans /user/hive/warehouse/test_part/ds=2009-01-01
```sql
SELECT * FROM test_part
WHERE ds='2009-02-02' AND hr=11;
```
only scans /user/hive/warehouse/test_part/ds=2009-02-02/hr=11
:::warning
The original paper gives the scanned directory here as /user/hive/warehouse/test_part/ds=2009-01-01/hr=12, which appears to be a typo.
:::
Pruning data this way has a significant impact on query processing time. In many respects this partitioning scheme is similar to the list partitioning offered by many database vendors, with the difference that the partition key values are stored with the metadata rather than with the data.
A bucket is a file within the leaf level directory of a table or a partition.
- When creating a table, the user can specify the desired number of buckets and the column on which the data is bucketed
- In the current implementation this information is used to prune data when the user runs queries on a sample of the table
- Ex: a table bucketed into 32 buckets can quickly produce a 1/32 sample by looking only at the data in the first bucket
```sql
SELECT * FROM t TABLESAMPLE(BUCKET 2 OUT OF 32);
```
scans only the data present in the second bucket.
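The bucketing itself is declared when the table is created. The DDL is not shown in this part of the paper, so the following is only a sketch assuming the standard CLUSTERED BY syntax:
```sql
-- Declare 32 buckets on the userid column; each bucket corresponds to one
-- file in the table's (or partition's) directory, which is what the
-- TABLESAMPLE query above relies on.
CREATE TABLE t(userid int, page string)
CLUSTERED BY (userid) INTO 32 BUCKETS;
```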
the bucketing information should be used with caution
- ensuring that the bucket files are properly created and named is the responsibility of the application
- HiveQL DDL statements do not currently try to bucket the data in a way that makes it compatible with the table properties
While the data for a normal table is always stored under <warehouse_root_directory>/test_table, Hive also provides EXTERNAL TABLEs so that users can query data stored at other locations in hdfs:
```sql
CREATE EXTERNAL TABLE test_extern(c1 string, c2 int)
LOCATION '/user/mytables/mydata';
```
test_extern is an external table with each row comprising two columns – c1 and c2.
External tables differ from normal tables only in the DROP TABLE behavior: dropping an external table removes only the table metadata and does not delete any data, whereas dropping a normal table also deletes the data associated with the table.
### Serialization/Deserialization (SerDe)
Hive can take a user-provided implementation of the SerDe java interface and associate it with a table or partition, so custom data formats can easily be interpreted and queried.
The default SerDe implementation in Hive is LazySerDe:
- deserializes rows into internal objects lazily
- as a result, a column's deserialization cost is incurred only when a query expression actually needs that column of the row
- assumes that the data is stored in the file such that
    - the rows are delimited by a newline
    - the columns within a row are delimited by ctrl-A (ascii code 1)
- can also be used to read data that uses any other delimiters between columns and rows
```sql
CREATE TABLE test_delimited(c1 string, c2 int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\002'
LINES TERMINATED BY '\012';
```
- uses ctrl-B (ascii code 2) as a column delimiter
- uses ctrl-L(ascii code 12) as a row delimiter
Different delimiters can also be specified for:
- serialized keys and values of maps
- the various elements of a list (collection)
```sql
CREATE TABLE test_delimited2(c1 string, c2 list<map<string, int>>)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\002'
COLLECTION ITEMS TERMINATED BY '\003'
MAP KEYS TERMINATED BY '\004';
```
Besides LazySerDe, several other interesting SerDes ship in the hive_contrib.jar that comes with the distribution.
Among them, **RegexSerDe** is particularly useful: it lets users specify a regular expression that parses the columns out of a row.
Ex: the following statement can be used to interpret apache logs.
```sql
add jar 'hive_contrib.jar';
CREATE TABLE apachelog(
host string,
identity string,
user string,
time string,
request string,
status string,
size string,
referer string,
agent string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES(
'input.regex' = '([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^\"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^\"]*|\"[^\"]*\"))?',
'output.format.string' = '%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s');
```
- input.regex: regular expression applied on each record
- output.format.string: how the column fields can be constructed from the group matches in the regular expression
The example above also shows how arbitrary key-value pairs can be passed to a serde through the WITH SERDEPROPERTIES clause, which is very useful for passing arbitrary parameters to a custom SerDe.
### File Formats
- Hadoop file format
- TextInputFormat: Text files
- SequenceFileInputFormat: Binary files
- User-defined formats
- Hive does not impose any restriction on the file input format that the data is stored in.
- The format can be specified when the table is created.
- TextInputFormat
- SequenceFileInputFormat
- RCFileInputFormat: stores the data in a column oriented manner
-> performance improvements, especially for queries that do not access all the columns of the table
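For the built-in formats, STORED AS also accepts shorthand keywords; a sketch (assuming the TEXTFILE/SEQUENCEFILE/RCFILE keywords of standard HiveQL, with hypothetical column names):
```sql
-- Store the table's data in the column-oriented RCFile format.
CREATE TABLE clicks_rc(userid int, page string)
STORED AS RCFILE;
```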
Users can add their own file formats and associate them to a table as shown in the following statement.
```sql
CREATE TABLE dest1(key INT, value STRING)
STORED AS
INPUTFORMAT 'org.apache.hadoop.mapred.SequenceFileInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.mapred.SequenceFileOutputFormat'
```
STORED AS: specifies the classes used as the input and output formats for the files in the table's or partition's directory.
- Can be any classes that implement the FileInputFormat and FileOutputFormat java interfaces
- The classes can be provided to Hadoop in a jar, in a manner similar to the custom SerDe example shown earlier
## System Architecture And Components

Main building blocks in Hive:
- **Metastore**: stores the system catalog and metadata about tables, columns, partitions, etc.
- **Driver**: manages the lifecycle of a HiveQL statement as it moves through Hive; also maintains the session handle and any session statistics
- **Query Compiler**: compiles HiveQL into a DAG of map/reduce tasks
- **Execution Engine**: executes the tasks produced by the compiler in the proper dependency order and interacts with the underlying Hadoop instance
- **HiveServer**: provides a thrift interface and a JDBC/ODBC server, offering a way to integrate Hive with other applications
- Client components, such as
- **Command Line Interface (CLI)**
- **web UI**
- **JDBC/ODBC driver**
- Extensibility Interfaces
- **SerDe**
- **ObjectInspector**
- **UDF(User Defined Function)** and **UDAF(User Defined Aggregate Function)** interfaces that enable users to define their own custom functions
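For example, a custom UDF packaged in a jar might be registered and called like this (a sketch; the jar path, class name and function name are hypothetical):
```sql
-- Make the jar containing the UDF implementation available, register the
-- function under a name, then use it like any built-in function.
add jar /jars/my_udfs.jar;
CREATE TEMPORARY FUNCTION my_lower AS 'com.example.udf.MyLower';
SELECT my_lower(t1.st) FROM t1;
```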
Query flow
- A HiveQL statement is submitted via the CLI, the web UI, or an external client using the thrift, odbc or jdbc interfaces
- The Driver passes the query to the compiler
- The compiler goes through the typical parse, type check and semantic analysis phases, using the metadata stored in the Metastore
- The compiler generates a logical plan, which a simple rule-based optimizer turns into an optimized plan in the form of a DAG of map-reduce tasks and hdfs tasks
- The execution engine executes these tasks on Hadoop in the order of their dependencies
The following subsections describe the Metastore, the Query Compiler and the Execution Engine in more detail.
### Metastore
- role: system catalog
- stores all the information about the tables, their partitions, the schemas, the columns and their types, the table locations etc
- can be queried or modified using a thrift interface
-> can be called from clients in different programming languages
- stored in a traditional RDBMS so that metadata can be served to the compiler with very low latency
- an application that runs on an RDBMS and uses an open source ORM layer called DataNucleus
- The DataNucleus layer allows us to plugin many different RDBMS technologies
- Facebook's deployment uses mysql to store this
Critical for Hive! Without it, the structure imposed on hadoop files cannot be recovered
-> back up regularly
Ideally the Metastore would be a replicated server that can scale with the number of queries. To achieve this, any metadata needed by the mappers or reducers is passed through the xml plan files generated by the compiler.
The ORM logic of the Metastore can also be deployed in the client libraries, so that it runs on the client side and calls the RDBMS directly.
Clients interacting with Hive:
- The CLI or the Web UI: easy to get started with, the ideal choice
- Clients not written in Java (programs in python, php, etc.): to manipulate and query the metadata, these must go through a separately deployed Metastore server
### Query Compiler
The query compiler uses metadata from the Metastore to generate an execution plan.
- **Parse**: Hive uses Antlr to generate the abstract syntax tree (AST) of the query.
- **Type checking and Semantic Analysis**: The compiler fetches the information of all the input and output tables from the Metastore and uses it to build a logical plan. It checks type compatibility in expressions and flags any compile-time semantic errors at this stage. The AST is transformed into an operator DAG through an intermediate query block (QB) tree: the compiler converts nested queries into parent-child relationships in the QB tree, and the QB tree representation also helps organize the relevant parts of the AST into a form that is easier to convert into an operator DAG than the plain AST.
- **Optimization**: The optimization logic consists of a chain of transformations. Anyone who wants to change the compiler or add new optimization logic can easily do so by implementing the transformation as an extension of the Transform interface and adding it to the chain of transformations in the optimizer.
- The transformation logic typically consists of a walk over the operator DAG that takes some processing action on the DAG when the relevant conditions or rules are satisfied.
- Five main interfaces: Node, GraphWalker, Dispatcher, Rule and Processor
A typical transformation walks the DAG and, at each Node visited, checks whether some Rule is satisfied; if so, it invokes the Processor corresponding to that Rule. The Dispatcher maintains the mapping from Rules to Processors and performs the rule matching; it is passed to the GraphWalker so that the appropriate Processor can be dispatched as each Node is visited.
The flow chart below illustrates the structure of a typical transformation.

The following transformations are done currently in Hive as part of the optimization stage:
- **Column pruning**: ensures that only the columns needed for query processing are projected
- **Predicate pushdown**: predicates are pushed down to the scan as early as possible so that unneeded rows are filtered out early
- **Partition pruning**: predicates on partitioned columns are used to prune out files of partitions that do not satisfy the predicate
- **Map side joins**: when some of the tables in a join are very small, the small tables are replicated to all the mappers and joined with the other tables. This is triggered by a hint of the following form in the query:
```sql
SELECT /*+ MAPJOIN(t2) */ t1.c1, t2.c1
FROM t1 JOIN t2 ON(t1.c2 = t2.c2);
```
There are parameters that control the amount of memory the mappers use for the replicated table contents:
- hive.mapjoin.size.key: provides the system with the size of the join key
- hive.mapjoin.cache.numrows: controls the number of table rows kept in memory at any time
- **Join reordering**: larger tables are streamed and not materialized in memory in the reducer, while the smaller tables are kept in memory. This ensures that the join operation does not exceed memory limits on the reducer side.
Besides the MAPJOIN hint, users can also provide hints or set parameters to do the following:
- **Repartition data to handle skew in GROUP BY**: especially when the GROUP BY columns have a power-law distribution, it is better to compute the aggregation in two map/reduce stages:
1. The data is distributed randomly (or distributed on the DISTINCT columns in the case of distinct aggregations) to the reducers, and partial aggregations are computed
2. The partial aggregations are redistributed to the reducers on the GROUP BY columns
```sql
set hive.groupby.skewindata=true;
SELECT t1.c1, sum(t1.c2)
FROM t1
GROUP BY t1;
```
:::warning
This should probably be `GROUP BY t1.c1;`: group the rows by the value of c1 in t1, placing rows with the same c1 value in the same group, and then sum the c2 values within each group.
:::
- **Hash-based partial aggregations in the mappers**: reduce the amount of data sent to the reducers, which minimizes sorting and merging time and improves performance (see the sketch after this list)
- hive.map.aggr.hash.percentmemory: lets the user manage memory usage on the mapper side by specifying the fraction of mapper memory allocated to the hash table; when the threshold is reached, the partial aggregates are flushed to the reducers
- hive.map.aggr.hash.min.reduction: another parameter that controls the memory used in the mappers, for further optimization
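A sketch of how map-side aggregation might be tuned in a session (hive.map.aggr, the switch that enables the feature, is not named in the note but is the standard parameter; the values are illustrative, not recommendations):
```sql
-- Enable hash-based partial aggregation in the mappers and cap the fraction
-- of mapper memory used by the in-memory hash table before flushing.
set hive.map.aggr = true;
set hive.map.aggr.hash.percentmemory = 0.5;
SELECT t1.c1, count(DISTINCT t1.c2)
FROM t1
GROUP BY t1.c1;
```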
**Generation of the physical plan**
- The logical plan generated at the end of the optimization phase is split into multiple map/reduce tasks and hdfs tasks.
- Ex: a group by on skewed data becomes
- two map/reduce tasks
- a final hdfs task: moves the results to the correct location in hdfs
- At the end of this phase, the physical plan looks like a DAG of tasks, with each task encapsulating a part of the plan.
A sample multi-table insert query
```sql
FROM (SELECT a.status, b.school, b.gender
FROM status_updates a JOIN profiles b
ON (a.userid = b.userid
AND a.ds='2009-03-20' )) subq1
INSERT OVERWRITE TABLE gender_summary
PARTITION(ds='2009-03-20')
SELECT subq1.gender, COUNT(1)
GROUP BY subq1.gender
INSERT OVERWRITE TABLE school_summary
PARTITION(ds='2009-03-20')
SELECT subq1.school, COUNT(1)
GROUP BY subq1.school
```
- **Node**: physical operator
- **Last line in each node**: the output schema of that operator
- **Edge**: the flow of data between operators
The plan below contains three map-reduce jobs.


Within the same map-reduce job, the portion of the operator tree below the repartition operator (ReduceSinkOperator) is executed by the mapper and the portion above it by the reducer. The repartitioning itself is performed by the execution engine.
Note that the first map-reduce job writes two temporary files to HDFS (tmp1 and tmp2), which are consumed by the second and third map-reduce jobs respectively. The second and third map-reduce jobs therefore have to wait for the first map-reduce job to finish.
### Execution Engine
- Tasks are executed in the order of their dependencies
- A map/reduce task first serializes its part of the plan into a plan.xml file
- The file is then added to the job cache for the task, and instances of ExecMapper and ExecReducer are spawned using Hadoop
- Each of these classes deserializes plan.xml and executes the relevant part of the operator DAG
- The final results are stored in a temporary location
- At the end of the entire query, if it is a DML statement, the final data is moved to the desired location
- In the case of a query, the data is served from the temporary location
## Hive Usage In Facebook
* Hive and Hadoop are extensively used at Facebook for various data processing tasks, with a current warehouse size of 700TB (2.1PB raw with replication) and adding 5TB (15TB with replication) of compressed data daily.
* More than half of the workload consists of ad hoc queries, enabled by Hive's simplicity, but sharing resources between ad hoc and reporting users poses operational challenges due to job unpredictability and the lack of resource tuning.
* Daily Hive jobs at Facebook vary widely, from simple summarization tasks to complex machine learning algorithms, catering to both novice and advanced users with minimal training required.
* Heavy usage has resulted in a large number of tables in the warehouse, increasing the need for data discovery tools, especially for new users. At the same time, the deployment remains cost-efficient compared to traditional data warehousing and scales with Hadoop's ability to run on thousands of commodity nodes.
## Related Work
Recent work on petabyte scale data processing systems:
- Scope is an SQL-like language on top of Microsoft's proprietary Cosmos map/reduce and distributed file system.
- Pig allows users to write declarative scripts to process data.
What sets Hive apart is its system catalog, which holds metadata about the tables in the system. This allows Hive to act as a traditional data warehouse that can interact with standard reporting tools such as MicroStrategy.
HadoopDB reuses most of Hive's system, except that it stores data in traditional database instances on each node instead of using a distributed file system.
## Conclusions And Future Work
- Hive is actively developed by Facebook and external contributors.
- HiveQL evolution: Moving towards full SQL syntax support
- Optimization enhancements:
- Transitioning to cost-based optimizer
- Implementing adaptive techniques for efficient query plans
- Performance improvements: Exploring columnar storage and data placement
- Benchmarking and analysis: Identifying areas for performance enhancement
- Integration and connectivity: Enhancing JDBC and ODBC drivers for BI tool integration
- Advanced query techniques: Researching multi-query optimization and efficient joins
## Resources
- YouTube Video: [Apache Hive tutorial for Beginners : Hive Training](https://www.youtube.com/playlist?list=PLFxgwFEQig6w_A4nQcXOM96m2-pZHMDOB)
- Helpful Note: https://huayuzhang.medium.com/notes-on-the-apache-hive-paper-a9f5e55249d7