Serverless Conversions From GZip to Parquet Format with Python AWS Lambda and S3 Uploads

Introduction

A common task in the life of a Data Engineer is converting data files between formats in response to some event, such as an upload to an AWS S3 bucket. An especially common conversion I've found myself doing is turning GZip-compressed text files into the columnar Apache Parquet format to facilitate downstream analytical processing. In this article I demonstrate using a Python-based AWS Lambda SAM project, together with the AWS Data Wrangler Lambda Layer, to convert GZipped JSON files into Parquet upon an S3 upload event.

Serverless Project Setup

For readers wishing to follow along, you'll need the AWS CLI and AWS SAM CLI installed. If you haven't set them up already, please follow the AWS SAM Getting Started instructions in the AWS docs.

First off, I create an AWS SAM Python project using the SAM CLI and the sample Hello World template.

$ sam init --name sam-s3-parquet-converter \
   --runtime python3.8 \
   --app-template hello-world \
   --package-type Zip

Cloning app templates from https://github.com/aws/aws-sam-cli-app-templates

    -----------------------
    Generating application:
    -----------------------
    Name: sam-s3-parquet-converter
    Runtime: python3.8
    Dependency Manager: pip
    Application Template: hello-world
    Output Directory: .

    Next steps can be found in the README file at ./sam-s3-parquet-converter/README.md

Next, I delete the hello_world example Lambda directory and replace it with new src/s3_parquet and src/awsdatawrangler directories.

cd sam-s3-parquet-converter
rm -r hello_world
mkdir -p src/s3_parquet src/awsdatawrangler

Adding AWS Data Wrangler Layer

To seamlessly handle the conversion to Parquet format I'll utilize the AWS Data Wrangler toolset to do the heavy lifting. To gain access to it I create a dedicated Lambda Layer using the Python 3.8 release of AWS Data Wrangler. If you are new to AWS Lambda Layers, please have a look at my previous article Keeping Python AWS Serverless Apps DRY with Lambda Layers where I explore them in depth. At a high level, they are simply a way to package reusable code for your Lambda Functions in AWS.

I download the release into the src/awsdatawrangler directory using the HTTPie HTTP client (curl or a browser would work just as well).

http --download https://github.com/awslabs/aws-data-wrangler/releases/download/2.6.0/awswrangler-layer-2.6.0-py3.8.zip \
  --output src/awsdatawrangler/awswrangler-layer-2.6.0-py3.8.zip

Then, over in the template.yaml file, I clear out the existing boilerplate Globals, Resources, and Outputs sections, replacing them with the following SAM/CloudFormation definitions to generate the new versioned Lambda Layer.

# template.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  sam-s3-parquet-converter

  Sample SAM Template for sam-s3-parquet-converter

Resources:
  AwsDataWranglerLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: AWSDataWrangler
      Description: AWS Data Wrangler Integration Libraries
      ContentUri: src/awsdatawrangler/awswrangler-layer-2.6.0-py3.8.zip
      CompatibleRuntimes:
        - python3.8
      RetentionPolicy: Retain

Outputs:
  AwsDataWranglerLayer:
    Value: !Ref AwsDataWranglerLayer
    Description: ARN For AWS Data Wrangler Layer

Next I build and deploy the project using the SAM CLI commands shown below, run from the same directory as the template.yaml file.

sam build --use-container
sam deploy --guided --stack-name sam-s3-parquet-converter

Adding the S3 Bucket for Object File Storage

I can now move on to building out more of the application by adding an S3 bucket to house the files to be converted. The SAM framework makes this incredibly easy to accomplish with a simple AWS::S3::Bucket resource definition in the SAM/CloudFormation template.yaml file, as shown below.

# template.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  sam-s3-parquet-converter

  Sample SAM Template for sam-s3-parquet-converter

Resources:
  AwsDataWranglerLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: AWSDataWrangler
      Description: AWS Data Wrangler Integration Libraries
      ContentUri: src/awsdatawrangler/awswrangler-layer-2.6.0-py3.8.zip
      CompatibleRuntimes:
        - python3.8
      RetentionPolicy: Retain

  DemoBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: tci-demo-parquet

Outputs:
  AwsDataWranglerLayer:
    Value: !Ref AwsDataWranglerLayer
    Description: ARN For AWS Data Wrangler Layer

  DemoBucket:
    Value: !Ref DemoBucket

Note that S3 bucket names must be globally unique, so you will want to choose a different name from the one I've used.

Again, I build and deploy this resource using the SAM CLI as shown below.

sam build --use-container
sam deploy

Adding the Lambda Function to Perform Parquet Conversion on S3 File Upload Event

As I mentioned previously, I am utilizing the AWS Data Wrangler toolset for the Parquet conversion, transforming some demo GZipped JSON stock data by way of a Pandas DataFrame as an intermediate data structure. To start progressing in this direction I add two new files in the src/s3_parquet directory: app.py for the Python source of the AWS Lambda Function, and an empty requirements.txt file, which the SAM CLI requires for all Python-based Lambda Functions.
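Concretely, both files can be created empty from the project root (mkdir -p is a no-op since the directory was made earlier):

```shell
# Lambda source file plus the empty requirements.txt the SAM CLI expects
mkdir -p src/s3_parquet
touch src/s3_parquet/app.py src/s3_parquet/requirements.txt
```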

The app.py file is where all the logic for this data format conversion, and the shuffle from the raw-uploads S3 directory to the parquet directory, will occur. It follows these basic steps.

  1. AWS Lambda is invoked when a file is uploaded to the raw-uploads directory of the S3 bucket
  2. AWS Lambda is passed a dictionary data structure containing a record for each S3 object created
  3. Python code parses each event record to ascertain the S3 bucket and key
  4. Python code fetches each object from S3
  5. Python code decodes the S3 object bytes using the gzip module, then the json module, to get a dictionary of the demo stock data being used
  6. Python code converts the daily stock metrics into a Pandas DataFrame
  7. Python code converts the DataFrame to Parquet and saves it to S3 using the AWS Data Wrangler library

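
Before looking at the full handler, steps 1 through 3 amount to pure standard library calls and can be sketched against a trimmed-down sample of the event payload Lambda receives (the bucket and key values here are illustrative):

```python
import os
import urllib.parse

# Minimal shape of the s3:ObjectCreated event a Lambda receives (illustrative values)
sample_event = {
    'Records': [
        {'s3': {'bucket': {'name': 'tci-demo-parquet'},
                'object': {'key': 'raw-uploads/COKE.gz'}}}
    ]
}

for record in sample_event['Records']:
    s3_bucket = record['s3']['bucket']['name']
    # Object keys arrive URL encoded in the event payload, hence unquote_plus
    s3_key = urllib.parse.unquote_plus(record['s3']['object']['key'], encoding='utf-8')
    # Strip the directory and extension to name the eventual Parquet output
    filename, _ = os.path.splitext(os.path.split(s3_key)[-1])
    print(s3_bucket, s3_key, filename)  # tci-demo-parquet raw-uploads/COKE.gz COKE
```
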
# app.py

import gzip
import json
import logging
import os
import urllib.parse

import awswrangler
import boto3
import pandas as pd


logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Reuse a single client across invocations of the same Lambda container
s3 = boto3.client('s3')


def lambda_handler(event, context):
    logger.info({'event': event})

    for record in event['Records']:
        s3_bucket = record['s3']['bucket']['name']
        # S3 object keys arrive URL encoded in the event payload
        s3_key = urllib.parse.unquote_plus(record['s3']['object']['key'], encoding='utf-8')

        obj = s3.get_object(Bucket=s3_bucket, Key=s3_key)

        try:
            # Decompress the streaming body, then parse the JSON payload
            with gzip.GzipFile(fileobj=obj['Body']) as gz:
                data = json.loads(gz.read().decode())
        except Exception as e:
            logger.error({'error': str(e), 'message': 'failed gzip decoding'})
            raise

        # Derive the output name from the upload, e.g. raw-uploads/COKE.gz -> COKE
        filename, _ = os.path.splitext(os.path.split(s3_key)[-1])

        df = pd.DataFrame(data)

        # Save the converted data back to the same bucket under the parquet/ prefix
        s3_path = 's3://{}/parquet/{}.parquet'.format(s3_bucket, filename)
        try:
            awswrangler.s3.to_parquet(df, dataset=False, path=s3_path, index=False)
        except Exception as e:
            logger.error({'error': str(e), 'message': 'Failed saving Parquet to S3'})
            raise

I now add two more resources to my SAM template file. The first is the Lambda Function, complete with an S3 event mapping rule that triggers it when files are uploaded to the raw-uploads directory, as well as an IAM policy allowing it to read from and write to the previously generated S3 bucket. The second is a CloudWatch Log Group for capturing logs from the Lambda.

# template.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  sam-s3-parquet-converter

  Sample SAM Template for sam-s3-parquet-converter

Resources:
  AwsDataWranglerLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: AWSDataWrangler
      Description: AWS Data Wrangler Integration Libraries
      ContentUri: src/awsdatawrangler/awswrangler-layer-2.6.0-py3.8.zip
      CompatibleRuntimes:
        - python3.8
      RetentionPolicy: Retain

  DemoBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: tci-demo-parquet

  ConversionFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/s3_parquet/
      Handler: app.lambda_handler
      Runtime: python3.8
      Timeout: 300
      MemorySize: 512
      Layers:
        - !Ref AwsDataWranglerLayer
      Policies:
        - Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Action: 
                - 's3:GetObject*'
                - 's3:PutObject'
              Resource: !Sub "arn:aws:s3:::tci-demo-parquet/*"
      Events:
        SegmentEventLogUpload:
          Type: S3
          Properties:
            Bucket: !Ref DemoBucket
            Events: s3:ObjectCreated:*
            Filter:
              S3Key:
                Rules:
                  - Name: prefix
                    Value: raw-uploads/

  ConversionFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    DependsOn: ConversionFunction
    Properties:
      LogGroupName: !Sub "/aws/lambda/${ConversionFunction}"
      RetentionInDays: 7

Outputs:
  AwsDataWranglerLayer:
    Value: !Ref AwsDataWranglerLayer
    Description: ARN For AWS Data Wrangler Layer

  DemoBucket:
    Value: !Ref DemoBucket

With this new Python Lambda code and SAM configuration in place I can again build and deploy.

sam build --use-container
sam deploy

Testing the Functionality

With the Lambda Function nicely wrapped into an IaC template backed SAM project and deployed to the AWS Cloud, I can finally test it out. I have an example stocks dataset for the Coke beverage company, already GZipped, which you can download and then upload to the raw-uploads directory of the S3 bucket, triggering the conversion.

First, I download the GZip dataset using an HTTP Client like HTTPie, curl, or even your browser. I'll use HTTPie as follows.

http --download https://thecodinginterface-images.s3.amazonaws.com/blogposts/serverless-parquet-data-converter/COKE.gz

Then I use the AWS CLI to upload this file to the raw-uploads directory of the S3 bucket. Be sure to use the bucket name you chose earlier, since mine will already be taken.

aws s3 cp COKE.gz s3://tci-demo-parquet/raw-uploads/COKE.gz

After waiting a few seconds, I head over to the S3 service page for my bucket in the AWS Console, where I see there are now two directories: raw-uploads and parquet. Clicking the parquet link shows the converted Parquet data.


Conclusion

In this article I demonstrated how to use AWS SAM to build an event-driven, serverless file data converter that is invoked upon upload of a GZipped JSON data file to S3, converts it to the Parquet data format, and uploads the converted results back to S3.

As always, thanks for reading and please do not hesitate to critique or comment below.
