Introduction to Redshift using Pagila Sample Dataset Including ETL from Postgres using AWS Glue

Introduction

In this article I give a practical introductory tutorial to using Amazon Redshift as an OLAP Data Warehouse solution for the popular Pagila Movie Rental dataset. I start with a basic overview of the unique architecture Redshift uses to accomplish its scalable and robust use case as an enterprise cloud data warehouse. Then armed with this basic knowledge of Redshift architecture I move on to give a practical example of designing a schema optimal for Redshift based off the Pagila sample dataset.

In order to lower the barrier for a reader to get up and running with the technologies used in this tutorial I've provided fairly complete [Terraform Infrastructure as Code](https://www.terraform.io/) which can be used to provision resources in the AWS cloud and follow along. However, please understand that the AWS services being provisioned cost money, at a rate of somewhere between $1 and $2 USD per hour. I also demonstrate how to destroy the Terraform provisioned services, so use those steps, along with the Terraform docs if necessary, to clean up when you are finished and avoid unnecessary charges. For reference I've created an AWS Pricing Calculator estimate of the charges based off the services being used. Additionally, all source code for this tutorial is available on my GitHub account in a public repo.

I conclude the article with a section on using AWS Glue to perform Extract, Transform, and Load (ETL) from a Postgres compatible Aurora database loaded with the OLTP Pagila schema. I demonstrate how to export, transform, and load intermediate files into AWS S3 from the Postgres side, then finish by showing how to load the S3 files into Redshift staging tables and use an upsert pattern to load them into the target production tables.

Overview of Redshift Architecture

Redshift is uniquely architected to allow for both vertical and horizontal scaling, execution of queries utilizing massively parallel processing, prioritization of user groups and query types, along with compression efficient and aggregation friendly columnar storage. More recently, Redshift has also added decoupled scaling of compute and storage, plus the ability to query semi-structured and unstructured data in S3 data lakes via Redshift Spectrum.

Above is a diagram which depicts the typical architecture and systems integration for a Redshift data warehouse cluster, and below are descriptions of key architectural components and concepts that are helpful for data warehouse design and programming.

Leader Node

The Leader Node of the cluster is what I conceptually think of as the brain of the cluster. This is the node that SQL clients such as BI tools and other applications connect with and submit queries to. The leader node then uses schema and storage metadata about the data distributed across, or accessible from, compute nodes to formulate query execution plans. It then parallelizes the query execution as compiled code instructions submitted to the compute nodes, which carry out the work.

Compute Nodes

In my mental model the Compute Nodes are the brawn of the cluster. These nodes accept the compiled code instructions passed down from the Leader Node (aka, the brain) and carry them out, with each compute node performing its specific portion of the overall query in parallel. Each compute node produces its own intermediate results and passes them back up to the Leader Node, which stitches the results together and returns them to the originating client application.
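To make the brain and brawn split concrete, here is a minimal Python sketch of the scatter-gather idea. It is only an illustration under my own assumptions: the NODE_DATA values, the function names, and the use of threads are hypothetical stand-ins and say nothing about how Redshift is implemented internally.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical data: each inner list stands in for the rows held by one compute node.
NODE_DATA = [
    [4.99, 2.99, 0.99],
    [6.99, 4.99],
    [2.99, 2.99, 8.99, 0.99],
]


def compute_node_partial(rows):
    """Work done on one compute node: a partial (sum, count) over its local rows."""
    return sum(rows), len(rows)


def leader_average(node_data):
    """Leader node: fan the work out to every node, then merge the partial results."""
    with ThreadPoolExecutor(max_workers=len(node_data)) as pool:
        partials = list(pool.map(compute_node_partial, node_data))
    total = sum(partial_sum for partial_sum, _ in partials)
    count = sum(partial_count for _, partial_count in partials)
    return total / count


print(round(leader_average(NODE_DATA), 2))
```

Note that the leader never sees individual rows here, only one small (sum, count) pair per node, which is the property that lets an aggregate query scale out across nodes.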

Column Oriented Data Storage Blocks

Data intended for analytical processing technologies such as Redshift, as opposed to operational or transactional processing, is often organized in columnar form. Columnar data storage means each column's data is physically colocated as blocks on the storage medium, as opposed to the common row oriented storage format where the fields of each row are colocated sequentially.

The benefits of storing data as blocks of specific columns come from reducing the amount of I/O required to perform an analytical query. One way this is accomplished is by using compression types optimized for each colocated column's data type, resulting in smaller quantities of data being read from disk. Another benefit is that most analytical queries only use a subset of the total columns in a table, so it is not necessary to read all fields of every row only to parse out the specific fields requested in the query, as depicted in the below diagram.
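The I/O argument can be shown with a toy Python sketch that counts how many values each layout has to touch to answer a query on a single column. The four film rows and the helper names are hypothetical, and counting values is a deliberately crude stand-in for counting disk blocks.

```python
# Toy table: 4 rows x 4 columns (hypothetical values).
rows = [
    (1, "ACADEMY DINOSAUR", 2006, "PG"),
    (2, "ACE GOLDFINGER", 2006, "G"),
    (3, "ADAPTATION HOLES", 2006, "NC-17"),
    (4, "AFFAIR PREJUDICE", 2006, "G"),
]
column_names = ("film_id", "title", "release_year", "rating")

# Column oriented layout: one contiguous list per column.
columns = {name: [row[i] for row in rows] for i, name in enumerate(column_names)}


def values_read_row_store(rows, wanted):
    """A row store touches every field of every row, whatever `wanted` asks for."""
    return sum(len(row) for row in rows)


def values_read_column_store(columns, wanted):
    """A column store touches only the values of the requested columns."""
    return sum(len(columns[name]) for name in wanted)


wanted = ["rating"]
print("row store values read:   ", values_read_row_store(rows, wanted))
print("column store values read:", values_read_column_store(columns, wanted))
```

With only one of four columns requested, the columnar layout touches a quarter of the values, and the gap widens as tables get wider.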

Sorted Storage

Column oriented data warehouses such as Redshift differ from row storage oriented relational database systems in that traditional indexes are not applicable. Columnar storage achieves efficient data filtering by sorting data within blocks on disk in a manner that optimizes for the column, or columns, most commonly filtered or joined on.

Redshift gives you the ability to select one or multiple columns to sort the rows of a table by, which then map to sorted columns of data within blocks. This is specified using the sortkey keyword for each table, which is elaborated on further in the section on designing Redshift table schemas.

Zone Maps

Zone maps are an in-memory data structure that maintains metadata about each storage block's sorted data, namely its flanking min and max values. This allows the query planner to optimize a query by discarding complete blocks of data whose min and max values do not intersect with the values present in a filter or join condition.
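Block pruning with min and max values can be sketched in a few lines of Python. This is a simplified model under my own assumptions: the Block class, the block size of 10, and the date_id values are all hypothetical, and real blocks hold far more data.

```python
from dataclasses import dataclass


@dataclass
class Block:
    values: list  # sorted column values stored in this block

    @property
    def zone(self):
        # Zone map entry: the block's minimum and maximum value.
        return self.values[0], self.values[-1]


def build_blocks(sorted_values, block_size):
    """Chop an already sorted column into fixed size blocks."""
    return [Block(sorted_values[i:i + block_size])
            for i in range(0, len(sorted_values), block_size)]


def scan_range(blocks, low, high):
    """Return matching values and the number of blocks actually read."""
    matches, blocks_read = [], 0
    for block in blocks:
        block_min, block_max = block.zone
        if block_max < low or block_min > high:
            continue  # the zone map proves nothing in this block can match
        blocks_read += 1
        matches.extend(v for v in block.values if low <= v <= high)
    return matches, blocks_read


date_ids = list(range(20210101, 20210131))  # 30 sorted, hypothetical date_id values
blocks = build_blocks(date_ids, block_size=10)
matches, blocks_read = scan_range(blocks, 20210112, 20210115)
print(matches, blocks_read)
```

Only the middle block overlaps the requested range, so two of the three blocks are skipped without being read. The pruning only works this well because the data is sorted; on unsorted data most blocks would span a wide min to max range and few could be discarded.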

Distributed Storage

The parallel computing benefits within Redshift's cluster are achieved by distributing data across the nodes and the internal computing units (aka slices) within compute nodes. This allows work to be pushed out in a horizontally scalable fashion to a maximum of 128 compute nodes, each of which can operate on data that is either located within a specific compute node or that is shuffled to it from other compute nodes.

Redshift provides the ability to choose one of four different distribution styles (AUTO, EVEN, KEY, and ALL) for each table you design in your schema. You specify the distribution style with the diststyle keyword when defining a table.

Designing Pagila Star Schema for Redshift

Redshift falls into a category of database referred to as Online Analytical Processing Systems which differ from Online Transaction Processing Systems in their design principles when it comes to creating database Schemas. In my earlier article Exploring Online Analytical Processing Databases Plus Extract, Transform and, Load in PostgreSQL I presented an example of the popular learning sample database Pagila in both standard OLTP and OLAP Star Schema designs. In this section I will translate the below Pagila Star Schema into a Redshift optimized schema design.

Sorting Strategy

Back in the section on Redshift architecture I briefly covered how storing data on disk in a sorted manner is a key tenet of how Redshift is able to optimize query efficiency. In this section I detail the thought process and best practices for choosing appropriate sorting strategies when designing table schemas.

Redshift allows you to specify either a single column to sort table data by or a compound sorting strategy specifying multiple columns. According to the Redshift Docs it is best practice to choose columns that are frequently filtered or joined on. When specifying multiple columns, list the column with the lowest cardinality (that is, the fewest unique values) first, and choose four or fewer columns for a compound sort key.
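To illustrate why column order matters in a compound sort key, here is a small Python sketch using a tuple sort and binary search. The fact rows and the rows_for_date helper are hypothetical; the point is only that a filter on the leading sort column maps to one contiguous run of rows.

```python
from bisect import bisect_left, bisect_right

# Hypothetical (date_id, film_id, amount) fact rows in arrival order.
facts = [
    (20210103, 7, 4.99),
    (20210101, 9, 2.99),
    (20210102, 3, 0.99),
    (20210101, 2, 4.99),
    (20210103, 1, 6.99),
    (20210102, 8, 2.99),
]

# Compound sort: order by date_id first, then by film_id within each date.
sorted_facts = sorted(facts, key=lambda row: (row[0], row[1]))
date_ids = [row[0] for row in sorted_facts]


def rows_for_date(date_id):
    """A filter on the leading sort column selects one contiguous slice."""
    start = bisect_left(date_ids, date_id)
    end = bisect_right(date_ids, date_id)
    return sorted_facts[start:end]


print(rows_for_date(20210102))
```

A filter on film_id alone would not enjoy the same benefit, because rows for one film are scattered across every date. That is why the column most queries filter on should lead the key.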

For the dimension tables I have selected the traditional OLTP primary keys to be the sort keys because they are the columns that are to be joined on. Then for the sales fact table I've chosen to use a compound sort key using the date_id and the film_id foreign keys with the assumption that basically all queries will use some sort of date filter and/or grouping plus queries will often join on films.

| Table        | Sort Key(s)      |
|--------------|------------------|
| film_dim     | film_id          |
| customer_dim | customer_id      |
| staff_dim    | staff_id         |
| date_dim     | date_id          |
| sales_facts  | date_id, film_id |

Distribution Strategy

The next important schema design concept to discuss further is data distribution style. The idea is to choose a strategy that increases the likelihood that queried data is distributed widely enough to spread parallelized operations evenly across the cluster, while also minimizing the reshuffling of data needed to perform joins and aggregations among tables.

Redshift offers four options for distributing table data across the cluster of compute nodes.

  • KEY distribution results in the records of a table being distributed across the nodes of a cluster based off the hashed value of a specific field (aka column) in each record. Records having the same hashed key value are colocated in the cluster, which is often useful when applied to the join columns of tables.
  • ALL distribution results in the entire table being duplicated to each node within a cluster, which is useful for small to medium sized dimension tables commonly joined against larger fact tables because it minimizes data shuffling (Amazon defines small to medium tables as those having less than 3 million rows).
  • EVEN distribution results in the records of a table being equally distributed across all compute nodes within the cluster and is typically the best choice when neither KEY nor ALL is suitable.
  • AUTO is the final distribution choice available, which enables Redshift to make the distribution choice for you based off the size of the table. It generally results in initially choosing the ALL method for small tables, then usually the EVEN method as the table crosses a size threshold, but it may also choose KEY if query loads indicate that would be optimal.
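The three explicit styles can be compared with a small Python simulation that counts how many rows land on each node. Everything here is an assumption made for illustration: the four node cluster, the crc32 based hash (Redshift's actual hash function is not shown in this article), and the sample rows.

```python
import zlib
from collections import Counter

NUM_NODES = 4  # hypothetical cluster size


def node_for_key(value):
    """Stand-in hash: the same key value always maps to the same node."""
    return zlib.crc32(str(value).encode()) % NUM_NODES


def distribute_key(rows, key_index):
    """KEY: placement is decided by the hash of one column."""
    return Counter(node_for_key(row[key_index]) for row in rows)


def distribute_even(rows):
    """EVEN: round-robin placement, regardless of row contents."""
    return Counter(i % NUM_NODES for i in range(len(rows)))


def distribute_all(rows):
    """ALL: every node holds a full copy of the table."""
    return Counter({node: len(rows) for node in range(NUM_NODES)})


# Hypothetical (date_id, film_id) rows: 8 dates with 2 films each.
rows = [(20210101 + day, film) for day in range(8) for film in (1, 2)]

for name, counts in [
    ("KEY", distribute_key(rows, 0)),
    ("EVEN", distribute_even(rows)),
    ("ALL", distribute_all(rows)),
]:
    print(name, dict(sorted(counts.items())))
```

EVEN always balances perfectly and ALL multiplies storage by the node count, while the balance of KEY depends entirely on how the chosen column's values are spread, which is the trade-off to weigh when picking a distribution key.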

For the dimensional tables I have selected the ALL distribution style since they are likely to be joined on often and they are well within the small to medium table size range. For the sales fact table I've selected the KEY distribution style and specified the date_id foreign key as that is likely to be the most joined on column. This really isn't terribly influential for joins since the date_dim table is rather small and distributed with the ALL method, but rental sales of movies are likely to exhibit a naturally uniform distribution over a large timespan, resulting in a reasonably even distribution of sales facts data across the cluster.

| Table        | Distribution Style | Key     |
|--------------|--------------------|---------|
| film_dim     | ALL                | --      |
| customer_dim | ALL                | --      |
| staff_dim    | ALL                | --      |
| date_dim     | ALL                | --      |
| sales_facts  | KEY                | date_id |

Below is the Data Definition Language (DDL) for the final star schema for the Pagila Redshift data warehouse.

DROP TABLE IF EXISTS customer_dim;
CREATE TABLE customer_dim (
    customer_id INTEGER SORTKEY,
    first_name VARCHAR(254),
    last_name VARCHAR(254)
)
DISTSTYLE ALL;

DROP TABLE IF EXISTS film_dim;
CREATE TABLE film_dim(
    film_id INTEGER SORTKEY,
    title VARCHAR(254),
    release_year INTEGER,
    rating VARCHAR(10)
)
DISTSTYLE ALL;

DROP TABLE IF EXISTS staff_dim;
CREATE TABLE staff_dim(
    staff_id INTEGER SORTKEY,
    first_name VARCHAR(254),
    last_name VARCHAR(254)
)
DISTSTYLE ALL;

DROP TABLE IF EXISTS date_dim;
CREATE TABLE date_dim(
    date_id INTEGER SORTKEY,
    date DATE,
    year INTEGER,
    month INTEGER,
    day_of_month INTEGER,
    week_of_year INTEGER,
    day_of_week INTEGER
)
DISTSTYLE ALL;

DROP TABLE IF EXISTS sales_facts;
CREATE TABLE sales_facts(
    film_id INTEGER NOT NULL,
    customer_id INTEGER NOT NULL,
    staff_id INTEGER NOT NULL,
    date_id INTEGER NOT NULL DISTKEY,
    amount NUMERIC(5, 2)
)
COMPOUND SORTKEY (date_id, film_id);

Provisioning the Aurora Postgres OLTP Source Database

I've chosen to carry on using Amazon's PostgreSQL compatible Aurora RDS as the source OLTP database, which is consistent with my earlier article Exploring Online Analytical Processing Databases plus Extract, Transform and, Load in PostgreSQL, which used the same setup. You don't have to spend too much time on The Coding Interface to see that I have a personal affinity for the PostgreSQL database, as evidenced by the frequency of articles using Postgres and, in fact, the site itself runs on a PostgreSQL instance. However, my preference has been shifting slightly to Aurora Postgres in the last year or so because I don't have to think about storage provisioning or scaling; that is automatically taken care of by Aurora!

To make things as simple as possible for a reader to interactively follow along I've provided all the necessary infrastructure for provisioning an RDS Aurora Postgres cluster along with necessary security groups and a minimal Virtual Private Cloud (VPC) network in the popular DevOps Infrastructure as Code (IaC) language Terraform and shared in a public repo on my GitHub account.

For the adventurous reader these are the steps to get the OLTP source database provisioned via Terraform and loaded with the Pagila example dataset. Please be advised that following along will incur charges in AWS, so be sure to destroy the resources when you are finished to stop incurring charges. I provide examples of how to do this at the end. See the introduction section for a pricing estimate.

Clone the repo and jump into the aurora-postgres directory.

git clone https://github.com/amcquistan/big-data-storage.git
cd big-data-storage/aurora-postgres

If you haven't already done so, please install Terraform on your system as described in the Terraform Docs. Then initialize a Terraform project to work with the Terraform code in the aurora-postgres directory.

terraform init

There are a few Terraform variables you will need to provide which are specific to your AWS account and your RDS instance. I've provided a boilerplate example file named terraform.tfvars.example for this, so copy it as shown below and provide your values for the database password and desired AWS region to deploy to in the new terraform.tfvars file.

cp terraform.tfvars.example terraform.tfvars

Generate a Terraform plan and inspect it to get an idea of the number and type of resources being generated for you then feel free to thank me for not forcing you to endlessly click through the AWS console to generate them yourself :)

terraform plan -out "aurora-postgres.tfplan"

Lastly, apply the plan to generate the AWS Cloud infrastructure specified.

terraform apply "aurora-postgres.tfplan"

When this finishes executing you will be presented with some important info such as a database connection string as well as two subnet IDs, a VPC ID, and a default security group ID. The security group is self-referencing and open to all TCP traffic within the VPC.

Here is some demo output.

Outputs:

database_url = "postgresql://postgres:password@aurora-cluster-endpoint.amazonaws.com/pagila"
pubsubnet_one = "subnet-mysubnetid1"
pubsubnet_two = "subnet-mysubnetid2"
vpc_id = "vpc-myvpcid"
vpc_security_group_id = "sg-mysecuritygroupid"

You can always get this information back using the following command from the same directory as the terraform.tfstate file just generated.

terraform show

Next up is to run the following two SQL scripts from within the aurora-postgres directory, which generate the Pagila schema and load it with data. Note these commands are for a Unix-like operating system such as macOS or Linux. I am unsure of the Windows equivalent, but I'd be very happy to have input from readers about it. If need be you could always spin up a Linux bastion host with Postgres installed and run them from there. In fact, I have an article explaining how that can be done: Developer's Guide to PostgreSQL on Linux: Installation.

DB_CON="postgresql://username:password@aurora-cluster-endpoint.amazonaws.com/dbname"
psql $DB_CON --file pagila-oltp-schema.sql
psql $DB_CON --file pagila-oltp-data.sql

Then for completeness you could pop into the psql shell and peek at the tables.

$ psql $DB_CON                            
psql (11.11, server 11.9)
SSL connection (protocol: TLSv1.2, cipher: ECDHE-RSA-AES256-GCM-SHA384, bits: 256, compression: off)
Type "help" for help.

pagila=> \d
                     List of relations
 Schema |            Name            |   Type   |  Owner   
--------+----------------------------+----------+----------
 oltp   | actor                      | table    | postgres
 oltp   | actor_actor_id_seq         | sequence | postgres
 oltp   | actor_info                 | view     | postgres
 oltp   | address                    | table    | postgres
 oltp   | address_address_id_seq     | sequence | postgres
 oltp   | category                   | table    | postgres
 oltp   | category_category_id_seq   | sequence | postgres
 oltp   | city                       | table    | postgres
 oltp   | city_city_id_seq           | sequence | postgres
 oltp   | country                    | table    | postgres
 oltp   | country_country_id_seq     | sequence | postgres
 oltp   | customer                   | table    | postgres
 ...

Provisioning the Redshift Data Warehouse Cluster

Following in the same manner as the last section, I've again provided the necessary Terraform Infrastructure as Code to spin up a small two node cluster of the dc2.large node type. Again, if you choose to follow along interactively using these AWS resources, please be advised this Redshift cluster will cost around $0.50 USD per hour, so see the AWS Pricing Calculator estimate I provided in the Introduction section.

To deploy using the provided Terraform code you need to be in the redshift directory of the repo cloned in the Provisioning Aurora Postgres section.

Start by initializing the Terraform project.

terraform init

Again, there are a few Terraform variables you will need to provide which are specific to your AWS account and the Aurora/RDS instance you spun up in the last section. To facilitate this I've provided a boilerplate example file named terraform.tfvars.example, so copy it as shown below and provide your values in your own terraform.tfvars file. Most of these values originate from the Terraform output in the last section.

cp terraform.tfvars.example terraform.tfvars

Here is an example terraform.tfvars file based off the dummy output I provided in the Provisioning Aurora Postgres section.

redshift_dbpasswd      = "Passw0rd"
redshift_dbname        = "redshift"

postgres_dbhost        = "aurora-cluster-endpoint.amazonaws.com"
postgres_dbname        = "pagila"
postgres_dbuser        = "postgres"
postgres_dbpasswd      = "password"

redshift_subnet_ids    = [
  "subnet-mysubnetid1",
  "subnet-mysubnetid2"
]
cluster_type           = "multi-node"
node_count             = 2
skip_final_snapshot    = true
vpc_id                 = "vpc-myvpcid"
redshift_cidr          = "0.0.0.0/0"
vpc_security_group_id  = "sg-mysecuritygroupid"

It's probably worth noting that these settings, along with the ones for the Aurora Postgres provisioning, leave both databases publicly reachable. This is almost certainly not what you want to do in a production environment, so please exercise good judgment when using this example code.
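Since the postgres_* variables in the example terraform.tfvars mirror the pieces of the database_url output from the Aurora section, a small helper can split the URL for you. This is just a convenience sketch of my own: the postgres_tfvars function name is hypothetical, and it assumes the password contains no characters that were percent-encoded in the URL.

```python
from urllib.parse import urlsplit


def postgres_tfvars(database_url):
    """Split a postgresql:// URL into the postgres_* Terraform variable values."""
    parts = urlsplit(database_url)
    return {
        "postgres_dbhost": parts.hostname,
        "postgres_dbname": parts.path.lstrip("/"),
        "postgres_dbuser": parts.username,
        "postgres_dbpasswd": parts.password,
    }


# The dummy connection string from the Aurora Terraform output shown earlier.
url = "postgresql://postgres:password@aurora-cluster-endpoint.amazonaws.com/pagila"
for name, value in postgres_tfvars(url).items():
    print(f'{name} = "{value}"')
```

The printed lines can be pasted into terraform.tfvars alongside the subnet, VPC, and security group values.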

At this point I can generate my Terraform Plan like so.

terraform plan -out "redshift.tfplan"

After creating the plan you can then apply it to provision the Redshift cluster along with some AWS Glue resources that I'll use to demonstrate an Extract, Transform, and Load (ETL) workflow later.

terraform apply "redshift.tfplan"

Again, at the conclusion of executing the last command you'll be given some useful output such as a Redshift PostgreSQL client formatted database connection string, an IAM Role that will be needed to perform the S3 to Redshift COPY command, and the name of an S3 bucket that will be used for ETL later. I definitely invite you to have a read through the Terraform code, even if you don't know Terraform, because it is still quite readable.

Here is some demo output.

Outputs:

database_url = "postgresql://username:password@redshift-connection-url.amazonaws.com:5439/pagila"
redshift_iam_role_arn = "arn:aws:iam::youraccountid:role/terraform-20210399994854307100000001"
s3_bucket = "glue-etl20210304014887659000000003"

Remember you can get this output again using the show command in Terraform like so.

terraform show

At this point I can generate the Redshift Pagila Data Warehouse Star Schema discussed earlier, which is saved in the scripts/redshift-pagila-dw-schema.sql file.

REDSHIFT_CON=postgresql://username:password@redshift-connection-url.amazonaws.com:5439/pagila
psql $REDSHIFT_CON --file scripts/redshift-pagila-dw-schema.sql

In the event you want to check the characteristics of a particular table in your Redshift database schema, which you will likely do often, the below query can be quite useful.

$ psql $REDSHIFT_CON
psql (11.11, server 8.0.2)
SSL connection (protocol: TLSv1.2, cipher: ECDHE-RSA-AES256-GCM-SHA384, bits: 256, compression: off)
Type "help" for help.

pagila=# SELECT "column", type, encoding, distkey, sortkey
pagila-# FROM pg_table_def WHERE tablename = 'sales_facts';
   column    |     type     | encoding | distkey | sortkey 
-------------+--------------+----------+---------+---------
 film_id     | integer      | none     | f       |       2
 customer_id | integer      | az64     | f       |       0
 staff_id    | integer      | az64     | f       |       0
 date_id     | integer      | none     | t       |       1
 amount      | numeric(5,2) | az64     | f       |       0
(5 rows)

Automatic Compression Selection via Redshift COPY on Initial Table Load

The Redshift docs state that the optimal, column specific, compression selection strategy is to allow Redshift to empirically determine compression encodings while loading empty tables using the COPY command. Therefore, in this section I demonstrate how to export and transform the OLTP Pagila tables from Aurora Postgres to AWS S3, then use those files for the initial load of empty Redshift tables with automatic compression selection turned on.

There are two DevOps type tasks that need to be taken care of before one can export table data from RDS Postgres / Aurora Postgres into S3. The first is to supply an IAM Role to the database instance (or cluster in Aurora's case) that gives RDS the ability to write to S3. I've accomplished this using the aws_iam_role and aws_iam_role_policy_attachment resources from the AWS Terraform provider, along with an unfortunately hacky solution utilizing a local execution provisioner with the AWS CLI, which was necessary due to a shortcoming in the Terraform AWS provider.

resource "aws_iam_role" "rds_s3" {
    assume_role_policy = jsonencode({
        Version   = "2012-10-17"
        Statement = [
            {
                Action = "sts:AssumeRole"
                Effect = "Allow"
                Sid    = ""
                Principal = {
                    Service = "rds.amazonaws.com"
                }
            }
        ]
    })
    path         = "/"
}

resource "aws_iam_role_policy_attachment" "rds_managed_policies" {
    policy_arn = "arn:aws:iam::aws:policy/AmazonS3FullAccess"
    role       = aws_iam_role.rds_s3.name
}

resource "aws_rds_cluster" "postgresql" {
    # omitting part of the IaC for brevity ...

    lifecycle {
        ignore_changes = [ iam_roles ]
    }

    provisioner "local-exec" {
        command = "aws rds add-role-to-db-cluster --db-cluster-identifier ${aws_rds_cluster.postgresql.id} --role-arn ${aws_iam_role.rds_s3.arn} --feature-name s3Export --region ${var.region}"
    }
}

If you've been following along interactively then you've already taken care of this back in the section on Provisioning the Aurora Postgres OLTP Source Database but, I still wanted to point it out as it's key to understanding how to export data from Aurora Postgres.

The second task that needs to be accomplished is to install the Postgres aws_s3 extension on the Aurora Postgres cluster.

CREATE EXTENSION IF NOT EXISTS aws_s3 CASCADE;

However, I'll package that up in a reusable DDL SQL script rather than running it one off with a PostgreSQL client.

The aws_s3 Postgres extension makes it easy to export a data set generated as a result of a SELECT statement and have it saved to an S3 bucket. Below is an example of how it can be used to export the customer table using a simple SELECT statement and save it to S3.

-- PERFORM is only valid inside PL/pgSQL, so a standalone call uses SELECT
SELECT * FROM aws_s3.query_export_to_s3(
    'SELECT customer_id, first_name, last_name FROM oltp.customer ORDER BY customer_id',
    'mys3-bucket',
    'customers.csv',
    'us-east-2',
    options := 'format csv'
);

The above usage example of the query_export_to_s3(...) function will work fine for many tables but, for those with more complex queries I have found it is useful to abstract complex or otherwise long SELECT statements into their own functions that return a TABLE result type and use it in conjunction with the query_export_to_s3(...) extension function. For example, the below code snippet shows a more complicated query which generates the sales_facts table resultset and returns that as a TABLE from one function named get_sales_facts() which I then can use in the SELECT query for the query_export_to_s3(...) function like so.

CREATE FUNCTION oltp.get_sales_facts() 
  RETURNS TABLE (film_id INTEGER,
                customer_id INTEGER,
                staff_id INTEGER,
                date_id INTEGER,
                amount NUMERIC(5, 2))
  LANGUAGE plpgsql
AS $$
  BEGIN
    RETURN QUERY 
            SELECT DISTINCT i.film_id, 
                  p.customer_id, 
                  p.staff_id, 
                  TO_CHAR(payment_date, 'yyyymmdd')::INT AS date_id, 
                  p.amount
            FROM oltp.payment p
                JOIN oltp.rental r ON p.rental_id = r.rental_id
                JOIN oltp.inventory i ON r.inventory_id = i.inventory_id
            ORDER BY date_id, i.film_id;
END; $$;

-- PERFORM is only valid inside PL/pgSQL, so a standalone call uses SELECT
SELECT * FROM aws_s3.query_export_to_s3(
    'SELECT * FROM oltp.get_sales_facts()',
    'mys3-bucket',
    'sales_facts.csv',
    'us-east-2',
    options := 'format csv'
);

The aws_s3 extension docs are already pretty example rich, so I suggest readers have a look at those to dig further into the usage of the extension. However, for completeness I have listed the contents of the pagila-oltp-export-to-s3.sql script below, which creates a stored procedure that exports all tables to an S3 bucket passed in as a stored procedure parameter.

-- pagila-oltp-export-to-s3.sql


SET statement_timeout = 0;
SET lock_timeout = 0;
SET idle_in_transaction_session_timeout = 0;
SET client_encoding = 'UTF8';
SET standard_conforming_strings = on;
SELECT pg_catalog.set_config('search_path', '', false);
SET check_function_bodies = false;
SET xmloption = content;
SET client_min_messages = warning;
SET row_security = off;

SELECT pg_catalog.set_config('search_path', 'public', false);

CREATE EXTENSION IF NOT EXISTS aws_s3 CASCADE;

SELECT pg_catalog.set_config('search_path', '', false);

DROP FUNCTION IF EXISTS oltp.get_sales_facts;
CREATE FUNCTION oltp.get_sales_facts() 
  RETURNS TABLE (film_id INTEGER,
                customer_id INTEGER,
                staff_id INTEGER,
                date_id INTEGER,
                amount NUMERIC(5, 2))
  LANGUAGE plpgsql
AS $$
  BEGIN
    RETURN QUERY 
            SELECT DISTINCT i.film_id, 
                  p.customer_id, 
                  p.staff_id, 
                  TO_CHAR(payment_date, 'yyyymmdd')::INT AS date_id, 
                  p.amount
            FROM oltp.payment p
                JOIN oltp.rental r ON p.rental_id = r.rental_id
                JOIN oltp.inventory i ON r.inventory_id = i.inventory_id
            ORDER BY date_id, i.film_id;
END; $$;

DROP FUNCTION IF EXISTS oltp.get_date_dim;
CREATE FUNCTION oltp.get_date_dim() 
  RETURNS TABLE (date_id INTEGER,
                date DATE,
                year INTEGER,
                month INTEGER,
                day_of_month INTEGER,
                week_of_year INTEGER,
                day_of_week INTEGER)
LANGUAGE plpgsql
AS $$
  BEGIN
  RETURN QUERY
      SELECT TO_CHAR(date_seq, 'YYYYMMDD')::INT AS date_id,
              date_seq AS date,
              EXTRACT(ISOYEAR FROM date_seq)::INTEGER AS year,
              EXTRACT(MONTH FROM date_seq)::INTEGER AS month,
              EXTRACT(DAY FROM date_seq)::INTEGER AS day_of_month,
              EXTRACT(WEEK FROM date_seq)::INTEGER AS week_of_year,
              EXTRACT(ISODOW FROM date_seq)::INTEGER AS day_of_week
      FROM (SELECT '2010-01-01'::DATE + SEQUENCE.DAY AS date_seq
              FROM GENERATE_SERIES(0, 5000) AS SEQUENCE(DAY)
              ORDER BY date_seq) DS;
END; $$;

DROP PROCEDURE IF EXISTS oltp.export_s3_staging_tables;
CREATE PROCEDURE oltp.export_s3_staging_tables(s3_bucket VARCHAR, aws_region VARCHAR)
  LANGUAGE plpgsql
  AS $$
  BEGIN
    RAISE NOTICE 'Exporting staging tables to S3 %', s3_bucket;

    PERFORM aws_s3.query_export_to_s3(
        'SELECT customer_id, first_name, last_name FROM oltp.customer ORDER BY customer_id',
        s3_bucket,
        'customers.csv',
        aws_region,
        options :='format csv'
    );

    PERFORM aws_s3.query_export_to_s3(
        'SELECT film_id, title, release_year, rating FROM oltp.film ORDER BY film_id',
        s3_bucket,
        'films.csv',
        aws_region,
        options :='format csv'
    );

    PERFORM aws_s3.query_export_to_s3(
        'SELECT staff_id, first_name, last_name FROM oltp.staff ORDER BY staff_id',
        s3_bucket,
        'staff.csv',
        aws_region,
        options :='format csv'
    );

    PERFORM aws_s3.query_export_to_s3(
        'SELECT * FROM oltp.get_date_dim()',
        s3_bucket,
        'dates.csv',
        aws_region,
        options :='format csv'
    );

    PERFORM aws_s3.query_export_to_s3(
        'SELECT * FROM oltp.get_sales_facts()',
        s3_bucket,
        'sales_facts.csv',
        aws_region,
        options :='format csv'
    );
  END;
  $$;
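To sanity check what the get_date_dim() function in the script above produces, the same row shape can be reproduced in plain Python. This is a sketch for verification only, with hypothetical function names; it assumes Postgres EXTRACT(ISOYEAR), EXTRACT(WEEK), and EXTRACT(ISODOW) line up with Python's date.isocalendar(), which follows the same ISO 8601 week rules.

```python
from datetime import date, timedelta


def date_dim_row(d):
    """Build one date_dim row with the same columns as oltp.get_date_dim()."""
    iso_year, iso_week, iso_weekday = d.isocalendar()
    return {
        "date_id": int(d.strftime("%Y%m%d")),  # TO_CHAR(..., 'YYYYMMDD')::INT
        "date": d,
        "year": iso_year,            # EXTRACT(ISOYEAR ...)
        "month": d.month,
        "day_of_month": d.day,
        "week_of_year": iso_week,    # EXTRACT(WEEK ...) uses ISO week numbering
        "day_of_week": iso_weekday,  # EXTRACT(ISODOW ...): Monday=1 .. Sunday=7
    }


def date_dim(start=date(2010, 1, 1), days=5000):
    """GENERATE_SERIES(0, 5000) is inclusive, so this yields days + 1 rows."""
    return [date_dim_row(start + timedelta(days=n)) for n in range(days + 1)]


rows = date_dim()
print(len(rows), rows[0])
```

One thing this makes visible is that the year column is the ISO year, so a date in the first days of January can report the previous year. For example, 2010-01-01 falls in ISO week 53 of 2009. That is consistent with week_of_year, but worth remembering when filtering by year.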

Before running this export_s3_staging_tables(...) stored procedure to generate the initial data to be loaded into a Redshift cluster I need an S3 bucket to save the CSV files to. This is easily accomplished in the AWS S3 Console or via the AWS CLI as shown below. Feel free to use any region that makes sense for where you are geographically. You could also use the S3 bucket that was generated by the Terraform code when the Redshift cluster and Glue resources were provisioned if you like. Make sure to use a bucket name that is unique to you, as the one below has already been taken.

aws s3 mb s3://tci-pagila-data --region us-east-2

I execute the pagila-oltp-export-to-s3.sql file with the psql client like so then run the export_s3_staging_tables(...) stored procedure within the psql shell passing in the newly created bucket name along with associated region.

$ DB_CON="postgresql://username:password@aurora-cluster-endpoint.amazonaws.com/dbname"
$ psql $DB_CON --file pagila-oltp-export-to-s3.sql
$ psql $DB_CON
psql (11.11, server 11.9)
SSL connection (protocol: TLSv1.2, cipher: ECDHE-RSA-AES256-GCM-SHA384, bits: 256, compression: off)
Type "help" for help.

pagila=> CALL export_s3_staging_tables('tci-pagila-data', 'us-east-2');
NOTICE:  Exporting staging tables to S3 tci-pagila-data

With the CSV data exports now nicely landed into S3 I can perform my initial data load into the Redshift tables. To do this I've included a COPY statement for each table / dataset pair in a SQL script shown below which accepts psql CLI parameters for each table's dataset path in S3 along with one for the IAM Role created and output in the Terraform code during Redshift cluster provisioning earlier.

-- initial-redshift-load.sql

COPY customer_dim FROM :customers_s3 
IAM_ROLE :iam_role 
DELIMITER ','
COMPUPDATE ON;

COPY film_dim FROM :films_s3 
IAM_ROLE :iam_role 
DELIMITER ','
COMPUPDATE ON;

COPY staff_dim FROM :staff_s3 
IAM_ROLE :iam_role 
DELIMITER ','
COMPUPDATE ON;

COPY date_dim FROM :dates_s3 
IAM_ROLE :iam_role 
DELIMITER ','
COMPUPDATE ON;

COPY sales_facts FROM :sales_s3 
IAM_ROLE :iam_role 
DELIMITER ','
COMPUPDATE ON;

Notice how each COPY statement has the COMPUPDATE ON component to it. This tells Redshift to empirically determine the best compression method for each column based off the schema definition and the data being loaded.

To execute the above load script run the following. Again, these commands were run on a Unix style OS so they will need to be modified slightly if run on a Windows OS. Be sure to fill in your specific values for the Redshift connection string, IAM role, and S3 bucket name and paths. Also pay close attention to the alternating double then single quotes around the parameters.

REDSHIFT_CON=postgresql://username:password@redshift-connection-url.amazonaws.com:5439/pagila
psql $REDSHIFT_CON --file scripts/initial-redshift-load.sql \
  -v iam_role="'arn:aws:iam::youraccountid:role/terraform-20210399994854307100000001'" \
  -v customers_s3="'s3://tci-pagila-data/customers.csv'" \
  -v films_s3="'s3://tci-pagila-data/films.csv'" \
  -v staff_s3="'s3://tci-pagila-data/staff.csv'" \
  -v dates_s3="'s3://tci-pagila-data/dates.csv'" \
  -v sales_s3="'s3://tci-pagila-data/sales_facts.csv'"
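The alternating quotes exist because psql pastes each -v value into the script verbatim wherever :name appears, so the value itself has to be a complete SQL string literal, single quotes included. As a rough sketch (the helper name build_psql_args and the placeholder values are my own and not part of the repo), the same invocation can be assembled in Python, where handing an argument list to subprocess removes the outer layer of shell quoting:

```python
import subprocess


def build_psql_args(connection_url, script_path, variables):
    """Build a psql argument list where every variable is a SQL string literal."""
    args = ["psql", connection_url, "--file", script_path]
    for name, value in variables.items():
        # psql substitutes :name verbatim, so the single quotes must be part
        # of the value; embedded single quotes are doubled per SQL rules.
        literal = "'" + value.replace("'", "''") + "'"
        args += ["-v", "{}={}".format(name, literal)]
    return args


# Placeholder values: substitute your own connection string, role and bucket.
example = build_psql_args(
    "postgresql://username:password@redshift-connection-url.amazonaws.com:5439/pagila",
    "scripts/initial-redshift-load.sql",
    {
        "iam_role": "arn:aws:iam::youraccountid:role/your-redshift-role",
        "customers_s3": "s3://tci-pagila-data/customers.csv",
    },
)
print(example)
# subprocess.run(example, check=True)  # uncomment to actually invoke psql
```

Only two of the six variables are shown to keep the example short; the remaining ones follow the same pattern.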

I can take a look at the selected compression encoding methods with the following query.

pagila=# SELECT "column", type, encoding, distkey, sortkey
FROM pg_table_def WHERE tablename = 'sales_facts';
   column    |     type     | encoding | distkey | sortkey 
-------------+--------------+----------+---------+---------
 film_id     | integer      | none     | f       |       2
 customer_id | integer      | none     | f       |       0
 staff_id    | integer      | none     | f       |       0
 date_id     | integer      | none     | t       |       1
 amount      | numeric(5,2) | none     | f       |       0
(5 rows)

You may have noticed the significant change from the initial values that were selected when the schema was initially created. This is because the dataset itself is rather small, and Redshift needs on the order of 100,000 rows (the AWS documentation states this threshold per slice) before COPY will determine compression automatically. If you find yourself in this situation you can run ANALYZE COMPRESSION to have Redshift take a second look and report the encodings it would suggest. Note that the command only reports suggestions and does not change the tables.

ANALYZE COMPRESSION;
    Table     |    Column    | Encoding | Est_reduction_pct 
--------------+--------------+----------+-------------------
 customer_dim | customer_id  | raw      | 0.00
 customer_dim | first_name   | raw      | 0.00
 customer_dim | last_name    | raw      | 0.00
 film_dim     | film_id      | raw      | 0.00
 film_dim     | title        | raw      | 0.00
 film_dim     | release_year | raw      | 0.00
 film_dim     | rating       | raw      | 0.00
 staff_dim    | staff_id     | raw      | 0.00
 staff_dim    | first_name   | raw      | 0.00
 staff_dim    | last_name    | raw      | 0.00
 date_dim     | date_id      | raw      | 0.00
 date_dim     | date         | raw      | 0.00
 date_dim     | year         | raw      | 0.00
 date_dim     | month        | raw      | 0.00
 date_dim     | day_of_month | raw      | 0.00
 date_dim     | week_of_year | raw      | 0.00
 date_dim     | day_of_week  | raw      | 0.00
 sales_facts  | film_id      | raw      | 0.00
 sales_facts  | customer_id  | raw      | 0.00
 sales_facts  | staff_id     | raw      | 0.00
 sales_facts  | date_id      | raw      | 0.00
 sales_facts  | amount       | raw      | 0.00
(22 rows)

Here the output for each column shows that the RAW encoding type has been suggested. RAW encoding is best for sort keys and also appears to be suggested when tables have a relatively small number of rows, generally fewer than 100,000, which is true for the tables in this example. If you meet these criteria then it's likely that you either don't really need the hefty computing power of Redshift or you just need to wait for your data volume to grow. In the latter case you should rerun the ANALYZE COMPRESSION command once the tables have grown, then either recreate the tables with the suggested encodings or truncate and reload them with the COPY command while specifying COMPUPDATE ON.
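Once the tables have grown, the ANALYZE COMPRESSION output becomes worth acting on. Below is a minimal sketch of filtering that output down to the columns where something other than raw is suggested. It assumes the result rows have already been fetched as (table, column, encoding, est_reduction_pct) tuples, and both the function name and the 10 percent threshold are illustrative choices of mine rather than Redshift recommendations.

```python
def suggested_encodings(rows, min_reduction_pct=10.0):
    """Return {table: {column: encoding}} for suggestions worth applying.

    rows mirrors the ANALYZE COMPRESSION result set:
    (table, column, encoding, est_reduction_pct).
    """
    picked = {}
    for table, column, encoding, reduction in rows:
        if encoding.lower() == "raw":
            continue  # nothing to gain, e.g. small tables or sort keys
        if float(reduction) < min_reduction_pct:
            continue  # estimated saving is below the chosen threshold
        picked.setdefault(table, {})[column] = encoding
    return picked


# A few of the rows from the output above; every suggestion is raw.
sample = [
    ("sales_facts", "film_id", "raw", "0.00"),
    ("sales_facts", "date_id", "raw", "0.00"),
    ("sales_facts", "amount", "raw", "0.00"),
]
print(suggested_encodings(sample))
```

Fed the rows from this tutorial the function returns an empty dictionary, which matches the conclusion that there is nothing to re-encode yet.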

Glue ETL Workflow from Postgres to S3 then Ending in Redshift

Alright, so by this point I've discussed the key architectural concepts that position Redshift to be a solid Cloud Data Warehouse solution, given an example of the thought process that should be used when designing a typical star schema for a Redshift database, and demonstrated how to get initial data out of a Postgres database and into the example Pagila Star Schema. However, you are almost certain to have to continue feeding new operational data from your OLTP source databases such as Postgres into your Redshift Data Warehouse as the business continues to operate and grow. For this reason I've included a section on implementing an example ETL workflow using Glue to keep data synced from the Aurora Postgres database into the Redshift Data Warehouse used in this tutorial.

The below diagram depicts the general flow of data that will occur. A scheduled Glue Python Shell job executes the previously demonstrated export_s3_staging_tables(...) Postgres stored procedure to land intermediate CSV data files into S3. The job then uses the previously demonstrated Redshift COPY commands to load those CSV data files from S3 into Redshift temporary tables before performing a common upsert procedure to refresh the production tables.

There are a few important Glue concepts that I would like to call out that I am utilizing in this ETL workflow. First, I am utilizing a scheduled Glue Trigger to initiate a Glue ETL Python Shell Job at 1:01 AM UTC every day. Aside from those Glue resources, a very important aspect of this implementation is the set of parameters being passed into the Glue Python Shell ETL job. Specifying parameters when defining the Glue Job like this passes them through to the underlying Python script, making the implementation flexible. Using parameters like this I can easily iterate on the ETL workflow in different stages of the software development lifecycle and just pass different parameters specific to development, testing, or production environments.

resource "aws_glue_job" "etl" {
    depends_on = [ aws_s3_bucket.glue_bucket, aws_iam_role.glue_default, aws_glue_connection.glue_connection ]
    role_arn          = aws_iam_role.glue_default.arn
    name              = "postgres-to-redshift-etl"
    glue_version      = "1.0"
    max_capacity      = 1
    max_retries       = 2
    timeout           = 10
    default_arguments = {
        "--s3_bucket"                       = aws_s3_bucket.glue_bucket.id
        "--aws_region"                      = var.region
        "--iam_role"                        = aws_iam_role.redshift.arn
        "--redshift_dbname"                 = var.redshift_dbname
        "--redshift_dbhost"                 = aws_redshift_cluster.redshift.endpoint
        "--redshift_dbuser"                 = var.redshift_dbuser
        "--redshift_dbpasswd"               = var.redshift_dbpasswd
        "--postgres_dbname"                 = var.postgres_dbname
        "--postgres_dbhost"                 = var.postgres_dbhost
        "--postgres_dbuser"                 = var.postgres_dbuser
        "--postgres_dbpasswd"               = var.postgres_dbpasswd
    }

    command {
      name            = "pythonshell"
      script_location = "s3://${aws_s3_bucket.glue_bucket.id}/${var.etl_script_s3_key}"
      python_version  = "3"
    }
}

resource "aws_glue_trigger" "etl_tgr" {
    name     = "postgres-to-redshift-etl"
    type     = "SCHEDULED"
    schedule = "cron(1 1 * * ? *)"
    actions {
        job_name = aws_glue_job.etl.name
    }
}
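Glue hands each default_arguments entry to the script as a --name value pair on the command line, which is what getResolvedOptions reads in etl.py. The awsglue module is only available inside the Glue runtime, so for a local dry run a stand-in along these lines can be handy. This is a simplified approximation built on argparse with a hypothetical function name, not the actual Glue implementation.

```python
import argparse


def resolve_options(argv, option_names):
    """Local approximation of awsglue.utils.getResolvedOptions.

    Parses '--name value' pairs from argv and returns them as a dict,
    ignoring any extra arguments it was not asked about.
    """
    parser = argparse.ArgumentParser(allow_abbrev=False)
    for name in option_names:
        parser.add_argument("--" + name, required=True)
    # argv[0] is the script name, exactly as in sys.argv
    known, _unknown = parser.parse_known_args(argv[1:])
    return vars(known)


# Simulated command line for two of the job parameters defined above.
fake_argv = ["etl.py", "--s3_bucket", "tci-pagila-data", "--aws_region", "us-east-2"]
print(resolve_options(fake_argv, ["s3_bucket", "aws_region"]))
```

Swapping this in for getResolvedOptions during development lets the same main(args) function be exercised with development, testing, or production values without deploying to Glue.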

Below is the Glue Python Shell Job's source code, which you can find in the repo under the redshift/scripts directory as a file named etl.py. It establishes a connection to the Aurora Postgres database and executes the export_s3_staging_tables(...) stored procedure introduced earlier, specifying which S3 bucket to export to based off the job parameters for bucket and region. Then the Python Shell job connects to the Redshift Data Warehouse, creates a temporary table for each table to be loaded, and fills each temp table with the data just exported to S3 using the COPY statement, explicitly stating not to update compression, which speeds up loading. Lastly, an upsert technique is applied where rows in the production tables matching those in the newly loaded temp tables are deleted so that any updated rows from the source database replace them, and of course new records get inserted as well.

# etl.py

import logging
import sys

from pgdb import connect

from awsglue.utils import getResolvedOptions

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def main(args):
    # extract / transform data from Postgres and load to S3
    con, cur = None, None
    try:
        con = connect(
                database=args['postgres_dbname'],
                host=args['postgres_dbhost'],
                user=args['postgres_dbuser'],
                password=args['postgres_dbpasswd']
        )
        cur = con.cursor()
        cur.execute('CALL export_s3_staging_tables(%s, %s)', (args['s3_bucket'], args['aws_region']))
    except Exception as e:
        logger.error({
            'resource': __file__,
            'message_type': 'error',
            'message': str(e)
        })
        return e
    finally:
        if cur is not None:
            cur.close()
        if con is not None:
            con.close()

    # load data from S3 into Redshift staging tables
    con, cur = None, None
    try:
        con = connect(
                database=args['redshift_dbname'],
                host=args['redshift_dbhost'],
                user=args['redshift_dbuser'],
                password=args['redshift_dbpasswd']
        )
        cur = con.cursor()

        cur.execute("CREATE TEMP TABLE tmp_customer_dim (LIKE customer_dim)")
        cur.execute(
            "COPY tmp_customer_dim FROM %s IAM_ROLE %s DELIMITER ',' COMPUPDATE OFF", (
            "s3://{}/customers.csv".format(args['s3_bucket']),
            args['iam_role']
        ))
        cur.execute("""DELETE FROM customer_dim c
                        USING tmp_customer_dim t
                        WHERE c.customer_id = t.customer_id""")
        cur.execute("INSERT INTO customer_dim SELECT * FROM tmp_customer_dim")

        cur.execute("CREATE TEMP TABLE tmp_film_dim (LIKE film_dim)")
        cur.execute(
            "COPY tmp_film_dim FROM %s IAM_ROLE %s DELIMITER ',' COMPUPDATE OFF", (
            "s3://{}/films.csv".format(args['s3_bucket']),
            args['iam_role']
        ))
        cur.execute("""DELETE FROM film_dim f
                        USING tmp_film_dim t
                        WHERE f.film_id = t.film_id""")
        cur.execute("INSERT INTO film_dim SELECT * FROM tmp_film_dim")

        cur.execute("CREATE TEMP TABLE tmp_staff_dim (LIKE staff_dim)")
        cur.execute(
            "COPY tmp_staff_dim FROM %s IAM_ROLE %s DELIMITER ',' COMPUPDATE OFF", (
            "s3://{}/staff.csv".format(args['s3_bucket']),
            args['iam_role']
        ))
        cur.execute("""DELETE FROM staff_dim s
                        USING tmp_staff_dim t
                        WHERE s.staff_id = t.staff_id""")
        cur.execute("INSERT INTO staff_dim SELECT * FROM tmp_staff_dim")

        cur.execute("CREATE TEMP TABLE tmp_date_dim (LIKE date_dim)")
        cur.execute(
            "COPY tmp_date_dim FROM %s IAM_ROLE %s DELIMITER ',' COMPUPDATE OFF", (
            "s3://{}/dates.csv".format(args['s3_bucket']),
            args['iam_role']
        ))
        cur.execute("""DELETE FROM date_dim d
                        USING tmp_date_dim t
                        WHERE d.date_id = t.date_id""")
        cur.execute("INSERT INTO date_dim SELECT * FROM tmp_date_dim")

        cur.execute("CREATE TEMP TABLE tmp_sales_facts (LIKE sales_facts)")
        cur.execute(
            "COPY tmp_sales_facts FROM %s IAM_ROLE %s DELIMITER ',' COMPUPDATE OFF", (
            "s3://{}/sales_facts.csv".format(args['s3_bucket']),
            args['iam_role']
        ))
        cur.execute("""DELETE FROM sales_facts s
                        USING tmp_sales_facts t
                        WHERE s.film_id = t.film_id
                            AND s.customer_id = t.customer_id
                            AND s.staff_id = t.staff_id
                            AND s.date_id = t.date_id""")
        cur.execute("INSERT INTO sales_facts SELECT * FROM tmp_sales_facts")

        con.commit()
    except Exception as e:
        logger.error({
            'resource': __file__,
            'message_type': 'error',
            'message': str(e)
        })
        return e
    finally:
        if cur is not None:
            cur.close()
        if con is not None:
            con.close()


if __name__ == '__main__':
    expected_args = [
        "s3_bucket",
        "aws_region",
        "iam_role",
        "redshift_dbname",
        "redshift_dbhost",
        "redshift_dbuser",
        "redshift_dbpasswd",
        "postgres_dbname",
        "postgres_dbhost",
        "postgres_dbuser",
        "postgres_dbpasswd"
    ]
    args = getResolvedOptions(sys.argv, expected_args)
    main(args)

You can either run the job manually via the AWS Glue Jobs page within the AWS Management Console or wait until the trigger fires at 1:01 AM UTC.
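The five near-identical blocks in etl.py are easy to get subtly wrong when copied by hand. One possible refactoring, sketched here with a hypothetical helper name and not taken from the repo, generates the same four-statement upsert sequence from a table name and its key columns:

```python
def upsert_statements(table, key_columns):
    """Return the SQL for the temp-table upsert pattern used in etl.py.

    The COPY statement keeps its %s placeholders so the S3 path and IAM
    role can still be passed as parameters to cursor.execute().
    """
    tmp = "tmp_" + table
    # Match production rows to staged rows on every key column.
    match = " AND ".join(
        "{0}.{2} = {1}.{2}".format(table, tmp, col) for col in key_columns
    )
    return [
        "CREATE TEMP TABLE {} (LIKE {})".format(tmp, table),
        "COPY {} FROM %s IAM_ROLE %s DELIMITER ',' COMPUPDATE OFF".format(tmp),
        "DELETE FROM {} USING {} WHERE {}".format(table, tmp, match),
        "INSERT INTO {} SELECT * FROM {}".format(table, tmp),
    ]


# Key columns per table, following the DELETE conditions in etl.py.
for sql in upsert_statements(
    "sales_facts", ["film_id", "customer_id", "staff_id", "date_id"]
):
    print(sql)
```

Inside the job, the second statement would be executed with the S3 path and IAM role as parameters and the other three without any, all within the same transaction that etl.py already commits at the end.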

Tearing Down the Infrastructure

If you've been following along and provisioned these AWS resources to experiment with, run the following command once you no longer want to be incurring charges for them. Run it first in the redshift directory and then, after the Redshift resources are fully destroyed, in the aurora-postgres directory.

terraform destroy

Each will take a few minutes to complete.
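Because the ordering matters, you may prefer to script the teardown. A small wrapper along these lines is one option. The directory names are the ones mentioned above, while the function name and the injectable run argument are illustrative assumptions of mine.

```python
import subprocess


def destroy_in_order(directories, run=subprocess.run):
    """Run 'terraform destroy' in each directory, stopping at the first failure."""
    for directory in directories:
        # check=True raises CalledProcessError if terraform exits non-zero,
        # so later directories are not touched until earlier ones succeed.
        run(["terraform", "destroy"], cwd=directory, check=True)


# Example call, left commented out so nothing is destroyed by accident:
# destroy_in_order(["redshift", "aurora-postgres"])
```

Terraform still prompts for confirmation in each directory, since the subprocess inherits the terminal's standard input.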

Conclusion

In this article I have provided a practical introduction to the Amazon Redshift Cloud Data Warehouse covering its architecture, the thought process necessary to build performant table schemas, and an example ETL workflow for synchronizing data from a Postgres database into the Redshift Data Warehouse.

As always, thanks for reading and please do not hesitate to critique or comment below.
