Streaming Logs to S3 with Kinesis Firehose in a Serverless Project

Introduction

In this article I demonstrate how to set up an AWS Serverless Application Model (SAM) project for near real-time streaming of CloudWatch logs to S3 using Kinesis Data Firehose. To keep things interesting I'll be using a Python based demo application that exposes two REST APIs: one for scraping and saving quotes from the web to a DynamoDB table and another for listing the saved quotes.

Initial Project Setup

To start I initialize a new SAM project with the Python 3.8 runtime and use the Hello World REST API template app.

sam init --name sam-app-kinesis-streamed-logs \
  --runtime python3.8 \
  --package-type Zip \
  --app-template hello-world

This produces the following app structure.

sam-app-kinesis-streamed-logs/
├── README.md
├── __init__.py
├── events
│   └── event.json
├── hello_world
│   ├── __init__.py
│   ├── app.py
│   └── requirements.txt
├── template.yaml
└── tests
    ├── __init__.py
    ├── integration
    │   ├── __init__.py
    │   └── test_api_gateway.py
    ├── requirements.txt
    └── unit
        ├── __init__.py
        └── test_handler.py

First I rename the hello_world directory to src, then create two subdirectories within it, one named quote_fetcher and another named quote_lister, each of which gets its own api.py and requirements.txt files. I also remove the original __init__.py, app.py, and requirements.txt files generated by the Hello World template. To keep things minimal for this demo I remove the tests directory as well.

sam-app-kinesis-streamed-logs/
├── README.md
├── __init__.py
├── events
│   └── event.json
├── src
│   ├── quote_fetcher
│   │   ├── api.py
│   │   └── requirements.txt
│   └── quote_lister
│       ├── api.py
│       └── requirements.txt
└── template.yaml

The last bit of initial setup is to define the base SAM/CloudFormation template.yaml file's resources as shown below.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  sam-app-kinesis-streamed-logs

  Sample SAM Template for sam-app-kinesis-streamed-logs

Globals:
  Function:
    Timeout: 30

Resources:
  FetchQuoteFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/quote_fetcher
      Handler: api.lambda_handler
      Runtime: python3.8
      Environment:
        Variables:
          TABLE_NAME: !Ref QuotesTable
      Events:
        FetchQuoteApi:
          Type: Api
          Properties:
            Path: /fetch
            Method: get
      Policies:
        - DynamoDBWritePolicy:
            TableName: !Ref QuotesTable

  FetchQuoteFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName:
        Fn::Join:
          - ''
          - - /aws/lambda/
            - Ref: FetchQuoteFunction
      RetentionInDays: 5


  ListQuotesFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/quote_lister
      Handler: api.lambda_handler
      Runtime: python3.8
      Environment:
        Variables:
          TABLE_NAME: !Ref QuotesTable
      Events:
        ListQuotesApi:
          Type: Api
          Properties:
            Path: /list
            Method: get
      Policies:
        - DynamoDBReadPolicy:
            TableName: !Ref QuotesTable

  ListQuotesFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName:
        Fn::Join:
          - ''
          - - /aws/lambda/
            - Ref: ListQuotesFunction
      RetentionInDays: 5

  QuotesTable:
    Type: AWS::Serverless::SimpleTable
    Properties:
      TableName: quotes-demo

Outputs:
  FetchQuoteApi:
    Description: "API Gateway endpoint URL for Prod stage for Fetch Quote API"
    Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/fetch"
  ListQuotesApi:
    Description: "API Gateway endpoint URL for Prod stage for List Quotes API"
    Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/list"

This Infrastructure as Code SAM/CloudFormation template specifies two Lambda function resources of type AWS::Serverless::Function. One function, which I've named FetchQuoteFunction, fetches new quotes from the web and saves them to a DynamoDB table, and is initiated by an HTTP GET request. The second Lambda function, named ListQuotesFunction, retrieves previously saved quotes from the DynamoDB table and returns them back through API Gateway. Each function has a CloudWatch Log Group associated with it for capturing log messages produced in the application. Lastly, there is a DynamoDB table SAM resource of type AWS::Serverless::SimpleTable which, as mentioned previously, is used to save the quotes fetched from the internet.

Fetching Quotes and Saving to DynamoDB

The Python packages used to scrape quotes from the web are Requests and BeautifulSoup4, along with the Boto3 library which is used to save quotes to DynamoDB. These libraries are specified in the src/quote_fetcher/requirements.txt file like so.

boto3>=1.17.4,<1.18
requests>=2.25.1,<2.26
beautifulsoup4>=4.9.3,<4.10

The code for fetching and saving quotes is shown below. I will only briefly explain its purpose since the focus here is on integrating CloudWatch, Kinesis Firehose, and S3 to stream logs in near real-time.

import json
import logging
import os

from hashlib import sha256

import boto3
import requests

from bs4 import BeautifulSoup


logger = logging.getLogger()
logger.setLevel(logging.INFO)

db = boto3.resource('dynamodb')


RESOURCE = 'quote_fetcher'

def lambda_handler(event, context):
    logger.info({'resource': RESOURCE, 'action': 'invoke_initiated'})

    try:
        response = requests.get('http://quotes.toscrape.com/random')
    except Exception as e:
        logger.info({
            'resource': RESOURCE,
            'action': 'exception_handled',
            'details': {
              'error': str(e),
              'message': 'Failed to fetch quote from web', 
            }
        })
        return {
            "statusCode": 500,
            "body": json.dumps('Failed to fetch quote')
        }

    # Parse the page and extract the quote text and author, stripping
    # the curly quote characters from the text.
    soup = BeautifulSoup(response.text)
    quote_element = soup.find(class_='quote')
    author = quote_element.find(class_='author').get_text()
    text = quote_element.find(class_='text').get_text()\
                        .replace('“', '').replace('”', '')

    # Hash the author and text into a deterministic ID so refetching
    # the same quote overwrites the item rather than duplicating it.
    hash = sha256((author.lower() + text.lower()).encode('utf-8'))
    quote = {
      'author': author,
      'text': text,
      'id': hash.hexdigest()
    }

    try:
        tbl = db.Table(os.environ['TABLE_NAME'])
        tbl.put_item(Item=quote)
    except Exception as e:
        logger.info({
            'resource': RESOURCE,
            'action': 'exception_handled',
            'details': {
              'error': str(e),
              'message': 'Failed to save quote to database', 
              'quote': quote
            }
        })
        return {
            "statusCode": 500,
            "body": json.dumps('Failed to save quote')
        }

    return {
        "statusCode": 200,
        "body": json.dumps(quote)
    }

The above Python AWS Lambda function is designed to be integrated with AWS API Gateway GET requests, though other request types would work as well. The function uses the Requests library to fetch pages from a popular web-scraping-friendly website produced by the good folks at Zyte (formerly Scrapinghub). The scraped content is then parsed with BeautifulSoup to extract and clean the randomly selected quote and its author. The quote data is saved to DynamoDB with the help of the Boto3 library before finally being returned to API Gateway to be passed on to the calling application.
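
As a quick aside, the parsing logic can be sanity checked locally without deploying or even hitting the network. The HTML below is my own hypothetical stand-in mimicking the site's quote markup, not a captured response.

from bs4 import BeautifulSoup

# Hypothetical HTML fixture mimicking the quote markup on
# quotes.toscrape.com, used as a stand-in for a real response body.
SAMPLE_HTML = """
<div class="quote">
  <span class="text">“The only way out is through.”</span>
  <small class="author">Robert Frost</small>
</div>
"""

soup = BeautifulSoup(SAMPLE_HTML, features='html.parser')
quote_element = soup.find(class_='quote')
author = quote_element.find(class_='author').get_text()
text = quote_element.find(class_='text').get_text()\
                    .replace('“', '').replace('”', '')

print(author)  # Robert Frost
print(text)    # The only way out is through.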

Retrieving Quotes from DynamoDB and Serving Over REST API

For the Python based AWS Lambda quote listing function the only hard dependency is the Boto3 library, which again needs to be added to the requirements.txt file within the quote_lister directory.

boto3>=1.17.4,<1.18

The code for the Lambda function is quite simple as you can see below.

import json
import logging
import os

import boto3


logger = logging.getLogger()
logger.setLevel(logging.INFO)

db = boto3.resource('dynamodb')

RESOURCE = 'quote_lister'

def lambda_handler(event, context):
    logger.info({'resource': RESOURCE, 'action': 'invoke_initiated'})

    try:
        tbl = db.Table(os.environ['TABLE_NAME'])
        quotes = tbl.scan()
    except Exception as e:
        logger.info({
            'resource': RESOURCE,
            'action': 'exception_handled',
            'details': {
              'error': str(e),
              'message': 'Failed to fetch quotes from db', 
            }
        })
        return {
            "statusCode": 500,
            "body": json.dumps('Failed to fetch quotes')
        }

    return {
        "statusCode": 200,
        "body": json.dumps(quotes['Items']),
    }

The function simply uses Boto3 to perform a DynamoDB scan operation on the table specified by the TABLE_NAME environment variable (passed in from the SAM template resource), then returns the retrieved items back through API Gateway.
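
One caveat worth noting: a single scan call returns at most 1 MB of data, which is plenty for this demo but would silently truncate results on a larger table. A paginated variant might look something like the following sketch, where scan_all_items is a hypothetical helper rather than part of the demo code.

def scan_all_items(tbl):
    """Scan the full table, following LastEvaluatedKey pagination."""
    items = []
    response = tbl.scan()
    items.extend(response['Items'])
    # DynamoDB includes LastEvaluatedKey when more pages remain.
    while 'LastEvaluatedKey' in response:
        response = tbl.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
        items.extend(response['Items'])
    return items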

Test Driving the Serverless App and Tailing Logs with the SAM CLI

At this point the application can be deployed to the AWS Cloud using the SAM CLI commands build and deploy.

Building the SAM project (do this from the same directory as the template.yaml file).

sam build

Deploy with the help of the interactive deploy command (again this should be done from the same directory as the template.yaml file). Here I entered sam-app-kinesis-streamed-logs as the stack name and accepted the defaults on all the other prompts.

sam deploy --guided

This will eventually give output similar to the following.

CloudFormation outputs from deployed stack
-------------------------------------------------------------------------------------------------
Outputs                                                                                         
-------------------------------------------------------------------------------------------------
Key                 ListQuotesApi                                                               
Description         API Gateway endpoint URL for Prod stage for List Quotes API                 
Value               https://vs6yowuahg.execute-api.us-east-1.amazonaws.com/Prod/list            

Key                 FetchQuoteApi                                                               
Description         API Gateway endpoint URL for Prod stage for Fetch Quote API                 
Value               https://vs6yowuahg.execute-api.us-east-1.amazonaws.com/Prod/fetch           
-------------------------------------------------------------------------------------------------

Successfully created/updated stack - sam-app-kinesis-streamed-logs in us-east-1

After the deploy command indicates it successfully completed, I can test the API endpoints with my favorite HTTP client, HTTPie, which of course is written in Python.

Testing the fetch endpoint like so.

$ http -j https://vs6yowuahg.execute-api.us-east-1.amazonaws.com/Prod/fetch
HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 186
Content-Type: application/json
Date: Tue, 09 Feb 2021 17:06:51 GMT
Via: 1.1 fa899decf29a8515a5481334de6baf5d.cloudfront.net (CloudFront)
X-Amz-Cf-Id: 0J039rFMtNVhNdPGtpiu84oWzyEyiwBg_gdnmdoNMpRK8QjDZCdXdw==
X-Amz-Cf-Pop: SFO5-C1
X-Amzn-Trace-Id: Root=1-6022c129-74fa441e267e085919502db5;Sampled=0
X-Cache: Miss from cloudfront
x-amz-apigw-id: afMefFEroAMFyvA=
x-amzn-RequestId: 056eac5c-8fe4-4b33-829a-948bb3ed4490

{
    "author": "George Bernard Shaw",
    "id": "1fb1efd2b193436a3633f35f497e9513f7ae7c19f6a1d6715b228b3d667c0642",
    "text": "Life isn't about finding yourself. Life is about creating yourself."
}

I repeat this once more to ensure more than one quote is available in the DynamoDB table.

$ http -j https://vs6yowuahg.execute-api.us-east-1.amazonaws.com/Prod/fetch
HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 174
Content-Type: application/json
Date: Tue, 09 Feb 2021 17:06:57 GMT
Via: 1.1 f18dd0c3095e2c73f72cff3122430cb9.cloudfront.net (CloudFront)
X-Amz-Cf-Id: d5q72SLn5ahkHI2ur83PjKQTl7N-HSFOFudMTdFEqTV2_Nc1D4zQpA==
X-Amz-Cf-Pop: SFO5-C1
X-Amzn-Trace-Id: Root=1-6022c131-2d70e28f46868bb86335acdf;Sampled=0
X-Cache: Miss from cloudfront
x-amz-apigw-id: afMfyGDJoAMFfQQ=
x-amzn-RequestId: b9335995-9445-4311-b3ee-d8fb09d764d7

{
    "author": "J.K. Rowling",
    "id": "0511a2d789da217808183ea93bd2701e3680163ac3e27892b51551754a3d8cb9",
    "text": "It matters not what someone is born, but what they grow to be."
}

Next I test the list endpoint to verify that the previous two quotes were saved to the database and retrievable via the Lambda function.

$ http -j https://vs6yowuahg.execute-api.us-east-1.amazonaws.com/Prod/list
HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 364
Content-Type: application/json
Date: Tue, 09 Feb 2021 17:07:46 GMT
Via: 1.1 208eb126ebe99fd5accb034c84a1eeca.cloudfront.net (CloudFront)
X-Amz-Cf-Id: lWKTxpe4k4KIz38lp5mRuyZAD1osad5sahTAlwufXIKcrz8_AsQDHw==
X-Amz-Cf-Pop: SFO5-C1
X-Amzn-Trace-Id: Root=1-6022c161-6a6b533b7bb015ed1e005dde;Sampled=0
X-Cache: Miss from cloudfront
x-amz-apigw-id: afMnQHiroAMFpvA=
x-amzn-RequestId: 3bc3f8f9-1c71-461b-a93d-a00b53eaeec1

[
    {
        "author": "George Bernard Shaw",
        "id": "1fb1efd2b193436a3633f35f497e9513f7ae7c19f6a1d6715b228b3d667c0642",
        "text": "Life isn't about finding yourself. Life is about creating yourself."
    },
    {
        "author": "J.K. Rowling",
        "id": "0511a2d789da217808183ea93bd2701e3680163ac3e27892b51551754a3d8cb9",
        "text": "It matters not what someone is born, but what they grow to be."
    }
]

Lastly I use the SAM CLI's logs command to make sure that logs are being captured by each function's log group.

First the quote fetcher function.

$ sam logs --stack-name sam-app-kinesis-streamed-logs \
        --name FetchQuoteFunction --region us-east-1 --tail
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:50.822000 START RequestId: 6b1c7d61-7fd0-4090-85b4-fc0a16dce5b1 Version: $LATEST
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:50.822000 [INFO]  2021-02-09T17:06:50.822Z                Found credentials in environment variables.
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:50.861000 [INFO]  2021-02-09T17:06:50.861Z        6b1c7d61-7fd0-4090-85b4-fc0a16dce5b1    {'resource': 'quote_fetcher', 'action': 'invoke_initiated'}
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:51.035000 /var/task/api.py:41: GuessedAtParserWarning: No parser was explicitly specified, so I'm using the best available HTML parser for this system ("html.parser"). This usually isn't a problem, but if you run this code on another system, or in a different virtual environment, it may use a different parser and behave differently.
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:51.035000 The code that caused this warning is on line 41 of the file /var/task/api.py. To get rid of this warning, pass the additional argument 'features="html.parser"' to the BeautifulSoup constructor.
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:51.035000 soup = BeautifulSoup(response.text)
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:51.302000 END RequestId: 6b1c7d61-7fd0-4090-85b4-fc0a16dce5b1
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:51.302000 REPORT RequestId: 6b1c7d61-7fd0-4090-85b4-fc0a16dce5b1  Duration: 441.25 ms     Billed Duration: 442 ms Memory Size: 128 MB        Max Memory Used: 81 MB  Init Duration: 966.09 ms
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:57.776000 START RequestId: 5a4b83cd-0610-4f6c-9907-c7d0ae8b38b5 Version: $LATEST
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:57.781000 [INFO]  2021-02-09T17:06:57.780Z        5a4b83cd-0610-4f6c-9907-c7d0ae8b38b5    {'resource': 'quote_fetcher', 'action': 'invoke_initiated'}
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:57.980000 END RequestId: 5a4b83cd-0610-4f6c-9907-c7d0ae8b38b5
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:57.980000 REPORT RequestId: 5a4b83cd-0610-4f6c-9907-c7d0ae8b38b5  Duration: 200.23 ms     Billed Duration: 201 ms Memory Size: 128 MB        Max Memory Used: 81 MB

Then the quote listing function.

$ sam logs --stack-name sam-app-kinesis-streamed-logs \
        --name ListQuotesFunction --region us-east-1 --tail
2021/02/09/[$LATEST]2347ea80437e4021b40e1e0c9dd51b1e 2021-02-09T17:07:46.536000 START RequestId: 83fcb9c6-6523-4273-83c4-21328e25a0f0 Version: $LATEST
2021/02/09/[$LATEST]2347ea80437e4021b40e1e0c9dd51b1e 2021-02-09T17:07:46.536000 [INFO]  2021-02-09T17:07:46.536Z                Found credentials in environment variables.
2021/02/09/[$LATEST]2347ea80437e4021b40e1e0c9dd51b1e 2021-02-09T17:07:46.580000 [INFO]  2021-02-09T17:07:46.579Z        83fcb9c6-6523-4273-83c4-21328e25a0f0    {'resource': 'quote_lister', 'action': 'invoke_initiated'}
2021/02/09/[$LATEST]2347ea80437e4021b40e1e0c9dd51b1e 2021-02-09T17:07:46.908000 END RequestId: 83fcb9c6-6523-4273-83c4-21328e25a0f0
2021/02/09/[$LATEST]2347ea80437e4021b40e1e0c9dd51b1e 2021-02-09T17:07:46.908000 REPORT RequestId: 83fcb9c6-6523-4273-83c4-21328e25a0f0  Duration: 328.20 ms     Billed Duration: 329 ms Memory Size: 128 MB        Max Memory Used: 73 MB  Init Duration: 573.25 ms

Streaming CloudWatch Logs to Kinesis Firehose and Landing Them in S3

In this section I configure Kinesis Data Firehose as a delivery stream to ship the SAM application's logs from CloudWatch to an S3 bucket. This allows for near real-time capture of system logs and telemetry, which can then be further analyzed and monitored downstream. Kinesis is Amazon's streaming / stream processing technology, conceptually like a managed version of Kafka but designed to work specifically in the AWS Cloud. It is organized into use cases: real-time streams with Kinesis Data Streams, stream processing analytics with a SQL interface via Kinesis Data Analytics, near real-time fully managed data stream ingestion and delivery with Kinesis Data Firehose, and video ingestion with Kinesis Video Streams.

For this tutorial I am only focusing on using Kinesis to funnel CloudWatch Logs data into S3, one of the four main Kinesis Data Firehose delivery targets within AWS, the others being Redshift, Splunk, and Elasticsearch. Being a fully managed service, accomplishing this goal does not require any application code, but I will continue to use Infrastructure as Code with the SAM / CloudFormation template to wire up the various components.

Back in my template.yaml file for the SAM project, I append the following to the list of resources in the Resources section.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  sam-app-kinesis-streamed-logs

  Sample SAM Template for sam-app-kinesis-streamed-logs

Globals:
  Function:
    Timeout: 30

Resources:
  # omitting earlier defined Resources for brevity

  QuotesLoggingBucket:
    Type: AWS::S3::Bucket
    Properties:
      Tags:
        - Key: Source
          Value: !Sub "${AWS::StackName}-cf-stack"

  KinesisFirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - "firehose.amazonaws.com"
            Action:
              - "sts:AssumeRole"
            Condition:
              StringEquals:
                sts:ExternalId: !Ref AWS::AccountId
      Path: "/"
      Policies:
        - PolicyName: "quotes-logging-firehose-role-policy"
          PolicyDocument:
            Statement:
              - Effect: "Allow"
                Action:
                  - "s3:AbortMultipartUpload"
                  - "s3:GetBucketLocation"
                  - "s3:GetObject"
                  - "s3:ListBucket"
                  - "s3:ListBucketMultipartUploads"
                  - "s3:PutObject"
                  - "logs:PutLogEvents"
                Resource:
                  - !Join
                    - ''
                    - - 'arn:aws:s3:::'
                      - !Ref QuotesLoggingBucket
                  - !Join
                    - ''
                    - - 'arn:aws:s3:::'
                      - !Ref QuotesLoggingBucket
                      - '/*'
                  - !Join
                    - ''
                    - - 'arn:aws:logs:'
                      - !Ref AWS::Region
                      - ':'
                      - !Ref AWS::AccountId
                      - ':log-group:'
                      - !Sub ${FetchQuoteFunctionLogGroup}
                      - ':log-stream:*'
                  - !Join
                    - ''
                    - - 'arn:aws:logs:'
                      - !Ref AWS::Region
                      - ':'
                      - !Ref AWS::AccountId
                      - ':log-group:'
                      - !Sub ${ListQuotesFunctionLogGroup}
                      - ':log-stream:*'

  QuotesCloudWatchLogsRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                !Join
                - ''
                - - 'logs.'
                  - !Ref AWS::Region
                  - '.amazonaws.com'
            Action:
              - "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: "quotes-cloudwatch-logs-role-policy"
          PolicyDocument:
            Statement:
              - Effect: "Allow"
                Action:
                  - "firehose:*"
                Resource:
                  - !Join
                    - ''
                    - - 'arn:aws:firehose:'
                      - !Ref AWS::Region
                      - ':'
                      - !Ref AWS::AccountId
                      - ':*'

  QuoteFetcherFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt QuotesLoggingBucket.Arn
        BufferingHints:
          IntervalInSeconds: '120'
          SizeInMBs: '5'
        CompressionFormat: UNCOMPRESSED
        Prefix: sam-quote-fetcher-logs/
        RoleARN: !GetAtt KinesisFirehoseRole.Arn
        ProcessingConfiguration:
          Enabled: 'false'
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Ref FetchQuoteFunctionLogGroup
          LogStreamName: "S3Delivery"

  QuoteListerFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt QuotesLoggingBucket.Arn
        BufferingHints:
          IntervalInSeconds: '120'
          SizeInMBs: '5'
        CompressionFormat: UNCOMPRESSED
        Prefix: sam-quote-lister-logs/
        RoleARN: !GetAtt KinesisFirehoseRole.Arn
        ProcessingConfiguration:
          Enabled: 'false'
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Ref ListQuotesFunctionLogGroup
          LogStreamName: "S3Delivery"

  QuoteFetcherLogStream:
    Type: AWS::Logs::LogStream
    Properties:
      LogGroupName: !Ref FetchQuoteFunctionLogGroup
      LogStreamName: "S3Delivery"

  QuoteListerLogStream:
    Type: AWS::Logs::LogStream
    Properties:
      LogGroupName: !Ref ListQuotesFunctionLogGroup
      LogStreamName: "S3Delivery"

  QuoteFetcherLogsSubscriptionFilter:
    Type: AWS::Logs::SubscriptionFilter
    Properties:
      DestinationArn: !GetAtt QuoteFetcherFirehoseDeliveryStream.Arn
      FilterPattern: ''
      LogGroupName: !Ref FetchQuoteFunctionLogGroup
      RoleArn: !GetAtt QuotesCloudWatchLogsRole.Arn

  QuoteListerLogsSubscriptionFilter:
    Type: AWS::Logs::SubscriptionFilter
    Properties:
      DestinationArn: !GetAtt QuoteListerFirehoseDeliveryStream.Arn
      FilterPattern: ''
      LogGroupName: !Ref ListQuotesFunctionLogGroup
      RoleArn: !GetAtt QuotesCloudWatchLogsRole.Arn

Outputs:
  FetchQuoteApi:
    Description: "API Gateway endpoint URL for Prod stage for Fetch Quote API"
    Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/fetch"
  ListQuotesApi:
    Description: "API Gateway endpoint URL for Prod stage for List Quotes API"
    Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/list"

Although the solution is composed entirely of managed services, as you can see from the number of resources being added to the SAM / CloudFormation template there is still a pretty hefty amount of IaC required to link up all the needed pieces. I've based it on the AWS CLI example tutorial Send the Data from Amazon CloudWatch to Kinesis Data Firehose from the AWS docs.

There was one gotcha that tripped me up: CloudWatch logs already arrive gzip compressed by default, so it is important to specify a CompressionFormat of UNCOMPRESSED in the Firehose delivery stream configuration so the data is not compressed a second time, as described in this Kinesis Firehose FAQ. However, given the expressive readability of the CloudFormation template above and the linked tutorial from the AWS docs, I'm not going to further describe all of the components but instead deploy and show the end result.

So again I use the SAM CLI to build the project.

sam build

Then I deploy in guided mode once more, accepting the defaults and confirming the changes again.

sam deploy --guided

Once the updates are done deploying, I can check out the resources in the AWS CloudFormation Console to get the full name of the new S3 destination bucket for the logs, or I can use the helper script below, which I've written for such tasks. It simply wraps the Boto3 Python library and formats the output in a way that is easily digestible to my eyes.

#!/usr/bin/python3

import argparse
import textwrap

import boto3


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Lists AWS CloudFormation Stack Resources')
    parser.add_argument('stack_name', help='CloudFormation stack name')
    parser.add_argument('--profile', help='AWS CLI Profile to use')
    parser.add_argument('--region', help='AWS region to execute command against')

    args = parser.parse_args()

    options = {}
    if args.profile:
        options.update(profile_name=args.profile)
    if args.region:
        options.update(region_name=args.region)

    session = boto3.Session(**options)

    cf = session.client('cloudformation')

    tmpl = "{:<35}  {:<55}  {:<120}"
    print(tmpl.format('Resource Type', 'Logical ID', 'Physical ID'))
    # Separator row sized to match the three column widths above.
    print(tmpl.format('-'*35, '-'*55, '-'*120))

    for stack_resource in cf.list_stack_resources(StackName=args.stack_name)['StackResourceSummaries']:
        print(tmpl.format(
            stack_resource['ResourceType'],
            stack_resource['LogicalResourceId'],
            stack_resource['PhysicalResourceId']
        ))

Running it is as simple as the following.

$ list-cf-stack-resources sam-app-kinesis-streamed-logs --region us-east-1 --profile default
Resource Type                        Logical ID                                               Physical ID                                                                                                             
-----------------------------------  -------------------------------------------------------  ------------------------------------------------------------------------------------------------------------------------
AWS::Lambda::Function                FetchQuoteFunction                                       sam-app-kinesis-streamed-logs-FetchQuoteFunction-JQSISUEBJAO3                                                           
AWS::Lambda::Permission              FetchQuoteFunctionFetchQuoteApiPermissionProd            sam-app-kinesis-streamed-logs-FetchQuoteFunctionFetchQuoteApiPermissionProd-XWRMZ0KGB151                                
AWS::Logs::LogGroup                  FetchQuoteFunctionLogGroup                               /aws/lambda/sam-app-kinesis-streamed-logs-FetchQuoteFunction-JQSISUEBJAO3                                               
AWS::IAM::Role                       FetchQuoteFunctionRole                                   sam-app-kinesis-streamed-lo-FetchQuoteFunctionRole-7TGDWO5JB4GO                                                         
AWS::IAM::Role                       KinesisFirehoseRole                                      sam-app-kinesis-streamed-logs-KinesisFirehoseRole-1TES9QUBI2JOT                                                         
AWS::Lambda::Function                ListQuotesFunction                                       sam-app-kinesis-streamed-logs-ListQuotesFunction-8B4MYV8YQUIC                                                           
AWS::Lambda::Permission              ListQuotesFunctionListQuotesApiPermissionProd            sam-app-kinesis-streamed-logs-ListQuotesFunctionListQuotesApiPermissionProd-DFLCDUE1MJT5                                
AWS::Logs::LogGroup                  ListQuotesFunctionLogGroup                               /aws/lambda/sam-app-kinesis-streamed-logs-ListQuotesFunction-8B4MYV8YQUIC                                               
AWS::IAM::Role                       ListQuotesFunctionRole                                   sam-app-kinesis-streamed-lo-ListQuotesFunctionRole-CJHZSMOIT4PC                                                         
AWS::KinesisFirehose::DeliveryStream  QuoteFetcherFirehoseDeliveryStream                       sam-app-kinesis-streamed-logs-QuoteFetcherFirehoseD-hkSEZmcO3zsM                                                        
AWS::Logs::LogStream                 QuoteFetcherLogStream                                    S3Delivery                                                                                                              
AWS::Logs::SubscriptionFilter        QuoteFetcherLogsSubscriptionFilter                       sam-app-kinesis-streamed-logs-QuoteFetcherLogsSubscriptionFilter-A2A0IXYELQE4                                           
AWS::KinesisFirehose::DeliveryStream  QuoteListerFirehoseDeliveryStream                        sam-app-kinesis-streamed-logs-QuoteListerFirehoseDe-NeY939sPbvf6                                                        
AWS::Logs::LogStream                 QuoteListerLogStream                                     S3Delivery                                                                                                              
AWS::Logs::SubscriptionFilter        QuoteListerLogsSubscriptionFilter                        sam-app-kinesis-streamed-logs-QuoteListerLogsSubscriptionFilter-RCC26OXTYCLR                                            
AWS::IAM::Role                       QuotesCloudWatchLogsRole                                 sam-app-kinesis-streamed-QuotesCloudWatchLogsRole-1VO47NQ56DXKV                                                         
AWS::S3::Bucket                      QuotesLoggingBucket                                      sam-app-kinesis-streamed-logs-quotesloggingbucket-1649a0uj6tbe6                                                         
AWS::DynamoDB::Table                 QuotesTable                                              quotes-demo                                                                                                             
AWS::ApiGateway::RestApi             ServerlessRestApi                                        vs6yowuahg                                                                                                              
AWS::ApiGateway::Deployment          ServerlessRestApiDeployment8068fc6cf5                    vf93bc                                                                                                                  
AWS::ApiGateway::Stage               ServerlessRestApiProdStage                               Prod  

From the above output I see that the autogenerated bucket name is sam-app-kinesis-streamed-logs-quotesloggingbucket-1649a0uj6tbe6 which I can then use with the regular AWS CLI to check that my CloudWatch logs are in fact being streamed into it.

I'll just copy its contents down to my laptop for easier local inspection.

S3_BUCKET=s3://sam-app-kinesis-streamed-logs-quotesloggingbucket-1649a0uj6tbe6/
aws s3 cp $S3_BUCKET cloudwatchlogs \
   --recursive --region us-east-1
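
Alternatively, a few lines of Boto3 can confirm that objects are landing under the expected prefixes without copying anything down; this is just a quick sketch using the autogenerated bucket name from my stack (yours will differ).

import boto3

s3 = boto3.client('s3')
bucket = 'sam-app-kinesis-streamed-logs-quotesloggingbucket-1649a0uj6tbe6'

# List any delivered log objects under each Firehose prefix.
for prefix in ('sam-quote-fetcher-logs/', 'sam-quote-lister-logs/'):
    response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    for obj in response.get('Contents', []):
        print(obj['Key'], obj['Size'])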

Inspecting the copied files I see that Kinesis Data Firehose has deposited the logs in a nicely partitioned format, with the prefixes I specified in the CloudFormation template followed by year, month, day, and hour directories.

$ tree cloudwatchlogs 
cloudwatchlogs
├── sam-quote-fetcher-logs
│   └── 2021
│       └── 02
│           └── 09
│               └── 17
│                   └── sam-app-kinesis-streamed-logs-QuoteFetcherFirehoseD-hkSEZmcO3zsM-2-2021-02-09-17-07-00-120c49d0-af11-450e-94f3-2ceaf9d91092
└── sam-quote-lister-logs
    └── 2021
        └── 02
            └── 09
                └── 17
                    └── sam-app-kinesis-streamed-logs-QuoteListerFirehoseDe-NeY939sPbvf6-2-2021-02-09-17-07-55-10c0f0a7-8680-4494-ab32-16f495dcfbfa

Now to view the original log data I just use gunzip to decompress it like so.

$ gunzip -c sam-app-kinesis-streamed-logs-QuoteFetcherFirehoseD-hkSEZmcO3zsM-2-2021-02-09-17-07-00-120c49d0-af11-450e-94f3-2ceaf9d91092 > uncompressed
$ cat uncompressed
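
Each delivered record is the standard CloudWatch Logs subscription payload, a gzip-compressed JSON document containing a logEvents array, and Firehose writes the records back to back in the S3 object. To pull the individual log messages out programmatically, a sketch along these lines should work (using the quote fetcher object from my run; the field names come from the CloudWatch Logs subscription format):

import gzip
import json

# The S3 object consists of one or more concatenated gzip members, each
# wrapping a CloudWatch Logs subscription JSON document; Python's gzip
# module decompresses the concatenated members in one pass.
with open('sam-app-kinesis-streamed-logs-QuoteFetcherFirehoseD-hkSEZmcO3zsM-2-2021-02-09-17-07-00-120c49d0-af11-450e-94f3-2ceaf9d91092', 'rb') as f:
    payload = gzip.decompress(f.read()).decode('utf-8')

# The decompressed JSON documents have no delimiter between them, so
# walk the string with raw_decode to pull each one out in turn.
decoder = json.JSONDecoder()
idx = 0
while idx < len(payload):
    if payload[idx].isspace():
        idx += 1
        continue
    doc, idx = decoder.raw_decode(payload, idx)
    for log_event in doc.get('logEvents', []):
        print(log_event['timestamp'], log_event['message'])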

Conclusion

In this article I have demonstrated how to use the Kinesis Data Firehose managed service of the AWS Kinesis product line to stream CloudWatch logs from an AWS Serverless Application Model project and land them in S3 for long-term storage and downstream analysis.

As always, thanks for reading and please do not hesitate to critique or comment below.
