Using Apache Hive on AWS Elastic MapReduce (EMR) Clusters

Introduction

In this article I provide a brief overview of some of the characteristics and features of Apache Hive, a Modern Distributed Data Warehouse, that runs on a Hadoop Cluster and exposes functionality through a SQL-like programming interface. To reduce the technical overhead and burden of provisioning servers and manually installing the components of a proper Hive Data Warehouse I utilize the Amazon Elastic MapReduce (EMR) service. Amazon EMR makes it very simple to quickly provision and scale a Cluster Computing environment and has configurable bootstrapping recipies to install common popular big data tools like Apache Hive, Apache Flink, Apache Spark and many others.

Contents

Watch on Youtube if you Prefer Video

Overview of Apache Hive

According the the Apache project's home page, Apache Hive is a modern data warehouse technology that enables reading, writing, and managing large datasets in distributed storage, typically within a Hadoop cluster, all using SQL. For me this really means Hive is a data processing tool used on top of Hadoop and exposed through a SQL-like interface making it very accessible to a wide audience of analysts, data scientists, and engineers.

You typically interact with data using Hive by issuing a SQL query via clients such as Hive Web Interface, JDBC Driver, Hive CLI, or Thrift clients like beeline. Hive SQL queries that are submitted to the Hive Server use metadata in the metastore and decompose it into a query plan as a compiled MapReduce job. The MapReduce job is then executed on the Hadoop cluster which processes and aggregates the resultset data.

Just like with a traditional RDBMS you can organize tables of data in a databases but, in Hive there are two forms of tables: managed tables and external tables.

Managed Tables

Hive fully owns all data when it comes to managed tables and the files representing a table's data are stored on disk within the Hadoop HDFS file system under /user/hive/warehouse/databasename.db/tablename/ which should soley be controlled only using Hive functionality.

External Tables

Hive provides the ability to store metadata about the schema of files maintained outside of the control of Hive which is what is meant by an external table. This means that you can use Hive to processes these external datasets using the metadata data you provide about the structure of files but, Hive does not have control over another program altering them.

Creating an EMR Cluster

Please see my earlier article How to Create Interactive AWS Elastic Map Reduce (EMR) Clusters using the AWS CLI where I cover, in detail, how to create an EMR cluster specifically for this type of interactive development and experimentation complete with how to access the cluster with standard SSH connections.

Executing Queries with the Beeline Hive Client

Once you've SSH'd onto your master node of the EMR cluster as the hadoop user you can launch the beeline Hive client shell as shown below.

$ beeline -u jdbc:hive2://
21/06/23 17:24:05 [main]: WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
Connected to: Apache Hive (version 2.3.7-amzn-4)
Driver: Hive JDBC (version 2.3.7-amzn-4)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 2.3.7-amzn-4 by Apache Hive
0: jdbc:hive2://>

After you issue the hive command the shell prompt should change to "0: jdbc:hive2://>" signifying the beeline shell is active. Unless I state otherwise all subsequent Hive statements should be issued within the beeline client shell even though I'm omitting it in the examples.

Listing and Creating Databases

For the most part standard ANSI SQL statements apply so you can use things like SHOW DATABASES, USE <database-name>, SHOW TABLES, ect ...

SHOW DATABASES;

Output.

OK
+----------------+
| database_name  |
+----------------+
| default        |
+----------------+
1 rows selected (0.532 seconds)

Create a database named tcidemo to play around with.

CREATE DATABASE tcidemo;

Then show all databases again.

SHOW DATABASES;

Output.

OK
+----------------+
| database_name  |
+----------------+
| default        |
| tcidemo        |
+----------------+
2 rows selected (0.923 seconds)

Create a Hive Table

First I switch the beeline shell context to the tcidemo database so when I create a table it is contained within the tcidemo database.

USE tcidemo;

Then I create a managed table in the tcidemo database named blog_posts. Here is a link to all the supported datatypes available in Hive which are fairly closely inline with ANSI SQL.

CREATE TABLE IF NOT EXISTS blog_posts (
    category STRING,
    published DATE,
    likes INT,
    title STRING
);

Then I inspect and verify the table was created with the proper schema.

DESCRIBE blog_posts;

Output.

OK
+------------+------------+----------+
|  col_name  | data_type  | comment  |
+------------+------------+----------+
| category   | string     |          |
| published  | date       |          |
| likes      | int        |          |
| title      | string     |          |
+------------+------------+----------+
4 rows selected (0.071 seconds)

Quick and Dirty Data Loading into blog_posts Managed Table

Next I insert some data on a few of the more popular blog posts on thecodinginterface.com in what is a bit of an unconventional way to load data into Hive. That is, using a regular ole INSERT statement followed by values typed into the beeline CLI.

INSERT INTO blog_posts
( category, published, likes, title ) 
VALUES
("C++", "2020-04-07", 155, "How To Construct an OpenCV Mat Object from C++ Arrays and Vectors"),
("Data Engineering", "2021-03-29", 74, "How To Use Window Functions in SQL"),
("JavaScript", "2019-08-20", 41, "Bridging Node.js and Python with PyNode to Predict Home Prices"),
("Java", "2019-07-23", 27, "High Level Introduction to Java for Developers"),
("Python", "2019-07-15", 26, "Building a Text Analytics App in Python with Flask Requests BeautifulSoup and TextBlob"),
("Data Engineering", "2021-03-12", 26, "Example Driven High Level Overview of Spark with Python");

Then I run my first SELECT query to get a sum of each categories likes.

SELECT category, sum(likes) AS total_likes
FROM blog_posts
GROUP BY category
ORDER BY total_likes DESC;

Yielding output similar to this.

Query ID = hadoop_20210623174043_9eda813d-b94f-4ceb-a309-f4820bac82fa
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1624468003356_0003)

Map 1: 0/1	Reducer 2: 0/2	Reducer 3: 0/1
Map 1: 0(+1)/1	Reducer 2: 0/2	Reducer 3: 0/1
Map 1: 1/1	Reducer 2: 1(+0)/2	Reducer 3: 0/1
Map 1: 1/1	Reducer 2: 2/2	Reducer 3: 1/1
OK
+-------------------+--------------+
|     category      | total_likes  |
+-------------------+--------------+
| C++               | 155          |
| Data Engineering  | 100          |
| JavaScript        | 41           |
| Java              | 27           |
| Python            | 26           |
+-------------------+--------------+
5 rows selected (4.509 seconds)

Again, what I just created is referred to as a managed table which simply means Hive has full control over the actual physical data. I can convey this by exiting the beeline shell and showing that the data I just inserted actually lives under the /user/hive/warehouse HDFS filesystem which is where Hive keeps data it has full control over.

To exit the beeline shell type !quit

After doing so I'm back in the standard Linux shell and can inspect the hadoop HDFS filesystem to prove the existence of my tcidemos database and the blog_posts table files.

Databases live under /usr/hive/warehous

hadoop fs -ls /user/hive/warehouse

Output.

Found 1 items
drwxrwxrwt   - hadoop hdfsadmingroup          0 2021-06-23 17:33 /user/hive/warehouse/tcidemo.db

And tables are stored under each database.

hadoop fs -ls /user/hive/warehouse/tcidemo.db

Output.

Found 1 items
drwxrwxrwt   - hadoop hdfsadmingroup          0 2021-06-23 17:33 /user/hive/warehouse/tcidemo.db/blog_posts

Another way I can issue a query to the Hive Server using the beeline client is with the -e flag followed by a one off statement. Below I use this approach to drop the table and database.

beeline -u jdbc:hive2:// -e "drop table tcidemo.blog_posts; drop database tcidemo;"

Now if I list the /user/hive/warehouse directory again it is empty.

Creating External Tables

Hive also has the opposite of a Managed Table, the External Table, where Hive simply maintains metadata about the schema of a dataset which is not under the direct control of Hive.

To demonstrate this I'll create a regular CSV file of the same data I used in the last Managed Table example, import it into the HDFS filesystem then define a external table in Hive pointing to the location of the csv file which can then be queried.

First I create a simple CSV file named blogposts.csv like so.

tee blogposts.csv <<EOF
C++,2020-04-07,155,How To Construct an OpenCV Mat Object from C++ Arrays and Vectors
Data Engineering,2021-03-29,74,How To Use Window Functions in SQL
JavaScript,2019-08-20,41,Bridging Node.js and Python with PyNode to Predict Home Prices
Java,2019-07-23,27,High Level Introduction to Java for Developers
Python,2019-07-15,26,Building a Text Analytics App in Python with Flask Requests BeautifulSoup and TextBlob
Data Engineering,2021-03-12,26,Example Driven High Level Overview of Spark with Python
EOF

Now I create a directory within the HDFS filesystem at /blogposts and copy the new blogposts.csv file into the new /blogposts directory in HDFS.

hadoop fs -mkdir /blogposts
hadoop fs -copyFromLocal blogposts.csv /blogposts/

Now I hop back into the beeline shell.

beeline -u jdbc:hive2://

Then once inside the active beeline shell I recreate the tcidemo database and follow that by creating an external table which references the data at /blogposts/blogposts.csv

CREATE DATABASE tcidemo;
USE tcidemo;
CREATE EXTERNAL TABLE IF NOT EXISTS blog_posts (
	category STRING,
	published DATE,
	likes INT,
   title STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/blogposts';

Now I issue the same query I did last time to verify that the data is the same.

SELECT category, sum(likes) AS total_likes
FROM blog_posts
GROUP BY category
ORDER BY total_likes DESC;

Again, I will use !quit to exit the beeline client then do a listing of the /user/hive/warehouse/tcidemo.db directory and there is no blog_posts table because the data is external to Hive and resides in the /blogposts directory.

Before moving to the next section I will again drop the blog_posts table as I want to demonstrate how to build this table as a managed table once again but loaded from a file if Hive-QL statements refrencing CSV data for import not a series of insert statements. Its also worth noting that this does not effect the data stored in HDFS at /blogposts or the standard Linux filesystem data in the blogposts.csv file either.

beeline -u jdbc:hive2:// -e "drop table tcidemo.blog_posts;"

Executing Hive-QL Scripts to Create a Managed Table and Import CSV Data

For this example I create a managed table which will again be named blog_posts and load it from the file residing in the regular Linux filesystem at /home/hadoop/blogposts.csv. However, instead of executing queries interactively with the beeline shell I save them to a file named create-blogposts.hql and submit the file of queries to the Hive Server using beeline.

Here is the create-blogposts.hql file and contents saved along side the original blogposts.csv file at /home/hadoop

USE tcidemo;

CREATE TABLE IF NOT EXISTS blog_posts (
	category STRING,
	published DATE,
	likes INT,
   title STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

LOAD DATA LOCAL INPATH 'blogposts.csv' INTO TABLE blog_posts;

SELECT category, sum(likes) AS total_likes
FROM blog_posts
GROUP BY category
ORDER BY total_likes DESC;

Now I run the below command from the /home/hadoop directory

beeline -u jdbc:hive2:// -f create-blogposts.hql

I get output similar to below showing that the table got created and loaded with data from the blogposts.csv file.

Connected to: Apache Hive (version 2.3.7-amzn-4)
Driver: Hive JDBC (version 2.3.7-amzn-4)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://> USE tcidemo;
OK
No rows affected (0.785 seconds)
0: jdbc:hive2://>
0: jdbc:hive2://> CREATE TABLE IF NOT EXISTS blog_posts (
. . . . . . . . >     category STRING,
. . . . . . . . >     published DATE,
. . . . . . . . >     likes INT,
. . . . . . . . >    title STRING
. . . . . . . . > )
. . . . . . . . > ROW FORMAT DELIMITED
. . . . . . . . > FIELDS TERMINATED BY ',';
OK
No rows affected (0.33 seconds)
0: jdbc:hive2://>
0: jdbc:hive2://> LOAD DATA LOCAL INPATH 'blogposts.csv' INTO TABLE blog_posts;
Loading data to table tcidemo.blog_posts
OK
No rows affected (0.478 seconds)
0: jdbc:hive2://>
0: jdbc:hive2://> SELECT category, sum(likes) AS total_likes
. . . . . . . . > FROM blog_posts
. . . . . . . . > GROUP BY category
. . . . . . . . > ORDER BY total_likes DESC;
Query ID = hadoop_20210623203213_d7f067c2-16f8-4c7d-b0a1-f4761086491e
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1624468003356_0006)

Map 1: 0/1	Reducer 2: 0/2	Reducer 3: 0/1
Map 1: 0(+1)/1	Reducer 2: 0/2	Reducer 3: 0/1
Map 1: 1/1	Reducer 2: 0(+1)/2	Reducer 3: 0/1
Map 1: 1/1	Reducer 2: 2/2	Reducer 3: 0(+1)/1
Map 1: 1/1	Reducer 2: 2/2	Reducer 3: 1/1
OK
+-------------------+--------------+
|     category      | total_likes  |
+-------------------+--------------+
| C++               | 155          |
| Data Engineering  | 100          |
| JavaScript        | 41           |
| Java              | 27           |
| Python            | 26           |
+-------------------+--------------+
5 rows selected (11.508 seconds)

Note that I'm using the CREATE TABLE IF NOT EXISTS variant of the create table statement which means I could actually execute this command and repeatedly feed the queries from the create-blogposts.hql file into Hive and the table would only be created once. However, the data within the table will actually grow each time because right now it will load in an appending manner.

For example.

beeline -u jdbc:hive2:// -f create-blogposts.hql

Then focusing only on the results of the SELECT ... query I see that the sums have actually doubled because the loaded data was appended to the table.

+-------------------+--------------+
|     category      | total_likes  |
+-------------------+--------------+
| C++               | 310          |
| Data Engineering  | 200          |
| JavaScript        | 82           |
| Java              | 54           |
| Python            | 52           |
+-------------------+--------------+

I can change the LOAD DATA ... statement to use the OVERWRITE keyword so it actually overwrites the existing data each time its executed. The updated script is saved to a new overwrite-blogposts.hql file shown below.

USE tcidemo;

CREATE TABLE IF NOT EXISTS blog_posts (
	category STRING,
	published DATE,
	likes INT,
   title STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

LOAD DATA LOCAL INPATH 'blogposts.csv' OVERWRITE INTO TABLE blog_posts;

SELECT category, sum(likes) AS total_likes
FROM blog_posts
GROUP BY category
ORDER BY total_likes DESC;

And if I focus only on the SELECT results I see I get the expected counts again.

+-------------------+--------------+
|     category      | total_likes  |
+-------------------+--------------+
| C++               | 155          |
| Data Engineering  | 100          |
| JavaScript        | 41           |
| Java              | 27           |
| Python            | 26           |
+-------------------+--------------+

In the next section I'll present one more variant of an external table which will actually utilize data stored in S3 to query against so, I once again drop my blog_posts table cleaning things up for the next section.

beeline -u jdbc:hive2:// -e "drop table tcidemo.blog_posts; drop database tcidemo;"

Creating an External Table Referencing Data Stored in S3

The last interactive example I want to present is how to create an external table in my EMR Hive cluster which references the same blogposts.csv data but, this time the source file will reside in an Amazon S3 bucket.

EMR clusters come provisioned with the AWS CLI preinstalled so I can use that to copy up the blogposts.csv file to an S3 bucket I have named tci-emr-demo as shown below.

aws s3 cp blogposts.csv s3://tci-emr-demo/inputs/blogposts/blogposts.csv

I will again save my queries to a file named create-ext-blogposts.hql which can then be submitted to Hive Server via beeline client.

CREATE DATABASE tcidemo;

USE tcidemo;

CREATE EXTERNAL TABLE IF NOT EXISTS blog_posts (
    category STRING,
    published DATE,
    likes INT,
    title STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION "s3://tci-emr-demo/inputs/blogposts/";

SELECT category, sum(likes) AS total_likes
FROM blog_posts
GROUP BY category
ORDER BY total_likes DESC;

Then submit the file of queries as shown below.

beeline -u jdbc:hive2:// -f create-ext-blogposts.hql

Yielding the following output.

Connected to: Apache Hive (version 2.3.7-amzn-4)
Driver: Hive JDBC (version 2.3.7-amzn-4)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://> CREATE DATABASE tcidemo;
OK
No rows affected (0.942 seconds)
0: jdbc:hive2://>
0: jdbc:hive2://> USE tcidemo;
OK
No rows affected (0.028 seconds)
0: jdbc:hive2://>
0: jdbc:hive2://> CREATE EXTERNAL TABLE IF NOT EXISTS blog_posts (
. . . . . . . . >     category STRING,
. . . . . . . . >     published DATE,
. . . . . . . . >     likes INT,
. . . . . . . . >     title STRING
. . . . . . . . > )
. . . . . . . . > ROW FORMAT DELIMITED
. . . . . . . . > FIELDS TERMINATED BY ','
. . . . . . . . > STORED AS TEXTFILE
. . . . . . . . > LOCATION "s3://tci-emr-demo/inputs/blogposts/";
OK
No rows affected (4.342 seconds)
0: jdbc:hive2://>
0: jdbc:hive2://> SELECT category, sum(likes) AS total_likes
. . . . . . . . > FROM blog_posts
. . . . . . . . > GROUP BY category
. . . . . . . . > ORDER BY total_likes DESC;
Query ID = hadoop_20210624022654_8df4da4e-e1cf-491c-b0e3-8eb58b97a5b7
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1624500084588_0001)

Map 1: -/-	Reducer 2: 0/2	Reducer 3: 0/1
Map 1: 0/1	Reducer 2: 0/2	Reducer 3: 0/1
Map 1: 0/1	Reducer 2: 0/2	Reducer 3: 0/1
Map 1: 0(+1)/1	Reducer 2: 0/2	Reducer 3: 0/1
Map 1: 1/1	Reducer 2: 0(+1)/2	Reducer 3: 0/1
Map 1: 1/1	Reducer 2: 2/2	Reducer 3: 0(+1)/1
Map 1: 1/1	Reducer 2: 2/2	Reducer 3: 1/1
OK
+-------------------+--------------+
|     category      | total_likes  |
+-------------------+--------------+
| C++               | 155          |
| Data Engineering  | 100          |
| JavaScript        | 41           |
| Java              | 27           |
| Python            | 26           |
+-------------------+--------------+
5 rows selected (19.102 seconds)

Ok so that is was the last interactive demo for this article on using Hive on AWS EMR but, there is one more final way of using Hive on EMR that uses the concept of an EMR Job Step which I'll present in the next section. So for the final time please drop the blog_posts table and tcidemo database before proceeding if you're following along.

beeline -u jdbc:hive2:// -e "drop table tcidemo.blog_posts; drop database tcidemo;"

Also exit from the remote SSH session so you're back on your local computer or laptop.

Running Batch Hive Processing with EMR Job Step

In this final section I demonstrate how to use the AWS CLI to submit a EMR Job Step which executes a slighly modified version of the previous Hive-QL script named create-ext-blogposts-emr-step.hql yet, in this way of doing things the file is also stored in S3 along with the input dataset. The EMR Job Step is configured to fetch the script from S3, dynamically feed the input dataset location with an INPUT variable to the script and write the query result to a specified output S3 location fed to the same script as the variable OUTPUT.

The create-ext-blogposts-emr-step.hql is shown below.

CREATE EXTERNAL TABLE IF NOT EXISTS blog_posts (
category STRING,
    published DATE,
    likes INT,
    title STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION "${INPUT}";

INSERT OVERWRITE DIRECTORY "${OUTPUT}"
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
  SELECT category, sum(likes) AS total_likes
  FROM blog_posts
  GROUP BY category
  ORDER BY total_likes DESC;

I can again use the AWS CLI from my laptop to copy this create-ext-blogposts-emr-step.hql to S3 into a bucket I have named tci-emr-demo

aws s3 cp create-ext-blogposts-emr-step.hql s3://tci-emr-demo/scripts/create-ext-blogposts-emr-step.hql

In order to submit the EMR Job Step via the CLI I need the EMR Cluster ID. I can get this from the AWS Console or using the AWS CLI as described in my How to Create Interactive AWS Elastic Map Reduce (EMR) Clusters using the AWS CLI article. For the subsequent command I will use the shell variable CLUSTER_ID to hold the value of my EMR Cluster ID.

To add and execute a EMR Job Step to an EMR cluster using the AWS CLI you use the "emr add-steps" sub-commands but, the number of arguments tends to be fairly large so I prefer to store them in a JSON file which I've named hive-step.json and shown below.

[
  {
    "Type": "HIVE",
    "Name": "Likes By Category",
    "ActionOnFailure": "CONTINUE",
    "Args": [
      "-f",
      "s3://tci-emr-demo/scripts/create-ext-blogposts-emr-step.hql",
      "-d",
      "INPUT=s3://tci-emr-demo/inputs/blogposts",
      "-d",
      "OUTPUT=s3://tci-emr-demo/outputs"
    ]
  }
]

Then I just pass the hive-step.json file to the emr add-steps command's --steps argument.

aws emr add-steps --cluster-id $CLUSTER_ID --steps file://./hive-step.json --profile acct_a --region us-east-1

The above command will report back the Step ID of the newly added EMR Job Step which I can use in the list-steps subcommand to get the running details of the step.

aws emr list-steps --cluster-id $CLUSTER_ID --step-ids $STEP_ID

Yielding something similar to this.

{
    "Steps": [
        {
            "Id": "s-219LDPVVO18LH",
            "Name": "Likes By Category",
            "Config": {
                "Jar": "command-runner.jar",
                "Properties": {},
                "Args": [
                    "hive-script",
                    "--run-hive-script",
                    "--args",
                    "-f",
                    "s3://tci-emr-demo/scripts/create-ext-blogposts-emr-step.hql",
                    "-d",
                    "INPUT=s3://tci-emr-demo/inputs/blogposts",
                    "-d",
                    "OUTPUT=s3://tci-emr-demo/outputs"
                ]
            },
            "ActionOnFailure": "CONTINUE",
            "Status": {
                "State": "COMPLETED",
                "StateChangeReason": {},
                "Timeline": {
                    "CreationDateTime": "2021-06-24T07:54:09.844000-05:00",
                    "StartDateTime": "2021-06-24T07:54:47.243000-05:00",
                    "EndDateTime": "2021-06-24T07:55:21.436000-05:00"
                }
            }
        }
    ]
}

Lastly, I will fetch the results stored in S3 to inspect and ensure they look as expected.

aws s3 cp s3://tci-emr-demo/outputs . --recursive
cat 000000_0

Showing output.

C++,155
Data Engineering,100
JavaScript,41
Java,27
Python,26

Pretty neat right?

Terminating the EMR Cluster

Once I'm finished experimenting with my EMR cluster I immediately terminate it so I don't accrue any extra charges by having it sititng idle.

aws emr terminate-clusters --cluster-ids $CLUSTER_ID

Conclusion

In this article I provided a high-level overview of the Apache Hive distributed data warehouse and specifically cover how to utilize the Amazon EMR service to easily provision a Hive Cluster ontop of Hadoop.

As always, thanks for reading and don't be shy about commenting or critiquing below.

Share with friends and colleagues

[[ likes ]] likes

Navigation

Community favorites for Data Engineering

theCodingInterface