A common task in the life of a Data Engineer is converting data files between formats in response to some event, such as an upload to an AWS S3 bucket. An especially common conversion I've found myself doing is turning GZip compressed text files into the columnar Apache Parquet format to facilitate downstream analytical processing. In this article I demonstrate using a Python based AWS Lambda SAM project, together with the AWS Data Wrangler Lambda Layer, to convert GZipped JSON files into Parquet upon an S3 upload event.
For readers wishing to follow along, you'll need the AWS CLI and AWS SAM CLI installed. If you haven't done so already, please follow the AWS SAM Getting Started instructions in the AWS docs.
First off I will create an AWS SAM Python project using the SAM CLI and the sample Hello World template.
$ sam init --name sam-s3-parquet-converter \
--runtime python3.8 \
--app-template hello-world \
--package-type Zip
Cloning app templates from https://github.com/aws/aws-sam-cli-app-templates
-----------------------
Generating application:
-----------------------
Name: sam-s3-parquet-converter
Runtime: python3.8
Dependency Manager: pip
Application Template: hello-world
Output Directory: .
Next steps can be found in the README file at ./sam-s3-parquet-converter/README.md
Next I delete the hello_world example Lambda directory and replace it with new src/s3_parquet and src/awsdatawrangler directories.
cd sam-s3-parquet-converter
rm -r hello_world
mkdir -p src/s3_parquet src/awsdatawrangler
To seamlessly handle the conversion to Parquet format I'll utilize the AWS Data Wrangler toolset to do the heavy lifting. To gain access to AWS Data Wrangler from Lambda I create a dedicated Lambda Layer using the Python 3.8 release of AWS Data Wrangler. If you are new to AWS Lambda Layers please have a look at my previous article Keeping Python AWS Serverless Apps DRY with Lambda Layers where I explore them in depth but, at a high level, they are simply a way to package reusable code for your Lambda Functions in AWS.
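To make that concrete, the snippet below is a minimal sketch of a throwaway diagnostic handler (not part of this project) you could deploy with the layer attached to see how layer contents surface inside a function. It relies on the Python Lambda runtime extracting a layer's python/ directory under /opt and adding /opt/python to the import path; the handler name check_layer_handler is purely illustrative.
# check_layer.py (hypothetical diagnostic handler, not part of this project)
import sys

import awswrangler  # importable only because the layer places it under /opt/python


def check_layer_handler(event, context):
    # report where awswrangler was loaded from and confirm /opt/python is on the path
    return {
        'awswrangler_version': awswrangler.__version__,
        'awswrangler_location': awswrangler.__file__,
        'opt_python_on_path': '/opt/python' in sys.path
    }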
In the src/awsdatawrangler directory I download the release using the HTTPie HTTP client (of course you could use curl or a browser also).
http --download https://github.com/awslabs/aws-data-wrangler/releases/download/2.6.0/awswrangler-layer-2.6.0-py3.8.zip \
--output src/awsdatawrangler/awswrangler-layer-2.6.0-py3.8.zip
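Before wiring this archive into the template it can be worth a quick local sanity check that it follows the Lambda layer layout for Python runtimes, with packages under a top-level python/ directory. The snippet below is an optional check using only the standard library; the zip path matches the download location above.
# inspect_layer_zip.py (optional local sanity check)
import zipfile

zip_path = 'src/awsdatawrangler/awswrangler-layer-2.6.0-py3.8.zip'

with zipfile.ZipFile(zip_path) as zf:
    names = zf.namelist()
    # Lambda layers for Python runtimes expect packages under a top-level python/ directory
    print('entries:', len(names))
    print('all under python/:', all(name.startswith('python/') for name in names))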
Then over in the template.yaml file I clear out the existing boilerplate Globals, Resources, and Outputs sections, replacing them with the following SAM/CloudFormation definitions to generate the new versioned Lambda Layer as shown below.
# template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  sam-s3-parquet-converter

  Sample SAM Template for sam-s3-parquet-converter

Resources:
  AwsDataWranglerLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: AWSDataWrangler
      Description: AWS Data Wrangler Integration Libraries
      ContentUri: src/awsdatawrangler/awswrangler-layer-2.6.0-py3.8.zip
      CompatibleRuntimes:
        - python3.8
      RetentionPolicy: Retain

Outputs:
  AwsDataWranglerLayer:
    Value: !Ref AwsDataWranglerLayer
    Description: ARN For AWS Data Wrangler Layer
Next I build and deploy the project using the SAM CLI commands shown below, run from the same directory as the template.yaml file.
sam build --use-container
sam deploy --guided --stack-name sam-s3-parquet-converter
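Once the deploy completes, one way to confirm the layer was published, beyond reading the CloudFormation stack outputs, is to list its versions with boto3. This is an optional check against your own AWS account and assumes the LayerName AWSDataWrangler from the template above.
# list_layer_versions.py (optional verification script)
import boto3

lambda_client = boto3.client('lambda')

# list the published versions of the layer defined in template.yaml
response = lambda_client.list_layer_versions(LayerName='AWSDataWrangler')
for version in response['LayerVersions']:
    print(version['Version'], version['LayerVersionArn'], version.get('CompatibleRuntimes'))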
I can now move on to building out more of the application by adding an S3 bucket to house the files to be converted. The SAM framework makes this incredibly easy to accomplish with a simple configuration definition in the SAM / CloudFormation template.yaml file using an S3 Bucket resource as shown below.
# template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  sam-s3-parquet-converter

  Sample SAM Template for sam-s3-parquet-converter

Resources:
  AwsDataWranglerLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: AWSDataWrangler
      Description: AWS Data Wrangler Integration Libraries
      ContentUri: src/awsdatawrangler/awswrangler-layer-2.6.0-py3.8.zip
      CompatibleRuntimes:
        - python3.8
      RetentionPolicy: Retain

  DemoBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: tci-demo-parquet

Outputs:
  AwsDataWranglerLayer:
    Value: !Ref AwsDataWranglerLayer
    Description: ARN For AWS Data Wrangler Layer
  DemoBucket:
    Value: !Ref DemoBucket
You will want to change the name of your S3 bucket to make it globally unique and different from what I've used.
Again I build and deploy this resource using the SAM CLI as shown below.
sam build --use-container
sam deploy
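As a quick sanity check that the bucket exists and is reachable with your credentials, a head_bucket call is enough; swap in whatever bucket name you chose.
# check_bucket.py (optional verification script)
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client('s3')
bucket_name = 'tci-demo-parquet'  # replace with your own bucket name

try:
    s3.head_bucket(Bucket=bucket_name)
    print(f'{bucket_name} exists and is accessible')
except ClientError as e:
    print(f'could not reach {bucket_name}: {e}')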
As I mentioned previously, for the conversion to Parquet I am utilizing the AWS Data Wrangler toolset to convert some demo JSON stock data in GZip format to Parquet, using a Pandas DataFrame as an intermediate data structure. To start moving in this direction I add two new files to the src/s3_parquet directory: app.py, the Python source of the AWS Lambda function, and an empty requirements.txt file, which the SAM CLI requires for all Python based Lambda Functions.
The app.py is where all the logic for the data format conversion and shuffle from the raw-uploads S3 directory to the parquet directory occurs, following these basic steps: parse the S3 event records, fetch and decompress the GZipped JSON object, load it into a Pandas DataFrame, and write it back to S3 in Parquet format.
# app.py

import boto3
import gzip
import json
import logging
import os
import urllib.parse

import awswrangler
import pandas as pd

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    logger.info({'event': event})

    for record in event['Records']:
        # pull the bucket and (URL decoded) object key from the S3 event record
        s3_bucket = record['s3']['bucket']['name']
        s3_key = urllib.parse.unquote_plus(record['s3']['object']['key'], encoding='utf-8')

        # fetch the GZipped JSON object from S3
        s3 = boto3.client('s3')
        obj = s3.get_object(Bucket=s3_bucket, Key=s3_key)

        # decompress and parse the JSON payload
        try:
            with gzip.GzipFile(fileobj=obj['Body']) as gz:
                data = json.loads(gz.read().decode())
        except Exception as e:
            logger.error({'error': str(e), 'message': 'failed gzip decoding'})
            raise e

        # load into a Pandas DataFrame and write back to S3 in Parquet format
        filename, _ = os.path.splitext(os.path.split(s3_key)[-1])
        df = pd.DataFrame(data)
        s3_path = 's3://{}/parquet/{}.parquet'.format(s3_bucket, filename)
        try:
            awswrangler.s3.to_parquet(df, dataset=False, path=s3_path, index=False)
        except Exception as e:
            logger.error({'error': str(e), 'message': 'Failed saving Parquet to S3'})
            raise e
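Before deploying, it can help to exercise lambda_handler locally against a hand-built S3 put event. The sketch below is a hypothetical local test harness: it assumes you have awswrangler and pandas installed locally, valid AWS credentials, and a GZipped JSON file already sitting at raw-uploads/COKE.gz in your bucket, and the event includes only the fields the handler actually reads.
# local_invoke.py (hypothetical local test harness, run from the project root)
from src.s3_parquet.app import lambda_handler

# minimal S3 notification event containing only the fields the handler reads
sample_event = {
    'Records': [
        {
            's3': {
                'bucket': {'name': 'tci-demo-parquet'},
                'object': {'key': 'raw-uploads/COKE.gz'}
            }
        }
    ]
}

# context is unused by the handler so None is fine here
lambda_handler(sample_event, None)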
I now add two more resources to my SAM Template file. The first is the Lambda function, complete with an S3 event mapping rule that triggers it when files are uploaded to the raw-uploads directory, as well as an IAM policy allowing it to read from and write to the previously generated S3 bucket. The second resource is a CloudWatch log group for capturing logs from the Lambda.
# template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  sam-s3-parquet-converter

  Sample SAM Template for sam-s3-parquet-converter

Resources:
  AwsDataWranglerLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: AWSDataWrangler
      Description: AWS Data Wrangler Integration Libraries
      ContentUri: src/awsdatawrangler/awswrangler-layer-2.6.0-py3.8.zip
      CompatibleRuntimes:
        - python3.8
      RetentionPolicy: Retain

  DemoBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: tci-demo-parquet

  ConversionFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/s3_parquet/
      Handler: app.lambda_handler
      Runtime: python3.8
      Timeout: 300
      MemorySize: 512
      Layers:
        - !Ref AwsDataWranglerLayer
      Policies:
        - Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Action:
                - 's3:GetObject*'
                - 's3:PutObject'
              Resource: !Sub "arn:aws:s3:::tci-demo-parquet/*"
      Events:
        SegmentEventLogUpload:
          Type: S3
          Properties:
            Bucket: !Ref DemoBucket
            Events: s3:ObjectCreated:*
            Filter:
              S3Key:
                Rules:
                  - Name: prefix
                    Value: raw-uploads/

  ConversionFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    DependsOn: ConversionFunction
    Properties:
      LogGroupName: !Sub "/aws/lambda/${ConversionFunction}"
      RetentionInDays: 7

Outputs:
  AwsDataWranglerLayer:
    Value: !Ref AwsDataWranglerLayer
    Description: ARN For AWS Data Wrangler Layer
  DemoBucket:
    Value: !Ref DemoBucket
With this new Python Lambda code and SAM configuration in place I can again build and deploy.
sam build --use-container
sam deploy
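To confirm that SAM wired up the S3 trigger, the bucket's notification configuration should now contain a Lambda function entry with the raw-uploads/ prefix filter. The snippet below is another optional check against your own account and bucket name.
# check_notification.py (optional verification script)
import boto3

s3 = boto3.client('s3')
config = s3.get_bucket_notification_configuration(Bucket='tci-demo-parquet')

# expect an entry pointing at the conversion function with a raw-uploads/ prefix filter
for cfg in config.get('LambdaFunctionConfigurations', []):
    print(cfg['LambdaFunctionArn'])
    print(cfg['Events'])
    print(cfg.get('Filter'))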
With the Lambda function nicely wrapped into an IaC template backed SAM project and deployed to the AWS Cloud, I can finally test it out. I have an example stock dataset for the Coke beverage company, already GZipped, which you can download and then upload to the raw-uploads directory of the S3 bucket to trigger the conversion.
First, I download the GZip dataset using an HTTP Client like HTTPie, curl, or even your browser. I'll use HTTPie as follows.
http --download https://thecodinginterface-images.s3.amazonaws.com/blogposts/serverless-parquet-data-converter/COKE.gz
Then I use the AWS CLI to upload this file to the raw-uploads directory of the S3 bucket. Be sure to use the bucket name you chose earlier, since mine will likely already be taken.
aws s3 cp COKE.gz s3://tci-demo-parquet/raw-uploads/COKE.gz
After waiting a few seconds I then head over to the S3 service page for my S3 bucket in the AWS Console where I see there are now two directories: raw-uploads and parquet. If I click the parquet link I see the converted Parquet data as shown below.
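Beyond eyeballing it in the console, you can also pull the converted file back down and inspect it with AWS Data Wrangler from any environment that has it installed; the path below follows the naming convention the handler uses.
# read_back.py (optional verification script)
import awswrangler as wr

# read the converted file back into a DataFrame to confirm the conversion worked
df = wr.s3.read_parquet(path='s3://tci-demo-parquet/parquet/COKE.parquet')
print(df.head())
print(df.dtypes)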
In this article I demonstrated how to use AWS SAM to build an event driven, serverless file data converter that is invoked upon upload of a GZipped JSON data file to S3 and converts it to Parquet format before writing the results back to S3.
As always, thanks for reading and please do not hesitate to critique or comment below.