In this article I demonstrate how to set up an AWS Serverless Application Model (SAM) project for near real-time streaming of CloudWatch logs to S3 using Kinesis Data Firehose. To keep things interesting I'll be using a Python-based demo application that exposes two REST APIs: one for scraping quotes from the web and saving them to a DynamoDB table, and another for listing the saved quotes.
To start I initialize a new SAM project with the Python 3.8 runtime and use the Hello World REST API template app.
sam init --name sam-app-kinesis-streamed-logs \
--runtime python3.8 \
--package-type Zip \
--app-template hello-world
This produces the following app structure.
sam-app-kinesis-streamed-logs/
├── README.md
├── __init__.py
├── events
│ └── event.json
├── hello_world
│ ├── __init__.py
│ ├── app.py
│ └── requirements.txt
├── template.yaml
└── tests
├── __init__.py
├── integration
│ ├── __init__.py
│ └── test_api_gateway.py
├── requirements.txt
└── unit
├── __init__.py
└── test_handler.py
First I rename the hello_world directory to src, then create two subdirectories within it, one named quote_fetcher and another named quote_lister, each getting its own api.py and requirements.txt files. I also remove the original __init__.py, app.py and requirements.txt files generated by the Hello World template. To keep things minimal for this demo I also remove the tests directory.
sam-app-kinesis-streamed-logs/
├── README.md
├── __init__.py
├── events
│ └── event.json
├── src
│ ├── quote_fetcher
│ │ ├── api.py
│ │ └── requirements.txt
│ └── quote_lister
│ ├── api.py
│ └── requirements.txt
└── template.yaml
The last bit of initial setup is to define the base SAM/CloudFormation template.yaml file's resources as shown below.
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
sam-app-kinesis-streamed-logs
Sample SAM Template for sam-app-kinesis-streamed-logs
Globals:
Function:
Timeout: 30
Resources:
FetchQuoteFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/quote_fetcher
Handler: api.lambda_handler
Runtime: python3.8
Environment:
Variables:
TABLE_NAME: !Ref QuotesTable
Events:
FetchQuoteApi:
Type: Api
Properties:
Path: /fetch
Method: get
Policies:
- DynamoDBWritePolicy:
TableName: !Ref QuotesTable
FetchQuoteFunctionLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName:
Fn::Join:
- ''
- - /aws/lambda/
- Ref: FetchQuoteFunction
RetentionInDays: 5
ListQuotesFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/quote_lister
Handler: api.lambda_handler
Runtime: python3.8
Environment:
Variables:
TABLE_NAME: !Ref QuotesTable
Events:
ListQuotesApi:
Type: Api
Properties:
Path: /list
Method: get
Policies:
- DynamoDBReadPolicy:
TableName: !Ref QuotesTable
ListQuotesFunctionLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName:
Fn::Join:
- ''
- - /aws/lambda/
- Ref: ListQuotesFunction
RetentionInDays: 5
QuotesTable:
Type: AWS::Serverless::SimpleTable
Properties:
TableName: quotes-demo
Outputs:
FetchQuoteApi:
Description: "API Gateway endpoint URL for Prod stage for Fetch Quote API"
Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/fetch"
ListQuotesApi:
Description: "API Gateway endpoint URL for Prod stage for List Quotes API"
Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/list"
This Infrastructure as Code SAM/CloudFormation template specifies two Lambda function resources of type AWS::Serverless::Function. The first, FetchQuoteFunction, fetches new quotes from the web and saves them to a DynamoDB table, and is initiated by an HTTP GET request. The second, ListQuotesFunction, retrieves previously saved quotes from the DynamoDB table and returns them through API Gateway. Each function has a CloudWatch Log Group associated with it for capturing log messages produced in the application. Lastly, there is a DynamoDB table resource of type AWS::Serverless::SimpleTable which, as mentioned previously, is used to save the quotes fetched from the web.
The Python packages used to scrape the quotes from the web are Requests and BeautifulSoup4, along with the Boto3 library which will be used to save quotes to DynamoDB. These libraries are specified in the src/quote_fetcher/requirements.txt file like so.
boto3>=1.17.4,<1.18
requests>=2.25.1,<2.26
beautifulsoup4>=4.9.3,<4.10
The code for fetching and saving quotes is shown below. I will only briefly explain its purpose since the focus here is on integrating CloudWatch, Kinesis Data Firehose, and S3 to stream logs in near real-time.
import json
import logging
import os
from hashlib import sha256
import boto3
import requests
from bs4 import BeautifulSoup
logger = logging.getLogger()
logger.setLevel(logging.INFO)
db = boto3.resource('dynamodb')
RESOURCE = 'quote_fetcher'
def lambda_handler(event, context):
logger.info({'resource': RESOURCE, 'action': 'invoke_initiated'})
try:
response = requests.get('http://quotes.toscrape.com/random')
except Exception as e:
logger.info({
'resource': RESOURCE,
'action': 'exception_handled',
'details': {
'error': str(e),
'message': 'Failed to fetch quote from web',
}
})
return {
"statusCode": 500,
"body": json.dumps('Failed to fetch quote')
}
soup = BeautifulSoup(response.text)
quote_element = soup.find(class_='quote')
author = quote_element.find(class_='author').get_text()
text = quote_element.find(class_='text').get_text()\
.replace('“', '').replace('”', '')
hash = sha256((author.lower() + text.lower()).encode('utf-8'))
quote = {
'author': author,
'text': text,
'id': hash.hexdigest()
}
try:
tbl = db.Table(os.environ['TABLE_NAME'])
tbl.put_item(Item=quote)
except Exception as e:
logger.info({
'resource': RESOURCE,
'action': 'exception_handled',
'details': {
'error': str(e),
'message': 'Failed to save quote to database',
'quote': quote
}
})
return {
"statusCode": 500,
"body": json.dumps('Failed to save quote')
}
return {
"statusCode": 200,
"body": json.dumps(quote)
}
The above Python AWS Lambda function is designed to be integrated with AWS API Gateway GET requests, but other request types would work as well. The function uses the Requests library to fetch pages from a popular scraping-friendly website produced by the good folks at Zyte, formerly ScrapingHub. The scraped content is then parsed with BeautifulSoup to extract and clean the randomly selected quote and its author. The quote data is saved to DynamoDB with the help of the Boto3 library before finally being returned to API Gateway to be passed on to the calling application.
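Before deploying, the handler can be exercised with a quick local smoke test. The sketch below is a minimal, hypothetical harness that is not part of the project: it assumes the fetcher's dependencies are installed locally, AWS credentials are available in the environment, and the target DynamoDB table already exists.

# smoke_test_fetcher.py -- hypothetical local harness, not part of the demo app
import json
import os

# Point the handler at a table; 'quotes-demo' matches the SAM template above,
# but any throwaway table works since this writes a real item.
os.environ.setdefault('TABLE_NAME', 'quotes-demo')

from src.quote_fetcher import api  # import path assumes running from the project root

if __name__ == '__main__':
    result = api.lambda_handler(event={}, context=None)
    print(result['statusCode'])
    print(json.dumps(json.loads(result['body']), indent=2))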
For the Python-based AWS Lambda quote listing function the only hard dependency is the Boto3 library, which again needs to be added to the requirements.txt file, this time within the quote_lister directory.
boto3>=1.17.4,<1.18
The code for the Lambda function is quite simple as you can see below.
import json
import logging
import os
import boto3
logger = logging.getLogger()
logger.setLevel(logging.INFO)
db = boto3.resource('dynamodb')
RESOURCE = 'quote_lister'
def lambda_handler(event, context):
logger.info({'resource': RESOURCE, 'action': 'invoke_initiated'})
try:
tbl = db.Table(os.environ['TABLE_NAME'])
quotes = tbl.scan()
except Exception as e:
logger.info({
'resource': RESOURCE,
'action': 'exception_handled',
'details': {
'error': str(e),
'message': 'Failed to fetch quotes from db',
}
})
return {
"statusCode": 500,
"body": json.dumps('Failed to fetch quotes')
}
return {
"statusCode": 200,
"body": json.dumps(quotes['Items']),
}
The function simply uses Boto3 to perform a DynamoDB scan operation on the table specified by the environment variable TABLE_NAME (passed in from the SAM template resource), then returns the retrieved items back through API Gateway.
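One thing worth noting is that a single scan call returns at most 1 MB of data per request. The demo table only ever holds a handful of quotes so a single call is fine here, but as a hedged sketch (not part of the demo code), paging through a larger table would look something like the following.

# scan_all.py -- illustrative pagination helper, not used by the demo
import boto3

def scan_all_items(table_name):
    """Return every item in the table, following LastEvaluatedKey pagination."""
    tbl = boto3.resource('dynamodb').Table(table_name)
    items = []
    kwargs = {}
    while True:
        page = tbl.scan(**kwargs)
        items.extend(page.get('Items', []))
        last_key = page.get('LastEvaluatedKey')
        if not last_key:
            return items
        kwargs['ExclusiveStartKey'] = last_key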
At this point the application can be deployed to the AWS Cloud using the SAM CLI commands build and deploy.
Build the SAM project (do this from the same directory as the template.yaml file).
sam build
Deploy with the help of the interactive deploy command (again this should be done from the same directory as the template.yaml file). Here I entered sam-app-kinesis-streamed-logs as the stack name and accepted the defaults on all the other prompts.
sam deploy --guided
This will eventually give output similar to the following.
CloudFormation outputs from deployed stack
-------------------------------------------------------------------------------------------------
Outputs
-------------------------------------------------------------------------------------------------
Key ListQuotesApi
Description API Gateway endpoint URL for Prod stage for List Quotes API
Value https://vs6yowuahg.execute-api.us-east-1.amazonaws.com/Prod/list
Key FetchQuoteApi
Description API Gateway endpoint URL for Prod stage for Fetch Quote API
Value https://vs6yowuahg.execute-api.us-east-1.amazonaws.com/Prod/fetch
-------------------------------------------------------------------------------------------------
Successfully created/updated stack - sam-app-kinesis-streamed-logs in us-east-1
After the deploy command indicates it completed successfully I can test the API endpoints with my favorite HTTP client, of course written in Python, named HTTPie.
Testing the fetch endpoint like so.
$ http -j https://vs6yowuahg.execute-api.us-east-1.amazonaws.com/Prod/fetch
HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 186
Content-Type: application/json
Date: Tue, 09 Feb 2021 17:06:51 GMT
Via: 1.1 fa899decf29a8515a5481334de6baf5d.cloudfront.net (CloudFront)
X-Amz-Cf-Id: 0J039rFMtNVhNdPGtpiu84oWzyEyiwBg_gdnmdoNMpRK8QjDZCdXdw==
X-Amz-Cf-Pop: SFO5-C1
X-Amzn-Trace-Id: Root=1-6022c129-74fa441e267e085919502db5;Sampled=0
X-Cache: Miss from cloudfront
x-amz-apigw-id: afMefFEroAMFyvA=
x-amzn-RequestId: 056eac5c-8fe4-4b33-829a-948bb3ed4490
{
"author": "George Bernard Shaw",
"id": "1fb1efd2b193436a3633f35f497e9513f7ae7c19f6a1d6715b228b3d667c0642",
"text": "Life isn't about finding yourself. Life is about creating yourself."
}
I repeat this once more to ensure more than one quote is available in the DynamoDB table.
$ http -j https://vs6yowuahg.execute-api.us-east-1.amazonaws.com/Prod/fetch
HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 174
Content-Type: application/json
Date: Tue, 09 Feb 2021 17:06:57 GMT
Via: 1.1 f18dd0c3095e2c73f72cff3122430cb9.cloudfront.net (CloudFront)
X-Amz-Cf-Id: d5q72SLn5ahkHI2ur83PjKQTl7N-HSFOFudMTdFEqTV2_Nc1D4zQpA==
X-Amz-Cf-Pop: SFO5-C1
X-Amzn-Trace-Id: Root=1-6022c131-2d70e28f46868bb86335acdf;Sampled=0
X-Cache: Miss from cloudfront
x-amz-apigw-id: afMfyGDJoAMFfQQ=
x-amzn-RequestId: b9335995-9445-4311-b3ee-d8fb09d764d7
{
"author": "J.K. Rowling",
"id": "0511a2d789da217808183ea93bd2701e3680163ac3e27892b51551754a3d8cb9",
"text": "It matters not what someone is born, but what they grow to be."
}
Next I test the list endpoint to verify that the previous two quotes were saved to the database and retrievable via the Lambda function.
$ http -j https://vs6yowuahg.execute-api.us-east-1.amazonaws.com/Prod/list
HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 364
Content-Type: application/json
Date: Tue, 09 Feb 2021 17:07:46 GMT
Via: 1.1 208eb126ebe99fd5accb034c84a1eeca.cloudfront.net (CloudFront)
X-Amz-Cf-Id: lWKTxpe4k4KIz38lp5mRuyZAD1osad5sahTAlwufXIKcrz8_AsQDHw==
X-Amz-Cf-Pop: SFO5-C1
X-Amzn-Trace-Id: Root=1-6022c161-6a6b533b7bb015ed1e005dde;Sampled=0
X-Cache: Miss from cloudfront
x-amz-apigw-id: afMnQHiroAMFpvA=
x-amzn-RequestId: 3bc3f8f9-1c71-461b-a93d-a00b53eaeec1
[
{
"author": "George Bernard Shaw",
"id": "1fb1efd2b193436a3633f35f497e9513f7ae7c19f6a1d6715b228b3d667c0642",
"text": "Life isn't about finding yourself. Life is about creating yourself."
},
{
"author": "J.K. Rowling",
"id": "0511a2d789da217808183ea93bd2701e3680163ac3e27892b51551754a3d8cb9",
"text": "It matters not what someone is born, but what they grow to be."
}
]
Lastly I use the SAM CLI's logs command to make sure that logs are being captured by each function's log group.
First the quote fetcher function.
$ sam logs --stack-name sam-app-kinesis-streamed-logs \
--name FetchQuoteFunction --region us-east-1 --tail
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:50.822000 START RequestId: 6b1c7d61-7fd0-4090-85b4-fc0a16dce5b1 Version: $LATEST
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:50.822000 [INFO] 2021-02-09T17:06:50.822Z Found credentials in environment variables.
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:50.861000 [INFO] 2021-02-09T17:06:50.861Z 6b1c7d61-7fd0-4090-85b4-fc0a16dce5b1 {'resource': 'quote_fetcher', 'action': 'invoke_initiated'}
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:51.035000 /var/task/api.py:41: GuessedAtParserWarning: No parser was explicitly specified, so I'm using the best available HTML parser for this system ("html.parser"). This usually isn't a problem, but if you run this code on another system, or in a different virtual environment, it may use a different parser and behave differently.
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:51.035000 The code that caused this warning is on line 41 of the file /var/task/api.py. To get rid of this warning, pass the additional argument 'features="html.parser"' to the BeautifulSoup constructor.
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:51.035000 soup = BeautifulSoup(response.text)
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:51.302000 END RequestId: 6b1c7d61-7fd0-4090-85b4-fc0a16dce5b1
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:51.302000 REPORT RequestId: 6b1c7d61-7fd0-4090-85b4-fc0a16dce5b1 Duration: 441.25 ms Billed Duration: 442 ms Memory Size: 128 MB Max Memory Used: 81 MB Init Duration: 966.09 ms
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:57.776000 START RequestId: 5a4b83cd-0610-4f6c-9907-c7d0ae8b38b5 Version: $LATEST
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:57.781000 [INFO] 2021-02-09T17:06:57.780Z 5a4b83cd-0610-4f6c-9907-c7d0ae8b38b5 {'resource': 'quote_fetcher', 'action': 'invoke_initiated'}
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:57.980000 END RequestId: 5a4b83cd-0610-4f6c-9907-c7d0ae8b38b5
2021/02/09/[$LATEST]2efce8761fcf498a8b0bdeb6014e069b 2021-02-09T17:06:57.980000 REPORT RequestId: 5a4b83cd-0610-4f6c-9907-c7d0ae8b38b5 Duration: 200.23 ms Billed Duration: 201 ms Memory Size: 128 MB Max Memory Used: 81 MB
Then the quote listing function.
$ sam logs --stack-name sam-app-kinesis-streamed-logs \
--name ListQuotesFunction --region us-east-1 --tail
2021/02/09/[$LATEST]2347ea80437e4021b40e1e0c9dd51b1e 2021-02-09T17:07:46.536000 START RequestId: 83fcb9c6-6523-4273-83c4-21328e25a0f0 Version: $LATEST
2021/02/09/[$LATEST]2347ea80437e4021b40e1e0c9dd51b1e 2021-02-09T17:07:46.536000 [INFO] 2021-02-09T17:07:46.536Z Found credentials in environment variables.
2021/02/09/[$LATEST]2347ea80437e4021b40e1e0c9dd51b1e 2021-02-09T17:07:46.580000 [INFO] 2021-02-09T17:07:46.579Z 83fcb9c6-6523-4273-83c4-21328e25a0f0 {'resource': 'quote_lister', 'action': 'invoke_initiated'}
2021/02/09/[$LATEST]2347ea80437e4021b40e1e0c9dd51b1e 2021-02-09T17:07:46.908000 END RequestId: 83fcb9c6-6523-4273-83c4-21328e25a0f0
2021/02/09/[$LATEST]2347ea80437e4021b40e1e0c9dd51b1e 2021-02-09T17:07:46.908000 REPORT RequestId: 83fcb9c6-6523-4273-83c4-21328e25a0f0 Duration: 328.20 ms Billed Duration: 329 ms Memory Size: 128 MB Max Memory Used: 73 MB Init Duration: 573.25 ms
In this section I configure Kinesis Data Firehose as a delivery stream to ship the SAM application logs from CloudWatch to an S3 bucket. This allows for near real-time capture of system logs and telemetry which can then be further analyzed and monitored downstream. Kinesis is Amazon's streaming and stream-processing technology, conceptually similar to a managed version of Kafka but designed specifically for the AWS Cloud. It is organized by use case: real-time streams with Kinesis Data Streams, SQL-based stream processing analytics with Kinesis Data Analytics, near real-time fully managed data stream ingestion and delivery with Kinesis Data Firehose, and video ingestion with Kinesis Video Streams.
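To make the Firehose model concrete: producers push records into a delivery stream, and Firehose buffers and delivers them to the configured destination. In this project CloudWatch Logs will be the producer, but a direct put from Python would look roughly like the sketch below; the delivery stream name is a placeholder, not one of this project's resources.

# direct_put.py -- conceptual example only; the demo relies on CloudWatch Logs
# to push records into Firehose, not application code.
import json
import boto3

firehose = boto3.client('firehose', region_name='us-east-1')

firehose.put_record(
    DeliveryStreamName='example-delivery-stream',  # placeholder name
    Record={'Data': (json.dumps({'hello': 'firehose'}) + '\n').encode('utf-8')},
)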
For this tutorial I am only focusing on using Kinesis Data Firehose to funnel CloudWatch Logs data into S3, one of the four main Firehose delivery targets within AWS, which are S3, Redshift, Splunk, and Elasticsearch. Being a fully managed service, accomplishing this does not require any application code, but I will continue using Infrastructure as Code in the SAM/CloudFormation template to wire up the various components required.
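For reference, the key piece of that wiring is a CloudWatch Logs subscription filter pointing each log group at a Firehose delivery stream. The boto3 call below is roughly what the AWS::Logs::SubscriptionFilter resources added to the template accomplish declaratively; the log group name, ARNs, and filter name are placeholders rather than values from this stack.

# subscribe_log_group.py -- what the SubscriptionFilter resources do, expressed
# imperatively; all identifiers below are placeholders.
import boto3

logs = boto3.client('logs', region_name='us-east-1')

logs.put_subscription_filter(
    logGroupName='/aws/lambda/example-function',
    filterName='stream-to-firehose',
    filterPattern='',  # an empty pattern forwards every log event
    destinationArn='arn:aws:firehose:us-east-1:111111111111:deliverystream/example-stream',
    roleArn='arn:aws:iam::111111111111:role/example-cwlogs-to-firehose-role',
)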
Back in the template.yaml file for the SAM project I append the following to the Resources section.
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
sam-app-kinesis-streamed-logs
Sample SAM Template for sam-app-kinesis-streamed-logs
Globals:
Function:
Timeout: 30
Resources:
# omitting earlier defined Resources for brevity
QuotesLoggingBucket:
Type: AWS::S3::Bucket
Properties:
Tags:
- Key: Source
Value: !Sub "${AWS::StackName}-cf-stack"
KinesisFirehoseRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Statement:
- Effect: Allow
Principal:
Service:
- "firehose.amazonaws.com"
Action:
- "sts:AssumeRole"
Condition:
StringEquals:
sts:ExternalId: !Ref AWS::AccountId
Path: "/"
Policies:
- PolicyName: "quotes-logging-firehose-role-policy"
PolicyDocument:
Statement:
- Effect: "Allow"
Action:
- "s3:AbortMultipartUpload"
- "s3:GetBucketLocation"
- "s3:GetObject"
- "s3:ListBucket"
- "s3:ListBucketMultipartUploads"
- "s3:PutObject"
- "logs:PutLogEvents"
Resource:
- !Join
- ''
- - 'arn:aws:s3:::'
- !Ref QuotesLoggingBucket
- !Join
- ''
- - 'arn:aws:s3:::'
- !Ref QuotesLoggingBucket
- '/*'
- !Join
- ''
- - 'arn:aws:logs:'
- !Ref AWS::Region
- ':'
- !Ref AWS::AccountId
- ':log-group:'
- !Sub ${FetchQuoteFunctionLogGroup}
- ':log-stream:*'
- !Join
- ''
- - 'arn:aws:logs:'
- !Ref AWS::Region
- ':'
- !Ref AWS::AccountId
- ':log-group:'
- !Sub ${ListQuotesFunctionLogGroup}
- ':log-stream:*'
QuotesCloudWatchLogsRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Statement:
- Effect: "Allow"
Principal:
Service:
!Join
- ''
- - 'logs.'
- !Ref AWS::Region
- '.amazonaws.com'
Action:
- "sts:AssumeRole"
Path: "/"
Policies:
- PolicyName: "quotes-cloudwatch-logs-role-policy"
PolicyDocument:
Statement:
- Effect: "Allow"
Action:
- "firehose:*"
Resource:
- !Join
- ''
- - 'arn:aws:firehose:'
- !Ref AWS::Region
- ':'
- !Ref AWS::AccountId
- ':*'
QuoteFetcherFirehoseDeliveryStream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
ExtendedS3DestinationConfiguration:
BucketARN: !GetAtt QuotesLoggingBucket.Arn
BufferingHints:
IntervalInSeconds: '120'
SizeInMBs: '5'
CompressionFormat: UNCOMPRESSED
Prefix: sam-quote-fetcher-logs/
RoleARN: !GetAtt KinesisFirehoseRole.Arn
ProcessingConfiguration:
Enabled: 'false'
CloudWatchLoggingOptions:
Enabled: true
LogGroupName: !Ref FetchQuoteFunctionLogGroup
LogStreamName: "S3Delivery"
QuoteListerFirehoseDeliveryStream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
ExtendedS3DestinationConfiguration:
BucketARN: !GetAtt QuotesLoggingBucket.Arn
BufferingHints:
IntervalInSeconds: '120'
SizeInMBs: '5'
CompressionFormat: UNCOMPRESSED
Prefix: sam-quote-lister-logs/
RoleARN: !GetAtt KinesisFirehoseRole.Arn
ProcessingConfiguration:
Enabled: 'false'
CloudWatchLoggingOptions:
Enabled: true
LogGroupName: !Ref ListQuotesFunctionLogGroup
LogStreamName: "S3Delivery"
QuoteFetcherLogStream:
Type: AWS::Logs::LogStream
Properties:
LogGroupName: !Ref FetchQuoteFunctionLogGroup
LogStreamName: "S3Delivery"
QuoteListerLogStream:
Type: AWS::Logs::LogStream
Properties:
LogGroupName: !Ref ListQuotesFunctionLogGroup
LogStreamName: "S3Delivery"
QuoteFetcherLogsSubscriptionFilter:
Type: AWS::Logs::SubscriptionFilter
Properties:
DestinationArn: !GetAtt QuoteFetcherFirehoseDeliveryStream.Arn
FilterPattern: ''
LogGroupName: !Ref FetchQuoteFunctionLogGroup
RoleArn: !GetAtt QuotesCloudWatchLogsRole.Arn
QuoteListerLogsSubscriptionFilter:
Type: AWS::Logs::SubscriptionFilter
Properties:
DestinationArn: !GetAtt QuoteListerFirehoseDeliveryStream.Arn
FilterPattern: ''
LogGroupName: !Ref ListQuotesFunctionLogGroup
RoleArn: !GetAtt QuotesCloudWatchLogsRole.Arn
Outputs:
FetchQuoteApi:
Description: "API Gateway endpoint URL for Prod stage for Fetch Quote API"
Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/fetch"
ListQuotesApi:
Description: "API Gateway endpoint URL for Prod stage for List Quotes API"
Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/list"
Although the solution is composed entirely of managed services, as you can see from the number of resources being added to the SAM/CloudFormation template this is still a fairly hefty amount of IaC to link up all the needed pieces. I've based it on the AWS CLI tutorial Send the Data from Amazon CloudWatch to Kinesis Data Firehose from the AWS docs.
There was one gotcha that did trip me up: the CloudWatch Logs data is already gzip compressed by default, so it is important to specify a CompressionFormat of UNCOMPRESSED in the Firehose delivery stream configuration to avoid double compression, as described in the Kinesis Firehose FAQ. Given the readability of the CloudFormation template above and the linked tutorial from the AWS docs, I'm not going to describe each component further but will instead deploy and show the end result.
So again I use the SAM CLI to build the project.
sam build
Then I deploy in guided mode once more, accepting the defaults and confirming the changeset.
sam deploy --guided
Once the update finishes deploying I can check out the resources in the AWS CloudFormation Console to get the full name of the new S3 destination bucket for the logs, or I can use the helper script below which I've written for such tasks. It simply wraps the Boto3 Python library and formats its output in a way that is easily digestible to my eyes.
#!/usr/bin/python3
import argparse
import textwrap
import boto3
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Lists AWS CloudFormation Stack Resources')
parser.add_argument('stack_name', help='CloudFormation stack name')
parser.add_argument('--profile', help='AWS CLI Profile to use')
parser.add_argument('--region', help='AWS region to execute command against')
args = parser.parse_args()
options = {}
if args.profile:
options.update(profile_name=args.profile)
if args.region:
options.update(region_name=args.region)
session = boto3.Session(**options)
cf = session.client('cloudformation')
tmpl = "{:<35} {:<55} {:<120}"
print(tmpl.format('Resource Type', 'Logical ID', 'Physical ID'))
print(tmpl.format('-'*35, '-'*55, '-'*16))
for stack_resource in cf.list_stack_resources(StackName=args.stack_name)['StackResourceSummaries']:
print(tmpl.format(
stack_resource['ResourceType'],
stack_resource['LogicalResourceId'],
stack_resource['PhysicalResourceId']
))
Running it is as simple as the following.
$ list-cf-stack-resources sam-app-kinesis-streamed-logs --region us-east-1 --profile default
Resource Type Logical ID Physical ID
----------------------------------- ------------------------------------------------------- ----------------
AWS::Lambda::Function FetchQuoteFunction sam-app-kinesis-streamed-logs-FetchQuoteFunction-JQSISUEBJAO3
AWS::Lambda::Permission FetchQuoteFunctionFetchQuoteApiPermissionProd sam-app-kinesis-streamed-logs-FetchQuoteFunctionFetchQuoteApiPermissionProd-XWRMZ0KGB151
AWS::Logs::LogGroup FetchQuoteFunctionLogGroup /aws/lambda/sam-app-kinesis-streamed-logs-FetchQuoteFunction-JQSISUEBJAO3
AWS::IAM::Role FetchQuoteFunctionRole sam-app-kinesis-streamed-lo-FetchQuoteFunctionRole-7TGDWO5JB4GO
AWS::IAM::Role KinesisFirehoseRole sam-app-kinesis-streamed-logs-KinesisFirehoseRole-1TES9QUBI2JOT
AWS::Lambda::Function ListQuotesFunction sam-app-kinesis-streamed-logs-ListQuotesFunction-8B4MYV8YQUIC
AWS::Lambda::Permission ListQuotesFunctionListQuotesApiPermissionProd sam-app-kinesis-streamed-logs-ListQuotesFunctionListQuotesApiPermissionProd-DFLCDUE1MJT5
AWS::Logs::LogGroup ListQuotesFunctionLogGroup /aws/lambda/sam-app-kinesis-streamed-logs-ListQuotesFunction-8B4MYV8YQUIC
AWS::IAM::Role ListQuotesFunctionRole sam-app-kinesis-streamed-lo-ListQuotesFunctionRole-CJHZSMOIT4PC
AWS::KinesisFirehose::DeliveryStream QuoteFetcherFirehoseDeliveryStream sam-app-kinesis-streamed-logs-QuoteFetcherFirehoseD-hkSEZmcO3zsM
AWS::Logs::LogStream QuoteFetcherLogStream S3Delivery
AWS::Logs::SubscriptionFilter QuoteFetcherLogsSubscriptionFilter sam-app-kinesis-streamed-logs-QuoteFetcherLogsSubscriptionFilter-A2A0IXYELQE4
AWS::KinesisFirehose::DeliveryStream QuoteListerFirehoseDeliveryStream sam-app-kinesis-streamed-logs-QuoteListerFirehoseDe-NeY939sPbvf6
AWS::Logs::LogStream QuoteListerLogStream S3Delivery
AWS::Logs::SubscriptionFilter QuoteListerLogsSubscriptionFilter sam-app-kinesis-streamed-logs-QuoteListerLogsSubscriptionFilter-RCC26OXTYCLR
AWS::IAM::Role QuotesCloudWatchLogsRole sam-app-kinesis-streamed-QuotesCloudWatchLogsRole-1VO47NQ56DXKV
AWS::S3::Bucket QuotesLoggingBucket sam-app-kinesis-streamed-logs-quotesloggingbucket-1649a0uj6tbe6
AWS::DynamoDB::Table QuotesTable quotes-demo
AWS::ApiGateway::RestApi ServerlessRestApi vs6yowuahg
AWS::ApiGateway::Deployment ServerlessRestApiDeployment8068fc6cf5 vf93bc
AWS::ApiGateway::Stage ServerlessRestApiProdStage Prod
From the above output I see that the autogenerated bucket name is sam-app-kinesis-streamed-logs-quotesloggingbucket-1649a0uj6tbe6 which I can then use with the regular AWS CLI to check that my CloudWatch logs are in fact being streamed into it.
I'll just copy its contents down to my laptop for easier local inspection.
S3_BUCKET=s3://sam-app-kinesis-streamed-logs-quotesloggingbucket-1649a0uj6tbe6/
aws s3 cp $S3_BUCKET cloudwatchlogs \
--recursive --region us-east-1
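Alternatively, a few lines of boto3 can confirm that Firehose has started delivering objects before pulling everything down; the bucket name below is the one generated for my stack, so substitute your own.

# check_delivery.py -- quick sanity check that log objects are landing in S3
import boto3

s3 = boto3.client('s3', region_name='us-east-1')
bucket = 'sam-app-kinesis-streamed-logs-quotesloggingbucket-1649a0uj6tbe6'

for prefix in ('sam-quote-fetcher-logs/', 'sam-quote-lister-logs/'):
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    for obj in resp.get('Contents', []):
        print(obj['Key'], obj['Size'])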
From there I see that Kinesis Data Firehose has deposited the logs in a nicely partitioned format, with the prefixes I specified in the CloudFormation template followed by year, month, day, and hour directories.
$ tree cloudwatchlogs
cloudwatchlogs
├── sam-quote-fetcher-logs
│ └── 2021
│ └── 02
│ └── 09
│ └── 17
│ └── sam-app-kinesis-streamed-logs-QuoteFetcherFirehoseD-hkSEZmcO3zsM-2-2021-02-09-17-07-00-120c49d0-af11-450e-94f3-2ceaf9d91092
└── sam-quote-lister-logs
└── 2021
└── 02
└── 09
└── 17
└── sam-app-kinesis-streamed-logs-QuoteListerFirehoseDe-NeY939sPbvf6-2-2021-02-09-17-07-55-10c0f0a7-8680-4494-ab32-16f495dcfbfa
Now to view the original log data I just use gunzip to decompress the data like so.
$ gunzip -c sam-app-kinesis-streamed-logs-QuoteFetcherFirehoseD-hkSEZmcO3zsM-2-2021-02-09-17-07-00-120c49d0-af11-450e-94f3-2ceaf9d91092 > uncompressed
$ cat uncompressed
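The same thing can be done in Python, which also makes it easy to parse the delivered records. The sketch below assumes the standard CloudWatch Logs subscription payload format (JSON objects containing a logEvents list, written back to back in each gzipped object) and reads the object copied down above.

# read_delivered_logs.py -- decompress a delivered object and print its log events
import gzip
import json

path = ('cloudwatchlogs/sam-quote-fetcher-logs/2021/02/09/17/'
        'sam-app-kinesis-streamed-logs-QuoteFetcherFirehoseD-hkSEZmcO3zsM'
        '-2-2021-02-09-17-07-00-120c49d0-af11-450e-94f3-2ceaf9d91092')

with open(path, 'rb') as fo:
    text = gzip.decompress(fo.read()).decode('utf-8')

# Payloads are concatenated JSON objects with no delimiter, so walk the buffer.
decoder = json.JSONDecoder()
pos = 0
while pos < len(text):
    payload, pos = decoder.raw_decode(text, pos)
    for event in payload.get('logEvents', []):
        print(event['timestamp'], event['message'].rstrip())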
In this article I have demonstrated how to use the Kinesis Data Firehose managed service of the AWS Kinesis product line to stream CloudWatch logs from an AWS Serverless Application Model project and land them in AWS S3 for long-term storage and downstream analysis.
As always, thanks for reading and please do not hesitate to critique or comment below.