
Apache Hive is a data warehouse system that is well suited to running analytics on large datasets (terabytes and petabytes of data) stored in Hadoop's HDFS.
In this series we will discuss features and optimizations that can improve query performance in Hive. This first article covers partitions and the map side join feature.
Partitions
Partitions in Hive allow you to store related information together in the same directory. When a query filters on the partitioned column, Hive can search just the matching directory rather than the entire dataset.
Let's consider an example:
First, we create a table (users) partitioned by a column (city).
CREATE TABLE IF NOT EXISTS users (first_name STRING, age INT) PARTITIONED BY (city STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY "," LINES TERMINATED BY "\n"
Note that the table is formatted so that a CSV (comma separated value) file can be loaded into it.
Let's create a CSV file (named users_ny.csv) containing the first name and age of each user. Note that the users in this file are from the city of New York.
Varun,23
John,25
Dylan,18
Alyssa,28
Suresh,27
Jane,25
David,19
Chris,23
Let's load the CSV file into our users table. Note that we specify the partition column city='NY'.
LOAD DATA LOCAL INPATH '/root/users_ny.csv' INTO TABLE users PARTITION(city='NY')
Let's create another CSV file (named users_ca.csv) with the first name and age of each user. Note that the users in this file are from California.
Kawhi,27
Lebron,34
Mary,24
Olivia,28
Emily,33
Ella,55
Noah,25
William,27
Let's load the CSV file into our users table. Note that we specify the partition city='CA'.
LOAD DATA LOCAL INPATH '/root/users_ca.csv' INTO TABLE users PARTITION(city='CA')
Now that our data is in the users table, let's list the HDFS directory where our table is stored:
hdfs dfs -ls /user/hive/warehouse/users
The output of this command is as follows:
/user/hive/warehouse/users/city=CA
/user/hive/warehouse/users/city=NY
Note that we have two separate directories, one for each value of the city column (California and New York). This is because we have partitioned the table by the city column.
Now, if we query the users table with a filter on the city column, Hive will only search a single directory. This greatly improves performance.
For example, if we query WHERE city='NY', Hive will only search the /user/hive/warehouse/users/city=NY directory and not the city=CA directory.
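The pruning behaviour described above can be sketched in a few lines of Python. This is only a toy model, not how Hive is implemented: the warehouse dictionary stands in for the HDFS directory layout, and the function name query_users is hypothetical. The rows are the ones from the two CSV files above.

```python
# Toy model of a partitioned table: one "directory" per value of city.
# The dict stands in for the HDFS layout; it is not how Hive stores data.
warehouse = {
    "/user/hive/warehouse/users/city=NY": [
        ("Varun", 23), ("John", 25), ("Dylan", 18), ("Alyssa", 28),
        ("Suresh", 27), ("Jane", 25), ("David", 19), ("Chris", 23),
    ],
    "/user/hive/warehouse/users/city=CA": [
        ("Kawhi", 27), ("Lebron", 34), ("Mary", 24), ("Olivia", 28),
        ("Emily", 33), ("Ella", 55), ("Noah", 25), ("William", 27),
    ],
}


def query_users(city):
    """Return (rows, directories_read) for a filter of the form city = <city>."""
    table_root = "/user/hive/warehouse/users"
    # Partition pruning: the filter value maps directly to one directory,
    # so the other partitions are never opened.
    wanted = f"{table_root}/city={city}"
    directories_read = [d for d in warehouse if d == wanted]
    rows = [
        (name, age, city)
        for d in directories_read
        for name, age in warehouse[d]
    ]
    return rows, directories_read


rows, dirs = query_users("NY")
print(len(rows), dirs)
```

The point of the sketch is that the number of directories read depends on the filter and not on how many partitions the table has.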
Let's look at the performance impact of not partitioning your table.
Performance with partitioning
SELECT * FROM users WHERE city='NY'
Time taken: 0.768 seconds, Fetched: 8 row(s)
SELECT * FROM users WHERE city='CA'
Time taken: 0.126 seconds, Fetched: 8 row(s)
Performance without partitioning
SELECT * FROM users WHERE city='NY'
Time taken: 23.105 seconds, Fetched: 8 row(s)
SELECT * FROM users WHERE city='CA'
Time taken: 23.611 seconds, Fetched: 8 row(s)
As you can see, without partitions it takes Hive at least 30 times longer to return the same 8 rows (23.105 seconds versus 0.768 seconds for NY, and 23.611 seconds versus 0.126 seconds for CA). The advantages of partitioning are clear, especially when we have billions of rows instead of 8.
Caveats of Over Partitioning
Over partitioning can create too many small files, which HDFS is not designed to handle well.
"HDFS was designed for many millions of large files, not billions of small files"
A major caveat of having too many partitions is that each value of the partitioned column gets its own directory. This increases seek time (the time it takes for the hard drive to find the location of the directory and place its read / write head at the desired track), which in turn increases latency.
Another caveat of having too many partitions is that the NameNode has to keep track of all the files in each partition. With enough partitions, this will eventually exhaust the NameNode's capacity to manage the filesystem metadata.
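To get a feel for the NameNode pressure, here is a rough back-of-the-envelope estimate in Python. It assumes the commonly cited rule of thumb that each file, directory and block costs roughly 150 bytes of NameNode memory. That figure, the partition counts and the function name namenode_bytes are all assumptions for illustration, not measurements from this article.

```python
# ASSUMPTION: rule-of-thumb NameNode memory cost per file, directory, or block.
BYTES_PER_OBJECT = 150


def namenode_bytes(partitions, files_per_partition, blocks_per_file=1):
    """Estimate NameNode memory for a partitioned table (very rough)."""
    directories = partitions                 # one directory per partition value
    files = partitions * files_per_partition
    blocks = files * blocks_per_file         # small files occupy one block each
    return (directories + files + blocks) * BYTES_PER_OBJECT


# Hypothetical tables: 50 partitions versus 1,000,000 partitions, 10 files each.
few = namenode_bytes(partitions=50, files_per_partition=10)
many = namenode_bytes(partitions=1_000_000, files_per_partition=10)

print(f"{few / 1e6:.2f} MB vs {many / 1e9:.2f} GB of NameNode memory")
```

Under these assumptions, the metadata cost grows linearly with the number of partitions, regardless of how little data each partition actually holds.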
How Do Joins Work in Hive
To understand join optimizations and features, we must first understand how an inner join works in Hive. Internally (when using the MapReduce engine), Hive converts any join into a MapReduce job.
Let's consider an example where we join two tables, an Orders Table with a Customers Table, where the join key is the Customer ID.
The following are the steps Hive will take to join these two tables:
1. Map Phase
Mapper 1 and Mapper 2 will read a block of data from the Orders Table; Mapper 3 will read the only block from the Customers Table
2. Shuffle the Data (Records with same join key will be sent to same reducer)
After all blocks of data have been read from both tables, the MapReduce engine will send rows with the same join key (Customer ID) to the same reducer.
Mapper 2 will send the row 10507, 9, 07/19/2018 from the Orders Table to Reducer 2, and Mapper 3 will send the row 9, John from the Customers Table to Reducer 2 as well. This is because both records have the same Customer ID (9).
3. Reduce Phase (Join The Records)
Now that each reducer holds the records from both tables that share a join key, it can join them.
For example, within Reducer 2, the record 10507, 9, 07/19/2018 from the Orders Table will be combined with 9, John from the Customers Table to produce the record 10507, 9, 07/19/2018, John.
The output of our join will be:
10405, 3, 06/17/2018, Chris
10507, 9, 07/19/2018, John
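The three phases above can be sketched in plain Python. This is a simplified single-process illustration, not Hive's actual implementation. The function names (map_phase, shuffle, reduce_phase) are hypothetical, and the tables contain only the rows quoted in this article.

```python
from collections import defaultdict

# Only the rows quoted in the article; the full tables are not reproduced here.
orders = [
    (10405, 3, "06/17/2018"),
    (10507, 9, "07/19/2018"),
]
customers = [
    (3, "Chris"),
    (9, "John"),
]


def map_phase(orders, customers):
    """Tag each record with its source table and emit (join_key, record)."""
    for order_id, customer_id, order_date in orders:
        yield customer_id, ("orders", (order_id, customer_id, order_date))
    for customer_id, name in customers:
        yield customer_id, ("customers", (name,))


def shuffle(pairs):
    """Group records by join key, as if routing each key to one reducer."""
    groups = defaultdict(list)
    for key, tagged_record in pairs:
        groups[key].append(tagged_record)
    return groups


def reduce_phase(groups):
    """Within each key group, combine every order with every matching customer."""
    joined = []
    for key in sorted(groups):
        order_rows = [rec for tag, rec in groups[key] if tag == "orders"]
        customer_rows = [rec for tag, rec in groups[key] if tag == "customers"]
        for order in order_rows:
            for customer in customer_rows:
                joined.append(order + customer)
    return joined


result = reduce_phase(shuffle(map_phase(orders, customers)))
for row in result:
    print(row)
```

Note that every record from both tables passes through the shuffle step. On a real cluster that means moving data across the network, which is the cost the map side join tries to avoid.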
Map Side Join Feature
Map Side Join is a Hive feature that loads the smaller table into a distributed cache so that the join can be performed entirely within the map phase (no reduce phase is necessary).
Let's consider the same example as earlier, where we are joining a Customers Table with an Orders Table. Let's assume that the Customers Table is considerably smaller than the Orders Table.
The following are the steps Hive will take to join these tables using the Map Side Join feature:
1. Load the smaller of the two tables into a hash table.
The smaller of the two tables (customers table) is serialized and loaded into a hash table. The hash table is then uploaded into a distributed cache. This distributed cache is now available locally to each mapper.
2. Map Phase
Each of the n mappers will read a block of data from the larger table (orders table). For each record, the mapper will retrieve the value of the join key and check whether it exists in the distributed cache. If there is a match, the two records are joined. Because the join is performed within the map phase, a reduce phase is not needed.
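The two steps above amount to a hash join, and can be sketched in Python as follows. This is a simplified illustration, not Hive's internals: a dict stands in for the hash table held in the distributed cache, the split of the Orders Table into blocks is hypothetical, and the function names are made up for this example. The sketch also assumes each Customer ID appears only once in the smaller table.

```python
# Only the rows quoted in the article; the full tables are not reproduced here.
orders = [
    (10405, 3, "06/17/2018"),
    (10507, 9, "07/19/2018"),
]
customers = [
    (3, "Chris"),
    (9, "John"),
]


def build_hash_table(small_table):
    """Step 1: load the smaller table into a hash table keyed on the join key."""
    # ASSUMPTION: customer_id is unique in the smaller table.
    return {customer_id: name for customer_id, name in small_table}


def map_side_join(order_block, hash_table):
    """Step 2: each mapper probes the hash table for every record in its block."""
    for order_id, customer_id, order_date in order_block:
        name = hash_table.get(customer_id)
        if name is not None:  # inner join: unmatched orders are dropped
            yield (order_id, customer_id, order_date, name)


hash_table = build_hash_table(customers)

# Hypothetical split of the larger table into one block per mapper.
blocks = [orders[:1], orders[1:]]

# Each "mapper" works independently; no shuffle or reduce step is involved.
result = [row for block in blocks for row in map_side_join(block, hash_table)]
for row in result:
    print(row)
```

Each mapper only needs its own block plus a local copy of the hash table, which is why this approach depends on the smaller table fitting comfortably in memory.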
Final Words
As a leader in Data Engineering Solutions our goal is to build a community of engineers and leaders that love to implement the latest Big Data Strategies for their organization.
---
In our next article I will discuss Bucketing and the Sort Merge Bucket Map Join (SMB Map Join) optimization. To understand those topics, you first need to understand the concepts covered in this article.
Have any questions? Have feedback or suggestions for a topic that I should write about? Share them below in the comments. Until next time!
Varun is currently a Data Engineer at Viral Software Solutions, a leader in Data Engineering, MLOps, and DevOps Solutions. You can find him on LinkedIn.